]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
b0259a49631e1573af22825fad081ece6030d28f
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation), but relying on that would break things
14    if the path has a lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
# Names exported by a star-import of this module.
# NOTE(review): several packet classes defined in this file (for example
# PROHIBIT_LBS, STOP_UPLOAD, SOS_ALARM) are not listed here -- confirm
# whether that omission is intentional.
__all__ = (
    "class_by_prefix",
    "inline_response",
    "make_object",
    "parse_message",
    "proto_by_name",
    "Dir",
    "GPS303Pkt",
    "UNKNOWN",
    "LOGIN",
    "SUPERVISION",
    "HEARTBEAT",
    "GPS_POSITIONING",
    "GPS_OFFLINE_POSITIONING",
    "STATUS",
    "HIBERNATION",
    "RESET",
    "WHITELIST_TOTAL",
    "WIFI_OFFLINE_POSITIONING",
    "TIME",
    "MOM_PHONE",
    "STOP_ALARM",
    "SETUP",
    "SYNCHRONOUS_WHITELIST",
    "RESTORE_PASSWORD",
    "WIFI_POSITIONING",
    "MANUAL_POSITIONING",
    "BATTERY_CHARGE",
    "CHARGER_CONNECTED",
    "CHARGER_DISCONNECTED",
    "VIBRATION_RECEIVED",
    "POSITION_UPLOAD_INTERVAL",
)

# Module-level logger, registered under the name "gps303"
log = getLogger("gps303")
58
59
class Dir(Enum):
    """Direction of a packet type and how (if at all) it is answered."""

    IN = 0  # Incoming, no response needed
    INLINE = 2  # Bidirectional, use `inline_response()`
    EXT = 3  # Bidirectional, use external responder
    OUT = 4  # Outgoing, should not appear on input
65
66
class GPS303Pkt:
    """
    Base class of all packet types.

    Subclasses set `PROTO` to their protocol number, may override `DIR`,
    and may override the parsing / packing classmethods.
    """

    PROTO: int
    DIR = Dir.IN  # Do not send anything back by default

    def __init__(self, *args, **kwargs):
        """Store every keyword argument as an attribute; no positionals."""
        assert len(args) == 0
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    def __repr__(self):
        """Render as `ClassName(attr=value, ...)`, skipping `_private` attrs."""

        def render(value):
            # bytes are shown as a `bytes.fromhex("...")` expression
            if isinstance(value, bytes):
                return 'bytes.fromhex("{}")'.format(value.hex())
            return value.__repr__()

        shown = [
            "{}={}".format(attr_name, render(value))
            for attr_name, value in self.__dict__.items()
            if not attr_name.startswith("_")
        ]
        return "{}({})".format(self.__class__.__name__, ", ".join(shown))

    @classmethod
    def from_packet(cls, length, payload):
        """Create an instance that keeps the raw `payload` and `length`."""
        # Keyword order is kept as-is: it fixes the attribute order in repr
        return cls(payload=payload, length=length)

    def to_packet(self):
        """Serialize as length byte, protocol byte, then the payload."""
        header = pack("BB", self.length, self.PROTO)
        return header + self.payload

    @classmethod
    def make_packet(cls, payload):
        """Build a packet of this protocol around the given `bytes` payload."""
        assert isinstance(payload, bytes)
        # The length byte counts the payload plus the protocol byte
        return pack("BB", len(payload) + 1, cls.PROTO) + payload

    @classmethod
    def inline_response(cls, packet):
        """Return an empty-payload reply for INLINE classes, else None."""
        return cls.make_packet(b"") if cls.DIR is Dir.INLINE else None
112
113
class UNKNOWN(GPS303Pkt):
    """Stand-in class whose PROTO value cannot occur in a real packet."""

    PROTO = 256  # > 255 is impossible in real packets
