]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
c19c7c1b300c78fe2ea511c1a7859f0c35c434e4
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {
122                 "KWARGS": newcls.IN_KWARGS,
123                 "decode": newcls.in_decode,
124                 "encode": newcls.in_encode,
125             },
126         )
127         newcls.Out = super().__new__(
128             cls,
129             name + ".Out",
130             (newcls,) + bases,
131             {
132                 "KWARGS": newcls.OUT_KWARGS,
133                 "decode": newcls.out_decode,
134                 "encode": newcls.out_encode,
135             },
136         )
137         return newcls
138
139
140 class Respond(Enum):
141     NON = 0  # Incoming, no response needed
142     INL = 1  # Birirectional, use `inline_response()`
143     EXT = 2  # Birirectional, use external responder
144
145
146 class GPS303Pkt(metaclass=MetaPkt):
147     RESPOND = Respond.NON  # Do not send anything back by default
148     PROTO: int
149     IN_KWARGS = ()
150     OUT_KWARGS = ()
151
152     def __init__(self, *args, **kwargs):
153         assert len(args) == 0
154         for kw, typ, dfl in self.KWARGS:
155             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
156         if kwargs:
157             print("KWARGS", self.KWARGS)
158             print("kwargs", kwargs)
159             raise TypeError(
160                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
161             )
162
163     def __repr__(self):
164         return "{}({})".format(
165             self.__class__.__name__,
166             ", ".join(
167                 "{}={}".format(
168                     k,
169                     'bytes.fromhex("{}")'.format(v.hex())
170                     if isinstance(v, bytes)
171                     else v.__repr__(),
172                 )
173                 for k, v in self.__dict__.items()
174                 if not k.startswith("_")
175             ),
176         )
177
178     def in_decode(self, length, packet):
179         return
180
181     def out_decode(self, length, packet):
182         raise NotImplementedError(
183             self.__class__.__name__ + ".decode() not implemented"
184         )
185
186     def in_encode(self):
187         raise NotImplementedError(
188             self.__class__.__name__ + ".encode() not implemented"
189         )
190
191     def out_encode(self):
192         return b""
193
194     @property
195     def packed(self):
196         payload = self.encode()
197         length = len(payload) + 1
198         return pack("BB", length, self.PROTO) + payload
199
200     @classmethod
201     def from_packet(cls, length, payload):
202         self = cls.In()
203         self.length = length
204         self.payload = payload
205         self.decode(length, payload)
206         return self
207
208
209 class UNKNOWN(GPS303Pkt):
210     PROTO = 256  # > 255 is impossible in real packets
211
212
213 class LOGIN(GPS303Pkt):
214     PROTO = 0x01
215     RESPOND = Respond.INL
216     # Default response for ACK, can also respond with STOP_UPLOAD
217
218     def in_decode(self, length, payload):
219         self.imei = payload[:-1].hex()
220         self.ver = unpack("B", payload[-1:])[0]
221         return self
222
223
224 class SUPERVISION(GPS303Pkt):
225     PROTO = 0x05
226     OUT_KWARGS = (("status", int, 1),)
227
228     def out_encode(self):
229         # 1: The device automatically answers Pickup effect
230         # 2: Automatically Answering Two-way Calls
231         # 3: Ring manually answer the two-way call
232         return pack("B", self.status)
233
234
235 class HEARTBEAT(GPS303Pkt):
236     PROTO = 0x08
237     RESPOND = Respond.INL
238
239
240 class _GPS_POSITIONING(GPS303Pkt):
241     RESPOND = Respond.INL
242
243     def in_decode(self, length, payload):
244         self.dtime = payload[:6]
245         if self.dtime == b"\0\0\0\0\0\0":
246             self.devtime = None
247         else:
248             self.devtime = datetime(
249                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
250             )
251         self.gps_data_length = payload[6] >> 4
252         self.gps_nb_sat = payload[6] & 0x0F
253         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
254         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
255         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
256         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
257         self.heading = flags & 0b0000001111111111  # bits 6 - last
258         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
259         self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
260         self.speed = speed
261         self.flags = flags
262         return self
263
264     def out_encode(self):
265         tup = datetime.utcnow().timetuple()
266         ttup = (tup[0] % 100,) + tup[1:6]
267         return pack("BBBBBB", *ttup)
268
269
270 class GPS_POSITIONING(_GPS_POSITIONING):
271     PROTO = 0x10
272
273
274 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
275     PROTO = 0x11
276
277
278 class STATUS(GPS303Pkt):
279     PROTO = 0x13
280     RESPOND = Respond.EXT
281     OUT_KWARGS = (("upload_interval", int, 25),)
282
283     def in_decode(self, length, payload):
284         self.batt, self.ver, self.timezone, self.intvl = unpack(
285             "BBBB", payload[:4]
286         )
287         if len(payload) > 4:
288             self.signal = payload[4]
289         else:
290             self.signal = None
291         return self
292
293     def out_encode(self):  # Set interval in minutes
294         return pack("B", self.upload_interval)
295
296
297 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
298     PROTO = 0x14
299     RESPOND = Respond.INL
300
301
302 class RESET(GPS303Pkt):
303     # Device sends when it got reset SMS
304     # Server can send to initiate factory reset
305     PROTO = 0x15
306
307
308 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
309     PROTO = 0x16
310     OUT_KWARGS = (("number", int, 3),)
311
312     def out_encode(self):  # Number of whitelist entries
313         return pack("B", number)
314
315
316 class _WIFI_POSITIONING(GPS303Pkt):
317     def in_decode(self, length, payload):
318         self.dtime = payload[:6]
319         if self.dtime == b"\0\0\0\0\0\0":
320             self.devtime = None
321         else:
322             self.devtime = datetime.strptime(
323                 self.dtime.hex(), "%y%m%d%H%M%S"
324             ).astimezone(tz=timezone.utc)
325         self.wifi_aps = []
326         for i in range(self.length):  # length has special meaning here
327             slice = payload[6 + i * 7 : 13 + i * 7]
328             self.wifi_aps.append(
329                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
330             )
331         gsm_slice = payload[6 + self.length * 7 :]
332         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
333         self.gsm_cells = []
334         for i in range(ncells):
335             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
336             locac, cellid, sigstr = unpack(
337                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
338             )
339             self.gsm_cells.append((locac, cellid, -sigstr))
340         return self
341
342
343 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
344     PROTO = 0x17
345     RESPOND = Respond.INL
346
347     def out_encode(self):
348         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
349
350
351 class TIME(GPS303Pkt):
352     PROTO = 0x30
353     RESPOND = Respond.INL
354
355     def out_encode(self):
356         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
357
358
359 class PROHIBIT_LBS(GPS303Pkt):
360     PROTO = 0x33
361     OUT_KWARGS = (("status", int, 1),)
362
363     def out_encode(self):  # Server sent, 0-off, 1-on
364         return pack("B", self.status)
365
366
367 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
368     PROTO = 0x34
369
370     # Data is in packed decimal
371     # 00/01 - GPS on/off
372     # 00/01 - Don't set / Set upload period
373     # HHMMHHMM - Upload period
374     # 00/01 - LBS on/off
375     # 00/01 - Don't set / Set time of boot
376     # HHMM  - Time of boot
377     # 00/01 - Don't set / Set time of shutdown
378     # HHMM  - Time of shutdown
379     def out_encode(self):
380         return b""  # TODO
381
382
383 class _SET_PHONE(GPS303Pkt):
384     OUT_KWARGS = (("phone", str, ""),)
385
386     def out_encode(self):
387         return self.phone.encode()
388
389
390 class REMOTE_MONITOR_PHONE(_SET_PHONE):
391     PROTO = 0x40
392
393
394 class SOS_PHONE(_SET_PHONE):
395     PROTO = 0x41
396
397
398 class DAD_PHONE(_SET_PHONE):
399     PROTO = 0x42
400
401
402 class MOM_PHONE(_SET_PHONE):
403     PROTO = 0x43
404
405
406 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
407     PROTO = 0x44
408
409
410 class GPS_OFF_PERIOD(GPS303Pkt):
411     PROTO = 0x46
412     OUT_KWARGS = (
413         ("onoff", int, 0),
414         ("fm", hhmm, "0000"),
415         ("to", hhmm, "2359"),
416     )
417
418     def out_encode(self):
419         return (
420             pack("B", self.onoff)
421             + bytes.fromhex(self.fm)
422             + bytes.fromhex(self.to)
423         )
424
425
426 class DND_PERIOD(GPS303Pkt):
427     PROTO = 0x47
428     OUT_KWARGS = (
429         ("onoff", int, 0),
430         ("week", int, 3),
431         ("fm1", hhmm, "0000"),
432         ("to1", hhmm, "2359"),
433         ("fm2", hhmm, "0000"),
434         ("to2", hhmm, "2359"),
435     )
436
437     def out_endode(self):
438         return (
439             pack("B", self.onoff)
440             + pack("B", self.week)
441             + bytes.fromhex(self.fm1)
442             + bytes.fromhex(self.to1)
443             + bytes.fromhex(self.fm2)
444             + bytes.fromhex(self.to2)
445         )
446
447
448 class RESTART_SHUTDOWN(GPS303Pkt):
449     PROTO = 0x48
450     OUT_KWARGS = (("flag", int, 0),)
451
452     def out_encode(self):
453         # 1 - restart
454         # 2 - shutdown
455         return pack("B", self.flag)
456
457
458 class DEVICE(GPS303Pkt):
459     PROTO = 0x49
460     OUT_KWARGS = (("flag", int, 0),)
461
462     # 0 - Stop looking for equipment
463     # 1 - Start looking for equipment
464     def out_encode(self):
465         return pack("B", self.flag)
466
467
468 class ALARM_CLOCK(GPS303Pkt):
469     PROTO = 0x50
470
471     def out_encode(self):
472         # TODO implement parsing kwargs
473         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
474         return b"".join(
475             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
476         )
477
478
479 class STOP_ALARM(GPS303Pkt):
480     PROTO = 0x56
481
482     def in_decode(self, length, payload):
483         self.flag = payload[0]
484         return self
485
486
487 class SETUP(GPS303Pkt):
488     PROTO = 0x57
489     RESPOND = Respond.EXT
490     OUT_KWARGS = (
491         ("uploadintervalseconds", intx, 0x0300),
492         ("binaryswitch", intx, 0b00110001),
493         ("alarms", l3int, [0, 0, 0]),
494         ("dndtimeswitch", int, 0),
495         ("dndtimes", l3int, [0, 0, 0]),
496         ("gpstimeswitch", int, 0),
497         ("gpstimestart", int, 0),
498         ("gpstimestop", int, 0),
499         ("phonenumbers", l3str, ["", "", ""]),
500     )
501
502     def out_encode(self):
503         def pack3b(x):
504             return pack("!I", x)[1:]
505
506         return b"".join(
507             [
508                 pack("!H", self.uploadintervalseconds),
509                 pack("B", self.binaryswitch),
510             ]
511             + [pack3b(el) for el in self.alarms]
512             + [
513                 pack("B", self.dndtimeswitch),
514             ]
515             + [pack3b(el) for el in self.dndtimes]
516             + [
517                 pack("B", self.gpstimeswitch),
518                 pack("!H", self.gpstimestart),
519                 pack("!H", self.gpstimestop),
520             ]
521             + [b";".join([el.encode() for el in self.phonenumbers])]
522         )
523
524
525 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
526     PROTO = 0x58
527
528
529 class RESTORE_PASSWORD(GPS303Pkt):
530     PROTO = 0x67
531
532
533 class WIFI_POSITIONING(_WIFI_POSITIONING):
534     PROTO = 0x69
535     RESPOND = Respond.EXT
536     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
537
538     def out_encode(self):
539         if self.lat is None or self.lon is None:
540             return b""
541         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
542
543
544 class MANUAL_POSITIONING(GPS303Pkt):
545     PROTO = 0x80
546
547     def in_decode(self, length, payload):
548         self.flag = payload[0] if len(payload) > 0 else None
549         self.reason = {
550             1: "Incorrect time",
551             2: "LBS less",
552             3: "WiFi less",
553             4: "LBS search > 3 times",
554             5: "Same LBS and WiFi data",
555             6: "LBS prohibited, WiFi absent",
556             7: "GPS spacing < 50 m",
557         }.get(self.flag, "Unknown")
558         return self
559
560
561 class BATTERY_CHARGE(GPS303Pkt):
562     PROTO = 0x81
563
564
565 class CHARGER_CONNECTED(GPS303Pkt):
566     PROTO = 0x82
567
568
569 class CHARGER_DISCONNECTED(GPS303Pkt):
570     PROTO = 0x83
571
572
573 class VIBRATION_RECEIVED(GPS303Pkt):
574     PROTO = 0x94
575
576
577 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
578     PROTO = 0x98
579     RESPOND = Respond.EXT
580     OUT_KWARGS = (("interval", int, 10),)
581
582     def in_decode(self, length, payload):
583         self.interval = unpack("!H", payload[:2])
584         return self
585
586     def out_encode(self):
587         return pack("!H", interval)
588
589
590 class SOS_ALARM(GPS303Pkt):
591     PROTO = 0x99
592
593
594 class UNKNOWN_B3(GPS303Pkt):
595     PROTO = 0xB3
596     IN_KWARGS = (("asciidata", str, ""),)
597
598     def in_decode(self, length, payload):
599         self.asciidata = payload.decode()
600         return self
601
602
603 # Build dicts protocol number -> class and class name -> protocol number
604 CLASSES = {}
605 PROTOS = {}
606 if True:  # just to indent the code, sorry!
607     for cls in [
608         cls
609         for name, cls in globals().items()
610         if isclass(cls)
611         and issubclass(cls, GPS303Pkt)
612         and not name.startswith("_")
613     ]:
614         if hasattr(cls, "PROTO"):
615             CLASSES[cls.PROTO] = cls
616             PROTOS[cls.__name__] = cls.PROTO
617
618
619 def class_by_prefix(prefix):
620     lst = [
621         (name, proto)
622         for name, proto in PROTOS.items()
623         if name.upper().startswith(prefix.upper())
624     ]
625     if len(lst) != 1:
626         return lst
627     _, proto = lst[0]
628     return CLASSES[proto]
629
630
631 def proto_by_name(name):
632     return PROTOS.get(name, -1)
633
634
635 def proto_of_message(packet):
636     return unpack("B", packet[1:2])[0]
637
638
639 def inline_response(packet):
640     proto = proto_of_message(packet)
641     if proto in CLASSES:
642         cls = CLASSES[proto]
643         if cls.RESPOND is Respond.INL:
644             return cls.Out().packed
645     return None
646
647
648 def parse_message(packet):
649     """From a packet (without framing bytes) derive the XXX.In object"""
650     length, proto = unpack("BB", packet[:2])
651     payload = packet[2:]
652     if proto in CLASSES:
653         return CLASSES[proto].from_packet(length, payload)
654     else:
655         retobj = UNKNOWN.from_packet(length, payload)
656         retobj.PROTO = proto  # Override class attr with object attr
657         return retobj