]> www.average.org Git - loctrkd.git/blob - gps303/GT06mod.py
31be5d3bc5bbe354a4f2ff5b94b4f88c7d2c9f93
[loctrkd.git] / gps303 / GT06mod.py
1 """
2 Implementation of the protocol used by zx303 GPS+GPRS module
3 Description from https://github.com/tobadia/petGPS/tree/master/resources
4 """
5
6 from datetime import datetime
7 from inspect import isclass
8 from logging import getLogger
9 from struct import pack, unpack
10
11 __all__ = ("handle_packet", "make_response")
12
13 log = getLogger("gps303")
14
15
class _GT06pkt:
    """Base class for all packet types handled by this module.

    Instances are plain attribute bags: every keyword argument given to
    the constructor becomes an instance attribute.  Subclasses set the
    class attribute ``PROTO`` to their protocol number and may override
    ``from_packet`` (parsing) and ``response`` (reply construction).
    """

    PROTO: int

    def __init__(self, *args, **kwargs):
        """Store each keyword argument as an attribute; no positionals."""
        assert not args
        for name, value in kwargs.items():
            setattr(self, name, value)

    def __repr__(self):
        """Return ``ClassName(attr=value, ...)`` for public attributes.

        ``bytes`` values are rendered as ``bytes.fromhex("...")`` so that
        the output stays compact and readable.
        """

        def render(value):
            if isinstance(value, bytes):
                return 'bytes.fromhex("{}")'.format(value.hex())
            return value.__repr__()

        shown = [
            "{}={}".format(name, render(value))
            for name, value in self.__dict__.items()
            if not name.startswith("_")
        ]
        return "{}({})".format(self.__class__.__name__, ", ".join(shown))

    @classmethod
    def from_packet(cls, length, proto, payload):
        """Build an instance from the decoded packet fields.

        A warning is logged when the declared ``length`` (if greater
        than 1) disagrees with the payload size; the instance is created
        regardless.
        """
        # Weird special case: the expected difference between the length
        # field and the payload size is 2 for STATUS, 4 for the rest.
        overhead = 2 if proto == STATUS.PROTO else 4
        mismatch = len(payload) + overhead != length
        if length > 1 and mismatch:
            log.warning(
                "length is %d but payload length is %d", length, len(payload)
            )
        return cls(length=length, proto=proto, payload=payload)

    def response(self, *args):
        """Frame a reply packet, or return None when called without data.

        With exactly one ``bytes`` argument the result is
        ``b"xx" + length byte + proto byte + payload + b"\\r\\n"``.
        """
        if not args:
            return None
        assert len(args) == 1 and isinstance(args[0], bytes)
        (body,) = args
        size = len(body) + 1
        # Values above 6 are reduced by 6 before being written out.
        size = size - 6 if size > 6 else size
        header = pack("BB", size, self.proto)
        return b"".join((b"xx", header, body, b"\r\n"))
57
58
class UNKNOWN(_GT06pkt):
    """Fallback packet type.

    ``handle_packet`` uses it for input that is too short, badly framed,
    or carries a protocol number with no registered class.  It defines
    no ``PROTO`` of its own.
    """
61
62
class LOGIN(_GT06pkt):
    """Packet type 0x01.

    Parsing exposes ``imei`` (hex string of everything but the last
    payload byte) and ``ver`` (the last payload byte as an integer).
    """

    PROTO = 0x01

    @classmethod
    def from_packet(cls, length, proto, payload):
        """Create the packet and split the payload into ``imei``/``ver``."""
        pkt = super().from_packet(length, proto, payload)
        leading, trailing = payload[:-1], payload[-1:]
        pkt.imei = leading.hex()
        (pkt.ver,) = unpack("B", trailing)
        return pkt

    def response(self):
        """Return a reply packet that carries an empty payload."""
        empty = b""
        return super().response(empty)
75
76
class SUPERVISION(_GT06pkt):
    """Packet type 0x05; payload is kept raw, with no extra parsing."""

    PROTO = 0x05
79
80
class HEARTBEAT(_GT06pkt):
    """Packet type 0x08; payload is kept raw, with no extra parsing."""

    PROTO = 0x08
