]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
b70cacf1eb328571171272657c92c97d0f04b2f1
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import error, pack, unpack
21 from typing import Any, Callable, Tuple
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "parse_message",
27     "proto_by_name",
28     "DecodeError",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "PROHIBIT_LBS",
44     "GPS_LBS_SWITCH_TIMES",
45     "REMOTE_MONITOR_PHONE",
46     "SOS_PHONE",
47     "DAD_PHONE",
48     "MOM_PHONE",
49     "STOP_UPLOAD",
50     "GPS_OFF_PERIOD",
51     "DND_PERIOD",
52     "RESTART_SHUTDOWN",
53     "DEVICE",
54     "ALARM_CLOCK",
55     "STOP_ALARM",
56     "SETUP",
57     "SYNCHRONOUS_WHITELIST",
58     "RESTORE_PASSWORD",
59     "WIFI_POSITIONING",
60     "MANUAL_POSITIONING",
61     "BATTERY_CHARGE",
62     "CHARGER_CONNECTED",
63     "CHARGER_DISCONNECTED",
64     "VIBRATION_RECEIVED",
65     "POSITION_UPLOAD_INTERVAL",
66     "SOS_ALARM",
67     "UNKNOWN_B3",
68 )
69
70
71 class DecodeError(Exception):
72     def __init__(self, e, **kwargs):
73         super().__init__(e)
74         for k, v in kwargs.items():
75             setattr(self, k, v)
76
77 def intx(x):
78     if isinstance(x, str):
79         x = int(x, 0)
80     return x
81
82
83 def hhmm(x):
84     """Check for the string that represents hours and minutes"""
85     if not isinstance(x, str) or len(x) != 4:
86         raise ValueError(str(x) + " is not a four-character string")
87     hh = int(x[:2])
88     mm = int(x[2:])
89     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
90         raise ValueError(str(x) + " does not contain valid hours and minutes")
91     return x
92
93
94 def l3str(x):
95     if isinstance(x, str):
96         x = x.split(",")
97     if len(x) != 3 or not all(isinstance(el, str) for el in x):
98         raise ValueError(str(x) + " is not a list of three strings")
99     return x
100
101
102 def l3int(x):
103     if isinstance(x, str):
104         x = x.split(",")
105         x = [int(el) for el in x]
106     if len(x) != 3 or not all(isinstance(el, int) for el in x):
107         raise ValueError(str(x) + " is not a list of three integers")
108     return x
109
110
111 class MetaPkt(type):
112     """
113     For each class corresponding to a message, automatically create
114     two nested classes `In` and `Out` that also inherit from their
115     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
116     copied to the `In` nested class under the name `KWARGS`, and
117     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
118     to the nested class `Out`. In addition, method `encode` is
119     defined in both classes equal to `in_encode()` and `out_encode()`
120     respectively.
121     """
122
123     def __new__(cls, name, bases, attrs):
124         newcls = super().__new__(cls, name, bases, attrs)
125         newcls.In = super().__new__(
126             cls,
127             name + ".In",
128             (newcls,) + bases,
129             {
130                 "KWARGS": newcls.IN_KWARGS,
131                 "decode": newcls.in_decode,
132                 "encode": newcls.in_encode,
133             },
134         )
135         newcls.Out = super().__new__(
136             cls,
137             name + ".Out",
138             (newcls,) + bases,
139             {
140                 "KWARGS": newcls.OUT_KWARGS,
141                 "decode": newcls.out_decode,
142                 "encode": newcls.out_encode,
143             },
144         )
145         return newcls
146
147
148 class Respond(Enum):
149     NON = 0  # Incoming, no response needed
150     INL = 1  # Birirectional, use `inline_response()`
151     EXT = 2  # Birirectional, use external responder
152
153
154 class GPS303Pkt(metaclass=MetaPkt):
155     RESPOND = Respond.NON  # Do not send anything back by default
156     PROTO: int
157     IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
158     OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
159
160     def __init__(self, *args, **kwargs):
161         """
162         Construct the object _either_ from (length, payload),
163         _or_ from the values of individual fields
164         """
165         assert not args or (len(args) == 2 and not kwargs)
166         if args:  # guaranteed to be two arguments at this point
167             self.length, self.payload = args
168             try:
169                 self.decode(self.length, self.payload)
170             except error as e:
171                 raise DecodeError(e, obj=self)
172         else:
173             for kw, typ, dfl in self.KWARGS:
174                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
175             if kwargs:
176                 raise ValueError(
177                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
178                 )
179
180     def __repr__(self):
181         return "{}({})".format(
182             self.__class__.__name__,
183             ", ".join(
184                 "{}={}".format(
185                     k,
186                     'bytes.fromhex("{}")'.format(v.hex())
187                     if isinstance(v, bytes)
188                     else v.__repr__(),
189                 )
190                 for k, v in self.__dict__.items()
191                 if not k.startswith("_")
192             ),
193         )
194
195     def in_decode(self, length, packet):
196         # Overridden in subclasses, otherwise do not decode payload
197         return
198
199     def out_decode(self, length, packet):
200         # Overridden in subclasses, otherwise do not decode payload
201         return
202
203     def in_encode(self):
204         # Necessary to emulate terminal, which is not implemented
205         raise NotImplementedError(
206             self.__class__.__name__ + ".encode() not implemented"
207         )
208
209     def out_encode(self):
210         # Overridden in subclasses, otherwise make empty payload
211         return b""
212
213     @property
214     def packed(self):
215         payload = self.encode()
216         length = len(payload) + 1
217         return pack("BB", length, self.PROTO) + payload
218
219
220 class UNKNOWN(GPS303Pkt):
221     PROTO = 256  # > 255 is impossible in real packets
222
223
224 class LOGIN(GPS303Pkt):
225     PROTO = 0x01
226     RESPOND = Respond.INL
227     # Default response for ACK, can also respond with STOP_UPLOAD
228
229     def in_decode(self, length, payload):
230         self.imei = payload[:-1].hex()
231         self.ver = unpack("B", payload[-1:])[0]
232         return self
233
234
235 class SUPERVISION(GPS303Pkt):
236     PROTO = 0x05
237     OUT_KWARGS = (("status", int, 1),)
238
239     def out_encode(self):
240         # 1: The device automatically answers Pickup effect
241         # 2: Automatically Answering Two-way Calls
242         # 3: Ring manually answer the two-way call
243         return pack("B", self.status)
244
245
246 class HEARTBEAT(GPS303Pkt):
247     PROTO = 0x08
248     RESPOND = Respond.INL
249
250
251 class _GPS_POSITIONING(GPS303Pkt):
252     RESPOND = Respond.INL
253
254     def in_decode(self, length, payload):
255         self.dtime = payload[:6]
256         if self.dtime == b"\0\0\0\0\0\0":
257             self.devtime = None
258         else:
259             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
260             self.devtime = datetime(
261                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
262             )
263         self.gps_data_length = payload[6] >> 4
264         self.gps_nb_sat = payload[6] & 0x0F
265         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
266         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
267         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
268         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
269         self.heading = flags & 0b0000001111111111  # bits 6 - last
270         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
271         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
272         self.speed = speed
273         self.flags = flags
274         return self
275
276     def out_encode(self):
277         tup = datetime.utcnow().timetuple()
278         ttup = (tup[0] % 100,) + tup[1:6]
279         return pack("BBBBBB", *ttup)
280
281
282 class GPS_POSITIONING(_GPS_POSITIONING):
283     PROTO = 0x10
284
285
286 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
287     PROTO = 0x11
288
289
290 class STATUS(GPS303Pkt):
291     PROTO = 0x13
292     RESPOND = Respond.EXT
293     OUT_KWARGS = (("upload_interval", int, 25),)
294
295     def in_decode(self, length, payload):
296         self.batt, self.ver, self.timezone, self.intvl = unpack(
297             "BBBB", payload[:4]
298         )
299         if len(payload) > 4:
300             self.signal = payload[4]
301         else:
302             self.signal = None
303         return self
304
305     def out_encode(self):  # Set interval in minutes
306         return pack("B", self.upload_interval)
307
308
309 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
310     PROTO = 0x14
311
312
313 class RESET(GPS303Pkt):
314     # Device sends when it got reset SMS
315     # Server can send to initiate factory reset
316     PROTO = 0x15
317
318
319 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
320     PROTO = 0x16
321     OUT_KWARGS = (("number", int, 3),)
322
323     def out_encode(self):  # Number of whitelist entries
324         return pack("B", number)
325
326
327 class _WIFI_POSITIONING(GPS303Pkt):
328     def in_decode(self, length, payload):
329         self.dtime = payload[:6]
330         if self.dtime == b"\0\0\0\0\0\0":
331             self.devtime = None
332         else:
333             self.devtime = datetime.strptime(
334                 self.dtime.hex(), "%y%m%d%H%M%S"
335             ).astimezone(tz=timezone.utc)
336         self.wifi_aps = []
337         for i in range(self.length):  # length has special meaning here
338             slice = payload[6 + i * 7 : 13 + i * 7]
339             self.wifi_aps.append(
340                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
341             )
342         gsm_slice = payload[6 + self.length * 7 :]
343         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
344         self.gsm_cells = []
345         for i in range(ncells):
346             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
347             locac, cellid, sigstr = unpack(
348                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
349             )
350             self.gsm_cells.append((locac, cellid, -sigstr))
351         return self
352
353
354 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
355     PROTO = 0x17
356     RESPOND = Respond.INL
357
358     def out_encode(self):
359         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
360
361
362 class TIME(GPS303Pkt):
363     PROTO = 0x30
364     RESPOND = Respond.INL
365
366     def out_encode(self):
367         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
368
369
370 class PROHIBIT_LBS(GPS303Pkt):
371     PROTO = 0x33
372     OUT_KWARGS = (("status", int, 1),)
373
374     def out_encode(self):  # Server sent, 0-off, 1-on
375         return pack("B", self.status)
376
377
378 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
379     PROTO = 0x34
380
381     # Data is in packed decimal
382     # 00/01 - GPS on/off
383     # 00/01 - Don't set / Set upload period
384     # HHMMHHMM - Upload period
385     # 00/01 - LBS on/off
386     # 00/01 - Don't set / Set time of boot
387     # HHMM  - Time of boot
388     # 00/01 - Don't set / Set time of shutdown
389     # HHMM  - Time of shutdown
390     def out_encode(self):
391         return b""  # TODO
392
393
394 class _SET_PHONE(GPS303Pkt):
395     OUT_KWARGS = (("phone", str, ""),)
396
397     def out_encode(self):
398         return self.phone.encode()
399
400
401 class REMOTE_MONITOR_PHONE(_SET_PHONE):
402     PROTO = 0x40
403
404
405 class SOS_PHONE(_SET_PHONE):
406     PROTO = 0x41
407
408
409 class DAD_PHONE(_SET_PHONE):
410     PROTO = 0x42
411
412
413 class MOM_PHONE(_SET_PHONE):
414     PROTO = 0x43
415
416
417 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
418     PROTO = 0x44
419
420
421 class GPS_OFF_PERIOD(GPS303Pkt):
422     PROTO = 0x46
423     OUT_KWARGS = (
424         ("onoff", int, 0),
425         ("fm", hhmm, "0000"),
426         ("to", hhmm, "2359"),
427     )
428
429     def out_encode(self):
430         return (
431             pack("B", self.onoff)
432             + bytes.fromhex(self.fm)
433             + bytes.fromhex(self.to)
434         )
435
436
437 class DND_PERIOD(GPS303Pkt):
438     PROTO = 0x47
439     OUT_KWARGS = (
440         ("onoff", int, 0),
441         ("week", int, 3),
442         ("fm1", hhmm, "0000"),
443         ("to1", hhmm, "2359"),
444         ("fm2", hhmm, "0000"),
445         ("to2", hhmm, "2359"),
446     )
447
448     def out_endode(self):
449         return (
450             pack("B", self.onoff)
451             + pack("B", self.week)
452             + bytes.fromhex(self.fm1)
453             + bytes.fromhex(self.to1)
454             + bytes.fromhex(self.fm2)
455             + bytes.fromhex(self.to2)
456         )
457
458
459 class RESTART_SHUTDOWN(GPS303Pkt):
460     PROTO = 0x48
461     OUT_KWARGS = (("flag", int, 0),)
462
463     def out_encode(self):
464         # 1 - restart
465         # 2 - shutdown
466         return pack("B", self.flag)
467
468
469 class DEVICE(GPS303Pkt):
470     PROTO = 0x49
471     OUT_KWARGS = (("flag", int, 0),)
472
473     # 0 - Stop looking for equipment
474     # 1 - Start looking for equipment
475     def out_encode(self):
476         return pack("B", self.flag)
477
478
479 class ALARM_CLOCK(GPS303Pkt):
480     PROTO = 0x50
481
482     def out_encode(self):
483         # TODO implement parsing kwargs
484         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
485         return b"".join(
486             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
487         )
488
489
490 class STOP_ALARM(GPS303Pkt):
491     PROTO = 0x56
492
493     def in_decode(self, length, payload):
494         self.flag = payload[0]
495         return self
496
497
498 class SETUP(GPS303Pkt):
499     PROTO = 0x57
500     RESPOND = Respond.EXT
501     OUT_KWARGS = (
502         ("uploadintervalseconds", intx, 0x0300),
503         ("binaryswitch", intx, 0b00110001),
504         ("alarms", l3int, [0, 0, 0]),
505         ("dndtimeswitch", int, 0),
506         ("dndtimes", l3int, [0, 0, 0]),
507         ("gpstimeswitch", int, 0),
508         ("gpstimestart", int, 0),
509         ("gpstimestop", int, 0),
510         ("phonenumbers", l3str, ["", "", ""]),
511     )
512
513     def out_encode(self):
514         def pack3b(x):
515             return pack("!I", x)[1:]
516
517         return b"".join(
518             [
519                 pack("!H", self.uploadintervalseconds),
520                 pack("B", self.binaryswitch),
521             ]
522             + [pack3b(el) for el in self.alarms]
523             + [
524                 pack("B", self.dndtimeswitch),
525             ]
526             + [pack3b(el) for el in self.dndtimes]
527             + [
528                 pack("B", self.gpstimeswitch),
529                 pack("!H", self.gpstimestart),
530                 pack("!H", self.gpstimestop),
531             ]
532             + [b";".join([el.encode() for el in self.phonenumbers])]
533         )
534
535
536 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
537     PROTO = 0x58
538
539
540 class RESTORE_PASSWORD(GPS303Pkt):
541     PROTO = 0x67
542
543
544 class WIFI_POSITIONING(_WIFI_POSITIONING):
545     PROTO = 0x69
546     RESPOND = Respond.EXT
547     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
548
549     def out_encode(self):
550         if self.latitude is None or self.longitude is None:
551             return b""
552         return "{:+#010.8g},{:+#010.8g}".format(
553             self.latitude, self.longitude
554         ).encode()
555
556     def out_decode(self, length, payload):
557         lat, lon = payload.decode().split(",")
558         self.latitude = float(lat)
559         self.longitude = float(lon)
560
561
562 class MANUAL_POSITIONING(GPS303Pkt):
563     PROTO = 0x80
564
565     def in_decode(self, length, payload):
566         self.flag = payload[0] if len(payload) > 0 else None
567         self.reason = {
568             1: "Incorrect time",
569             2: "LBS less",
570             3: "WiFi less",
571             4: "LBS search > 3 times",
572             5: "Same LBS and WiFi data",
573             6: "LBS prohibited, WiFi absent",
574             7: "GPS spacing < 50 m",
575         }.get(self.flag, "Unknown")
576         return self
577
578
579 class BATTERY_CHARGE(GPS303Pkt):
580     PROTO = 0x81
581
582
583 class CHARGER_CONNECTED(GPS303Pkt):
584     PROTO = 0x82
585
586
587 class CHARGER_DISCONNECTED(GPS303Pkt):
588     PROTO = 0x83
589
590
591 class VIBRATION_RECEIVED(GPS303Pkt):
592     PROTO = 0x94
593
594
595 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
596     PROTO = 0x98
597     RESPOND = Respond.EXT
598     OUT_KWARGS = (("interval", int, 10),)
599
600     def in_decode(self, length, payload):
601         self.interval = unpack("!H", payload[:2])
602         return self
603
604     def out_encode(self):
605         return pack("!H", interval)
606
607
608 class SOS_ALARM(GPS303Pkt):
609     PROTO = 0x99
610
611
612 class UNKNOWN_B3(GPS303Pkt):
613     PROTO = 0xB3
614     IN_KWARGS = (("asciidata", str, ""),)
615
616     def in_decode(self, length, payload):
617         self.asciidata = payload.decode()
618         return self
619
620
621 # Build dicts protocol number -> class and class name -> protocol number
622 CLASSES = {}
623 PROTOS = {}
624 if True:  # just to indent the code, sorry!
625     for cls in [
626         cls
627         for name, cls in globals().items()
628         if isclass(cls)
629         and issubclass(cls, GPS303Pkt)
630         and not name.startswith("_")
631     ]:
632         if hasattr(cls, "PROTO"):
633             CLASSES[cls.PROTO] = cls
634             PROTOS[cls.__name__] = cls.PROTO
635
636
637 def class_by_prefix(prefix):
638     lst = [
639         (name, proto)
640         for name, proto in PROTOS.items()
641         if name.upper().startswith(prefix.upper())
642     ]
643     if len(lst) != 1:
644         return lst
645     _, proto = lst[0]
646     return CLASSES[proto]
647
648
649 def proto_by_name(name):
650     return PROTOS.get(name, -1)
651
652
653 def proto_of_message(packet):
654     return unpack("B", packet[1:2])[0]
655
656
657 def inline_response(packet):
658     proto = proto_of_message(packet)
659     if proto in CLASSES:
660         cls = CLASSES[proto]
661         if cls.RESPOND is Respond.INL:
662             return cls.Out().packed
663     return None
664
665
666 def parse_message(packet, is_incoming=True):
667     """From a packet (without framing bytes) derive the XXX.In object"""
668     length, proto = unpack("BB", packet[:2])
669     payload = packet[2:]
670     if proto not in CLASSES:
671         cause = ValueError(f"Proto {proto} is unknown")
672     else:
673         try:
674             if is_incoming:
675                 return CLASSES[proto].In(length, payload)
676             else:
677                 return CLASSES[proto].Out(length, payload)
678         except DecodeError as e:
679             cause = e
680     if is_incoming:
681         retobj = UNKNOWN.In(length, payload)
682     else:
683         retobj = UNKNOWN.Out(length, payload)
684     retobj.PROTO = proto  # Override class attr with object attr
685     retobj.cause = cause
686     return retobj