]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
typing: annotate gps303proto.py (mostly)
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "parse_message",
27     "proto_by_name",
28     "DecodeError",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "PROHIBIT_LBS",
44     "GPS_LBS_SWITCH_TIMES",
45     "REMOTE_MONITOR_PHONE",
46     "SOS_PHONE",
47     "DAD_PHONE",
48     "MOM_PHONE",
49     "STOP_UPLOAD",
50     "GPS_OFF_PERIOD",
51     "DND_PERIOD",
52     "RESTART_SHUTDOWN",
53     "DEVICE",
54     "ALARM_CLOCK",
55     "STOP_ALARM",
56     "SETUP",
57     "SYNCHRONOUS_WHITELIST",
58     "RESTORE_PASSWORD",
59     "WIFI_POSITIONING",
60     "MANUAL_POSITIONING",
61     "BATTERY_CHARGE",
62     "CHARGER_CONNECTED",
63     "CHARGER_DISCONNECTED",
64     "VIBRATION_RECEIVED",
65     "POSITION_UPLOAD_INTERVAL",
66     "SOS_ALARM",
67     "UNKNOWN_B3",
68 )
69
70
71 class DecodeError(Exception):
72     def __init__(self, e: Exception, **kwargs: Any) -> None:
73         super().__init__(e)
74         for k, v in kwargs.items():
75             setattr(self, k, v)
76
77
78 def intx(x: Union[str, int]) -> int:
79     if isinstance(x, str):
80         x = int(x, 0)
81     return x
82
83
84 def hhmm(x: str) -> str:
85     """Check for the string that represents hours and minutes"""
86     if not isinstance(x, str) or len(x) != 4:
87         raise ValueError(str(x) + " is not a four-character string")
88     hh = int(x[:2])
89     mm = int(x[2:])
90     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
91         raise ValueError(str(x) + " does not contain valid hours and minutes")
92     return x
93
94
95 def l3str(x: Union[str, List[str]]) -> List[str]:
96     if isinstance(x, str):
97         lx = x.split(",")
98     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
99         raise ValueError(str(lx) + " is not a list of three strings")
100     return lx
101
102
103 def l3int(x: Union[str, List[int]]) -> List[int]:
104     if isinstance(x, str):
105         lx = [int(el) for el in x.split(",")]
106     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
107         raise ValueError(str(lx) + " is not a list of three integers")
108     return lx
109
110
111 class MetaPkt(type):
112     """
113     For each class corresponding to a message, automatically create
114     two nested classes `In` and `Out` that also inherit from their
115     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
116     copied to the `In` nested class under the name `KWARGS`, and
117     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
118     to the nested class `Out`. In addition, method `encode` is
119     defined in both classes equal to `in_encode()` and `out_encode()`
120     respectively.
121     """
122
123     def __new__(
124         cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
125     ) -> "MetaPkt":
126         newcls = super().__new__(cls, name, bases, attrs)
127         setattr(
128             newcls,
129             "In",
130             super().__new__(
131                 cls,
132                 name + ".In",
133                 (newcls,) + bases,
134                 {
135                     "KWARGS": newcls.IN_KWARGS,
136                     "decode": newcls.in_decode,
137                     "encode": newcls.in_encode,
138                 },
139             ),
140         )
141         setattr(
142             newcls,
143             "Out",
144             super().__new__(
145                 cls,
146                 name + ".Out",
147                 (newcls,) + bases,
148                 {
149                     "KWARGS": newcls.OUT_KWARGS,
150                     "decode": newcls.out_decode,
151                     "encode": newcls.out_encode,
152                 },
153             ),
154         )
155         return newcls
156
157
158 class Respond(Enum):
159     NON = 0  # Incoming, no response needed
160     INL = 1  # Birirectional, use `inline_response()`
161     EXT = 2  # Birirectional, use external responder
162
163
164 class GPS303Pkt(metaclass=MetaPkt):
165     RESPOND = Respond.NON  # Do not send anything back by default
166     PROTO: int
167     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
168     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
169     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
170     In: Type["GPS303Pkt"]
171     Out: Type["GPS303Pkt"]
172
173     def __init__(self, *args: Any, **kwargs: Any):
174         """
175         Construct the object _either_ from (length, payload),
176         _or_ from the values of individual fields
177         """
178         assert not args or (len(args) == 2 and not kwargs)
179         if args:  # guaranteed to be two arguments at this point
180             self.length, self.payload = args
181             try:
182                 self.decode(self.length, self.payload)
183             except error as e:
184                 raise DecodeError(e, obj=self)
185         else:
186             for kw, typ, dfl in self.KWARGS:
187                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
188             if kwargs:
189                 raise ValueError(
190                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
191                 )
192
193     def __repr__(self) -> str:
194         return "{}({})".format(
195             self.__class__.__name__,
196             ", ".join(
197                 "{}={}".format(
198                     k,
199                     'bytes.fromhex("{}")'.format(v.hex())
200                     if isinstance(v, bytes)
201                     else v.__repr__(),
202                 )
203                 for k, v in self.__dict__.items()
204                 if not k.startswith("_")
205             ),
206         )
207
208     decode: Callable[["GPS303Pkt", int, bytes], None]
209
210     def in_decode(self, length: int, packet: bytes) -> None:
211         # Overridden in subclasses, otherwise do not decode payload
212         return
213
214     def out_decode(self, length: int, packet: bytes) -> None:
215         # Overridden in subclasses, otherwise do not decode payload
216         return
217
218     encode: Callable[["GPS303Pkt"], bytes]
219
220     def in_encode(self) -> bytes:
221         # Necessary to emulate terminal, which is not implemented
222         raise NotImplementedError(
223             self.__class__.__name__ + ".encode() not implemented"
224         )
225
226     def out_encode(self) -> bytes:
227         # Overridden in subclasses, otherwise make empty payload
228         return b""
229
230     @property
231     def packed(self) -> bytes:
232         payload = self.encode()
233         length = len(payload) + 1
234         return pack("BB", length, self.PROTO) + payload
235
236
237 class UNKNOWN(GPS303Pkt):
238     PROTO = 256  # > 255 is impossible in real packets
239
240
241 class LOGIN(GPS303Pkt):
242     PROTO = 0x01
243     RESPOND = Respond.INL
244     # Default response for ACK, can also respond with STOP_UPLOAD
245
246     def in_decode(self, length: int, payload: bytes) -> None:
247         self.imei = payload[:-1].hex()
248         self.ver = unpack("B", payload[-1:])[0]
249
250
251 class SUPERVISION(GPS303Pkt):
252     PROTO = 0x05
253     OUT_KWARGS = (("status", int, 1),)
254
255     def out_encode(self) -> bytes:
256         # 1: The device automatically answers Pickup effect
257         # 2: Automatically Answering Two-way Calls
258         # 3: Ring manually answer the two-way call
259         return pack("B", self.status)
260
261
262 class HEARTBEAT(GPS303Pkt):
263     PROTO = 0x08
264     RESPOND = Respond.INL
265
266
267 class _GPS_POSITIONING(GPS303Pkt):
268     RESPOND = Respond.INL
269
270     def in_decode(self, length: int, payload: bytes) -> None:
271         self.dtime = payload[:6]
272         if self.dtime == b"\0\0\0\0\0\0":
273             self.devtime = None
274         else:
275             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
276             self.devtime = datetime(
277                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
278             )
279         self.gps_data_length = payload[6] >> 4
280         self.gps_nb_sat = payload[6] & 0x0F
281         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
282         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
283         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
284         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
285         self.heading = flags & 0b0000001111111111  # bits 6 - last
286         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
287         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
288         self.speed = speed
289         self.flags = flags
290
291     def out_encode(self) -> bytes:
292         tup = datetime.utcnow().timetuple()
293         ttup = (tup[0] % 100,) + tup[1:6]
294         return pack("BBBBBB", *ttup)
295
296
297 class GPS_POSITIONING(_GPS_POSITIONING):
298     PROTO = 0x10
299
300
301 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
302     PROTO = 0x11
303
304
305 class STATUS(GPS303Pkt):
306     PROTO = 0x13
307     RESPOND = Respond.EXT
308     OUT_KWARGS = (("upload_interval", int, 25),)
309
310     def in_decode(self, length: int, payload: bytes) -> None:
311         self.batt, self.ver, self.timezone, self.intvl = unpack(
312             "BBBB", payload[:4]
313         )
314         if len(payload) > 4:
315             self.signal: Optional[int] = payload[4]
316         else:
317             self.signal = None
318
319     def out_encode(self) -> bytes:  # Set interval in minutes
320         return pack("B", self.upload_interval)
321
322
323 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
324     PROTO = 0x14
325
326
327 class RESET(GPS303Pkt):
328     # Device sends when it got reset SMS
329     # Server can send to initiate factory reset
330     PROTO = 0x15
331
332
333 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
334     PROTO = 0x16
335     OUT_KWARGS = (("number", int, 3),)
336
337     def out_encode(self) -> bytes:  # Number of whitelist entries
338         return pack("B", self.number)
339
340
341 class _WIFI_POSITIONING(GPS303Pkt):
342     def in_decode(self, length: int, payload: bytes) -> None:
343         self.dtime = payload[:6]
344         if self.dtime == b"\0\0\0\0\0\0":
345             self.devtime = None
346         else:
347             self.devtime = datetime.strptime(
348                 self.dtime.hex(), "%y%m%d%H%M%S"
349             ).astimezone(tz=timezone.utc)
350         self.wifi_aps = []
351         for i in range(self.length):  # length has special meaning here
352             slice = payload[6 + i * 7 : 13 + i * 7]
353             self.wifi_aps.append(
354                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
355             )
356         gsm_slice = payload[6 + self.length * 7 :]
357         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
358         self.gsm_cells = []
359         for i in range(ncells):
360             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
361             locac, cellid, sigstr = unpack(
362                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
363             )
364             self.gsm_cells.append((locac, cellid, -sigstr))
365
366
367 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
368     PROTO = 0x17
369     RESPOND = Respond.INL
370
371     def out_encode(self) -> bytes:
372         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
373
374
375 class TIME(GPS303Pkt):
376     PROTO = 0x30
377     RESPOND = Respond.INL
378
379     def out_encode(self) -> bytes:
380         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
381
382
383 class PROHIBIT_LBS(GPS303Pkt):
384     PROTO = 0x33
385     OUT_KWARGS = (("status", int, 1),)
386
387     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
388         return pack("B", self.status)
389
390
391 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
392     PROTO = 0x34
393
394     # Data is in packed decimal
395     # 00/01 - GPS on/off
396     # 00/01 - Don't set / Set upload period
397     # HHMMHHMM - Upload period
398     # 00/01 - LBS on/off
399     # 00/01 - Don't set / Set time of boot
400     # HHMM  - Time of boot
401     # 00/01 - Don't set / Set time of shutdown
402     # HHMM  - Time of shutdown
403     def out_encode(self) -> bytes:
404         return b""  # TODO
405
406
407 class _SET_PHONE(GPS303Pkt):
408     OUT_KWARGS = (("phone", str, ""),)
409
410     def out_encode(self) -> bytes:
411         self.phone: str
412         return self.phone.encode("")
413
414
415 class REMOTE_MONITOR_PHONE(_SET_PHONE):
416     PROTO = 0x40
417
418
419 class SOS_PHONE(_SET_PHONE):
420     PROTO = 0x41
421
422
423 class DAD_PHONE(_SET_PHONE):
424     PROTO = 0x42
425
426
427 class MOM_PHONE(_SET_PHONE):
428     PROTO = 0x43
429
430
431 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
432     PROTO = 0x44
433
434
435 class GPS_OFF_PERIOD(GPS303Pkt):
436     PROTO = 0x46
437     OUT_KWARGS = (
438         ("onoff", int, 0),
439         ("fm", hhmm, "0000"),
440         ("to", hhmm, "2359"),
441     )
442
443     def out_encode(self) -> bytes:
444         return (
445             pack("B", self.onoff)
446             + bytes.fromhex(self.fm)
447             + bytes.fromhex(self.to)
448         )
449
450
451 class DND_PERIOD(GPS303Pkt):
452     PROTO = 0x47
453     OUT_KWARGS = (
454         ("onoff", int, 0),
455         ("week", int, 3),
456         ("fm1", hhmm, "0000"),
457         ("to1", hhmm, "2359"),
458         ("fm2", hhmm, "0000"),
459         ("to2", hhmm, "2359"),
460     )
461
462     def out_encode(self) -> bytes:
463         return (
464             pack("B", self.onoff)
465             + pack("B", self.week)
466             + bytes.fromhex(self.fm1)
467             + bytes.fromhex(self.to1)
468             + bytes.fromhex(self.fm2)
469             + bytes.fromhex(self.to2)
470         )
471
472
473 class RESTART_SHUTDOWN(GPS303Pkt):
474     PROTO = 0x48
475     OUT_KWARGS = (("flag", int, 0),)
476
477     def out_encode(self) -> bytes:
478         # 1 - restart
479         # 2 - shutdown
480         return pack("B", self.flag)
481
482
483 class DEVICE(GPS303Pkt):
484     PROTO = 0x49
485     OUT_KWARGS = (("flag", int, 0),)
486
487     # 0 - Stop looking for equipment
488     # 1 - Start looking for equipment
489     def out_encode(self) -> bytes:
490         return pack("B", self.flag)
491
492
493 class ALARM_CLOCK(GPS303Pkt):
494     PROTO = 0x50
495
496     def out_encode(self) -> bytes:
497         # TODO implement parsing kwargs
498         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
499         return b"".join(
500             pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
501         )
502
503
504 class STOP_ALARM(GPS303Pkt):
505     PROTO = 0x56
506
507     def in_decode(self, length: int, payload: bytes) -> None:
508         self.flag = payload[0]
509
510
511 class SETUP(GPS303Pkt):
512     PROTO = 0x57
513     RESPOND = Respond.EXT
514     OUT_KWARGS = (
515         ("uploadintervalseconds", intx, 0x0300),
516         ("binaryswitch", intx, 0b00110001),
517         ("alarms", l3int, [0, 0, 0]),
518         ("dndtimeswitch", int, 0),
519         ("dndtimes", l3int, [0, 0, 0]),
520         ("gpstimeswitch", int, 0),
521         ("gpstimestart", int, 0),
522         ("gpstimestop", int, 0),
523         ("phonenumbers", l3str, ["", "", ""]),
524     )
525
526     def out_encode(self) -> bytes:
527         def pack3b(x: int) -> bytes:
528             return pack("!I", x)[1:]
529
530         return b"".join(
531             [
532                 pack("!H", self.uploadintervalseconds),
533                 pack("B", self.binaryswitch),
534             ]
535             + [pack3b(el) for el in self.alarms]
536             + [
537                 pack("B", self.dndtimeswitch),
538             ]
539             + [pack3b(el) for el in self.dndtimes]
540             + [
541                 pack("B", self.gpstimeswitch),
542                 pack("!H", self.gpstimestart),
543                 pack("!H", self.gpstimestop),
544             ]
545             + [b";".join([el.encode() for el in self.phonenumbers])]
546         )
547
548
549 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
550     PROTO = 0x58
551
552
553 class RESTORE_PASSWORD(GPS303Pkt):
554     PROTO = 0x67
555
556
557 class WIFI_POSITIONING(_WIFI_POSITIONING):
558     PROTO = 0x69
559     RESPOND = Respond.EXT
560     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
561
562     def out_encode(self) -> bytes:
563         if self.latitude is None or self.longitude is None:
564             return b""
565         return "{:+#010.8g},{:+#010.8g}".format(
566             self.latitude, self.longitude
567         ).encode()
568
569     def out_decode(self, length: int, payload: bytes) -> None:
570         lat, lon = payload.decode().split(",")
571         self.latitude = float(lat)
572         self.longitude = float(lon)
573
574
575 class MANUAL_POSITIONING(GPS303Pkt):
576     PROTO = 0x80
577
578     def in_decode(self, length: int, payload: bytes) -> None:
579         self.flag = payload[0] if len(payload) > 0 else -1
580         self.reason = {
581             1: "Incorrect time",
582             2: "LBS less",
583             3: "WiFi less",
584             4: "LBS search > 3 times",
585             5: "Same LBS and WiFi data",
586             6: "LBS prohibited, WiFi absent",
587             7: "GPS spacing < 50 m",
588         }.get(self.flag, "Unknown")
589
590
591 class BATTERY_CHARGE(GPS303Pkt):
592     PROTO = 0x81
593
594
595 class CHARGER_CONNECTED(GPS303Pkt):
596     PROTO = 0x82
597
598
599 class CHARGER_DISCONNECTED(GPS303Pkt):
600     PROTO = 0x83
601
602
603 class VIBRATION_RECEIVED(GPS303Pkt):
604     PROTO = 0x94
605
606
607 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
608     PROTO = 0x98
609     RESPOND = Respond.EXT
610     OUT_KWARGS = (("interval", int, 10),)
611
612     def in_decode(self, length: int, payload: bytes) -> None:
613         self.interval = unpack("!H", payload[:2])
614
615     def out_encode(self) -> bytes:
616         return pack("!H", self.interval)
617
618
619 class SOS_ALARM(GPS303Pkt):
620     PROTO = 0x99
621
622
623 class UNKNOWN_B3(GPS303Pkt):
624     PROTO = 0xB3
625     IN_KWARGS = (("asciidata", str, ""),)
626
627     def in_decode(self, length: int, payload: bytes) -> None:
628         self.asciidata = payload.decode()
629
630
631 # Build dicts protocol number -> class and class name -> protocol number
632 CLASSES = {}
633 PROTOS = {}
634 if True:  # just to indent the code, sorry!
635     for cls in [
636         cls
637         for name, cls in globals().items()
638         if isclass(cls)
639         and issubclass(cls, GPS303Pkt)
640         and not name.startswith("_")
641     ]:
642         if hasattr(cls, "PROTO"):
643             CLASSES[cls.PROTO] = cls
644             PROTOS[cls.__name__] = cls.PROTO
645
646
647 def class_by_prefix(prefix: str) -> Union[type, List[Tuple[str, int]]]:
648     lst = [
649         (name, proto)
650         for name, proto in PROTOS.items()
651         if name.upper().startswith(prefix.upper())
652     ]
653     if len(lst) != 1:
654         return lst
655     _, proto = lst[0]
656     return CLASSES[proto]
657
658
659 def proto_by_name(name: str) -> int:
660     return PROTOS.get(name, -1)
661
662
663 def proto_of_message(packet: bytes) -> int:
664     return packet[1]
665
666
667 def inline_response(packet: bytes) -> Optional[bytes]:
668     proto = proto_of_message(packet)
669     if proto in CLASSES:
670         cls = CLASSES[proto]
671         if cls.RESPOND is Respond.INL:
672             return cls.Out().packed
673     return None
674
675
676 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
677     """From a packet (without framing bytes) derive the XXX.In object"""
678     length, proto = unpack("BB", packet[:2])
679     payload = packet[2:]
680     if proto not in CLASSES:
681         cause: Union[DecodeError, ValueError] = ValueError(
682             f"Proto {proto} is unknown"
683         )
684     else:
685         try:
686             if is_incoming:
687                 return CLASSES[proto].In(length, payload)
688             else:
689                 return CLASSES[proto].Out(length, payload)
690         except DecodeError as e:
691             cause = e
692     if is_incoming:
693         retobj = UNKNOWN.In(length, payload)
694     else:
695         retobj = UNKNOWN.Out(length, payload)
696     retobj.PROTO = proto  # Override class attr with object attr
697     retobj.cause = cause
698     return retobj