]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
add some more message types
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation), but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from enum import Enum
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
22
# Public API of the module: helper functions first, then the base class
# and every public message class in the order they are defined below.
__all__ = (
    "handle_packet",
    "inline_response",
    "make_object",
    "make_response",
    "parse_message",
    "proto_by_name",
    "GPS303Pkt",
    "UNKNOWN",
    "LOGIN",
    "SUPERVISION",
    "HEARTBEAT",
    "GPS_POSITIONING",
    "GPS_OFFLINE_POSITIONING",
    "STATUS",
    "HIBERNATION",
    "RESET",
    "WHITELIST_TOTAL",
    "WIFI_OFFLINE_POSITIONING",
    "TIME",
    "PROHIBIT_LBS",
    "GPS_LBS_SWITCH_TIMES",
    "REMOTE_MONITOR_PHONE",
    "SOS_PHONE",
    "DAD_PHONE",
    "MOM_PHONE",
    "STOP_UPLOAD",
    "GPS_OFF_PERIOD",
    "DND_PERIOD",
    "RESTART_SHUTDOWN",
    "DEVICE",
    "ALARM_CLOCK",
    "STOP_ALARM",
    "SETUP",
    "SYNCHRONOUS_WHITELIST",
    "RESTORE_PASSWORD",
    "WIFI_POSITIONING",
    "MANUAL_POSITIONING",
    "BATTERY_CHARGE",
    "CHARGER_CONNECTED",
    "CHARGER_DISCONNECTED",
    "VIBRATION_RECEIVED",
    "POSITION_UPLOAD_INTERVAL",
    "SOS_ALARM",
)

# Module-wide logger, registered under the name "gps303".
log = getLogger("gps303")
58
59
class Dir(Enum):
    """Direction of a message type and how (if at all) it is answered."""

    IN = 0  # Incoming, no response needed
    INLINE = 2  # Bidirectional, use `inline_response()`
    EXT = 3  # Bidirectional, use external responder
    OUT = 4  # Outgoing, should not appear on input
65
66
class GPS303Pkt:
    """Base class of all protocol messages.

    An instance is a plain attribute bag: every keyword argument given to
    the constructor becomes an attribute.  Subclasses define ``PROTO``
    (the protocol number) and may override ``DIR``.
    """

    PROTO: int
    DIR = Dir.INLINE  # Most packets anticipate simple acknowledgement

    def __init__(self, *args, **kwargs):
        """Store each keyword argument as an attribute of the instance."""
        assert not args
        for attr, value in kwargs.items():
            setattr(self, attr, value)

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for all public attributes.

        ``bytes`` values are rendered as ``bytes.fromhex("...")``.
        """

        def render(value):
            if isinstance(value, bytes):
                return 'bytes.fromhex("{}")'.format(value.hex())
            return value.__repr__()

        shown = [
            "{}={}".format(attr, render(value))
            for attr, value in self.__dict__.items()
            if not attr.startswith("_")
        ]
        return "{}({})".format(self.__class__.__name__, ", ".join(shown))

    @classmethod
    def from_packet(cls, length, payload):
        """Create an instance that keeps the raw ``length`` and ``payload``."""
        return cls(payload=payload, length=length)

    def to_packet(self):
        """Serialize back to bytes: length byte, proto byte, then payload."""
        header = pack("BB", self.length, self.PROTO)
        return header + self.payload

    @classmethod
    def make_packet(cls, payload):
        """Build an unframed packet of this class around ``payload``."""
        assert isinstance(payload, bytes)
        length = 1 + len(payload)  # the proto byte is counted too
        # if length > 6:
        #     length -= 6
        header = pack("BB", length, cls.PROTO)
        return header + payload

    @classmethod
    def inline_response(cls, packet):
        """Return an empty acknowledgement for INLINE classes, else None."""
        return cls.make_packet(b"") if cls.DIR is Dir.INLINE else None
112
113
class UNKNOWN(GPS303Pkt):
    """Placeholder for messages whose protocol number has no class."""

    # The proto field is a single byte, so a value > 255 is impossible
    # in real packets.
    PROTO = 256
    DIR = Dir.IN
117
118
class LOGIN(GPS303Pkt):
    """Login message (protocol 0x01) carrying the IMEI and a version byte."""

    PROTO = 0x01
    # Default response for ACK, can also respond with STOP_UPLOAD

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the payload.

        Everything but the last byte is stored as the hex string ``imei``;
        the last byte is stored as the integer ``ver``.
        """
        pkt = super().from_packet(length, payload)
        imei_bytes, ver_byte = payload[:-1], payload[-1:]
        pkt.imei = imei_bytes.hex()
        (pkt.ver,) = unpack("B", ver_byte)
        return pkt
