]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Fix metaclass, now works
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
23 __all__ = (
24     "class_by_prefix",
25     "inline_response",
26     "make_object",
27     "parse_message",
28     "proto_by_name",
29     "Respond",
30     "GPS303Pkt",
31     "UNKNOWN",
32     "LOGIN",
33     "SUPERVISION",
34     "HEARTBEAT",
35     "GPS_POSITIONING",
36     "GPS_OFFLINE_POSITIONING",
37     "STATUS",
38     "HIBERNATION",
39     "RESET",
40     "WHITELIST_TOTAL",
41     "WIFI_OFFLINE_POSITIONING",
42     "TIME",
43     "MOM_PHONE",
44     "STOP_ALARM",
45     "SETUP",
46     "SYNCHRONOUS_WHITELIST",
47     "RESTORE_PASSWORD",
48     "WIFI_POSITIONING",
49     "MANUAL_POSITIONING",
50     "BATTERY_CHARGE",
51     "CHARGER_CONNECTED",
52     "CHARGER_DISCONNECTED",
53     "VIBRATION_RECEIVED",
54     "POSITION_UPLOAD_INTERVAL",
55 )
56
57 log = getLogger("gps303")
58
59
60 class MetaPkt(type):
61     """
62     For each class corresponding to a message, automatically create
63     two nested classes `In` and `Out` that also inherit from their
64     "nest". Class attribute `IN_KWARGS` defined in the "nest" is
65     copied to the `In` nested class under the name `KWARGS`, and
66     likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
67     to the nested class `Out`. In addition, method `encode` is
68     defined in both classes equal to `in_encode()` and `out_encode()`
69     respectively.
70     """
71
72     def __new__(cls, name, bases, attrs):
73         newcls = super().__new__(cls, name, bases, attrs)
74         newcls.In = super().__new__(
75             cls,
76             name + ".In",
77             (newcls,) + bases,
78             {"KWARGS": newcls.IN_KWARGS, "encode": newcls.in_encode},
79         )
80         newcls.Out = super().__new__(
81             cls,
82             name + ".Out",
83             (newcls,) + bases,
84             {"KWARGS": newcls.OUT_KWARGS, "encode": newcls.out_encode},
85         )
86         return newcls
87
88
89 class Respond(Enum):
90     NON = 0  # Incoming, no response needed
91     INL = 1  # Birirectional, use `inline_response()`
92     EXT = 2  # Birirectional, use external responder
93
94
95 class GPS303Pkt(metaclass=MetaPkt):
96     RESPOND = Respond.NON  # Do not send anything back by default
97     PROTO: int
98     # Have these kwargs for now, TODO redo
99     IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
100     OUT_KWARGS = ()
101
102     def __init__(self, *args, **kwargs):
103         assert len(args) == 0
104         for kw, typ, dfl in self.KWARGS:
105             setattr(self, kw, typ(kwargs.pop(kw, dfl)))
106         if kwargs:
107             print("KWARGS", self.KWARGS)
108             print("kwargs", kwargs)
109             raise TypeError(
110                 self.__class__.__name__ + " stray kwargs " + str(kwargs)
111             )
112
113     def __repr__(self):
114         return "{}({})".format(
115             self.__class__.__name__,
116             ", ".join(
117                 "{}={}".format(
118                     k,
119                     'bytes.fromhex("{}")'.format(v.hex())
120                     if isinstance(v, bytes)
121                     else v.__repr__(),
122                 )
123                 for k, v in self.__dict__.items()
124                 if not k.startswith("_")
125             ),
126         )
127
128     def in_encode(self):
129         raise NotImplementedError(
130             self.__class__.__name__ + ".encode() not implemented"
131         )
132
133     def out_encode(self):
134         return b""
135
136     @property
137     def packed(self):
138         payload = self.encode()
139         length = len(payload) + 1
140         return pack("BB", length, self.PROTO) + payload
141
142     @classmethod
143     def from_packet(cls, length, payload):
144         return cls.In(payload=payload, length=length)
145
146
147 class UNKNOWN(GPS303Pkt):
148     PROTO = 256  # > 255 is impossible in real packets
149
150
151 class LOGIN(GPS303Pkt):
152     PROTO = 0x01
153     RESPOND = Respond.INL
154     # Default response for ACK, can also respond with STOP_UPLOAD
155
156     @classmethod
157     def from_packet(cls, length, payload):
158         self = super().from_packet(length, payload)
159         self.imei = payload[:-1].hex()
160         self.ver = unpack("B", payload[-1:])[0]
161         return self
162
163
164 class SUPERVISION(GPS303Pkt):
165     PROTO = 0x05
166     OUT_KWARGS = (("status", int, 1),)
167
168     def out_encode(self):
169         # 1: The device automatically answers Pickup effect
170         # 2: Automatically Answering Two-way Calls
171         # 3: Ring manually answer the two-way call
172         return pack("B", self.status)
173
174
175 class HEARTBEAT(GPS303Pkt):
176     PROTO = 0x08
177     RESPOND = Respond.INL
178
179
180 class _GPS_POSITIONING(GPS303Pkt):
181     RESPOND = Respond.INL
182
183     @classmethod
184     def from_packet(cls, length, payload):
185         self = super().from_packet(length, payload)
186         self.dtime = payload[:6]
187         if self.dtime == b"\0\0\0\0\0\0":
188             self.devtime = None
189         else:
190             self.devtime = datetime(
191                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
192             )
193         self.gps_data_length = payload[6] >> 4
194         self.gps_nb_sat = payload[6] & 0x0F
195         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
196         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
197         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
198         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
199         self.heading = flags & 0b0000001111111111  # bits 6 - last
200         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
201         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
202         self.speed = speed
203         self.flags = flags
204         return self
205
206     def out_encode(self):
207         tup = datetime.utcnow().timetuple()
208         ttup = (tup[0] % 100,) + tup[1:6]
209         return pack("BBBBBB", *ttup)
210
211
212 class GPS_POSITIONING(_GPS_POSITIONING):
213     PROTO = 0x10
214
215
216 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
217     PROTO = 0x11
218
219
220 class STATUS(GPS303Pkt):
221     PROTO = 0x13
222     RESPOND = Respond.EXT
223     OUT_KWARGS = (("upload_interval", int, 25),)
224
225     @classmethod
226     def from_packet(cls, length, payload):
227         self = super().from_packet(length, payload)
228         if len(payload) == 5:
229             (
230                 self.batt,
231                 self.ver,
232                 self.timezone,
233                 self.intvl,
234                 self.signal,
235             ) = unpack("BBBBB", payload)
236         elif len(payload) == 4:
237             self.batt, self.ver, self.timezone, self.intvl = unpack(
238                 "BBBB", payload
239             )
240             self.signal = None
241         return self
242
243     def out_encode(self):  # Set interval in minutes
244         return cls.make_packet(pack("B", self.upload_interval))
245
246
247 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
248     PROTO = 0x14
249     RESPOND = Respond.INL
250
251
252 class RESET(GPS303Pkt):
253     # Device sends when it got reset SMS
254     # Server can send to initiate factory reset
255     PROTO = 0x15
256
257
258 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
259     PROTO = 0x16
260     OUT_KWARGS = (("number", int, 3),)
261
262     def out_encode(self):  # Number of whitelist entries
263         return pack("B", number)
264
265
266 class _WIFI_POSITIONING(GPS303Pkt):
267     @classmethod
268     def from_packet(cls, length, payload):
269         self = super().from_packet(length, payload)
270         self.dtime = payload[:6]
271         if self.dtime == b"\0\0\0\0\0\0":
272             self.devtime = None
273         else:
274             self.devtime = datetime.strptime(
275                 self.dtime.hex(), "%y%m%d%H%M%S"
276             ).astimezone(tz=timezone.utc)
277         self.wifi_aps = []
278         for i in range(self.length):  # length has special meaning here
279             slice = payload[6 + i * 7 : 13 + i * 7]
280             self.wifi_aps.append(
281                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
282             )
283         gsm_slice = payload[6 + self.length * 7 :]
284         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
285         self.gsm_cells = []
286         for i in range(ncells):
287             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
288             locac, cellid, sigstr = unpack(
289                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
290             )
291             self.gsm_cells.append((locac, cellid, -sigstr))
292         return self
293
294
295 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
296     PROTO = 0x17
297     RESPOND = Respond.INL
298
299     def out_encode(self):
300         return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
301
302
303 class TIME(GPS303Pkt):
304     PROTO = 0x30
305     RESPOND = Respond.INL
306
307     def out_encode(self):
308         return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
309
310
311 class PROHIBIT_LBS(GPS303Pkt):
312     PROTO = 0x33
313     OUT_KWARGS = (("status", int, 1),)
314
315     def out_encode(self):  # Server sent, 0-off, 1-on
316         return pack("B", self.status)
317
318
319 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
320     PROTO = 0x34
321
322     # Data is in packed decimal
323     # 00/01 - GPS on/off
324     # 00/01 - Don't set / Set upload period
325     # HHMMHHMM - Upload period
326     # 00/01 - LBS on/off
327     # 00/01 - Don't set / Set time of boot
328     # HHMM  - Time of boot
329     # 00/01 - Don't set / Set time of shutdown
330     # HHMM  - Time of shutdown
331     def out_encode(self):
332         return b""  # TODO
333
334
335 class _SET_PHONE(GPS303Pkt):
336     OUT_KWARGS = (("phone", str, ""),)
337
338     def out_encode(self):
339         return self.phone.encode()
340
341
342 class REMOTE_MONITOR_PHONE(_SET_PHONE):
343     PROTO = 0x40
344
345
346 class SOS_PHONE(_SET_PHONE):
347     PROTO = 0x41
348
349
350 class DAD_PHONE(_SET_PHONE):
351     PROTO = 0x42
352
353
354 class MOM_PHONE(_SET_PHONE):
355     PROTO = 0x43
356
357
358 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
359     PROTO = 0x44
360
361
362 class GPS_OFF_PERIOD(GPS303Pkt):
363     PROTO = 0x46
364     OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
365
366     def out_encode(self):
367         return (
368             pack("B", self.onoff)
369             + bytes.fromhex(self.fm)
370             + bytes.fromhex(self.to)
371         )
372
373
374 class DND_PERIOD(GPS303Pkt):
375     PROTO = 0x47
376     OUT_KWARGS = (
377         ("onoff", int, 0),
378         ("week", int, 3),
379         ("fm1", str, "0000"),
380         ("to1", str, "2359"),
381         ("fm2", str, "0000"),
382         ("to2", str, "2359"),
383     )
384
385     def out_endode(self):
386         return (
387             pack("B", self.onoff)
388             + pack("B", self.week)
389             + bytes.fromhex(self.fm1)
390             + bytes.fromhex(self.to1)
391             + bytes.fromhex(self.fm2)
392             + bytes.fromhex(self.to2)
393         )
394
395
396 class RESTART_SHUTDOWN(GPS303Pkt):
397     PROTO = 0x48
398     OUT_KWARGS = (("flag", int, 2),)
399
400     def out_encode(self):
401         # 1 - restart
402         # 2 - shutdown
403         return pack("B", self.flag)
404
405
406 class DEVICE(GPS303Pkt):
407     PROTO = 0x49
408     OUT_KWARGS = (("flag", int, 0),)
409
410     # 0 - Stop looking for equipment
411     # 1 - Start looking for equipment
412     def out_encode(self):
413         return pack("B", self.flag)
414
415
416 class ALARM_CLOCK(GPS303Pkt):
417     PROTO = 0x50
418
419     def out_encode(self):
420         # TODO implement parsing kwargs
421         alarms = ((0, "0000"), (0, "0000"), (0, "0000"))
422         return b"".join(
423             cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
424         )
425
426
427 class STOP_ALARM(GPS303Pkt):
428     PROTO = 0x56
429
430     @classmethod
431     def from_packet(cls, length, payload):
432         self = super().from_packet(length, payload)
433         self.flag = payload[0]
434         return self
435
436
437 class SETUP(GPS303Pkt):
438     PROTO = 0x57
439     RESPOND = Respond.EXT
440     OUT_KWARGS = (  # TODO handle properly
441         ("uploadintervalseconds", int, 0x0300),
442         ("binaryswitch", int, 0b00110001),
443         ("alarms", lambda x: x, [0, 0, 0]),
444         ("dndtimeswitch", int, 0),
445         ("dndtimes", lambda x: x, [0, 0, 0]),
446         ("gpstimeswitch", int, 0),
447         ("gpstimestart", int, 0),
448         ("gpstimestop", int, 0),
449         ("phonenumbers", lambda x: x, ["", "", ""]),
450     )
451
452     def out_encode(self):
453         def pack3b(x):
454             return pack("!I", x)[1:]
455
456         return b"".join(
457             [
458                 pack("!H", self.uploadintervalseconds),
459                 pack("B", self.binaryswitch),
460             ]
461             + [pack3b(el) for el in self.alarms]
462             + [
463                 pack("B", self.dndtimeswitch),
464             ]
465             + [pack3b(el) for el in self.dndtimes]
466             + [
467                 pack("B", self.gpstimeswitch),
468                 pack("!H", self.gpstimestart),
469                 pack("!H", self.gpstimestop),
470             ]
471             + [b";".join([el.encode() for el in self.phonenumbers])]
472         )
473
474
475 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
476     PROTO = 0x58
477
478
479 class RESTORE_PASSWORD(GPS303Pkt):
480     PROTO = 0x67
481
482
483 class WIFI_POSITIONING(_WIFI_POSITIONING):
484     PROTO = 0x69
485     RESPOND = Respond.EXT
486     OUT_KWARGS = (("lat", float, None), ("lon", float, None))
487
488     def out_encode(self):
489         if self.lat is None or self.lon is None:
490             return b""
491         return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
492
493
494 class MANUAL_POSITIONING(GPS303Pkt):
495     PROTO = 0x80
496
497     @classmethod
498     def from_packet(cls, length, payload):
499         self = super().from_packet(length, payload)
500         self.flag = payload[0] if len(payload) > 0 else None
501         self.reason = {
502             1: "Incorrect time",
503             2: "LBS less",
504             3: "WiFi less",
505             4: "LBS search > 3 times",
506             5: "Same LBS and WiFi data",
507             6: "LBS prohibited, WiFi absent",
508             7: "GPS spacing < 50 m",
509         }.get(self.flag, "Unknown")
510         return self
511
512
513 class BATTERY_CHARGE(GPS303Pkt):
514     PROTO = 0x81
515
516
517 class CHARGER_CONNECTED(GPS303Pkt):
518     PROTO = 0x82
519
520
521 class CHARGER_DISCONNECTED(GPS303Pkt):
522     PROTO = 0x83
523
524
525 class VIBRATION_RECEIVED(GPS303Pkt):
526     PROTO = 0x94
527
528
529 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
530     PROTO = 0x98
531     RESPOND = Respond.EXT
532     OUT_KWARGS = (("interval", int, 10),)
533
534     @classmethod
535     def from_packet(cls, length, payload):
536         self = super().from_packet(length, payload)
537         self.interval = unpack("!H", payload[:2])
538         return self
539
540     def out_encode(self):
541         return pack("!H", interval)
542
543
544 class SOS_ALARM(GPS303Pkt):
545     PROTO = 0x99
546
547
548 # Build dicts protocol number -> class and class name -> protocol number
549 CLASSES = {}
550 PROTOS = {}
551 if True:  # just to indent the code, sorry!
552     for cls in [
553         cls
554         for name, cls in globals().items()
555         if isclass(cls)
556         and issubclass(cls, GPS303Pkt)
557         and not name.startswith("_")
558     ]:
559         if hasattr(cls, "PROTO"):
560             CLASSES[cls.PROTO] = cls
561             PROTOS[cls.__name__] = cls.PROTO
562
563
564 def class_by_prefix(prefix):
565     lst = [
566         (name, proto)
567         for name, proto in PROTOS.items()
568         if name.upper().startswith(prefix.upper())
569     ]
570     if len(lst) != 1:
571         return lst
572     _, proto = lst[0]
573     return CLASSES[proto]
574
575
576 def proto_by_name(name):
577     return PROTOS.get(name, -1)
578
579
580 def proto_of_message(packet):
581     return unpack("B", packet[1:2])[0]
582
583
584 def inline_response(packet):
585     proto = proto_of_message(packet)
586     if proto in CLASSES:
587         cls = CLASSES[proto]
588         if cls.RESPOND is Respond.INL:
589             return cls.Out().packed
590     return None
591
592
593 def make_object(length, proto, payload):
594     if proto in CLASSES:
595         return CLASSES[proto].from_packet(length, payload)
596     else:
597         retobj = UNKNOWN.from_packet(length, payload)
598         retobj.PROTO = proto  # Override class attr with object attr
599         return retobj
600
601
602 def parse_message(packet):
603     length, proto = unpack("BB", packet[:2])
604     payload = packet[2:]
605     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
606     if (
607         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
608         and length > 1
609         and len(payload) + adjust != length
610     ):
611         log.warning(
612             "With proto %d length is %d but payload length is %d+%d",
613             proto,
614             length,
615             len(payload),
616             adjust,
617         )
618     return make_object(length, proto, payload)