]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
implement `inline_response()`
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation), but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
# Public names of this module.  Every non-private packet class defined in
# this file is listed, in definition order, so that `from ... import *`
# exposes the same set of classes that the protocol registry knows about.
__all__ = (
    "handle_packet",
    "inline_response",
    "make_object",
    "make_response",
    "parse_message",
    "proto_by_name",
    "GPS303Pkt",
    "UNKNOWN",
    "LOGIN",
    "SUPERVISION",
    "HEARTBEAT",
    "GPS_POSITIONING",
    "GPS_OFFLINE_POSITIONING",
    "STATUS",
    "HIBERNATION",
    "RESET",
    "WHITELIST_TOTAL",
    "WIFI_OFFLINE_POSITIONING",
    "TIME",
    "PROHIBIT_LBS",
    "MOM_PHONE",
    "STOP_UPLOAD",
    "STOP_ALARM",
    "SETUP",
    "SYNCHRONOUS_WHITELIST",
    "RESTORE_PASSWORD",
    "WIFI_POSITIONING",
    "MANUAL_POSITIONING",
    "BATTERY_CHARGE",
    "CHARGER_CONNECTED",
    "CHARGER_DISCONNECTED",
    "VIBRATION_RECEIVED",
    "POSITION_UPLOAD_INTERVAL",
    "SOS_ALARM",
)

# Logger shared by the functions of this module.
log = getLogger("gps303")
57
58
class GPS303Pkt:
    """Base class of all packet types.

    Subclasses set ``PROTO`` to their protocol number and may override
    decoding (``from_packet``) and reply construction (``inline_response``).
    """

    PROTO: int

    def __init__(self, *args, **kwargs):
        """Store every keyword argument as an instance attribute.

        Positional arguments are not accepted.
        """
        assert not args
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    def __repr__(self):
        """Render as ``ClassName(attr=value, ...)``.

        Attributes whose name starts with an underscore are left out, and
        ``bytes`` values are shown as a ``bytes.fromhex("...")`` expression.
        """

        def render(value):
            if isinstance(value, bytes):
                return 'bytes.fromhex("{}")'.format(value.hex())
            return value.__repr__()

        shown = [
            "{}={}".format(attr_name, render(attr_value))
            for attr_name, attr_value in self.__dict__.items()
            if not attr_name.startswith("_")
        ]
        return "{}({})".format(self.__class__.__name__, ", ".join(shown))

    @classmethod
    def from_packet(cls, length, payload):
        """Create an instance that remembers the raw payload and length."""
        return cls(payload=payload, length=length)

    def to_packet(self):
        """Serialize as length byte, protocol byte, then the stored payload."""
        header = pack("BB", self.length, self.PROTO)
        return header + self.payload

    @classmethod
    def make_packet(cls, payload):
        """Build a packet of this class's protocol around ``payload``.

        The length byte is ``len(payload) + 1``; when that value exceeds 6
        it is reduced by 6.
        """
        assert isinstance(payload, bytes)
        declared = len(payload) + 1
        declared = declared - 6 if declared > 6 else declared
        header = pack("BB", declared, cls.PROTO)
        return header + payload

    @classmethod
    def inline_response(cls, packet):
        """Return the default reply: a packet with an empty payload."""
        return cls.make_packet(b"")
100
101
class UNKNOWN(GPS303Pkt):
    """Packet type used when no dedicated class matches."""

    # 256 does not fit in the one-byte protocol field, so it can never
    # coincide with a protocol number found in a real packet.
    PROTO = 0x100

    @classmethod
    def inline_response(cls, packet):
        """Produce no reply: always returns None."""
        return None
