]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
27733a3fa48f246212d7d86e33a0106348ec651c
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
   the server will receive one message in one TCP segment (i.e. in
   one `recv()` operation), but relying on that would break things
   if the path has a lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
# Names exported by a star-import of this module.
# NOTE(review): several packet classes defined in this module (for example
# PROHIBIT_LBS, STOP_UPLOAD and SOS_ALARM) are not listed here - confirm
# whether that omission is intentional.
__all__ = (
    "class_by_prefix",
    "handle_packet",
    "inline_response",
    "make_object",
    "make_response",
    "parse_message",
    "proto_by_name",
    "GPS303Pkt",
    "UNKNOWN",
    "LOGIN",
    "SUPERVISION",
    "HEARTBEAT",
    "GPS_POSITIONING",
    "GPS_OFFLINE_POSITIONING",
    "STATUS",
    "HIBERNATION",
    "RESET",
    "WHITELIST_TOTAL",
    "WIFI_OFFLINE_POSITIONING",
    "TIME",
    "MOM_PHONE",
    "STOP_ALARM",
    "SETUP",
    "SYNCHRONOUS_WHITELIST",
    "RESTORE_PASSWORD",
    "WIFI_POSITIONING",
    "MANUAL_POSITIONING",
    "BATTERY_CHARGE",
    "CHARGER_CONNECTED",
    "CHARGER_DISCONNECTED",
    "VIBRATION_RECEIVED",
    "POSITION_UPLOAD_INTERVAL",
)

# Module-wide logger, registered under the name "gps303".
log = getLogger("gps303")
59
60
class Dir(Enum):
    """Direction of a packet type and how its reply is produced."""

    # Incoming, no response needed
    IN = 0
    # Bidirectional, use `inline_response()`
    INLINE = 2
    # Bidirectional, use external responder
    EXT = 3
    # Outgoing, should not appear on input
    OUT = 4
66
67
class GPS303Pkt:
    """Base class of all protocol packets.

    Subclasses assign ``PROTO`` (the protocol number byte) and may override
    ``DIR``, ``from_packet()`` and ``inline_response()``.
    """

    PROTO: int
    # Most packets anticipate simple acknowledgement
    DIR = Dir.INLINE

    def __init__(self, *args, **kwargs):
        """Store every keyword argument as an instance attribute."""
        assert not args
        for attr, value in kwargs.items():
            setattr(self, attr, value)

    def __repr__(self):
        """Render the public attributes in constructor-call form."""
        shown = []
        for attr, value in self.__dict__.items():
            if attr.startswith("_"):
                continue
            if isinstance(value, bytes):
                # Show bytes as an expression that recreates them
                text = f'bytes.fromhex("{value.hex()}")'
            else:
                text = value.__repr__()
            shown.append(f"{attr}={text}")
        return f"{self.__class__.__name__}({', '.join(shown)})"

    @classmethod
    def from_packet(cls, length, payload):
        """Build an instance from the length byte and payload of a message."""
        fields = {"payload": payload, "length": length}
        return cls(**fields)

    def to_packet(self):
        """Serialize as length byte, proto byte and the stored payload."""
        header = pack("BB", self.length, self.PROTO)
        return header + self.payload

    @classmethod
    def make_packet(cls, payload):
        """Return `payload` prefixed with its length byte and proto byte."""
        assert isinstance(payload, bytes)
        # The length byte counts the payload plus the proto byte.
        # NOTE(review): a disabled variant of this code subtracted 6 from
        # the length when it exceeded 6; it is intentionally not applied.
        return pack("BB", len(payload) + 1, cls.PROTO) + payload

    @classmethod
    def inline_response(cls, packet):
        """Return an empty acknowledgement for INLINE packets, else None."""
        return cls.make_packet(b"") if cls.DIR is Dir.INLINE else None
113
114
class UNKNOWN(GPS303Pkt):
    """Fallback packet type, marked incoming-only (no response)."""

    PROTO = 256  # > 255 is impossible in real packets
    DIR = Dir.IN
