Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
GPS+GPRS module. Description lifted from this repository:
https://github.com/tobadia/petGPS/tree/master/resources
1. There is no security whatsoever. If you know the module's IMEI,
   you can feed fake data to the server, including fake location.
2. Ad-hoc choice of framing of messages (that are transferred over
   the TCP stream) makes it vulnerable to coincidental appearance
   of framing bytes in the middle of the message. Most of the time
   the server will receive one message in one TCP segment (i.e. in
   one `recv()` operation), but relying on that would break things
   if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
63 def __init__(self, *args, **kwargs):
65 for k, v in kwargs.items():
69 return "{}({})".format(
70 self.__class__.__name__,
74 'bytes.fromhex("{}")'.format(v.hex())
75 if isinstance(v, bytes)
78 for k, v in self.__dict__.items()
79 if not k.startswith("_")
84 def from_packet(cls, length, payload):
85 return cls(payload=payload, length=length)
88 return pack("BB", self.length, self.PROTO) + self.payload
91 def response(cls, *args):
94 assert len(args) == 1 and isinstance(args[0], bytes)
96 length = len(payload) + 1
99 return pack("BB", length, cls.PROTO) + payload
class UNKNOWN(GPS303Pkt):
    """Fallback packet type for data that no other class can represent."""

    # The protocol number occupies one byte on the wire, so this value can
    # never collide with a protocol number found in a real packet.
    PROTO = 256  # > 255 is impossible in real packets
106 class LOGIN(GPS303Pkt):
110 def from_packet(cls, length, payload):
111 self = super().from_packet(length, payload)
112 self.imei = payload[:-1].hex()
113 self.ver = unpack("B", payload[-1:])[0]
118 return super().response(b"")
121 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
125 def response(cls, supnum=0):
126 # 1: The device automatically answers Pickup effect
127 # 2: Automatically Answering Two-way Calls
128 # 3: Ring manually answer the two-way call
129 return super().response(b"")
132 class HEARTBEAT(GPS303Pkt):
class _GPS_POSITIONING(GPS303Pkt):
    """Shared decoder for the GPS positioning packet types.

    The leading underscore keeps this helper class out of the protocol
    registry; the concrete packet classes derive from it.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode timestamp, satellite info and coordinates.

        Sets ``dtime`` (raw 6 timestamp bytes), ``devtime`` (aware UTC
        ``datetime`` or ``None`` when the timestamp is all zeroes),
        ``gps_data_length``, ``gps_nb_sat``, ``gps_is_valid``, ``heading``,
        ``latitude``, ``longitude`` and ``speed``.

        :param length: value of the length byte of the message
        :param payload: message bytes that follow the protocol byte
        :return: the decoded packet object
        :raises struct.error: if ``payload`` is shorter than 18 bytes
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            # An all-zero timestamp cannot be turned into a date.
            self.devtime = None
        else:
            year, month, day, hour, minute, second = unpack(
                "BBBBBB", self.dtime
            )
            # The year travels as a single byte, i.e. without the century.
            # Passing it to datetime() unchanged would yield e.g. year 22.
            self.devtime = datetime(
                2000 + year, month, day, hour, minute, second,
                tzinfo=timezone.utc,
            )
        # One byte holds two nibbles: data length (high), satellites (low).
        self.gps_data_length = payload[6] >> 4
        self.gps_nb_sat = payload[6] & 0x0F
        lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
        self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
        self.heading = flags & 0b0000001111111111  # bits 6 - last
        # The flag bits only select the sign; the magnitude must not change,
        # hence the factor is exactly -1 or 1 for both coordinates.
        self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
        self.speed = speed
        return self

    def response(self):
        """Build the reply, which echoes the raw timestamp of the packet."""
        return super().response(self.dtime)
164 class GPS_POSITIONING(_GPS_POSITIONING):
168 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
172 class STATUS(GPS303Pkt):
176 def from_packet(cls, length, payload):
177 self = super().from_packet(length, payload)
178 if len(payload) == 5:
185 ) = unpack("BBBBB", payload)
186 elif len(payload) == 4:
187 self.batt, self.ver, self.timezone, self.intvl = unpack(
193 def response(self, upload_interval=25): # Set interval in minutes
194 return super().response(pack("B", upload_interval))
197 class HIBERNATION(GPS303Pkt):
201 class RESET(GPS303Pkt): # Device sends when it got reset SMS
204 def response(self): # Server can send to initiate factory reset
205 return super().response(b"")
208 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
211 def response(self, number=3): # Number of whitelist entries
212 return super().response(pack("B", number))
class _WIFI_POSITIONING(GPS303Pkt):
    """Shared decoder for the WiFi positioning packet types.

    The leading underscore keeps this helper class out of the protocol
    registry; the concrete packet classes derive from it.
    """

    @classmethod
    def from_packet(cls, length, payload):
        """Decode timestamp, WiFi access points and GSM cells.

        Sets ``dtime`` (raw 6 timestamp bytes), ``devtime``, ``wifi_aps``
        (list of ``(mac, -signal)`` tuples), ``mcc``, ``mnc`` and
        ``gsm_cells`` (list of ``(locac, cellid, -signal)`` tuples).

        :param length: for these packets, the number of WiFi AP records
        :param payload: message bytes that follow the protocol byte
        :return: the decoded packet object
        :raises struct.error: if the GSM part of ``payload`` is truncated
        :raises IndexError: if a WiFi AP record is truncated
        """
        self = super().from_packet(length, payload)
        self.dtime = payload[:6]
        if self.dtime == b"\0\0\0\0\0\0":
            # An all-zero timestamp cannot be turned into a date.
            self.devtime = None
        else:
            # The hex rendering of the bytes is read as decimal digits,
            # two per field.
            # NOTE(review): astimezone() on a naive datetime interprets it
            # as *local* time of this host; if the device reports UTC this
            # is only correct when the host runs in UTC - confirm.
            self.devtime = datetime.strptime(
                self.dtime.hex(), "%y%m%d%H%M%S"
            ).astimezone(tz=timezone.utc)
        self.wifi_aps = []
        for i in range(self.length):  # length has special meaning here
            # 7 bytes per record: 6 bytes of MAC address + 1 byte of signal
            ap = payload[6 + i * 7 : 13 + i * 7]
            self.wifi_aps.append(
                (":".join([format(b, "02X") for b in ap[:6]]), -ap[6])
            )
        gsm_slice = payload[6 + self.length * 7 :]
        ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
        self.gsm_cells = []
        for i in range(ncells):
            # 5 bytes per record: two 16-bit ids + 1 byte of signal
            locac, cellid, sigstr = unpack(
                "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
            )
            self.gsm_cells.append((locac, cellid, -sigstr))
        return self
244 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
248 return super().response(self.dtime)
251 class TIME(GPS303Pkt):
255 payload = pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
256 return super().response(payload)
259 class PROHIBIT_LBS(GPS303Pkt):
262 def response(self, status=1): # Server sent, 0-off, 1-on
263 return super().response(pack("B", status))
266 class MOM_PHONE(GPS303Pkt):
270 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
274 return super().response(b"")
277 class STOP_ALARM(GPS303Pkt):
281 class SETUP(GPS303Pkt):
286 uploadIntervalSeconds=0x0300,
287 binarySwitch=0b00110001,
294 phoneNumbers=["", "", ""],
297 return pack("!I", x)[1:]
301 pack("!H", uploadIntervalSeconds),
302 pack("B", binarySwitch),
304 + [pack3b(el) for el in alarms]
306 pack("B", dndTimeSwitch),
308 + [pack3b(el) for el in dndTimes]
310 pack("B", gpsTimeSwitch),
311 pack("!H", gpsTimeStart),
312 pack("!H", gpsTimeStop),
314 + [b";".join([el.encode() for el in phoneNumbers])]
316 return super().response(payload)
319 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
323 class RESTORE_PASSWORD(GPS303Pkt):
327 class WIFI_POSITIONING(_WIFI_POSITIONING):
330 def response(self, lat=None, lon=None):
331 if lat is None or lon is None:
334 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
337 return super().response(payload)
340 class MANUAL_POSITIONING(GPS303Pkt):
344 class BATTERY_CHARGE(GPS303Pkt):
348 class CHARGER_CONNECTED(GPS303Pkt):
352 class CHARGER_DISCONNECTED(GPS303Pkt):
356 class VIBRATION_RECEIVED(GPS303Pkt):
360 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
364 def from_packet(cls, length, payload):
365 self = super().from_packet(length, payload)
366 self.interval = unpack("!H", payload[:2])
370 return super().response(pack("!H", self.interval))
373 class SOS_ALARM(GPS303Pkt):
# Build dicts protocol number -> class and class name -> protocol number
CLASSES = {}
PROTOS = {}
# The candidates are collected from a snapshot of the module namespace
# before the loop starts, because the loop variable itself becomes a new
# module-level name. Names starting with "_" are shared helper classes.
for cls in [
    cls
    for name, cls in list(globals().items())
    if isclass(cls)
    and issubclass(cls, GPS303Pkt)
    and not name.startswith("_")
]:
    # Classes without a PROTO attribute cannot be looked up by number.
    if hasattr(cls, "PROTO"):
        CLASSES[cls.PROTO] = cls
        PROTOS[cls.__name__] = cls.PROTO
def proto_by_name(name):
    """Return the protocol number registered for class name *name*.

    :param name: name of a packet class, e.g. ``"LOGIN"``
    :return: the protocol number, or -1 if the name is not registered
    """
    return PROTOS.get(name, -1)
def proto_of_message(packet):
    """Return the protocol number of an unframed message.

    The protocol number is the second byte, right after the length byte.

    :param packet: message bytes without the framing
    :return: protocol number as an integer
    :raises struct.error: if *packet* is shorter than two bytes
    """
    (proto,) = unpack("B", packet[1:2])
    return proto
def make_object(length, proto, payload):
    """Create the packet object that matches protocol number *proto*.

    :param length: value of the length byte of the message
    :param proto: protocol number of the message
    :param payload: message bytes that follow the protocol byte
    :return: instance of the registered class, or an ``UNKNOWN`` instance
        whose ``PROTO`` is set to *proto* when no class is registered
    """
    # Look the class up first, so that a KeyError raised while decoding a
    # known packet is not mistaken for an unregistered protocol number.
    pkt_class = CLASSES.get(proto)
    if pkt_class is not None:
        return pkt_class.from_packet(length, payload)
    retobj = UNKNOWN.from_packet(length, payload)
    retobj.PROTO = proto  # Override class attr with object attr
    return retobj
def parse_message(packet):
    """Split an unframed message and turn it into a packet object.

    A mismatch between the declared and the actual length is only logged;
    the message is decoded regardless.

    :param packet: message bytes without the framing
    :return: packet object created by ``make_object``
    :raises struct.error: if *packet* is shorter than two bytes
    """
    length, proto = unpack("BB", packet[:2])
    payload = packet[2:]
    adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
    # The WiFi packets use the length byte for something else, so the
    # check does not apply to them.
    # NOTE(review): the listing this was restored from lacks one clause of
    # this condition (between the two clauses below) - confirm against the
    # upstream file. It only affects whether the warning is logged.
    if (
        proto
        not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
        and len(payload) + adjust != length
    ):
        log.warning(
            "With proto %d length is %d but payload length is %d+%d",
            proto,
            length,
            len(payload),
            adjust,
        )
    return make_object(length, proto, payload)
def handle_packet(packet):  # DEPRECATED
    """Decode one framed packet (``b"xx"`` ... ``b"\\r\\n"``).

    :param packet: bytes of one complete frame
    :return: the decoded packet object; an ``UNKNOWN`` object holding the
        whole input when it is too short or not framed as expected
    """
    well_framed = (
        len(packet) >= 6
        and packet[:2] == b"xx"
        and packet[-2:] == b"\r\n"
    )
    if not well_framed:
        return UNKNOWN.from_packet(len(packet), packet)
    return parse_message(packet[2:-2])
def make_response(msg, **kwargs):  # DEPRECATED
    """Build the framed response to *msg*.

    :param msg: packet object; its ``response()`` is called with *kwargs*
    :param kwargs: passed through to ``msg.response()``
    :return: ``b"xx"`` + response + ``b"\\r\\n"``, or ``None`` when
        ``msg.response()`` returns ``None``
    """
    inframe = msg.response(**kwargs)
    if inframe is None:
        return None
    return b"xx" + inframe + b"\r\n"
def set_config(config):  # Note that we are setting _class_ attribute
    """Make *config* available to all packet classes.

    The object is stored as the class attribute ``CONFIG`` of the common
    base class, so every packet class and instance sees the same object.

    :param config: configuration object, stored as is
    """
    GPS303Pkt.CONFIG = config