]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
the whole shebang is working now
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "inline_response",
25     "make_object",
26     "make_response",
27     "parse_message",
28     "proto_by_name",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "MOM_PHONE",
43     "STOP_ALARM",
44     "SETUP",
45     "SYNCHRONOUS_WHITELIST",
46     "RESTORE_PASSWORD",
47     "WIFI_POSITIONING",
48     "MANUAL_POSITIONING",
49     "BATTERY_CHARGE",
50     "CHARGER_CONNECTED",
51     "CHARGER_DISCONNECTED",
52     "VIBRATION_RECEIVED",
53     "POSITION_UPLOAD_INTERVAL",
54 )
55
56 log = getLogger("gps303")
57
58
59 class GPS303Pkt:
60     PROTO: int
61     INLINE = True
62
63     def __init__(self, *args, **kwargs):
64         assert len(args) == 0
65         for k, v in kwargs.items():
66             setattr(self, k, v)
67
68     def __repr__(self):
69         return "{}({})".format(
70             self.__class__.__name__,
71             ", ".join(
72                 "{}={}".format(
73                     k,
74                     'bytes.fromhex("{}")'.format(v.hex())
75                     if isinstance(v, bytes)
76                     else v.__repr__(),
77                 )
78                 for k, v in self.__dict__.items()
79                 if not k.startswith("_")
80             ),
81         )
82
83     @classmethod
84     def from_packet(cls, length, payload):
85         return cls(payload=payload, length=length)
86
87     def to_packet(self):
88         return pack("BB", self.length, self.PROTO) + self.payload
89
90     @classmethod
91     def make_packet(cls, payload):
92         assert isinstance(payload, bytes)
93         length = len(payload) + 1
94         if length > 6:
95             length -= 6
96         return pack("BB", length, cls.PROTO) + payload
97
98     @classmethod
99     def inline_response(cls, packet):
100         if cls.INLINE:
101             return cls.make_packet(b"")
102         else:
103             return None
104
105
106 class UNKNOWN(GPS303Pkt):
107     PROTO = 256  # > 255 is impossible in real packets
108     INLINE = False
109
110
111 class LOGIN(GPS303Pkt):
112     PROTO = 0x01
113
114     @classmethod
115     def from_packet(cls, length, payload):
116         self = super().from_packet(length, payload)
117         self.imei = payload[:-1].hex()
118         self.ver = unpack("B", payload[-1:])[0]
119         return self
120
121
122 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
123     PROTO = 0x05
124     INLINE = False
125
126     def response(self, supnum=0):
127         # 1: The device automatically answers Pickup effect
128         # 2: Automatically Answering Two-way Calls
129         # 3: Ring manually answer the two-way call
130         return self.make_packet(pack("B", supnum))
131
132
133 class HEARTBEAT(GPS303Pkt):
134     PROTO = 0x08
135
136
137 class _GPS_POSITIONING(GPS303Pkt):
138     @classmethod
139     def from_packet(cls, length, payload):
140         self = super().from_packet(length, payload)
141         self.dtime = payload[:6]
142         if self.dtime == b"\0\0\0\0\0\0":
143             self.devtime = None
144         else:
145             self.devtime = datetime(
146                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
147             )
148         self.gps_data_length = payload[6] >> 4
149         self.gps_nb_sat = payload[6] & 0x0F
150         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
151         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
152         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
153         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
154         self.heading = flags & 0b0000001111111111  # bits 6 - last
155         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
156         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
157         self.speed = speed
158         self.flags = flags
159         return self
160
161     @classmethod
162     def inline_response(cls, packet):
163         return cls.make_packet(packet[2:8])
164
165
166 class GPS_POSITIONING(_GPS_POSITIONING):
167     PROTO = 0x10
168
169
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
171     PROTO = 0x11
172
173
174 class STATUS(GPS303Pkt):
175     PROTO = 0x13
176     INLINE = False
177
178     @classmethod
179     def from_packet(cls, length, payload):
180         self = super().from_packet(length, payload)
181         if len(payload) == 5:
182             (
183                 self.batt,
184                 self.ver,
185                 self.timezone,
186                 self.intvl,
187                 self.signal,
188             ) = unpack("BBBBB", payload)
189         elif len(payload) == 4:
190             self.batt, self.ver, self.timezone, self.intvl = unpack(
191                 "BBBB", payload
192             )
193             self.signal = None
194         return self
195
196     def response(self, upload_interval=25):  # Set interval in minutes
197         return self.make_packet(pack("B", upload_interval))
198
199
200 class HIBERNATION(GPS303Pkt):
201     PROTO = 0x14
202
203
204 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
205     PROTO = 0x15
206     INLINE = False
207
208     def response(self):  # Server can send to initiate factory reset
209         return self.make_packet(b"")
210
211
212 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
213     PROTO = 0x16
214     INLINE = False
215
216     def response(self, number=3):  # Number of whitelist entries
217         return self.make_packet(pack("B", number))
218
219
220 class _WIFI_POSITIONING(GPS303Pkt):
221     @classmethod
222     def from_packet(cls, length, payload):
223         self = super().from_packet(length, payload)
224         self.dtime = payload[:6]
225         if self.dtime == b"\0\0\0\0\0\0":
226             self.devtime = None
227         else:
228             self.devtime = datetime.strptime(
229                 self.dtime.hex(), "%y%m%d%H%M%S"
230             ).astimezone(tz=timezone.utc)
231         self.wifi_aps = []
232         for i in range(self.length):  # length has special meaning here
233             slice = payload[6 + i * 7 : 13 + i * 7]
234             self.wifi_aps.append(
235                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
236             )
237         gsm_slice = payload[6 + self.length * 7 :]
238         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
239         self.gsm_cells = []
240         for i in range(ncells):
241             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
242             locac, cellid, sigstr = unpack(
243                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
244             )
245             self.gsm_cells.append((locac, cellid, -sigstr))
246         return self
247
248
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
250     PROTO = 0x17
251
252     @classmethod
253     def inline_response(cls, packet):
254         return cls.make_packet(packet[2:8])
255
256
257 class TIME(GPS303Pkt):
258     PROTO = 0x30
259
260     @classmethod
261     def inline_response(cls, packet):
262         return pack(
263             "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
264         )
265
266
267 class PROHIBIT_LBS(GPS303Pkt):
268     PROTO = 0x33
269     INLINE = False
270
271     def response(self, status=1):  # Server sent, 0-off, 1-on
272         return self.make_packet(pack("B", status))
273
274
275 class MOM_PHONE(GPS303Pkt):
276     PROTO = 0x43
277
278
279 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
280     PROTO = 0x44
281
282
283 class STOP_ALARM(GPS303Pkt):
284     PROTO = 0x56
285
286
287 class SETUP(GPS303Pkt):
288     PROTO = 0x57
289     INLINE = False
290
291     def response(
292         self,
293         uploadIntervalSeconds=0x0300,
294         binarySwitch=0b00110001,
295         alarms=[0, 0, 0],
296         dndTimeSwitch=0,
297         dndTimes=[0, 0, 0],
298         gpsTimeSwitch=0,
299         gpsTimeStart=0,
300         gpsTimeStop=0,
301         phoneNumbers=["", "", ""],
302     ):
303         def pack3b(x):
304             return pack("!I", x)[1:]
305
306         payload = b"".join(
307             [
308                 pack("!H", uploadIntervalSeconds),
309                 pack("B", binarySwitch),
310             ]
311             + [pack3b(el) for el in alarms]
312             + [
313                 pack("B", dndTimeSwitch),
314             ]
315             + [pack3b(el) for el in dndTimes]
316             + [
317                 pack("B", gpsTimeSwitch),
318                 pack("!H", gpsTimeStart),
319                 pack("!H", gpsTimeStop),
320             ]
321             + [b";".join([el.encode() for el in phoneNumbers])]
322         )
323         return self.make_packet(payload)
324
325
326 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
327     PROTO = 0x58
328
329
330 class RESTORE_PASSWORD(GPS303Pkt):
331     PROTO = 0x67
332
333
334 class WIFI_POSITIONING(_WIFI_POSITIONING):
335     PROTO = 0x69
336     INLINE = False
337
338     def response(self, lat=None, lon=None):
339         if lat is None or lon is None:
340             payload = b""
341         else:
342             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
343                 "ascii"
344             )
345         return self.make_packet(payload)
346
347
348 class MANUAL_POSITIONING(GPS303Pkt):
349     PROTO = 0x80
350
351
352 class BATTERY_CHARGE(GPS303Pkt):
353     PROTO = 0x81
354
355
356 class CHARGER_CONNECTED(GPS303Pkt):
357     PROTO = 0x82
358
359
360 class CHARGER_DISCONNECTED(GPS303Pkt):
361     PROTO = 0x83
362
363
364 class VIBRATION_RECEIVED(GPS303Pkt):
365     PROTO = 0x94
366
367
368 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
369     PROTO = 0x98
370     INLINE = False
371
372     @classmethod
373     def from_packet(cls, length, payload):
374         self = super().from_packet(length, payload)
375         self.interval = unpack("!H", payload[:2])
376         return self
377
378     def response(self, interval=10):
379         return self.make_packet(pack("!H", interval))
380
381
382 class SOS_ALARM(GPS303Pkt):
383     PROTO = 0x99
384
385
386 # Build dicts protocol number -> class and class name -> protocol number
387 CLASSES = {}
388 PROTOS = {}
389 if True:  # just to indent the code, sorry!
390     for cls in [
391         cls
392         for name, cls in globals().items()
393         if isclass(cls)
394         and issubclass(cls, GPS303Pkt)
395         and not name.startswith("_")
396     ]:
397         if hasattr(cls, "PROTO"):
398             CLASSES[cls.PROTO] = cls
399             PROTOS[cls.__name__] = cls.PROTO
400
401
402 def proto_by_name(name):
403     return PROTOS.get(name, -1)
404
405
406 def proto_of_message(packet):
407     return unpack("B", packet[1:2])[0]
408
409
410 def inline_response(packet):
411     proto = proto_of_message(packet)
412     if proto in CLASSES:
413         return CLASSES[proto].inline_response(packet)
414     else:
415         return None
416
417
418 def make_object(length, proto, payload):
419     if proto in CLASSES:
420         return CLASSES[proto].from_packet(length, payload)
421     else:
422         retobj = UNKNOWN.from_packet(length, payload)
423         retobj.PROTO = proto  # Override class attr with object attr
424         return retobj
425
426
427 def parse_message(packet):
428     length, proto = unpack("BB", packet[:2])
429     payload = packet[2:]
430     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
431     if (
432         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
433         and length > 1
434         and len(payload) + adjust != length
435     ):
436         log.warning(
437             "With proto %d length is %d but payload length is %d+%d",
438             proto,
439             length,
440             len(payload),
441             adjust,
442         )
443     return make_object(length, proto, payload)
444
445
446 def handle_packet(packet):  # DEPRECATED
447     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
448         return UNKNOWN.from_packet(len(packet), packet)
449     return parse_message(packet[2:-2])
450
451
452 def make_response(msg, **kwargs):  # DEPRECATED
453     inframe = msg.response(**kwargs)
454     return None if inframe is None else b"xx" + inframe + b"\r\n"