118
119
class LOGIN(GPS303Pkt):
    """Login packet carrying the device IMEI and a version byte."""

    PROTO = 0x01
    # Default response for ACK, can also respond with STOP_UPLOAD

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the payload: last byte is `ver`, the rest is the IMEI."""
        pkt = super().from_packet(length, payload)
        imei_part, ver_part = payload[:-1], payload[-1:]
        pkt.imei = imei_part.hex()
        (pkt.ver,) = unpack("B", ver_part)
        return pkt
130
131
class SUPERVISION(GPS303Pkt):
    """Outgoing packet with a single status byte."""

    PROTO = 0x05
    DIR = Dir.OUT

    @classmethod
    def response(cls, status=0):
        """Build the packet for the given `status` value.

        Documented values of `status`:
        1: The device automatically answers Pickup effect
        2: Automatically Answering Two-way Calls
        3: Ring manually answer the two-way call
        """
        body = pack("B", status)
        return cls.make_packet(body)
142
143
class HEARTBEAT(GPS303Pkt):
    """Packet type with protocol number 0x08; everything else is inherited."""

    PROTO = 0x08
146
147
class _GPS_POSITIONING(GPS303Pkt):
    """Shared decoder and responder for the GPS positioning packets."""

    @classmethod
    def from_packet(cls, length, payload):
        """Decode device time, satellite info, coordinates, speed and flags.

        Sets `devtime` to None when the six time bytes are all zero.
        Latitude and longitude are converted to signed decimal degrees.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The year travels as two digits (inline_response() sends
            # `year % 100` as well), so the century has to be restored.
            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
            self.devtime = datetime(
                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
            )
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # Raw values are in 1/30000 of a minute; the flag bits only select
        # the sign, so the multiplier must be exactly +1 or -1.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        self.flags = flags
        return self

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as six bytes, two-digit year."""
        tup = datetime.now(timezone.utc).timetuple()
        ttup = (tup[0] % 100,) + tup[1:6]
        return cls.make_packet(pack("BBBBBB", *ttup))
177
178
class GPS_POSITIONING(_GPS_POSITIONING):
    """Protocol 0x10; decoding and reply come from _GPS_POSITIONING."""

    PROTO = 0x10
181
182
class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
    """Protocol 0x11; decoding and reply come from _GPS_POSITIONING."""

    PROTO = 0x11
185
186
class STATUS(GPS303Pkt):
    """Status packet; the reply sets the upload interval."""

    PROTO = 0x13
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a 4- or 5-byte status payload.

        The optional fifth byte is the signal value; `signal` is None when
        it is absent. Payloads of any other size leave the fields unset.
        """
        pkt = super().from_packet(length, payload)
        size = len(payload)
        if size not in (4, 5):
            return pkt
        pkt.batt, pkt.ver, pkt.timezone, pkt.intvl = unpack(
            "BBBB", payload[:4]
        )
        pkt.signal = payload[4] if size == 5 else None
        return pkt

    @classmethod
    def response(cls, upload_interval=25):
        """Build the reply carrying the upload interval in minutes."""
        body = pack("B", upload_interval)
        return cls.make_packet(body)
212
213
class HIBERNATION(GPS303Pkt):
    """Hibernation packet, answered by an external responder."""

    PROTO = 0x14
    DIR = Dir.EXT

    @classmethod
    def response(cls):
        """Build the empty packet the server can send to put the device to sleep."""
        empty = b""
        return cls.make_packet(empty)
221
222
class RESET(GPS303Pkt):
    """Reset packet; the device sends it when it got a reset SMS."""

    PROTO = 0x15
    DIR = Dir.EXT

    @classmethod
    def response(cls):
        """Build the empty packet the server can send to initiate factory reset."""
        empty = b""
        return cls.make_packet(empty)
230
231
class WHITELIST_TOTAL(GPS303Pkt):
    """Outgoing packet; the server sends it to initiate sync (0x58)."""

    PROTO = 0x16
    DIR = Dir.OUT

    @classmethod
    def response(cls, number=3):
        """Build the packet carrying the number of whitelist entries."""
        body = pack("B", number)
        return cls.make_packet(body)
239
240
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared decoder for the WiFi/GSM based positioning packets."""

    @classmethod
    def from_packet(cls, length, payload):
        """Decode device time, visible WiFi access points and GSM cells.

        `wifi_aps` becomes a list of (MAC address, negated signal byte)
        tuples and `gsm_cells` a list of (location area code, cell id,
        negated signal byte) tuples. `devtime` is None when the six time
        bytes are all zero.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The parsed value is naive; tag it as UTC directly instead of
            # letting astimezone() reinterpret it as server-local time.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # Each access point record is 6 MAC bytes plus one signal byte
            ap = payload[6 + i * 7 : 13 + i * 7]
            self.wifi_aps.append(
                (":".join(format(b, "02X") for b in ap[:6]), -ap[6])
            )
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # Each cell record is 5 bytes following the 4-byte GSM header
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
268
269
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
    """Protocol 0x17; decoded by _WIFI_POSITIONING, acknowledged with the time."""

    PROTO = 0x17

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as six packed-decimal bytes."""
        # datetime.utcnow() is deprecated; an aware UTC "now" formats the same
        stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
        return cls.make_packet(bytes.fromhex(stamp))
278
279
class TIME(GPS303Pkt):
    """Time request packet, answered inline with the current UTC time."""

    PROTO = 0x30

    @classmethod
    def inline_response(cls, packet):
        """Reply with year (16 bit), month, day, hour, minute and second."""
        # datetime.utcnow() is deprecated; an aware UTC "now" has the same fields
        now = datetime.now(timezone.utc)
        return cls.make_packet(pack("!HBBBBB", *now.timetuple()[:6]))
288
289
class PROHIBIT_LBS(GPS303Pkt):
    """Outgoing packet with a single on/off byte."""

    PROTO = 0x33
    DIR = Dir.OUT

    @classmethod
    def response(cls, status=1):
        """Build the server-sent packet; `status` is 0 for off, 1 for on."""
        body = pack("B", status)
        return cls.make_packet(body)
297
298
class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
    """Outgoing packet; payload encoding is not implemented yet."""

    PROTO = 0x34
    DIR = Dir.OUT

    @classmethod
    def response(cls):
        """Build the packet; currently always with an empty payload.

        Intended payload layout (data is in packed decimal):
        00/01 - GPS on/off
        00/01 - Don't set / Set upload period
        HHMMHHMM - Upload period
        00/01 - LBS on/off
        00/01 - Don't set / Set time of boot
        HHMM  - Time of boot
        00/01 - Don't set / Set time of shutdown
        HHMM  - Time of shutdown
        """
        placeholder = b""  # TODO
        return cls.make_packet(placeholder)
315
316
class _SET_PHONE(GPS303Pkt):
    """Shared base of the outgoing packets that carry a phone number."""

    DIR = Dir.OUT

    @classmethod
    def response(cls, phone):
        """Build the packet whose payload is the encoded `phone` string."""
        encoded = phone.encode()
        return cls.make_packet(encoded)
323
324
class REMOTE_MONITOR_PHONE(_SET_PHONE):
    """Protocol 0x40; payload building comes from _SET_PHONE."""

    PROTO = 0x40
327
328
class SOS_PHONE(_SET_PHONE):
    """Protocol 0x41; payload building comes from _SET_PHONE."""

    PROTO = 0x41
331
332
class DAD_PHONE(_SET_PHONE):
    """Protocol 0x42; payload building comes from _SET_PHONE."""

    PROTO = 0x42
335
336
class MOM_PHONE(_SET_PHONE):
    """Protocol 0x43; payload building comes from _SET_PHONE."""

    PROTO = 0x43
339
340
class STOP_UPLOAD(GPS303Pkt):
    """Outgoing packet: server response to LOGIN to thwart the device."""

    PROTO = 0x44
    DIR = Dir.OUT

    @classmethod
    def response(cls):
        """Build the packet, which has an empty payload."""
        empty = b""
        return cls.make_packet(empty)
348
349
class GPS_OFF_PERIOD(GPS303Pkt):
    """Outgoing packet with an on/off byte and a from/to time pair."""

    PROTO = 0x46
    DIR = Dir.OUT

    @classmethod
    def response(cls, onoff=0, fm="0000", to="2359"):
        """Build the packet; `fm` and `to` are hex-digit strings."""
        parts = (pack("B", onoff), bytes.fromhex(fm), bytes.fromhex(to))
        return cls.make_packet(b"".join(parts))
359
360
class DND_PERIOD(GPS303Pkt):
    """Outgoing packet with on/off and week bytes plus two time ranges."""

    PROTO = 0x47
    DIR = Dir.OUT

    @classmethod
    def response(
        cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
    ):
        """Build the packet; the four time arguments are hex-digit strings."""
        header = pack("BB", onoff, week)
        periods = b"".join(bytes.fromhex(tm) for tm in (fm1, to1, fm2, to2))
        return cls.make_packet(header + periods)
377
378
class RESTART_SHUTDOWN(GPS303Pkt):
    """Outgoing packet with a single flag byte."""

    PROTO = 0x48
    DIR = Dir.OUT

    @classmethod
    def response(cls, flag=2):
        """Build the packet; `flag` is 1 for restart, 2 for shutdown."""
        body = pack("B", flag)
        return cls.make_packet(body)
388
389
class DEVICE(GPS303Pkt):
    """Outgoing packet with a single flag byte."""

    PROTO = 0x49
    DIR = Dir.OUT

    @classmethod
    def response(cls, flag=0):
        """Build the packet.

        `flag` values:
        0 - Stop looking for equipment
        1 - Start looking for equipment
        """
        body = pack("B", flag)
        return cls.make_packet(body)
399
400
class ALARM_CLOCK(GPS303Pkt):
    """Outgoing packet carrying a list of alarm settings."""

    PROTO = 0x50
    DIR = Dir.OUT

    @classmethod
    def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
        """Build the packet from (day byte, hex-digit time string) pairs.

        Each alarm contributes one packed day byte followed by the bytes
        decoded from its time string. The result is framed with the length
        and proto bytes like every other `response()`.
        """
        payload = b"".join(
            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
        )
        return cls.make_packet(payload)
410
411
class STOP_ALARM(GPS303Pkt):
    """Packet whose first payload byte is a flag."""

    PROTO = 0x56

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the packet and expose the first payload byte as `flag`."""
        self = super().from_packet(length, payload)
        self.flag = payload[0]
        # The decoded object must be returned, as in the base class
        return self
419
420
class SETUP(GPS303Pkt):
    """Setup packet; the reply carries the whole device configuration."""

    PROTO = 0x57
    DIR = Dir.EXT

    @classmethod
    def response(
        cls,
        uploadintervalseconds=0x0300,
        binaryswitch=0b00110001,
        alarms=(0, 0, 0),
        dndtimeswitch=0,
        dndtimes=(0, 0, 0),
        gpstimeswitch=0,
        gpstimestart=0,
        gpstimestop=0,
        phonenumbers=("", "", ""),
    ):
        """Build the configuration reply.

        `alarms` and `dndtimes` are iterables of integers, each packed
        into three big-endian bytes. `phonenumbers` is an iterable of
        strings that are encoded and joined with ";". The sequence
        defaults are immutable tuples, so they cannot be altered by
        accident between calls.
        """

        def pack3b(x):
            # Three low-order bytes of the big-endian 32-bit encoding
            return pack("!I", x)[1:]

        payload = b"".join(
            [
                pack("!H", uploadintervalseconds),
                pack("B", binaryswitch),
            ]
            + [pack3b(el) for el in alarms]
            + [
                pack("B", dndtimeswitch),
            ]
            + [pack3b(el) for el in dndtimes]
            + [
                pack("B", gpstimeswitch),
                pack("!H", gpstimestart),
                pack("!H", gpstimestop),
            ]
            + [b";".join([el.encode() for el in phonenumbers])]
        )
        return cls.make_packet(payload)
459
460
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
    """Packet type with protocol number 0x58; everything else is inherited."""

    PROTO = 0x58
463
464
class RESTORE_PASSWORD(GPS303Pkt):
    """Packet type with protocol number 0x67; everything else is inherited."""

    PROTO = 0x67
467
468
class WIFI_POSITIONING(_WIFI_POSITIONING):
    """Protocol 0x69; the reply optionally carries resolved coordinates."""

    PROTO = 0x69
    DIR = Dir.EXT

    @classmethod
    def response(cls, lat=None, lon=None):
        """Build the reply with "lat,lon" as ASCII text.

        The payload is empty when either coordinate is missing.
        """
        if lat is not None and lon is not None:
            text = "{:+#010.8g},{:+#010.8g}".format(lat, lon)
            return cls.make_packet(text.encode("ascii"))
        return cls.make_packet(b"")
482
483
class MANUAL_POSITIONING(GPS303Pkt):
    """Manual positioning packet; its first payload byte is a reason code."""

    PROTO = 0x80
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the packet, setting `flag` and a readable `reason`.

        Codes without a known meaning map to "Unknown".
        """
        self = super().from_packet(length, payload)
        self.flag = payload[0]
        self.reason = {
            1: "Incorrect time",
            2: "LBS less",
            3: "WiFi less",
            4: "LBS search > 3 times",
            5: "Same LBS and WiFi data",
            6: "LBS prohibited, WiFi absent",
            7: "GPS spacing < 50 m",
        }.get(self.flag, "Unknown")
        # The decoded object must be returned, as in the base class
        return self

    @classmethod
    def response(cls):
        """Build the reply, which has an empty payload."""
        return cls.make_packet(b"")
505
506
class BATTERY_CHARGE(GPS303Pkt):
    """Packet type with protocol number 0x81; everything else is inherited."""

    PROTO = 0x81
509
510
class CHARGER_CONNECTED(GPS303Pkt):
    """Packet type with protocol number 0x82; everything else is inherited."""

    PROTO = 0x82
513
514
class CHARGER_DISCONNECTED(GPS303Pkt):
    """Packet type with protocol number 0x83; everything else is inherited."""

    PROTO = 0x83
517
518
class VIBRATION_RECEIVED(GPS303Pkt):
    """Packet type with protocol number 0x94; everything else is inherited."""

    PROTO = 0x94
521
522
class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
    """Packet carrying the position upload interval as a 16-bit integer."""

    PROTO = 0x98
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the packet and expose `interval` as an integer."""
        self = super().from_packet(length, payload)
        # unpack() always returns a tuple; keep only the single value
        (self.interval,) = unpack("!H", payload[:2])
        return self

    @classmethod
    def response(cls, interval=10):
        """Build the reply carrying `interval` as a big-endian 16-bit value."""
        return cls.make_packet(pack("!H", interval))
536
537
class SOS_ALARM(GPS303Pkt):
    """Packet type with protocol number 0x99; everything else is inherited."""

    PROTO = 0x99
540
541
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
# Scan a snapshot of the module namespace rather than the live dict, so
# the scan cannot be disturbed by names being bound while it runs.
# Names starting with "_" (the shared base classes) are skipped.
for cls in [
    obj
    for name, obj in list(globals().items())
    if isclass(obj)
    and issubclass(obj, GPS303Pkt)
    and not name.startswith("_")
]:
    # GPS303Pkt itself only annotates PROTO, so it has no value to register
    if hasattr(cls, "PROTO"):
        CLASSES[cls.PROTO] = cls
        PROTOS[cls.__name__] = cls.PROTO
556
557
def class_by_prefix(prefix):
    """Look up a packet class by a case-insensitive prefix of its name.

    Returns the class when exactly one name matches; otherwise returns the
    list of matching (name, proto) pairs, which may be empty.
    """
    wanted = prefix.upper()
    matches = [
        (name, proto)
        for name, proto in PROTOS.items()
        if name.upper().startswith(wanted)
    ]
    if len(matches) == 1:
        return CLASSES[matches[0][1]]
    return matches
565
566
def proto_by_name(name):
    """Return the protocol number registered for `name`, or -1 if unknown."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
569
570
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of `packet`."""
    (proto,) = unpack("B", packet[1:2])
    return proto
573
574
def inline_response(packet):
    """Return the inline reply for `packet`, or None for unknown protocols."""
    handler = CLASSES.get(proto_of_message(packet))
    if handler is None:
        return None
    return handler.inline_response(packet)
581
582
def make_object(length, proto, payload):
    """Decode a payload into an instance of the class registered for `proto`.

    Unregistered protocol numbers produce an UNKNOWN instance whose PROTO
    attribute holds the actual number.
    """
    known = CLASSES.get(proto)
    if known is not None:
        return known.from_packet(length, payload)
    fallback = UNKNOWN.from_packet(length, payload)
    # Override class attr with object attr
    fallback.PROTO = proto
    return fallback
590
591
def parse_message(packet):
    """Split an unframed message into length, proto and payload and decode it.

    Logs a warning when the declared length disagrees with the payload
    size; the message is decoded regardless.
    """
    length, proto = unpack("BB", packet[:2])
    payload = packet[2:]
    # Weird special case: STATUS uses a different length offset
    adjust = 2 if proto == STATUS.PROTO else 4
    # The WiFi positioning packets use the length byte for something else
    wifi_protos = (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
    mismatch = (
        proto not in wifi_protos
        and length > 1
        and len(payload) + adjust != length
    )
    if mismatch:
        log.warning(
            "With proto %d length is %d but payload length is %d+%d",
            proto,
            length,
            len(payload),
            adjust,
        )
    return make_object(length, proto, payload)
609
610
def handle_packet(packet):  # DEPRECATED
    """Decode a framed packet (b"xx" ... b"\\r\\n").

    Anything shorter than six bytes or without the expected framing is
    returned as an UNKNOWN packet holding the raw bytes.
    """
    framed = (
        len(packet) >= 6 and packet[:2] == b"xx" and packet[-2:] == b"\r\n"
    )
    if framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
615
616
def make_response(msg, **kwargs):  # DEPRECATED
    """Return the framed reply built by `msg.response()`, or None.

    All keyword arguments are forwarded to `msg.response()`.
    """
    body = msg.response(**kwargs)
    if body is None:
        return None
    return b"xx" + body + b"\r\n"