]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
cleanup, make monolitic app work again
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "make_object",
25     "make_response",
26     "parse_message",
27     "proto_by_name",
28     "GPS303Pkt",
29     "UNKNOWN",
30     "LOGIN",
31     "SUPERVISION",
32     "HEARTBEAT",
33     "GPS_POSITIONING",
34     "GPS_OFFLINE_POSITIONING",
35     "STATUS",
36     "HIBERNATION",
37     "RESET",
38     "WHITELIST_TOTAL",
39     "WIFI_OFFLINE_POSITIONING",
40     "TIME",
41     "MOM_PHONE",
42     "STOP_ALARM",
43     "SETUP",
44     "SYNCHRONOUS_WHITELIST",
45     "RESTORE_PASSWORD",
46     "WIFI_POSITIONING",
47     "MANUAL_POSITIONING",
48     "BATTERY_CHARGE",
49     "CHARGER_CONNECTED",
50     "CHARGER_DISCONNECTED",
51     "VIBRATION_RECEIVED",
52     "POSITION_UPLOAD_INTERVAL",
53 )
54
55 log = getLogger("gps303")
56
57
58 class GPS303Pkt:
59     PROTO: int
60
61     def __init__(self, *args, **kwargs):
62         assert len(args) == 0
63         for k, v in kwargs.items():
64             setattr(self, k, v)
65
66     def __repr__(self):
67         return "{}({})".format(
68             self.__class__.__name__,
69             ", ".join(
70                 "{}={}".format(
71                     k,
72                     'bytes.fromhex("{}")'.format(v.hex())
73                     if isinstance(v, bytes)
74                     else v.__repr__(),
75                 )
76                 for k, v in self.__dict__.items()
77                 if not k.startswith("_")
78             ),
79         )
80
81     @classmethod
82     def from_packet(cls, length, payload):
83         return cls(payload=payload, length=length)
84
85     def to_packet(self):
86         return pack("BB", self.length, self.PROTO) + self.payload
87
88     @classmethod
89     def response(cls, *args):
90         if len(args) == 0:
91             return None
92         assert len(args) == 1 and isinstance(args[0], bytes)
93         payload = args[0]
94         length = len(payload) + 1
95         if length > 6:
96             length -= 6
97         return pack("BB", length, cls.PROTO) + payload
98
99
100 class UNKNOWN(GPS303Pkt):
101     PROTO = 256  # > 255 is impossible in real packets
102
103
104 class LOGIN(GPS303Pkt):
105     PROTO = 0x01
106
107     @classmethod
108     def from_packet(cls, length, payload):
109         self = super().from_packet(length, payload)
110         self.imei = payload[:-1].hex()
111         self.ver = unpack("B", payload[-1:])[0]
112         return self
113
114     @classmethod
115     def response(cls):
116         return super().response(b"")
117
118
119 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
120     PROTO = 0x05
121
122     @classmethod
123     def response(cls, supnum=0):
124         # 1: The device automatically answers Pickup effect
125         # 2: Automatically Answering Two-way Calls
126         # 3: Ring manually answer the two-way call
127         return super().response(b"")
128
129
130 class HEARTBEAT(GPS303Pkt):
131     PROTO = 0x08
132
133
134 class _GPS_POSITIONING(GPS303Pkt):
135     @classmethod
136     def from_packet(cls, length, payload):
137         self = super().from_packet(length, payload)
138         self.dtime = payload[:6]
139         if self.dtime == b"\0\0\0\0\0\0":
140             self.devtime = None
141         else:
142             self.devtime = datetime(
143                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
144             )
145         self.gps_data_length = payload[6] >> 4
146         self.gps_nb_sat = payload[6] & 0x0F
147         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
148         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
149         flip_lon =          bool(flags & 0b0000100000000000)  # bit 4
150         flip_lat = not      bool(flags & 0b0000010000000000)  # bit 5
151         self.heading =           flags & 0b0000001111111111   # bits 6 - last
152         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
153         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
154         self.speed = speed
155         self.flags = flags
156         return self
157
158     def response(self):
159         return super().response(self.dtime)
160
161
162 class GPS_POSITIONING(_GPS_POSITIONING):
163     PROTO = 0x10
164
165
166 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
167     PROTO = 0x11
168
169
170 class STATUS(GPS303Pkt):
171     PROTO = 0x13
172
173     @classmethod
174     def from_packet(cls, length, payload):
175         self = super().from_packet(length, payload)
176         if len(payload) == 5:
177             (
178                 self.batt,
179                 self.ver,
180                 self.timezone,
181                 self.intvl,
182                 self.signal,
183             ) = unpack("BBBBB", payload)
184         elif len(payload) == 4:
185             self.batt, self.ver, self.timezone, self.intvl = unpack(
186                 "BBBB", payload
187             )
188             self.signal = None
189         return self
190
191     def response(self, upload_interval=25):  # Set interval in minutes
192         return super().response(pack("B", upload_interval))
193
194
195 class HIBERNATION(GPS303Pkt):
196     PROTO = 0x14
197
198
199 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
200     PROTO = 0x15
201
202     def response(self):  # Server can send to initiate factory reset
203         return super().response(b"")
204
205
206 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
207     PROTO = 0x16
208
209     def response(self, number=3):  # Number of whitelist entries
210         return super().response(pack("B", number))
211
212
213 class _WIFI_POSITIONING(GPS303Pkt):
214     @classmethod
215     def from_packet(cls, length, payload):
216         self = super().from_packet(length, payload)
217         self.dtime = payload[:6]
218         if self.dtime == b"\0\0\0\0\0\0":
219             self.devtime = None
220         else:
221             self.devtime = datetime.strptime(
222                 self.dtime.hex(), "%y%m%d%H%M%S"
223             ).astimezone(tz=timezone.utc)
224         self.wifi_aps = []
225         for i in range(self.length):  # length has special meaning here
226             slice = payload[6 + i * 7 : 13 + i * 7]
227             self.wifi_aps.append(
228                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
229             )
230         gsm_slice = payload[6 + self.length * 7 :]
231         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
232         self.gsm_cells = []
233         for i in range(ncells):
234             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
235             locac, cellid, sigstr = unpack(
236                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
237             )
238             self.gsm_cells.append((locac, cellid, -sigstr))
239         return self
240
241
242 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
243     PROTO = 0x17
244
245     def response(self):
246         return super().response(self.dtime)
247
248
249 class TIME(GPS303Pkt):
250     PROTO = 0x30
251
252     def response(self):
253         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
254         return super().response(payload)
255
256
257 class PROHIBIT_LBS(GPS303Pkt):
258     PROTO = 0x33
259
260     def response(self, status=1):  # Server sent, 0-off, 1-on
261         return super().response(pack("B", status))
262
263
264 class MOM_PHONE(GPS303Pkt):
265     PROTO = 0x43
266
267
268 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
269     PROTO = 0x44
270
271     def response(self):
272         return super().response(b"")
273
274
275 class STOP_ALARM(GPS303Pkt):
276     PROTO = 0x56
277
278
279 class SETUP(GPS303Pkt):
280     PROTO = 0x57
281
282     def response(
283         self,
284         uploadIntervalSeconds=0x0300,
285         binarySwitch=0b00110001,
286         alarms=[0, 0, 0],
287         dndTimeSwitch=0,
288         dndTimes=[0, 0, 0],
289         gpsTimeSwitch=0,
290         gpsTimeStart=0,
291         gpsTimeStop=0,
292         phoneNumbers=["", "", ""],
293     ):
294         def pack3b(x):
295             return pack("!I", x)[1:]
296
297         payload = b"".join(
298             [
299                 pack("!H", uploadIntervalSeconds),
300                 pack("B", binarySwitch),
301             ]
302             + [pack3b(el) for el in alarms]
303             + [
304                 pack("B", dndTimeSwitch),
305             ]
306             + [pack3b(el) for el in dndTimes]
307             + [
308                 pack("B", gpsTimeSwitch),
309                 pack("!H", gpsTimeStart),
310                 pack("!H", gpsTimeStop),
311             ]
312             + [b";".join([el.encode() for el in phoneNumbers])]
313         )
314         return super().response(payload)
315
316
317 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
318     PROTO = 0x58
319
320
321 class RESTORE_PASSWORD(GPS303Pkt):
322     PROTO = 0x67
323
324
325 class WIFI_POSITIONING(_WIFI_POSITIONING):
326     PROTO = 0x69
327
328     def response(self, lat=None, lon=None):
329         if lat is None or lon is None:
330             payload = b""
331         else:
332             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
333                 "ascii"
334             )
335         return super().response(payload)
336
337
338 class MANUAL_POSITIONING(GPS303Pkt):
339     PROTO = 0x80
340
341
342 class BATTERY_CHARGE(GPS303Pkt):
343     PROTO = 0x81
344
345
346 class CHARGER_CONNECTED(GPS303Pkt):
347     PROTO = 0x82
348
349
350 class CHARGER_DISCONNECTED(GPS303Pkt):
351     PROTO = 0x83
352
353
354 class VIBRATION_RECEIVED(GPS303Pkt):
355     PROTO = 0x94
356
357
358 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
359     PROTO = 0x98
360
361     @classmethod
362     def from_packet(cls, length, payload):
363         self = super().from_packet(length, payload)
364         self.interval = unpack("!H", payload[:2])
365         return self
366
367     def response(self):
368         return super().response(pack("!H", self.interval))
369
370
371 class SOS_ALARM(GPS303Pkt):
372     PROTO = 0x99
373
374
375 # Build dicts protocol number -> class and class name -> protocol number
376 CLASSES = {}
377 PROTOS = {}
378 if True:  # just to indent the code, sorry!
379     for cls in [
380         cls
381         for name, cls in globals().items()
382         if isclass(cls)
383         and issubclass(cls, GPS303Pkt)
384         and not name.startswith("_")
385     ]:
386         if hasattr(cls, "PROTO"):
387             CLASSES[cls.PROTO] = cls
388             PROTOS[cls.__name__] = cls.PROTO
389
390
391 def proto_by_name(name):
392     return PROTOS.get(name, -1)
393
394
395 def proto_of_message(packet):
396     return unpack("B", packet[1:2])[0]
397
398
399 def make_object(length, proto, payload):
400     if proto in CLASSES:
401         return CLASSES[proto].from_packet(length, payload)
402     else:
403         retobj = UNKNOWN.from_packet(length, payload)
404         retobj.PROTO = proto  # Override class attr with object attr
405         return retobj
406
407
408 def parse_message(packet):
409     length, proto = unpack("BB", packet[:2])
410     payload = packet[2:]
411     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
412     if (
413         proto
414         not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
415         and length > 1
416         and len(payload) + adjust != length
417     ):
418         log.warning(
419             "With proto %d length is %d but payload length is %d+%d",
420             proto,
421             length,
422             len(payload),
423             adjust,
424         )
425     return make_object(length, proto, payload)
426
427
428 def handle_packet(packet):  # DEPRECATED
429     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
430         return UNKNOWN.from_packet(len(packet), packet)
431     return parse_message(packet[2:-2])
432
433
434 def make_response(msg, **kwargs):  # DEPRECATED
435     inframe = msg.response(**kwargs)
436     return None if inframe is None else b"xx" + inframe + b"\r\n"
437