116
117
class LOGIN(GPS303Pkt):
    """Packet with protocol number 0x01, answered inline."""

    PROTO = 0x01
    DIR = Dir.INLINE
    # Default response for ACK, can also respond with STOP_UPLOAD

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse the payload: everything but the last byte becomes the hex
        string `imei`, the last byte becomes the integer `ver`.
        """
        pkt = super().from_packet(length, payload)
        imei_bytes, ver_byte = payload[:-1], payload[-1:]
        pkt.imei = imei_bytes.hex()
        (pkt.ver,) = unpack("B", ver_byte)
        return pkt
129
130
class SUPERVISION(GPS303Pkt):
    """Outgoing packet with protocol number 0x05."""

    PROTO = 0x05
    DIR = Dir.OUT

    @classmethod
    def response(cls, status=0):
        """
        Build the packet carrying a single `status` byte.

        Known status values:
        1: The device automatically answers Pickup effect
        2: Automatically Answering Two-way Calls
        3: Ring manually answer the two-way call
        """
        status_byte = pack("B", status)
        return cls.make_packet(status_byte)
141
142
class HEARTBEAT(GPS303Pkt):
    """Packet with protocol number 0x08, answered inline."""

    PROTO = 0x08
    DIR = Dir.INLINE
146
147
class _GPS_POSITIONING(GPS303Pkt):
    """Shared parser and inline responder for the GPS positioning packets."""

    DIR = Dir.INLINE

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse a GPS positioning payload.

        Sets `dtime` (raw 6 bytes), `devtime` (aware UTC datetime, or None
        when the timestamp is all zeroes), `gps_data_length`, `gps_nb_sat`,
        `gps_is_valid`, `heading`, `latitude`, `longitude`, `speed` and the
        raw `flags` word.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The year travels as a single byte; `inline_response()` sends
            # `year % 100`, so the century has to be added back here.
            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
            self.devtime = datetime(
                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
            )
        # High nibble: GPS data length, low nibble: number of satellites
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        # Bit positions below are counted from the most significant bit
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # Raw coordinates are scaled down by 30000 * 60; a flip flag only
        # changes the sign and never the magnitude.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        self.flags = flags
        return self

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as six bytes (two-digit year)."""
        tup = datetime.now(timezone.utc).timetuple()
        ttup = (tup[0] % 100,) + tup[1:6]
        return cls.make_packet(pack("BBBBBB", *ttup))
179
180
class GPS_POSITIONING(_GPS_POSITIONING):
    """GPS positioning packet with protocol number 0x10."""

    PROTO = 0x10
183
184
class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
    """GPS positioning packet with protocol number 0x11."""

    PROTO = 0x11
