]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
db8fd014637279ca256b14d44466b3d7e5eb3a0f
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "make_object",
27     "parse_message",
28     "proto_by_name",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "MOM_PHONE",
44     "STOP_ALARM",
45     "SETUP",
46     "SYNCHRONOUS_WHITELIST",
47     "RESTORE_PASSWORD",
48     "WIFI_POSITIONING",
49     "MANUAL_POSITIONING",
50     "BATTERY_CHARGE",
51     "CHARGER_CONNECTED",
52     "CHARGER_DISCONNECTED",
53     "VIBRATION_RECEIVED",
54     "POSITION_UPLOAD_INTERVAL",
55 )
56
57 log = getLogger("gps303")
58
59
60 class MetaPkt(type):
61     """
62     For each class corresponding to a message, automatically create
63     two nested classes `In` and `Out` that also inherit from their
64     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
65     copied to the `In` nested class under the name `KWARGS`, and
66     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
67     to the nested class `Out`. In addition, method `encode` is
68     defined in both classes equal to `in_encode()` and `out_encode()`
69     respectively.
70     """
71
72     def __new__(cls, name, bases, attrs):
73         newcls = super().__new__(cls, name, bases, attrs)
74         nestattrs = {"encode": lambda self: self.in_encode()}
75         if "IN_KWARGS" in attrs:
76             nestattrs["KWARGS"] = attrs["IN_KWARGS"]
77         newcls.In = super().__new__(
78             cls,
79             name + ".In",
80             (newcls,) + bases,
81             nestattrs,
82         )
83         nestattrs = {"encode": lambda self: self.out_encode()}
84         if "OUT_KWARGS" in attrs:
85             nestattrs["KWARGS"] = attrs["OUT_KWARGS"]
86         newcls.Out = super().__new__(
87             cls,
88             name + ".Out",
89             (newcls,) + bases,
90             nestattrs,
91         )
92         return newcls
93
94
95 class Respond(Enum):
96     NON = 0  # Incoming, no response needed
97     INL = 1  # Birirectional, use `inline_response()`
98     EXT = 2  # Birirectional, use external responder
99
100
101 class GPS303Pkt(metaclass=MetaPkt):
102     RESPOND = Respond.NON  # Do not send anything back by default
103     PROTO: int
104     # Have these kwargs for now, TODO redo
105     IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
106
107     def __init__(self, *args, **kwargs):
108         assert len(args) == 0
109         for kw, typ, dfl in self.KWARGS:
110             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
111         if kwargs:
112             print("KWARGS", self.KWARGS)
113             print("kwargs", kwargs)
114             raise TypeError(
115                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
116             )
117
118     def __repr__(self):
119         return "{}({})".format(
120             self.__class__.__name__,
121             ", ".join(
122                 "{}={}".format(
123                     k,
124                     'bytes.fromhex("{}")'.format(v.hex())
125                     if isinstance(v, bytes)
126                     else v.__repr__(),
127                 )
128                 for k, v in self.__dict__.items()
129                 if not k.startswith("_")
130             ),
131         )
132
133     def in_encode(self):
134         raise NotImplementedError(
135             self.__class__.__name__ + ".encode() not implemented"
136         )
137
138     def out_encode(self):
139         return b""
140
141     @property
142     def packed(self):
143         payload = self.encode()
144         length = len(payload) + 1
145         return pack("BB", length, self.PROTO) + payload
146
147     @classmethod
148     def from_packet(cls, length, payload):
149         return cls.In(payload=payload, length=length)
150
151
152 class UNKNOWN(GPS303Pkt):
153     PROTO = 256  # > 255 is impossible in real packets
154
155
156 class LOGIN(GPS303Pkt):
157     PROTO = 0x01
158     RESPOND = Respond.INL
159     # Default response for ACK, can also respond with STOP_UPLOAD
160
161     @classmethod
162     def from_packet(cls, length, payload):
163         self = super().from_packet(length, payload)
164         self.imei = payload[:-1].hex()
165         self.ver = unpack("B", payload[-1:])[0]
166         return self
167
168
169 class SUPERVISION(GPS303Pkt):
170     PROTO = 0x05
171     OUT_KWARGS = (("status", int, 1),)
172
173     def out_encode(self):
174         # 1: The device automatically answers Pickup effect
175         # 2: Automatically Answering Two-way Calls
176         # 3: Ring manually answer the two-way call
177         return pack("B", self.status)
178
179
180 class HEARTBEAT(GPS303Pkt):
181     PROTO = 0x08
182     RESPOND = Respond.INL
183
184
185 class _GPS_POSITIONING(GPS303Pkt):
186     RESPOND = Respond.INL
187
188     @classmethod
189     def from_packet(cls, length, payload):
190         self = super().from_packet(length, payload)
191         self.dtime = payload[:6]
192         if self.dtime == b"\0\0\0\0\0\0":
193             self.devtime = None
194         else:
195             self.devtime = datetime(
196                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
197             )
198         self.gps_data_length = payload[6] >> 4
199         self.gps_nb_sat = payload[6] & 0x0F
200         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
201         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
202         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
203         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
204         self.heading = flags & 0b0000001111111111  # bits 6 - last
205         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
206         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
207         self.speed = speed
208         self.flags = flags
209         return self
210
211     def out_encode(self):
212         tup = datetime.utcnow().timetuple()
213         ttup = (tup[0] % 100,) + tup[1:6]
214         return pack("BBBBBB", *ttup)
215
216
217 class GPS_POSITIONING(_GPS_POSITIONING):
218     PROTO = 0x10
219
220
221 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
222     PROTO = 0x11
223
224
225 class STATUS(GPS303Pkt):
226     PROTO = 0x13
227     RESPOND = Respond.EXT
228     OUT_KWARGS = (("upload_interval", int, 25),)
229
230     @classmethod
231     def from_packet(cls, length, payload):
232         self = super().from_packet(length, payload)
233         if len(payload) == 5:
234             (
235                 self.batt,
236                 self.ver,
237                 self.timezone,
238                 self.intvl,
239                 self.signal,
240             ) = unpack("BBBBB", payload)
241         elif len(payload) == 4:
242             self.batt, self.ver, self.timezone, self.intvl = unpack(
243                 "BBBB", payload
244             )
245             self.signal = None
246         return self
247
248     def out_encode(self):  # Set interval in minutes
249         return cls.make_packet(pack("B", self.upload_interval))
250
251
252 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
253     PROTO = 0x14
254     RESPOND = Respond.INL
255
256
257 class RESET(GPS303Pkt):
258     # Device sends when it got reset SMS
259     # Server can send to initiate factory reset
260     PROTO = 0x15
261
262
263 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
264     PROTO = 0x16
265     OUT_KWARGS = (("number", int, 3),)
266
267     def out_encode(self):  # Number of whitelist entries
268         return pack("B", number)
269
270
271 class _WIFI_POSITIONING(GPS303Pkt):
272     @classmethod
273     def from_packet(cls, length, payload):
274         self = super().from_packet(length, payload)
275         self.dtime = payload[:6]
276         if self.dtime == b"\0\0\0\0\0\0":
277             self.devtime = None
278         else:
279             self.devtime = datetime.strptime(
280                 self.dtime.hex(), "%y%m%d%H%M%S"
281             ).astimezone(tz=timezone.utc)
282         self.wifi_aps = []
283         for i in range(self.length):  # length has special meaning here
284             slice = payload[6 + i * 7 : 13 + i * 7]
285             self.wifi_aps.append(
286                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
287             )
288         gsm_slice = payload[6 + self.length * 7 :]
289         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
290         self.gsm_cells = []
291         for i in range(ncells):
292             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
293             locac, cellid, sigstr = unpack(
294                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
295             )
296             self.gsm_cells.append((locac, cellid, -sigstr))
297         return self
298
299
300 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
301     PROTO = 0x17
302     RESPOND = Respond.INL
303
304     def out_encode(self):
305         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
306
307
308 class TIME(GPS303Pkt):
309     PROTO = 0x30
310     RESPOND = Respond.INL
311
312     def out_encode(self):
313         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
314
315
316 class PROHIBIT_LBS(GPS303Pkt):
317     PROTO = 0x33
318     OUT_KWARGS = (("status", int, 1),)
319
320     def out_encode(self):  # Server sent, 0-off, 1-on
321         return pack("B", self.status)
322
323
324 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
325     PROTO = 0x34
326
327     # Data is in packed decimal
328     # 00/01 - GPS on/off
329     # 00/01 - Don't set / Set upload period
330     # HHMMHHMM - Upload period
331     # 00/01 - LBS on/off
332     # 00/01 - Don't set / Set time of boot
333     # HHMM  - Time of boot
334     # 00/01 - Don't set / Set time of shutdown
335     # HHMM  - Time of shutdown
336     def out_encode(self):
337         return b""  # TODO
338
339
340 class _SET_PHONE(GPS303Pkt):
341     OUT_KWARGS = (("phone", str, ""),)
342
343     def out_encode(self):
344         return self.phone.encode()
345
346
347 class REMOTE_MONITOR_PHONE(_SET_PHONE):
348     PROTO = 0x40
349
350
351 class SOS_PHONE(_SET_PHONE):
352     PROTO = 0x41
353
354
355 class DAD_PHONE(_SET_PHONE):
356     PROTO = 0x42
357
358
359 class MOM_PHONE(_SET_PHONE):
360     PROTO = 0x43
361
362
363 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
364     PROTO = 0x44
365
366
367 class GPS_OFF_PERIOD(GPS303Pkt):
368     PROTO = 0x46
369     OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
370
371     def out_encode(self):
372         return (
373             pack("B", self.onoff)
374             + bytes.fromhex(self.fm)
375             + bytes.fromhex(self.to)
376         )
377
378
379 class DND_PERIOD(GPS303Pkt):
380     PROTO = 0x47
381     OUT_KWARGS = (
382         ("onoff", int, 0),
383         ("week", int, 3),
384         ("fm1", str, "0000"),
385         ("to1", str, "2359"),
386         ("fm2", str, "0000"),
387         ("to2", str, "2359"),
388     )
389
390     def out_endode(self):
391         return (
392             pack("B", self.onoff)
393             + pack("B", self.week)
394             + bytes.fromhex(self.fm1)
395             + bytes.fromhex(self.to1)
396             + bytes.fromhex(self.fm2)
397             + bytes.fromhex(self.to2)
398         )
399
400
401 class RESTART_SHUTDOWN(GPS303Pkt):
402     PROTO = 0x48
403     OUT_KWARGS = (("flag", int, 2),)
404
405     def out_encode(self):
406         # 1 - restart
407         # 2 - shutdown
408         return pack("B", self.flag)
409
410
411 class DEVICE(GPS303Pkt):
412     PROTO = 0x49
413     OUT_KWARGS = (("flag", int, 0),)
414
415     # 0 - Stop looking for equipment
416     # 1 - Start looking for equipment
417     def out_encode(self):
418         return pack("B", self.flag)
419
420
421 class ALARM_CLOCK(GPS303Pkt):
422     PROTO = 0x50
423
424     def out_encode(self):
425         # TODO implement parsing kwargs
426         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
427         return b"".join(
428             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
429         )
430
431
432 class STOP_ALARM(GPS303Pkt):
433     PROTO = 0x56
434
435     @classmethod
436     def from_packet(cls, length, payload):
437         self = super().from_packet(length, payload)
438         self.flag = payload[0]
439         return self
440
441
442 class SETUP(GPS303Pkt):
443     PROTO = 0x57
444     RESPOND = Respond.EXT
445     OUT_KWARGS = (  # TODO handle properly
446         ("uploadintervalseconds", int, 0x0300),
447         ("binaryswitch", int, 0b00110001),
448         ("alarms", int, [0, 0, 0]),
449         ("dndtimeswitch", int, 0),
450         ("dndtimes", int, [0, 0, 0]),
451         ("gpstimeswitch", int, 0),
452         ("gpstimestart", int, 0),
453         ("gpstimestop", int, 0),
454         ("phonenumbers", int, ["", "", ""]),
455     )
456
457     def out_encode(self):
458         def pack3b(x):
459             return pack("!I", x)[1:]
460
461         return b"".join(
462             [
463                 pack("!H", self.uploadintervalseconds),
464                 pack("B", self.binaryswitch),
465             ]
466             + [pack3b(el) for el in self.alarms]
467             + [
468                 pack("B", self.dndtimeswitch),
469             ]
470             + [pack3b(el) for el in self.dndtimes]
471             + [
472                 pack("B", self.gpstimeswitch),
473                 pack("!H", self.gpstimestart),
474                 pack("!H", self.gpstimestop),
475             ]
476             + [b";".join([el.encode() for el in self.phonenumbers])]
477         )
478
479
480 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
481     PROTO = 0x58
482
483
484 class RESTORE_PASSWORD(GPS303Pkt):
485     PROTO = 0x67
486
487
488 class WIFI_POSITIONING(_WIFI_POSITIONING):
489     PROTO = 0x69
490     RESPOND = Respond.EXT
491     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
492
493     def out_encode(self):
494         if self.lat is None or self.lon is None:
495             return b""
496         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
497
498
499 class MANUAL_POSITIONING(GPS303Pkt):
500     PROTO = 0x80
501
502     @classmethod
503     def from_packet(cls, length, payload):
504         self = super().from_packet(length, payload)
505         self.flag = payload[0] if len(payload) > 0 else None
506         self.reason = {
507             1: "Incorrect time",
508             2: "LBS less",
509             3: "WiFi less",
510             4: "LBS search > 3 times",
511             5: "Same LBS and WiFi data",
512             6: "LBS prohibited, WiFi absent",
513             7: "GPS spacing < 50 m",
514         }.get(self.flag, "Unknown")
515         return self
516
517
518 class BATTERY_CHARGE(GPS303Pkt):
519     PROTO = 0x81
520
521
522 class CHARGER_CONNECTED(GPS303Pkt):
523     PROTO = 0x82
524
525
526 class CHARGER_DISCONNECTED(GPS303Pkt):
527     PROTO = 0x83
528
529
530 class VIBRATION_RECEIVED(GPS303Pkt):
531     PROTO = 0x94
532
533
534 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
535     PROTO = 0x98
536     RESPOND = Respond.EXT
537     OUT_KWARGS = (("interval", int, 10),)
538
539     @classmethod
540     def from_packet(cls, length, payload):
541         self = super().from_packet(length, payload)
542         self.interval = unpack("!H", payload[:2])
543         return self
544
545     def out_encode(self):
546         return pack("!H", interval)
547
548
549 class SOS_ALARM(GPS303Pkt):
550     PROTO = 0x99
551
552
553 # Build dicts protocol number -> class and class name -> protocol number
554 CLASSES = {}
555 PROTOS = {}
556 if True:  # just to indent the code, sorry!
557     for cls in [
558         cls
559         for name, cls in globals().items()
560         if isclass(cls)
561         and issubclass(cls, GPS303Pkt)
562         and not name.startswith("_")
563     ]:
564         if hasattr(cls, "PROTO"):
565             CLASSES[cls.PROTO] = cls
566             PROTOS[cls.__name__] = cls.PROTO
567
568
569 def class_by_prefix(prefix):
570     lst = [
571         (name, proto)
572         for name, proto in PROTOS.items()
573         if name.upper().startswith(prefix.upper())
574     ]
575     if len(lst) != 1:
576         return lst
577     _, proto = lst[0]
578     return CLASSES[proto]
579
580
581 def proto_by_name(name):
582     return PROTOS.get(name, -1)
583
584
585 def proto_of_message(packet):
586     return unpack("B", packet[1:2])[0]
587
588
589 def inline_response(packet):
590     proto = proto_of_message(packet)
591     if proto in CLASSES:
592         cls = CLASSES[proto]
593         if cls.RESPOND is Respond.INL:
594             return cls.Out().packed
595     return None
596
597
598 def make_object(length, proto, payload):
599     if proto in CLASSES:
600         return CLASSES[proto].from_packet(length, payload)
601     else:
602         retobj = UNKNOWN.from_packet(length, payload)
603         retobj.PROTO = proto  # Override class attr with object attr
604         return retobj
605
606
607 def parse_message(packet):
608     length, proto = unpack("BB", packet[:2])
609     payload = packet[2:]
610     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
611     if (
612         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
613         and length > 1
614         and len(payload) + adjust != length
615     ):
616         log.warning(
617             "With proto %d length is %d but payload length is %d+%d",
618             proto,
619             length,
620             len(payload),
621             adjust,
622         )
623     return make_object(length, proto, payload)