]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
make parse_message return UNKNOWN on parse crash
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "DecodeError",
28     "Respond",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "PROHIBIT_LBS",
43     "GPS_LBS_SWITCH_TIMES",
44     "REMOTE_MONITOR_PHONE",
45     "SOS_PHONE",
46     "DAD_PHONE",
47     "MOM_PHONE",
48     "STOP_UPLOAD",
49     "GPS_OFF_PERIOD",
50     "DND_PERIOD",
51     "RESTART_SHUTDOWN",
52     "DEVICE",
53     "ALARM_CLOCK",
54     "STOP_ALARM",
55     "SETUP",
56     "SYNCHRONOUS_WHITELIST",
57     "RESTORE_PASSWORD",
58     "WIFI_POSITIONING",
59     "MANUAL_POSITIONING",
60     "BATTERY_CHARGE",
61     "CHARGER_CONNECTED",
62     "CHARGER_DISCONNECTED",
63     "VIBRATION_RECEIVED",
64     "POSITION_UPLOAD_INTERVAL",
65     "SOS_ALARM",
66     "UNKNOWN_B3",
67 )
68
69
70 class DecodeError(Exception):
71     def __init__(self, e, **kwargs):
72         super().__init__(e)
73         for k, v in kwargs.items():
74             setattr(self, k, v)
75
76 def intx(x):
77     if isinstance(x, str):
78         x = int(x, 0)
79     return x
80
81
82 def hhmm(x):
83     """Check for the string that represents hours and minutes"""
84     if not isinstance(x, str) or len(x) != 4:
85         raise ValueError(str(x) + " is not a four-character string")
86     hh = int(x[:2])
87     mm = int(x[2:])
88     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
89         raise ValueError(str(x) + " does not contain valid hours and minutes")
90     return x
91
92
93 def l3str(x):
94     if isinstance(x, str):
95         x = x.split(",")
96     if len(x) != 3 or not all(isinstance(el, str) for el in x):
97         raise ValueError(str(x) + " is not a list of three strings")
98     return x
99
100
101 def l3int(x):
102     if isinstance(x, str):
103         x = x.split(",")
104         x = [int(el) for el in x]
105     if len(x) != 3 or not all(isinstance(el, int) for el in x):
106         raise ValueError(str(x) + " is not a list of three integers")
107     return x
108
109
110 class MetaPkt(type):
111     """
112     For each class corresponding to a message, automatically create
113     two nested classes `In` and `Out` that also inherit from their
114     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
115     copied to the `In` nested class under the name `KWARGS`, and
116     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
117     to the nested class `Out`. In addition, method `encode` is
118     defined in both classes equal to `in_encode()` and `out_encode()`
119     respectively.
120     """
121
122     def __new__(cls, name, bases, attrs):
123         newcls = super().__new__(cls, name, bases, attrs)
124         newcls.In = super().__new__(
125             cls,
126             name + ".In",
127             (newcls,) + bases,
128             {
129                 "KWARGS": newcls.IN_KWARGS,
130                 "decode": newcls.in_decode,
131                 "encode": newcls.in_encode,
132             },
133         )
134         newcls.Out = super().__new__(
135             cls,
136             name + ".Out",
137             (newcls,) + bases,
138             {
139                 "KWARGS": newcls.OUT_KWARGS,
140                 "decode": newcls.out_decode,
141                 "encode": newcls.out_encode,
142             },
143         )
144         return newcls
145
146
147 class Respond(Enum):
148     NON = 0  # Incoming, no response needed
149     INL = 1  # Birirectional, use `inline_response()`
150     EXT = 2  # Birirectional, use external responder
151
152
153 class GPS303Pkt(metaclass=MetaPkt):
154     RESPOND = Respond.NON  # Do not send anything back by default
155     PROTO: int
156     IN_KWARGS = ()
157     OUT_KWARGS = ()
158
159     def __init__(self, *args, **kwargs):
160         """
161         Construct the object _either_ from (length, payload),
162         _or_ from the values of individual fields
163         """
164         assert not args or (len(args) == 2 and not kwargs)
165         if args:  # guaranteed to be two arguments at this point
166             self.length, self.payload = args
167             try:
168                 self.decode(self.length, self.payload)
169             except error as e:
170                 raise DecodeError(e, obj=self)
171         else:
172             for kw, typ, dfl in self.KWARGS:
173                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
174             if kwargs:
175                 raise ValueError(
176                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
177                 )
178
179     def __repr__(self):
180         return "{}({})".format(
181             self.__class__.__name__,
182             ", ".join(
183                 "{}={}".format(
184                     k,
185                     'bytes.fromhex("{}")'.format(v.hex())
186                     if isinstance(v, bytes)
187                     else v.__repr__(),
188                 )
189                 for k, v in self.__dict__.items()
190                 if not k.startswith("_")
191             ),
192         )
193
194     def in_decode(self, length, packet):
195         # Overridden in subclasses, otherwise do not decode payload
196         return
197
198     def out_decode(self, length, packet):
199         # Overridden in subclasses, otherwise do not decode payload
200         return
201
202     def in_encode(self):
203         # Necessary to emulate terminal, which is not implemented
204         raise NotImplementedError(
205             self.__class__.__name__ + ".encode() not implemented"
206         )
207
208     def out_encode(self):
209         # Overridden in subclasses, otherwise make empty payload
210         return b""
211
212     @property
213     def packed(self):
214         payload = self.encode()
215         length = len(payload) + 1
216         return pack("BB", length, self.PROTO) + payload
217
218
219 class UNKNOWN(GPS303Pkt):
220     PROTO = 256  # > 255 is impossible in real packets
221
222
223 class LOGIN(GPS303Pkt):
224     PROTO = 0x01
225     RESPOND = Respond.INL
226     # Default response for ACK, can also respond with STOP_UPLOAD
227
228     def in_decode(self, length, payload):
229         self.imei = payload[:-1].hex()
230         self.ver = unpack("B", payload[-1:])[0]
231         return self
232
233
234 class SUPERVISION(GPS303Pkt):
235     PROTO = 0x05
236     OUT_KWARGS = (("status", int, 1),)
237
238     def out_encode(self):
239         # 1: The device automatically answers Pickup effect
240         # 2: Automatically Answering Two-way Calls
241         # 3: Ring manually answer the two-way call
242         return pack("B", self.status)
243
244
245 class HEARTBEAT(GPS303Pkt):
246     PROTO = 0x08
247     RESPOND = Respond.INL
248
249
250 class _GPS_POSITIONING(GPS303Pkt):
251     RESPOND = Respond.INL
252
253     def in_decode(self, length, payload):
254         self.dtime = payload[:6]
255         if self.dtime == b"\0\0\0\0\0\0":
256             self.devtime = None
257         else:
258             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
259             self.devtime = datetime(
260                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
261             )
262         self.gps_data_length = payload[6] >> 4
263         self.gps_nb_sat = payload[6] & 0x0F
264         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
265         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
266         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
267         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
268         self.heading = flags & 0b0000001111111111  # bits 6 - last
269         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
270         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
271         self.speed = speed
272         self.flags = flags
273         return self
274
275     def out_encode(self):
276         tup = datetime.utcnow().timetuple()
277         ttup = (tup[0] % 100,) + tup[1:6]
278         return pack("BBBBBB", *ttup)
279
280
281 class GPS_POSITIONING(_GPS_POSITIONING):
282     PROTO = 0x10
283
284
285 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
286     PROTO = 0x11
287
288
289 class STATUS(GPS303Pkt):
290     PROTO = 0x13
291     RESPOND = Respond.EXT
292     OUT_KWARGS = (("upload_interval", int, 25),)
293
294     def in_decode(self, length, payload):
295         self.batt, self.ver, self.timezone, self.intvl = unpack(
296             "BBBB", payload[:4]
297         )
298         if len(payload) > 4:
299             self.signal = payload[4]
300         else:
301             self.signal = None
302         return self
303
304     def out_encode(self):  # Set interval in minutes
305         return pack("B", self.upload_interval)
306
307
308 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
309     PROTO = 0x14
310
311
312 class RESET(GPS303Pkt):
313     # Device sends when it got reset SMS
314     # Server can send to initiate factory reset
315     PROTO = 0x15
316
317
318 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
319     PROTO = 0x16
320     OUT_KWARGS = (("number", int, 3),)
321
322     def out_encode(self):  # Number of whitelist entries
323         return pack("B", number)
324
325
326 class _WIFI_POSITIONING(GPS303Pkt):
327     def in_decode(self, length, payload):
328         self.dtime = payload[:6]
329         if self.dtime == b"\0\0\0\0\0\0":
330             self.devtime = None
331         else:
332             self.devtime = datetime.strptime(
333                 self.dtime.hex(), "%y%m%d%H%M%S"
334             ).astimezone(tz=timezone.utc)
335         self.wifi_aps = []
336         for i in range(self.length):  # length has special meaning here
337             slice = payload[6 + i * 7 : 13 + i * 7]
338             self.wifi_aps.append(
339                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
340             )
341         gsm_slice = payload[6 + self.length * 7 :]
342         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
343         self.gsm_cells = []
344         for i in range(ncells):
345             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
346             locac, cellid, sigstr = unpack(
347                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
348             )
349             self.gsm_cells.append((locac, cellid, -sigstr))
350         return self
351
352
353 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
354     PROTO = 0x17
355     RESPOND = Respond.INL
356
357     def out_encode(self):
358         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
359
360
361 class TIME(GPS303Pkt):
362     PROTO = 0x30
363     RESPOND = Respond.INL
364
365     def out_encode(self):
366         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
367
368
369 class PROHIBIT_LBS(GPS303Pkt):
370     PROTO = 0x33
371     OUT_KWARGS = (("status", int, 1),)
372
373     def out_encode(self):  # Server sent, 0-off, 1-on
374         return pack("B", self.status)
375
376
377 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
378     PROTO = 0x34
379
380     # Data is in packed decimal
381     # 00/01 - GPS on/off
382     # 00/01 - Don't set / Set upload period
383     # HHMMHHMM - Upload period
384     # 00/01 - LBS on/off
385     # 00/01 - Don't set / Set time of boot
386     # HHMM  - Time of boot
387     # 00/01 - Don't set / Set time of shutdown
388     # HHMM  - Time of shutdown
389     def out_encode(self):
390         return b""  # TODO
391
392
393 class _SET_PHONE(GPS303Pkt):
394     OUT_KWARGS = (("phone", str, ""),)
395
396     def out_encode(self):
397         return self.phone.encode()
398
399
400 class REMOTE_MONITOR_PHONE(_SET_PHONE):
401     PROTO = 0x40
402
403
404 class SOS_PHONE(_SET_PHONE):
405     PROTO = 0x41
406
407
408 class DAD_PHONE(_SET_PHONE):
409     PROTO = 0x42
410
411
412 class MOM_PHONE(_SET_PHONE):
413     PROTO = 0x43
414
415
416 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
417     PROTO = 0x44
418
419
420 class GPS_OFF_PERIOD(GPS303Pkt):
421     PROTO = 0x46
422     OUT_KWARGS = (
423         ("onoff", int, 0),
424         ("fm", hhmm, "0000"),
425         ("to", hhmm, "2359"),
426     )
427
428     def out_encode(self):
429         return (
430             pack("B", self.onoff)
431             + bytes.fromhex(self.fm)
432             + bytes.fromhex(self.to)
433         )
434
435
436 class DND_PERIOD(GPS303Pkt):
437     PROTO = 0x47
438     OUT_KWARGS = (
439         ("onoff", int, 0),
440         ("week", int, 3),
441         ("fm1", hhmm, "0000"),
442         ("to1", hhmm, "2359"),
443         ("fm2", hhmm, "0000"),
444         ("to2", hhmm, "2359"),
445     )
446
447     def out_endode(self):
448         return (
449             pack("B", self.onoff)
450             + pack("B", self.week)
451             + bytes.fromhex(self.fm1)
452             + bytes.fromhex(self.to1)
453             + bytes.fromhex(self.fm2)
454             + bytes.fromhex(self.to2)
455         )
456
457
458 class RESTART_SHUTDOWN(GPS303Pkt):
459     PROTO = 0x48
460     OUT_KWARGS = (("flag", int, 0),)
461
462     def out_encode(self):
463         # 1 - restart
464         # 2 - shutdown
465         return pack("B", self.flag)
466
467
468 class DEVICE(GPS303Pkt):
469     PROTO = 0x49
470     OUT_KWARGS = (("flag", int, 0),)
471
472     # 0 - Stop looking for equipment
473     # 1 - Start looking for equipment
474     def out_encode(self):
475         return pack("B", self.flag)
476
477
478 class ALARM_CLOCK(GPS303Pkt):
479     PROTO = 0x50
480
481     def out_encode(self):
482         # TODO implement parsing kwargs
483         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
484         return b"".join(
485             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
486         )
487
488
489 class STOP_ALARM(GPS303Pkt):
490     PROTO = 0x56
491
492     def in_decode(self, length, payload):
493         self.flag = payload[0]
494         return self
495
496
497 class SETUP(GPS303Pkt):
498     PROTO = 0x57
499     RESPOND = Respond.EXT
500     OUT_KWARGS = (
501         ("uploadintervalseconds", intx, 0x0300),
502         ("binaryswitch", intx, 0b00110001),
503         ("alarms", l3int, [0, 0, 0]),
504         ("dndtimeswitch", int, 0),
505         ("dndtimes", l3int, [0, 0, 0]),
506         ("gpstimeswitch", int, 0),
507         ("gpstimestart", int, 0),
508         ("gpstimestop", int, 0),
509         ("phonenumbers", l3str, ["", "", ""]),
510     )
511
512     def out_encode(self):
513         def pack3b(x):
514             return pack("!I", x)[1:]
515
516         return b"".join(
517             [
518                 pack("!H", self.uploadintervalseconds),
519                 pack("B", self.binaryswitch),
520             ]
521             + [pack3b(el) for el in self.alarms]
522             + [
523                 pack("B", self.dndtimeswitch),
524             ]
525             + [pack3b(el) for el in self.dndtimes]
526             + [
527                 pack("B", self.gpstimeswitch),
528                 pack("!H", self.gpstimestart),
529                 pack("!H", self.gpstimestop),
530             ]
531             + [b";".join([el.encode() for el in self.phonenumbers])]
532         )
533
534
535 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
536     PROTO = 0x58
537
538
539 class RESTORE_PASSWORD(GPS303Pkt):
540     PROTO = 0x67
541
542
543 class WIFI_POSITIONING(_WIFI_POSITIONING):
544     PROTO = 0x69
545     RESPOND = Respond.EXT
546     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
547
548     def out_encode(self):
549         if self.latitude is None or self.longitude is None:
550             return b""
551         return "{:+#010.8g},{:+#010.8g}".format(
552             self.latitude, self.longitude
553         ).encode()
554
555     def out_decode(self, length, payload):
556         lat, lon = payload.decode().split(",")
557         self.latitude = float(lat)
558         self.longitude = float(lon)
559
560
561 class MANUAL_POSITIONING(GPS303Pkt):
562     PROTO = 0x80
563
564     def in_decode(self, length, payload):
565         self.flag = payload[0] if len(payload) > 0 else None
566         self.reason = {
567             1: "Incorrect time",
568             2: "LBS less",
569             3: "WiFi less",
570             4: "LBS search > 3 times",
571             5: "Same LBS and WiFi data",
572             6: "LBS prohibited, WiFi absent",
573             7: "GPS spacing < 50 m",
574         }.get(self.flag, "Unknown")
575         return self
576
577
578 class BATTERY_CHARGE(GPS303Pkt):
579     PROTO = 0x81
580
581
582 class CHARGER_CONNECTED(GPS303Pkt):
583     PROTO = 0x82
584
585
586 class CHARGER_DISCONNECTED(GPS303Pkt):
587     PROTO = 0x83
588
589
590 class VIBRATION_RECEIVED(GPS303Pkt):
591     PROTO = 0x94
592
593
594 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
595     PROTO = 0x98
596     RESPOND = Respond.EXT
597     OUT_KWARGS = (("interval", int, 10),)
598
599     def in_decode(self, length, payload):
600         self.interval = unpack("!H", payload[:2])
601         return self
602
603     def out_encode(self):
604         return pack("!H", interval)
605
606
607 class SOS_ALARM(GPS303Pkt):
608     PROTO = 0x99
609
610
611 class UNKNOWN_B3(GPS303Pkt):
612     PROTO = 0xB3
613     IN_KWARGS = (("asciidata", str, ""),)
614
615     def in_decode(self, length, payload):
616         self.asciidata = payload.decode()
617         return self
618
619
620 # Build dicts protocol number -> class and class name -> protocol number
621 CLASSES = {}
622 PROTOS = {}
623 if True:  # just to indent the code, sorry!
624     for cls in [
625         cls
626         for name, cls in globals().items()
627         if isclass(cls)
628         and issubclass(cls, GPS303Pkt)
629         and not name.startswith("_")
630     ]:
631         if hasattr(cls, "PROTO"):
632             CLASSES[cls.PROTO] = cls
633             PROTOS[cls.__name__] = cls.PROTO
634
635
636 def class_by_prefix(prefix):
637     lst = [
638         (name, proto)
639         for name, proto in PROTOS.items()
640         if name.upper().startswith(prefix.upper())
641     ]
642     if len(lst) != 1:
643         return lst
644     _, proto = lst[0]
645     return CLASSES[proto]
646
647
648 def proto_by_name(name):
649     return PROTOS.get(name, -1)
650
651
652 def proto_of_message(packet):
653     return unpack("B", packet[1:2])[0]
654
655
656 def inline_response(packet):
657     proto = proto_of_message(packet)
658     if proto in CLASSES:
659         cls = CLASSES[proto]
660         if cls.RESPOND is Respond.INL:
661             return cls.Out().packed
662     return None
663
664
665 def parse_message(packet, is_incoming=True):
666     """From a packet (without framing bytes) derive the XXX.In object"""
667     length, proto = unpack("BB", packet[:2])
668     payload = packet[2:]
669     if proto not in CLASSES:
670         cause = ValueError(f"Proto {proto} is unknown")
671     else:
672         try:
673             if is_incoming:
674                 return CLASSES[proto].In(length, payload)
675             else:
676                 return CLASSES[proto].Out(length, payload)
677         except DecodeError as e:
678             cause = e
679     if is_incoming:
680         retobj = UNKNOWN.In(length, payload)
681     else:
682         retobj = UNKNOWN.Out(length, payload)
683     retobj.PROTO = proto  # Override class attr with object attr
684     retobj.cause = cause
685     return retobj