187
188
class STATUS(GPS303Pkt):
    """Packet with protocol number 0x13, answered by an external responder."""

    PROTO = 0x13
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse a 4- or 5-byte payload into `batt`, `ver`, `timezone`,
        `intvl` and `signal` (None when only 4 bytes are present).
        Payloads of any other size leave these attributes unset.
        """
        pkt = super().from_packet(length, payload)
        size = len(payload)
        if size == 5:
            values = unpack("BBBBB", payload)
        elif size == 4:
            values = unpack("BBBB", payload) + (None,)
        else:
            return pkt
        field_names = ("batt", "ver", "timezone", "intvl", "signal")
        for field_name, value in zip(field_names, values):
            setattr(pkt, field_name, value)
        return pkt

    @classmethod
    def response(cls, upload_interval=25):  # Set interval in minutes
        """Build the reply carrying the upload interval as one byte."""
        interval_byte = pack("B", upload_interval)
        return cls.make_packet(interval_byte)
214
215
class HIBERNATION(GPS303Pkt):
    """Packet with protocol number 0x14, answered inline."""

    PROTO = 0x14
    DIR = Dir.INLINE

    @classmethod
    def response(cls):  # Server can send to put the device to sleep
        """Return a packet of this protocol with an empty payload."""
        return cls.make_packet(b"")
223
224
class RESET(GPS303Pkt):  # Device sends when it got reset SMS
    """Packet with protocol number 0x15."""

    PROTO = 0x15

    @classmethod
    def response(cls):  # Server can send to initiate factory reset
        """Return a packet of this protocol with an empty payload."""
        return cls.make_packet(b"")
231
232
class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiate sync (0x58)
    """Outgoing packet with protocol number 0x16."""

    PROTO = 0x16
    DIR = Dir.OUT

    @classmethod
    def response(cls, number=3):  # Number of whitelist entries
        """Build the packet carrying `number` as a single byte."""
        count_byte = pack("B", number)
        return cls.make_packet(count_byte)
240
241
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared parser for the WiFi + GSM cell positioning packets."""

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse a WiFi positioning payload.

        Sets `dtime` (raw 6 bytes), `devtime` (aware UTC datetime, or None
        when the timestamp is all zeroes), `wifi_aps` (list of
        `(mac, -strength)` tuples), `mcc`, `mnc` and `gsm_cells` (list of
        `(locac, cellid, -strength)` tuples).
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The timestamp is packed decimal "yymmddHHMMSS". It is tagged
            # as UTC directly: `astimezone()` on a naive value would treat
            # it as server-local time and shift it.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # Each access point record is 7 bytes: 6 of MAC, 1 of strength
            ap_record = payload[6 + i * 7 : 13 + i * 7]
            self.wifi_aps.append(
                (
                    ":".join([format(b, "02X") for b in ap_record[:6]]),
                    -ap_record[6],
                )
            )
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # Each cell record is 5 bytes following the 4-byte GSM header
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
269
270
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
    """WiFi positioning packet with protocol number 0x17, answered inline."""

    PROTO = 0x17
    DIR = Dir.INLINE

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as packed decimal yymmddHHMMSS."""
        # `datetime.utcnow()` is deprecated; an aware UTC "now" formats to
        # the very same digits.
        stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
        return cls.make_packet(bytes.fromhex(stamp))
280
281
class TIME(GPS303Pkt):
    """Packet with protocol number 0x30, answered inline."""

    PROTO = 0x30
    DIR = Dir.INLINE

    @classmethod
    def inline_response(cls, packet):
        """
        Reply with the current UTC time: the year as a big-endian 16-bit
        value followed by month, day, hour, minute and second bytes.
        """
        # `datetime.utcnow()` is deprecated; an aware UTC "now" yields the
        # same time tuple fields.
        now_fields = datetime.now(timezone.utc).timetuple()[:6]
        return cls.make_packet(pack("!HBBBBB", *now_fields))
291
292
class PROHIBIT_LBS(GPS303Pkt):
    """Outgoing packet with protocol number 0x33."""

    PROTO = 0x33
    DIR = Dir.OUT

    @classmethod
    def response(cls, status=1):  # Server sent, 0-off, 1-on
        """Build the packet carrying `status` as a single byte."""
        status_byte = pack("B", status)
        return cls.make_packet(status_byte)
300
301
class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
    """Outgoing packet with protocol number 0x34 (payload not implemented)."""

    PROTO = 0x34
    DIR = Dir.OUT

    @classmethod
    def response(cls):
        """Return a packet with an empty payload; the real layout is a TODO."""
        # Data is in packed decimal
        # 00/01 - GPS on/off
        # 00/01 - Don't set / Set upload period
        # HHMMHHMM - Upload period
        # 00/01 - LBS on/off
        # 00/01 - Don't set / Set time of boot
        # HHMM  - Time of boot
        # 00/01 - Don't set / Set time of shutdown
        # HHMM  - Time of shutdown
        return cls.make_packet(b"")  # TODO
318
319
class _SET_PHONE(GPS303Pkt):
    """Shared outgoing packet builder for the phone number protocols."""

    DIR = Dir.OUT

    @classmethod
    def response(cls, phone):
        """Build the packet whose payload is the encoded `phone` string."""
        encoded_phone = phone.encode()
        return cls.make_packet(encoded_phone)
326
327
class REMOTE_MONITOR_PHONE(_SET_PHONE):
    """Phone number packet with protocol number 0x40."""

    PROTO = 0x40
330
331
class SOS_PHONE(_SET_PHONE):
    """Phone number packet with protocol number 0x41."""

    PROTO = 0x41
334
335
class DAD_PHONE(_SET_PHONE):
    """Phone number packet with protocol number 0x42."""

    PROTO = 0x42
338
339
class MOM_PHONE(_SET_PHONE):
    """Phone number packet with protocol number 0x43."""

    PROTO = 0x43
342
343
class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
    """Outgoing packet with protocol number 0x44."""

    PROTO = 0x44
    DIR = Dir.OUT

    @classmethod
    def response(cls):
        """Return a packet of this protocol with an empty payload."""
        return cls.make_packet(b"")
351
352
class GPS_OFF_PERIOD(GPS303Pkt):
    """Outgoing packet with protocol number 0x46."""

    PROTO = 0x46
    DIR = Dir.OUT

    @classmethod
    def response(cls, onoff=0, fm="0000", to="2359"):
        """
        Build the packet: one `onoff` byte followed by the `fm` and `to`
        strings, each converted from hex digits to bytes.
        """
        pieces = [pack("B", onoff), bytes.fromhex(fm), bytes.fromhex(to)]
        return cls.make_packet(b"".join(pieces))
362
363
class DND_PERIOD(GPS303Pkt):
    """Outgoing packet with protocol number 0x47."""

    PROTO = 0x47
    DIR = Dir.OUT

    @classmethod
    def response(
        cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
    ):
        """
        Build the packet: `onoff` and `week` bytes followed by the four
        time strings, each converted from hex digits to bytes.
        """
        switches = pack("BB", onoff, week)
        periods = b"".join(
            bytes.fromhex(hhmm) for hhmm in (fm1, to1, fm2, to2)
        )
        return cls.make_packet(switches + periods)
380
381
class RESTART_SHUTDOWN(GPS303Pkt):
    """Outgoing packet with protocol number 0x48."""

    PROTO = 0x48
    DIR = Dir.OUT

    @classmethod
    def response(cls, flag=2):
        """
        Build the packet carrying `flag` as a single byte.

        1 - restart
        2 - shutdown
        """
        flag_byte = pack("B", flag)
        return cls.make_packet(flag_byte)
391
392
class DEVICE(GPS303Pkt):
    """Outgoing packet with protocol number 0x49."""

    PROTO = 0x49
    DIR = Dir.OUT

    @classmethod
    def response(cls, flag=0):
        """
        Build the packet carrying `flag` as a single byte.

        0 - Stop looking for equipment
        1 - Start looking for equipment
        """
        flag_byte = pack("B", flag)
        return cls.make_packet(flag_byte)
402
403
class ALARM_CLOCK(GPS303Pkt):
    """Outgoing packet with protocol number 0x50."""

    PROTO = 0x50
    DIR = Dir.OUT

    @classmethod
    def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
        """
        Build the packet from `alarms`, an iterable of `(day, time)` pairs.

        Each pair is encoded as one `day` byte followed by the `time`
        string converted from hex digits to bytes.
        """
        # The day byte must be packed; calling the class constructor with
        # positional arguments here could never have produced bytes.
        payload = b"".join(
            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
        )
        # Wrap into a framed packet like every other `response()` does
        return cls.make_packet(payload)
413
414
class STOP_ALARM(GPS303Pkt):
    """Packet with protocol number 0x56."""

    PROTO = 0x56

    @classmethod
    def from_packet(cls, length, payload):
        """Parse the payload, exposing its first byte as `flag`."""
        pkt = super().from_packet(length, payload)
        first_byte = payload[0]
        pkt.flag = first_byte
        return pkt
423
424
class SETUP(GPS303Pkt):
    """Packet with protocol number 0x57, answered by an external responder."""

    PROTO = 0x57
    DIR = Dir.EXT

    @classmethod
    def response(
        cls,
        uploadintervalseconds=0x0300,
        binaryswitch=0b00110001,
        alarms=(0, 0, 0),
        dndtimeswitch=0,
        dndtimes=(0, 0, 0),
        gpstimeswitch=0,
        gpstimestart=0,
        gpstimestop=0,
        phonenumbers=("", "", ""),
    ):
        """
        Build the setup reply.

        Layout, in order: upload interval (16 bit, big-endian), binary
        switch byte, each alarm as 3 bytes, DND switch byte, each DND time
        as 3 bytes, GPS time switch byte, GPS start and stop (16 bit each),
        then the phone numbers joined with ";".

        The sequence defaults are tuples so that no mutable object is
        shared between calls; any iterable is accepted.
        """

        def pack3b(x):
            # Low three bytes of the big-endian 32-bit representation
            return pack("!I", x)[1:]

        payload = b"".join(
            [
                pack("!H", uploadintervalseconds),
                pack("B", binaryswitch),
            ]
            + [pack3b(el) for el in alarms]
            + [
                pack("B", dndtimeswitch),
            ]
            + [pack3b(el) for el in dndtimes]
            + [
                pack("B", gpstimeswitch),
                pack("!H", gpstimestart),
                pack("!H", gpstimestop),
            ]
            + [b";".join([el.encode() for el in phonenumbers])]
        )
        return cls.make_packet(payload)
463
464
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
    """Packet with protocol number 0x58; no fields are parsed from it."""

    PROTO = 0x58
467
468
class RESTORE_PASSWORD(GPS303Pkt):
    """Packet with protocol number 0x67; no fields are parsed from it."""

    PROTO = 0x67
471
472
class WIFI_POSITIONING(_WIFI_POSITIONING):
    """WiFi positioning packet with protocol number 0x69, answered externally."""

    PROTO = 0x69
    DIR = Dir.EXT

    @classmethod
    def response(cls, lat=None, lon=None):
        """
        Build the reply: an ASCII "lat,lon" pair of signed, zero-padded
        numbers, or an empty payload when either coordinate is None.
        """
        if lat is None or lon is None:
            return cls.make_packet(b"")
        coordinates = "{:+#010.8g},{:+#010.8g}".format(lat, lon)
        return cls.make_packet(coordinates.encode("ascii"))
486
487
class MANUAL_POSITIONING(GPS303Pkt):
    """Packet with protocol number 0x80."""

    PROTO = 0x80
    DIR = Dir.OUT

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse the payload: `flag` is its first byte (None when empty) and
        `reason` is the matching description, or "Unknown".
        """
        pkt = super().from_packet(length, payload)
        if len(payload) > 0:
            pkt.flag = payload[0]
        else:
            pkt.flag = None
        descriptions = {
            1: "Incorrect time",
            2: "LBS less",
            3: "WiFi less",
            4: "LBS search > 3 times",
            5: "Same LBS and WiFi data",
            6: "LBS prohibited, WiFi absent",
            7: "GPS spacing < 50 m",
        }
        pkt.reason = descriptions.get(pkt.flag, "Unknown")
        return pkt

    @classmethod
    def response(cls):
        """Return a packet of this protocol with an empty payload."""
        empty_payload = b""
        return cls.make_packet(empty_payload)
