]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
d07d4faa81edae0854d9cdea5eccfcc3b9e0fcbb
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {
122                 "KWARGS": newcls.IN_KWARGS,
123                 "decode": newcls.in_decode,
124                 "encode": newcls.in_encode,
125             },
126         )
127         newcls.Out = super().__new__(
128             cls,
129             name + ".Out",
130             (newcls,) + bases,
131             {
132                 "KWARGS": newcls.OUT_KWARGS,
133                 "decode": newcls.out_decode,
134                 "encode": newcls.out_encode,
135             },
136         )
137         return newcls
138
139
140 class Respond(Enum):
141     NON = 0  # Incoming, no response needed
142     INL = 1  # Birirectional, use `inline_response()`
143     EXT = 2  # Birirectional, use external responder
144
145
146 class GPS303Pkt(metaclass=MetaPkt):
147     RESPOND = Respond.NON  # Do not send anything back by default
148     PROTO: int
149     IN_KWARGS = ()
150     OUT_KWARGS = ()
151
152     def __init__(self, *args, **kwargs):
153         assert len(args) == 0
154         for kw, typ, dfl in self.KWARGS:
155             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
156         if kwargs:
157             print("KWARGS", self.KWARGS)
158             print("kwargs", kwargs)
159             raise TypeError(
160                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
161             )
162
163     def __repr__(self):
164         return "{}({})".format(
165             self.__class__.__name__,
166             ", ".join(
167                 "{}={}".format(
168                     k,
169                     'bytes.fromhex("{}")'.format(v.hex())
170                     if isinstance(v, bytes)
171                     else v.__repr__(),
172                 )
173                 for k, v in self.__dict__.items()
174                 if not k.startswith("_")
175             ),
176         )
177
178     def in_decode(self, length, packet):
179         return
180
181     def out_decode(self, length, packet):
182         raise NotImplementedError(
183             self.__class__.__name__ + ".decode() not implemented"
184         )
185
186     def in_encode(self):
187         raise NotImplementedError(
188             self.__class__.__name__ + ".encode() not implemented"
189         )
190
191     def out_encode(self):
192         return b""
193
194     @property
195     def packed(self):
196         payload = self.encode()
197         length = len(payload) + 1
198         return pack("BB", length, self.PROTO) + payload
199
200     @classmethod
201     def from_packet(cls, length, payload):
202         self = cls.In()
203         self.length = length
204         self.payload = payload
205         self.decode(length, payload)
206         return self
207
208
209 class UNKNOWN(GPS303Pkt):
210     PROTO = 256  # > 255 is impossible in real packets
211
212
213 class LOGIN(GPS303Pkt):
214     PROTO = 0x01
215     RESPOND = Respond.INL
216     # Default response for ACK, can also respond with STOP_UPLOAD
217
218     def in_decode(self, length, payload):
219         self.imei = payload[:-1].hex()
220         self.ver = unpack("B", payload[-1:])[0]
221         return self
222
223
224 class SUPERVISION(GPS303Pkt):
225     PROTO = 0x05
226     OUT_KWARGS = (("status", int, 1),)
227
228     def out_encode(self):
229         # 1: The device automatically answers Pickup effect
230         # 2: Automatically Answering Two-way Calls
231         # 3: Ring manually answer the two-way call
232         return pack("B", self.status)
233
234
235 class HEARTBEAT(GPS303Pkt):
236     PROTO = 0x08
237     RESPOND = Respond.INL
238
239
240 class _GPS_POSITIONING(GPS303Pkt):
241     RESPOND = Respond.INL
242
243     def in_decode(self, length, payload):
244         self.dtime = payload[:6]
245         if self.dtime == b"\0\0\0\0\0\0":
246             self.devtime = None
247         else:
248             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249             self.devtime = datetime(
250                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
251             )
252         self.gps_data_length = payload[6] >> 4
253         self.gps_nb_sat = payload[6] & 0x0F
254         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
256         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
257         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
258         self.heading = flags & 0b0000001111111111  # bits 6 - last
259         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
261         self.speed = speed
262         self.flags = flags
263         return self
264
265     def out_encode(self):
266         tup = datetime.utcnow().timetuple()
267         ttup = (tup[0] % 100,) + tup[1:6]
268         return pack("BBBBBB", *ttup)
269
270
271 class GPS_POSITIONING(_GPS_POSITIONING):
272     PROTO = 0x10
273
274
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
276     PROTO = 0x11
277
278
279 class STATUS(GPS303Pkt):
280     PROTO = 0x13
281     RESPOND = Respond.EXT
282     OUT_KWARGS = (("upload_interval", int, 25),)
283
284     def in_decode(self, length, payload):
285         self.batt, self.ver, self.timezone, self.intvl = unpack(
286             "BBBB", payload[:4]
287         )
288         if len(payload) > 4:
289             self.signal = payload[4]
290         else:
291             self.signal = None
292         return self
293
294     def out_encode(self):  # Set interval in minutes
295         return pack("B", self.upload_interval)
296
297
298 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
299     PROTO = 0x14
300     RESPOND = Respond.INL
301
302
303 class RESET(GPS303Pkt):
304     # Device sends when it got reset SMS
305     # Server can send to initiate factory reset
306     PROTO = 0x15
307
308
309 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
310     PROTO = 0x16
311     OUT_KWARGS = (("number", int, 3),)
312
313     def out_encode(self):  # Number of whitelist entries
314         return pack("B", number)
315
316
317 class _WIFI_POSITIONING(GPS303Pkt):
318     def in_decode(self, length, payload):
319         self.dtime = payload[:6]
320         if self.dtime == b"\0\0\0\0\0\0":
321             self.devtime = None
322         else:
323             self.devtime = datetime.strptime(
324                 self.dtime.hex(), "%y%m%d%H%M%S"
325             ).astimezone(tz=timezone.utc)
326         self.wifi_aps = []
327         for i in range(self.length):  # length has special meaning here
328             slice = payload[6 + i * 7 : 13 + i * 7]
329             self.wifi_aps.append(
330                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
331             )
332         gsm_slice = payload[6 + self.length * 7 :]
333         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
334         self.gsm_cells = []
335         for i in range(ncells):
336             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
337             locac, cellid, sigstr = unpack(
338                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
339             )
340             self.gsm_cells.append((locac, cellid, -sigstr))
341         return self
342
343
344 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
345     PROTO = 0x17
346     RESPOND = Respond.INL
347
348     def out_encode(self):
349         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
350
351
352 class TIME(GPS303Pkt):
353     PROTO = 0x30
354     RESPOND = Respond.INL
355
356     def out_encode(self):
357         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
358
359
360 class PROHIBIT_LBS(GPS303Pkt):
361     PROTO = 0x33
362     OUT_KWARGS = (("status", int, 1),)
363
364     def out_encode(self):  # Server sent, 0-off, 1-on
365         return pack("B", self.status)
366
367
368 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
369     PROTO = 0x34
370
371     # Data is in packed decimal
372     # 00/01 - GPS on/off
373     # 00/01 - Don't set / Set upload period
374     # HHMMHHMM - Upload period
375     # 00/01 - LBS on/off
376     # 00/01 - Don't set / Set time of boot
377     # HHMM  - Time of boot
378     # 00/01 - Don't set / Set time of shutdown
379     # HHMM  - Time of shutdown
380     def out_encode(self):
381         return b""  # TODO
382
383
384 class _SET_PHONE(GPS303Pkt):
385     OUT_KWARGS = (("phone", str, ""),)
386
387     def out_encode(self):
388         return self.phone.encode()
389
390
391 class REMOTE_MONITOR_PHONE(_SET_PHONE):
392     PROTO = 0x40
393
394
395 class SOS_PHONE(_SET_PHONE):
396     PROTO = 0x41
397
398
399 class DAD_PHONE(_SET_PHONE):
400     PROTO = 0x42
401
402
403 class MOM_PHONE(_SET_PHONE):
404     PROTO = 0x43
405
406
407 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
408     PROTO = 0x44
409
410
411 class GPS_OFF_PERIOD(GPS303Pkt):
412     PROTO = 0x46
413     OUT_KWARGS = (
414         ("onoff", int, 0),
415         ("fm", hhmm, "0000"),
416         ("to", hhmm, "2359"),
417     )
418
419     def out_encode(self):
420         return (
421             pack("B", self.onoff)
422             + bytes.fromhex(self.fm)
423             + bytes.fromhex(self.to)
424         )
425
426
427 class DND_PERIOD(GPS303Pkt):
428     PROTO = 0x47
429     OUT_KWARGS = (
430         ("onoff", int, 0),
431         ("week", int, 3),
432         ("fm1", hhmm, "0000"),
433         ("to1", hhmm, "2359"),
434         ("fm2", hhmm, "0000"),
435         ("to2", hhmm, "2359"),
436     )
437
438     def out_endode(self):
439         return (
440             pack("B", self.onoff)
441             + pack("B", self.week)
442             + bytes.fromhex(self.fm1)
443             + bytes.fromhex(self.to1)
444             + bytes.fromhex(self.fm2)
445             + bytes.fromhex(self.to2)
446         )
447
448
449 class RESTART_SHUTDOWN(GPS303Pkt):
450     PROTO = 0x48
451     OUT_KWARGS = (("flag", int, 0),)
452
453     def out_encode(self):
454         # 1 - restart
455         # 2 - shutdown
456         return pack("B", self.flag)
457
458
459 class DEVICE(GPS303Pkt):
460     PROTO = 0x49
461     OUT_KWARGS = (("flag", int, 0),)
462
463     # 0 - Stop looking for equipment
464     # 1 - Start looking for equipment
465     def out_encode(self):
466         return pack("B", self.flag)
467
468
469 class ALARM_CLOCK(GPS303Pkt):
470     PROTO = 0x50
471
472     def out_encode(self):
473         # TODO implement parsing kwargs
474         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
475         return b"".join(
476             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
477         )
478
479
480 class STOP_ALARM(GPS303Pkt):
481     PROTO = 0x56
482
483     def in_decode(self, length, payload):
484         self.flag = payload[0]
485         return self
486
487
488 class SETUP(GPS303Pkt):
489     PROTO = 0x57
490     RESPOND = Respond.EXT
491     OUT_KWARGS = (
492         ("uploadintervalseconds", intx, 0x0300),
493         ("binaryswitch", intx, 0b00110001),
494         ("alarms", l3int, [0, 0, 0]),
495         ("dndtimeswitch", int, 0),
496         ("dndtimes", l3int, [0, 0, 0]),
497         ("gpstimeswitch", int, 0),
498         ("gpstimestart", int, 0),
499         ("gpstimestop", int, 0),
500         ("phonenumbers", l3str, ["", "", ""]),
501     )
502
503     def out_encode(self):
504         def pack3b(x):
505             return pack("!I", x)[1:]
506
507         return b"".join(
508             [
509                 pack("!H", self.uploadintervalseconds),
510                 pack("B", self.binaryswitch),
511             ]
512             + [pack3b(el) for el in self.alarms]
513             + [
514                 pack("B", self.dndtimeswitch),
515             ]
516             + [pack3b(el) for el in self.dndtimes]
517             + [
518                 pack("B", self.gpstimeswitch),
519                 pack("!H", self.gpstimestart),
520                 pack("!H", self.gpstimestop),
521             ]
522             + [b";".join([el.encode() for el in self.phonenumbers])]
523         )
524
525
526 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
527     PROTO = 0x58
528
529
530 class RESTORE_PASSWORD(GPS303Pkt):
531     PROTO = 0x67
532
533
534 class WIFI_POSITIONING(_WIFI_POSITIONING):
535     PROTO = 0x69
536     RESPOND = Respond.EXT
537     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
538
539     def out_encode(self):
540         if self.lat is None or self.lon is None:
541             return b""
542         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
543
544
545 class MANUAL_POSITIONING(GPS303Pkt):
546     PROTO = 0x80
547
548     def in_decode(self, length, payload):
549         self.flag = payload[0] if len(payload) > 0 else None
550         self.reason = {
551             1: "Incorrect time",
552             2: "LBS less",
553             3: "WiFi less",
554             4: "LBS search > 3 times",
555             5: "Same LBS and WiFi data",
556             6: "LBS prohibited, WiFi absent",
557             7: "GPS spacing < 50 m",
558         }.get(self.flag, "Unknown")
559         return self
560
561
562 class BATTERY_CHARGE(GPS303Pkt):
563     PROTO = 0x81
564
565
566 class CHARGER_CONNECTED(GPS303Pkt):
567     PROTO = 0x82
568
569
570 class CHARGER_DISCONNECTED(GPS303Pkt):
571     PROTO = 0x83
572
573
574 class VIBRATION_RECEIVED(GPS303Pkt):
575     PROTO = 0x94
576
577
578 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
579     PROTO = 0x98
580     RESPOND = Respond.EXT
581     OUT_KWARGS = (("interval", int, 10),)
582
583     def in_decode(self, length, payload):
584         self.interval = unpack("!H", payload[:2])
585         return self
586
587     def out_encode(self):
588         return pack("!H", interval)
589
590
591 class SOS_ALARM(GPS303Pkt):
592     PROTO = 0x99
593
594
595 class UNKNOWN_B3(GPS303Pkt):
596     PROTO = 0xB3
597     IN_KWARGS = (("asciidata", str, ""),)
598
599     def in_decode(self, length, payload):
600         self.asciidata = payload.decode()
601         return self
602
603
604 # Build dicts protocol number -> class and class name -> protocol number
605 CLASSES = {}
606 PROTOS = {}
607 if True:  # just to indent the code, sorry!
608     for cls in [
609         cls
610         for name, cls in globals().items()
611         if isclass(cls)
612         and issubclass(cls, GPS303Pkt)
613         and not name.startswith("_")
614     ]:
615         if hasattr(cls, "PROTO"):
616             CLASSES[cls.PROTO] = cls
617             PROTOS[cls.__name__] = cls.PROTO
618
619
620 def class_by_prefix(prefix):
621     lst = [
622         (name, proto)
623         for name, proto in PROTOS.items()
624         if name.upper().startswith(prefix.upper())
625     ]
626     if len(lst) != 1:
627         return lst
628     _, proto = lst[0]
629     return CLASSES[proto]
630
631
632 def proto_by_name(name):
633     return PROTOS.get(name, -1)
634
635
636 def proto_of_message(packet):
637     return unpack("B", packet[1:2])[0]
638
639
640 def inline_response(packet):
641     proto = proto_of_message(packet)
642     if proto in CLASSES:
643         cls = CLASSES[proto]
644         if cls.RESPOND is Respond.INL:
645             return cls.Out().packed
646     return None
647
648
649 def parse_message(packet):
650     """From a packet (without framing bytes) derive the XXX.In object"""
651     length, proto = unpack("BB", packet[:2])
652     payload = packet[2:]
653     if proto in CLASSES:
654         return CLASSES[proto].from_packet(length, payload)
655     else:
656         retobj = UNKNOWN.from_packet(length, payload)
657         retobj.PROTO = proto  # Override class attr with object attr
658         return retobj