108
109
class LOGIN(GPS303Pkt):
    """Login packet (protocol 0x01)."""

    PROTO = 0x01

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the payload into ``imei`` and ``ver``.

        All bytes except the last are rendered as a hex string (``imei``);
        the last byte is stored as an integer (``ver``).
        """
        pkt = super().from_packet(length, payload)
        imei_bytes, ver_byte = payload[:-1], payload[-1:]
        pkt.imei = imei_bytes.hex()
        (pkt.ver,) = unpack("B", ver_byte)
        return pkt

    def response(self):
        """Return the reply packet, which carries an empty payload."""
        empty_payload = b""
        return self.make_packet(empty_payload)
122
123
class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
    """Supervision packet (protocol 0x05)."""

    PROTO = 0x05

    def response(self, supnum=0):
        """Return the packet to send, currently with an empty payload.

        Documented meanings of the supervision number:
          1: The device automatically answers Pickup effect
          2: Automatically Answering Two-way Calls
          3: Ring manually answer the two-way call
        """
        # NOTE(review): `supnum` is accepted but not encoded into the
        # payload; confirm whether it is meant to be sent as one byte.
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(b"")
132
133
class HEARTBEAT(GPS303Pkt):
    """Heartbeat packet (protocol 0x08); base class behaviour is unchanged."""

    PROTO = 0x08
136
137
class _GPS_POSITIONING(GPS303Pkt):
    """Shared decoder for the GPS positioning packet types.

    Has no ``PROTO`` of its own; concrete subclasses supply it.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a GPS positioning payload.

        Sets ``dtime`` (raw 6 bytes), ``devtime`` (aware UTC datetime, or
        None when the time bytes are all zero), ``gps_data_length``,
        ``gps_nb_sat``, ``gps_is_valid``, ``heading``, ``latitude``,
        ``longitude``, ``speed`` and ``flags``.
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The year is a single byte counting from 2000; passing it to
            # datetime() unmodified would yield dates in the first century.
            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
            self.devtime = datetime(
                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
            )
        # Byte 6 holds two nibbles: data length (high) and satellites (low).
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # Raw coordinates are scaled by 30000 * 60 (presumably units of
        # 1/30000 arc-minute converted to degrees); the flip flags only
        # change the sign, never the magnitude.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        self.flags = flags
        return self

    @classmethod
    def inline_response(cls, packet):
        """Reply by echoing the 6 date-time bytes of the raw packet.

        ``packet`` starts with the length and protocol bytes, so the
        date-time bytes are ``packet[2:8]``.
        """
        return cls.make_packet(packet[2:8])

    def response(self):
        """Return the reply packet echoing the stored ``dtime`` bytes."""
        return self.make_packet(self.dtime)
168
169
class GPS_POSITIONING(_GPS_POSITIONING):
    """GPS positioning packet registered under protocol 0x10."""

    PROTO = 0x10
172
173
class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
    """GPS positioning packet registered under protocol 0x11."""

    PROTO = 0x11
