]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
make collector.py work
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "make_object",
25     "make_response",
26     "parse_message",
27     "proto_by_name",
28     "set_config",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "MOM_PHONE",
43     "STOP_ALARM",
44     "SETUP",
45     "SYNCHRONOUS_WHITELIST",
46     "RESTORE_PASSWORD",
47     "WIFI_POSITIONING",
48     "MANUAL_POSITIONING",
49     "BATTERY_CHARGE",
50     "CHARGER_CONNECTED",
51     "CHARGER_DISCONNECTED",
52     "VIBRATION_RECEIVED",
53     "POSITION_UPLOAD_INTERVAL",
54 )
55
56 log = getLogger("gps303")
57
58
59 class GPS303Pkt:
60     PROTO: int
61     CONFIG = None
62
63     def __init__(self, *args, **kwargs):
64         assert len(args) == 0
65         for k, v in kwargs.items():
66             setattr(self, k, v)
67
68     def __repr__(self):
69         return "{}({})".format(
70             self.__class__.__name__,
71             ", ".join(
72                 "{}={}".format(
73                     k,
74                     'bytes.fromhex("{}")'.format(v.hex())
75                     if isinstance(v, bytes)
76                     else v.__repr__(),
77                 )
78                 for k, v in self.__dict__.items()
79                 if not k.startswith("_")
80             ),
81         )
82
83     @classmethod
84     def from_packet(cls, length, payload):
85         return cls(payload=payload, length=length)
86
87     def to_packet(self):
88         return pack("BB", self.length, self.PROTO) + self.payload
89
90     def response(self, *args):
91         if len(args) == 0:
92             return None
93         assert len(args) == 1 and isinstance(args[0], bytes)
94         payload = args[0]
95         length = len(payload) + 1
96         if length > 6:
97             length -= 6
98         return pack("BB", length, self.PROTO) + payload
99
100
101 class UNKNOWN(GPS303Pkt):
102     PROTO = 256  # > 255 is impossible in real packets
103
104
105 class LOGIN(GPS303Pkt):
106     PROTO = 0x01
107
108     @classmethod
109     def from_packet(cls, length, payload):
110         self = super().from_packet(length, payload)
111         self.imei = payload[:-1].hex()
112         self.ver = unpack("B", payload[-1:])[0]
113         return self
114
115     def response(self):
116         return super().response(b"")
117
118
119 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
120     PROTO = 0x05
121
122     def response(self, supnum=0):
123         # 1: The device automatically answers Pickup effect
124         # 2: Automatically Answering Two-way Calls
125         # 3: Ring manually answer the two-way call
126         return super().response(b"")
127
128
129 class HEARTBEAT(GPS303Pkt):
130     PROTO = 0x08
131
132
133 class _GPS_POSITIONING(GPS303Pkt):
134     @classmethod
135     def from_packet(cls, length, payload):
136         self = super().from_packet(length, payload)
137         self.dtime = payload[:6]
138         if self.dtime == b"\0\0\0\0\0\0":
139             self.devtime = None
140         else:
141             self.devtime = datetime(
142                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
143             )
144         self.gps_data_length = payload[6] >> 4
145         self.gps_nb_sat = payload[6] & 0x0F
146         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
147         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
148         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
149         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
150         self.heading = flags & 0b0000001111111111  # bits 6 - last
151         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
152         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
153         self.speed = speed
154         self.flags = flags
155         return self
156
157     def response(self):
158         return super().response(self.dtime)
159
160
161 class GPS_POSITIONING(_GPS_POSITIONING):
162     PROTO = 0x10
163
164
165 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
166     PROTO = 0x11
167
168
169 class STATUS(GPS303Pkt):
170     PROTO = 0x13
171
172     @classmethod
173     def from_packet(cls, length, payload):
174         self = super().from_packet(length, payload)
175         if len(payload) == 5:
176             (
177                 self.batt,
178                 self.ver,
179                 self.timezone,
180                 self.intvl,
181                 self.signal,
182             ) = unpack("BBBBB", payload)
183         elif len(payload) == 4:
184             self.batt, self.ver, self.timezone, self.intvl = unpack(
185                 "BBBB", payload
186             )
187             self.signal = None
188         return self
189
190     def response(self, upload_interval=25):  # Set interval in minutes
191         return super().response(pack("B", upload_interval))
192
193
194 class HIBERNATION(GPS303Pkt):
195     PROTO = 0x14
196
197
198 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
199     PROTO = 0x15
200
201     def response(self):  # Server can send to initiate factory reset
202         return super().response(b"")
203
204
205 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
206     PROTO = 0x16
207
208     def response(self, number=3):  # Number of whitelist entries
209         return super().response(pack("B", number))
210
211
212 class _WIFI_POSITIONING(GPS303Pkt):
213     @classmethod
214     def from_packet(cls, length, payload):
215         self = super().from_packet(length, payload)
216         self.dtime = payload[:6]
217         if self.dtime == b"\0\0\0\0\0\0":
218             self.devtime = None
219         else:
220             self.devtime = datetime.strptime(
221                 self.dtime.hex(), "%y%m%d%H%M%S"
222             ).astimezone(tz=timezone.utc)
223         self.wifi_aps = []
224         for i in range(self.length):  # length has special meaning here
225             slice = payload[6 + i * 7 : 13 + i * 7]
226             self.wifi_aps.append(
227                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
228             )
229         gsm_slice = payload[6 + self.length * 7 :]
230         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
231         self.gsm_cells = []
232         for i in range(ncells):
233             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
234             locac, cellid, sigstr = unpack(
235                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
236             )
237             self.gsm_cells.append((locac, cellid, -sigstr))
238         return self
239
240
241 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
242     PROTO = 0x17
243
244     def response(self):
245         return super().response(self.dtime)
246
247
248 class TIME(GPS303Pkt):
249     PROTO = 0x30
250
251     def response(self):
252         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
253         return super().response(payload)
254
255
256 class PROHIBIT_LBS(GPS303Pkt):
257     PROTO = 0x33
258
259     def response(self, status=1):  # Server sent, 0-off, 1-on
260         return super().response(pack("B", status))
261
262
263 class MOM_PHONE(GPS303Pkt):
264     PROTO = 0x43
265
266
267 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
268     PROTO = 0x44
269
270     def response(self):
271         return super().response(b"")
272
273
274 class STOP_ALARM(GPS303Pkt):
275     PROTO = 0x56
276
277
278 class SETUP(GPS303Pkt):
279     PROTO = 0x57
280
281     def response(
282         self,
283         uploadIntervalSeconds=0x0300,
284         binarySwitch=0b00110001,
285         alarms=[0, 0, 0],
286         dndTimeSwitch=0,
287         dndTimes=[0, 0, 0],
288         gpsTimeSwitch=0,
289         gpsTimeStart=0,
290         gpsTimeStop=0,
291         phoneNumbers=["", "", ""],
292     ):
293         def pack3b(x):
294             return pack("!I", x)[1:]
295
296         payload = b"".join(
297             [
298                 pack("!H", uploadIntervalSeconds),
299                 pack("B", binarySwitch),
300             ]
301             + [pack3b(el) for el in alarms]
302             + [
303                 pack("B", dndTimeSwitch),
304             ]
305             + [pack3b(el) for el in dndTimes]
306             + [
307                 pack("B", gpsTimeSwitch),
308                 pack("!H", gpsTimeStart),
309                 pack("!H", gpsTimeStop),
310             ]
311             + [b";".join([el.encode() for el in phoneNumbers])]
312         )
313         return super().response(payload)
314
315
316 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
317     PROTO = 0x58
318
319
320 class RESTORE_PASSWORD(GPS303Pkt):
321     PROTO = 0x67
322
323
324 class WIFI_POSITIONING(_WIFI_POSITIONING):
325     PROTO = 0x69
326
327     def response(self, lat=None, lon=None):
328         if lat is None or lon is None:
329             payload = b""
330         else:
331             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
332                 "ascii"
333             )
334         return super().response(payload)
335
336
337 class MANUAL_POSITIONING(GPS303Pkt):
338     PROTO = 0x80
339
340
341 class BATTERY_CHARGE(GPS303Pkt):
342     PROTO = 0x81
343
344
345 class CHARGER_CONNECTED(GPS303Pkt):
346     PROTO = 0x82
347
348
349 class CHARGER_DISCONNECTED(GPS303Pkt):
350     PROTO = 0x83
351
352
353 class VIBRATION_RECEIVED(GPS303Pkt):
354     PROTO = 0x94
355
356
357 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
358     PROTO = 0x98
359
360     @classmethod
361     def from_packet(cls, length, payload):
362         self = super().from_packet(length, payload)
363         self.interval = unpack("!H", payload[:2])
364         return self
365
366     def response(self):
367         return super().response(pack("!H", self.interval))
368
369
370 class SOS_ALARM(GPS303Pkt):
371     PROTO = 0x99
372
373
374 # Build dicts protocol number -> class and class name -> protocol number
375 CLASSES = {}
376 PROTOS = {}
377 if True:  # just to indent the code, sorry!
378     for cls in [
379         cls
380         for name, cls in globals().items()
381         if isclass(cls)
382         and issubclass(cls, GPS303Pkt)
383         and not name.startswith("_")
384     ]:
385         if hasattr(cls, "PROTO"):
386             CLASSES[cls.PROTO] = cls
387             PROTOS[cls.__name__] = cls.PROTO
388
389
390 def proto_by_name(name):
391     return PROTOS.get(name, -1)
392
393
394 def proto_of_message(packet):
395     return unpack("B", packet[1:2])
396
397
398 def make_object(length, proto, payload):
399     if proto in CLASSES:
400         return CLASSES[proto].from_packet(length, payload)
401     else:
402         retobj = UNKNOWN.from_packet(length, payload)
403         retobj.PROTO = proto  # Override class attr with object attr
404         return retobj
405
406
407 def parse_message(packet):
408     length, proto = unpack("BB", packet[:2])
409     payload = packet[2:]
410     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
411     if (
412         proto
413         not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
414         and length > 1
415         and len(payload) + adjust != length
416     ):
417         log.warning(
418             "With proto %d length is %d but payload length is %d+%d",
419             proto,
420             length,
421             len(payload),
422             adjust,
423         )
424     return make_object(length, proto, payload)
425
426
427 def handle_packet(packet):  # DEPRECATED
428     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
429         return UNKNOWN.from_packet(len(packet), packet)
430     return parse_message(packet[2:-2])
431
432
433 def make_response(msg, **kwargs):  # DEPRECATED
434     inframe = msg.response(**kwargs)
435     return None if inframe is None else b"xx" + inframe + b"\r\n"
436
437
438 def set_config(config):  # Note that we are setting _class_ attribute
439     GPS303Pkt.CONFIG = config