83
84
class _GPS_POSITIONING(_GT06pkt):
    """Shared behaviour of the GPS positioning packet types.

    Only the first six payload bytes are extracted (as ``dtime``); the
    reply echoes those bytes back.
    """

    @classmethod
    def from_packet(cls, length, proto, payload):
        """Create the packet and keep the leading six bytes as ``dtime``."""
        pkt = super().from_packet(length, proto, payload)
        pkt.dtime = payload[:6]
        # TODO parse the rest
        return pkt

    def response(self):
        """Return a reply packet whose payload is the stored ``dtime``."""
        echoed = self.dtime
        return super().response(echoed)
96
97
class GPS_POSITIONING(_GPS_POSITIONING):
    """Packet type 0x10; parsing and reply come from _GPS_POSITIONING."""

    PROTO = 0x10
100
101
class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
    """Packet type 0x11; parsing and reply come from _GPS_POSITIONING."""

    PROTO = 0x11
104
105
class STATUS(_GT06pkt):
    """Packet type 0x13.

    A five-byte payload yields ``batt``, ``ver``, ``intvl`` and
    ``signal`` (one byte each, final byte ignored).  A four-byte payload
    yields the first three and sets ``signal`` to None.  Any other
    payload size leaves these attributes unset.
    """

    PROTO = 0x13

    @classmethod
    def from_packet(cls, length, proto, payload):
        """Create the packet and decode the status bytes when possible."""
        pkt = super().from_packet(length, proto, payload)
        size = len(payload)
        if size in (4, 5):
            octets = unpack("B" * size, payload)
            pkt.batt, pkt.ver, pkt.intvl = octets[:3]
            # The short form has no signal byte; the last byte is unused
            # in both forms.
            pkt.signal = octets[3] if size == 5 else None
        return pkt
120
121
class HIBERNATION(_GT06pkt):
    """Packet type 0x14; payload is kept raw, with no extra parsing."""

    PROTO = 0x14
124
125
class RESET(_GT06pkt):
    """Packet type 0x15; payload is kept raw, with no extra parsing."""

    PROTO = 0x15
128
129
class WHITELIST_TOTAL(_GT06pkt):
    """Packet type 0x16; payload is kept raw, with no extra parsing."""

    PROTO = 0x16
132
133
class WIFI_OFFLINE_POSITIONING(_GT06pkt):
    """Packet type 0x17; payload is kept raw, with no extra parsing."""

    PROTO = 0x17
136
137
class TIME(_GT06pkt):
    """Packet type 0x30; the reply carries the current UTC date and time."""

    PROTO = 0x30

    def response(self):
        """Return a reply packet containing the current UTC time.

        The payload is the year as a big-endian 16-bit integer followed
        by month, day, hour, minute and second as one byte each.
        """
        # Imported locally because the module header only brings in
        # ``datetime``; ``datetime.utcnow()`` is deprecated since 3.12.
        from datetime import timezone

        now = datetime.now(timezone.utc)
        payload = pack("!HBBBBB", *now.timetuple()[:6])
        return super().response(payload)
144
145
class MOM_PHONE(_GT06pkt):
    """Packet type 0x43; payload is kept raw, with no extra parsing."""

    PROTO = 0x43
148
149
class STOP_ALARM(_GT06pkt):
    """Packet type 0x56; payload is kept raw, with no extra parsing."""

    PROTO = 0x56
152
153
class SETUP(_GT06pkt):
    """Packet type 0x57; the reply carries a block of settings."""

    PROTO = 0x57

    def response(
        self,
        uploadIntervalSeconds=0x0300,
        binarySwitch=0b00110001,
        alarms=(0, 0, 0),
        dndTimeSwitch=0,
        dndTimes=(0, 0, 0),
        gpsTimeSwitch=0,
        gpsTimeStart=0,
        gpsTimeStop=0,
        phoneNumbers=("", "", ""),
    ):
        """Return a reply packet built from the given settings.

        Payload layout, in order:

        * ``uploadIntervalSeconds`` - big-endian 16-bit integer
        * ``binarySwitch`` - one byte
        * each element of ``alarms`` - three bytes, big-endian
        * ``dndTimeSwitch`` - one byte
        * each element of ``dndTimes`` - three bytes, big-endian
        * ``gpsTimeSwitch`` - one byte
        * ``gpsTimeStart``, ``gpsTimeStop`` - big-endian 16-bit integers
        * ``phoneNumbers`` - encoded strings joined with ``b";"``

        The sequence defaults are tuples so that no mutable object is
        shared between calls; any iterable is accepted.
        """

        def pack3b(x):
            # Pack as four bytes and drop the most significant one.
            return pack("!I", x)[1:]

        parts = [
            pack("!H", uploadIntervalSeconds),
            pack("B", binarySwitch),
        ]
        parts.extend(pack3b(el) for el in alarms)
        parts.append(pack("B", dndTimeSwitch))
        parts.extend(pack3b(el) for el in dndTimes)
        parts.append(pack("B", gpsTimeSwitch))
        parts.append(pack("!H", gpsTimeStart))
        parts.append(pack("!H", gpsTimeStop))
        parts.append(b";".join(el.encode() for el in phoneNumbers))
        return super().response(b"".join(parts))