176
177
class STATUS(GPS303Pkt):
    """Status packet (protocol 0x13)."""

    PROTO = 0x13

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a 4- or 5-byte status payload.

        Sets ``batt``, ``ver``, ``timezone``, ``intvl`` and ``signal``;
        ``signal`` is None for the 4-byte form.  A payload of any other
        size leaves these attributes unset.
        """
        self = super().from_packet(length, payload)
        if len(payload) == 5:
            (
                self.batt,
                self.ver,
                self.timezone,
                self.intvl,
                self.signal,
            ) = unpack("BBBBB", payload)
        elif len(payload) == 4:
            self.batt, self.ver, self.timezone, self.intvl = unpack(
                "BBBB", payload
            )
            self.signal = None
        return self

    @classmethod
    def inline_response(cls, packet):
        """Produce no inline reply: always returns None."""
        return None

    def response(self, upload_interval=25):  # Set interval in minutes
        """Return a packet carrying ``upload_interval`` as a single byte."""
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(pack("B", upload_interval))
205
206
class HIBERNATION(GPS303Pkt):
    """Hibernation packet (protocol 0x14); base class behaviour is unchanged."""

    PROTO = 0x14
209
210
class RESET(GPS303Pkt):  # Device sends when it got reset SMS
    """Reset packet (protocol 0x15)."""

    PROTO = 0x15

    def response(self):  # Server can send to initiate factory reset
        """Return a packet of this protocol with an empty payload."""
        empty_payload = b""
        return self.make_packet(empty_payload)
216
217
class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiate sync (0x58)
    """Whitelist total packet (protocol 0x16)."""

    PROTO = 0x16

    def response(self, number=3):  # Number of whitelist entries
        """Return a packet carrying ``number`` as a single byte."""
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(pack("B", number))
223
224
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared decoder for the WiFi positioning packet types.

    Has no ``PROTO`` of its own; concrete subclasses supply it.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode a WiFi/GSM positioning payload.

        Sets ``dtime`` (raw 6 bytes), ``devtime`` (aware UTC datetime, or
        None when the time bytes are all zero), ``wifi_aps`` (list of
        ``(mac, -strength)`` tuples), ``mcc``, ``mnc`` and ``gsm_cells``
        (list of ``(locac, cellid, -strength)`` tuples).
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            self.devtime = None
        else:
            # The hex digits of the six bytes spell out YYMMDDhhmmss.  The
            # parsed value is naive; it is labelled as UTC directly because
            # astimezone() would first interpret it in the server's local
            # timezone and make the result depend on where the code runs.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).replace(tzinfo=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # Each access point record is 7 bytes: 6 of MAC, 1 of strength.
            ap_record = payload[6 + i * 7 : 13 + i * 7]
            mac = ":".join([format(b, "02X") for b in ap_record[:6]])
            self.wifi_aps.append((mac, -ap_record[6]))
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # Each cell record is 5 bytes following the 4-byte GSM header.
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
252
253
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
    """WiFi positioning packet registered under protocol 0x17."""

    PROTO = 0x17

    @classmethod
    def inline_response(cls, packet):
        """Reply by echoing the 6 date-time bytes of the raw packet.

        ``packet`` starts with the length and protocol bytes, so the
        date-time bytes are ``packet[2:8]``.
        """
        return cls.make_packet(packet[2:8])

    def response(self):
        """Return the reply packet echoing the stored ``dtime`` bytes."""
        # Neither parent class defines `response()`, so the packet is built
        # directly with `make_packet()`.
        return self.make_packet(self.dtime)
263
264
class TIME(GPS303Pkt):
    """Time packet (protocol 0x30); replies carry the current UTC time."""

    PROTO = 0x30

    @classmethod
    def inline_response(cls, packet):
        """Return a complete reply with the current UTC date and time.

        Layout: length byte (fixed at 7), protocol byte, 16-bit big-endian
        year, then month, day, hour, minute and second as single bytes.
        """
        # datetime.now(timezone.utc) yields the same field values as the
        # deprecated datetime.utcnow().
        return pack(
            "!BBHBBBBB",
            7,
            cls.PROTO,
            *datetime.now(timezone.utc).timetuple()[:6]
        )

    def response(self):
        """Return a reply packet with the current UTC date and time."""
        payload = pack(
            "!HBBBBB", *datetime.now(timezone.utc).timetuple()[:6]
        )
        # NOTE(review): make_packet() derives a length byte of 2 for this
        # 7-byte payload, whereas inline_response() writes 7 - confirm
        # which one the device expects.
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(payload)
277
278
class PROHIBIT_LBS(GPS303Pkt):
    """Prohibit-LBS packet (protocol 0x33)."""

    PROTO = 0x33

    def response(self, status=1):  # Server sent, 0-off, 1-on
        """Return a packet carrying ``status`` as a single byte."""
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(pack("B", status))
284
285
class MOM_PHONE(GPS303Pkt):
    """Packet with protocol number 0x43; base class behaviour is unchanged."""

    PROTO = 0x43
288
289
class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
    """Stop-upload packet (protocol 0x44)."""

    PROTO = 0x44

    def response(self):
        """Return a packet of this protocol with an empty payload."""
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(b"")
295
296
class STOP_ALARM(GPS303Pkt):
    """Packet with protocol number 0x56; base class behaviour is unchanged."""

    PROTO = 0x56
299
300
class SETUP(GPS303Pkt):
    """Setup packet (protocol 0x57)."""

    PROTO = 0x57

    def response(
        self,
        uploadIntervalSeconds=0x0300,
        binarySwitch=0b00110001,
        alarms=(0, 0, 0),
        dndTimeSwitch=0,
        dndTimes=(0, 0, 0),
        gpsTimeSwitch=0,
        gpsTimeStart=0,
        gpsTimeStop=0,
        phoneNumbers=("", "", ""),
    ):
        """Return a setup packet built from the given settings.

        Payload layout, in order: 16-bit upload interval, one byte of
        binary switches, each element of ``alarms`` as 3 bytes, one byte
        DND switch, each element of ``dndTimes`` as 3 bytes, one byte GPS
        time switch, 16-bit GPS start and stop values, and finally the
        phone numbers joined with ``;``.  Multi-byte integers are
        big-endian.  The sequence defaults are immutable tuples so that
        they cannot be altered between calls; any iterable is accepted.
        """

        def pack3b(x):
            # Low three bytes of the big-endian 32-bit representation.
            return pack("!I", x)[1:]

        payload = b"".join(
            [
                pack("!H", uploadIntervalSeconds),
                pack("B", binarySwitch),
            ]
            + [pack3b(el) for el in alarms]
            + [
                pack("B", dndTimeSwitch),
            ]
            + [pack3b(el) for el in dndTimes]
            + [
                pack("B", gpsTimeSwitch),
                pack("!H", gpsTimeStart),
                pack("!H", gpsTimeStop),
            ]
            + [b";".join([el.encode() for el in phoneNumbers])]
        )
        # The base class has no `response()` to delegate to, so the packet
        # is built directly with `make_packet()`.
        return self.make_packet(payload)
337
338
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
    """Packet with protocol number 0x58; base class behaviour is unchanged."""

    PROTO = 0x58
341
342
class RESTORE_PASSWORD(GPS303Pkt):
    """Packet with protocol number 0x67; base class behaviour is unchanged."""

    PROTO = 0x67
345
346
class WIFI_POSITIONING(_WIFI_POSITIONING):
    """WiFi positioning packet registered under protocol 0x69."""

    PROTO = 0x69

    @classmethod
    def inline_response(cls, packet):
        """Produce no inline reply: always returns None."""
        return None

    def response(self, lat=None, lon=None):
        """Return a reply packet, optionally carrying coordinates.

        When both ``lat`` and ``lon`` are given the payload is the ASCII
        text of the two values separated by a comma; otherwise it is empty.
        """
        if lat is not None and lon is not None:
            text = "{:+#010.8g},{:+#010.8g}".format(lat, lon)
            return self.make_packet(text.encode("ascii"))
        return self.make_packet(b"")
362
363
class MANUAL_POSITIONING(GPS303Pkt):
    """Packet with protocol number 0x80; base class behaviour is unchanged."""

    PROTO = 0x80
366
367
class BATTERY_CHARGE(GPS303Pkt):
    """Packet with protocol number 0x81; base class behaviour is unchanged."""

    PROTO = 0x81
370
371
class CHARGER_CONNECTED(GPS303Pkt):
    """Packet with protocol number 0x82; base class behaviour is unchanged."""

    PROTO = 0x82
374
375
class CHARGER_DISCONNECTED(GPS303Pkt):
    """Packet with protocol number 0x83; base class behaviour is unchanged."""

    PROTO = 0x83
378
379
class VIBRATION_RECEIVED(GPS303Pkt):
    """Packet with protocol number 0x94; base class behaviour is unchanged."""

    PROTO = 0x94
382
383
class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
    """Position upload interval packet (protocol 0x98)."""

    PROTO = 0x98

    @classmethod
    def from_packet(cls, length, payload):
        """Decode the first two payload bytes into the integer ``interval``."""
        self = super().from_packet(length, payload)
        # unpack() always returns a tuple; keep the integer itself so that
        # response() can feed it back to pack().
        (self.interval,) = unpack("!H", payload[:2])
        return self

    @classmethod
    def inline_response(cls, packet):
        """Reply by echoing the two interval bytes of the raw packet.

        ``packet`` starts with the length and protocol bytes, so the
        interval bytes are ``packet[2:4]``.
        """
        return cls.make_packet(packet[2:4])

    def response(self):
        """Return the reply packet echoing the decoded ``interval``."""
        return self.make_packet(pack("!H", self.interval))
399
400
class SOS_ALARM(GPS303Pkt):
    """Packet with protocol number 0x99; base class behaviour is unchanged."""

    PROTO = 0x99
403
404
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
# The candidate list is fully built by the comprehension before the loop
# starts, so binding the loop variable at module level cannot disturb the
# walk over globals().  Names starting with an underscore are skipped.
for cls in [
    obj
    for ident, obj in globals().items()
    if not ident.startswith("_")
    and isclass(obj)
    and issubclass(obj, GPS303Pkt)
]:
    if not hasattr(cls, "PROTO"):
        continue  # the base class only annotates PROTO, without a value
    CLASSES[cls.PROTO] = cls
    PROTOS[cls.__name__] = cls.PROTO
419
420
def proto_by_name(name):
    """Return the protocol number registered for class ``name``, or -1."""
    try:
        return PROTOS[name]
    except KeyError:
        return -1
423
424
def proto_of_message(packet):
    """Return the protocol number, i.e. the second byte of ``packet``."""
    (proto,) = unpack("B", packet[1:2])
    return proto
427
428
def inline_response(packet):
    """Return the inline reply for a raw ``packet``.

    The reply is produced by the class registered for the packet's
    protocol number; None is returned when no class is registered.
    """
    handler = CLASSES.get(proto_of_message(packet))
    if handler is None:
        return None
    return handler.inline_response(packet)
435
436
def make_object(length, proto, payload):
    """Build the packet object for ``proto`` from ``length`` and ``payload``.

    Protocol numbers without a registered class yield an ``UNKNOWN``
    instance whose ``PROTO`` is set to the actual number.
    """
    known = CLASSES.get(proto)
    if known is not None:
        return known.from_packet(length, payload)
    retobj = UNKNOWN.from_packet(length, payload)
    # Instance attribute shadows the class-level placeholder value.
    retobj.PROTO = proto
    return retobj
444
445
def parse_message(packet):
    """Split an unframed ``packet`` into its parts and build its object.

    The first two bytes are the length and the protocol number; the rest
    is the payload.  A warning is logged when the declared length does not
    match the payload size, except for the WiFi positioning protocols and
    for declared lengths of 1 or less.
    """
    length, proto = unpack("BB", packet[:2])
    payload = packet[2:]
    # Weird special case: STATUS differs from every other protocol
    if proto == STATUS.PROTO:
        adjust = 2
    else:
        adjust = 4
    is_wifi = proto in (
        WIFI_POSITIONING.PROTO,
        WIFI_OFFLINE_POSITIONING.PROTO,
    )
    if not is_wifi and length > 1 and len(payload) + adjust != length:
        log.warning(
            "With proto %d length is %d but payload length is %d+%d",
            proto,
            length,
            len(payload),
            adjust,
        )
    return make_object(length, proto, payload)
463
464
def handle_packet(packet):  # DEPRECATED
    """Parse a framed packet (``xx`` ... ``\\r\\n``) into a packet object.

    Input shorter than 6 bytes or lacking either frame marker is returned
    wrapped whole in an ``UNKNOWN`` object.
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if well_framed:
        return parse_message(packet[2:-2])
    return UNKNOWN.from_packet(len(packet), packet)
469
470
def make_response(msg, **kwargs):  # DEPRECATED
    """Return the framed reply for ``msg``, or None if it has none.

    ``kwargs`` are forwarded to ``msg.response()``; a non-None result is
    wrapped between ``xx`` and ``\\r\\n``.
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"