]> www.average.org Git - loctrkd.git/blob - gps303/gps303proto.py
rename protocol module to "gps303proto"
[loctrkd.git] / gps303 / gps303proto.py
1 """
2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
5
6 Forewarnings:
7 1. There is no security whatsoever. If you know the module's IMEI,
8    you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10    the TCP stream) makes it vulnerable to coincidental appearance
11    of framing bytes in the middle of the message. Most of the time
12    the server will receive one message in one TCP segment (i.e. in
13    one `recv()` operation, but relying on that would break things
14    if the path has lower MTU than the size of a message.
15 """
16
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
21
22 __all__ = (
23     "handle_packet",
24     "make_object",
25     "make_response",
26     "set_config",
27     "UNKNOWN",
28     "LOGIN",
29     "SUPERVISION",
30     "HEARTBEAT",
31     "GPS_POSITIONING",
32     "GPS_OFFLINE_POSITIONING",
33     "STATUS",
34     "HIBERNATION",
35     "RESET",
36     "WHITELIST_TOTAL",
37     "WIFI_OFFLINE_POSITIONING",
38     "TIME",
39     "MOM_PHONE",
40     "STOP_ALARM",
41     "SETUP",
42     "SYNCHRONOUS_WHITELIST",
43     "RESTORE_PASSWORD",
44     "WIFI_POSITIONING",
45     "MANUAL_POSITIONING",
46     "BATTERY_CHARGE",
47     "CHARGER_CONNECTED",
48     "CHARGER_DISCONNECTED",
49     "VIBRATION_RECEIVED",
50     "POSITION_UPLOAD_INTERVAL",
51 )
52
53 log = getLogger("gps303")
54
55
56 class _GT06pkt:
57     PROTO: int
58     CONFIG = None
59
60     def __init__(self, *args, **kwargs):
61         assert len(args) == 0
62         for k, v in kwargs.items():
63             setattr(self, k, v)
64
65     def __repr__(self):
66         return "{}({})".format(
67             self.__class__.__name__,
68             ", ".join(
69                 "{}={}".format(
70                     k,
71                     'bytes.fromhex("{}")'.format(v.hex())
72                     if isinstance(v, bytes)
73                     else v.__repr__(),
74                 )
75                 for k, v in self.__dict__.items()
76                 if not k.startswith("_")
77             ),
78         )
79
80     @classmethod
81     def from_packet(cls, length, proto, payload):
82         return cls(proto=proto, payload=payload, length=length)
83
84     def response(self, *args):
85         if len(args) == 0:
86             return None
87         assert len(args) == 1 and isinstance(args[0], bytes)
88         payload = args[0]
89         length = len(payload) + 1
90         if length > 6:
91             length -= 6
92         return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
93
94
95 class UNKNOWN(_GT06pkt):
96     pass
97
98
99 class LOGIN(_GT06pkt):
100     PROTO = 0x01
101
102     @classmethod
103     def from_packet(cls, length, proto, payload):
104         self = super().from_packet(length, proto, payload)
105         self.imei = payload[:-1].hex()
106         self.ver = unpack("B", payload[-1:])[0]
107         return self
108
109     def response(self):
110         return super().response(b"")
111
112
113 class SUPERVISION(_GT06pkt):  # Server sends supervision number status
114     PROTO = 0x05
115
116     def response(self, supnum=0):
117         # 1: The device automatically answers Pickup effect
118         # 2: Automatically Answering Two-way Calls
119         # 3: Ring manually answer the two-way call
120         return super().response(b"")
121
122
123 class HEARTBEAT(_GT06pkt):
124     PROTO = 0x08
125
126
127 class _GPS_POSITIONING(_GT06pkt):
128     @classmethod
129     def from_packet(cls, length, proto, payload):
130         self = super().from_packet(length, proto, payload)
131         self.dtime = payload[:6]
132         if self.dtime == b"\0\0\0\0\0\0":
133             self.devtime = None
134         else:
135             self.devtime = datetime(
136                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
137             )
138         self.gps_data_length = payload[6] >> 4
139         self.gps_nb_sat = payload[6] & 0x0F
140         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
141         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
142         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
143         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
144         self.heading = flags & 0b0000001111111111  # bits 6 - last
145         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
146         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
147         self.speed = speed
148         self.flags = flags
149         return self
150
151     def response(self):
152         return super().response(self.dtime)
153
154
155 class GPS_POSITIONING(_GPS_POSITIONING):
156     PROTO = 0x10
157
158
159 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
160     PROTO = 0x11
161
162
163 class STATUS(_GT06pkt):
164     PROTO = 0x13
165
166     @classmethod
167     def from_packet(cls, length, proto, payload):
168         self = super().from_packet(length, proto, payload)
169         if len(payload) == 5:
170             (
171                 self.batt,
172                 self.ver,
173                 self.timezone,
174                 self.intvl,
175                 self.signal,
176             ) = unpack("BBBBB", payload)
177         elif len(payload) == 4:
178             self.batt, self.ver, self.timezone, self.intvl = unpack(
179                 "BBBB", payload
180             )
181             self.signal = None
182         return self
183
184     def response(self, upload_interval=25):  # Set interval in minutes
185         return super().response(pack("B", upload_interval))
186
187
188 class HIBERNATION(_GT06pkt):
189     PROTO = 0x14
190
191
192 class RESET(_GT06pkt):  # Device sends when it got reset SMS
193     PROTO = 0x15
194
195     def response(self):  # Server can send to initiate factory reset
196         return super().response(b"")
197
198
199 class WHITELIST_TOTAL(_GT06pkt):  # Server sends to initiage sync (0x58)
200     PROTO = 0x16
201
202     def response(self, number=3):  # Number of whitelist entries
203         return super().response(pack("B", number))
204
205
206 class _WIFI_POSITIONING(_GT06pkt):
207     @classmethod
208     def from_packet(cls, length, proto, payload):
209         self = super().from_packet(length, proto, payload)
210         self.dtime = payload[:6]
211         if self.dtime == b"\0\0\0\0\0\0":
212             self.devtime = None
213         else:
214             self.devtime = datetime.strptime(
215                 self.dtime.hex(), "%y%m%d%H%M%S"
216             ).astimezone(tz=timezone.utc)
217         self.wifi_aps = []
218         for i in range(self.length):  # length has special meaning here
219             slice = payload[6 + i * 7 : 13 + i * 7]
220             self.wifi_aps.append(
221                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
222             )
223         gsm_slice = payload[6 + self.length * 7 :]
224         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
225         self.gsm_cells = []
226         for i in range(ncells):
227             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
228             locac, cellid, sigstr = unpack(
229                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
230             )
231             self.gsm_cells.append((locac, cellid, -sigstr))
232         return self
233
234
235 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
236     PROTO = 0x17
237
238     def response(self):
239         return super().response(self.dtime)
240
241
242 class TIME(_GT06pkt):
243     PROTO = 0x30
244
245     def response(self):
246         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
247         return super().response(payload)
248
249
250 class PROHIBIT_LBS(_GT06pkt):
251     PROTO = 0x33
252
253     def response(self, status=1):  # Server sent, 0-off, 1-on
254         return super().response(pack("B", status))
255
256
257 class MOM_PHONE(_GT06pkt):
258     PROTO = 0x43
259
260
261 class STOP_UPLOAD(_GT06pkt):  # Server response to LOGIN to thwart the device
262     PROTO = 0x44
263
264     def response(self):
265         return super().response(b"")
266
267
268 class STOP_ALARM(_GT06pkt):
269     PROTO = 0x56
270
271
272 class SETUP(_GT06pkt):
273     PROTO = 0x57
274
275     def response(
276         self,
277         uploadIntervalSeconds=0x0300,
278         binarySwitch=0b00110001,
279         alarms=[0, 0, 0],
280         dndTimeSwitch=0,
281         dndTimes=[0, 0, 0],
282         gpsTimeSwitch=0,
283         gpsTimeStart=0,
284         gpsTimeStop=0,
285         phoneNumbers=["", "", ""],
286     ):
287         def pack3b(x):
288             return pack("!I", x)[1:]
289
290         payload = b"".join(
291             [
292                 pack("!H", uploadIntervalSeconds),
293                 pack("B", binarySwitch),
294             ]
295             + [pack3b(el) for el in alarms]
296             + [
297                 pack("B", dndTimeSwitch),
298             ]
299             + [pack3b(el) for el in dndTimes]
300             + [
301                 pack("B", gpsTimeSwitch),
302                 pack("!H", gpsTimeStart),
303                 pack("!H", gpsTimeStop),
304             ]
305             + [b";".join([el.encode() for el in phoneNumbers])]
306         )
307         return super().response(payload)
308
309
310 class SYNCHRONOUS_WHITELIST(_GT06pkt):
311     PROTO = 0x58
312
313
314 class RESTORE_PASSWORD(_GT06pkt):
315     PROTO = 0x67
316
317
318 class WIFI_POSITIONING(_WIFI_POSITIONING):
319     PROTO = 0x69
320
321     def response(self, lat=None, lon=None):
322         if lat is None or lon is None:
323             payload = b""
324         else:
325             payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
326                 "ascii"
327             )
328         return super().response(payload)
329
330
331 class MANUAL_POSITIONING(_GT06pkt):
332     PROTO = 0x80
333
334
335 class BATTERY_CHARGE(_GT06pkt):
336     PROTO = 0x81
337
338
339 class CHARGER_CONNECTED(_GT06pkt):
340     PROTO = 0x82
341
342
343 class CHARGER_DISCONNECTED(_GT06pkt):
344     PROTO = 0x83
345
346
347 class VIBRATION_RECEIVED(_GT06pkt):
348     PROTO = 0x94
349
350
351 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
352     PROTO = 0x98
353
354     @classmethod
355     def from_packet(cls, length, proto, payload):
356         self = super().from_packet(length, proto, payload)
357         self.interval = unpack("!H", payload[:2])
358         return self
359
360     def response(self):
361         return super().response(pack("!H", self.interval))
362
363
364 class SOS_ALARM(_GT06pkt):
365     PROTO = 0x99
366
367
368 # Build a dict protocol number -> class
369 CLASSES = {}
370 if True:  # just to indent the code, sorry!
371     for cls in [
372         cls
373         for name, cls in globals().items()
374         if isclass(cls)
375         and issubclass(cls, _GT06pkt)
376         and not name.startswith("_")
377     ]:
378         if hasattr(cls, "PROTO"):
379             CLASSES[cls.PROTO] = cls
380
381
382 def make_object(length, proto, payload):
383     if proto in CLASSES:
384         return CLASSES[proto].from_packet(length, proto, payload)
385     else:
386         return UNKNOWN.from_packet(length, proto, payload)
387
388
389 def handle_packet(packet, addr, when):
390     if len(packet) < 6:
391         return UNKNOWN.from_packet(0, 0, packet)
392     else:
393         xx, length, proto = unpack("!2sBB", packet[:4])
394         crlf = packet[-2:]
395         payload = packet[4:-2]
396         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
397         if (
398             proto
399             not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
400             and length > 1
401             and len(payload) + adjust != length
402         ):
403             log.warning(
404                 "With proto %d length is %d but payload length is %d+%d",
405                 proto,
406                 length,
407                 len(payload),
408                 adjust,
409             )
410         if xx != b"xx" or crlf != b"\r\n":
411             return UNKNOWN.from_packet(length, proto, packet)  # full packet
412         else:
413             return make_object(length, proto, payload)
414
415
416 def make_response(msg, **kwargs):
417     return msg.response(**kwargs)
418
419
420 def set_config(config):  # Note that we are setting _class_ attribute
421     _GT06pkt.CONFIG = config