]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
31073d8ceacb6ff541d926fb78c72587ff91acf9
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "make_object",
27     "parse_message",
28     "proto_by_name",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "PROHIBIT_LBS",
44     "GPS_LBS_SWITCH_TIMES",
45     "REMOTE_MONITOR_PHONE",
46     "SOS_PHONE",
47     "DAD_PHONE",
48     "MOM_PHONE",
49     "STOP_UPLOAD",
50     "GPS_OFF_PERIOD",
51     "DND_PERIOD",
52     "RESTART_SHUTDOWN",
53     "DEVICE",
54     "ALARM_CLOCK",
55     "STOP_ALARM",
56     "SETUP",
57     "SYNCHRONOUS_WHITELIST",
58     "RESTORE_PASSWORD",
59     "WIFI_POSITIONING",
60     "MANUAL_POSITIONING",
61     "BATTERY_CHARGE",
62     "CHARGER_CONNECTED",
63     "CHARGER_DISCONNECTED",
64     "VIBRATION_RECEIVED",
65     "POSITION_UPLOAD_INTERVAL",
66     "SOS_ALARM",
67 )
68
69 log = getLogger("gps303")
70
71
72 def intx(x):
73     if isinstance(x, str):
74         x = int(x, 0)
75     return x
76
77
78 def hhmm(x):
79     """Check for the string that represents hours and minutes"""
80     if not isinstance(x, str) or len(x) != 4:
81         raise ValueError(str(x) + " is not a four-character string")
82     hh = int(x[:2])
83     mm = int(x[2:])
84     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
85         raise ValueError(str(x) + " does not contain valid hours and minutes")
86     return x
87
88
89 def l3str(x):
90     if isinstance(x, str):
91         x = x.split(",")
92     if len(x) != 3 or not all(isinstance(el, str) for el in x):
93         raise ValueError(str(x) + " is not a list of three strings")
94     return x
95
96
97 def l3int(x):
98     if isinstance(x, str):
99         x = x.split(",")
100         x = [int(el) for el in x]
101     if len(x) != 3 or not all(isinstance(el, int) for el in x):
102         raise ValueError(str(x) + " is not a list of three integers")
103     return x
104
105
106 class MetaPkt(type):
107     """
108     For each class corresponding to a message, automatically create
109     two nested classes `In` and `Out` that also inherit from their
110     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
111     copied to the `In` nested class under the name `KWARGS`, and
112     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
113     to the nested class `Out`. In addition, method `encode` is
114     defined in both classes equal to `in_encode()` and `out_encode()`
115     respectively.
116     """
117
118     def __new__(cls, name, bases, attrs):
119         newcls = super().__new__(cls, name, bases, attrs)
120         newcls.In = super().__new__(
121             cls,
122             name + ".In",
123             (newcls,) + bases,
124             {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
125         )
126         newcls.Out = super().__new__(
127             cls,
128             name + ".Out",
129             (newcls,) + bases,
130             {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
131         )
132         return newcls
133
134
135 class Respond(Enum):
136     NON = 0  # Incoming, no response needed
137     INL = 1  # Birirectional, use `inline_response()`
138     EXT = 2  # Birirectional, use external responder
139
140
141 class GPS303Pkt(metaclass=MetaPkt):
142     RESPOND = Respond.NON  # Do not send anything back by default
143     PROTO: int
144     # Have these kwargs for now, TODO redo
145     IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
146     OUT_KWARGS = ()
147
148     def __init__(self, *args, **kwargs):
149         assert len(args) == 0
150         for kw, typ, dfl in self.KWARGS:
151             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
152         if kwargs:
153             print("KWARGS", self.KWARGS)
154             print("kwargs", kwargs)
155             raise TypeError(
156                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
157             )
158
159     def __repr__(self):
160         return "{}({})".format(
161             self.__class__.__name__,
162             ", ".join(
163                 "{}={}".format(
164                     k,
165                     'bytes.fromhex("{}")'.format(v.hex())
166                     if isinstance(v, bytes)
167                     else v.__repr__(),
168                 )
169                 for k, v in self.__dict__.items()
170                 if not k.startswith("_")
171             ),
172         )
173
174     def in_encode(self):
175         raise NotImplementedError(
176             self.__class__.__name__ + ".encode() not implemented"
177         )
178
179     def out_encode(self):
180         return b""
181
182     @property
183     def packed(self):
184         payload = self.encode()
185         length = len(payload) + 1
186         return pack("BB", length, self.PROTO) + payload
187
188     @classmethod
189     def from_packet(cls, length, payload):
190         return cls.In(payload=payload, length=length)
191
192
193 class UNKNOWN(GPS303Pkt):
194     PROTO = 256  # > 255 is impossible in real packets
195
196
197 class LOGIN(GPS303Pkt):
198     PROTO = 0x01
199     RESPOND = Respond.INL
200     # Default response for ACK, can also respond with STOP_UPLOAD
201
202     @classmethod
203     def from_packet(cls, length, payload):
204         self = super().from_packet(length, payload)
205         self.imei = payload[:-1].hex()
206         self.ver = unpack("B", payload[-1:])[0]
207         return self
208
209
210 class SUPERVISION(GPS303Pkt):
211     PROTO = 0x05
212     OUT_KWARGS = (("status", int, 1),)
213
214     def out_encode(self):
215         # 1: The device automatically answers Pickup effect
216         # 2: Automatically Answering Two-way Calls
217         # 3: Ring manually answer the two-way call
218         return pack("B", self.status)
219
220
221 class HEARTBEAT(GPS303Pkt):
222     PROTO = 0x08
223     RESPOND = Respond.INL
224
225
226 class _GPS_POSITIONING(GPS303Pkt):
227     RESPOND = Respond.INL
228
229     @classmethod
230     def from_packet(cls, length, payload):
231         self = super().from_packet(length, payload)
232         self.dtime = payload[:6]
233         if self.dtime == b"\0\0\0\0\0\0":
234             self.devtime = None
235         else:
236             self.devtime = datetime(
237                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
238             )
239         self.gps_data_length = payload[6] >> 4
240         self.gps_nb_sat = payload[6] & 0x0F
241         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
242         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
243         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
244         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
245         self.heading = flags & 0b0000001111111111  # bits 6 - last
246         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
247         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
248         self.speed = speed
249         self.flags = flags
250         return self
251
252     def out_encode(self):
253         tup = datetime.utcnow().timetuple()
254         ttup = (tup[0] % 100,) + tup[1:6]
255         return pack("BBBBBB", *ttup)
256
257
258 class GPS_POSITIONING(_GPS_POSITIONING):
259     PROTO = 0x10
260
261
262 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
263     PROTO = 0x11
264
265
266 class STATUS(GPS303Pkt):
267     PROTO = 0x13
268     RESPOND = Respond.EXT
269     OUT_KWARGS = (("upload_interval", int, 25),)
270
271     @classmethod
272     def from_packet(cls, length, payload):
273         self = super().from_packet(length, payload)
274         if len(payload) == 5:
275             (
276                 self.batt,
277                 self.ver,
278                 self.timezone,
279                 self.intvl,
280                 self.signal,
281             ) = unpack("BBBBB", payload)
282         elif len(payload) == 4:
283             self.batt, self.ver, self.timezone, self.intvl = unpack(
284                 "BBBB", payload
285             )
286             self.signal = None
287         return self
288
289     def out_encode(self):  # Set interval in minutes
290         return pack("B", self.upload_interval)
291
292
293 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
294     PROTO = 0x14
295     RESPOND = Respond.INL
296
297
298 class RESET(GPS303Pkt):
299     # Device sends when it got reset SMS
300     # Server can send to initiate factory reset
301     PROTO = 0x15
302
303
304 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
305     PROTO = 0x16
306     OUT_KWARGS = (("number", int, 3),)
307
308     def out_encode(self):  # Number of whitelist entries
309         return pack("B", number)
310
311
312 class _WIFI_POSITIONING(GPS303Pkt):
313     @classmethod
314     def from_packet(cls, length, payload):
315         self = super().from_packet(length, payload)
316         self.dtime = payload[:6]
317         if self.dtime == b"\0\0\0\0\0\0":
318             self.devtime = None
319         else:
320             self.devtime = datetime.strptime(
321                 self.dtime.hex(), "%y%m%d%H%M%S"
322             ).astimezone(tz=timezone.utc)
323         self.wifi_aps = []
324         for i in range(self.length):  # length has special meaning here
325             slice = payload[6 + i * 7 : 13 + i * 7]
326             self.wifi_aps.append(
327                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
328             )
329         gsm_slice = payload[6 + self.length * 7 :]
330         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
331         self.gsm_cells = []
332         for i in range(ncells):
333             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
334             locac, cellid, sigstr = unpack(
335                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
336             )
337             self.gsm_cells.append((locac, cellid, -sigstr))
338         return self
339
340
341 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
342     PROTO = 0x17
343     RESPOND = Respond.INL
344
345     def out_encode(self):
346         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
347
348
349 class TIME(GPS303Pkt):
350     PROTO = 0x30
351     RESPOND = Respond.INL
352
353     def out_encode(self):
354         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
355
356
357 class PROHIBIT_LBS(GPS303Pkt):
358     PROTO = 0x33
359     OUT_KWARGS = (("status", int, 1),)
360
361     def out_encode(self):  # Server sent, 0-off, 1-on
362         return pack("B", self.status)
363
364
365 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
366     PROTO = 0x34
367
368     # Data is in packed decimal
369     # 00/01 - GPS on/off
370     # 00/01 - Don't set / Set upload period
371     # HHMMHHMM - Upload period
372     # 00/01 - LBS on/off
373     # 00/01 - Don't set / Set time of boot
374     # HHMM  - Time of boot
375     # 00/01 - Don't set / Set time of shutdown
376     # HHMM  - Time of shutdown
377     def out_encode(self):
378         return b""  # TODO
379
380
381 class _SET_PHONE(GPS303Pkt):
382     OUT_KWARGS = (("phone", str, ""),)
383
384     def out_encode(self):
385         return self.phone.encode()
386
387
388 class REMOTE_MONITOR_PHONE(_SET_PHONE):
389     PROTO = 0x40
390
391
392 class SOS_PHONE(_SET_PHONE):
393     PROTO = 0x41
394
395
396 class DAD_PHONE(_SET_PHONE):
397     PROTO = 0x42
398
399
400 class MOM_PHONE(_SET_PHONE):
401     PROTO = 0x43
402
403
404 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
405     PROTO = 0x44
406
407
408 class GPS_OFF_PERIOD(GPS303Pkt):
409     PROTO = 0x46
410     OUT_KWARGS = (
411         ("onoff", int, 0),
412         ("fm", hhmm, "0000"),
413         ("to", hhmm, "2359"),
414     )
415
416     def out_encode(self):
417         return (
418             pack("B", self.onoff)
419             + bytes.fromhex(self.fm)
420             + bytes.fromhex(self.to)
421         )
422
423
424 class DND_PERIOD(GPS303Pkt):
425     PROTO = 0x47
426     OUT_KWARGS = (
427         ("onoff", int, 0),
428         ("week", int, 3),
429         ("fm1", hhmm, "0000"),
430         ("to1", hhmm, "2359"),
431         ("fm2", hhmm, "0000"),
432         ("to2", hhmm, "2359"),
433     )
434
435     def out_endode(self):
436         return (
437             pack("B", self.onoff)
438             + pack("B", self.week)
439             + bytes.fromhex(self.fm1)
440             + bytes.fromhex(self.to1)
441             + bytes.fromhex(self.fm2)
442             + bytes.fromhex(self.to2)
443         )
444
445
446 class RESTART_SHUTDOWN(GPS303Pkt):
447     PROTO = 0x48
448     OUT_KWARGS = (("flag", int, 0),)
449
450     def out_encode(self):
451         # 1 - restart
452         # 2 - shutdown
453         return pack("B", self.flag)
454
455
456 class DEVICE(GPS303Pkt):
457     PROTO = 0x49
458     OUT_KWARGS = (("flag", int, 0),)
459
460     # 0 - Stop looking for equipment
461     # 1 - Start looking for equipment
462     def out_encode(self):
463         return pack("B", self.flag)
464
465
466 class ALARM_CLOCK(GPS303Pkt):
467     PROTO = 0x50
468
469     def out_encode(self):
470         # TODO implement parsing kwargs
471         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
472         return b"".join(
473             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
474         )
475
476
477 class STOP_ALARM(GPS303Pkt):
478     PROTO = 0x56
479
480     @classmethod
481     def from_packet(cls, length, payload):
482         self = super().from_packet(length, payload)
483         self.flag = payload[0]
484         return self
485
486
487 class SETUP(GPS303Pkt):
488     PROTO = 0x57
489     RESPOND = Respond.EXT
490     OUT_KWARGS = (
491         ("uploadintervalseconds", intx, 0x0300),
492         ("binaryswitch", intx, 0b00110001),
493         ("alarms", l3int, [0, 0, 0]),
494         ("dndtimeswitch", int, 0),
495         ("dndtimes", l3int, [0, 0, 0]),
496         ("gpstimeswitch", int, 0),
497         ("gpstimestart", int, 0),
498         ("gpstimestop", int, 0),
499         ("phonenumbers", l3str, ["", "", ""]),
500     )
501
502     def out_encode(self):
503         def pack3b(x):
504             return pack("!I", x)[1:]
505
506         return b"".join(
507             [
508                 pack("!H", self.uploadintervalseconds),
509                 pack("B", self.binaryswitch),
510             ]
511             + [pack3b(el) for el in self.alarms]
512             + [
513                 pack("B", self.dndtimeswitch),
514             ]
515             + [pack3b(el) for el in self.dndtimes]
516             + [
517                 pack("B", self.gpstimeswitch),
518                 pack("!H", self.gpstimestart),
519                 pack("!H", self.gpstimestop),
520             ]
521             + [b";".join([el.encode() for el in self.phonenumbers])]
522         )
523
524
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
526     PROTO = 0x58
527
528
529 class RESTORE_PASSWORD(GPS303Pkt):
530     PROTO = 0x67
531
532
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
534     PROTO = 0x69
535     RESPOND = Respond.EXT
536     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
537
538     def out_encode(self):
539         if self.lat is None or self.lon is None:
540             return b""
541         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
542
543
544 class MANUAL_POSITIONING(GPS303Pkt):
545     PROTO = 0x80
546
547     @classmethod
548     def from_packet(cls, length, payload):
549         self = super().from_packet(length, payload)
550         self.flag = payload[0] if len(payload) > 0 else None
551         self.reason = {
552             1: "Incorrect time",
553             2: "LBS less",
554             3: "WiFi less",
555             4: "LBS search > 3 times",
556             5: "Same LBS and WiFi data",
557             6: "LBS prohibited, WiFi absent",
558             7: "GPS spacing < 50 m",
559         }.get(self.flag, "Unknown")
560         return self
561
562
563 class BATTERY_CHARGE(GPS303Pkt):
564     PROTO = 0x81
565
566
567 class CHARGER_CONNECTED(GPS303Pkt):
568     PROTO = 0x82
569
570
571 class CHARGER_DISCONNECTED(GPS303Pkt):
572     PROTO = 0x83
573
574
575 class VIBRATION_RECEIVED(GPS303Pkt):
576     PROTO = 0x94
577
578
579 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
580     PROTO = 0x98
581     RESPOND = Respond.EXT
582     OUT_KWARGS = (("interval", int, 10),)
583
584     @classmethod
585     def from_packet(cls, length, payload):
586         self = super().from_packet(length, payload)
587         self.interval = unpack("!H", payload[:2])
588         return self
589
590     def out_encode(self):
591         return pack("!H", interval)
592
593
594 class SOS_ALARM(GPS303Pkt):
595     PROTO = 0x99
596
597
598 # Build dicts protocol number -> class and class name -> protocol number
599 CLASSES = {}
600 PROTOS = {}
601 if True:  # just to indent the code, sorry!
602     for cls in [
603         cls
604         for name, cls in globals().items()
605         if isclass(cls)
606         and issubclass(cls, GPS303Pkt)
607         and not name.startswith("_")
608     ]:
609         if hasattr(cls, "PROTO"):
610             CLASSES[cls.PROTO] = cls
611             PROTOS[cls.__name__] = cls.PROTO
612
613
614 def class_by_prefix(prefix):
615     lst = [
616         (name, proto)
617         for name, proto in PROTOS.items()
618         if name.upper().startswith(prefix.upper())
619     ]
620     if len(lst) != 1:
621         return lst
622     _, proto = lst[0]
623     return CLASSES[proto]
624
625
626 def proto_by_name(name):
627     return PROTOS.get(name, -1)
628
629
630 def proto_of_message(packet):
631     return unpack("B", packet[1:2])[0]
632
633
634 def inline_response(packet):
635     proto = proto_of_message(packet)
636     if proto in CLASSES:
637         cls = CLASSES[proto]
638         if cls.RESPOND is Respond.INL:
639             return cls.Out().packed
640     return None
641
642
643 def make_object(length, proto, payload):
644     if proto in CLASSES:
645         return CLASSES[proto].from_packet(length, payload)
646     else:
647         retobj = UNKNOWN.from_packet(length, payload)
648         retobj.PROTO = proto  # Override class attr with object attr
649         return retobj
650
651
652 def parse_message(packet):
653     length, proto = unpack("BB", packet[:2])
654     payload = packet[2:]
655     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
656     if (
657         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
658         and length > 1
659         and len(payload) + adjust != length
660     ):
661         log.warning(
662             "With proto %d length is %d but payload length is %d+%d",
663             proto,
664             length,
665             len(payload),
666             adjust,
667         )
668     return make_object(length, proto, payload)