129
130
class SUPERVISION(GPS303Pkt):
    """Outgoing message (protocol 0x05) with a one-byte status."""

    PROTO = 0x05
    DIR = Dir.OUT

    def response(self, status=0):
        """Build the packet carrying ``status`` as a single byte.

        Status values:
        1: The device automatically answers Pickup effect
        2: Automatically Answering Two-way Calls
        3: Ring manually answer the two-way call
        """
        status_byte = pack("B", status)
        return self.make_packet(status_byte)
140
141
class HEARTBEAT(GPS303Pkt):
    """Heartbeat message; inherits the default inline acknowledgement."""

    PROTO = 0x08
144
145
class _GPS_POSITIONING(GPS303Pkt):
    """Shared decoder and responder for the GPS positioning messages."""

    @classmethod
    def from_packet(cls, length, payload):
        """Decode device time, satellite info, coordinates, speed, heading.

        Sets ``dtime`` (raw 6 bytes), ``devtime`` (aware UTC datetime, or
        None when the timestamp is all zeroes), ``gps_data_length``,
        ``gps_nb_sat``, ``gps_is_valid``, ``heading``, ``latitude``,
        ``longitude``, ``speed`` and ``flags``.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The year is transmitted as two digits (the reply below sends
            # `year % 100`), so it has to be moved into the 21st century.
            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
            self.devtime = datetime(
                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
            )
        # High nibble: length of GPS data; low nibble: number of satellites
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        # Bit numbers below count from the most significant bit of `flags`
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # Raw coordinates are divided by 30000 * 60 to yield degrees; the
        # flip flags only ever change the sign.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        self.flags = flags
        return self

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as six bytes (two-digit year)."""
        now = datetime.now(timezone.utc)
        return cls.make_packet(
            pack(
                "BBBBBB",
                now.year % 100,
                now.month,
                now.day,
                now.hour,
                now.minute,
                now.second,
            )
        )
175
176
class GPS_POSITIONING(_GPS_POSITIONING):
    """GPS positioning message, protocol number 0x10."""

    PROTO = 0x10
179
180
class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
    """Offline GPS positioning message, protocol number 0x11."""

    PROTO = 0x11
