]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
fix l3str/l3int breakage provoked by typeckeck
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import (
22     Any,
23     Callable,
24     Dict,
25     List,
26     Optional,
27     Tuple,
28     Type,
29     TYPE_CHECKING,
30     Union,
31 )
32
33 __all__ = (
34     "class_by_prefix",
35     "inline_response",
36     "parse_message",
37     "proto_by_name",
38     "DecodeError",
39     "Respond",
40     "GPS303Pkt",
41     "UNKNOWN",
42     "LOGIN",
43     "SUPERVISION",
44     "HEARTBEAT",
45     "GPS_POSITIONING",
46     "GPS_OFFLINE_POSITIONING",
47     "STATUS",
48     "HIBERNATION",
49     "RESET",
50     "WHITELIST_TOTAL",
51     "WIFI_OFFLINE_POSITIONING",
52     "TIME",
53     "PROHIBIT_LBS",
54     "GPS_LBS_SWITCH_TIMES",
55     "REMOTE_MONITOR_PHONE",
56     "SOS_PHONE",
57     "DAD_PHONE",
58     "MOM_PHONE",
59     "STOP_UPLOAD",
60     "GPS_OFF_PERIOD",
61     "DND_PERIOD",
62     "RESTART_SHUTDOWN",
63     "DEVICE",
64     "ALARM_CLOCK",
65     "STOP_ALARM",
66     "SETUP",
67     "SYNCHRONOUS_WHITELIST",
68     "RESTORE_PASSWORD",
69     "WIFI_POSITIONING",
70     "MANUAL_POSITIONING",
71     "BATTERY_CHARGE",
72     "CHARGER_CONNECTED",
73     "CHARGER_DISCONNECTED",
74     "VIBRATION_RECEIVED",
75     "POSITION_UPLOAD_INTERVAL",
76     "SOS_ALARM",
77     "UNKNOWN_B3",
78 )
79
80
81 class DecodeError(Exception):
82     def __init__(self, e: Exception, **kwargs: Any) -> None:
83         super().__init__(e)
84         for k, v in kwargs.items():
85             setattr(self, k, v)
86
87
88 def intx(x: Union[str, int]) -> int:
89     if isinstance(x, str):
90         x = int(x, 0)
91     return x
92
93
94 def hhmm(x: str) -> str:
95     """Check for the string that represents hours and minutes"""
96     if not isinstance(x, str) or len(x) != 4:
97         raise ValueError(str(x) + " is not a four-character string")
98     hh = int(x[:2])
99     mm = int(x[2:])
100     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
101         raise ValueError(str(x) + " does not contain valid hours and minutes")
102     return x
103
104
105 def l3str(x: Union[str, List[str]]) -> List[str]:
106     if isinstance(x, str):
107         lx = x.split(",")
108     else:
109         lx = x
110     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
111         raise ValueError(str(lx) + " is not a list of three strings")
112     return lx
113
114
115 def l3int(x: Union[str, List[int]]) -> List[int]:
116     if isinstance(x, str):
117         lx = [int(el) for el in x.split(",")]
118     else:
119         lx = x
120     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
121         raise ValueError(str(lx) + " is not a list of three integers")
122     return lx
123
124
125 class MetaPkt(type):
126     """
127     For each class corresponding to a message, automatically create
128     two nested classes `In` and `Out` that also inherit from their
129     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
130     copied to the `In` nested class under the name `KWARGS`, and
131     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
132     to the nested class `Out`. In addition, method `encode` is
133     defined in both classes equal to `in_encode()` and `out_encode()`
134     respectively.
135     """
136
137     if TYPE_CHECKING:
138
139         def __getattr__(self, name: str) -> Any:
140             pass
141
142         def __setattr__(self, name: str, value: Any) -> None:
143             pass
144
145     def __new__(
146         cls: Type["MetaPkt"],
147         name: str,
148         bases: Tuple[type, ...],
149         attrs: Dict[str, Any],
150     ) -> "MetaPkt":
151         newcls = super().__new__(cls, name, bases, attrs)
152         newcls.In = super().__new__(
153             cls,
154             name + ".In",
155             (newcls,) + bases,
156             {
157                 "KWARGS": newcls.IN_KWARGS,
158                 "decode": newcls.in_decode,
159                 "encode": newcls.in_encode,
160             },
161         )
162         newcls.Out = super().__new__(
163             cls,
164             name + ".Out",
165             (newcls,) + bases,
166             {
167                 "KWARGS": newcls.OUT_KWARGS,
168                 "decode": newcls.out_decode,
169                 "encode": newcls.out_encode,
170             },
171         )
172         return newcls
173
174
175 class Respond(Enum):
176     NON = 0  # Incoming, no response needed
177     INL = 1  # Birirectional, use `inline_response()`
178     EXT = 2  # Birirectional, use external responder
179
180
181 class GPS303Pkt(metaclass=MetaPkt):
182     RESPOND = Respond.NON  # Do not send anything back by default
183     PROTO: int
184     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
185     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
186     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
187     In: Type["GPS303Pkt"]
188     Out: Type["GPS303Pkt"]
189
190     if TYPE_CHECKING:
191
192         def __getattr__(self, name: str) -> Any:
193             pass
194
195         def __setattr__(self, name: str, value: Any) -> None:
196             pass
197
198     def __init__(self, *args: Any, **kwargs: Any):
199         """
200         Construct the object _either_ from (length, payload),
201         _or_ from the values of individual fields
202         """
203         assert not args or (len(args) == 2 and not kwargs)
204         if args:  # guaranteed to be two arguments at this point
205             self.length, self.payload = args
206             try:
207                 self.decode(self.length, self.payload)
208             except error as e:
209                 raise DecodeError(e, obj=self)
210         else:
211             for kw, typ, dfl in self.KWARGS:
212                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
213             if kwargs:
214                 raise ValueError(
215                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
216                 )
217
218     def __repr__(self) -> str:
219         return "{}({})".format(
220             self.__class__.__name__,
221             ", ".join(
222                 "{}={}".format(
223                     k,
224                     'bytes.fromhex("{}")'.format(v.hex())
225                     if isinstance(v, bytes)
226                     else v.__repr__(),
227                 )
228                 for k, v in self.__dict__.items()
229                 if not k.startswith("_")
230             ),
231         )
232
233     decode: Callable[["GPS303Pkt", int, bytes], None]
234
235     def in_decode(self, length: int, packet: bytes) -> None:
236         # Overridden in subclasses, otherwise do not decode payload
237         return
238
239     def out_decode(self, length: int, packet: bytes) -> None:
240         # Overridden in subclasses, otherwise do not decode payload
241         return
242
243     encode: Callable[["GPS303Pkt"], bytes]
244
245     def in_encode(self) -> bytes:
246         # Necessary to emulate terminal, which is not implemented
247         raise NotImplementedError(
248             self.__class__.__name__ + ".encode() not implemented"
249         )
250
251     def out_encode(self) -> bytes:
252         # Overridden in subclasses, otherwise make empty payload
253         return b""
254
255     @property
256     def packed(self) -> bytes:
257         payload = self.encode()
258         length = len(payload) + 1
259         return pack("BB", length, self.PROTO) + payload
260
261
262 class UNKNOWN(GPS303Pkt):
263     PROTO = 256  # > 255 is impossible in real packets
264
265
266 class LOGIN(GPS303Pkt):
267     PROTO = 0x01
268     RESPOND = Respond.INL
269     # Default response for ACK, can also respond with STOP_UPLOAD
270
271     def in_decode(self, length: int, payload: bytes) -> None:
272         self.imei = payload[:-1].hex()
273         self.ver = unpack("B", payload[-1:])[0]
274
275
276 class SUPERVISION(GPS303Pkt):
277     PROTO = 0x05
278     OUT_KWARGS = (("status", int, 1),)
279
280     def out_encode(self) -> bytes:
281         # 1: The device automatically answers Pickup effect
282         # 2: Automatically Answering Two-way Calls
283         # 3: Ring manually answer the two-way call
284         return pack("B", self.status)
285
286
287 class HEARTBEAT(GPS303Pkt):
288     PROTO = 0x08
289     RESPOND = Respond.INL
290
291
292 class _GPS_POSITIONING(GPS303Pkt):
293     RESPOND = Respond.INL
294
295     def in_decode(self, length: int, payload: bytes) -> None:
296         self.dtime = payload[:6]
297         if self.dtime == b"\0\0\0\0\0\0":
298             self.devtime = None
299         else:
300             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
301             self.devtime = datetime(
302                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
303             )
304         self.gps_data_length = payload[6] >> 4
305         self.gps_nb_sat = payload[6] & 0x0F
306         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
307         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
308         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
309         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
310         self.heading = flags & 0b0000001111111111  # bits 6 - last
311         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
312         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
313         self.speed = speed
314         self.flags = flags
315
316     def out_encode(self) -> bytes:
317         tup = datetime.utcnow().timetuple()
318         ttup = (tup[0] % 100,) + tup[1:6]
319         return pack("BBBBBB", *ttup)
320
321
322 class GPS_POSITIONING(_GPS_POSITIONING):
323     PROTO = 0x10
324
325
326 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
327     PROTO = 0x11
328
329
330 class STATUS(GPS303Pkt):
331     PROTO = 0x13
332     RESPOND = Respond.EXT
333     OUT_KWARGS = (("upload_interval", int, 25),)
334
335     def in_decode(self, length: int, payload: bytes) -> None:
336         self.batt, self.ver, self.timezone, self.intvl = unpack(
337             "BBBB", payload[:4]
338         )
339         if len(payload) > 4:
340             self.signal: Optional[int] = payload[4]
341         else:
342             self.signal = None
343
344     def out_encode(self) -> bytes:  # Set interval in minutes
345         return pack("B", self.upload_interval)
346
347
348 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
349     PROTO = 0x14
350
351
352 class RESET(GPS303Pkt):
353     # Device sends when it got reset SMS
354     # Server can send to initiate factory reset
355     PROTO = 0x15
356
357
358 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
359     PROTO = 0x16
360     OUT_KWARGS = (("number", int, 3),)
361
362     def out_encode(self) -> bytes:  # Number of whitelist entries
363         return pack("B", self.number)
364
365
366 class _WIFI_POSITIONING(GPS303Pkt):
367     def in_decode(self, length: int, payload: bytes) -> None:
368         self.dtime = payload[:6]
369         if self.dtime == b"\0\0\0\0\0\0":
370             self.devtime = None
371         else:
372             self.devtime = datetime.strptime(
373                 self.dtime.hex(), "%y%m%d%H%M%S"
374             ).astimezone(tz=timezone.utc)
375         self.wifi_aps = []
376         for i in range(self.length):  # length has special meaning here
377             slice = payload[6 + i * 7 : 13 + i * 7]
378             self.wifi_aps.append(
379                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
380             )
381         gsm_slice = payload[6 + self.length * 7 :]
382         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
383         self.gsm_cells = []
384         for i in range(ncells):
385             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
386             locac, cellid, sigstr = unpack(
387                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
388             )
389             self.gsm_cells.append((locac, cellid, -sigstr))
390
391
392 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
393     PROTO = 0x17
394     RESPOND = Respond.INL
395
396     def out_encode(self) -> bytes:
397         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
398
399
400 class TIME(GPS303Pkt):
401     PROTO = 0x30
402     RESPOND = Respond.INL
403
404     def out_encode(self) -> bytes:
405         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
406
407
408 class PROHIBIT_LBS(GPS303Pkt):
409     PROTO = 0x33
410     OUT_KWARGS = (("status", int, 1),)
411
412     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
413         return pack("B", self.status)
414
415
416 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
417     PROTO = 0x34
418
419     # Data is in packed decimal
420     # 00/01 - GPS on/off
421     # 00/01 - Don't set / Set upload period
422     # HHMMHHMM - Upload period
423     # 00/01 - LBS on/off
424     # 00/01 - Don't set / Set time of boot
425     # HHMM  - Time of boot
426     # 00/01 - Don't set / Set time of shutdown
427     # HHMM  - Time of shutdown
428     def out_encode(self) -> bytes:
429         return b""  # TODO
430
431
432 class _SET_PHONE(GPS303Pkt):
433     OUT_KWARGS = (("phone", str, ""),)
434
435     def out_encode(self) -> bytes:
436         self.phone: str
437         return self.phone.encode("")
438
439
440 class REMOTE_MONITOR_PHONE(_SET_PHONE):
441     PROTO = 0x40
442
443
444 class SOS_PHONE(_SET_PHONE):
445     PROTO = 0x41
446
447
448 class DAD_PHONE(_SET_PHONE):
449     PROTO = 0x42
450
451
452 class MOM_PHONE(_SET_PHONE):
453     PROTO = 0x43
454
455
456 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
457     PROTO = 0x44
458
459
460 class GPS_OFF_PERIOD(GPS303Pkt):
461     PROTO = 0x46
462     OUT_KWARGS = (
463         ("onoff", int, 0),
464         ("fm", hhmm, "0000"),
465         ("to", hhmm, "2359"),
466     )
467
468     def out_encode(self) -> bytes:
469         return (
470             pack("B", self.onoff)
471             + bytes.fromhex(self.fm)
472             + bytes.fromhex(self.to)
473         )
474
475
476 class DND_PERIOD(GPS303Pkt):
477     PROTO = 0x47
478     OUT_KWARGS = (
479         ("onoff", int, 0),
480         ("week", int, 3),
481         ("fm1", hhmm, "0000"),
482         ("to1", hhmm, "2359"),
483         ("fm2", hhmm, "0000"),
484         ("to2", hhmm, "2359"),
485     )
486
487     def out_encode(self) -> bytes:
488         return (
489             pack("B", self.onoff)
490             + pack("B", self.week)
491             + bytes.fromhex(self.fm1)
492             + bytes.fromhex(self.to1)
493             + bytes.fromhex(self.fm2)
494             + bytes.fromhex(self.to2)
495         )
496
497
498 class RESTART_SHUTDOWN(GPS303Pkt):
499     PROTO = 0x48
500     OUT_KWARGS = (("flag", int, 0),)
501
502     def out_encode(self) -> bytes:
503         # 1 - restart
504         # 2 - shutdown
505         return pack("B", self.flag)
506
507
508 class DEVICE(GPS303Pkt):
509     PROTO = 0x49
510     OUT_KWARGS = (("flag", int, 0),)
511
512     # 0 - Stop looking for equipment
513     # 1 - Start looking for equipment
514     def out_encode(self) -> bytes:
515         return pack("B", self.flag)
516
517
518 class ALARM_CLOCK(GPS303Pkt):
519     PROTO = 0x50
520
521     def out_encode(self) -> bytes:
522         # TODO implement parsing kwargs
523         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
524         return b"".join(
525             pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
526         )
527
528
529 class STOP_ALARM(GPS303Pkt):
530     PROTO = 0x56
531
532     def in_decode(self, length: int, payload: bytes) -> None:
533         self.flag = payload[0]
534
535
536 class SETUP(GPS303Pkt):
537     PROTO = 0x57
538     RESPOND = Respond.EXT
539     OUT_KWARGS = (
540         ("uploadintervalseconds", intx, 0x0300),
541         ("binaryswitch", intx, 0b00110001),
542         ("alarms", l3int, [0, 0, 0]),
543         ("dndtimeswitch", int, 0),
544         ("dndtimes", l3int, [0, 0, 0]),
545         ("gpstimeswitch", int, 0),
546         ("gpstimestart", int, 0),
547         ("gpstimestop", int, 0),
548         ("phonenumbers", l3str, ["", "", ""]),
549     )
550
551     def out_encode(self) -> bytes:
552         def pack3b(x: int) -> bytes:
553             return pack("!I", x)[1:]
554
555         return b"".join(
556             [
557                 pack("!H", self.uploadintervalseconds),
558                 pack("B", self.binaryswitch),
559             ]
560             + [pack3b(el) for el in self.alarms]
561             + [
562                 pack("B", self.dndtimeswitch),
563             ]
564             + [pack3b(el) for el in self.dndtimes]
565             + [
566                 pack("B", self.gpstimeswitch),
567                 pack("!H", self.gpstimestart),
568                 pack("!H", self.gpstimestop),
569             ]
570             + [b";".join([el.encode() for el in self.phonenumbers])]
571         )
572
573
574 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
575     PROTO = 0x58
576
577
578 class RESTORE_PASSWORD(GPS303Pkt):
579     PROTO = 0x67
580
581
582 class WIFI_POSITIONING(_WIFI_POSITIONING):
583     PROTO = 0x69
584     RESPOND = Respond.EXT
585     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
586
587     def out_encode(self) -> bytes:
588         if self.latitude is None or self.longitude is None:
589             return b""
590         return "{:+#010.8g},{:+#010.8g}".format(
591             self.latitude, self.longitude
592         ).encode()
593
594     def out_decode(self, length: int, payload: bytes) -> None:
595         lat, lon = payload.decode().split(",")
596         self.latitude = float(lat)
597         self.longitude = float(lon)
598
599
600 class MANUAL_POSITIONING(GPS303Pkt):
601     PROTO = 0x80
602
603     def in_decode(self, length: int, payload: bytes) -> None:
604         self.flag = payload[0] if len(payload) > 0 else -1
605         self.reason = {
606             1: "Incorrect time",
607             2: "LBS less",
608             3: "WiFi less",
609             4: "LBS search > 3 times",
610             5: "Same LBS and WiFi data",
611             6: "LBS prohibited, WiFi absent",
612             7: "GPS spacing < 50 m",
613         }.get(self.flag, "Unknown")
614
615
616 class BATTERY_CHARGE(GPS303Pkt):
617     PROTO = 0x81
618
619
620 class CHARGER_CONNECTED(GPS303Pkt):
621     PROTO = 0x82
622
623
624 class CHARGER_DISCONNECTED(GPS303Pkt):
625     PROTO = 0x83
626
627
628 class VIBRATION_RECEIVED(GPS303Pkt):
629     PROTO = 0x94
630
631
632 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
633     PROTO = 0x98
634     RESPOND = Respond.EXT
635     OUT_KWARGS = (("interval", int, 10),)
636
637     def in_decode(self, length: int, payload: bytes) -> None:
638         self.interval = unpack("!H", payload[:2])
639
640     def out_encode(self) -> bytes:
641         return pack("!H", self.interval)
642
643
644 class SOS_ALARM(GPS303Pkt):
645     PROTO = 0x99
646
647
648 class UNKNOWN_B3(GPS303Pkt):
649     PROTO = 0xB3
650     IN_KWARGS = (("asciidata", str, ""),)
651
652     def in_decode(self, length: int, payload: bytes) -> None:
653         self.asciidata = payload.decode()
654
655
656 # Build dicts protocol number -> class and class name -> protocol number
657 CLASSES = {}
658 PROTOS = {}
659 if True:  # just to indent the code, sorry!
660     for cls in [
661         cls
662         for name, cls in globals().items()
663         if isclass(cls)
664         and issubclass(cls, GPS303Pkt)
665         and not name.startswith("_")
666     ]:
667         if hasattr(cls, "PROTO"):
668             CLASSES[cls.PROTO] = cls
669             PROTOS[cls.__name__] = cls.PROTO
670
671
672 def class_by_prefix(
673     prefix: str,
674 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
675     lst = [
676         (name, proto)
677         for name, proto in PROTOS.items()
678         if name.upper().startswith(prefix.upper())
679     ]
680     if len(lst) != 1:
681         return lst
682     _, proto = lst[0]
683     return CLASSES[proto]
684
685
686 def proto_by_name(name: str) -> int:
687     return PROTOS.get(name, -1)
688
689
690 def proto_of_message(packet: bytes) -> int:
691     return packet[1]
692
693
694 def inline_response(packet: bytes) -> Optional[bytes]:
695     proto = proto_of_message(packet)
696     if proto in CLASSES:
697         cls = CLASSES[proto]
698         if cls.RESPOND is Respond.INL:
699             return cls.Out().packed
700     return None
701
702
703 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
704     """From a packet (without framing bytes) derive the XXX.In object"""
705     length, proto = unpack("BB", packet[:2])
706     payload = packet[2:]
707     if proto not in CLASSES:
708         cause: Union[DecodeError, ValueError] = ValueError(
709             f"Proto {proto} is unknown"
710         )
711     else:
712         try:
713             if is_incoming:
714                 return CLASSES[proto].In(length, payload)
715             else:
716                 return CLASSES[proto].Out(length, payload)
717         except DecodeError as e:
718             cause = e
719     if is_incoming:
720         retobj = UNKNOWN.In(length, payload)
721     else:
722         retobj = UNKNOWN.Out(length, payload)
723     retobj.PROTO = proto  # Override class attr with object attr
724     retobj.cause = cause
725     return retobj