]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Make constructor of protocol objects more elegant
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {
122                 "KWARGS": newcls.IN_KWARGS,
123                 "decode": newcls.in_decode,
124                 "encode": newcls.in_encode,
125             },
126         )
127         newcls.Out = super().__new__(
128             cls,
129             name + ".Out",
130             (newcls,) + bases,
131             {
132                 "KWARGS": newcls.OUT_KWARGS,
133                 "decode": newcls.out_decode,
134                 "encode": newcls.out_encode,
135             },
136         )
137         return newcls
138
139
140 class Respond(Enum):
141     NON = 0  # Incoming, no response needed
142     INL = 1  # Birirectional, use `inline_response()`
143     EXT = 2  # Birirectional, use external responder
144
145
146 class GPS303Pkt(metaclass=MetaPkt):
147     RESPOND = Respond.NON  # Do not send anything back by default
148     PROTO: int
149     IN_KWARGS = ()
150     OUT_KWARGS = ()
151
152     def __init__(self, *args, **kwargs):
153         """
154         Construct the object _either_ from (length, payload),
155         _or_ from the values of individual fields
156         """
157         assert not args or (len(args) == 2 and not kwargs)
158         if args:  # guaranteed to be two arguments at this point
159             self.length, self.payload = args
160             self.decode(self.length, self.payload)
161         else:
162             for kw, typ, dfl in self.KWARGS:
163                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
164             if kwargs:
165                 raise ValueError(
166                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
167                 )
168
169     def __repr__(self):
170         return "{}({})".format(
171             self.__class__.__name__,
172             ", ".join(
173                 "{}={}".format(
174                     k,
175                     'bytes.fromhex("{}")'.format(v.hex())
176                     if isinstance(v, bytes)
177                     else v.__repr__(),
178                 )
179                 for k, v in self.__dict__.items()
180                 if not k.startswith("_")
181             ),
182         )
183
184     def in_decode(self, length, packet):
185         # Overridden in subclasses, otherwise do not decode payload
186         return
187
188     def out_decode(self, length, packet):
189         # Necessary to emulate terminal, which is not implemented
190         raise NotImplementedError(
191             self.__class__.__name__ + ".decode() not implemented"
192         )
193
194     def in_encode(self):
195         # Necessary to emulate terminal, which is not implemented
196         raise NotImplementedError(
197             self.__class__.__name__ + ".encode() not implemented"
198         )
199
200     def out_encode(self):
201         # Overridden in subclasses, otherwise make empty payload
202         return b""
203
204     @property
205     def packed(self):
206         payload = self.encode()
207         length = len(payload) + 1
208         return pack("BB", length, self.PROTO) + payload
209
210
211 class UNKNOWN(GPS303Pkt):
212     PROTO = 256  # > 255 is impossible in real packets
213
214
215 class LOGIN(GPS303Pkt):
216     PROTO = 0x01
217     RESPOND = Respond.INL
218     # Default response for ACK, can also respond with STOP_UPLOAD
219
220     def in_decode(self, length, payload):
221         self.imei = payload[:-1].hex()
222         self.ver = unpack("B", payload[-1:])[0]
223         return self
224
225
226 class SUPERVISION(GPS303Pkt):
227     PROTO = 0x05
228     OUT_KWARGS = (("status", int, 1),)
229
230     def out_encode(self):
231         # 1: The device automatically answers Pickup effect
232         # 2: Automatically Answering Two-way Calls
233         # 3: Ring manually answer the two-way call
234         return pack("B", self.status)
235
236
237 class HEARTBEAT(GPS303Pkt):
238     PROTO = 0x08
239     RESPOND = Respond.INL
240
241
242 class _GPS_POSITIONING(GPS303Pkt):
243     RESPOND = Respond.INL
244
245     def in_decode(self, length, payload):
246         self.dtime = payload[:6]
247         if self.dtime == b"\0\0\0\0\0\0":
248             self.devtime = None
249         else:
250             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
251             self.devtime = datetime(
252                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
253             )
254         self.gps_data_length = payload[6] >> 4
255         self.gps_nb_sat = payload[6] & 0x0F
256         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
257         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
258         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
259         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
260         self.heading = flags & 0b0000001111111111  # bits 6 - last
261         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
262         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
263         self.speed = speed
264         self.flags = flags
265         return self
266
267     def out_encode(self):
268         tup = datetime.utcnow().timetuple()
269         ttup = (tup[0] % 100,) + tup[1:6]
270         return pack("BBBBBB", *ttup)
271
272
273 class GPS_POSITIONING(_GPS_POSITIONING):
274     PROTO = 0x10
275
276
277 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
278     PROTO = 0x11
279
280
281 class STATUS(GPS303Pkt):
282     PROTO = 0x13
283     RESPOND = Respond.EXT
284     OUT_KWARGS = (("upload_interval", int, 25),)
285
286     def in_decode(self, length, payload):
287         self.batt, self.ver, self.timezone, self.intvl = unpack(
288             "BBBB", payload[:4]
289         )
290         if len(payload) > 4:
291             self.signal = payload[4]
292         else:
293             self.signal = None
294         return self
295
296     def out_encode(self):  # Set interval in minutes
297         return pack("B", self.upload_interval)
298
299
300 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
301     PROTO = 0x14
302     RESPOND = Respond.INL
303
304
305 class RESET(GPS303Pkt):
306     # Device sends when it got reset SMS
307     # Server can send to initiate factory reset
308     PROTO = 0x15
309
310
311 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
312     PROTO = 0x16
313     OUT_KWARGS = (("number", int, 3),)
314
315     def out_encode(self):  # Number of whitelist entries
316         return pack("B", number)
317
318
319 class _WIFI_POSITIONING(GPS303Pkt):
320     def in_decode(self, length, payload):
321         self.dtime = payload[:6]
322         if self.dtime == b"\0\0\0\0\0\0":
323             self.devtime = None
324         else:
325             self.devtime = datetime.strptime(
326                 self.dtime.hex(), "%y%m%d%H%M%S"
327             ).astimezone(tz=timezone.utc)
328         self.wifi_aps = []
329         for i in range(self.length):  # length has special meaning here
330             slice = payload[6 + i * 7 : 13 + i * 7]
331             self.wifi_aps.append(
332                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
333             )
334         gsm_slice = payload[6 + self.length * 7 :]
335         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
336         self.gsm_cells = []
337         for i in range(ncells):
338             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
339             locac, cellid, sigstr = unpack(
340                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
341             )
342             self.gsm_cells.append((locac, cellid, -sigstr))
343         return self
344
345
346 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
347     PROTO = 0x17
348     RESPOND = Respond.INL
349
350     def out_encode(self):
351         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
352
353
354 class TIME(GPS303Pkt):
355     PROTO = 0x30
356     RESPOND = Respond.INL
357
358     def out_encode(self):
359         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
360
361
362 class PROHIBIT_LBS(GPS303Pkt):
363     PROTO = 0x33
364     OUT_KWARGS = (("status", int, 1),)
365
366     def out_encode(self):  # Server sent, 0-off, 1-on
367         return pack("B", self.status)
368
369
370 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
371     PROTO = 0x34
372
373     # Data is in packed decimal
374     # 00/01 - GPS on/off
375     # 00/01 - Don't set / Set upload period
376     # HHMMHHMM - Upload period
377     # 00/01 - LBS on/off
378     # 00/01 - Don't set / Set time of boot
379     # HHMM  - Time of boot
380     # 00/01 - Don't set / Set time of shutdown
381     # HHMM  - Time of shutdown
382     def out_encode(self):
383         return b""  # TODO
384
385
386 class _SET_PHONE(GPS303Pkt):
387     OUT_KWARGS = (("phone", str, ""),)
388
389     def out_encode(self):
390         return self.phone.encode()
391
392
393 class REMOTE_MONITOR_PHONE(_SET_PHONE):
394     PROTO = 0x40
395
396
397 class SOS_PHONE(_SET_PHONE):
398     PROTO = 0x41
399
400
401 class DAD_PHONE(_SET_PHONE):
402     PROTO = 0x42
403
404
405 class MOM_PHONE(_SET_PHONE):
406     PROTO = 0x43
407
408
409 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
410     PROTO = 0x44
411
412
413 class GPS_OFF_PERIOD(GPS303Pkt):
414     PROTO = 0x46
415     OUT_KWARGS = (
416         ("onoff", int, 0),
417         ("fm", hhmm, "0000"),
418         ("to", hhmm, "2359"),
419     )
420
421     def out_encode(self):
422         return (
423             pack("B", self.onoff)
424             + bytes.fromhex(self.fm)
425             + bytes.fromhex(self.to)
426         )
427
428
429 class DND_PERIOD(GPS303Pkt):
430     PROTO = 0x47
431     OUT_KWARGS = (
432         ("onoff", int, 0),
433         ("week", int, 3),
434         ("fm1", hhmm, "0000"),
435         ("to1", hhmm, "2359"),
436         ("fm2", hhmm, "0000"),
437         ("to2", hhmm, "2359"),
438     )
439
440     def out_endode(self):
441         return (
442             pack("B", self.onoff)
443             + pack("B", self.week)
444             + bytes.fromhex(self.fm1)
445             + bytes.fromhex(self.to1)
446             + bytes.fromhex(self.fm2)
447             + bytes.fromhex(self.to2)
448         )
449
450
451 class RESTART_SHUTDOWN(GPS303Pkt):
452     PROTO = 0x48
453     OUT_KWARGS = (("flag", int, 0),)
454
455     def out_encode(self):
456         # 1 - restart
457         # 2 - shutdown
458         return pack("B", self.flag)
459
460
461 class DEVICE(GPS303Pkt):
462     PROTO = 0x49
463     OUT_KWARGS = (("flag", int, 0),)
464
465     # 0 - Stop looking for equipment
466     # 1 - Start looking for equipment
467     def out_encode(self):
468         return pack("B", self.flag)
469
470
471 class ALARM_CLOCK(GPS303Pkt):
472     PROTO = 0x50
473
474     def out_encode(self):
475         # TODO implement parsing kwargs
476         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
477         return b"".join(
478             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
479         )
480
481
482 class STOP_ALARM(GPS303Pkt):
483     PROTO = 0x56
484
485     def in_decode(self, length, payload):
486         self.flag = payload[0]
487         return self
488
489
490 class SETUP(GPS303Pkt):
491     PROTO = 0x57
492     RESPOND = Respond.EXT
493     OUT_KWARGS = (
494         ("uploadintervalseconds", intx, 0x0300),
495         ("binaryswitch", intx, 0b00110001),
496         ("alarms", l3int, [0, 0, 0]),
497         ("dndtimeswitch", int, 0),
498         ("dndtimes", l3int, [0, 0, 0]),
499         ("gpstimeswitch", int, 0),
500         ("gpstimestart", int, 0),
501         ("gpstimestop", int, 0),
502         ("phonenumbers", l3str, ["", "", ""]),
503     )
504
505     def out_encode(self):
506         def pack3b(x):
507             return pack("!I", x)[1:]
508
509         return b"".join(
510             [
511                 pack("!H", self.uploadintervalseconds),
512                 pack("B", self.binaryswitch),
513             ]
514             + [pack3b(el) for el in self.alarms]
515             + [
516                 pack("B", self.dndtimeswitch),
517             ]
518             + [pack3b(el) for el in self.dndtimes]
519             + [
520                 pack("B", self.gpstimeswitch),
521                 pack("!H", self.gpstimestart),
522                 pack("!H", self.gpstimestop),
523             ]
524             + [b";".join([el.encode() for el in self.phonenumbers])]
525         )
526
527
528 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
529     PROTO = 0x58
530
531
532 class RESTORE_PASSWORD(GPS303Pkt):
533     PROTO = 0x67
534
535
536 class WIFI_POSITIONING(_WIFI_POSITIONING):
537     PROTO = 0x69
538     RESPOND = Respond.EXT
539     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
540
541     def out_encode(self):
542         if self.lat is None or self.lon is None:
543             return b""
544         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
545
546
547 class MANUAL_POSITIONING(GPS303Pkt):
548     PROTO = 0x80
549
550     def in_decode(self, length, payload):
551         self.flag = payload[0] if len(payload) > 0 else None
552         self.reason = {
553             1: "Incorrect time",
554             2: "LBS less",
555             3: "WiFi less",
556             4: "LBS search > 3 times",
557             5: "Same LBS and WiFi data",
558             6: "LBS prohibited, WiFi absent",
559             7: "GPS spacing < 50 m",
560         }.get(self.flag, "Unknown")
561         return self
562
563
564 class BATTERY_CHARGE(GPS303Pkt):
565     PROTO = 0x81
566
567
568 class CHARGER_CONNECTED(GPS303Pkt):
569     PROTO = 0x82
570
571
572 class CHARGER_DISCONNECTED(GPS303Pkt):
573     PROTO = 0x83
574
575
576 class VIBRATION_RECEIVED(GPS303Pkt):
577     PROTO = 0x94
578
579
580 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
581     PROTO = 0x98
582     RESPOND = Respond.EXT
583     OUT_KWARGS = (("interval", int, 10),)
584
585     def in_decode(self, length, payload):
586         self.interval = unpack("!H", payload[:2])
587         return self
588
589     def out_encode(self):
590         return pack("!H", interval)
591
592
593 class SOS_ALARM(GPS303Pkt):
594     PROTO = 0x99
595
596
597 class UNKNOWN_B3(GPS303Pkt):
598     PROTO = 0xB3
599     IN_KWARGS = (("asciidata", str, ""),)
600
601     def in_decode(self, length, payload):
602         self.asciidata = payload.decode()
603         return self
604
605
606 # Build dicts protocol number -> class and class name -> protocol number
607 CLASSES = {}
608 PROTOS = {}
609 if True:  # just to indent the code, sorry!
610     for cls in [
611         cls
612         for name, cls in globals().items()
613         if isclass(cls)
614         and issubclass(cls, GPS303Pkt)
615         and not name.startswith("_")
616     ]:
617         if hasattr(cls, "PROTO"):
618             CLASSES[cls.PROTO] = cls
619             PROTOS[cls.__name__] = cls.PROTO
620
621
622 def class_by_prefix(prefix):
623     lst = [
624         (name, proto)
625         for name, proto in PROTOS.items()
626         if name.upper().startswith(prefix.upper())
627     ]
628     if len(lst) != 1:
629         return lst
630     _, proto = lst[0]
631     return CLASSES[proto]
632
633
634 def proto_by_name(name):
635     return PROTOS.get(name, -1)
636
637
638 def proto_of_message(packet):
639     return unpack("B", packet[1:2])[0]
640
641
642 def inline_response(packet):
643     proto = proto_of_message(packet)
644     if proto in CLASSES:
645         cls = CLASSES[proto]
646         if cls.RESPOND is Respond.INL:
647             return cls.Out().packed
648     return None
649
650
651 def parse_message(packet):
652     """From a packet (without framing bytes) derive the XXX.In object"""
653     length, proto = unpack("BB", packet[:2])
654     payload = packet[2:]
655     if proto in CLASSES:
656         return CLASSES[proto].In(length, payload)
657     else:
658         retobj = UNKNOWN.In(length, payload)
659         retobj.PROTO = proto  # Override class attr with object attr
660         return retobj