183
184
class STATUS(GPS303Pkt):
    """Status message (protocol 0x13), answered by an external responder."""

    PROTO = 0x13
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a 4- or 5-byte payload into the status attributes.

        Sets ``batt``, ``ver``, ``timezone``, ``intvl`` and ``signal``;
        ``signal`` is None for the 4-byte form.  A payload of any other
        size leaves these attributes unset.
        """
        pkt = super().from_packet(length, payload)
        size = len(payload)
        if size == 5:
            fields = unpack("BBBBB", payload)
        elif size == 4:
            fields = unpack("BBBB", payload) + (None,)
        else:
            return pkt
        pkt.batt, pkt.ver, pkt.timezone, pkt.intvl, pkt.signal = fields
        return pkt

    def response(self, upload_interval=25):  # Set interval in minutes
        """Build the reply carrying the upload interval as a single byte."""
        interval_byte = pack("B", upload_interval)
        return self.make_packet(interval_byte)
209
210
class HIBERNATION(GPS303Pkt):
    """Hibernation message (protocol 0x14)."""

    PROTO = 0x14
    DIR = Dir.EXT

    def response(self):
        """Build the empty packet the server can send to put the device to sleep."""
        empty_payload = b""
        return self.make_packet(empty_payload)
217
218
class RESET(GPS303Pkt):
    """Reset message (protocol 0x15); device sends it when it got reset SMS."""

    PROTO = 0x15
    DIR = Dir.EXT

    def response(self):
        """Build the empty packet the server can send to initiate factory reset."""
        empty_payload = b""
        return self.make_packet(empty_payload)
225
226
class WHITELIST_TOTAL(GPS303Pkt):
    """Outgoing message (0x16); server sends it to initiate sync (0x58)."""

    PROTO = 0x16
    DIR = Dir.OUT

    def response(self, number=3):
        """Build the packet carrying the number of whitelist entries."""
        count_byte = pack("B", number)
        return self.make_packet(count_byte)
233
234
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared decoder for the WiFi / cell-tower positioning messages."""

    @classmethod
    def from_packet(cls, length, payload):
        """Decode device time, WiFi access points and GSM cells.

        Sets ``dtime`` (raw 6 bytes), ``devtime`` (aware UTC datetime, or
        None when the timestamp is all zeroes), ``wifi_aps`` (list of
        ``(mac, -signal)`` tuples), ``mcc``, ``mnc`` and ``gsm_cells``
        (list of ``(locac, cellid, -signal)`` tuples).
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The timestamp is packed decimal, hence the hex() round trip.
            # strptime() returns a naive datetime; tag it as UTC instead of
            # letting astimezone() interpret it in the server's local zone.
            # NOTE(review): assumes the device reports UTC, as the GPS
            # positioning decoder does - confirm against the protocol docs.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # Each access point record: 6 bytes of MAC + 1 byte of signal
            ap = payload[6 + i * 7 : 13 + i * 7]
            mac = ":".join(format(b, "02X") for b in ap[:6])
            self.wifi_aps.append((mac, -ap[6]))
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # Each cell record: 2 bytes locac, 2 bytes cell id, 1 byte signal
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
262
263
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
    """Offline WiFi positioning message, protocol number 0x17."""

    PROTO = 0x17

    @classmethod
    def inline_response(cls, packet):
        """Reply with the current UTC time as six packed-decimal bytes."""
        # datetime.utcnow() is deprecated; an aware "now" formats identically
        stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
        return cls.make_packet(bytes.fromhex(stamp))
272
273
class TIME(GPS303Pkt):
    """Time request (protocol 0x30), answered inline with the current time."""

    PROTO = 0x30

    @classmethod
    def inline_response(cls, packet):
        """Reply with current UTC time: 2-byte year, then five single bytes."""
        # datetime.utcnow() is deprecated; an aware "now" has the same fields
        now = datetime.now(timezone.utc)
        return cls.make_packet(
            pack(
                "!HBBBBB",
                now.year,
                now.month,
                now.day,
                now.hour,
                now.minute,
                now.second,
            )
        )
282
283
class PROHIBIT_LBS(GPS303Pkt):
    """Outgoing message (protocol 0x33) carrying an on/off byte."""

    PROTO = 0x33
    DIR = Dir.OUT

    def response(self, status=1):
        """Build the server-sent packet; ``status`` is 0-off, 1-on."""
        status_byte = pack("B", status)
        return self.make_packet(status_byte)
290
291
class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
    """Outgoing message (protocol 0x34); payload encoding not implemented."""

    PROTO = 0x34
    DIR = Dir.OUT

    def response(self):
        """Build the packet; currently always with an empty payload.

        Data is in packed decimal:
        00/01 - GPS on/off
        00/01 - Don't set / Set upload period
        HHMMHHMM - Upload period
        00/01 - LBS on/off
        00/01 - Don't set / Set time of boot
        HHMM  - Time of boot
        00/01 - Don't set / Set time of shutdown
        HHMM  - Time of shutdown
        """
        payload = b""  # TODO
        return self.make_packet(payload)
307
308
class _SET_PHONE(GPS303Pkt):
    """Common base of the outgoing messages that carry a phone number."""

    DIR = Dir.OUT

    def response(self, phone):
        """Build the packet whose payload is the encoded ``phone`` string."""
        encoded = phone.encode()
        return self.make_packet(encoded)
314
315
class REMOTE_MONITOR_PHONE(_SET_PHONE):
    """Phone-number message, protocol number 0x40."""

    PROTO = 0x40
318
319
class SOS_PHONE(_SET_PHONE):
    """Phone-number message, protocol number 0x41."""

    PROTO = 0x41
322
323
class DAD_PHONE(_SET_PHONE):
    """Phone-number message, protocol number 0x42."""

    PROTO = 0x42
326
327
class MOM_PHONE(_SET_PHONE):
    """Phone-number message, protocol number 0x43."""

    PROTO = 0x43
330
331
class STOP_UPLOAD(GPS303Pkt):
    """Outgoing message (0x44): server response to LOGIN to thwart the device."""

    PROTO = 0x44
    DIR = Dir.OUT

    def response(self):
        """Build the packet, which has an empty payload."""
        empty_payload = b""
        return self.make_packet(empty_payload)
338
339
class GPS_OFF_PERIOD(GPS303Pkt):
    """Outgoing message (protocol 0x46): a switch byte and a time range."""

    PROTO = 0x46
    DIR = Dir.OUT

    def response(self, onoff=0, fm="0000", to="2359"):
        """Build the packet from the switch byte and two hex-digit strings.

        ``fm`` and ``to`` are converted with ``bytes.fromhex`` and appended
        after the ``onoff`` byte.
        """
        parts = [pack("B", onoff), bytes.fromhex(fm), bytes.fromhex(to)]
        return self.make_packet(b"".join(parts))
348
349
class DND_PERIOD(GPS303Pkt):
    """Outgoing message (0x47): switch byte, week byte and two time ranges."""

    PROTO = 0x47
    DIR = Dir.OUT

    def response(
        self, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
    ):
        """Build the packet from two bytes and four hex-digit strings.

        The payload is the ``onoff`` byte, the ``week`` byte, then
        ``fm1``, ``to1``, ``fm2``, ``to2`` each run through ``bytes.fromhex``.
        """
        parts = [pack("B", onoff), pack("B", week)]
        for hexdigits in (fm1, to1, fm2, to2):
            parts.append(bytes.fromhex(hexdigits))
        return self.make_packet(b"".join(parts))
365
366
class RESTART_SHUTDOWN(GPS303Pkt):
    """Outgoing message (protocol 0x48) carrying a one-byte flag."""

    PROTO = 0x48
    DIR = Dir.OUT

    def response(self, flag=0):
        """Build the packet; ``flag`` is 1 - restart, 2 - shutdown."""
        flag_byte = pack("B", flag)
        return self.make_packet(flag_byte)
375
376
class DEVICE(GPS303Pkt):
    """Outgoing message (protocol 0x49) carrying a one-byte flag."""

    PROTO = 0x49
    DIR = Dir.OUT

    def response(self, flag=0):
        """Build the packet.

        ``flag`` values:
        0 - Stop looking for equipment
        1 - Start looking for equipment
        """
        flag_byte = pack("B", flag)
        return self.make_packet(flag_byte)
385
386
class ALARM_CLOCK(GPS303Pkt):
    """Outgoing message (protocol 0x50) carrying a list of alarms."""

    PROTO = 0x50
    DIR = Dir.OUT

    def response(self, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
        """Build the packet from ``(day, time)`` pairs.

        Each pair is encoded as one ``day`` byte followed by the ``time``
        hex-digit string converted with ``bytes.fromhex``.
        """
        payload = b"".join(
            pack("B", day) + bytes.fromhex(tm) for day, tm in alarms
        )
        # Wrap the payload in the length/proto header, as every other
        # response() in this module does.
        return self.make_packet(payload)
395
396
class STOP_ALARM(GPS303Pkt):
    """Stop-alarm message (protocol 0x56) carrying a one-byte flag."""

    PROTO = 0x56

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the first payload byte into ``flag`` and return the object."""
        self = super().from_packet(length, payload)
        self.flag = payload[0]
        return self
