]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
c96a6dc945312ef4b0f9df46239afa06c73f6370
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import (
22     Any,
23     Callable,
24     Dict,
25     List,
26     Optional,
27     Tuple,
28     Type,
29     TYPE_CHECKING,
30     Union,
31 )
32
33 __all__ = (
34     "class_by_prefix",
35     "inline_response",
36     "parse_message",
37     "proto_by_name",
38     "DecodeError",
39     "Respond",
40     "GPS303Pkt",
41     "UNKNOWN",
42     "LOGIN",
43     "SUPERVISION",
44     "HEARTBEAT",
45     "GPS_POSITIONING",
46     "GPS_OFFLINE_POSITIONING",
47     "STATUS",
48     "HIBERNATION",
49     "RESET",
50     "WHITELIST_TOTAL",
51     "WIFI_OFFLINE_POSITIONING",
52     "TIME",
53     "PROHIBIT_LBS",
54     "GPS_LBS_SWITCH_TIMES",
55     "REMOTE_MONITOR_PHONE",
56     "SOS_PHONE",
57     "DAD_PHONE",
58     "MOM_PHONE",
59     "STOP_UPLOAD",
60     "GPS_OFF_PERIOD",
61     "DND_PERIOD",
62     "RESTART_SHUTDOWN",
63     "DEVICE",
64     "ALARM_CLOCK",
65     "STOP_ALARM",
66     "SETUP",
67     "SYNCHRONOUS_WHITELIST",
68     "RESTORE_PASSWORD",
69     "WIFI_POSITIONING",
70     "MANUAL_POSITIONING",
71     "BATTERY_CHARGE",
72     "CHARGER_CONNECTED",
73     "CHARGER_DISCONNECTED",
74     "VIBRATION_RECEIVED",
75     "POSITION_UPLOAD_INTERVAL",
76     "SOS_ALARM",
77     "UNKNOWN_B3",
78 )
79
80
81 class DecodeError(Exception):
82     def __init__(self, e: Exception, **kwargs: Any) -> None:
83         super().__init__(e)
84         for k, v in kwargs.items():
85             setattr(self, k, v)
86
87
88 def intx(x: Union[str, int]) -> int:
89     if isinstance(x, str):
90         x = int(x, 0)
91     return x
92
93
94 def hhmm(x: str) -> str:
95     """Check for the string that represents hours and minutes"""
96     if not isinstance(x, str) or len(x) != 4:
97         raise ValueError(str(x) + " is not a four-character string")
98     hh = int(x[:2])
99     mm = int(x[2:])
100     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
101         raise ValueError(str(x) + " does not contain valid hours and minutes")
102     return x
103
104
105 def l3str(x: Union[str, List[str]]) -> List[str]:
106     if isinstance(x, str):
107         lx = x.split(",")
108     if len(lx) != 3 or not all(isinstance(el, str) for el in x):
109         raise ValueError(str(lx) + " is not a list of three strings")
110     return lx
111
112
113 def l3int(x: Union[str, List[int]]) -> List[int]:
114     if isinstance(x, str):
115         lx = [int(el) for el in x.split(",")]
116     if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
117         raise ValueError(str(lx) + " is not a list of three integers")
118     return lx
119
120
121 class MetaPkt(type):
122     """
123     For each class corresponding to a message, automatically create
124     two nested classes `In` and `Out` that also inherit from their
125     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
126     copied to the `In` nested class under the name `KWARGS`, and
127     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
128     to the nested class `Out`. In addition, method `encode` is
129     defined in both classes equal to `in_encode()` and `out_encode()`
130     respectively.
131     """
132
133     if TYPE_CHECKING:
134
135         def __getattr__(self, name: str) -> Any:
136             pass
137
138         def __setattr__(self, name: str, value: Any) -> None:
139             pass
140
141     def __new__(
142         cls: Type["MetaPkt"],
143         name: str,
144         bases: Tuple[type, ...],
145         attrs: Dict[str, Any],
146     ) -> "MetaPkt":
147         newcls = super().__new__(cls, name, bases, attrs)
148         newcls.In = super().__new__(
149             cls,
150             name + ".In",
151             (newcls,) + bases,
152             {
153                 "KWARGS": newcls.IN_KWARGS,
154                 "decode": newcls.in_decode,
155                 "encode": newcls.in_encode,
156             },
157         )
158         newcls.Out = super().__new__(
159             cls,
160             name + ".Out",
161             (newcls,) + bases,
162             {
163                 "KWARGS": newcls.OUT_KWARGS,
164                 "decode": newcls.out_decode,
165                 "encode": newcls.out_encode,
166             },
167         )
168         return newcls
169
170
171 class Respond(Enum):
172     NON = 0  # Incoming, no response needed
173     INL = 1  # Birirectional, use `inline_response()`
174     EXT = 2  # Birirectional, use external responder
175
176
177 class GPS303Pkt(metaclass=MetaPkt):
178     RESPOND = Respond.NON  # Do not send anything back by default
179     PROTO: int
180     IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
181     OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
182     KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
183     In: Type["GPS303Pkt"]
184     Out: Type["GPS303Pkt"]
185
186     if TYPE_CHECKING:
187
188         def __getattr__(self, name: str) -> Any:
189             pass
190
191         def __setattr__(self, name: str, value: Any) -> None:
192             pass
193
194     def __init__(self, *args: Any, **kwargs: Any):
195         """
196         Construct the object _either_ from (length, payload),
197         _or_ from the values of individual fields
198         """
199         assert not args or (len(args) == 2 and not kwargs)
200         if args:  # guaranteed to be two arguments at this point
201             self.length, self.payload = args
202             try:
203                 self.decode(self.length, self.payload)
204             except error as e:
205                 raise DecodeError(e, obj=self)
206         else:
207             for kw, typ, dfl in self.KWARGS:
208                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
209             if kwargs:
210                 raise ValueError(
211                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
212                 )
213
214     def __repr__(self) -> str:
215         return "{}({})".format(
216             self.__class__.__name__,
217             ", ".join(
218                 "{}={}".format(
219                     k,
220                     'bytes.fromhex("{}")'.format(v.hex())
221                     if isinstance(v, bytes)
222                     else v.__repr__(),
223                 )
224                 for k, v in self.__dict__.items()
225                 if not k.startswith("_")
226             ),
227         )
228
229     decode: Callable[["GPS303Pkt", int, bytes], None]
230
231     def in_decode(self, length: int, packet: bytes) -> None:
232         # Overridden in subclasses, otherwise do not decode payload
233         return
234
235     def out_decode(self, length: int, packet: bytes) -> None:
236         # Overridden in subclasses, otherwise do not decode payload
237         return
238
239     encode: Callable[["GPS303Pkt"], bytes]
240
241     def in_encode(self) -> bytes:
242         # Necessary to emulate terminal, which is not implemented
243         raise NotImplementedError(
244             self.__class__.__name__ + ".encode() not implemented"
245         )
246
247     def out_encode(self) -> bytes:
248         # Overridden in subclasses, otherwise make empty payload
249         return b""
250
251     @property
252     def packed(self) -> bytes:
253         payload = self.encode()
254         length = len(payload) + 1
255         return pack("BB", length, self.PROTO) + payload
256
257
258 class UNKNOWN(GPS303Pkt):
259     PROTO = 256  # > 255 is impossible in real packets
260
261
262 class LOGIN(GPS303Pkt):
263     PROTO = 0x01
264     RESPOND = Respond.INL
265     # Default response for ACK, can also respond with STOP_UPLOAD
266
267     def in_decode(self, length: int, payload: bytes) -> None:
268         self.imei = payload[:-1].hex()
269         self.ver = unpack("B", payload[-1:])[0]
270
271
272 class SUPERVISION(GPS303Pkt):
273     PROTO = 0x05
274     OUT_KWARGS = (("status", int, 1),)
275
276     def out_encode(self) -> bytes:
277         # 1: The device automatically answers Pickup effect
278         # 2: Automatically Answering Two-way Calls
279         # 3: Ring manually answer the two-way call
280         return pack("B", self.status)
281
282
283 class HEARTBEAT(GPS303Pkt):
284     PROTO = 0x08
285     RESPOND = Respond.INL
286
287
288 class _GPS_POSITIONING(GPS303Pkt):
289     RESPOND = Respond.INL
290
291     def in_decode(self, length: int, payload: bytes) -> None:
292         self.dtime = payload[:6]
293         if self.dtime == b"\0\0\0\0\0\0":
294             self.devtime = None
295         else:
296             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
297             self.devtime = datetime(
298                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
299             )
300         self.gps_data_length = payload[6] >> 4
301         self.gps_nb_sat = payload[6] & 0x0F
302         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
303         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
304         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
305         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
306         self.heading = flags & 0b0000001111111111  # bits 6 - last
307         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
308         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
309         self.speed = speed
310         self.flags = flags
311
312     def out_encode(self) -> bytes:
313         tup = datetime.utcnow().timetuple()
314         ttup = (tup[0] % 100,) + tup[1:6]
315         return pack("BBBBBB", *ttup)
316
317
318 class GPS_POSITIONING(_GPS_POSITIONING):
319     PROTO = 0x10
320
321
322 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
323     PROTO = 0x11
324
325
326 class STATUS(GPS303Pkt):
327     PROTO = 0x13
328     RESPOND = Respond.EXT
329     OUT_KWARGS = (("upload_interval", int, 25),)
330
331     def in_decode(self, length: int, payload: bytes) -> None:
332         self.batt, self.ver, self.timezone, self.intvl = unpack(
333             "BBBB", payload[:4]
334         )
335         if len(payload) > 4:
336             self.signal: Optional[int] = payload[4]
337         else:
338             self.signal = None
339
340     def out_encode(self) -> bytes:  # Set interval in minutes
341         return pack("B", self.upload_interval)
342
343
344 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
345     PROTO = 0x14
346
347
348 class RESET(GPS303Pkt):
349     # Device sends when it got reset SMS
350     # Server can send to initiate factory reset
351     PROTO = 0x15
352
353
354 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
355     PROTO = 0x16
356     OUT_KWARGS = (("number", int, 3),)
357
358     def out_encode(self) -> bytes:  # Number of whitelist entries
359         return pack("B", self.number)
360
361
362 class _WIFI_POSITIONING(GPS303Pkt):
363     def in_decode(self, length: int, payload: bytes) -> None:
364         self.dtime = payload[:6]
365         if self.dtime == b"\0\0\0\0\0\0":
366             self.devtime = None
367         else:
368             self.devtime = datetime.strptime(
369                 self.dtime.hex(), "%y%m%d%H%M%S"
370             ).astimezone(tz=timezone.utc)
371         self.wifi_aps = []
372         for i in range(self.length):  # length has special meaning here
373             slice = payload[6 + i * 7 : 13 + i * 7]
374             self.wifi_aps.append(
375                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
376             )
377         gsm_slice = payload[6 + self.length * 7 :]
378         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
379         self.gsm_cells = []
380         for i in range(ncells):
381             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
382             locac, cellid, sigstr = unpack(
383                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
384             )
385             self.gsm_cells.append((locac, cellid, -sigstr))
386
387
388 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
389     PROTO = 0x17
390     RESPOND = Respond.INL
391
392     def out_encode(self) -> bytes:
393         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
394
395
396 class TIME(GPS303Pkt):
397     PROTO = 0x30
398     RESPOND = Respond.INL
399
400     def out_encode(self) -> bytes:
401         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
402
403
404 class PROHIBIT_LBS(GPS303Pkt):
405     PROTO = 0x33
406     OUT_KWARGS = (("status", int, 1),)
407
408     def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
409         return pack("B", self.status)
410
411
412 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
413     PROTO = 0x34
414
415     # Data is in packed decimal
416     # 00/01 - GPS on/off
417     # 00/01 - Don't set / Set upload period
418     # HHMMHHMM - Upload period
419     # 00/01 - LBS on/off
420     # 00/01 - Don't set / Set time of boot
421     # HHMM  - Time of boot
422     # 00/01 - Don't set / Set time of shutdown
423     # HHMM  - Time of shutdown
424     def out_encode(self) -> bytes:
425         return b""  # TODO
426
427
428 class _SET_PHONE(GPS303Pkt):
429     OUT_KWARGS = (("phone", str, ""),)
430
431     def out_encode(self) -> bytes:
432         self.phone: str
433         return self.phone.encode("")
434
435
436 class REMOTE_MONITOR_PHONE(_SET_PHONE):
437     PROTO = 0x40
438
439
440 class SOS_PHONE(_SET_PHONE):
441     PROTO = 0x41
442
443
444 class DAD_PHONE(_SET_PHONE):
445     PROTO = 0x42
446
447
448 class MOM_PHONE(_SET_PHONE):
449     PROTO = 0x43
450
451
452 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
453     PROTO = 0x44
454
455
456 class GPS_OFF_PERIOD(GPS303Pkt):
457     PROTO = 0x46
458     OUT_KWARGS = (
459         ("onoff", int, 0),
460         ("fm", hhmm, "0000"),
461         ("to", hhmm, "2359"),
462     )
463
464     def out_encode(self) -> bytes:
465         return (
466             pack("B", self.onoff)
467             + bytes.fromhex(self.fm)
468             + bytes.fromhex(self.to)
469         )
470
471
472 class DND_PERIOD(GPS303Pkt):
473     PROTO = 0x47
474     OUT_KWARGS = (
475         ("onoff", int, 0),
476         ("week", int, 3),
477         ("fm1", hhmm, "0000"),
478         ("to1", hhmm, "2359"),
479         ("fm2", hhmm, "0000"),
480         ("to2", hhmm, "2359"),
481     )
482
483     def out_encode(self) -> bytes:
484         return (
485             pack("B", self.onoff)
486             + pack("B", self.week)
487             + bytes.fromhex(self.fm1)
488             + bytes.fromhex(self.to1)
489             + bytes.fromhex(self.fm2)
490             + bytes.fromhex(self.to2)
491         )
492
493
494 class RESTART_SHUTDOWN(GPS303Pkt):
495     PROTO = 0x48
496     OUT_KWARGS = (("flag", int, 0),)
497
498     def out_encode(self) -> bytes:
499         # 1 - restart
500         # 2 - shutdown
501         return pack("B", self.flag)
502
503
504 class DEVICE(GPS303Pkt):
505     PROTO = 0x49
506     OUT_KWARGS = (("flag", int, 0),)
507
508     # 0 - Stop looking for equipment
509     # 1 - Start looking for equipment
510     def out_encode(self) -> bytes:
511         return pack("B", self.flag)
512
513
514 class ALARM_CLOCK(GPS303Pkt):
515     PROTO = 0x50
516
517     def out_encode(self) -> bytes:
518         # TODO implement parsing kwargs
519         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
520         return b"".join(
521             pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
522         )
523
524
525 class STOP_ALARM(GPS303Pkt):
526     PROTO = 0x56
527
528     def in_decode(self, length: int, payload: bytes) -> None:
529         self.flag = payload[0]
530
531
532 class SETUP(GPS303Pkt):
533     PROTO = 0x57
534     RESPOND = Respond.EXT
535     OUT_KWARGS = (
536         ("uploadintervalseconds", intx, 0x0300),
537         ("binaryswitch", intx, 0b00110001),
538         ("alarms", l3int, [0, 0, 0]),
539         ("dndtimeswitch", int, 0),
540         ("dndtimes", l3int, [0, 0, 0]),
541         ("gpstimeswitch", int, 0),
542         ("gpstimestart", int, 0),
543         ("gpstimestop", int, 0),
544         ("phonenumbers", l3str, ["", "", ""]),
545     )
546
547     def out_encode(self) -> bytes:
548         def pack3b(x: int) -> bytes:
549             return pack("!I", x)[1:]
550
551         return b"".join(
552             [
553                 pack("!H", self.uploadintervalseconds),
554                 pack("B", self.binaryswitch),
555             ]
556             + [pack3b(el) for el in self.alarms]
557             + [
558                 pack("B", self.dndtimeswitch),
559             ]
560             + [pack3b(el) for el in self.dndtimes]
561             + [
562                 pack("B", self.gpstimeswitch),
563                 pack("!H", self.gpstimestart),
564                 pack("!H", self.gpstimestop),
565             ]
566             + [b";".join([el.encode() for el in self.phonenumbers])]
567         )
568
569
570 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
571     PROTO = 0x58
572
573
574 class RESTORE_PASSWORD(GPS303Pkt):
575     PROTO = 0x67
576
577
578 class WIFI_POSITIONING(_WIFI_POSITIONING):
579     PROTO = 0x69
580     RESPOND = Respond.EXT
581     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
582
583     def out_encode(self) -> bytes:
584         if self.latitude is None or self.longitude is None:
585             return b""
586         return "{:+#010.8g},{:+#010.8g}".format(
587             self.latitude, self.longitude
588         ).encode()
589
590     def out_decode(self, length: int, payload: bytes) -> None:
591         lat, lon = payload.decode().split(",")
592         self.latitude = float(lat)
593         self.longitude = float(lon)
594
595
596 class MANUAL_POSITIONING(GPS303Pkt):
597     PROTO = 0x80
598
599     def in_decode(self, length: int, payload: bytes) -> None:
600         self.flag = payload[0] if len(payload) > 0 else -1
601         self.reason = {
602             1: "Incorrect time",
603             2: "LBS less",
604             3: "WiFi less",
605             4: "LBS search > 3 times",
606             5: "Same LBS and WiFi data",
607             6: "LBS prohibited, WiFi absent",
608             7: "GPS spacing < 50 m",
609         }.get(self.flag, "Unknown")
610
611
612 class BATTERY_CHARGE(GPS303Pkt):
613     PROTO = 0x81
614
615
616 class CHARGER_CONNECTED(GPS303Pkt):
617     PROTO = 0x82
618
619
620 class CHARGER_DISCONNECTED(GPS303Pkt):
621     PROTO = 0x83
622
623
624 class VIBRATION_RECEIVED(GPS303Pkt):
625     PROTO = 0x94
626
627
628 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
629     PROTO = 0x98
630     RESPOND = Respond.EXT
631     OUT_KWARGS = (("interval", int, 10),)
632
633     def in_decode(self, length: int, payload: bytes) -> None:
634         self.interval = unpack("!H", payload[:2])
635
636     def out_encode(self) -> bytes:
637         return pack("!H", self.interval)
638
639
640 class SOS_ALARM(GPS303Pkt):
641     PROTO = 0x99
642
643
644 class UNKNOWN_B3(GPS303Pkt):
645     PROTO = 0xB3
646     IN_KWARGS = (("asciidata", str, ""),)
647
648     def in_decode(self, length: int, payload: bytes) -> None:
649         self.asciidata = payload.decode()
650
651
652 # Build dicts protocol number -> class and class name -> protocol number
653 CLASSES = {}
654 PROTOS = {}
655 if True:  # just to indent the code, sorry!
656     for cls in [
657         cls
658         for name, cls in globals().items()
659         if isclass(cls)
660         and issubclass(cls, GPS303Pkt)
661         and not name.startswith("_")
662     ]:
663         if hasattr(cls, "PROTO"):
664             CLASSES[cls.PROTO] = cls
665             PROTOS[cls.__name__] = cls.PROTO
666
667
668 def class_by_prefix(
669     prefix: str,
670 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
671     lst = [
672         (name, proto)
673         for name, proto in PROTOS.items()
674         if name.upper().startswith(prefix.upper())
675     ]
676     if len(lst) != 1:
677         return lst
678     _, proto = lst[0]
679     return CLASSES[proto]
680
681
682 def proto_by_name(name: str) -> int:
683     return PROTOS.get(name, -1)
684
685
686 def proto_of_message(packet: bytes) -> int:
687     return packet[1]
688
689
690 def inline_response(packet: bytes) -> Optional[bytes]:
691     proto = proto_of_message(packet)
692     if proto in CLASSES:
693         cls = CLASSES[proto]
694         if cls.RESPOND is Respond.INL:
695             return cls.Out().packed
696     return None
697
698
699 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
700     """From a packet (without framing bytes) derive the XXX.In object"""
701     length, proto = unpack("BB", packet[:2])
702     payload = packet[2:]
703     if proto not in CLASSES:
704         cause: Union[DecodeError, ValueError] = ValueError(
705             f"Proto {proto} is unknown"
706         )
707     else:
708         try:
709             if is_incoming:
710                 return CLASSES[proto].In(length, payload)
711             else:
712                 return CLASSES[proto].Out(length, payload)
713         except DecodeError as e:
714             cause = e
715     if is_incoming:
716         retobj = UNKNOWN.In(length, payload)
717     else:
718         retobj = UNKNOWN.Out(length, payload)
719     retobj.PROTO = proto  # Override class attr with object attr
720     retobj.cause = cause
721     return retobj