]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
84358fbcb66ee3b750a3dff2e6a4c5cc5b16cf47
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "inline_response",
25     "make_object",
26     "make_response",
27     "parse_message",
28     "proto_by_name",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "MOM_PHONE",
43     "STOP_ALARM",
44     "SETUP",
45     "SYNCHRONOUS_WHITELIST",
46     "RESTORE_PASSWORD",
47     "WIFI_POSITIONING",
48     "MANUAL_POSITIONING",
49     "BATTERY_CHARGE",
50     "CHARGER_CONNECTED",
51     "CHARGER_DISCONNECTED",
52     "VIBRATION_RECEIVED",
53     "POSITION_UPLOAD_INTERVAL",
54 )
55
56 log = getLogger("gps303")
57
58
59 class GPS303Pkt:
60     PROTO: int
61     INLINE = True
62
63     def __init__(self, *args, **kwargs):
64         assert len(args) == 0
65         for k, v in kwargs.items():
66             setattr(self, k, v)
67
68     def __repr__(self):
69         return "{}({})".format(
70             self.__class__.__name__,
71             ", ".join(
72                 "{}={}".format(
73                     k,
74                     'bytes.fromhex("{}")'.format(v.hex())
75                     if isinstance(v, bytes)
76                     else v.__repr__(),
77                 )
78                 for k, v in self.__dict__.items()
79                 if not k.startswith("_")
80             ),
81         )
82
83     @classmethod
84     def from_packet(cls, length, payload):
85         return cls(payload=payload, length=length)
86
87     def to_packet(self):
88         return pack("BB", self.length, self.PROTO) + self.payload
89
90     @classmethod
91     def make_packet(cls, payload):
92         assert isinstance(payload, bytes)
93         length = len(payload) + 1
94         if length > 6:
95             length -= 6
96         return pack("BB", length, cls.PROTO) + payload
97
98     @classmethod
99     def inline_response(cls, packet):
100         if cls.INLINE:
101             return cls.make_packet(b"")
102         else:
103             return None
104
105
106 class UNKNOWN(GPS303Pkt):
107     PROTO = 256  # > 255 is impossible in real packets
108     INLINE = False
109
110
111 class LOGIN(GPS303Pkt):
112     PROTO = 0x01
113
114     @classmethod
115     def from_packet(cls, length, payload):
116         self = super().from_packet(length, payload)
117         self.imei = payload[:-1].hex()
118         self.ver = unpack("B", payload[-1:])[0]
119         return self
120
121
122 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
123     PROTO = 0x05
124     INLINE = False
125
126     def response(self, supnum=0):
127         # 1: The device automatically answers Pickup effect
128         # 2: Automatically Answering Two-way Calls
129         # 3: Ring manually answer the two-way call
130         return self.make_packet(pack("B", supnum))
131
132
133 class HEARTBEAT(GPS303Pkt):
134     PROTO = 0x08
135
136
137 class _GPS_POSITIONING(GPS303Pkt):
138     @classmethod
139     def from_packet(cls, length, payload):
140         self = super().from_packet(length, payload)
141         self.dtime = payload[:6]
142         if self.dtime == b"\0\0\0\0\0\0":
143             self.devtime = None
144         else:
145             self.devtime = datetime(
146                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
147             )
148         self.gps_data_length = payload[6] >> 4
149         self.gps_nb_sat = payload[6] & 0x0F
150         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
151         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
152         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
153         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
154         self.heading = flags & 0b0000001111111111  # bits 6 - last
155         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
156         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
157         self.speed = speed
158         self.flags = flags
159         return self
160
161     @classmethod
162     def inline_response(cls, packet):
163         return cls.make_packet(packet[2:8])
164
165
166 class GPS_POSITIONING(_GPS_POSITIONING):
167     PROTO = 0x10
168
169
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
171     PROTO = 0x11
172
173
174 class STATUS(GPS303Pkt):
175     PROTO = 0x13
176     INLINE = False
177
178     @classmethod
179     def from_packet(cls, length, payload):
180         self = super().from_packet(length, payload)
181         if len(payload) == 5:
182             (
183                 self.batt,
184                 self.ver,
185                 self.timezone,
186                 self.intvl,
187                 self.signal,
188             ) = unpack("BBBBB", payload)
189         elif len(payload) == 4:
190             self.batt, self.ver, self.timezone, self.intvl = unpack(
191                 "BBBB", payload
192             )
193             self.signal = None
194         return self
195
196     def response(self, upload_interval=25):  # Set interval in minutes
197         return self.make_packet(pack("B", upload_interval))
198
199
200 class HIBERNATION(GPS303Pkt):
201     PROTO = 0x14
202
203
204 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
205     PROTO = 0x15
206     INLINE = False
207
208     def response(self):  # Server can send to initiate factory reset
209         return self.make_packet(b"")
210
211
212 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
213     PROTO = 0x16
214     INLINE = False
215
216     def response(self, number=3):  # Number of whitelist entries
217         return self.make_packet(pack("B", number))
218
219
220 class _WIFI_POSITIONING(GPS303Pkt):
221     @classmethod
222     def from_packet(cls, length, payload):
223         self = super().from_packet(length, payload)
224         self.dtime = payload[:6]
225         if self.dtime == b"\0\0\0\0\0\0":
226             self.devtime = None
227         else:
228             self.devtime = datetime.strptime(
229                 self.dtime.hex(), "%y%m%d%H%M%S"
230             ).astimezone(tz=timezone.utc)
231         self.wifi_aps = []
232         for i in range(self.length):  # length has special meaning here
233             slice = payload[6 + i * 7 : 13 + i * 7]
234             self.wifi_aps.append(
235                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
236             )
237         gsm_slice = payload[6 + self.length * 7 :]
238         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
239         self.gsm_cells = []
240         for i in range(ncells):
241             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
242             locac, cellid, sigstr = unpack(
243                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
244             )
245             self.gsm_cells.append((locac, cellid, -sigstr))
246         return self
247
248
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
250     PROTO = 0x17
251
252     @classmethod
253     def inline_response(cls, packet):
254         return cls.make_packet(packet[2:8])
255
256
257 class TIME(GPS303Pkt):
258     PROTO = 0x30
259
260     @classmethod
261     def inline_response(cls, packet):
262         return pack(
263             "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
264         )
265
266
267 class PROHIBIT_LBS(GPS303Pkt):
268     PROTO = 0x33
269     INLINE = False
270
271     def response(self, status=1):  # Server sent, 0-off, 1-on
272         return self.make_packet(pack("B", status))
273
274
275 class MOM_PHONE(GPS303Pkt):
276     PROTO = 0x43
277
278
279 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
280     PROTO = 0x44
281
282
283 class STOP_ALARM(GPS303Pkt):
284     PROTO = 0x56
285
286
287 class SETUP(GPS303Pkt):
288     PROTO = 0x57
289     INLINE = False
290
291     def response(
292         self,
293         uploadIntervalSeconds=0x0300,
294         binarySwitch=0b00110001,
295         alarms=[0, 0, 0],
296         dndTimeSwitch=0,
297         dndTimes=[0, 0, 0],
298         gpsTimeSwitch=0,
299         gpsTimeStart=0,
300         gpsTimeStop=0,
301         phoneNumbers=["", "", ""],
302     ):
303         def pack3b(x):
304             return pack("!I", x)[1:]
305
306         payload = b"".join(
307             [
308                 pack("!H", uploadIntervalSeconds),
309                 pack("B", binarySwitch),
310             ]
311             + [pack3b(el) for el in alarms]
312             + [
313                 pack("B", dndTimeSwitch),
314             ]
315             + [pack3b(el) for el in dndTimes]
316             + [
317                 pack("B", gpsTimeSwitch),
318                 pack("!H", gpsTimeStart),
319                 pack("!H", gpsTimeStop),
320             ]
321             + [b";".join([el.encode() for el in phoneNumbers])]
322         )
323         return self.make_packet(payload)
324
325
326 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
327     PROTO = 0x58
328
329
330 class RESTORE_PASSWORD(GPS303Pkt):
331     PROTO = 0x67
332
333
334 class WIFI_POSITIONING(_WIFI_POSITIONING):
335     PROTO = 0x69
336     INLINE = False
337
338     def response(self, lat=None, lon=None):
339         if lat is None or lon is None:
340             payload = b""
341         else:
342             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
343                 "ascii"
344             )
345         return self.make_packet(payload)
346
347
348 class MANUAL_POSITIONING(GPS303Pkt):
349     PROTO = 0x80
350     INLINE = False
351
352
353 class BATTERY_CHARGE(GPS303Pkt):
354     PROTO = 0x81
355
356
357 class CHARGER_CONNECTED(GPS303Pkt):
358     PROTO = 0x82
359
360
361 class CHARGER_DISCONNECTED(GPS303Pkt):
362     PROTO = 0x83
363
364
365 class VIBRATION_RECEIVED(GPS303Pkt):
366     PROTO = 0x94
367
368
369 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
370     PROTO = 0x98
371     INLINE = False
372
373     @classmethod
374     def from_packet(cls, length, payload):
375         self = super().from_packet(length, payload)
376         self.interval = unpack("!H", payload[:2])
377         return self
378
379     def response(self, interval=10):
380         return self.make_packet(pack("!H", interval))
381
382
383 class SOS_ALARM(GPS303Pkt):
384     PROTO = 0x99
385
386
387 # Build dicts protocol number -> class and class name -> protocol number
388 CLASSES = {}
389 PROTOS = {}
390 if True:  # just to indent the code, sorry!
391     for cls in [
392         cls
393         for name, cls in globals().items()
394         if isclass(cls)
395         and issubclass(cls, GPS303Pkt)
396         and not name.startswith("_")
397     ]:
398         if hasattr(cls, "PROTO"):
399             CLASSES[cls.PROTO] = cls
400             PROTOS[cls.__name__] = cls.PROTO
401
402
403 def proto_by_name(name):
404     return PROTOS.get(name, -1)
405
406
407 def proto_of_message(packet):
408     return unpack("B", packet[1:2])[0]
409
410
411 def inline_response(packet):
412     proto = proto_of_message(packet)
413     if proto in CLASSES:
414         return CLASSES[proto].inline_response(packet)
415     else:
416         return None
417
418
419 def make_object(length, proto, payload):
420     if proto in CLASSES:
421         return CLASSES[proto].from_packet(length, payload)
422     else:
423         retobj = UNKNOWN.from_packet(length, payload)
424         retobj.PROTO = proto  # Override class attr with object attr
425         return retobj
426
427
428 def parse_message(packet):
429     length, proto = unpack("BB", packet[:2])
430     payload = packet[2:]
431     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
432     if (
433         proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
434         and length > 1
435         and len(payload) + adjust != length
436     ):
437         log.warning(
438             "With proto %d length is %d but payload length is %d+%d",
439             proto,
440             length,
441             len(payload),
442             adjust,
443         )
444     return make_object(length, proto, payload)
445
446
447 def handle_packet(packet):  # DEPRECATED
448     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
449         return UNKNOWN.from_packet(len(packet), packet)
450     return parse_message(packet[2:-2])
451
452
453 def make_response(msg, **kwargs):  # DEPRECATED
454     inframe = msg.response(**kwargs)
455     return None if inframe is None else b"xx" + inframe + b"\r\n"