404
405
class SETUP(GPS303Pkt):
    """Setup message (protocol 0x57), answered by an external responder."""

    PROTO = 0x57
    DIR = Dir.EXT

    def response(
        self,
        uploadintervalseconds=0x0300,
        binaryswitch=0b00110001,
        alarms=(0, 0, 0),
        dndtimeswitch=0,
        dndtimes=(0, 0, 0),
        gpstimeswitch=0,
        gpstimestart=0,
        gpstimestop=0,
        phonenumbers=("", "", ""),
    ):
        """Build the reply packet with the device configuration.

        The payload is, in order: the upload interval (2 bytes), the binary
        switch (1 byte), each alarm (3 bytes), the DND switch (1 byte),
        each DND time (3 bytes), the GPS time switch (1 byte), GPS start
        and stop times (2 bytes each) and the phone numbers joined by ";".
        ``alarms``, ``dndtimes`` and ``phonenumbers`` accept any iterable;
        their defaults are tuples so no mutable object is shared.
        """

        def pack3b(x):
            # Three-byte big-endian integer: drop the top byte of four
            return pack("!I", x)[1:]

        parts = [
            pack("!H", uploadintervalseconds),
            pack("B", binaryswitch),
        ]
        parts.extend(pack3b(el) for el in alarms)
        parts.append(pack("B", dndtimeswitch))
        parts.extend(pack3b(el) for el in dndtimes)
        parts.append(pack("B", gpstimeswitch))
        parts.append(pack("!H", gpstimestart))
        parts.append(pack("!H", gpstimestop))
        parts.append(b";".join([el.encode() for el in phonenumbers]))
        return self.make_packet(b"".join(parts))