190
191
class SYNCHRONOUS_WHITELIST(_GT06pkt):
    """Packet type 0x58; payload is kept raw, with no extra parsing."""

    PROTO = 0x58
194
195
class RESTORE_PASSWORD(_GT06pkt):
    """Packet type 0x67; payload is kept raw, with no extra parsing."""

    PROTO = 0x67
198
199
class WIFI_POSITIONING(_GT06pkt):
    """Packet type 0x69; payload is kept raw, with no extra parsing."""

    PROTO = 0x69
202
203
class MANUAL_POSITIONING(_GT06pkt):
    """Packet type 0x80; payload is kept raw, with no extra parsing."""

    PROTO = 0x80
206
207
class BATTERY_CHARGE(_GT06pkt):
    """Packet type 0x81; payload is kept raw, with no extra parsing."""

    PROTO = 0x81
210
211
class CHARGER_CONNECTED(_GT06pkt):
    """Packet type 0x82; payload is kept raw, with no extra parsing."""

    PROTO = 0x82
214
215
class CHARGER_DISCONNECTED(_GT06pkt):
    """Packet type 0x83; payload is kept raw, with no extra parsing."""

    PROTO = 0x83
218
219
class VIBRATION_RECEIVED(_GT06pkt):
    """Packet type 0x94; payload is kept raw, with no extra parsing."""

    PROTO = 0x94
222
223
class POSITION_UPLOAD_INTERVAL(_GT06pkt):
    """Packet type 0x98.

    Parsing exposes ``interval``, the first two payload bytes read as a
    big-endian unsigned 16-bit integer; the reply echoes that value.
    """

    PROTO = 0x98

    @classmethod
    def from_packet(cls, length, proto, payload):
        """Create the packet and decode ``interval`` from the payload."""
        self = super().from_packet(length, proto, payload)
        # unpack() always returns a tuple; take its single element so
        # that ``interval`` is an int that response() can pack again.
        (self.interval,) = unpack("!H", payload[:2])
        return self

    def response(self):
        """Return a reply packet carrying ``interval`` as 16 bits."""
        return super().response(pack("!H", self.interval))
235
236
# Build a dict protocol number -> class.
# Only public (non-underscore) subclasses of _GT06pkt that define PROTO
# are registered, so UNKNOWN (which has no PROTO) is left out.  The
# module namespace is snapshotted with list() before iterating, and the
# comprehension keeps its loop variables out of the module globals.
CLASSES = {
    cls.PROTO: cls
    for name, cls in list(globals().items())
    if isclass(cls)
    and issubclass(cls, _GT06pkt)
    and not name.startswith("_")
    and hasattr(cls, "PROTO")
}
249
250
def handle_packet(packet, addr, when):
    """Decode one raw packet into a message object.

    A well-formed packet starts with ``b"xx"``, followed by a length
    byte and a protocol byte, and ends with ``b"\\r\\n"``; the bytes in
    between are the payload.  Such a packet is parsed by the class
    registered in ``CLASSES`` for its protocol number.  Anything shorter
    than six bytes, badly framed, or of an unregistered protocol is
    returned as an ``UNKNOWN`` instance holding the whole packet.

    ``addr`` and ``when`` are accepted but not used by this function.
    """
    if len(packet) < 6:
        return UNKNOWN.from_packet(0, 0, packet)

    marker, length, proto = unpack("!2sBB", packet[:4])
    trailer = packet[-2:]
    framed = marker == b"xx" and trailer == b"\r\n"
    if framed and proto in CLASSES:
        return CLASSES[proto].from_packet(length, proto, packet[4:-2])
    return UNKNOWN.from_packet(length, proto, packet)
263
def make_response(msg):
    """Return whatever ``msg.response()`` produces when called bare."""
    reply = msg.response()
    return reply