510
511
class BATTERY_CHARGE(GPS303Pkt):
    """Packet with protocol number 0x81; no fields are parsed from it."""

    PROTO = 0x81
514
515
class CHARGER_CONNECTED(GPS303Pkt):
    """Packet with protocol number 0x82; no fields are parsed from it."""

    PROTO = 0x82
518
519
class CHARGER_DISCONNECTED(GPS303Pkt):
    """Packet with protocol number 0x83; no fields are parsed from it."""

    PROTO = 0x83
522
523
class VIBRATION_RECEIVED(GPS303Pkt):
    """Packet with protocol number 0x94; no fields are parsed from it."""

    PROTO = 0x94
526
527
class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
    """Packet with protocol number 0x98, answered by an external responder."""

    PROTO = 0x98
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """
        Parse the payload, exposing its first two bytes as the integer
        `interval` (big-endian, unsigned).
        """
        self = super().from_packet(length, payload)
        # `unpack` always returns a tuple; take the single value out of it
        # so that `interval` is a plain int, matching `response()`.
        self.interval = unpack("!H", payload[:2])[0]
        return self

    @classmethod
    def response(cls, interval=10):
        """Build the reply carrying `interval` as a big-endian 16-bit value."""
        return cls.make_packet(pack("!H", interval))