443
444
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
    """Message with protocol number 0x58; no payload decoding of its own."""

    PROTO = 0x58
447
448
class RESTORE_PASSWORD(GPS303Pkt):
    """Message with protocol number 0x67; no payload decoding of its own."""

    PROTO = 0x67
451
452
class WIFI_POSITIONING(_WIFI_POSITIONING):
    """WiFi positioning message (0x69), answered by an external responder."""

    PROTO = 0x69
    DIR = Dir.EXT

    def response(self, lat=None, lon=None):
        """Build the reply with "lat,lon" as ASCII text.

        The payload is empty unless both coordinates are given.
        """
        if lat is not None and lon is not None:
            text = "{:+#010.8g},{:+#010.8g}".format(lat, lon)
            payload = text.encode("ascii")
        else:
            payload = b""
        return self.make_packet(payload)
465
466
class MANUAL_POSITIONING(GPS303Pkt):
    """Manual positioning message (0x80), answered by an external responder."""

    PROTO = 0x80
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the first payload byte into ``flag`` and a text ``reason``.

        Flags without a known meaning get the reason "Unknown".
        """
        self = super().from_packet(length, payload)
        self.flag = payload[0]
        self.reason = {
            1: "Incorrect time",
            2: "LBS less",
            3: "WiFi less",
            4: "LBS search > 3 times",
            5: "Same LBS and WiFi data",
            6: "LBS prohibited, WiFi absent",
            7: "GPS spacing < 50 m",
        }.get(self.flag, "Unknown")
        return self

    def response(self):
        """Build the reply packet, which has an empty payload."""
        return self.make_packet(b"")
487
488
class BATTERY_CHARGE(GPS303Pkt):
    """Message with protocol number 0x81; no payload decoding of its own."""

    PROTO = 0x81
491
492
class CHARGER_CONNECTED(GPS303Pkt):
    """Message with protocol number 0x82; no payload decoding of its own."""

    PROTO = 0x82
495
496
class CHARGER_DISCONNECTED(GPS303Pkt):
    """Message with protocol number 0x83; no payload decoding of its own."""

    PROTO = 0x83
499
500
class VIBRATION_RECEIVED(GPS303Pkt):
    """Message with protocol number 0x94; no payload decoding of its own."""

    PROTO = 0x94
503
504
class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
    """Upload interval message (0x98), answered by an external responder."""

    PROTO = 0x98
    DIR = Dir.EXT

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the first two payload bytes into the integer ``interval``."""
        self = super().from_packet(length, payload)
        # unpack() always returns a tuple; keep the number, not the 1-tuple
        (self.interval,) = unpack("!H", payload[:2])
        return self

    def response(self, interval=10):
        """Build the reply carrying ``interval`` as a 2-byte big-endian int."""
        return self.make_packet(pack("!H", interval))
