]> www.average.org Git - loctrkd.git/blob - gps303/GT06mod.py
3256d6ca5ae4f9e61fde001c3b3e8891b1e8fc74
[loctrkd.git] / gps303 / GT06mod.py
1 """
2 Implementation of the protocol used by zx303 GPS+GPRS module
3 Description from https://github.com/tobadia/petGPS/tree/master/resources
4 """
5
6 from datetime import datetime
7 from inspect import isclass
8 from logging import getLogger
9 from struct import pack, unpack
10
11 __all__ = ("handle_packet", "make_response")
12
13 log = getLogger("gps303")
14
15
16 class _GT06pkt:
17     PROTO: int
18     CONFIG = None
19
20     def __init__(self, *args, **kwargs):
21         assert len(args) == 0
22         for k, v in kwargs.items():
23             setattr(self, k, v)
24
25     def __repr__(self):
26         return "{}({})".format(
27             self.__class__.__name__,
28             ", ".join(
29                 "{}={}".format(
30                     k,
31                     'bytes.fromhex("{}")'.format(v.hex())
32                     if isinstance(v, bytes)
33                     else v.__repr__(),
34                 )
35                 for k, v in self.__dict__.items()
36                 if not k.startswith("_")
37             ),
38         )
39
40     @classmethod
41     def from_packet(cls, length, proto, payload):
42         adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
43         if length > 1 and len(payload) + adjust != length:
44             log.warning(
45                 "length is %d but payload length is %d", length, len(payload)
46             )
47         return cls(length=length, proto=proto, payload=payload)
48
49     def response(self, *args):
50         if len(args) == 0:
51             return None
52         assert len(args) == 1 and isinstance(args[0], bytes)
53         payload = args[0]
54         length = len(payload) + 1
55         if length > 6:
56             length -= 6
57         return b"xx" + pack("BB", length, self.proto) + payload + b"\r\n"
58
59
60 class UNKNOWN(_GT06pkt):
61     pass
62
63
64 class LOGIN(_GT06pkt):
65     PROTO = 0x01
66
67     @classmethod
68     def from_packet(cls, length, proto, payload):
69         self = super().from_packet(length, proto, payload)
70         self.imei = payload[:-1].hex()
71         self.ver = unpack("B", payload[-1:])[0]
72         return self
73
74     def response(self):
75         return super().response(b"")
76
77
78 class SUPERVISION(_GT06pkt):
79     PROTO = 0x05
80
81
82 class HEARTBEAT(_GT06pkt):
83     PROTO = 0x08
84
85
86 class _GPS_POSITIONING(_GT06pkt):
87
88     @classmethod
89     def from_packet(cls, length, proto, payload):
90         self = super().from_packet(length, proto, payload)
91         self.dtime = payload[:6]
92         # TODO parse the rest
93         return self
94
95     def response(self):
96         return super().response(self.dtime)
97
98
99 class GPS_POSITIONING(_GPS_POSITIONING):
100     PROTO = 0x10
101
102
103 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
104     PROTO = 0x11
105
106
107 class STATUS(_GT06pkt):
108     PROTO = 0x13
109
110     @classmethod
111     def from_packet(cls, length, proto, payload):
112         self = super().from_packet(length, proto, payload)
113         if len(payload) == 5:
114             self.batt, self.ver, self.intvl, self.signal, _ = unpack(
115                 "BBBBB", payload
116             )
117         elif len(payload) == 4:
118             self.batt, self.ver, self.intvl, _ = unpack("BBBB", payload)
119             self.signal = None
120         return self
121
122
123 class HIBERNATION(_GT06pkt):
124     PROTO = 0x14
125
126
127 class RESET(_GT06pkt):
128     PROTO = 0x15
129
130
131 class WHITELIST_TOTAL(_GT06pkt):
132     PROTO = 0x16
133
134
135 class WIFI_OFFLINE_POSITIONING(_GT06pkt):
136     PROTO = 0x17
137
138
139 class TIME(_GT06pkt):
140     PROTO = 0x30
141
142     def response(self):
143         payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
144         return super().response(payload)
145
146
147 class MOM_PHONE(_GT06pkt):
148     PROTO = 0x43
149
150
151 class STOP_ALARM(_GT06pkt):
152     PROTO = 0x56
153
154
155 class SETUP(_GT06pkt):
156     PROTO = 0x57
157
158     def response(
159         self,
160         uploadIntervalSeconds=0x0300,
161         binarySwitch=0b00110001,
162         alarms=[0, 0, 0],
163         dndTimeSwitch=0,
164         dndTimes=[0, 0, 0],
165         gpsTimeSwitch=0,
166         gpsTimeStart=0,
167         gpsTimeStop=0,
168         phoneNumbers=["", "", ""],
169     ):
170         def pack3b(x):
171             return pack("!I", x)[1:]
172
173         payload = b"".join(
174             [
175                 pack("!H", uploadIntervalSeconds),
176                 pack("B", binarySwitch),
177             ]
178             + [pack3b(el) for el in alarms]
179             + [
180                 pack("B", dndTimeSwitch),
181             ]
182             + [pack3b(el) for el in dndTimes]
183             + [
184                 pack("B", gpsTimeSwitch),
185                 pack("!H", gpsTimeStart),
186                 pack("!H", gpsTimeStop),
187             ]
188             + [b";".join([el.encode() for el in phoneNumbers])]
189         )
190         return super().response(payload)
191
192
193 class SYNCHRONOUS_WHITELIST(_GT06pkt):
194     PROTO = 0x58
195
196
197 class RESTORE_PASSWORD(_GT06pkt):
198     PROTO = 0x67
199
200
201 class WIFI_POSITIONING(_GT06pkt):
202     PROTO = 0x69
203
204
205 class MANUAL_POSITIONING(_GT06pkt):
206     PROTO = 0x80
207
208
209 class BATTERY_CHARGE(_GT06pkt):
210     PROTO = 0x81
211
212
213 class CHARGER_CONNECTED(_GT06pkt):
214     PROTO = 0x82
215
216
217 class CHARGER_DISCONNECTED(_GT06pkt):
218     PROTO = 0x83
219
220
221 class VIBRATION_RECEIVED(_GT06pkt):
222     PROTO = 0x94
223
224
225 class POSITION_UPLOAD_INTERVAL(_GT06pkt):
226     PROTO = 0x98
227
228     @classmethod
229     def from_packet(cls, length, proto, payload):
230         self = super().from_packet(length, proto, payload)
231         self.interval = unpack("!H", payload[:2])
232         return self
233
234     def response(self):
235         return super().response(pack("!H", self.interval))
236
237
238 # Build a dict protocol number -> class
239 CLASSES = {}
240 if True:  # just to indent the code, sorry!
241     for cls in [
242         cls
243         for name, cls in globals().items()
244         if isclass(cls)
245         and issubclass(cls, _GT06pkt)
246         and not name.startswith("_")
247     ]:
248         if hasattr(cls, "PROTO"):
249             CLASSES[cls.PROTO] = cls
250
251
252 def handle_packet(packet, addr, when):
253     if len(packet) < 6:
254         msg = UNKNOWN.from_packet(0, 0, packet)
255     else:
256         xx, length, proto = unpack("!2sBB", packet[:4])
257         crlf = packet[-2:]
258         payload = packet[4:-2]
259         if xx != b"xx" or crlf != b"\r\n" or proto not in CLASSES:
260             msg = UNKNOWN.from_packet(length, proto, packet)
261         else:
262             msg = CLASSES[proto].from_packet(length, proto, payload)
263     return msg
264
265 def make_response(msg):
266     return msg.response()
267
268 def set_config(config):  # Note that we are setting _class_ attribute
269     _GT06pkt.CONFIG = config