]> www.average.org Git - loctrkd.git/blob - gps303/GT06mod.py
aa8246facd3823946cb6aaef89df746ab8d345d4
[loctrkd.git] / gps303 / GT06mod.py
1 """
2 Implementation of the protocol used by zx303 GPS+GPRS module
3 Description from https://github.com/tobadia/petGPS/tree/master/resources
4 """
5
6 from datetime import datetime
7 from inspect import isclass
8 from logging import getLogger
9 from struct import pack, unpack
10
11 __all__ = (
12     "handle_packet",
13     "make_object",
14     "make_response",
15     "set_config",
16     "UNKNOWN",
17     "LOGIN",
18     "SUPERVISION",
19     "HEARTBEAT",
20     "GPS_POSITIONING",
21     "GPS_OFFLINE_POSITIONING",
22     "STATUS",
23     "HIBERNATION",
24     "RESET",
25     "WHITELIST_TOTAL",
26     "WIFI_OFFLINE_POSITIONING",
27     "TIME",
28     "MOM_PHONE",
29     "STOP_ALARM",
30     "SETUP",
31     "SYNCHRONOUS_WHITELIST",
32     "RESTORE_PASSWORD",
33     "WIFI_POSITIONING",
34     "MANUAL_POSITIONING",
35     "BATTERY_CHARGE",
36     "CHARGER_CONNECTED",
37     "CHARGER_DISCONNECTED",
38     "VIBRATION_RECEIVED",
39     "POSITION_UPLOAD_INTERVAL",
40 )
41
42 log = getLogger("gps303")
43
44
45 class _GT06pkt:
46     PROTO: int
47     CONFIG = None
48
49     def __init__(self, *args, **kwargs):
50         assert len(args) == 0
51         for k, v in kwargs.items():
52             setattr(self, k, v)
53
54     def __repr__(self):
55         return "{}({})".format(
56             self.__class__.__name__,
57             ", ".join(
58                 "{}={}".format(
59                     k,
60                     'bytes.fromhex("{}")'.format(v.hex())
61                     if isinstance(v, bytes)
62                     else v.__repr__(),
63                 )
64                 for k, v in self.__dict__.items()
65                 if not k.startswith("_")
66             ),
67         )
68
69     @classmethod
70     def from_packet(cls, proto, payload):
71         return cls(proto=proto, payload=payload)
72
73     def response(self, *args):
74         if len(args) == 0:
75             return None
76         assert len(args) == 1 and isinstance(args[0], bytes)
77         payload = args[0]
78         length = len(payload) + 1
79         if length > 6:
80             length -= 6
81         return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
82
83
84 class UNKNOWN(_GT06pkt):
85     pass
86
87
88 class LOGIN(_GT06pkt):
89     PROTO = 0x01
90
91     @classmethod
92     def from_packet(cls, proto, payload):
93         self = super().from_packet(proto, payload)
94         self.imei = payload[:-1].hex()
95         self.ver = unpack("B", payload[-1:])[0]
96         return self
97
98     def response(self):
99         return super().response(b"")
100
101
102 class SUPERVISION(_GT06pkt):
103     PROTO = 0x05
104
105
106 class HEARTBEAT(_GT06pkt):
107     PROTO = 0x08
108
109
110 class _GPS_POSITIONING(_GT06pkt):
111     @classmethod
112     def from_packet(cls, proto, payload):
113         self = super().from_packet(proto, payload)
114         self.dtime = payload[:6]
115         # TODO parse the rest
116         return self
117
118     def response(self):
119         return super().response(self.dtime)
120
121
122 class GPS_POSITIONING(_GPS_POSITIONING):
123     PROTO = 0x10
124
125
126 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
127     PROTO = 0x11
128
129
130 class STATUS(_GT06pkt):
131     PROTO = 0x13
132
133     @classmethod
134     def from_packet(cls, proto, payload):
135         self = super().from_packet(proto, payload)
136         if len(payload) == 5:
137             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
138                 "BBBBB", payload
139             )
140         elif len(payload) == 4:
141             self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
142             self.signal = None
143         return self
144
145
146 class HIBERNATION(_GT06pkt):
147     PROTO = 0x14
148
149
150 class RESET(_GT06pkt):
151     PROTO = 0x15
152
153
154 class WHITELIST_TOTAL(_GT06pkt):
155     PROTO = 0x16
156
157
158 class WIFI_OFFLINE_POSITIONING(_GT06pkt):
159     PROTO = 0x17
160
161
162 class TIME(_GT06pkt):
163     PROTO = 0x30
164
165     def response(self):
166         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
167         return super().response(payload)
168
169
170 class MOM_PHONE(_GT06pkt):
171     PROTO = 0x43
172
173
174 class STOP_ALARM(_GT06pkt):
175     PROTO = 0x56
176
177
178 class SETUP(_GT06pkt):
179     PROTO = 0x57
180
181     def response(
182         self,
183         uploadIntervalSeconds=0x0300,
184         binarySwitch=0b00110001,
185         alarms=[0, 0, 0],
186         dndTimeSwitch=0,
187         dndTimes=[0, 0, 0],
188         gpsTimeSwitch=0,
189         gpsTimeStart=0,
190         gpsTimeStop=0,
191         phoneNumbers=["", "", ""],
192     ):
193         def pack3b(x):
194             return pack("!I", x)[1:]
195
196         payload = b"".join(
197             [
198                 pack("!H", uploadIntervalSeconds),
199                 pack("B", binarySwitch),
200             ]
201             + [pack3b(el) for el in alarms]
202             + [
203                 pack("B", dndTimeSwitch),
204             ]
205             + [pack3b(el) for el in dndTimes]
206             + [
207                 pack("B", gpsTimeSwitch),
208                 pack("!H", gpsTimeStart),
209                 pack("!H", gpsTimeStop),
210             ]
211             + [b";".join([el.encode() for el in phoneNumbers])]
212         )
213         return super().response(payload)
214
215
216 class SYNCHRONOUS_WHITELIST(_GT06pkt):
217     PROTO = 0x58
218
219
220 class RESTORE_PASSWORD(_GT06pkt):
221     PROTO = 0x67
222
223
224 class WIFI_POSITIONING(_GT06pkt):
225     PROTO = 0x69
226
227
228 class MANUAL_POSITIONING(_GT06pkt):
229     PROTO = 0x80
230
231
232 class BATTERY_CHARGE(_GT06pkt):
233     PROTO = 0x81
234
235
236 class CHARGER_CONNECTED(_GT06pkt):
237     PROTO = 0x82
238
239
240 class CHARGER_DISCONNECTED(_GT06pkt):
241     PROTO = 0x83
242
243
244 class VIBRATION_RECEIVED(_GT06pkt):
245     PROTO = 0x94
246
247
248 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
249     PROTO = 0x98
250
251     @classmethod
252     def from_packet(cls, proto, payload):
253         self = super().from_packet(proto, payload)
254         self.interval = unpack("!H", payload[:2])
255         return self
256
257     def response(self):
258         return super().response(pack("!H", self.interval))
259
260
261 # Build a dict protocol number -> class
262 CLASSES = {}
263 if True:  # just to indent the code, sorry!
264     for cls in [
265         cls
266         for name, cls in globals().items()
267         if isclass(cls)
268         and issubclass(cls, _GT06pkt)
269         and not name.startswith("_")
270     ]:
271         if hasattr(cls, "PROTO"):
272             CLASSES[cls.PROTO] = cls
273
274 def make_object(proto, payload):
275     if proto in CLASSES:
276         return CLASSES[proto].from_packet(proto, payload)
277     else:
278         return UNKNOWN.from_packet(proto, payload)
279
280 def handle_packet(packet, addr, when):
281     if len(packet) < 6:
282         return UNKNOWN.from_packet(0, packet)
283     else:
284         xx, length, proto = unpack("!2sBB", packet[:4])
285         crlf = packet[-2:]
286         payload = packet[4:-2]
287         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
288         if length > 1 and len(payload) + adjust != length:
289             log.warning(
290                 "length is %d but payload length is %d", length, len(payload)
291             )
292         if xx != b"xx" or crlf != b"\r\n":
293             return UNKNOWN.from_packet(proto, packet)  # full packet as payload
294         else:
295             return make_object(proto, payload)
296
297
298 def make_response(msg):
299     return msg.response()
300
301
302 def set_config(config):  # Note that we are setting _class_ attribute
303     _GT06pkt.CONFIG = config