541
542
class SOS_ALARM(GPS303Pkt):
    """Packet with protocol number 0x99; no fields are parsed from it."""

    PROTO = 0x99
545
546
# Build dicts protocol number -> class and class name -> protocol number.
# Only packet classes bound to a public (non-underscore) module-level name
# that actually define PROTO are registered; the base class carries only an
# annotation and is therefore skipped. The namespace is snapshotted with
# list() and scanned inside a comprehension so that no helper loop variable
# is left behind in the module namespace.
_PACKET_CLASSES = [
    obj
    for name, obj in list(globals().items())
    if isclass(obj)
    and issubclass(obj, GPS303Pkt)
    and not name.startswith("_")
    and hasattr(obj, "PROTO")
]
CLASSES = {klass.PROTO: klass for klass in _PACKET_CLASSES}
PROTOS = {klass.__name__: klass.PROTO for klass in _PACKET_CLASSES}
561
562
def class_by_prefix(prefix):
    """
    Find the packet class whose name starts with `prefix`, ignoring case.

    Returns the class when exactly one name matches; otherwise returns the
    (possibly empty) list of matching `(name, proto)` pairs.
    """
    wanted = prefix.upper()
    matches = []
    for name, proto in PROTOS.items():
        if name.upper().startswith(wanted):
            matches.append((name, proto))
    if len(matches) == 1:
        return CLASSES[matches[0][1]]
    return matches
