]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
8f4ffcce124e23fd1a4baeb236e434578cbcae5f
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {
122                 "KWARGS": newcls.IN_KWARGS,
123                 "decode": newcls.in_decode,
124                 "encode": newcls.in_encode,
125             },
126         )
127         newcls.Out = super().__new__(
128             cls,
129             name + ".Out",
130             (newcls,) + bases,
131             {
132                 "KWARGS": newcls.OUT_KWARGS,
133                 "decode": newcls.out_decode,
134                 "encode": newcls.out_encode,
135             },
136         )
137         return newcls
138
139
140 class Respond(Enum):
141     NON = 0  # Incoming, no response needed
142     INL = 1  # Birirectional, use `inline_response()`
143     EXT = 2  # Birirectional, use external responder
144
145
146 class GPS303Pkt(metaclass=MetaPkt):
147     RESPOND = Respond.NON  # Do not send anything back by default
148     PROTO: int
149     IN_KWARGS = ()
150     OUT_KWARGS = ()
151
152     def __init__(self, *args, **kwargs):
153         """
154         Construct the object _either_ from (length, payload),
155         _or_ from the values of individual fields
156         """
157         assert not args or (len(args) == 2 and not kwargs)
158         if args:  # guaranteed to be two arguments at this point
159             self.length, self.payload = args
160             self.decode(self.length, self.payload)
161         else:
162             for kw, typ, dfl in self.KWARGS:
163                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
164             if kwargs:
165                 raise ValueError(
166                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
167                 )
168
169     def __repr__(self):
170         return "{}({})".format(
171             self.__class__.__name__,
172             ", ".join(
173                 "{}={}".format(
174                     k,
175                     'bytes.fromhex("{}")'.format(v.hex())
176                     if isinstance(v, bytes)
177                     else v.__repr__(),
178                 )
179                 for k, v in self.__dict__.items()
180                 if not k.startswith("_")
181             ),
182         )
183
184     def in_decode(self, length, packet):
185         # Overridden in subclasses, otherwise do not decode payload
186         return
187
188     def out_decode(self, length, packet):
189         # Overridden in subclasses, otherwise do not decode payload
190         return
191
192     def in_encode(self):
193         # Necessary to emulate terminal, which is not implemented
194         raise NotImplementedError(
195             self.__class__.__name__ + ".encode() not implemented"
196         )
197
198     def out_encode(self):
199         # Overridden in subclasses, otherwise make empty payload
200         return b""
201
202     @property
203     def packed(self):
204         payload = self.encode()
205         length = len(payload) + 1
206         return pack("BB", length, self.PROTO) + payload
207
208
209 class UNKNOWN(GPS303Pkt):
210     PROTO = 256  # > 255 is impossible in real packets
211
212
213 class LOGIN(GPS303Pkt):
214     PROTO = 0x01
215     RESPOND = Respond.INL
216     # Default response for ACK, can also respond with STOP_UPLOAD
217
218     def in_decode(self, length, payload):
219         self.imei = payload[:-1].hex()
220         self.ver = unpack("B", payload[-1:])[0]
221         return self
222
223
224 class SUPERVISION(GPS303Pkt):
225     PROTO = 0x05
226     OUT_KWARGS = (("status", int, 1),)
227
228     def out_encode(self):
229         # 1: The device automatically answers Pickup effect
230         # 2: Automatically Answering Two-way Calls
231         # 3: Ring manually answer the two-way call
232         return pack("B", self.status)
233
234
235 class HEARTBEAT(GPS303Pkt):
236     PROTO = 0x08
237     RESPOND = Respond.INL
238
239
240 class _GPS_POSITIONING(GPS303Pkt):
241     RESPOND = Respond.INL
242
243     def in_decode(self, length, payload):
244         self.dtime = payload[:6]
245         if self.dtime == b"\0\0\0\0\0\0":
246             self.devtime = None
247         else:
248             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249             self.devtime = datetime(
250                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
251             )
252         self.gps_data_length = payload[6] >> 4
253         self.gps_nb_sat = payload[6] & 0x0F
254         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
256         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
257         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
258         self.heading = flags & 0b0000001111111111  # bits 6 - last
259         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
261         self.speed = speed
262         self.flags = flags
263         return self
264
265     def out_encode(self):
266         tup = datetime.utcnow().timetuple()
267         ttup = (tup[0] % 100,) + tup[1:6]
268         return pack("BBBBBB", *ttup)
269
270
271 class GPS_POSITIONING(_GPS_POSITIONING):
272     PROTO = 0x10
273
274
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
276     PROTO = 0x11
277
278
279 class STATUS(GPS303Pkt):
280     PROTO = 0x13
281     RESPOND = Respond.EXT
282     OUT_KWARGS = (("upload_interval", int, 25),)
283
284     def in_decode(self, length, payload):
285         self.batt, self.ver, self.timezone, self.intvl = unpack(
286             "BBBB", payload[:4]
287         )
288         if len(payload) > 4:
289             self.signal = payload[4]
290         else:
291             self.signal = None
292         return self
293
294     def out_encode(self):  # Set interval in minutes
295         return pack("B", self.upload_interval)
296
297
298 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
299     PROTO = 0x14
300     RESPOND = Respond.INL
301
302
303 class RESET(GPS303Pkt):
304     # Device sends when it got reset SMS
305     # Server can send to initiate factory reset
306     PROTO = 0x15
307
308
309 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
310     PROTO = 0x16
311     OUT_KWARGS = (("number", int, 3),)
312
313     def out_encode(self):  # Number of whitelist entries
314         return pack("B", number)
315
316
317 class _WIFI_POSITIONING(GPS303Pkt):
318     def in_decode(self, length, payload):
319         self.dtime = payload[:6]
320         if self.dtime == b"\0\0\0\0\0\0":
321             self.devtime = None
322         else:
323             self.devtime = datetime.strptime(
324                 self.dtime.hex(), "%y%m%d%H%M%S"
325             ).astimezone(tz=timezone.utc)
326         self.wifi_aps = []
327         for i in range(self.length):  # length has special meaning here
328             slice = payload[6 + i * 7 : 13 + i * 7]
329             self.wifi_aps.append(
330                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
331             )
332         gsm_slice = payload[6 + self.length * 7 :]
333         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
334         self.gsm_cells = []
335         for i in range(ncells):
336             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
337             locac, cellid, sigstr = unpack(
338                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
339             )
340             self.gsm_cells.append((locac, cellid, -sigstr))
341         return self
342
343
344 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
345     PROTO = 0x17
346     RESPOND = Respond.INL
347
348     def out_encode(self):
349         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
350
351
352 class TIME(GPS303Pkt):
353     PROTO = 0x30
354     RESPOND = Respond.INL
355
356     def out_encode(self):
357         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
358
359
360 class PROHIBIT_LBS(GPS303Pkt):
361     PROTO = 0x33
362     OUT_KWARGS = (("status", int, 1),)
363
364     def out_encode(self):  # Server sent, 0-off, 1-on
365         return pack("B", self.status)
366
367
368 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
369     PROTO = 0x34
370
371     # Data is in packed decimal
372     # 00/01 - GPS on/off
373     # 00/01 - Don't set / Set upload period
374     # HHMMHHMM - Upload period
375     # 00/01 - LBS on/off
376     # 00/01 - Don't set / Set time of boot
377     # HHMM  - Time of boot
378     # 00/01 - Don't set / Set time of shutdown
379     # HHMM  - Time of shutdown
380     def out_encode(self):
381         return b""  # TODO
382
383
384 class _SET_PHONE(GPS303Pkt):
385     OUT_KWARGS = (("phone", str, ""),)
386
387     def out_encode(self):
388         return self.phone.encode()
389
390
391 class REMOTE_MONITOR_PHONE(_SET_PHONE):
392     PROTO = 0x40
393
394
395 class SOS_PHONE(_SET_PHONE):
396     PROTO = 0x41
397
398
399 class DAD_PHONE(_SET_PHONE):
400     PROTO = 0x42
401
402
403 class MOM_PHONE(_SET_PHONE):
404     PROTO = 0x43
405
406
407 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
408     PROTO = 0x44
409
410
411 class GPS_OFF_PERIOD(GPS303Pkt):
412     PROTO = 0x46
413     OUT_KWARGS = (
414         ("onoff", int, 0),
415         ("fm", hhmm, "0000"),
416         ("to", hhmm, "2359"),
417     )
418
419     def out_encode(self):
420         return (
421             pack("B", self.onoff)
422             + bytes.fromhex(self.fm)
423             + bytes.fromhex(self.to)
424         )
425
426
427 class DND_PERIOD(GPS303Pkt):
428     PROTO = 0x47
429     OUT_KWARGS = (
430         ("onoff", int, 0),
431         ("week", int, 3),
432         ("fm1", hhmm, "0000"),
433         ("to1", hhmm, "2359"),
434         ("fm2", hhmm, "0000"),
435         ("to2", hhmm, "2359"),
436     )
437
438     def out_endode(self):
439         return (
440             pack("B", self.onoff)
441             + pack("B", self.week)
442             + bytes.fromhex(self.fm1)
443             + bytes.fromhex(self.to1)
444             + bytes.fromhex(self.fm2)
445             + bytes.fromhex(self.to2)
446         )
447
448
449 class RESTART_SHUTDOWN(GPS303Pkt):
450     PROTO = 0x48
451     OUT_KWARGS = (("flag", int, 0),)
452
453     def out_encode(self):
454         # 1 - restart
455         # 2 - shutdown
456         return pack("B", self.flag)
457
458
459 class DEVICE(GPS303Pkt):
460     PROTO = 0x49
461     OUT_KWARGS = (("flag", int, 0),)
462
463     # 0 - Stop looking for equipment
464     # 1 - Start looking for equipment
465     def out_encode(self):
466         return pack("B", self.flag)
467
468
469 class ALARM_CLOCK(GPS303Pkt):
470     PROTO = 0x50
471
472     def out_encode(self):
473         # TODO implement parsing kwargs
474         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
475         return b"".join(
476             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
477         )
478
479
480 class STOP_ALARM(GPS303Pkt):
481     PROTO = 0x56
482
483     def in_decode(self, length, payload):
484         self.flag = payload[0]
485         return self
486
487
488 class SETUP(GPS303Pkt):
489     PROTO = 0x57
490     RESPOND = Respond.EXT
491     OUT_KWARGS = (
492         ("uploadintervalseconds", intx, 0x0300),
493         ("binaryswitch", intx, 0b00110001),
494         ("alarms", l3int, [0, 0, 0]),
495         ("dndtimeswitch", int, 0),
496         ("dndtimes", l3int, [0, 0, 0]),
497         ("gpstimeswitch", int, 0),
498         ("gpstimestart", int, 0),
499         ("gpstimestop", int, 0),
500         ("phonenumbers", l3str, ["", "", ""]),
501     )
502
503     def out_encode(self):
504         def pack3b(x):
505             return pack("!I", x)[1:]
506
507         return b"".join(
508             [
509                 pack("!H", self.uploadintervalseconds),
510                 pack("B", self.binaryswitch),
511             ]
512             + [pack3b(el) for el in self.alarms]
513             + [
514                 pack("B", self.dndtimeswitch),
515             ]
516             + [pack3b(el) for el in self.dndtimes]
517             + [
518                 pack("B", self.gpstimeswitch),
519                 pack("!H", self.gpstimestart),
520                 pack("!H", self.gpstimestop),
521             ]
522             + [b";".join([el.encode() for el in self.phonenumbers])]
523         )
524
525
526 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
527     PROTO = 0x58
528
529
530 class RESTORE_PASSWORD(GPS303Pkt):
531     PROTO = 0x67
532
533
534 class WIFI_POSITIONING(_WIFI_POSITIONING):
535     PROTO = 0x69
536     RESPOND = Respond.EXT
537     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
538
539     def out_encode(self):
540         if self.latitude is None or self.longitude is None:
541             return b""
542         return "{:+#010.8g},{:+#010.8g}".format(
543             self.latitude, self.longitude
544         ).encode()
545
546     def out_decode(self, length, payload):
547         lat, lon = payload.decode().split(",")
548         self.latitude = float(lat)
549         self.longitude = float(lon)
550
551
552 class MANUAL_POSITIONING(GPS303Pkt):
553     PROTO = 0x80
554
555     def in_decode(self, length, payload):
556         self.flag = payload[0] if len(payload) > 0 else None
557         self.reason = {
558             1: "Incorrect time",
559             2: "LBS less",
560             3: "WiFi less",
561             4: "LBS search > 3 times",
562             5: "Same LBS and WiFi data",
563             6: "LBS prohibited, WiFi absent",
564             7: "GPS spacing < 50 m",
565         }.get(self.flag, "Unknown")
566         return self
567
568
569 class BATTERY_CHARGE(GPS303Pkt):
570     PROTO = 0x81
571
572
573 class CHARGER_CONNECTED(GPS303Pkt):
574     PROTO = 0x82
575
576
577 class CHARGER_DISCONNECTED(GPS303Pkt):
578     PROTO = 0x83
579
580
581 class VIBRATION_RECEIVED(GPS303Pkt):
582     PROTO = 0x94
583
584
585 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
586     PROTO = 0x98
587     RESPOND = Respond.EXT
588     OUT_KWARGS = (("interval", int, 10),)
589
590     def in_decode(self, length, payload):
591         self.interval = unpack("!H", payload[:2])
592         return self
593
594     def out_encode(self):
595         return pack("!H", interval)
596
597
598 class SOS_ALARM(GPS303Pkt):
599     PROTO = 0x99
600
601
602 class UNKNOWN_B3(GPS303Pkt):
603     PROTO = 0xB3
604     IN_KWARGS = (("asciidata", str, ""),)
605
606     def in_decode(self, length, payload):
607         self.asciidata = payload.decode()
608         return self
609
610
611 # Build dicts protocol number -> class and class name -> protocol number
612 CLASSES = {}
613 PROTOS = {}
614 if True:  # just to indent the code, sorry!
615     for cls in [
616         cls
617         for name, cls in globals().items()
618         if isclass(cls)
619         and issubclass(cls, GPS303Pkt)
620         and not name.startswith("_")
621     ]:
622         if hasattr(cls, "PROTO"):
623             CLASSES[cls.PROTO] = cls
624             PROTOS[cls.__name__] = cls.PROTO
625
626
627 def class_by_prefix(prefix):
628     lst = [
629         (name, proto)
630         for name, proto in PROTOS.items()
631         if name.upper().startswith(prefix.upper())
632     ]
633     if len(lst) != 1:
634         return lst
635     _, proto = lst[0]
636     return CLASSES[proto]
637
638
639 def proto_by_name(name):
640     return PROTOS.get(name, -1)
641
642
643 def proto_of_message(packet):
644     return unpack("B", packet[1:2])[0]
645
646
647 def inline_response(packet):
648     proto = proto_of_message(packet)
649     if proto in CLASSES:
650         cls = CLASSES[proto]
651         if cls.RESPOND is Respond.INL:
652             return cls.Out().packed
653     return None
654
655
656 def parse_message(packet, is_incoming=True):
657     """From a packet (without framing bytes) derive the XXX.In object"""
658     length, proto = unpack("BB", packet[:2])
659     payload = packet[2:]
660     if proto in CLASSES:
661         if is_incoming:
662             return CLASSES[proto].In(length, payload)
663         else:
664             return CLASSES[proto].Out(length, payload)
665     else:
666         retobj = UNKNOWN.In(length, payload)
667         retobj.PROTO = proto  # Override class attr with object attr
668         return retobj