2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from logging import getLogger
21 from struct import pack, unpack
37 "GPS_OFFLINE_POSITIONING",
42 "WIFI_OFFLINE_POSITIONING",
47 "SYNCHRONOUS_WHITELIST",
53 "CHARGER_DISCONNECTED",
55 "POSITION_UPLOAD_INTERVAL",
58 log = getLogger("gps303")
62 IN = 0 # Incoming, no response needed
63 INLINE = 2 # Birirectional, use `inline_response()`
64 EXT = 3 # Birirectional, use external responder
65 OUT = 4 # Outgoing, should not appear on input
70 DIR = Dir.INLINE # Most packets anticipate simple acknowledgement
72 def __init__(self, *args, **kwargs):
74 for k, v in kwargs.items():
78 return "{}({})".format(
79 self.__class__.__name__,
83 'bytes.fromhex("{}")'.format(v.hex())
84 if isinstance(v, bytes)
87 for k, v in self.__dict__.items()
88 if not k.startswith("_")
93 def from_packet(cls, length, payload):
94 return cls(payload=payload, length=length)
97 return pack("BB", self.length, self.PROTO) + self.payload
100 def make_packet(cls, payload):
101 assert isinstance(payload, bytes)
102 length = len(payload) + 1 # plus proto byte
105 return pack("BB", length, cls.PROTO) + payload
108 def inline_response(cls, packet):
109 if cls.DIR is Dir.INLINE:
110 return cls.make_packet(b"")
115 class UNKNOWN(GPS303Pkt):
116 PROTO = 256 # > 255 is impossible in real packets
120 class LOGIN(GPS303Pkt):
122 # Default response for ACK, can also respond with STOP_UPLOAD
125 def from_packet(cls, length, payload):
126 self = super().from_packet(length, payload)
127 self.imei = payload[:-1].hex()
128 self.ver = unpack("B", payload[-1:])[0]
132 class SUPERVISION(GPS303Pkt):
137 def response(cls, status=0):
138 # 1: The device automatically answers Pickup effect
139 # 2: Automatically Answering Two-way Calls
140 # 3: Ring manually answer the two-way call
141 return cls.make_packet(pack("B", status))
144 class HEARTBEAT(GPS303Pkt):
148 class _GPS_POSITIONING(GPS303Pkt):
150 def from_packet(cls, length, payload):
151 self = super().from_packet(length, payload)
152 self.dtime = payload[:6]
153 if self.dtime == b"\0\0\0\0\0\0":
156 self.devtime = datetime(
157 *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
159 self.gps_data_length = payload[6] >> 4
160 self.gps_nb_sat = payload[6] & 0x0F
161 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
162 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
163 flip_lon = bool(flags & 0b0000100000000000) # bit 4
164 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
165 self.heading = flags & 0b0000001111111111 # bits 6 - last
166 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
167 self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
173 def inline_response(cls, packet):
174 tup = datetime.utcnow().timetuple()
175 ttup = (tup[0] % 100,) + tup[1:6]
176 return cls.make_packet(pack("BBBBBB", *ttup))
179 class GPS_POSITIONING(_GPS_POSITIONING):
183 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
187 class STATUS(GPS303Pkt):
192 def from_packet(cls, length, payload):
193 self = super().from_packet(length, payload)
194 if len(payload) == 5:
201 ) = unpack("BBBBB", payload)
202 elif len(payload) == 4:
203 self.batt, self.ver, self.timezone, self.intvl = unpack(
210 def response(cls, upload_interval=25): # Set interval in minutes
211 return cls.make_packet(pack("B", upload_interval))
214 class HIBERNATION(GPS303Pkt):
219 def response(cls): # Server can send to send devicee to sleep
220 return cls.make_packet(b"")
223 class RESET(GPS303Pkt): # Device sends when it got reset SMS
228 def response(cls): # Server can send to initiate factory reset
229 return cls.make_packet(b"")
232 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
237 def response(cls, number=3): # Number of whitelist entries
238 return cls.make_packet(pack("B", number))
241 class _WIFI_POSITIONING(GPS303Pkt):
243 def from_packet(cls, length, payload):
244 self = super().from_packet(length, payload)
245 self.dtime = payload[:6]
246 if self.dtime == b"\0\0\0\0\0\0":
249 self.devtime = datetime.strptime(
250 self.dtime.hex(), "%y%m%d%H%M%S"
251 ).astimezone(tz=timezone.utc)
253 for i in range(self.length): # length has special meaning here
254 slice = payload[6 + i * 7 : 13 + i * 7]
255 self.wifi_aps.append(
256 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
258 gsm_slice = payload[6 + self.length * 7 :]
259 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
261 for i in range(ncells):
262 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
263 locac, cellid, sigstr = unpack(
264 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
266 self.gsm_cells.append((locac, cellid, -sigstr))
270 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
274 def inline_response(cls, packet):
275 return cls.make_packet(
276 bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
280 class TIME(GPS303Pkt):
284 def inline_response(cls, packet):
285 return cls.make_packet(
286 pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
290 class PROHIBIT_LBS(GPS303Pkt):
295 def response(cls, status=1): # Server sent, 0-off, 1-on
296 return cls.make_packet(pack("B", status))
299 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
305 # Data is in packed decimal
307 # 00/01 - Don't set / Set upload period
308 # HHMMHHMM - Upload period
310 # 00/01 - Don't set / Set time of boot
311 # HHMM - Time of boot
312 # 00/01 - Don't set / Set time of shutdown
313 # HHMM - Time of shutdown
314 return cls.make_packet(b"") # TODO
317 class _SET_PHONE(GPS303Pkt):
321 def response(cls, phone):
322 return cls.make_packet(phone.encode())
325 class REMOTE_MONITOR_PHONE(_SET_PHONE):
329 class SOS_PHONE(_SET_PHONE):
333 class DAD_PHONE(_SET_PHONE):
337 class MOM_PHONE(_SET_PHONE):
341 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
347 return cls.make_packet(b"")
350 class GPS_OFF_PERIOD(GPS303Pkt):
355 def response(cls, onoff=0, fm="0000", to="2359"):
356 return cls.make_packet(
357 pack("B", onoff) + bytes.fromhex(fm) + bytes.fromhex(to)
361 class DND_PERIOD(GPS303Pkt):
367 cls, onoff=0, week=3, fm1="0000", to1="2359", fm2="0000", to2="2359"
369 return cls.make_packet(
379 class RESTART_SHUTDOWN(GPS303Pkt):
384 def response(cls, flag=2):
387 return cls.make_packet(pack("B", flag))
390 class DEVICE(GPS303Pkt):
395 def response(cls, flag=0):
396 # 0 - Stop looking for equipment
397 # 1 - Start looking for equipment
398 return cls.make_packet(pack("B", flag))
401 class ALARM_CLOCK(GPS303Pkt):
406 def response(cls, alarms=((0, "0000"), (0, "0000"), (0, "0000"))):
408 cls("B", day) + bytes.fromhex(tm) for day, tm in alarms
412 class STOP_ALARM(GPS303Pkt):
416 def from_packet(cls, length, payload):
417 self = super().from_packet(length, payload)
418 self.flag = payload[0]
421 class SETUP(GPS303Pkt):
428 uploadintervalseconds=0x0300,
429 binaryswitch=0b00110001,
436 phonenumbers=["", "", ""],
439 return pack("!I", x)[1:]
443 pack("!H", uploadintervalseconds),
444 pack("B", binaryswitch),
446 + [pack3b(el) for el in alarms]
448 pack("B", dndtimeswitch),
450 + [pack3b(el) for el in dndtimes]
452 pack("B", gpstimeswitch),
453 pack("!H", gpstimestart),
454 pack("!H", gpstimestop),
456 + [b";".join([el.encode() for el in phonenumbers])]
458 return cls.make_packet(payload)
461 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
465 class RESTORE_PASSWORD(GPS303Pkt):
469 class WIFI_POSITIONING(_WIFI_POSITIONING):
474 def response(cls, lat=None, lon=None):
475 if lat is None or lon is None:
478 payload = "{:+#010.8g},{:+#010.8g}".format(lat, lon).encode(
481 return cls.make_packet(payload)
484 class MANUAL_POSITIONING(GPS303Pkt):
489 def from_packet(cls, length, payload):
490 self = super().from_packet(length, payload)
491 self.flag = payload[0]
496 4: "LBS search > 3 times",
497 5: "Same LBS and WiFi data",
498 6: "LBS prohibited, WiFi absent",
499 7: "GPS spacing < 50 m",
500 }.get(self.flag, "Unknown")
504 return cls.make_packet(b"")
507 class BATTERY_CHARGE(GPS303Pkt):
511 class CHARGER_CONNECTED(GPS303Pkt):
515 class CHARGER_DISCONNECTED(GPS303Pkt):
519 class VIBRATION_RECEIVED(GPS303Pkt):
523 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
528 def from_packet(cls, length, payload):
529 self = super().from_packet(length, payload)
530 self.interval = unpack("!H", payload[:2])
534 def response(cls, interval=10):
535 return cls.make_packet(pack("!H", interval))
538 class SOS_ALARM(GPS303Pkt):
542 # Build dicts protocol number -> class and class name -> protocol number
545 if True: # just to indent the code, sorry!
548 for name, cls in globals().items()
550 and issubclass(cls, GPS303Pkt)
551 and not name.startswith("_")
553 if hasattr(cls, "PROTO"):
554 CLASSES[cls.PROTO] = cls
555 PROTOS[cls.__name__] = cls.PROTO
558 def class_by_prefix(prefix):
559 lst = [(name, proto) for name, proto in PROTOS.items()
560 if name.upper().startswith(prefix.upper())]
564 return CLASSES[proto]
567 def proto_by_name(name):
568 return PROTOS.get(name, -1)
571 def proto_of_message(packet):
572 return unpack("B", packet[1:2])[0]
575 def inline_response(packet):
576 proto = proto_of_message(packet)
578 return CLASSES[proto].inline_response(packet)
583 def make_object(length, proto, payload):
585 return CLASSES[proto].from_packet(length, payload)
587 retobj = UNKNOWN.from_packet(length, payload)
588 retobj.PROTO = proto # Override class attr with object attr
592 def parse_message(packet):
593 length, proto = unpack("BB", packet[:2])
595 adjust = 2 if proto == STATUS.PROTO else 4 # Weird special case
597 proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
599 and len(payload) + adjust != length
602 "With proto %d length is %d but payload length is %d+%d",
608 return make_object(length, proto, payload)
611 def handle_packet(packet): # DEPRECATED
612 if len(packet) < 6 or packet[:2] != b"xx" or packet[-2:] != b"\r\n":
613 return UNKNOWN.from_packet(len(packet), packet)
614 return parse_message(packet[2:-2])
617 def make_response(msg, **kwargs): # DEPRECATED
618 inframe = msg.response(**kwargs)
619 return None if inframe is None else b"xx" + inframe + b"\r\n"