]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Don't make unneeded responses, better debug log
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "handle_packet",
26     "inline_response",
27     "make_object",
28     "make_response",
29     "parse_message",
30     "proto_by_name",
31     "Dir",
32     "GPS303Pkt",
33     "UNKNOWN",
34     "LOGIN",
35     "SUPERVISION",
36     "HEARTBEAT",
37     "GPS_POSITIONING",
38     "GPS_OFFLINE_POSITIONING",
39     "STATUS",
40     "HIBERNATION",
41     "RESET",
42     "WHITELIST_TOTAL",
43     "WIFI_OFFLINE_POSITIONING",
44     "TIME",
45     "MOM_PHONE",
46     "STOP_ALARM",
47     "SETUP",
48     "SYNCHRONOUS_WHITELIST",
49     "RESTORE_PASSWORD",
50     "WIFI_POSITIONING",
51     "MANUAL_POSITIONING",
52     "BATTERY_CHARGE",
53     "CHARGER_CONNECTED",
54     "CHARGER_DISCONNECTED",
55     "VIBRATION_RECEIVED",
56     "POSITION_UPLOAD_INTERVAL",
57 )
58
59 log = getLogger("gps303")
60
61
62 class Dir(Enum):
63     IN = 0  # Incoming, no response needed
64     INLINE = 2  # Birirectional, use `inline_response()`
65     EXT = 3  # Birirectional, use external responder
66     OUT = 4  # Outgoing, should not appear on input
67
68
69 class GPS303Pkt:
70     PROTO: int
71     DIR = Dir.IN  # Do not send anything back by default
72
73     def __init__(self, *args, **kwargs):
74         assert len(args) == 0
75         for k, v in kwargs.items():
76             setattr(self, k, v)
77
78     def __repr__(self):
79         return "{}({})".format(
80             self.__class__.__name__,
81             ", ".join(
82                 "{}={}".format(
83                     k,
84                     'bytes.fromhex("{}")'.format(v.hex())
85                     if isinstance(v, bytes)
86                     else v.__repr__(),
87                 )
88                 for k, v in self.__dict__.items()
89                 if not k.startswith("_")
90             ),
91         )
92
93     @classmethod
94     def from_packet(cls, length, payload):
95         return cls(payload=payload, length=length)
96
97     def to_packet(self):
98         return pack("BB", self.length, self.PROTO) + self.payload
99
100     @classmethod
101     def make_packet(cls, payload):
102         assert isinstance(payload, bytes)
103         length = len(payload) + 1  # plus proto byte
104         # if length > 6:
105         #     length -= 6
106         return pack("BB", length, cls.PROTO) + payload
107
108     @classmethod
109     def inline_response(cls, packet):
110         if cls.DIR is Dir.INLINE:
111             return cls.make_packet(b"")
112         else:
113             return None
114
115
116 class UNKNOWN(GPS303Pkt):
117     PROTO = 256  # > 255 is impossible in real packets
118
119
120 class LOGIN(GPS303Pkt):
121     PROTO = 0x01
122     DIR = Dir.INLINE
123     # Default response for ACK, can also respond with STOP_UPLOAD
124
125     @classmethod
126     def from_packet(cls, length, payload):
127         self = super().from_packet(length, payload)
128         self.imei = payload[:-1].hex()
129         self.ver = unpack("B", payload[-1:])[0]
130         return self
131
132
133 class SUPERVISION(GPS303Pkt):
134     PROTO = 0x05
135     DIR = Dir.OUT
136
137     @classmethod
138     def response(cls, status=0):
139         # 1: The device automatically answers Pickup effect
140         # 2: Automatically Answering Two-way Calls
141         # 3: Ring manually answer the two-way call
142         return cls.make_packet(pack("B", status))
143
144
145 class HEARTBEAT(GPS303Pkt):
146     PROTO = 0x08
147     DIR = Dir.INLINE
148
149
150 class _GPS_POSITIONING(GPS303Pkt):
151     DIR = Dir.INLINE
152
153     @classmethod
154     def from_packet(cls, length, payload):
155         self = super().from_packet(length, payload)
156         self.dtime = payload[:6]
157         if self.dtime == b"\0\0\0\0\0\0":
158             self.devtime = None
159         else:
160             self.devtime = datetime(
161                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
162             )
163         self.gps_data_length = payload[6] >> 4
164         self.gps_nb_sat = payload[6] & 0x0F
165         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
166         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
167         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
168         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
169         self.heading = flags & 0b0000001111111111  # bits 6 - last
170         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
171         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
172         self.speed = speed
173         self.flags = flags
174         return self
175
176     @classmethod
177     def inline_response(cls, packet):
178         tup = datetime.utcnow().timetuple()
179         ttup = (tup[0] % 100,) + tup[1:6]
180         return cls.make_packet(pack("BBBBBB", *ttup))
181
182
183 class GPS_POSITIONING(_GPS_POSITIONING):
184     PROTO = 0x10
185
186
187 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
188     PROTO = 0x11
189
190
191 class STATUS(GPS303Pkt):
192     PROTO = 0x13
193     DIR = Dir.EXT
194
195     @classmethod
196     def from_packet(cls, length, payload):
197         self = super().from_packet(length, payload)
198         if len(payload) == 5:
199             (
200                 self.batt,
201                 self.ver,
202                 self.timezone,
203                 self.intvl,
204                 self.signal,
205             ) = unpack("BBBBB", payload)
206         elif len(payload) == 4:
207             self.batt, self.ver, self.timezone, self.intvl = unpack(
208                 "BBBB", payload
209             )
210             self.signal = None
211         return self
212
213     @classmethod
214     def response(cls, upload_interval=25):  # Set interval in minutes
215         return cls.make_packet(pack("B", upload_interval))
216
217
218 class HIBERNATION(GPS303Pkt):
219     PROTO = 0x14
220     DIR = Dir.INLINE
221
222     @classmethod
223     def response(cls):  # Server can send to send devicee to sleep
224         return cls.make_packet(b"")
225
226
227 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
228     PROTO = 0x15
229
230     @classmethod
231     def response(cls):  # Server can send to initiate factory reset
232         return cls.make_packet(b"")
233
234
235 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
236     PROTO = 0x16
237     DIR = Dir.OUT
238
239     @classmethod
240     def response(cls, number=3):  # Number of whitelist entries
241         return cls.make_packet(pack("B", number))
242
243
244 class _WIFI_POSITIONING(GPS303Pkt):
245     @classmethod
246     def from_packet(cls, length, payload):
247         self = super().from_packet(length, payload)
248         self.dtime = payload[:6]
249         if self.dtime == b"\0\0\0\0\0\0":
250             self.devtime = None
251         else:
252             self.devtime = datetime.strptime(
253                 self.dtime.hex(), "%y%m%d%H%M%S"
254             ).astimezone(tz=timezone.utc)
255         self.wifi_aps = []
256         for i in range(self.length):  # length has special meaning here
257             slice = payload[6 + i * 7 : 13 + i * 7]
258             self.wifi_aps.append(
259                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
260             )
261         gsm_slice = payload[6 + self.length * 7 :]
262         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
263         self.gsm_cells = []
264         for i in range(ncells):
265             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
266             locac, cellid, sigstr = unpack(
267                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
268             )
269             self.gsm_cells.append((locac, cellid, -sigstr))
270         return self
271
272
273 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
274     PROTO = 0x17
275     DIR = Dir.INLINE
276
277     @classmethod
278     def inline_response(cls, packet):
279         return cls.make_packet(
280             bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
281         )
282
283
284 class TIME(GPS303Pkt):
285     PROTO = 0x30
286     DIR = Dir.INLINE
287
288     @classmethod
289     def inline_response(cls, packet):
290         return cls.make_packet(
291             pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
292         )
293
294
295 class PROHIBIT_LBS(GPS303Pkt):
296     PROTO = 0x33
297     DIR = Dir.OUT
298
299     @classmethod
300     def response(cls, status=1):  # Server sent, 0-off, 1-on
301         return cls.make_packet(pack("B", status))
302
303
304 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
305     PROTO = 0x34
306     DIR = Dir.OUT
307
308     @classmethod
309     def response(cls):
310         # Data is in packed decimal
311         # 00/01 - GPS on/off
312         # 00/01 - Don't set / Set upload period
313         # HHMMHHMM - Upload period
314         # 00/01 - LBS on/off
315         # 00/01 - Don't set / Set time of boot
316         # HHMM  - Time of boot
317         # 00/01 - Don't set / Set time of shutdown
318         # HHMM  - Time of shutdown
319         return cls.make_packet(b"")  # TODO
320
321
322 class _SET_PHONE(GPS303Pkt):
323     DIR = Dir.OUT
324
325     @classmethod
326     def response(cls, phone):
327         return cls.make_packet(phone.encode())
328
329
330 class REMOTE_MONITOR_PHONE(_SET_PHONE):
331     PROTO = 0x40
332
333
334 class SOS_PHONE(_SET_PHONE):
335     PROTO = 0x41
336
337
338 class DAD_PHONE(_SET_PHONE):
339     PROTO = 0x42
340
341
342 class MOM_PHONE(_SET_PHONE):
343     PROTO = 0x43
344
345
346 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
347     PROTO = 0x44
348     DIR = Dir.OUT
349
350     @classmethod
351     def response(cls):
352         return cls.make_packet(b"")
353
354
355 class GPS_OFF_PERIOD(GPS303Pkt):
356     PROTO = 0x46
357     DIR = Dir.OUT
358
359     @classmethod
360     def response(cls, onoff=0, fm="0000", to="2359"):
361         return cls.make_packet(
362             pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
363         )
364
365
366 class DND_PERIOD(GPS303Pkt):
367     PROTO = 0x47
368     DIR = Dir.OUT
369
370     @classmethod
371     def response(
372         cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
373     ):
374         return cls.make_packet(
375             pack("B", onoff)
376             + pack("B", week)
377             + bytes.fromhex(fm1)
378             + bytes.fromhex(to1)
379             + bytes.fromhex(fm2)
380             + bytes.fromhex(to2)
381         )
382
383
384 class RESTART_SHUTDOWN(GPS303Pkt):
385     PROTO = 0x48
386     DIR = Dir.OUT
387
388     @classmethod
389     def response(cls, flag=2):
390         # 1 - restart
391         # 2 - shutdown
392         return cls.make_packet(pack("B", flag))
393
394
395 class DEVICE(GPS303Pkt):
396     PROTO = 0x49
397     DIR = Dir.OUT
398
399     @classmethod
400     def response(cls, flag=0):
401         # 0 - Stop looking for equipment
402         # 1 - Start looking for equipment
403         return cls.make_packet(pack("B", flag))
404
405
406 class ALARM_CLOCK(GPS303Pkt):
407     PROTO = 0x50
408     DIR = Dir.OUT
409
410     @classmethod
411     def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
412         return b"".join(
413             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
414         )
415
416
417 class STOP_ALARM(GPS303Pkt):
418     PROTO = 0x56
419
420     @classmethod
421     def from_packet(cls, length, payload):
422         self = super().from_packet(length, payload)
423         self.flag = payload[0]
424         return self
425
426
427 class SETUP(GPS303Pkt):
428     PROTO = 0x57
429     DIR = Dir.EXT
430
431     @classmethod
432     def response(
433         cls,
434         uploadintervalseconds=0x0300,
435         binaryswitch=0b00110001,
436         alarms=[0, 0, 0],
437         dndtimeswitch=0,
438         dndtimes=[0, 0, 0],
439         gpstimeswitch=0,
440         gpstimestart=0,
441         gpstimestop=0,
442         phonenumbers=["", "", ""],
443     ):
444         def pack3b(x):
445             return pack("!I", x)[1:]
446
447         payload = b"".join(
448             [
449                 pack("!H", uploadintervalseconds),
450                 pack("B", binaryswitch),
451             ]
452             + [pack3b(el) for el in alarms]
453             + [
454                 pack("B", dndtimeswitch),
455             ]
456             + [pack3b(el) for el in dndtimes]
457             + [
458                 pack("B", gpstimeswitch),
459                 pack("!H", gpstimestart),
460                 pack("!H", gpstimestop),
461             ]
462             + [b";".join([el.encode() for el in phonenumbers])]
463         )
464         return cls.make_packet(payload)
465
466
467 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
468     PROTO = 0x58
469
470
471 class RESTORE_PASSWORD(GPS303Pkt):
472     PROTO = 0x67
473
474
475 class WIFI_POSITIONING(_WIFI_POSITIONING):
476     PROTO = 0x69
477     DIR = Dir.EXT
478
479     @classmethod
480     def response(cls, lat=None, lon=None):
481         if lat is None or lon is None:
482             payload = b""
483         else:
484             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
485                 "ascii"
486             )
487         return cls.make_packet(payload)
488
489
490 class MANUAL_POSITIONING(GPS303Pkt):
491     PROTO = 0x80
492     DIR = Dir.OUT
493
494     @classmethod
495     def from_packet(cls, length, payload):
496         self = super().from_packet(length, payload)
497         self.flag = payload[0] if len(payload) > 0 else None
498         self.reason = {
499             1: "Incorrect time",
500             2: "LBS less",
501             3: "WiFi less",
502             4: "LBS search > 3 times",
503             5: "Same LBS and WiFi data",
504             6: "LBS prohibited, WiFi absent",
505             7: "GPS spacing < 50 m",
506         }.get(self.flag, "Unknown")
507         return self
508
509     @classmethod
510     def response(cls):
511         return cls.make_packet(b"")
512
513
514 class BATTERY_CHARGE(GPS303Pkt):
515     PROTO = 0x81
516
517
518 class CHARGER_CONNECTED(GPS303Pkt):
519     PROTO = 0x82
520
521
522 class CHARGER_DISCONNECTED(GPS303Pkt):
523     PROTO = 0x83
524
525
526 class VIBRATION_RECEIVED(GPS303Pkt):
527     PROTO = 0x94
528
529
530 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
531     PROTO = 0x98
532     DIR = Dir.EXT
533
534     @classmethod
535     def from_packet(cls, length, payload):
536         self = super().from_packet(length, payload)
537         self.interval = unpack("!H", payload[:2])
538         return self
539
540     @classmethod
541     def response(cls, interval=10):
542         return cls.make_packet(pack("!H", interval))
543
544
545 class SOS_ALARM(GPS303Pkt):
546     PROTO = 0x99
547
548
549 # Build dicts protocol number -> class and class name -> protocol number
550 CLASSES = {}
551 PROTOS = {}
552 if True:  # just to indent the code, sorry!
553     for cls in [
554         cls
555         for name, cls in globals().items()
556         if isclass(cls)
557         and issubclass(cls, GPS303Pkt)
558         and not name.startswith("_")
559     ]:
560         if hasattr(cls, "PROTO"):
561             CLASSES[cls.PROTO] = cls
562             PROTOS[cls.__name__] = cls.PROTO
563
564
565 def class_by_prefix(prefix):
566     lst = [
567         (name, proto)
568         for name, proto in PROTOS.items()
569         if name.upper().startswith(prefix.upper())
570     ]
571     if len(lst) != 1:
572         return lst
573     _, proto = lst[0]
574     return CLASSES[proto]
575
576
577 def proto_by_name(name):
578     return PROTOS.get(name, -1)
579
580
581 def proto_of_message(packet):
582     return unpack("B", packet[1:2])[0]
583
584
585 def inline_response(packet):
586     proto = proto_of_message(packet)
587     if proto in CLASSES:
588         return CLASSES[proto].inline_response(packet)
589     else:
590         return None
591
592
593 def make_object(length, proto, payload):
594     if proto in CLASSES:
595         return CLASSES[proto].from_packet(length, payload)
596     else:
597         retobj = UNKNOWN.from_packet(length, payload)
598         retobj.PROTO = proto  # Override class attr with object attr
599         return retobj
600
601
602 def parse_message(packet):
603     length, proto = unpack("BB", packet[:2])
604     payload = packet[2:]
605     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
606     if (
607         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
608         and length > 1
609         and len(payload) + adjust != length
610     ):
611         log.warning(
612             "With proto %d length is %d but payload length is %d+%d",
613             proto,
614             length,
615             len(payload),
616             adjust,
617         )
618     return make_object(length, proto, payload)
619
620
621 def handle_packet(packet):  # DEPRECATED
622     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
623         return UNKNOWN.from_packet(len(packet), packet)
624     return parse_message(packet[2:-2])
625
626
627 def make_response(msg, **kwargs):  # DEPRECATED
628     inframe = msg.response(**kwargs)
629     return None if inframe is None else b"xx" + inframe + b"\r\n"