]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
introduce `class_by_prefix()`
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "handle_packet",
26     "inline_response",
27     "make_object",
28     "make_response",
29     "parse_message",
30     "proto_by_name",
31     "GPS303Pkt",
32     "UNKNOWN",
33     "LOGIN",
34     "SUPERVISION",
35     "HEARTBEAT",
36     "GPS_POSITIONING",
37     "GPS_OFFLINE_POSITIONING",
38     "STATUS",
39     "HIBERNATION",
40     "RESET",
41     "WHITELIST_TOTAL",
42     "WIFI_OFFLINE_POSITIONING",
43     "TIME",
44     "MOM_PHONE",
45     "STOP_ALARM",
46     "SETUP",
47     "SYNCHRONOUS_WHITELIST",
48     "RESTORE_PASSWORD",
49     "WIFI_POSITIONING",
50     "MANUAL_POSITIONING",
51     "BATTERY_CHARGE",
52     "CHARGER_CONNECTED",
53     "CHARGER_DISCONNECTED",
54     "VIBRATION_RECEIVED",
55     "POSITION_UPLOAD_INTERVAL",
56 )
57
58 log = getLogger("gps303")
59
60
61 class Dir(Enum):
62     IN = 0  # Incoming, no response needed
63     INLINE = 2  # Birirectional, use `inline_response()`
64     EXT = 3  # Birirectional, use external responder
65     OUT = 4  # Outgoing, should not appear on input
66
67
68 class GPS303Pkt:
69     PROTO: int
70     DIR = Dir.INLINE  # Most packets anticipate simple acknowledgement
71
72     def __init__(self, *args, **kwargs):
73         assert len(args) == 0
74         for k, v in kwargs.items():
75             setattr(self, k, v)
76
77     def __repr__(self):
78         return "{}({})".format(
79             self.__class__.__name__,
80             ", ".join(
81                 "{}={}".format(
82                     k,
83                     'bytes.fromhex("{}")'.format(v.hex())
84                     if isinstance(v, bytes)
85                     else v.__repr__(),
86                 )
87                 for k, v in self.__dict__.items()
88                 if not k.startswith("_")
89             ),
90         )
91
92     @classmethod
93     def from_packet(cls, length, payload):
94         return cls(payload=payload, length=length)
95
96     def to_packet(self):
97         return pack("BB", self.length, self.PROTO) + self.payload
98
99     @classmethod
100     def make_packet(cls, payload):
101         assert isinstance(payload, bytes)
102         length = len(payload) + 1  # plus proto byte
103         # if length > 6:
104         #     length -= 6
105         return pack("BB", length, cls.PROTO) + payload
106
107     @classmethod
108     def inline_response(cls, packet):
109         if cls.DIR is Dir.INLINE:
110             return cls.make_packet(b"")
111         else:
112             return None
113
114
115 class UNKNOWN(GPS303Pkt):
116     PROTO = 256  # > 255 is impossible in real packets
117     DIR = Dir.IN
118
119
120 class LOGIN(GPS303Pkt):
121     PROTO = 0x01
122     # Default response for ACK, can also respond with STOP_UPLOAD
123
124     @classmethod
125     def from_packet(cls, length, payload):
126         self = super().from_packet(length, payload)
127         self.imei = payload[:-1].hex()
128         self.ver = unpack("B", payload[-1:])[0]
129         return self
130
131
132 class SUPERVISION(GPS303Pkt):
133     PROTO = 0x05
134     DIR = Dir.OUT
135
136     def response(self, status=0):
137         # 1: The device automatically answers Pickup effect
138         # 2: Automatically Answering Two-way Calls
139         # 3: Ring manually answer the two-way call
140         return self.make_packet(pack("B", status))
141
142
143 class HEARTBEAT(GPS303Pkt):
144     PROTO = 0x08
145
146
147 class _GPS_POSITIONING(GPS303Pkt):
148     @classmethod
149     def from_packet(cls, length, payload):
150         self = super().from_packet(length, payload)
151         self.dtime = payload[:6]
152         if self.dtime == b"\0\0\0\0\0\0":
153             self.devtime = None
154         else:
155             self.devtime = datetime(
156                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
157             )
158         self.gps_data_length = payload[6] >> 4
159         self.gps_nb_sat = payload[6] & 0x0F
160         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
161         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
162         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
163         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
164         self.heading = flags & 0b0000001111111111  # bits 6 - last
165         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
166         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
167         self.speed = speed
168         self.flags = flags
169         return self
170
171     @classmethod
172     def inline_response(cls, packet):
173         tup = datetime.utcnow().timetuple()
174         ttup = (tup[0] % 100,) + tup[1:6]
175         return cls.make_packet(pack("BBBBBB", *ttup))
176
177
178 class GPS_POSITIONING(_GPS_POSITIONING):
179     PROTO = 0x10
180
181
182 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
183     PROTO = 0x11
184
185
186 class STATUS(GPS303Pkt):
187     PROTO = 0x13
188     DIR = Dir.EXT
189
190     @classmethod
191     def from_packet(cls, length, payload):
192         self = super().from_packet(length, payload)
193         if len(payload) == 5:
194             (
195                 self.batt,
196                 self.ver,
197                 self.timezone,
198                 self.intvl,
199                 self.signal,
200             ) = unpack("BBBBB", payload)
201         elif len(payload) == 4:
202             self.batt, self.ver, self.timezone, self.intvl = unpack(
203                 "BBBB", payload
204             )
205             self.signal = None
206         return self
207
208     def response(self, upload_interval=25):  # Set interval in minutes
209         return self.make_packet(pack("B", upload_interval))
210
211
212 class HIBERNATION(GPS303Pkt):
213     PROTO = 0x14
214     DIR = Dir.EXT
215
216     def response(self):  # Server can send to send devicee to sleep
217         return self.make_packet(b"")
218
219
220 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
221     PROTO = 0x15
222     DIR = Dir.EXT
223
224     def response(self):  # Server can send to initiate factory reset
225         return self.make_packet(b"")
226
227
228 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
229     PROTO = 0x16
230     DIR = Dir.OUT
231
232     def response(self, number=3):  # Number of whitelist entries
233         return self.make_packet(pack("B", number))
234
235
236 class _WIFI_POSITIONING(GPS303Pkt):
237     @classmethod
238     def from_packet(cls, length, payload):
239         self = super().from_packet(length, payload)
240         self.dtime = payload[:6]
241         if self.dtime == b"\0\0\0\0\0\0":
242             self.devtime = None
243         else:
244             self.devtime = datetime.strptime(
245                 self.dtime.hex(), "%y%m%d%H%M%S"
246             ).astimezone(tz=timezone.utc)
247         self.wifi_aps = []
248         for i in range(self.length):  # length has special meaning here
249             slice = payload[6 + i * 7 : 13 + i * 7]
250             self.wifi_aps.append(
251                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
252             )
253         gsm_slice = payload[6 + self.length * 7 :]
254         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
255         self.gsm_cells = []
256         for i in range(ncells):
257             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
258             locac, cellid, sigstr = unpack(
259                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
260             )
261             self.gsm_cells.append((locac, cellid, -sigstr))
262         return self
263
264
265 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
266     PROTO = 0x17
267
268     @classmethod
269     def inline_response(cls, packet):
270         return cls.make_packet(
271             bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
272         )
273
274
275 class TIME(GPS303Pkt):
276     PROTO = 0x30
277
278     @classmethod
279     def inline_response(cls, packet):
280         return cls.make_packet(
281             pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
282         )
283
284
285 class PROHIBIT_LBS(GPS303Pkt):
286     PROTO = 0x33
287     DIR = Dir.OUT
288
289     def response(self, status=1):  # Server sent, 0-off, 1-on
290         return self.make_packet(pack("B", status))
291
292
293 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
294     PROTO = 0x34
295     DIR = Dir.OUT
296
297     def response(self):
298         # Data is in packed decimal
299         # 00/01 - GPS on/off
300         # 00/01 - Don't set / Set upload period
301         # HHMMHHMM - Upload period
302         # 00/01 - LBS on/off
303         # 00/01 - Don't set / Set time of boot
304         # HHMM  - Time of boot
305         # 00/01 - Don't set / Set time of shutdown
306         # HHMM  - Time of shutdown
307         return self.make_packet(b"")  # TODO
308
309
310 class _SET_PHONE(GPS303Pkt):
311     DIR = Dir.OUT
312
313     def response(self, phone):
314         return self.make_packet(phone.encode())
315
316
317 class REMOTE_MONITOR_PHONE(_SET_PHONE):
318     PROTO = 0x40
319
320
321 class SOS_PHONE(_SET_PHONE):
322     PROTO = 0x41
323
324
325 class DAD_PHONE(_SET_PHONE):
326     PROTO = 0x42
327
328
329 class MOM_PHONE(_SET_PHONE):
330     PROTO = 0x43
331
332
333 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
334     PROTO = 0x44
335     DIR = Dir.OUT
336
337     def response(self):
338         return self.make_packet(b"")
339
340
341 class GPS_OFF_PERIOD(GPS303Pkt):
342     PROTO = 0x46
343     DIR = Dir.OUT
344
345     def response(self, onoff=0, fm="0000", to="2359"):
346         return self.make_packet(
347             pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
348         )
349
350
351 class DND_PERIOD(GPS303Pkt):
352     PROTO = 0x47
353     DIR = Dir.OUT
354
355     def response(
356         self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
357     ):
358         return self.make_packet(
359             pack("B", onoff)
360             + pack("B", week)
361             + bytes.fromhex(fm1)
362             + bytes.fromhex(to1)
363             + bytes.fromhex(fm2)
364             + bytes.fromhex(to2)
365         )
366
367
368 class RESTART_SHUTDOWN(GPS303Pkt):
369     PROTO = 0x48
370     DIR = Dir.OUT
371
372     def response(self, flag=0):
373         # 1 - restart
374         # 2 - shutdown
375         return self.make_packet(pack("B", flag))
376
377
378 class DEVICE(GPS303Pkt):
379     PROTO = 0x49
380     DIR = Dir.OUT
381
382     def response(self, flag=0):
383         # 0 - Stop looking for equipment
384         # 1 - Start looking for equipment
385         return self.make_packet(pack("B", flag))
386
387
388 class ALARM_CLOCK(GPS303Pkt):
389     PROTO = 0x50
390     DIR = Dir.OUT
391
392     def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
393         return b"".join(
394             pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
395         )
396
397
398 class STOP_ALARM(GPS303Pkt):
399     PROTO = 0x56
400
401     @classmethod
402     def from_packet(cls, length, payload):
403         self = super().from_packet(length, payload)
404         self.flag = payload[0]
405
406
407 class SETUP(GPS303Pkt):
408     PROTO = 0x57
409     DIR = Dir.EXT
410
411     def response(
412         self,
413         uploadintervalseconds=0x0300,
414         binaryswitch=0b00110001,
415         alarms=[0, 0, 0],
416         dndtimeswitch=0,
417         dndtimes=[0, 0, 0],
418         gpstimeswitch=0,
419         gpstimestart=0,
420         gpstimestop=0,
421         phonenumbers=["", "", ""],
422     ):
423         def pack3b(x):
424             return pack("!I", x)[1:]
425
426         payload = b"".join(
427             [
428                 pack("!H", uploadintervalseconds),
429                 pack("B", binaryswitch),
430             ]
431             + [pack3b(el) for el in alarms]
432             + [
433                 pack("B", dndtimeswitch),
434             ]
435             + [pack3b(el) for el in dndtimes]
436             + [
437                 pack("B", gpstimeswitch),
438                 pack("!H", gpstimestart),
439                 pack("!H", gpstimestop),
440             ]
441             + [b";".join([el.encode() for el in phonenumbers])]
442         )
443         return self.make_packet(payload)
444
445
446 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
447     PROTO = 0x58
448
449
450 class RESTORE_PASSWORD(GPS303Pkt):
451     PROTO = 0x67
452
453
454 class WIFI_POSITIONING(_WIFI_POSITIONING):
455     PROTO = 0x69
456     DIR = Dir.EXT
457
458     def response(self, lat=None, lon=None):
459         if lat is None or lon is None:
460             payload = b""
461         else:
462             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
463                 "ascii"
464             )
465         return self.make_packet(payload)
466
467
468 class MANUAL_POSITIONING(GPS303Pkt):
469     PROTO = 0x80
470     DIR = Dir.EXT
471
472     @classmethod
473     def from_packet(cls, length, payload):
474         self = super().from_packet(length, payload)
475         self.flag = payload[0]
476         self.reason = {
477             1: "Incorrect time",
478             2: "LBS less",
479             3: "WiFi less",
480             4: "LBS search > 3 times",
481             5: "Same LBS and WiFi data",
482             6: "LBS prohibited, WiFi absent",
483             7: "GPS spacing < 50 m",
484         }.get(self.flag, "Unknown")
485
486     def response(self):
487         return self.make_packet(b"")
488
489
490 class BATTERY_CHARGE(GPS303Pkt):
491     PROTO = 0x81
492
493
494 class CHARGER_CONNECTED(GPS303Pkt):
495     PROTO = 0x82
496
497
498 class CHARGER_DISCONNECTED(GPS303Pkt):
499     PROTO = 0x83
500
501
502 class VIBRATION_RECEIVED(GPS303Pkt):
503     PROTO = 0x94
504
505
506 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
507     PROTO = 0x98
508     DIR = Dir.EXT
509
510     @classmethod
511     def from_packet(cls, length, payload):
512         self = super().from_packet(length, payload)
513         self.interval = unpack("!H", payload[:2])
514         return self
515
516     def response(self, interval=10):
517         return self.make_packet(pack("!H", interval))
518
519
520 class SOS_ALARM(GPS303Pkt):
521     PROTO = 0x99
522
523
524 # Build dicts protocol number -> class and class name -> protocol number
525 CLASSES = {}
526 PROTOS = {}
527 if True:  # just to indent the code, sorry!
528     for cls in [
529         cls
530         for name, cls in globals().items()
531         if isclass(cls)
532         and issubclass(cls, GPS303Pkt)
533         and not name.startswith("_")
534     ]:
535         if hasattr(cls, "PROTO"):
536             CLASSES[cls.PROTO] = cls
537             PROTOS[cls.__name__] = cls.PROTO
538
539
540 def class_by_prefix(prefix):
541     lst = [(name, proto) for name, proto in PROTOS.items()
542             if name.upper().startswith(prefix.upper())]
543     if len(lst) != 1:
544         return lst
545     _, proto = lst[0]
546     return CLASSES[proto]
547
548
549 def proto_by_name(name):
550     return PROTOS.get(name, -1)
551
552
553 def proto_of_message(packet):
554     return unpack("B", packet[1:2])[0]
555
556
557 def inline_response(packet):
558     proto = proto_of_message(packet)
559     if proto in CLASSES:
560         return CLASSES[proto].inline_response(packet)
561     else:
562         return None
563
564
565 def make_object(length, proto, payload):
566     if proto in CLASSES:
567         return CLASSES[proto].from_packet(length, payload)
568     else:
569         retobj = UNKNOWN.from_packet(length, payload)
570         retobj.PROTO = proto  # Override class attr with object attr
571         return retobj
572
573
574 def parse_message(packet):
575     length, proto = unpack("BB", packet[:2])
576     payload = packet[2:]
577     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
578     if (
579         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
580         and length > 1
581         and len(payload) + adjust != length
582     ):
583         log.warning(
584             "With proto %d length is %d but payload length is %d+%d",
585             proto,
586             length,
587             len(payload),
588             adjust,
589         )
590     return make_object(length, proto, payload)
591
592
593 def handle_packet(packet):  # DEPRECATED
594     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
595         return UNKNOWN.from_packet(len(packet), packet)
596     return parse_message(packet[2:-2])
597
598
599 def make_response(msg, **kwargs):  # DEPRECATED
600     inframe = msg.response(**kwargs)
601     return None if inframe is None else b"xx" + inframe + b"\r\n"