]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
do not respond to hibernation; minor cleanup
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {
122                 "KWARGS": newcls.IN_KWARGS,
123                 "decode": newcls.in_decode,
124                 "encode": newcls.in_encode,
125             },
126         )
127         newcls.Out = super().__new__(
128             cls,
129             name + ".Out",
130             (newcls,) + bases,
131             {
132                 "KWARGS": newcls.OUT_KWARGS,
133                 "decode": newcls.out_decode,
134                 "encode": newcls.out_encode,
135             },
136         )
137         return newcls
138
139
140 class Respond(Enum):
141     NON = 0  # Incoming, no response needed
142     INL = 1  # Birirectional, use `inline_response()`
143     EXT = 2  # Birirectional, use external responder
144
145
146 class GPS303Pkt(metaclass=MetaPkt):
147     RESPOND = Respond.NON  # Do not send anything back by default
148     PROTO: int
149     IN_KWARGS = ()
150     OUT_KWARGS = ()
151
152     def __init__(self, *args, **kwargs):
153         """
154         Construct the object _either_ from (length, payload),
155         _or_ from the values of individual fields
156         """
157         assert not args or (len(args) == 2 and not kwargs)
158         if args:  # guaranteed to be two arguments at this point
159             self.length, self.payload = args
160             self.decode(self.length, self.payload)
161         else:
162             for kw, typ, dfl in self.KWARGS:
163                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
164             if kwargs:
165                 raise ValueError(
166                     self.__class__.__name__ + " stray kwargs " + str(kwargs)
167                 )
168
169     def __repr__(self):
170         return "{}({})".format(
171             self.__class__.__name__,
172             ", ".join(
173                 "{}={}".format(
174                     k,
175                     'bytes.fromhex("{}")'.format(v.hex())
176                     if isinstance(v, bytes)
177                     else v.__repr__(),
178                 )
179                 for k, v in self.__dict__.items()
180                 if not k.startswith("_")
181             ),
182         )
183
184     def in_decode(self, length, packet):
185         # Overridden in subclasses, otherwise do not decode payload
186         return
187
188     def out_decode(self, length, packet):
189         # Overridden in subclasses, otherwise do not decode payload
190         return
191
192     def in_encode(self):
193         # Necessary to emulate terminal, which is not implemented
194         raise NotImplementedError(
195             self.__class__.__name__ + ".encode() not implemented"
196         )
197
198     def out_encode(self):
199         # Overridden in subclasses, otherwise make empty payload
200         return b""
201
202     @property
203     def packed(self):
204         payload = self.encode()
205         length = len(payload) + 1
206         return pack("BB", length, self.PROTO) + payload
207
208
209 class UNKNOWN(GPS303Pkt):
210     PROTO = 256  # > 255 is impossible in real packets
211
212
213 class LOGIN(GPS303Pkt):
214     PROTO = 0x01
215     RESPOND = Respond.INL
216     # Default response for ACK, can also respond with STOP_UPLOAD
217
218     def in_decode(self, length, payload):
219         self.imei = payload[:-1].hex()
220         self.ver = unpack("B", payload[-1:])[0]
221         return self
222
223
224 class SUPERVISION(GPS303Pkt):
225     PROTO = 0x05
226     OUT_KWARGS = (("status", int, 1),)
227
228     def out_encode(self):
229         # 1: The device automatically answers Pickup effect
230         # 2: Automatically Answering Two-way Calls
231         # 3: Ring manually answer the two-way call
232         return pack("B", self.status)
233
234
235 class HEARTBEAT(GPS303Pkt):
236     PROTO = 0x08
237     RESPOND = Respond.INL
238
239
240 class _GPS_POSITIONING(GPS303Pkt):
241     RESPOND = Respond.INL
242
243     def in_decode(self, length, payload):
244         self.dtime = payload[:6]
245         if self.dtime == b"\0\0\0\0\0\0":
246             self.devtime = None
247         else:
248             yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
249             self.devtime = datetime(
250                 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
251             )
252         self.gps_data_length = payload[6] >> 4
253         self.gps_nb_sat = payload[6] & 0x0F
254         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
255         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
256         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
257         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
258         self.heading = flags & 0b0000001111111111  # bits 6 - last
259         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
260         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
261         self.speed = speed
262         self.flags = flags
263         return self
264
265     def out_encode(self):
266         tup = datetime.utcnow().timetuple()
267         ttup = (tup[0] % 100,) + tup[1:6]
268         return pack("BBBBBB", *ttup)
269
270
271 class GPS_POSITIONING(_GPS_POSITIONING):
272     PROTO = 0x10
273
274
275 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
276     PROTO = 0x11
277
278
279 class STATUS(GPS303Pkt):
280     PROTO = 0x13
281     RESPOND = Respond.EXT
282     OUT_KWARGS = (("upload_interval", int, 25),)
283
284     def in_decode(self, length, payload):
285         self.batt, self.ver, self.timezone, self.intvl = unpack(
286             "BBBB", payload[:4]
287         )
288         if len(payload) > 4:
289             self.signal = payload[4]
290         else:
291             self.signal = None
292         return self
293
294     def out_encode(self):  # Set interval in minutes
295         return pack("B", self.upload_interval)
296
297
298 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
299     PROTO = 0x14
300
301
302 class RESET(GPS303Pkt):
303     # Device sends when it got reset SMS
304     # Server can send to initiate factory reset
305     PROTO = 0x15
306
307
308 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
309     PROTO = 0x16
310     OUT_KWARGS = (("number", int, 3),)
311
312     def out_encode(self):  # Number of whitelist entries
313         return pack("B", number)
314
315
316 class _WIFI_POSITIONING(GPS303Pkt):
317     def in_decode(self, length, payload):
318         self.dtime = payload[:6]
319         if self.dtime == b"\0\0\0\0\0\0":
320             self.devtime = None
321         else:
322             self.devtime = datetime.strptime(
323                 self.dtime.hex(), "%y%m%d%H%M%S"
324             ).astimezone(tz=timezone.utc)
325         self.wifi_aps = []
326         for i in range(self.length):  # length has special meaning here
327             slice = payload[6 + i * 7 : 13 + i * 7]
328             self.wifi_aps.append(
329                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
330             )
331         gsm_slice = payload[6 + self.length * 7 :]
332         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
333         self.gsm_cells = []
334         for i in range(ncells):
335             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
336             locac, cellid, sigstr = unpack(
337                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
338             )
339             self.gsm_cells.append((locac, cellid, -sigstr))
340         return self
341
342
343 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
344     PROTO = 0x17
345     RESPOND = Respond.INL
346
347     def out_encode(self):
348         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
349
350
351 class TIME(GPS303Pkt):
352     PROTO = 0x30
353     RESPOND = Respond.INL
354
355     def out_encode(self):
356         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
357
358
359 class PROHIBIT_LBS(GPS303Pkt):
360     PROTO = 0x33
361     OUT_KWARGS = (("status", int, 1),)
362
363     def out_encode(self):  # Server sent, 0-off, 1-on
364         return pack("B", self.status)
365
366
367 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
368     PROTO = 0x34
369
370     # Data is in packed decimal
371     # 00/01 - GPS on/off
372     # 00/01 - Don't set / Set upload period
373     # HHMMHHMM - Upload period
374     # 00/01 - LBS on/off
375     # 00/01 - Don't set / Set time of boot
376     # HHMM  - Time of boot
377     # 00/01 - Don't set / Set time of shutdown
378     # HHMM  - Time of shutdown
379     def out_encode(self):
380         return b""  # TODO
381
382
383 class _SET_PHONE(GPS303Pkt):
384     OUT_KWARGS = (("phone", str, ""),)
385
386     def out_encode(self):
387         return self.phone.encode()
388
389
390 class REMOTE_MONITOR_PHONE(_SET_PHONE):
391     PROTO = 0x40
392
393
394 class SOS_PHONE(_SET_PHONE):
395     PROTO = 0x41
396
397
398 class DAD_PHONE(_SET_PHONE):
399     PROTO = 0x42
400
401
402 class MOM_PHONE(_SET_PHONE):
403     PROTO = 0x43
404
405
406 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
407     PROTO = 0x44
408
409
410 class GPS_OFF_PERIOD(GPS303Pkt):
411     PROTO = 0x46
412     OUT_KWARGS = (
413         ("onoff", int, 0),
414         ("fm", hhmm, "0000"),
415         ("to", hhmm, "2359"),
416     )
417
418     def out_encode(self):
419         return (
420             pack("B", self.onoff)
421             + bytes.fromhex(self.fm)
422             + bytes.fromhex(self.to)
423         )
424
425
426 class DND_PERIOD(GPS303Pkt):
427     PROTO = 0x47
428     OUT_KWARGS = (
429         ("onoff", int, 0),
430         ("week", int, 3),
431         ("fm1", hhmm, "0000"),
432         ("to1", hhmm, "2359"),
433         ("fm2", hhmm, "0000"),
434         ("to2", hhmm, "2359"),
435     )
436
437     def out_endode(self):
438         return (
439             pack("B", self.onoff)
440             + pack("B", self.week)
441             + bytes.fromhex(self.fm1)
442             + bytes.fromhex(self.to1)
443             + bytes.fromhex(self.fm2)
444             + bytes.fromhex(self.to2)
445         )
446
447
448 class RESTART_SHUTDOWN(GPS303Pkt):
449     PROTO = 0x48
450     OUT_KWARGS = (("flag", int, 0),)
451
452     def out_encode(self):
453         # 1 - restart
454         # 2 - shutdown
455         return pack("B", self.flag)
456
457
458 class DEVICE(GPS303Pkt):
459     PROTO = 0x49
460     OUT_KWARGS = (("flag", int, 0),)
461
462     # 0 - Stop looking for equipment
463     # 1 - Start looking for equipment
464     def out_encode(self):
465         return pack("B", self.flag)
466
467
468 class ALARM_CLOCK(GPS303Pkt):
469     PROTO = 0x50
470
471     def out_encode(self):
472         # TODO implement parsing kwargs
473         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
474         return b"".join(
475             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
476         )
477
478
479 class STOP_ALARM(GPS303Pkt):
480     PROTO = 0x56
481
482     def in_decode(self, length, payload):
483         self.flag = payload[0]
484         return self
485
486
487 class SETUP(GPS303Pkt):
488     PROTO = 0x57
489     RESPOND = Respond.EXT
490     OUT_KWARGS = (
491         ("uploadintervalseconds", intx, 0x0300),
492         ("binaryswitch", intx, 0b00110001),
493         ("alarms", l3int, [0, 0, 0]),
494         ("dndtimeswitch", int, 0),
495         ("dndtimes", l3int, [0, 0, 0]),
496         ("gpstimeswitch", int, 0),
497         ("gpstimestart", int, 0),
498         ("gpstimestop", int, 0),
499         ("phonenumbers", l3str, ["", "", ""]),
500     )
501
502     def out_encode(self):
503         def pack3b(x):
504             return pack("!I", x)[1:]
505
506         return b"".join(
507             [
508                 pack("!H", self.uploadintervalseconds),
509                 pack("B", self.binaryswitch),
510             ]
511             + [pack3b(el) for el in self.alarms]
512             + [
513                 pack("B", self.dndtimeswitch),
514             ]
515             + [pack3b(el) for el in self.dndtimes]
516             + [
517                 pack("B", self.gpstimeswitch),
518                 pack("!H", self.gpstimestart),
519                 pack("!H", self.gpstimestop),
520             ]
521             + [b";".join([el.encode() for el in self.phonenumbers])]
522         )
523
524
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
526     PROTO = 0x58
527
528
529 class RESTORE_PASSWORD(GPS303Pkt):
530     PROTO = 0x67
531
532
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
534     PROTO = 0x69
535     RESPOND = Respond.EXT
536     OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
537
538     def out_encode(self):
539         if self.latitude is None or self.longitude is None:
540             return b""
541         return "{:+#010.8g},{:+#010.8g}".format(
542             self.latitude, self.longitude
543         ).encode()
544
545     def out_decode(self, length, payload):
546         lat, lon = payload.decode().split(",")
547         self.latitude = float(lat)
548         self.longitude = float(lon)
549
550
551 class MANUAL_POSITIONING(GPS303Pkt):
552     PROTO = 0x80
553
554     def in_decode(self, length, payload):
555         self.flag = payload[0] if len(payload) > 0 else None
556         self.reason = {
557             1: "Incorrect time",
558             2: "LBS less",
559             3: "WiFi less",
560             4: "LBS search > 3 times",
561             5: "Same LBS and WiFi data",
562             6: "LBS prohibited, WiFi absent",
563             7: "GPS spacing < 50 m",
564         }.get(self.flag, "Unknown")
565         return self
566
567
568 class BATTERY_CHARGE(GPS303Pkt):
569     PROTO = 0x81
570
571
572 class CHARGER_CONNECTED(GPS303Pkt):
573     PROTO = 0x82
574
575
576 class CHARGER_DISCONNECTED(GPS303Pkt):
577     PROTO = 0x83
578
579
580 class VIBRATION_RECEIVED(GPS303Pkt):
581     PROTO = 0x94
582
583
584 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
585     PROTO = 0x98
586     RESPOND = Respond.EXT
587     OUT_KWARGS = (("interval", int, 10),)
588
589     def in_decode(self, length, payload):
590         self.interval = unpack("!H", payload[:2])
591         return self
592
593     def out_encode(self):
594         return pack("!H", interval)
595
596
597 class SOS_ALARM(GPS303Pkt):
598     PROTO = 0x99
599
600
601 class UNKNOWN_B3(GPS303Pkt):
602     PROTO = 0xB3
603     IN_KWARGS = (("asciidata", str, ""),)
604
605     def in_decode(self, length, payload):
606         self.asciidata = payload.decode()
607         return self
608
609
610 # Build dicts protocol number -> class and class name -> protocol number
611 CLASSES = {}
612 PROTOS = {}
613 if True:  # just to indent the code, sorry!
614     for cls in [
615         cls
616         for name, cls in globals().items()
617         if isclass(cls)
618         and issubclass(cls, GPS303Pkt)
619         and not name.startswith("_")
620     ]:
621         if hasattr(cls, "PROTO"):
622             CLASSES[cls.PROTO] = cls
623             PROTOS[cls.__name__] = cls.PROTO
624
625
626 def class_by_prefix(prefix):
627     lst = [
628         (name, proto)
629         for name, proto in PROTOS.items()
630         if name.upper().startswith(prefix.upper())
631     ]
632     if len(lst) != 1:
633         return lst
634     _, proto = lst[0]
635     return CLASSES[proto]
636
637
638 def proto_by_name(name):
639     return PROTOS.get(name, -1)
640
641
642 def proto_of_message(packet):
643     return unpack("B", packet[1:2])[0]
644
645
646 def inline_response(packet):
647     proto = proto_of_message(packet)
648     if proto in CLASSES:
649         cls = CLASSES[proto]
650         if cls.RESPOND is Respond.INL:
651             return cls.Out().packed
652     return None
653
654
655 def parse_message(packet, is_incoming=True):
656     """From a packet (without framing bytes) derive the XXX.In object"""
657     length, proto = unpack("BB", packet[:2])
658     payload = packet[2:]
659     if proto in CLASSES:
660         if is_incoming:
661             return CLASSES[proto].In(length, payload)
662         else:
663             return CLASSES[proto].Out(length, payload)
664     else:
665         retobj = UNKNOWN.In(length, payload)
666         retobj.PROTO = proto  # Override class attr with object attr
667         return retobj