2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
the server will receive one message in one TCP segment (i.e. in
one `recv()` operation), but relying on that would break things
if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
34 "GPS_OFFLINE_POSITIONING",
39 "WIFI_OFFLINE_POSITIONING",
44 "SYNCHRONOUS_WHITELIST",
50 "CHARGER_DISCONNECTED",
52 "POSITION_UPLOAD_INTERVAL",
55 log = getLogger("gps303")
61 def __init__(self, *args, **kwargs):
63 for k, v in kwargs.items():
67 return "{}({})".format(
68 self.__class__.__name__,
72 'bytes.fromhex("{}")'.format(v.hex())
73 if isinstance(v, bytes)
76 for k, v in self.__dict__.items()
77 if not k.startswith("_")
def from_packet(cls, length, payload):
    """Build a packet object from an already-split message.

    ``cls`` is called with ``payload`` and ``length`` as keyword
    arguments and whatever that call returns is handed back.
    """
    # NOTE(review): other code in this file calls this on the class itself
    # (e.g. ``UNKNOWN.from_packet(...)``), so a ``@classmethod`` decorator
    # presumably sits on the line just above this span — confirm.
    return cls(payload=payload, length=length)
86 return pack("BB", self.length, self.PROTO) + self.payload
89 def response(cls, *args):
92 assert len(args) == 1 and isinstance(args[0], bytes)
94 length = len(payload) + 1
97 return pack("BB", length, cls.PROTO) + payload
class UNKNOWN(GPS303Pkt):
    """Catch-all packet type for data that maps to no dedicated class."""

    PROTO = 256  # > 255 is impossible in real packets
104 class LOGIN(GPS303Pkt):
108 def from_packet(cls, length, payload):
109 self = super().from_packet(length, payload)
110 self.imei = payload[:-1].hex()
111 self.ver = unpack("B", payload[-1:])[0]
116 return super().response(b"")
119 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
def response(cls, supnum=0):
    """Build the supervision message carrying ``supnum`` as its payload.

    Values of ``supnum`` noted by the original author:
      1: The device automatically answers Pickup effect
      2: Automatically Answering Two-way Calls
      3: Ring manually answer the two-way call

    Raises ``struct.error`` if ``supnum`` does not fit in one unsigned byte.
    """
    # Encode the status as a single unsigned byte, the same way the other
    # one-argument responses in this file encode theirs; without this the
    # argument would be accepted but never reach the device.
    return super().response(pack("B", supnum))
130 class HEARTBEAT(GPS303Pkt):
134 class _GPS_POSITIONING(GPS303Pkt):
136 def from_packet(cls, length, payload):
137 self = super().from_packet(length, payload)
138 self.dtime = payload[:6]
139 if self.dtime == b"\0\0\0\0\0\0":
142 self.devtime = datetime(
143 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
# Byte 6 holds two nibbles: GPS data length (high) and satellite count (low).
self.gps_data_length = payload[6] >> 4
self.gps_nb_sat = payload[6] & 0x0F
# Bytes 7..17, network byte order: latitude (u32), longitude (u32),
# speed (u8) and a 16-bit word that carries status flags plus heading.
lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
flip_lon = bool(flags & 0b0000100000000000)  # bit 4
flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
self.heading = flags & 0b0000001111111111  # bits 6 - last
# Raw coordinates are scaled by 30000 * 60 (presumably 1/30000 of an
# arc-minute per unit — confirm against the protocol description).
# The flag bits may only change the sign, never the magnitude.
self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
159 return super().response(self.dtime)
162 class GPS_POSITIONING(_GPS_POSITIONING):
166 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
170 class STATUS(GPS303Pkt):
174 def from_packet(cls, length, payload):
175 self = super().from_packet(length, payload)
176 if len(payload) == 5:
183 ) = unpack("BBBBB", payload)
184 elif len(payload) == 4:
185 self.batt, self.ver, self.timezone, self.intvl = unpack(
def response(self, upload_interval=25):
    """Build the reply that sets the upload interval, in minutes.

    ``upload_interval`` is packed as one unsigned byte, so values outside
    0..255 raise ``struct.error``.
    """
    return super().response(pack("B", upload_interval))
195 class HIBERNATION(GPS303Pkt):
199 class RESET(GPS303Pkt): # Device sends when it got reset SMS
def response(self):
    """Build the empty-payload message the server can send to initiate
    a factory reset."""
    return super().response(b"")
206 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
def response(self, number=3):
    """Build the message announcing ``number`` whitelist entries.

    ``number`` is packed as one unsigned byte, so values outside 0..255
    raise ``struct.error``.
    """
    return super().response(pack("B", number))
213 class _WIFI_POSITIONING(GPS303Pkt):
215 def from_packet(cls, length, payload):
216 self = super().from_packet(length, payload)
217 self.dtime = payload[:6]
218 if self.dtime == b"\0\0\0\0\0\0":
# The six timestamp bytes read as hex digits spell YYMMDDhhmmss.
# strptime() yields a naive datetime; label it as UTC directly, because
# astimezone() on a naive value would first assume the server's local
# timezone and shift the result by the host's UTC offset.
# NOTE(review): assumes the device reports UTC, as the GPS positioning
# parser in this file does — confirm.
self.devtime = datetime.strptime(
    self.dtime.hex(), "%y%m%d%H%M%S"
).replace(tzinfo=timezone.utc)
225 for i in range(self.length): # length has special meaning here
226 slice = payload[6 + i * 7 : 13 + i * 7]
227 self.wifi_aps.append(
228 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
230 gsm_slice = payload[6 + self.length * 7 :]
231 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
233 for i in range(ncells):
234 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
235 locac, cellid, sigstr = unpack(
236 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
238 self.gsm_cells.append((locac, cellid, -sigstr))
242 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
246 return super().response(self.dtime)
249 class TIME(GPS303Pkt):
# Year as a 16-bit big-endian integer, then month, day, hour, minute and
# second as single bytes, all taken from the current UTC time.
# datetime.now(timezone.utc) is the supported replacement for the
# deprecated datetime.utcnow() and yields the same six fields.
payload = pack("!HBBBBB", *datetime.now(timezone.utc).timetuple()[:6])
254 return super().response(payload)
257 class PROHIBIT_LBS(GPS303Pkt):
def response(self, status=1):
    """Build the server-sent message with a one-byte switch.

    ``status`` is 0 for off and 1 for on; it is packed as one unsigned
    byte, so values outside 0..255 raise ``struct.error``.
    """
    return super().response(pack("B", status))
264 class MOM_PHONE(GPS303Pkt):
268 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
272 return super().response(b"")
275 class STOP_ALARM(GPS303Pkt):
279 class SETUP(GPS303Pkt):
284 uploadIntervalSeconds=0x0300,
285 binarySwitch=0b00110001,
292 phoneNumbers=["", "", ""],
295 return pack("!I", x)[1:]
299 pack("!H", uploadIntervalSeconds),
300 pack("B", binarySwitch),
302 + [pack3b(el) for el in alarms]
304 pack("B", dndTimeSwitch),
306 + [pack3b(el) for el in dndTimes]
308 pack("B", gpsTimeSwitch),
309 pack("!H", gpsTimeStart),
310 pack("!H", gpsTimeStop),
312 + [b";".join([el.encode() for el in phoneNumbers])]
314 return super().response(payload)
317 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
321 class RESTORE_PASSWORD(GPS303Pkt):
325 class WIFI_POSITIONING(_WIFI_POSITIONING):
328 def response(self, lat=None, lon=None):
329 if lat is None or lon is None:
332 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
335 return super().response(payload)
338 class MANUAL_POSITIONING(GPS303Pkt):
342 class BATTERY_CHARGE(GPS303Pkt):
346 class CHARGER_CONNECTED(GPS303Pkt):
350 class CHARGER_DISCONNECTED(GPS303Pkt):
354 class VIBRATION_RECEIVED(GPS303Pkt):
358 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
362 def from_packet(cls, length, payload):
363 self = super().from_packet(length, payload)
# unpack() always returns a tuple; keep the integer itself so that it can
# be packed again with "!H" when the reply is built.
self.interval = unpack("!H", payload[:2])[0]
368 return super().response(pack("!H", self.interval))
371 class SOS_ALARM(GPS303Pkt):
375 # Build dicts protocol number -> class and class name -> protocol number
378 if True: # just to indent the code, sorry!
381 for name, cls in globals().items()
383 and issubclass(cls, GPS303Pkt)
384 and not name.startswith("_")
386 if hasattr(cls, "PROTO"):
387 CLASSES[cls.PROTO] = cls
388 PROTOS[cls.__name__] = cls.PROTO
def proto_by_name(name):
    """Return the protocol number registered under class name ``name``.

    Names that are not registered yield -1 instead of raising.
    """
    return PROTOS.get(name, -1)
def proto_of_message(packet):
    """Return the protocol number stored in the second byte of ``packet``.

    ``packet`` is an unframed message: length byte first, protocol byte
    second.  Raises ``struct.error`` when fewer than two bytes are given.
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
399 def make_object(length, proto, payload):
401 return CLASSES[proto].from_packet(length, payload)
403 retobj = UNKNOWN.from_packet(length, payload)
404 retobj.PROTO = proto # Override class attr with object attr
408 def parse_message(packet):
409 length, proto = unpack("BB", packet[:2])
411 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
414 not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
416 and len(payload) + adjust != length
419 "With proto %d length is %d but payload length is %d+%d",
425 return make_object(length, proto, payload)
def handle_packet(packet):  # DEPRECATED
    r"""Parse one framed packet: ``b"xx"`` + message + ``b"\r\n"``.

    Input shorter than six bytes, or lacking either frame marker, is
    returned as an ``UNKNOWN`` object built from the raw bytes and their
    length.  Otherwise the markers are stripped and the remainder is
    passed to ``parse_message``.
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if not well_framed:
        return UNKNOWN.from_packet(len(packet), packet)
    return parse_message(packet[2:-2])
def make_response(msg, **kwargs):  # DEPRECATED
    r"""Frame ``msg.response(**kwargs)`` for sending.

    Returns ``None`` when the message produces no response; otherwise the
    response bytes wrapped between ``b"xx"`` and ``b"\r\n"``.
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"