2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
18 from inspect import isclass
19 from logging import getLogger
20 from struct import pack, unpack
35 "GPS_OFFLINE_POSITIONING",
40 "WIFI_OFFLINE_POSITIONING",
45 "SYNCHRONOUS_WHITELIST",
51 "CHARGER_DISCONNECTED",
53 "POSITION_UPLOAD_INTERVAL",
56 log = getLogger("gps303")
63 def __init__(self, *args, **kwargs):
65 for k, v in kwargs.items():
69 return "{}({})".format(
70 self.__class__.__name__,
74 'bytes.fromhex("{}")'.format(v.hex())
75 if isinstance(v, bytes)
78 for k, v in self.__dict__.items()
79 if not k.startswith("_")
84 def from_packet(cls, length, payload):
85 return cls(payload=payload, length=length)
88 return pack("BB", self.length, self.PROTO) + self.payload
91 def make_packet(cls, payload):
92 assert isinstance(payload, bytes)
93 length = len(payload) + 1
96 return pack("BB", length, cls.PROTO) + payload
99 def inline_response(cls, packet):
101 return cls.make_packet(b"")
106 class UNKNOWN(GPS303Pkt):
107 PROTO = 256 # > 255 is impossible in real packets
111 class LOGIN(GPS303Pkt):
115 def from_packet(cls, length, payload):
116 self = super().from_packet(length, payload)
117 self.imei = payload[:-1].hex()
118 self.ver = unpack("B", payload[-1:])[0]
122 class SUPERVISION(GPS303Pkt): # Server sends supervision number status
126 def response(self, supnum=0):
127 # 1: The device automatically answers Pickup effect
128 # 2: Automatically Answering Two-way Calls
129 # 3: Ring manually answer the two-way call
130 return self.make_packet(pack("B", supnum))
133 class HEARTBEAT(GPS303Pkt):
137 class _GPS_POSITIONING(GPS303Pkt):
139 def from_packet(cls, length, payload):
140 self = super().from_packet(length, payload)
141 self.dtime = payload[:6]
142 if self.dtime == b"\0\0\0\0\0\0":
145 self.devtime = datetime(
146 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
148 self.gps_data_length = payload[6] >> 4
149 self.gps_nb_sat = payload[6] & 0x0F
150 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
151 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
152 flip_lon = bool(flags & 0b0000100000000000) # bit 4
153 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
154 self.heading = flags & 0b0000001111111111 # bits 6 - last
155 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
156 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
162 def inline_response(cls, packet):
163 return cls.make_packet(packet[2:8])
166 class GPS_POSITIONING(_GPS_POSITIONING):
170 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
174 class STATUS(GPS303Pkt):
179 def from_packet(cls, length, payload):
180 self = super().from_packet(length, payload)
181 if len(payload) == 5:
188 ) = unpack("BBBBB", payload)
189 elif len(payload) == 4:
190 self.batt, self.ver, self.timezone, self.intvl = unpack(
196 def response(self, upload_interval=25): # Set interval in minutes
197 return self.make_packet(pack("B", upload_interval))
200 class HIBERNATION(GPS303Pkt):
204 class RESET(GPS303Pkt): # Device sends when it got reset SMS
208 def response(self): # Server can send to initiate factory reset
209 return self.make_packet(b"")
212 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
216 def response(self, number=3): # Number of whitelist entries
217 return self.make_packet(pack("B", number))
220 class _WIFI_POSITIONING(GPS303Pkt):
222 def from_packet(cls, length, payload):
223 self = super().from_packet(length, payload)
224 self.dtime = payload[:6]
225 if self.dtime == b"\0\0\0\0\0\0":
228 self.devtime = datetime.strptime(
229 self.dtime.hex(), "%y%m%d%H%M%S"
230 ).astimezone(tz=timezone.utc)
232 for i in range(self.length): # length has special meaning here
233 slice = payload[6 + i * 7 : 13 + i * 7]
234 self.wifi_aps.append(
235 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
237 gsm_slice = payload[6 + self.length * 7 :]
238 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
240 for i in range(ncells):
241 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
242 locac, cellid, sigstr = unpack(
243 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
245 self.gsm_cells.append((locac, cellid, -sigstr))
249 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
253 def inline_response(cls, packet):
254 return cls.make_packet(packet[2:8])
257 class TIME(GPS303Pkt):
261 def inline_response(cls, packet):
263 "!BBHBBBBB", 7, cls.PROTO, *datetime.utcnow().timetuple()[:6]
267 class PROHIBIT_LBS(GPS303Pkt):
271 def response(self, status=1): # Server sent, 0-off, 1-on
272 return self.make_packet(pack("B", status))
275 class MOM_PHONE(GPS303Pkt):
279 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
283 class STOP_ALARM(GPS303Pkt):
287 class SETUP(GPS303Pkt):
293 uploadIntervalSeconds=0x0300,
294 binarySwitch=0b00110001,
301 phoneNumbers=["", "", ""],
304 return pack("!I", x)[1:]
308 pack("!H", uploadIntervalSeconds),
309 pack("B", binarySwitch),
311 + [pack3b(el) for el in alarms]
313 pack("B", dndTimeSwitch),
315 + [pack3b(el) for el in dndTimes]
317 pack("B", gpsTimeSwitch),
318 pack("!H", gpsTimeStart),
319 pack("!H", gpsTimeStop),
321 + [b";".join([el.encode() for el in phoneNumbers])]
323 return self.make_packet(payload)
326 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
330 class RESTORE_PASSWORD(GPS303Pkt):
334 class WIFI_POSITIONING(_WIFI_POSITIONING):
338 def response(self, lat=None, lon=None):
339 if lat is None or lon is None:
342 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
345 return self.make_packet(payload)
348 class MANUAL_POSITIONING(GPS303Pkt):
353 class BATTERY_CHARGE(GPS303Pkt):
357 class CHARGER_CONNECTED(GPS303Pkt):
361 class CHARGER_DISCONNECTED(GPS303Pkt):
365 class VIBRATION_RECEIVED(GPS303Pkt):
369 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
374 def from_packet(cls, length, payload):
375 self = super().from_packet(length, payload)
376 self.interval = unpack("!H", payload[:2])
379 def response(self, interval=10):
380 return self.make_packet(pack("!H", interval))
383 class SOS_ALARM(GPS303Pkt):
387 # Build dicts protocol number -> class and class name -> protocol number
390 if True: # just to indent the code, sorry!
393 for name, cls in globals().items()
395 and issubclass(cls, GPS303Pkt)
396 and not name.startswith("_")
398 if hasattr(cls, "PROTO"):
399 CLASSES[cls.PROTO] = cls
400 PROTOS[cls.__name__] = cls.PROTO
403 def proto_by_name(name):
404 return PROTOS.get(name, -1)
407 def proto_of_message(packet):
408 return unpack("B", packet[1:2])[0]
411 def inline_response(packet):
412 proto = proto_of_message(packet)
414 return CLASSES[proto].inline_response(packet)
419 def make_object(length, proto, payload):
421 return CLASSES[proto].from_packet(length, payload)
423 retobj = UNKNOWN.from_packet(length, payload)
424 retobj.PROTO = proto # Override class attr with object attr
428 def parse_message(packet):
429 length, proto = unpack("BB", packet[:2])
431 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
433 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
435 and len(payload) + adjust != length
438 "With proto %d length is %d but payload length is %d+%d",
444 return make_object(length, proto, payload)
447 def handle_packet(packet): # DEPRECATED
448 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
449 return UNKNOWN.from_packet(len(packet), packet)
450 return parse_message(packet[2:-2])
453 def make_response(msg, **kwargs): # DEPRECATED
454 inframe = msg.response(**kwargs)
455 return None if inframe is None else b"xx" + inframe + b"\r\n"