517
518
class SOS_ALARM(GPS303Pkt):
    """Message with protocol number 0x99; no payload decoding of its own."""

    PROTO = 0x99
521
522
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
# The candidates are collected into a tuple before the loop starts, because
# the loop variable becomes a module global and globals() must not change
# while it is being iterated.
for cls in tuple(
    obj
    for key, obj in globals().items()
    if isclass(obj) and issubclass(obj, GPS303Pkt) and not key.startswith("_")
):
    if not hasattr(cls, "PROTO"):
        continue  # no protocol number assigned, nothing to register
    CLASSES[cls.PROTO] = cls
    PROTOS[cls.__name__] = cls.PROTO
537
538
def proto_by_name(name):
    """Return the protocol number of the class called ``name``, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
541
542
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of ``packet``."""
    (proto,) = unpack("B", packet[1:2])
    return proto
545
546
def inline_response(packet):
    """Return the inline reply for ``packet``, or None.

    None is returned when the protocol number has no registered class;
    otherwise the decision is delegated to that class.
    """
    handler = CLASSES.get(proto_of_message(packet))
    if handler is None:
        return None
    return handler.inline_response(packet)
553
554
def make_object(length, proto, payload):
    """Create the message object for ``proto`` from ``length`` and ``payload``.

    Protocol numbers without a registered class yield an ``UNKNOWN``
    object whose ``PROTO`` is set to the actual number.
    """
    known = CLASSES.get(proto)
    if known is not None:
        return known.from_packet(length, payload)
    fallback = UNKNOWN.from_packet(length, payload)
    fallback.PROTO = proto  # Override class attr with object attr
    return fallback
562
563
def parse_message(packet):
    """Split an unframed packet into its parts and build the message object.

    The first byte is the declared length, the second the protocol number,
    the rest is the payload.  A mismatch between declared and actual length
    is only logged as a warning; the object is built regardless.
    """
    length, proto = unpack("BB", packet[:2])
    payload = packet[2:]
    # Weird special case: the expected difference is 2 for STATUS, else 4
    adjust = 4
    if proto == STATUS.PROTO:
        adjust = 2
    # The WiFi positioning messages are exempt from the length check
    wifi_protos = (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
    checked = proto not in wifi_protos and length > 1
    if checked and len(payload) + adjust != length:
        log.warning(
            "With proto %d length is %d but payload length is %d+%d",
            proto,
            length,
            len(payload),
            adjust,
        )
    return make_object(length, proto, payload)
581
582
def handle_packet(packet):  # DEPRECATED
    """Parse a framed packet (``xx`` ... ``\\r\\n``) into a message object.

    Packets shorter than 6 bytes or without the expected framing are
    returned as ``UNKNOWN`` objects wrapping the whole packet.
    """
    well_framed = (
        len(packet) >= 6 and packet[:2] == b"xx" and packet[-2:] == b"\r\n"
    )
    if well_framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
587
588
def make_response(msg, **kwargs):  # DEPRECATED
    """Return the framed reply of ``msg``, or None if it has no reply.

    The keyword arguments are passed on to ``msg.response()``; its result
    is wrapped between ``xx`` and ``\\r\\n``.
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"