573
574
def proto_by_name(name):
    """Return the protocol number registered for class `name`, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
577
578
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of `packet`."""
    (proto,) = unpack("B", packet[1:2])
    return proto
581
582
def inline_response(packet):
    """
    Return the inline reply for `packet` as produced by the class
    registered for its protocol number, or None for unknown protocols.
    """
    handler = CLASSES.get(proto_of_message(packet))
    if handler is None:
        return None
    return handler.inline_response(packet)
589
590
def make_object(length, proto, payload):
    """
    Instantiate the packet class registered for `proto`.

    Unregistered protocol numbers produce an `UNKNOWN` instance whose
    `PROTO` is set to the actual number on the instance.
    """
    known = CLASSES.get(proto)
    if known is not None:
        return known.from_packet(length, payload)
    fallback = UNKNOWN.from_packet(length, payload)
    fallback.PROTO = proto  # Instance attribute shadows the class attribute
    return fallback
598
599
def parse_message(packet):
    """
    Split `packet` into length byte, protocol byte and payload, and return
    the matching packet object.

    A warning is logged when the declared length disagrees with the payload
    size; the WiFi positioning protocols and lengths <= 1 are exempt.
    """
    header, payload = packet[:2], packet[2:]
    length, proto = unpack("BB", header)
    adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
    wifi_protos = (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
    length_is_checked = proto not in wifi_protos and length > 1
    if length_is_checked and len(payload) + adjust != length:
        log.warning(
            "With proto %d length is %d but payload length is %d+%d",
            proto,
            length,
            len(payload),
            adjust,
        )
    return make_object(length, proto, payload)