]> www.average.org Git - loctrkd.git/blob - gps303/GT06mod.py
Work with cell location data; use opencellid
[loctrkd.git] / gps303 / GT06mod.py
1 """
2 Implementation of the protocol used by zx303 GPS+GPRS module
3 Description from https://github.com/tobadia/petGPS/tree/master/resources
4 """
5
6 from datetime import datetime, timezone
7 from inspect import isclass
8 from logging import getLogger
9 from struct import pack, unpack
10
11 __all__ = (
12     "handle_packet",
13     "make_object",
14     "make_response",
15     "set_config",
16     "UNKNOWN",
17     "LOGIN",
18     "SUPERVISION",
19     "HEARTBEAT",
20     "GPS_POSITIONING",
21     "GPS_OFFLINE_POSITIONING",
22     "STATUS",
23     "HIBERNATION",
24     "RESET",
25     "WHITELIST_TOTAL",
26     "WIFI_OFFLINE_POSITIONING",
27     "TIME",
28     "MOM_PHONE",
29     "STOP_ALARM",
30     "SETUP",
31     "SYNCHRONOUS_WHITELIST",
32     "RESTORE_PASSWORD",
33     "WIFI_POSITIONING",
34     "MANUAL_POSITIONING",
35     "BATTERY_CHARGE",
36     "CHARGER_CONNECTED",
37     "CHARGER_DISCONNECTED",
38     "VIBRATION_RECEIVED",
39     "POSITION_UPLOAD_INTERVAL",
40 )
41
42 log = getLogger("gps303")
43
44
45 class _GT06pkt:
46     PROTO: int
47     CONFIG = None
48
49     def __init__(self, *args, **kwargs):
50         assert len(args) == 0
51         for k, v in kwargs.items():
52             setattr(self, k, v)
53
54     def __repr__(self):
55         return "{}({})".format(
56             self.__class__.__name__,
57             ", ".join(
58                 "{}={}".format(
59                     k,
60                     'bytes.fromhex("{}")'.format(v.hex())
61                     if isinstance(v, bytes)
62                     else v.__repr__(),
63                 )
64                 for k, v in self.__dict__.items()
65                 if not k.startswith("_")
66             ),
67         )
68
69     @classmethod
70     def from_packet(cls, length, proto, payload):
71         return cls(proto=proto, payload=payload, length=length)
72
73     def response(self, *args):
74         if len(args) == 0:
75             return None
76         assert len(args) == 1 and isinstance(args[0], bytes)
77         payload = args[0]
78         length = len(payload) + 1
79         if length > 6:
80             length -= 6
81         return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
82
83
84 class UNKNOWN(_GT06pkt):
85     pass
86
87
88 class LOGIN(_GT06pkt):
89     PROTO = 0x01
90
91     @classmethod
92     def from_packet(cls, length, proto, payload):
93         self = super().from_packet(length, proto, payload)
94         self.imei = payload[:-1].hex()
95         self.ver = unpack("B", payload[-1:])[0]
96         return self
97
98     def response(self):
99         return super().response(b"")
100
101
102 class SUPERVISION(_GT06pkt):
103     PROTO = 0x05
104
105
106 class HEARTBEAT(_GT06pkt):
107     PROTO = 0x08
108
109
110 class _GPS_POSITIONING(_GT06pkt):
111     @classmethod
112     def from_packet(cls, length, proto, payload):
113         self = super().from_packet(length, proto, payload)
114         self.dtime = payload[:6]
115         if self.dtime == b"\0\0\0\0\0\0":
116             self.devtime = None
117         else:
118             self.devtime = datetime(
119                 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
120             )
121         self.gps_data_length = payload[6] >> 4
122         self.gps_nb_sat = payload[6] & 0x0F
123         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
124         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
125         flip_lon = bool(flags & 0b0000100000000000)  # bit 4
126         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
127         self.heading = flags & 0b0000001111111111  # bits 6 - last
128         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
129         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
130         self.speed = speed
131         self.flags = flags
132         return self
133
134     def response(self):
135         return super().response(self.dtime)
136
137
138 class GPS_POSITIONING(_GPS_POSITIONING):
139     PROTO = 0x10
140
141
142 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
143     PROTO = 0x11
144
145
146 class STATUS(_GT06pkt):
147     PROTO = 0x13
148
149     @classmethod
150     def from_packet(cls, length, proto, payload):
151         self = super().from_packet(length, proto, payload)
152         if len(payload) == 5:
153             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
154                 "BBBBB", payload
155             )
156         elif len(payload) == 4:
157             self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
158             self.signal = None
159         return self
160
161
162 class HIBERNATION(_GT06pkt):
163     PROTO = 0x14
164
165
166 class RESET(_GT06pkt):
167     PROTO = 0x15
168
169
170 class WHITELIST_TOTAL(_GT06pkt):
171     PROTO = 0x16
172
173
174 class _WIFI_POSITIONING(_GT06pkt):
175     @classmethod
176     def from_packet(cls, length, proto, payload):
177         self = super().from_packet(length, proto, payload)
178         self.dtime = payload[:6]
179         if self.dtime == b"\0\0\0\0\0\0":
180             self.devtime = None
181         else:
182             self.devtime = datetime.strptime(
183                 self.dtime.hex(), "%y%m%d%H%M%S"
184             ).astimezone(tz=timezone.utc)
185         self.wifi_aps = []
186         for i in range(self.length):  # length has special meaning here
187             slice = payload[6 + i * 7 : 13 + i * 7]
188             self.wifi_aps.append(
189                 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
190             )
191         gsm_slice = payload[6 + self.length * 7 :]
192         ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
193         self.gsm_cells = []
194         for i in range(ncells):
195             slice = gsm_slice[4 + i * 5 : 9 + i * 5]
196             locac, cellid, sigstr = unpack(
197                 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
198             )
199             self.gsm_cells.append((locac, cellid, -sigstr))
200         return self
201
202
203 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
204     PROTO = 0x17
205
206     def response(self):
207         return super().response(self.dtime)
208
209
210 class TIME(_GT06pkt):
211     PROTO = 0x30
212
213     def response(self):
214         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
215         return super().response(payload)
216
217
218 class MOM_PHONE(_GT06pkt):
219     PROTO = 0x43
220
221
222 class STOP_ALARM(_GT06pkt):
223     PROTO = 0x56
224
225
226 class SETUP(_GT06pkt):
227     PROTO = 0x57
228
229     def response(
230         self,
231         uploadIntervalSeconds=0x0300,
232         binarySwitch=0b00110001,
233         alarms=[0, 0, 0],
234         dndTimeSwitch=0,
235         dndTimes=[0, 0, 0],
236         gpsTimeSwitch=0,
237         gpsTimeStart=0,
238         gpsTimeStop=0,
239         phoneNumbers=["", "", ""],
240     ):
241         def pack3b(x):
242             return pack("!I", x)[1:]
243
244         payload = b"".join(
245             [
246                 pack("!H", uploadIntervalSeconds),
247                 pack("B", binarySwitch),
248             ]
249             + [pack3b(el) for el in alarms]
250             + [
251                 pack("B", dndTimeSwitch),
252             ]
253             + [pack3b(el) for el in dndTimes]
254             + [
255                 pack("B", gpsTimeSwitch),
256                 pack("!H", gpsTimeStart),
257                 pack("!H", gpsTimeStop),
258             ]
259             + [b";".join([el.encode() for el in phoneNumbers])]
260         )
261         return super().response(payload)
262
263
264 class SYNCHRONOUS_WHITELIST(_GT06pkt):
265     PROTO = 0x58
266
267
268 class RESTORE_PASSWORD(_GT06pkt):
269     PROTO = 0x67
270
271
272 class WIFI_POSITIONING(_WIFI_POSITIONING):
273     PROTO = 0x69
274
275     def response(self):
276         payload = b""  # TODO fill payload
277         return super().response(payload)
278
279
280 class MANUAL_POSITIONING(_GT06pkt):
281     PROTO = 0x80
282
283
284 class BATTERY_CHARGE(_GT06pkt):
285     PROTO = 0x81
286
287
288 class CHARGER_CONNECTED(_GT06pkt):
289     PROTO = 0x82
290
291
292 class CHARGER_DISCONNECTED(_GT06pkt):
293     PROTO = 0x83
294
295
296 class VIBRATION_RECEIVED(_GT06pkt):
297     PROTO = 0x94
298
299
300 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
301     PROTO = 0x98
302
303     @classmethod
304     def from_packet(cls, length, proto, payload):
305         self = super().from_packet(length, proto, payload)
306         self.interval = unpack("!H", payload[:2])
307         return self
308
309     def response(self):
310         return super().response(pack("!H", self.interval))
311
312
313 # Build a dict protocol number -> class
314 CLASSES = {}
315 if True:  # just to indent the code, sorry!
316     for cls in [
317         cls
318         for name, cls in globals().items()
319         if isclass(cls)
320         and issubclass(cls, _GT06pkt)
321         and not name.startswith("_")
322     ]:
323         if hasattr(cls, "PROTO"):
324             CLASSES[cls.PROTO] = cls
325
326
327 def make_object(length, proto, payload):
328     if proto in CLASSES:
329         return CLASSES[proto].from_packet(length, proto, payload)
330     else:
331         return UNKNOWN.from_packet(length, proto, payload)
332
333
334 def handle_packet(packet, addr, when):
335     if len(packet) < 6:
336         return UNKNOWN.from_packet(0, 0, packet)
337     else:
338         xx, length, proto = unpack("!2sBB", packet[:4])
339         crlf = packet[-2:]
340         payload = packet[4:-2]
341         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
342         if (
343             proto
344             not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
345             and length > 1
346             and len(payload) + adjust != length
347         ):
348             log.warning(
349                 "With proto %d length is %d but payload length is %d+%d",
350                 proto,
351                 length,
352                 len(payload),
353                 adjust,
354             )
355         if xx != b"xx" or crlf != b"\r\n":
356             return UNKNOWN.from_packet(length, proto, packet)  # full packet
357         else:
358             return make_object(length, proto, payload)
359
360
361 def make_response(msg):
362     return msg.response()
363
364
365 def set_config(config):  # Note that we are setting _class_ attribute
366     _GT06pkt.CONFIG = config