]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
cleanup of gps303proto
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from struct import pack, unpack
21
22 __all__ = (
23     "class_by_prefix",
24     "inline_response",
25     "parse_message",
26     "proto_by_name",
27     "Respond",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "PROHIBIT_LBS",
42     "GPS_LBS_SWITCH_TIMES",
43     "REMOTE_MONITOR_PHONE",
44     "SOS_PHONE",
45     "DAD_PHONE",
46     "MOM_PHONE",
47     "STOP_UPLOAD",
48     "GPS_OFF_PERIOD",
49     "DND_PERIOD",
50     "RESTART_SHUTDOWN",
51     "DEVICE",
52     "ALARM_CLOCK",
53     "STOP_ALARM",
54     "SETUP",
55     "SYNCHRONOUS_WHITELIST",
56     "RESTORE_PASSWORD",
57     "WIFI_POSITIONING",
58     "MANUAL_POSITIONING",
59     "BATTERY_CHARGE",
60     "CHARGER_CONNECTED",
61     "CHARGER_DISCONNECTED",
62     "VIBRATION_RECEIVED",
63     "POSITION_UPLOAD_INTERVAL",
64     "SOS_ALARM",
65     "UNKNOWN_B3",
66 )
67
68
69 def intx(x):
70     if isinstance(x, str):
71         x = int(x, 0)
72     return x
73
74
75 def hhmm(x):
76     """Check for the string that represents hours and minutes"""
77     if not isinstance(x, str) or len(x) != 4:
78         raise ValueError(str(x) + " is not a four-character string")
79     hh = int(x[:2])
80     mm = int(x[2:])
81     if hh < 0 or hh > 23 or mm < 0 or mm > 59:
82         raise ValueError(str(x) + " does not contain valid hours and minutes")
83     return x
84
85
86 def l3str(x):
87     if isinstance(x, str):
88         x = x.split(",")
89     if len(x) != 3 or not all(isinstance(el, str) for el in x):
90         raise ValueError(str(x) + " is not a list of three strings")
91     return x
92
93
94 def l3int(x):
95     if isinstance(x, str):
96         x = x.split(",")
97         x = [int(el) for el in x]
98     if len(x) != 3 or not all(isinstance(el, int) for el in x):
99         raise ValueError(str(x) + " is not a list of three integers")
100     return x
101
102
103 class MetaPkt(type):
104     """
105     For each class corresponding to a message, automatically create
106     two nested classes `In` and `Out` that also inherit from their
107     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
108     copied to the `In` nested class under the name `KWARGS`, and
109     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
110     to the nested class `Out`. In addition, method `encode` is
111     defined in both classes equal to `in_encode()` and `out_encode()`
112     respectively.
113     """
114
115     def __new__(cls, name, bases, attrs):
116         newcls = super().__new__(cls, name, bases, attrs)
117         newcls.In = super().__new__(
118             cls,
119             name + ".In",
120             (newcls,) + bases,
121             {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
122         )
123         newcls.Out = super().__new__(
124             cls,
125             name + ".Out",
126             (newcls,) + bases,
127             {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
128         )
129         return newcls
130
131
132 class Respond(Enum):
133     NON = 0  # Incoming, no response needed
134     INL = 1  # Birirectional, use `inline_response()`
135     EXT = 2  # Birirectional, use external responder
136
137
138 class GPS303Pkt(metaclass=MetaPkt):
139     RESPOND = Respond.NON  # Do not send anything back by default
140     PROTO: int
141     # Have these kwargs for now, TODO redo
142     IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
143     OUT_KWARGS = ()
144
145     def __init__(self, *args, **kwargs):
146         assert len(args) == 0
147         for kw, typ, dfl in self.KWARGS:
148             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
149         if kwargs:
150             print("KWARGS", self.KWARGS)
151             print("kwargs", kwargs)
152             raise TypeError(
153                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
154             )
155
156     def __repr__(self):
157         return "{}({})".format(
158             self.__class__.__name__,
159             ", ".join(
160                 "{}={}".format(
161                     k,
162                     'bytes.fromhex("{}")'.format(v.hex())
163                     if isinstance(v, bytes)
164                     else v.__repr__(),
165                 )
166                 for k, v in self.__dict__.items()
167                 if not k.startswith("_")
168             ),
169         )
170
171     def in_encode(self):
172         raise NotImplementedError(
173             self.__class__.__name__ + ".encode() not implemented"
174         )
175
176     def out_encode(self):
177         return b""
178
179     @property
180     def packed(self):
181         payload = self.encode()
182         length = len(payload) + 1
183         return pack("BB", length, self.PROTO) + payload
184
185     @classmethod
186     def from_packet(cls, length, payload):
187         return cls.In(payload=payload, length=length)
188
189
190 class UNKNOWN(GPS303Pkt):
191     PROTO = 256  # > 255 is impossible in real packets
192
193
194 class LOGIN(GPS303Pkt):
195     PROTO = 0x01
196     RESPOND = Respond.INL
197     # Default response for ACK, can also respond with STOP_UPLOAD
198
199     @classmethod
200     def from_packet(cls, length, payload):
201         self = super().from_packet(length, payload)
202         self.imei = payload[:-1].hex()
203         self.ver = unpack("B", payload[-1:])[0]
204         return self
205
206
207 class SUPERVISION(GPS303Pkt):
208     PROTO = 0x05
209     OUT_KWARGS = (("status", int, 1),)
210
211     def out_encode(self):
212         # 1: The device automatically answers Pickup effect
213         # 2: Automatically Answering Two-way Calls
214         # 3: Ring manually answer the two-way call
215         return pack("B", self.status)
216
217
218 class HEARTBEAT(GPS303Pkt):
219     PROTO = 0x08
220     RESPOND = Respond.INL
221
222
223 class _GPS_POSITIONING(GPS303Pkt):
224     RESPOND = Respond.INL
225
226     @classmethod
227     def from_packet(cls, length, payload):
228         self = super().from_packet(length, payload)
229         self.dtime = payload[:6]
230         if self.dtime == b"\0\0\0\0\0\0":
231             self.devtime = None
232         else:
233             self.devtime = datetime(
234                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
235             )
236         self.gps_data_length = payload[6] >> 4
237         self.gps_nb_sat = payload[6] & 0x0F
238         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
239         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
240         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
241         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
242         self.heading = flags & 0b0000001111111111  # bits 6 - last
243         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
244         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
245         self.speed = speed
246         self.flags = flags
247         return self
248
249     def out_encode(self):
250         tup = datetime.utcnow().timetuple()
251         ttup = (tup[0] % 100,) + tup[1:6]
252         return pack("BBBBBB", *ttup)
253
254
255 class GPS_POSITIONING(_GPS_POSITIONING):
256     PROTO = 0x10
257
258
259 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
260     PROTO = 0x11
261
262
263 class STATUS(GPS303Pkt):
264     PROTO = 0x13
265     RESPOND = Respond.EXT
266     OUT_KWARGS = (("upload_interval", int, 25),)
267
268     @classmethod
269     def from_packet(cls, length, payload):
270         self = super().from_packet(length, payload)
271         if len(payload) == 5:
272             (
273                 self.batt,
274                 self.ver,
275                 self.timezone,
276                 self.intvl,
277                 self.signal,
278             ) = unpack("BBBBB", payload)
279         elif len(payload) == 4:
280             self.batt, self.ver, self.timezone, self.intvl = unpack(
281                 "BBBB", payload
282             )
283             self.signal = None
284         return self
285
286     def out_encode(self):  # Set interval in minutes
287         return pack("B", self.upload_interval)
288
289
290 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
291     PROTO = 0x14
292     RESPOND = Respond.INL
293
294
295 class RESET(GPS303Pkt):
296     # Device sends when it got reset SMS
297     # Server can send to initiate factory reset
298     PROTO = 0x15
299
300
301 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
302     PROTO = 0x16
303     OUT_KWARGS = (("number", int, 3),)
304
305     def out_encode(self):  # Number of whitelist entries
306         return pack("B", number)
307
308
309 class _WIFI_POSITIONING(GPS303Pkt):
310     @classmethod
311     def from_packet(cls, length, payload):
312         self = super().from_packet(length, payload)
313         self.dtime = payload[:6]
314         if self.dtime == b"\0\0\0\0\0\0":
315             self.devtime = None
316         else:
317             self.devtime = datetime.strptime(
318                 self.dtime.hex(), "%y%m%d%H%M%S"
319             ).astimezone(tz=timezone.utc)
320         self.wifi_aps = []
321         for i in range(self.length):  # length has special meaning here
322             slice = payload[6 + i * 7 : 13 + i * 7]
323             self.wifi_aps.append(
324                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
325             )
326         gsm_slice = payload[6 + self.length * 7 :]
327         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
328         self.gsm_cells = []
329         for i in range(ncells):
330             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
331             locac, cellid, sigstr = unpack(
332                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
333             )
334             self.gsm_cells.append((locac, cellid, -sigstr))
335         return self
336
337
338 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
339     PROTO = 0x17
340     RESPOND = Respond.INL
341
342     def out_encode(self):
343         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
344
345
346 class TIME(GPS303Pkt):
347     PROTO = 0x30
348     RESPOND = Respond.INL
349
350     def out_encode(self):
351         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
352
353
354 class PROHIBIT_LBS(GPS303Pkt):
355     PROTO = 0x33
356     OUT_KWARGS = (("status", int, 1),)
357
358     def out_encode(self):  # Server sent, 0-off, 1-on
359         return pack("B", self.status)
360
361
362 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
363     PROTO = 0x34
364
365     # Data is in packed decimal
366     # 00/01 - GPS on/off
367     # 00/01 - Don't set / Set upload period
368     # HHMMHHMM - Upload period
369     # 00/01 - LBS on/off
370     # 00/01 - Don't set / Set time of boot
371     # HHMM  - Time of boot
372     # 00/01 - Don't set / Set time of shutdown
373     # HHMM  - Time of shutdown
374     def out_encode(self):
375         return b""  # TODO
376
377
378 class _SET_PHONE(GPS303Pkt):
379     OUT_KWARGS = (("phone", str, ""),)
380
381     def out_encode(self):
382         return self.phone.encode()
383
384
385 class REMOTE_MONITOR_PHONE(_SET_PHONE):
386     PROTO = 0x40
387
388
389 class SOS_PHONE(_SET_PHONE):
390     PROTO = 0x41
391
392
393 class DAD_PHONE(_SET_PHONE):
394     PROTO = 0x42
395
396
397 class MOM_PHONE(_SET_PHONE):
398     PROTO = 0x43
399
400
401 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
402     PROTO = 0x44
403
404
405 class GPS_OFF_PERIOD(GPS303Pkt):
406     PROTO = 0x46
407     OUT_KWARGS = (
408         ("onoff", int, 0),
409         ("fm", hhmm, "0000"),
410         ("to", hhmm, "2359"),
411     )
412
413     def out_encode(self):
414         return (
415             pack("B", self.onoff)
416             + bytes.fromhex(self.fm)
417             + bytes.fromhex(self.to)
418         )
419
420
421 class DND_PERIOD(GPS303Pkt):
422     PROTO = 0x47
423     OUT_KWARGS = (
424         ("onoff", int, 0),
425         ("week", int, 3),
426         ("fm1", hhmm, "0000"),
427         ("to1", hhmm, "2359"),
428         ("fm2", hhmm, "0000"),
429         ("to2", hhmm, "2359"),
430     )
431
432     def out_endode(self):
433         return (
434             pack("B", self.onoff)
435             + pack("B", self.week)
436             + bytes.fromhex(self.fm1)
437             + bytes.fromhex(self.to1)
438             + bytes.fromhex(self.fm2)
439             + bytes.fromhex(self.to2)
440         )
441
442
443 class RESTART_SHUTDOWN(GPS303Pkt):
444     PROTO = 0x48
445     OUT_KWARGS = (("flag", int, 0),)
446
447     def out_encode(self):
448         # 1 - restart
449         # 2 - shutdown
450         return pack("B", self.flag)
451
452
453 class DEVICE(GPS303Pkt):
454     PROTO = 0x49
455     OUT_KWARGS = (("flag", int, 0),)
456
457     # 0 - Stop looking for equipment
458     # 1 - Start looking for equipment
459     def out_encode(self):
460         return pack("B", self.flag)
461
462
463 class ALARM_CLOCK(GPS303Pkt):
464     PROTO = 0x50
465
466     def out_encode(self):
467         # TODO implement parsing kwargs
468         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
469         return b"".join(
470             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
471         )
472
473
474 class STOP_ALARM(GPS303Pkt):
475     PROTO = 0x56
476
477     @classmethod
478     def from_packet(cls, length, payload):
479         self = super().from_packet(length, payload)
480         self.flag = payload[0]
481         return self
482
483
484 class SETUP(GPS303Pkt):
485     PROTO = 0x57
486     RESPOND = Respond.EXT
487     OUT_KWARGS = (
488         ("uploadintervalseconds", intx, 0x0300),
489         ("binaryswitch", intx, 0b00110001),
490         ("alarms", l3int, [0, 0, 0]),
491         ("dndtimeswitch", int, 0),
492         ("dndtimes", l3int, [0, 0, 0]),
493         ("gpstimeswitch", int, 0),
494         ("gpstimestart", int, 0),
495         ("gpstimestop", int, 0),
496         ("phonenumbers", l3str, ["", "", ""]),
497     )
498
499     def out_encode(self):
500         def pack3b(x):
501             return pack("!I", x)[1:]
502
503         return b"".join(
504             [
505                 pack("!H", self.uploadintervalseconds),
506                 pack("B", self.binaryswitch),
507             ]
508             + [pack3b(el) for el in self.alarms]
509             + [
510                 pack("B", self.dndtimeswitch),
511             ]
512             + [pack3b(el) for el in self.dndtimes]
513             + [
514                 pack("B", self.gpstimeswitch),
515                 pack("!H", self.gpstimestart),
516                 pack("!H", self.gpstimestop),
517             ]
518             + [b";".join([el.encode() for el in self.phonenumbers])]
519         )
520
521
522 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
523     PROTO = 0x58
524
525
526 class RESTORE_PASSWORD(GPS303Pkt):
527     PROTO = 0x67
528
529
530 class WIFI_POSITIONING(_WIFI_POSITIONING):
531     PROTO = 0x69
532     RESPOND = Respond.EXT
533     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
534
535     def out_encode(self):
536         if self.lat is None or self.lon is None:
537             return b""
538         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
539
540
541 class MANUAL_POSITIONING(GPS303Pkt):
542     PROTO = 0x80
543
544     @classmethod
545     def from_packet(cls, length, payload):
546         self = super().from_packet(length, payload)
547         self.flag = payload[0] if len(payload) > 0 else None
548         self.reason = {
549             1: "Incorrect time",
550             2: "LBS less",
551             3: "WiFi less",
552             4: "LBS search > 3 times",
553             5: "Same LBS and WiFi data",
554             6: "LBS prohibited, WiFi absent",
555             7: "GPS spacing < 50 m",
556         }.get(self.flag, "Unknown")
557         return self
558
559
560 class BATTERY_CHARGE(GPS303Pkt):
561     PROTO = 0x81
562
563
564 class CHARGER_CONNECTED(GPS303Pkt):
565     PROTO = 0x82
566
567
568 class CHARGER_DISCONNECTED(GPS303Pkt):
569     PROTO = 0x83
570
571
572 class VIBRATION_RECEIVED(GPS303Pkt):
573     PROTO = 0x94
574
575
576 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
577     PROTO = 0x98
578     RESPOND = Respond.EXT
579     OUT_KWARGS = (("interval", int, 10),)
580
581     @classmethod
582     def from_packet(cls, length, payload):
583         self = super().from_packet(length, payload)
584         self.interval = unpack("!H", payload[:2])
585         return self
586
587     def out_encode(self):
588         return pack("!H", interval)
589
590
591 class SOS_ALARM(GPS303Pkt):
592     PROTO = 0x99
593
594
595 class UNKNOWN_B3(GPS303Pkt):
596     PROTO = 0xb3
597     IN_KWARGS = (("asciidata", str, ""),)
598
599     @classmethod
600     def from_packet(cls, length, payload):
601         self = super().from_packet(length, payload)
602         self.asciidata = payload.decode()
603         return self
604
605
606 # Build dicts protocol number -> class and class name -> protocol number
607 CLASSES = {}
608 PROTOS = {}
609 if True:  # just to indent the code, sorry!
610     for cls in [
611         cls
612         for name, cls in globals().items()
613         if isclass(cls)
614         and issubclass(cls, GPS303Pkt)
615         and not name.startswith("_")
616     ]:
617         if hasattr(cls, "PROTO"):
618             CLASSES[cls.PROTO] = cls
619             PROTOS[cls.__name__] = cls.PROTO
620
621
622 def class_by_prefix(prefix):
623     lst = [
624         (name, proto)
625         for name, proto in PROTOS.items()
626         if name.upper().startswith(prefix.upper())
627     ]
628     if len(lst) != 1:
629         return lst
630     _, proto = lst[0]
631     return CLASSES[proto]
632
633
634 def proto_by_name(name):
635     return PROTOS.get(name, -1)
636
637
638 def proto_of_message(packet):
639     return unpack("B", packet[1:2])[0]
640
641
642 def inline_response(packet):
643     proto = proto_of_message(packet)
644     if proto in CLASSES:
645         cls = CLASSES[proto]
646         if cls.RESPOND is Respond.INL:
647             return cls.Out().packed
648     return None
649
650
651 def parse_message(packet):
652     """ From a packet (without framing bytes) derive the XXX.In object """
653     length, proto = unpack("BB", packet[:2])
654     payload = packet[2:]
655     if proto in CLASSES:
656         return CLASSES[proto].from_packet(length, payload)
657     else:
658         retobj = UNKNOWN.from_packet(length, payload)
659         retobj.PROTO = proto  # Override class attr with object attr
660         return retobj