]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
remove forgotten make_packet() call
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "make_object",
27     "parse_message",
28     "proto_by_name",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "MOM_PHONE",
44     "STOP_ALARM",
45     "SETUP",
46     "SYNCHRONOUS_WHITELIST",
47     "RESTORE_PASSWORD",
48     "WIFI_POSITIONING",
49     "MANUAL_POSITIONING",
50     "BATTERY_CHARGE",
51     "CHARGER_CONNECTED",
52     "CHARGER_DISCONNECTED",
53     "VIBRATION_RECEIVED",
54     "POSITION_UPLOAD_INTERVAL",
55 )
56
57 log = getLogger("gps303")
58
59
60 def intx(x):
61     if isinstance(x, str):
62         x = int(x, 0)
63     return x
64
65
66 def hhmm(x):
67     """Check for the string that represents hours and minutes"""
68     if not isinstance(x, str) or len(x) != 4:
69         raise ValueError(str(x) + " is not a four-character string")
70     hh = int(x[:2])
71     mm = int(x[2:])
72     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
73         raise ValueError(str(x) + " does not contain valid hours and minutes")
74     return x
75
76
77 def l3str(x):
78     if isinstance(x, str):
79         x = x.split(",")
80     if len(x) != 3 or not all(isinstance(el, str) for el in x):
81         raise ValueError(str(x) + " is not a list of three strings")
82     return x
83
84
85 def l3int(x):
86     if isinstance(x, str):
87         x = x.split(",")
88         x = [int(el) for el in x]
89     if len(x) != 3 or not all(isinstance(el, int) for el in x):
90         raise ValueError(str(x) + " is not a list of three integers")
91     return x
92
93
94 class MetaPkt(type):
95     """
96     For each class corresponding to a message, automatically create
97     two nested classes `In` and `Out` that also inherit from their
98     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
99     copied to the `In` nested class under the name `KWARGS`, and
100     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
101     to the nested class `Out`. In addition, method `encode` is
102     defined in both classes equal to `in_encode()` and `out_encode()`
103     respectively.
104     """
105
106     def __new__(cls, name, bases, attrs):
107         newcls = super().__new__(cls, name, bases, attrs)
108         newcls.In = super().__new__(
109             cls,
110             name + ".In",
111             (newcls,) + bases,
112             {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
113         )
114         newcls.Out = super().__new__(
115             cls,
116             name + ".Out",
117             (newcls,) + bases,
118             {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
119         )
120         return newcls
121
122
123 class Respond(Enum):
124     NON = 0  # Incoming, no response needed
125     INL = 1  # Birirectional, use `inline_response()`
126     EXT = 2  # Birirectional, use external responder
127
128
129 class GPS303Pkt(metaclass=MetaPkt):
130     RESPOND = Respond.NON  # Do not send anything back by default
131     PROTO: int
132     # Have these kwargs for now, TODO redo
133     IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
134     OUT_KWARGS = ()
135
136     def __init__(self, *args, **kwargs):
137         assert len(args) == 0
138         for kw, typ, dfl in self.KWARGS:
139             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
140         if kwargs:
141             print("KWARGS", self.KWARGS)
142             print("kwargs", kwargs)
143             raise TypeError(
144                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
145             )
146
147     def __repr__(self):
148         return "{}({})".format(
149             self.__class__.__name__,
150             ", ".join(
151                 "{}={}".format(
152                     k,
153                     'bytes.fromhex("{}")'.format(v.hex())
154                     if isinstance(v, bytes)
155                     else v.__repr__(),
156                 )
157                 for k, v in self.__dict__.items()
158                 if not k.startswith("_")
159             ),
160         )
161
162     def in_encode(self):
163         raise NotImplementedError(
164             self.__class__.__name__ + ".encode() not implemented"
165         )
166
167     def out_encode(self):
168         return b""
169
170     @property
171     def packed(self):
172         payload = self.encode()
173         length = len(payload) + 1
174         return pack("BB", length, self.PROTO) + payload
175
176     @classmethod
177     def from_packet(cls, length, payload):
178         return cls.In(payload=payload, length=length)
179
180
181 class UNKNOWN(GPS303Pkt):
182     PROTO = 256  # > 255 is impossible in real packets
183
184
185 class LOGIN(GPS303Pkt):
186     PROTO = 0x01
187     RESPOND = Respond.INL
188     # Default response for ACK, can also respond with STOP_UPLOAD
189
190     @classmethod
191     def from_packet(cls, length, payload):
192         self = super().from_packet(length, payload)
193         self.imei = payload[:-1].hex()
194         self.ver = unpack("B", payload[-1:])[0]
195         return self
196
197
198 class SUPERVISION(GPS303Pkt):
199     PROTO = 0x05
200     OUT_KWARGS = (("status", int, 1),)
201
202     def out_encode(self):
203         # 1: The device automatically answers Pickup effect
204         # 2: Automatically Answering Two-way Calls
205         # 3: Ring manually answer the two-way call
206         return pack("B", self.status)
207
208
209 class HEARTBEAT(GPS303Pkt):
210     PROTO = 0x08
211     RESPOND = Respond.INL
212
213
214 class _GPS_POSITIONING(GPS303Pkt):
215     RESPOND = Respond.INL
216
217     @classmethod
218     def from_packet(cls, length, payload):
219         self = super().from_packet(length, payload)
220         self.dtime = payload[:6]
221         if self.dtime == b"\0\0\0\0\0\0":
222             self.devtime = None
223         else:
224             self.devtime = datetime(
225                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
226             )
227         self.gps_data_length = payload[6] >> 4
228         self.gps_nb_sat = payload[6] & 0x0F
229         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
230         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
231         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
232         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
233         self.heading = flags & 0b0000001111111111  # bits 6 - last
234         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
235         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
236         self.speed = speed
237         self.flags = flags
238         return self
239
240     def out_encode(self):
241         tup = datetime.utcnow().timetuple()
242         ttup = (tup[0] % 100,) + tup[1:6]
243         return pack("BBBBBB", *ttup)
244
245
246 class GPS_POSITIONING(_GPS_POSITIONING):
247     PROTO = 0x10
248
249
250 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
251     PROTO = 0x11
252
253
254 class STATUS(GPS303Pkt):
255     PROTO = 0x13
256     RESPOND = Respond.EXT
257     OUT_KWARGS = (("upload_interval", int, 25),)
258
259     @classmethod
260     def from_packet(cls, length, payload):
261         self = super().from_packet(length, payload)
262         if len(payload) == 5:
263             (
264                 self.batt,
265                 self.ver,
266                 self.timezone,
267                 self.intvl,
268                 self.signal,
269             ) = unpack("BBBBB", payload)
270         elif len(payload) == 4:
271             self.batt, self.ver, self.timezone, self.intvl = unpack(
272                 "BBBB", payload
273             )
274             self.signal = None
275         return self
276
277     def out_encode(self):  # Set interval in minutes
278         return pack("B", self.upload_interval)
279
280
281 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
282     PROTO = 0x14
283     RESPOND = Respond.INL
284
285
286 class RESET(GPS303Pkt):
287     # Device sends when it got reset SMS
288     # Server can send to initiate factory reset
289     PROTO = 0x15
290
291
292 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
293     PROTO = 0x16
294     OUT_KWARGS = (("number", int, 3),)
295
296     def out_encode(self):  # Number of whitelist entries
297         return pack("B", number)
298
299
300 class _WIFI_POSITIONING(GPS303Pkt):
301     @classmethod
302     def from_packet(cls, length, payload):
303         self = super().from_packet(length, payload)
304         self.dtime = payload[:6]
305         if self.dtime == b"\0\0\0\0\0\0":
306             self.devtime = None
307         else:
308             self.devtime = datetime.strptime(
309                 self.dtime.hex(), "%y%m%d%H%M%S"
310             ).astimezone(tz=timezone.utc)
311         self.wifi_aps = []
312         for i in range(self.length):  # length has special meaning here
313             slice = payload[6 + i * 7 : 13 + i * 7]
314             self.wifi_aps.append(
315                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
316             )
317         gsm_slice = payload[6 + self.length * 7 :]
318         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
319         self.gsm_cells = []
320         for i in range(ncells):
321             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
322             locac, cellid, sigstr = unpack(
323                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
324             )
325             self.gsm_cells.append((locac, cellid, -sigstr))
326         return self
327
328
329 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
330     PROTO = 0x17
331     RESPOND = Respond.INL
332
333     def out_encode(self):
334         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
335
336
337 class TIME(GPS303Pkt):
338     PROTO = 0x30
339     RESPOND = Respond.INL
340
341     def out_encode(self):
342         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
343
344
345 class PROHIBIT_LBS(GPS303Pkt):
346     PROTO = 0x33
347     OUT_KWARGS = (("status", int, 1),)
348
349     def out_encode(self):  # Server sent, 0-off, 1-on
350         return pack("B", self.status)
351
352
353 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
354     PROTO = 0x34
355
356     # Data is in packed decimal
357     # 00/01 - GPS on/off
358     # 00/01 - Don't set / Set upload period
359     # HHMMHHMM - Upload period
360     # 00/01 - LBS on/off
361     # 00/01 - Don't set / Set time of boot
362     # HHMM  - Time of boot
363     # 00/01 - Don't set / Set time of shutdown
364     # HHMM  - Time of shutdown
365     def out_encode(self):
366         return b""  # TODO
367
368
369 class _SET_PHONE(GPS303Pkt):
370     OUT_KWARGS = (("phone", str, ""),)
371
372     def out_encode(self):
373         return self.phone.encode()
374
375
376 class REMOTE_MONITOR_PHONE(_SET_PHONE):
377     PROTO = 0x40
378
379
380 class SOS_PHONE(_SET_PHONE):
381     PROTO = 0x41
382
383
384 class DAD_PHONE(_SET_PHONE):
385     PROTO = 0x42
386
387
388 class MOM_PHONE(_SET_PHONE):
389     PROTO = 0x43
390
391
392 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
393     PROTO = 0x44
394
395
396 class GPS_OFF_PERIOD(GPS303Pkt):
397     PROTO = 0x46
398     OUT_KWARGS = (
399         ("onoff", int, 0),
400         ("fm", hhmm, "0000"),
401         ("to", hhmm, "2359"),
402     )
403
404     def out_encode(self):
405         return (
406             pack("B", self.onoff)
407             + bytes.fromhex(self.fm)
408             + bytes.fromhex(self.to)
409         )
410
411
412 class DND_PERIOD(GPS303Pkt):
413     PROTO = 0x47
414     OUT_KWARGS = (
415         ("onoff", int, 0),
416         ("week", int, 3),
417         ("fm1", hhmm, "0000"),
418         ("to1", hhmm, "2359"),
419         ("fm2", hhmm, "0000"),
420         ("to2", hhmm, "2359"),
421     )
422
423     def out_endode(self):
424         return (
425             pack("B", self.onoff)
426             + pack("B", self.week)
427             + bytes.fromhex(self.fm1)
428             + bytes.fromhex(self.to1)
429             + bytes.fromhex(self.fm2)
430             + bytes.fromhex(self.to2)
431         )
432
433
434 class RESTART_SHUTDOWN(GPS303Pkt):
435     PROTO = 0x48
436     OUT_KWARGS = (("flag", int, 0),)
437
438     def out_encode(self):
439         # 1 - restart
440         # 2 - shutdown
441         return pack("B", self.flag)
442
443
444 class DEVICE(GPS303Pkt):
445     PROTO = 0x49
446     OUT_KWARGS = (("flag", int, 0),)
447
448     # 0 - Stop looking for equipment
449     # 1 - Start looking for equipment
450     def out_encode(self):
451         return pack("B", self.flag)
452
453
454 class ALARM_CLOCK(GPS303Pkt):
455     PROTO = 0x50
456
457     def out_encode(self):
458         # TODO implement parsing kwargs
459         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
460         return b"".join(
461             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
462         )
463
464
465 class STOP_ALARM(GPS303Pkt):
466     PROTO = 0x56
467
468     @classmethod
469     def from_packet(cls, length, payload):
470         self = super().from_packet(length, payload)
471         self.flag = payload[0]
472         return self
473
474
475 class SETUP(GPS303Pkt):
476     PROTO = 0x57
477     RESPOND = Respond.EXT
478     OUT_KWARGS = (
479         ("uploadintervalseconds", intx, 0x0300),
480         ("binaryswitch", intx, 0b00110001),
481         ("alarms", l3int, [0, 0, 0]),
482         ("dndtimeswitch", int, 0),
483         ("dndtimes", l3int, [0, 0, 0]),
484         ("gpstimeswitch", int, 0),
485         ("gpstimestart", int, 0),
486         ("gpstimestop", int, 0),
487         ("phonenumbers", l3str, ["", "", ""]),
488     )
489
490     def out_encode(self):
491         def pack3b(x):
492             return pack("!I", x)[1:]
493
494         return b"".join(
495             [
496                 pack("!H", self.uploadintervalseconds),
497                 pack("B", self.binaryswitch),
498             ]
499             + [pack3b(el) for el in self.alarms]
500             + [
501                 pack("B", self.dndtimeswitch),
502             ]
503             + [pack3b(el) for el in self.dndtimes]
504             + [
505                 pack("B", self.gpstimeswitch),
506                 pack("!H", self.gpstimestart),
507                 pack("!H", self.gpstimestop),
508             ]
509             + [b";".join([el.encode() for el in self.phonenumbers])]
510         )
511
512
513 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
514     PROTO = 0x58
515
516
517 class RESTORE_PASSWORD(GPS303Pkt):
518     PROTO = 0x67
519
520
521 class WIFI_POSITIONING(_WIFI_POSITIONING):
522     PROTO = 0x69
523     RESPOND = Respond.EXT
524     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
525
526     def out_encode(self):
527         if self.lat is None or self.lon is None:
528             return b""
529         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
530
531
532 class MANUAL_POSITIONING(GPS303Pkt):
533     PROTO = 0x80
534
535     @classmethod
536     def from_packet(cls, length, payload):
537         self = super().from_packet(length, payload)
538         self.flag = payload[0] if len(payload) > 0 else None
539         self.reason = {
540             1: "Incorrect time",
541             2: "LBS less",
542             3: "WiFi less",
543             4: "LBS search > 3 times",
544             5: "Same LBS and WiFi data",
545             6: "LBS prohibited, WiFi absent",
546             7: "GPS spacing < 50 m",
547         }.get(self.flag, "Unknown")
548         return self
549
550
551 class BATTERY_CHARGE(GPS303Pkt):
552     PROTO = 0x81
553
554
555 class CHARGER_CONNECTED(GPS303Pkt):
556     PROTO = 0x82
557
558
559 class CHARGER_DISCONNECTED(GPS303Pkt):
560     PROTO = 0x83
561
562
563 class VIBRATION_RECEIVED(GPS303Pkt):
564     PROTO = 0x94
565
566
567 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
568     PROTO = 0x98
569     RESPOND = Respond.EXT
570     OUT_KWARGS = (("interval", int, 10),)
571
572     @classmethod
573     def from_packet(cls, length, payload):
574         self = super().from_packet(length, payload)
575         self.interval = unpack("!H", payload[:2])
576         return self
577
578     def out_encode(self):
579         return pack("!H", interval)
580
581
582 class SOS_ALARM(GPS303Pkt):
583     PROTO = 0x99
584
585
586 # Build dicts protocol number -> class and class name -> protocol number
587 CLASSES = {}
588 PROTOS = {}
589 if True:  # just to indent the code, sorry!
590     for cls in [
591         cls
592         for name, cls in globals().items()
593         if isclass(cls)
594         and issubclass(cls, GPS303Pkt)
595         and not name.startswith("_")
596     ]:
597         if hasattr(cls, "PROTO"):
598             CLASSES[cls.PROTO] = cls
599             PROTOS[cls.__name__] = cls.PROTO
600
601
602 def class_by_prefix(prefix):
603     lst = [
604         (name, proto)
605         for name, proto in PROTOS.items()
606         if name.upper().startswith(prefix.upper())
607     ]
608     if len(lst) != 1:
609         return lst
610     _, proto = lst[0]
611     return CLASSES[proto]
612
613
614 def proto_by_name(name):
615     return PROTOS.get(name, -1)
616
617
618 def proto_of_message(packet):
619     return unpack("B", packet[1:2])[0]
620
621
622 def inline_response(packet):
623     proto = proto_of_message(packet)
624     if proto in CLASSES:
625         cls = CLASSES[proto]
626         if cls.RESPOND is Respond.INL:
627             return cls.Out().packed
628     return None
629
630
631 def make_object(length, proto, payload):
632     if proto in CLASSES:
633         return CLASSES[proto].from_packet(length, payload)
634     else:
635         retobj = UNKNOWN.from_packet(length, payload)
636         retobj.PROTO = proto  # Override class attr with object attr
637         return retobj
638
639
640 def parse_message(packet):
641     length, proto = unpack("BB", packet[:2])
642     payload = packet[2:]
643     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
644     if (
645         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
646         and length > 1
647         and len(payload) + adjust != length
648     ):
649         log.warning(
650             "With proto %d length is %d but payload length is %d+%d",
651             proto,
652             length,
653             len(payload),
654             adjust,
655         )
656     return make_object(length, proto, payload)