]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
Add some more commands
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "inline_response",
25     "make_object",
26     "make_response",
27     "parse_message",
28     "proto_by_name",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "MOM_PHONE",
43     "STOP_ALARM",
44     "SETUP",
45     "SYNCHRONOUS_WHITELIST",
46     "RESTORE_PASSWORD",
47     "WIFI_POSITIONING",
48     "MANUAL_POSITIONING",
49     "BATTERY_CHARGE",
50     "CHARGER_CONNECTED",
51     "CHARGER_DISCONNECTED",
52     "VIBRATION_RECEIVED",
53     "POSITION_UPLOAD_INTERVAL",
54 )
55
56 log = getLogger("gps303")
57
58
59 class GPS303Pkt:
60     PROTO: int
61     INLINE = True
62
63     def __init__(self, *args, **kwargs):
64         assert len(args) == 0
65         for k, v in kwargs.items():
66             setattr(self, k, v)
67
68     def __repr__(self):
69         return "{}({})".format(
70             self.__class__.__name__,
71             ", ".join(
72                 "{}={}".format(
73                     k,
74                     'bytes.fromhex("{}")'.format(v.hex())
75                     if isinstance(v, bytes)
76                     else v.__repr__(),
77                 )
78                 for k, v in self.__dict__.items()
79                 if not k.startswith("_")
80             ),
81         )
82
83     @classmethod
84     def from_packet(cls, length, payload):
85         return cls(payload=payload, length=length)
86
87     def to_packet(self):
88         return pack("BB", self.length, self.PROTO) + self.payload
89
90     @classmethod
91     def make_packet(cls, payload):
92         assert isinstance(payload, bytes)
93         length = len(payload) + 1
94         if length > 6:
95             length -= 6
96         return pack("BB", length, cls.PROTO) + payload
97
98     @classmethod
99     def inline_response(cls, packet):
100         if cls.INLINE:
101             return cls.make_packet(b"")
102         else:
103             return None
104
105
106 class UNKNOWN(GPS303Pkt):
107     PROTO = 256  # > 255 is impossible in real packets
108     INLINE = False
109
110
111 class LOGIN(GPS303Pkt):
112     PROTO = 0x01
113
114     @classmethod
115     def from_packet(cls, length, payload):
116         self = super().from_packet(length, payload)
117         self.imei = payload[:-1].hex()
118         self.ver = unpack("B", payload[-1:])[0]
119         return self
120
121
122 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
123     PROTO = 0x05
124     INLINE = False
125
126     def response(self, supnum=0):
127         # 1: The device automatically answers Pickup effect
128         # 2: Automatically Answering Two-way Calls
129         # 3: Ring manually answer the two-way call
130         return self.make_packet(pack("B", supnum))
131
132
133 class HEARTBEAT(GPS303Pkt):
134     PROTO = 0x08
135
136
137 class _GPS_POSITIONING(GPS303Pkt):
138     @classmethod
139     def from_packet(cls, length, payload):
140         self = super().from_packet(length, payload)
141         self.dtime = payload[:6]
142         if self.dtime == b"\0\0\0\0\0\0":
143             self.devtime = None
144         else:
145             self.devtime = datetime(
146                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
147             )
148         self.gps_data_length = payload[6] >> 4
149         self.gps_nb_sat = payload[6] & 0x0F
150         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
151         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
152         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
153         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
154         self.heading = flags & 0b0000001111111111  # bits 6 - last
155         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
156         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
157         self.speed = speed
158         self.flags = flags
159         return self
160
161     @classmethod
162     def inline_response(cls, packet):
163         return cls.make_packet(packet[2:8])
164
165
166 class GPS_POSITIONING(_GPS_POSITIONING):
167     PROTO = 0x10
168
169
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
171     PROTO = 0x11
172
173
174 class STATUS(GPS303Pkt):
175     PROTO = 0x13
176     INLINE = False
177
178     @classmethod
179     def from_packet(cls, length, payload):
180         self = super().from_packet(length, payload)
181         if len(payload) == 5:
182             (
183                 self.batt,
184                 self.ver,
185                 self.timezone,
186                 self.intvl,
187                 self.signal,
188             ) = unpack("BBBBB", payload)
189         elif len(payload) == 4:
190             self.batt, self.ver, self.timezone, self.intvl = unpack(
191                 "BBBB", payload
192             )
193             self.signal = None
194         return self
195
196     def response(self, upload_interval=25):  # Set interval in minutes
197         return self.make_packet(pack("B", upload_interval))
198
199
200 class HIBERNATION(GPS303Pkt):
201     PROTO = 0x14
202
203
204 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
205     PROTO = 0x15
206     INLINE = False
207
208     def response(self):  # Server can send to initiate factory reset
209         return self.make_packet(b"")
210
211
212 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
213     PROTO = 0x16
214     INLINE = False
215
216     def response(self, number=3):  # Number of whitelist entries
217         return self.make_packet(pack("B", number))
218
219
220 class _WIFI_POSITIONING(GPS303Pkt):
221     @classmethod
222     def from_packet(cls, length, payload):
223         self = super().from_packet(length, payload)
224         self.dtime = payload[:6]
225         if self.dtime == b"\0\0\0\0\0\0":
226             self.devtime = None
227         else:
228             self.devtime = datetime.strptime(
229                 self.dtime.hex(), "%y%m%d%H%M%S"
230             ).astimezone(tz=timezone.utc)
231         self.wifi_aps = []
232         for i in range(self.length):  # length has special meaning here
233             slice = payload[6 + i * 7 : 13 + i * 7]
234             self.wifi_aps.append(
235                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
236             )
237         gsm_slice = payload[6 + self.length * 7 :]
238         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
239         self.gsm_cells = []
240         for i in range(ncells):
241             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
242             locac, cellid, sigstr = unpack(
243                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
244             )
245             self.gsm_cells.append((locac, cellid, -sigstr))
246         return self
247
248
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
250     PROTO = 0x17
251
252     @classmethod
253     def inline_response(cls, packet):
254         return cls.make_packet(packet[2:8])
255
256
257 class TIME(GPS303Pkt):
258     PROTO = 0x30
259
260     @classmethod
261     def inline_response(cls, packet):
262         return pack(
263             "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
264         )
265
266
267 class PROHIBIT_LBS(GPS303Pkt):
268     PROTO = 0x33
269     INLINE = False
270
271     def response(self, status=1):  # Server sent, 0-off, 1-on
272         return self.make_packet(pack("B", status))
273
274
275 class LBS_SWITCH_TIMES(GPS303Pkt):
276     PROTO = 0x34
277     INLINE = False
278
279     def response(self):
280         return self.make_packet(b"")
281
282
283 class REMOTE_MONITOR_PHONE(GPS303Pkt):
284     PROTO = 0x40
285     INLINE = False
286
287
288 class SOS_PHONE(GPS303Pkt):
289     PROTO = 0x41
290     INLINE = False
291
292
293 class DAD_PHONE(GPS303Pkt):
294     PROTO = 0x42
295     INLINE = False
296
297
298 class MOM_PHONE(GPS303Pkt):
299     PROTO = 0x43
300     INLINE = False
301
302
303 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
304     PROTO = 0x44
305
306
307 class STOP_ALARM(GPS303Pkt):
308     PROTO = 0x56
309
310
311 class SETUP(GPS303Pkt):
312     PROTO = 0x57
313     INLINE = False
314
315     def response(
316         self,
317         uploadIntervalSeconds=0x0300,
318         binarySwitch=0b00110001,
319         alarms=[0, 0, 0],
320         dndTimeSwitch=0,
321         dndTimes=[0, 0, 0],
322         gpsTimeSwitch=0,
323         gpsTimeStart=0,
324         gpsTimeStop=0,
325         phoneNumbers=["", "", ""],
326     ):
327         def pack3b(x):
328             return pack("!I", x)[1:]
329
330         payload = b"".join(
331             [
332                 pack("!H", uploadIntervalSeconds),
333                 pack("B", binarySwitch),
334             ]
335             + [pack3b(el) for el in alarms]
336             + [
337                 pack("B", dndTimeSwitch),
338             ]
339             + [pack3b(el) for el in dndTimes]
340             + [
341                 pack("B", gpsTimeSwitch),
342                 pack("!H", gpsTimeStart),
343                 pack("!H", gpsTimeStop),
344             ]
345             + [b";".join([el.encode() for el in phoneNumbers])]
346         )
347         return self.make_packet(payload)
348
349
350 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
351     PROTO = 0x58
352
353
354 class RESTORE_PASSWORD(GPS303Pkt):
355     PROTO = 0x67
356
357
358 class WIFI_POSITIONING(_WIFI_POSITIONING):
359     PROTO = 0x69
360     INLINE = False
361
362     def response(self, lat=None, lon=None):
363         if lat is None or lon is None:
364             payload = b""
365         else:
366             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
367                 "ascii"
368             )
369         return self.make_packet(payload)
370
371
372 class MANUAL_POSITIONING(GPS303Pkt):
373     PROTO = 0x80
374     INLINE = False
375
376
377 class BATTERY_CHARGE(GPS303Pkt):
378     PROTO = 0x81
379
380
381 class CHARGER_CONNECTED(GPS303Pkt):
382     PROTO = 0x82
383
384
385 class CHARGER_DISCONNECTED(GPS303Pkt):
386     PROTO = 0x83
387
388
389 class VIBRATION_RECEIVED(GPS303Pkt):
390     PROTO = 0x94
391
392
393 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
394     PROTO = 0x98
395     INLINE = False
396
397     @classmethod
398     def from_packet(cls, length, payload):
399         self = super().from_packet(length, payload)
400         self.interval = unpack("!H", payload[:2])
401         return self
402
403     def response(self, interval=10):
404         return self.make_packet(pack("!H", interval))
405
406
407 class SOS_ALARM(GPS303Pkt):
408     PROTO = 0x99
409
410
411 # Build dicts protocol number -> class and class name -> protocol number
412 CLASSES = {}
413 PROTOS = {}
414 if True:  # just to indent the code, sorry!
415     for cls in [
416         cls
417         for name, cls in globals().items()
418         if isclass(cls)
419         and issubclass(cls, GPS303Pkt)
420         and not name.startswith("_")
421     ]:
422         if hasattr(cls, "PROTO"):
423             CLASSES[cls.PROTO] = cls
424             PROTOS[cls.__name__] = cls.PROTO
425
426
427 def proto_by_name(name):
428     return PROTOS.get(name, -1)
429
430
431 def proto_of_message(packet):
432     return unpack("B", packet[1:2])[0]
433
434
435 def inline_response(packet):
436     proto = proto_of_message(packet)
437     if proto in CLASSES:
438         return CLASSES[proto].inline_response(packet)
439     else:
440         return None
441
442
443 def make_object(length, proto, payload):
444     if proto in CLASSES:
445         return CLASSES[proto].from_packet(length, payload)
446     else:
447         retobj = UNKNOWN.from_packet(length, payload)
448         retobj.PROTO = proto  # Override class attr with object attr
449         return retobj
450
451
452 def parse_message(packet):
453     length, proto = unpack("BB", packet[:2])
454     payload = packet[2:]
455     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
456     if (
457         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
458         and length > 1
459         and len(payload) + adjust != length
460     ):
461         log.warning(
462             "With proto %d length is %d but payload length is %d+%d",
463             proto,
464             length,
465             len(payload),
466             adjust,
467         )
468     return make_object(length, proto, payload)
469
470
471 def handle_packet(packet):  # DEPRECATED
472     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
473         return UNKNOWN.from_packet(len(packet), packet)
474     return parse_message(packet[2:-2])
475
476
477 def make_response(msg, **kwargs):  # DEPRECATED
478     inframe = msg.response(**kwargs)
479     return None if inframe is None else b"xx" + inframe + b"\r\n"