]> www.average.org Git - loctrkd.git/blob - gps303/GT06mod.py
86cc8aacdc6542bab99a64a40b4befe90e42ddad
[loctrkd.git] / gps303 / GT06mod.py
1 """
2 Implementation of the protocol used by zx303 GPS+GPRS module
3 Description from https://github.com/tobadia/petGPS/tree/master/resources
4 """
5
6 from datetime import datetime, timezone
7 from inspect import isclass
8 from logging import getLogger
9 from struct import pack, unpack
10
11 __all__ = (
12     "handle_packet",
13     "make_object",
14     "make_response",
15     "set_config",
16     "UNKNOWN",
17     "LOGIN",
18     "SUPERVISION",
19     "HEARTBEAT",
20     "GPS_POSITIONING",
21     "GPS_OFFLINE_POSITIONING",
22     "STATUS",
23     "HIBERNATION",
24     "RESET",
25     "WHITELIST_TOTAL",
26     "WIFI_OFFLINE_POSITIONING",
27     "TIME",
28     "MOM_PHONE",
29     "STOP_ALARM",
30     "SETUP",
31     "SYNCHRONOUS_WHITELIST",
32     "RESTORE_PASSWORD",
33     "WIFI_POSITIONING",
34     "MANUAL_POSITIONING",
35     "BATTERY_CHARGE",
36     "CHARGER_CONNECTED",
37     "CHARGER_DISCONNECTED",
38     "VIBRATION_RECEIVED",
39     "POSITION_UPLOAD_INTERVAL",
40 )
41
42 log = getLogger("gps303")
43
44
45 class _GT06pkt:
46     PROTO: int
47     CONFIG = None
48
49     def __init__(self, *args, **kwargs):
50         assert len(args) == 0
51         for k, v in kwargs.items():
52             setattr(self, k, v)
53
54     def __repr__(self):
55         return "{}({})".format(
56             self.__class__.__name__,
57             ", ".join(
58                 "{}={}".format(
59                     k,
60                     'bytes.fromhex("{}")'.format(v.hex())
61                     if isinstance(v, bytes)
62                     else v.__repr__(),
63                 )
64                 for k, v in self.__dict__.items()
65                 if not k.startswith("_")
66             ),
67         )
68
69     @classmethod
70     def from_packet(cls, length, proto, payload):
71         return cls(proto=proto, payload=payload, length=length)
72
73     def response(self, *args):
74         if len(args) == 0:
75             return None
76         assert len(args) == 1 and isinstance(args[0], bytes)
77         payload = args[0]
78         length = len(payload) + 1
79         if length > 6:
80             length -= 6
81         return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
82
83
84 class UNKNOWN(_GT06pkt):
85     pass
86
87
88 class LOGIN(_GT06pkt):
89     PROTO = 0x01
90
91     @classmethod
92     def from_packet(cls, length, proto, payload):
93         self = super().from_packet(length, proto, payload)
94         self.imei = payload[:-1].hex()
95         self.ver = unpack("B", payload[-1:])[0]
96         return self
97
98     def response(self):
99         return super().response(b"")
100
101
102 class SUPERVISION(_GT06pkt):  # Server sends supervision number status
103     PROTO = 0x05
104
105     def response(self, supnum=0):
106         # 1: The device automatically answers Pickup effect
107         # 2: Automatically Answering Two-way Calls
108         # 3: Ring manually answer the two-way call
109         return super().response(b"")
110
111
112 class HEARTBEAT(_GT06pkt):
113     PROTO = 0x08
114
115
116 class _GPS_POSITIONING(_GT06pkt):
117     @classmethod
118     def from_packet(cls, length, proto, payload):
119         self = super().from_packet(length, proto, payload)
120         self.dtime = payload[:6]
121         if self.dtime == b"\0\0\0\0\0\0":
122             self.devtime = None
123         else:
124             self.devtime = datetime(
125                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
126             )
127         self.gps_data_length = payload[6] >> 4
128         self.gps_nb_sat = payload[6] & 0x0F
129         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
130         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
131         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
132         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
133         self.heading = flags & 0b0000001111111111  # bits 6 - last
134         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
135         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
136         self.speed = speed
137         self.flags = flags
138         return self
139
140     def response(self):
141         return super().response(self.dtime)
142
143
144 class GPS_POSITIONING(_GPS_POSITIONING):
145     PROTO = 0x10
146
147
148 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
149     PROTO = 0x11
150
151
152 class STATUS(_GT06pkt):
153     PROTO = 0x13
154
155     @classmethod
156     def from_packet(cls, length, proto, payload):
157         self = super().from_packet(length, proto, payload)
158         if len(payload) == 5:
159             (
160                 self.batt,
161                 self.ver,
162                 self.timezone,
163                 self.intvl,
164                 self.signal,
165             ) = unpack("BBBBB", payload)
166         elif len(payload) == 4:
167             self.batt, self.ver, self.timezone, self.intvl = unpack(
168                 "BBBB", payload
169             )
170             self.signal = None
171         return self
172
173     def response(self, upload_interval=25):  # Set interval in minutes
174         return super().response(pack("B", upload_interval))
175
176
177 class HIBERNATION(_GT06pkt):
178     PROTO = 0x14
179
180
181 class RESET(_GT06pkt):  # Device sends when it got reset SMS
182     PROTO = 0x15
183
184     def response(self):  # Server can send to initiate factory reset
185         return super().response(b"")
186
187
188 class WHITELIST_TOTAL(_GT06pkt):  # Server sends to initiage sync (0x58)
189     PROTO = 0x16
190
191     def response(self, number=3):  # Number of whitelist entries
192         return super().response(pack("B", number))
193
194
195 class _WIFI_POSITIONING(_GT06pkt):
196     @classmethod
197     def from_packet(cls, length, proto, payload):
198         self = super().from_packet(length, proto, payload)
199         self.dtime = payload[:6]
200         if self.dtime == b"\0\0\0\0\0\0":
201             self.devtime = None
202         else:
203             self.devtime = datetime.strptime(
204                 self.dtime.hex(), "%y%m%d%H%M%S"
205             ).astimezone(tz=timezone.utc)
206         self.wifi_aps = []
207         for i in range(self.length):  # length has special meaning here
208             slice = payload[6 + i * 7 : 13 + i * 7]
209             self.wifi_aps.append(
210                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
211             )
212         gsm_slice = payload[6 + self.length * 7 :]
213         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
214         self.gsm_cells = []
215         for i in range(ncells):
216             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
217             locac, cellid, sigstr = unpack(
218                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
219             )
220             self.gsm_cells.append((locac, cellid, -sigstr))
221         return self
222
223
224 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
225     PROTO = 0x17
226
227     def response(self):
228         return super().response(self.dtime)
229
230
231 class TIME(_GT06pkt):
232     PROTO = 0x30
233
234     def response(self):
235         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
236         return super().response(payload)
237
238
239 class PROHIBIT_LBS(_GT06pkt):
240     PROTO = 0x33
241
242     def response(self, status=1):  # Server sent, 0-off, 1-on
243         return super().response(pack("B", status))
244
245
246 class MOM_PHONE(_GT06pkt):
247     PROTO = 0x43
248
249
250 class STOP_UPLOAD(_GT06pkt):  # Server response to LOGIN to thwart the device
251     PROTO = 0x44
252
253     def response(self):
254         return super().response(b"")
255
256
257 class STOP_ALARM(_GT06pkt):
258     PROTO = 0x56
259
260
261 class SETUP(_GT06pkt):
262     PROTO = 0x57
263
264     def response(
265         self,
266         uploadIntervalSeconds=0x0300,
267         binarySwitch=0b00110001,
268         alarms=[0, 0, 0],
269         dndTimeSwitch=0,
270         dndTimes=[0, 0, 0],
271         gpsTimeSwitch=0,
272         gpsTimeStart=0,
273         gpsTimeStop=0,
274         phoneNumbers=["", "", ""],
275     ):
276         def pack3b(x):
277             return pack("!I", x)[1:]
278
279         payload = b"".join(
280             [
281                 pack("!H", uploadIntervalSeconds),
282                 pack("B", binarySwitch),
283             ]
284             + [pack3b(el) for el in alarms]
285             + [
286                 pack("B", dndTimeSwitch),
287             ]
288             + [pack3b(el) for el in dndTimes]
289             + [
290                 pack("B", gpsTimeSwitch),
291                 pack("!H", gpsTimeStart),
292                 pack("!H", gpsTimeStop),
293             ]
294             + [b";".join([el.encode() for el in phoneNumbers])]
295         )
296         return super().response(payload)
297
298
299 class SYNCHRONOUS_WHITELIST(_GT06pkt):
300     PROTO = 0x58
301
302
303 class RESTORE_PASSWORD(_GT06pkt):
304     PROTO = 0x67
305
306
307 class WIFI_POSITIONING(_WIFI_POSITIONING):
308     PROTO = 0x69
309
310     def response(self):
311         payload = b""  # TODO fill payload
312         return super().response(payload)
313
314
315 class MANUAL_POSITIONING(_GT06pkt):
316     PROTO = 0x80
317
318
319 class BATTERY_CHARGE(_GT06pkt):
320     PROTO = 0x81
321
322
323 class CHARGER_CONNECTED(_GT06pkt):
324     PROTO = 0x82
325
326
327 class CHARGER_DISCONNECTED(_GT06pkt):
328     PROTO = 0x83
329
330
331 class VIBRATION_RECEIVED(_GT06pkt):
332     PROTO = 0x94
333
334
335 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
336     PROTO = 0x98
337
338     @classmethod
339     def from_packet(cls, length, proto, payload):
340         self = super().from_packet(length, proto, payload)
341         self.interval = unpack("!H", payload[:2])
342         return self
343
344     def response(self):
345         return super().response(pack("!H", self.interval))
346
347
348 class SOS_ALARM(_GT06pkt):
349     PROTO = 0x99
350
351
352 # Build a dict protocol number -> class
353 CLASSES = {}
354 if True:  # just to indent the code, sorry!
355     for cls in [
356         cls
357         for name, cls in globals().items()
358         if isclass(cls)
359         and issubclass(cls, _GT06pkt)
360         and not name.startswith("_")
361     ]:
362         if hasattr(cls, "PROTO"):
363             CLASSES[cls.PROTO] = cls
364
365
366 def make_object(length, proto, payload):
367     if proto in CLASSES:
368         return CLASSES[proto].from_packet(length, proto, payload)
369     else:
370         return UNKNOWN.from_packet(length, proto, payload)
371
372
373 def handle_packet(packet, addr, when):
374     if len(packet) < 6:
375         return UNKNOWN.from_packet(0, 0, packet)
376     else:
377         xx, length, proto = unpack("!2sBB", packet[:4])
378         crlf = packet[-2:]
379         payload = packet[4:-2]
380         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
381         if (
382             proto
383             not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
384             and length > 1
385             and len(payload) + adjust != length
386         ):
387             log.warning(
388                 "With proto %d length is %d but payload length is %d+%d",
389                 proto,
390                 length,
391                 len(payload),
392                 adjust,
393             )
394         if xx != b"xx" or crlf != b"\r\n":
395             return UNKNOWN.from_packet(length, proto, packet)  # full packet
396         else:
397             return make_object(length, proto, payload)
398
399
400 def make_response(msg):
401     return msg.response()
402
403
404 def set_config(config):  # Note that we are setting _class_ attribute
405     _GT06pkt.CONFIG = config