]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
initial storage service
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "make_object",
25     "make_response",
26     "parse_message",
27     "proto_by_name",
28     "set_config",
29     "GPS303Pkt",
30     "UNKNOWN",
31     "LOGIN",
32     "SUPERVISION",
33     "HEARTBEAT",
34     "GPS_POSITIONING",
35     "GPS_OFFLINE_POSITIONING",
36     "STATUS",
37     "HIBERNATION",
38     "RESET",
39     "WHITELIST_TOTAL",
40     "WIFI_OFFLINE_POSITIONING",
41     "TIME",
42     "MOM_PHONE",
43     "STOP_ALARM",
44     "SETUP",
45     "SYNCHRONOUS_WHITELIST",
46     "RESTORE_PASSWORD",
47     "WIFI_POSITIONING",
48     "MANUAL_POSITIONING",
49     "BATTERY_CHARGE",
50     "CHARGER_CONNECTED",
51     "CHARGER_DISCONNECTED",
52     "VIBRATION_RECEIVED",
53     "POSITION_UPLOAD_INTERVAL",
54 )
55
56 log = getLogger("gps303")
57
58
59 class GPS303Pkt:
60     PROTO: int
61     CONFIG = None
62
63     def __init__(self, *args, **kwargs):
64         assert len(args) == 0
65         for k, v in kwargs.items():
66             setattr(self, k, v)
67
68     def __repr__(self):
69         return "{}({})".format(
70             self.__class__.__name__,
71             ", ".join(
72                 "{}={}".format(
73                     k,
74                     'bytes.fromhex("{}")'.format(v.hex())
75                     if isinstance(v, bytes)
76                     else v.__repr__(),
77                 )
78                 for k, v in self.__dict__.items()
79                 if not k.startswith("_")
80             ),
81         )
82
83     @classmethod
84     def from_packet(cls, length, payload):
85         return cls(payload=payload, length=length)
86
87     def to_packet(self):
88         return pack("BB", self.length, self.PROTO) + self.payload
89
90     @classmethod
91     def response(cls, *args):
92         if len(args) == 0:
93             return None
94         assert len(args) == 1 and isinstance(args[0], bytes)
95         payload = args[0]
96         length = len(payload) + 1
97         if length > 6:
98             length -= 6
99         return pack("BB", length, cls.PROTO) + payload
100
101
102 class UNKNOWN(GPS303Pkt):
103     PROTO = 256  # > 255 is impossible in real packets
104
105
106 class LOGIN(GPS303Pkt):
107     PROTO = 0x01
108
109     @classmethod
110     def from_packet(cls, length, payload):
111         self = super().from_packet(length, payload)
112         self.imei = payload[:-1].hex()
113         self.ver = unpack("B", payload[-1:])[0]
114         return self
115
116     @classmethod
117     def response(cls):
118         return super().response(b"")
119
120
121 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
122     PROTO = 0x05
123
124     @classmethod
125     def response(cls, supnum=0):
126         # 1: The device automatically answers Pickup effect
127         # 2: Automatically Answering Two-way Calls
128         # 3: Ring manually answer the two-way call
129         return super().response(b"")
130
131
132 class HEARTBEAT(GPS303Pkt):
133     PROTO = 0x08
134
135
136 class _GPS_POSITIONING(GPS303Pkt):
137     @classmethod
138     def from_packet(cls, length, payload):
139         self = super().from_packet(length, payload)
140         self.dtime = payload[:6]
141         if self.dtime == b"\0\0\0\0\0\0":
142             self.devtime = None
143         else:
144             self.devtime = datetime(
145                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
146             )
147         self.gps_data_length = payload[6] >> 4
148         self.gps_nb_sat = payload[6] & 0x0F
149         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
150         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
151         flip_lon =          bool(flags & 0b0000100000000000)  # bit 4
152         flip_lat = not      bool(flags & 0b0000010000000000)  # bit 5
153         self.heading =           flags & 0b0000001111111111   # bits 6 - last
154         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
155         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
156         self.speed = speed
157         self.flags = flags
158         return self
159
160     def response(self):
161         return super().response(self.dtime)
162
163
164 class GPS_POSITIONING(_GPS_POSITIONING):
165     PROTO = 0x10
166
167
168 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
169     PROTO = 0x11
170
171
172 class STATUS(GPS303Pkt):
173     PROTO = 0x13
174
175     @classmethod
176     def from_packet(cls, length, payload):
177         self = super().from_packet(length, payload)
178         if len(payload) == 5:
179             (
180                 self.batt,
181                 self.ver,
182                 self.timezone,
183                 self.intvl,
184                 self.signal,
185             ) = unpack("BBBBB", payload)
186         elif len(payload) == 4:
187             self.batt, self.ver, self.timezone, self.intvl = unpack(
188                 "BBBB", payload
189             )
190             self.signal = None
191         return self
192
193     def response(self, upload_interval=25):  # Set interval in minutes
194         return super().response(pack("B", upload_interval))
195
196
197 class HIBERNATION(GPS303Pkt):
198     PROTO = 0x14
199
200
201 class RESET(GPS303Pkt):  # Device sends when it got reset SMS
202     PROTO = 0x15
203
204     def response(self):  # Server can send to initiate factory reset
205         return super().response(b"")
206
207
208 class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
209     PROTO = 0x16
210
211     def response(self, number=3):  # Number of whitelist entries
212         return super().response(pack("B", number))
213
214
215 class _WIFI_POSITIONING(GPS303Pkt):
216     @classmethod
217     def from_packet(cls, length, payload):
218         self = super().from_packet(length, payload)
219         self.dtime = payload[:6]
220         if self.dtime == b"\0\0\0\0\0\0":
221             self.devtime = None
222         else:
223             self.devtime = datetime.strptime(
224                 self.dtime.hex(), "%y%m%d%H%M%S"
225             ).astimezone(tz=timezone.utc)
226         self.wifi_aps = []
227         for i in range(self.length):  # length has special meaning here
228             slice = payload[6 + i * 7 : 13 + i * 7]
229             self.wifi_aps.append(
230                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
231             )
232         gsm_slice = payload[6 + self.length * 7 :]
233         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
234         self.gsm_cells = []
235         for i in range(ncells):
236             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
237             locac, cellid, sigstr = unpack(
238                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
239             )
240             self.gsm_cells.append((locac, cellid, -sigstr))
241         return self
242
243
244 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
245     PROTO = 0x17
246
247     def response(self):
248         return super().response(self.dtime)
249
250
251 class TIME(GPS303Pkt):
252     PROTO = 0x30
253
254     def response(self):
255         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
256         return super().response(payload)
257
258
259 class PROHIBIT_LBS(GPS303Pkt):
260     PROTO = 0x33
261
262     def response(self, status=1):  # Server sent, 0-off, 1-on
263         return super().response(pack("B", status))
264
265
266 class MOM_PHONE(GPS303Pkt):
267     PROTO = 0x43
268
269
270 class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
271     PROTO = 0x44
272
273     def response(self):
274         return super().response(b"")
275
276
277 class STOP_ALARM(GPS303Pkt):
278     PROTO = 0x56
279
280
281 class SETUP(GPS303Pkt):
282     PROTO = 0x57
283
284     def response(
285         self,
286         uploadIntervalSeconds=0x0300,
287         binarySwitch=0b00110001,
288         alarms=[0, 0, 0],
289         dndTimeSwitch=0,
290         dndTimes=[0, 0, 0],
291         gpsTimeSwitch=0,
292         gpsTimeStart=0,
293         gpsTimeStop=0,
294         phoneNumbers=["", "", ""],
295     ):
296         def pack3b(x):
297             return pack("!I", x)[1:]
298
299         payload = b"".join(
300             [
301                 pack("!H", uploadIntervalSeconds),
302                 pack("B", binarySwitch),
303             ]
304             + [pack3b(el) for el in alarms]
305             + [
306                 pack("B", dndTimeSwitch),
307             ]
308             + [pack3b(el) for el in dndTimes]
309             + [
310                 pack("B", gpsTimeSwitch),
311                 pack("!H", gpsTimeStart),
312                 pack("!H", gpsTimeStop),
313             ]
314             + [b";".join([el.encode() for el in phoneNumbers])]
315         )
316         return super().response(payload)
317
318
319 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
320     PROTO = 0x58
321
322
323 class RESTORE_PASSWORD(GPS303Pkt):
324     PROTO = 0x67
325
326
327 class WIFI_POSITIONING(_WIFI_POSITIONING):
328     PROTO = 0x69
329
330     def response(self, lat=None, lon=None):
331         if lat is None or lon is None:
332             payload = b""
333         else:
334             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
335                 "ascii"
336             )
337         return super().response(payload)
338
339
340 class MANUAL_POSITIONING(GPS303Pkt):
341     PROTO = 0x80
342
343
344 class BATTERY_CHARGE(GPS303Pkt):
345     PROTO = 0x81
346
347
348 class CHARGER_CONNECTED(GPS303Pkt):
349     PROTO = 0x82
350
351
352 class CHARGER_DISCONNECTED(GPS303Pkt):
353     PROTO = 0x83
354
355
356 class VIBRATION_RECEIVED(GPS303Pkt):
357     PROTO = 0x94
358
359
360 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
361     PROTO = 0x98
362
363     @classmethod
364     def from_packet(cls, length, payload):
365         self = super().from_packet(length, payload)
366         self.interval = unpack("!H", payload[:2])
367         return self
368
369     def response(self):
370         return super().response(pack("!H", self.interval))
371
372
373 class SOS_ALARM(GPS303Pkt):
374     PROTO = 0x99
375
376
377 # Build dicts protocol number -> class and class name -> protocol number
378 CLASSES = {}
379 PROTOS = {}
380 if True:  # just to indent the code, sorry!
381     for cls in [
382         cls
383         for name, cls in globals().items()
384         if isclass(cls)
385         and issubclass(cls, GPS303Pkt)
386         and not name.startswith("_")
387     ]:
388         if hasattr(cls, "PROTO"):
389             CLASSES[cls.PROTO] = cls
390             PROTOS[cls.__name__] = cls.PROTO
391
392
393 def proto_by_name(name):
394     return PROTOS.get(name, -1)
395
396
397 def proto_of_message(packet):
398     return unpack("B", packet[1:2])[0]
399
400
401 def make_object(length, proto, payload):
402     if proto in CLASSES:
403         return CLASSES[proto].from_packet(length, payload)
404     else:
405         retobj = UNKNOWN.from_packet(length, payload)
406         retobj.PROTO = proto  # Override class attr with object attr
407         return retobj
408
409
410 def parse_message(packet):
411     length, proto = unpack("BB", packet[:2])
412     payload = packet[2:]
413     adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
414     if (
415         proto
416         not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
417         and length > 1
418         and len(payload) + adjust != length
419     ):
420         log.warning(
421             "With proto %d length is %d but payload length is %d+%d",
422             proto,
423             length,
424             len(payload),
425             adjust,
426         )
427     return make_object(length, proto, payload)
428
429
430 def handle_packet(packet):  # DEPRECATED
431     if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
432         return UNKNOWN.from_packet(len(packet), packet)
433     return parse_message(packet[2:-2])
434
435
436 def make_response(msg, **kwargs):  # DEPRECATED
437     inframe = msg.response(**kwargs)
438     return None if inframe is None else b"xx" + inframe + b"\r\n"
439
440
441 def set_config(config):  # Note that we are setting _class_ attribute
442     GPS303Pkt.CONFIG = config