2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
34 from .protomodule import ProtoClass
54 "GPS_OFFLINE_POSITIONING",
59 "WIFI_OFFLINE_POSITIONING",
62 "GPS_LBS_SWITCH_TIMES",
63 "REMOTE_MONITOR_PHONE",
75 "SYNCHRONOUS_WHITELIST",
81 "CHARGER_DISCONNECTED",
83 "POSITION_UPLOAD_INTERVAL",
96 def __init__(self) -> None:
99 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
101 Process next segment of the stream. Return successfully deframed
102 packets as `bytes` and error messages as `str`.
105 self.buffer += segment
106 if len(self.buffer) > MAXBUFFER:
107 # We are receiving junk. Let's drop it or we run out of memory.
109 return [f"More than {MAXBUFFER} unparseable data, dropping"]
110 msgs: List[Union[bytes, str]] = []
112 framestart = self.buffer.find(b"xx")
113 if framestart == -1: # No frames, return whatever we have
115 if framestart > 0: # Should not happen, report
117 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
119 self.buffer = self.buffer[framestart:]
120 # At this point, buffer starts with a packet
121 if len(self.buffer) < 6: # no len and proto - cannot proceed
123 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
125 # Length field can legitimeely be much less than the
126 # length of the packet (e.g. WiFi positioning), but
127 # it _should not_ be greater. Still sometimes it is.
128 # Luckily, not by too much: by maybe two or three bytes?
129 # Do this embarrassing hack to avoid accidental match
130 # of some binary data in the packet against '\r\n'.
132 frameend = self.buffer.find(b"\r\n", frameend + 1)
133 if frameend == -1 or frameend >= (
135 ): # Found realistic match or none
137 if frameend == -1: # Incomplete frame, return what we have
139 packet = self.buffer[2:frameend]
140 self.buffer = self.buffer[frameend + 2 :]
141 if len(packet) < 2: # frameend comes too early
142 msgs.append(f"Packet too short: {packet.hex()}")
147 def close(self) -> bytes:
153 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
154 return b"xx" + buffer + b"\r\n"
157 ### Parser/Constructor ###
160 class DecodeError(Exception):
161 def __init__(self, e: Exception, **kwargs: Any) -> None:
163 for k, v in kwargs.items():
167 def maybe(typ: type) -> Callable[[Any], Any]:
168 return lambda x: None if x is None else typ(x)
171 def intx(x: Union[str, int]) -> int:
172 if isinstance(x, str):
177 def boolx(x: Union[str, bool]) -> bool:
178 if isinstance(x, str):
179 if x.upper() in ("ON", "TRUE", "1"):
181 if x.upper() in ("OFF", "FALSE", "0"):
183 raise ValueError(str(x) + " could not be parsed as a Boolean")
187 def hhmm(x: str) -> str:
188 """Check for the string that represents hours and minutes"""
189 if not isinstance(x, str) or len(x) != 4:
190 raise ValueError(str(x) + " is not a four-character string")
193 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
194 raise ValueError(str(x) + " does not contain valid hours and minutes")
198 def hhmmhhmm(x: str) -> str:
199 """Check for the string that represents hours and minutes twice"""
200 if not isinstance(x, str) or len(x) != 8:
201 raise ValueError(str(x) + " is not an eight-character string")
202 return hhmm(x[:4]) + hhmm(x[4:])
205 def l3str(x: Union[str, List[str]]) -> List[str]:
206 if isinstance(x, str):
210 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
211 raise ValueError(str(lx) + " is not a list of three strings")
215 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
216 def alrmspec(sub: str) -> Tuple[int, str]:
218 raise ValueError(sub + " does not represent day and time")
232 if isinstance(x, str):
233 lx = [alrmspec(sub) for sub in x.split(",")]
236 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
237 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
238 raise ValueError(str(lx) + " is a wrong alarms specification")
239 return [(d, hhmm(tm)) for d, tm in lx]
242 def l3int(x: Union[str, List[int]]) -> List[int]:
243 if isinstance(x, str):
244 lx = [int(el) for el in x.split(",")]
247 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
248 raise ValueError(str(lx) + " is not a list of three integers")
253 NON = 0 # Incoming, no response needed
254 INL = 1 # Birirectional, use `inline_response()`
255 EXT = 2 # Birirectional, use external responder
258 class GPS303Pkt(ProtoClass):
259 RESPOND = Respond.NON # Do not send anything back by default
261 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
262 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
263 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
264 In: Type["GPS303Pkt"]
265 Out: Type["GPS303Pkt"]
269 def __getattr__(self, name: str) -> Any:
272 def __setattr__(self, name: str, value: Any) -> None:
275 def __init__(self, *args: Any, **kwargs: Any):
277 Construct the object _either_ from (length, payload),
278 _or_ from the values of individual fields
280 assert not args or (len(args) == 2 and not kwargs)
281 if args: # guaranteed to be two arguments at this point
282 self.length, self.payload = args
284 self.decode(self.length, self.payload)
286 raise DecodeError(e, obj=self)
288 for kw, typ, dfl in self.KWARGS:
289 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
292 self.__class__.__name__ + " stray kwargs " + str(kwargs)
295 def __repr__(self) -> str:
296 return "{}({})".format(
297 self.__class__.__name__,
301 'bytes.fromhex("{}")'.format(v.hex())
302 if isinstance(v, bytes)
305 for k, v in self.__dict__.items()
306 if not k.startswith("_")
310 decode: Callable[["GPS303Pkt", int, bytes], None]
312 def in_decode(self, length: int, packet: bytes) -> None:
313 # Overridden in subclasses, otherwise do not decode payload
316 def out_decode(self, length: int, packet: bytes) -> None:
317 # Overridden in subclasses, otherwise do not decode payload
320 encode: Callable[["GPS303Pkt"], bytes]
322 def in_encode(self) -> bytes:
323 # Necessary to emulate terminal, which is not implemented
324 raise NotImplementedError(
325 self.__class__.__name__ + ".encode() not implemented"
328 def out_encode(self) -> bytes:
329 # Overridden in subclasses, otherwise make empty payload
333 def packed(self) -> bytes:
334 payload = self.encode()
335 length = getattr(self, "length", len(payload) + 1)
336 return pack("BB", length, self.PROTO) + payload
339 class UNKNOWN(GPS303Pkt):
340 PROTO = 256 # > 255 is impossible in real packets
343 class LOGIN(GPS303Pkt):
345 RESPOND = Respond.INL
346 # Default response for ACK, can also respond with STOP_UPLOAD
347 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
349 def in_decode(self, length: int, payload: bytes) -> None:
350 self.imei = payload[:8].ljust(8, b"\0").hex()
351 self.ver = payload[8]
353 def in_encode(self) -> bytes:
354 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
359 class SUPERVISION(GPS303Pkt):
361 OUT_KWARGS = (("status", int, 1),)
363 def out_encode(self) -> bytes:
364 # 1: The device automatically answers Pickup effect
365 # 2: Automatically Answering Two-way Calls
366 # 3: Ring manually answer the two-way call
367 return pack("B", self.status)
370 class HEARTBEAT(GPS303Pkt):
372 RESPOND = Respond.INL
375 class _GPS_POSITIONING(GPS303Pkt):
376 RESPOND = Respond.INL
378 def in_decode(self, length: int, payload: bytes) -> None:
379 self.dtime = payload[:6]
380 if self.dtime == b"\0\0\0\0\0\0":
383 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
384 self.devtime = datetime(
385 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
387 self.gps_data_length = payload[6] >> 4
388 self.gps_nb_sat = payload[6] & 0x0F
389 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
390 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
391 flip_lon = bool(flags & 0b0000100000000000) # bit 4
392 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
393 self.heading = flags & 0b0000001111111111 # bits 6 - last
394 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
395 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
399 def out_encode(self) -> bytes:
400 tup = datetime.utcnow().timetuple()
401 ttup = (tup[0] % 100,) + tup[1:6]
402 return pack("BBBBBB", *ttup)
405 class GPS_POSITIONING(_GPS_POSITIONING):
409 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
413 class STATUS(GPS303Pkt):
415 RESPOND = Respond.EXT
419 ("timezone", int, 0),
421 ("signal", maybe(int), None),
423 OUT_KWARGS = (("upload_interval", int, 25),)
425 def in_decode(self, length: int, payload: bytes) -> None:
426 self.batt, self.ver, self.timezone, self.intvl = unpack(
430 self.signal: Optional[int] = payload[4]
434 def in_encode(self) -> bytes:
435 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
436 b"" if self.signal is None else pack("B", self.signal)
439 def out_encode(self) -> bytes: # Set interval in minutes
440 return pack("B", self.upload_interval)
443 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
446 def in_encode(self) -> bytes:
450 class RESET(GPS303Pkt):
451 # Device sends when it got reset SMS
452 # Server can send to initiate factory reset
456 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
458 OUT_KWARGS = (("number", int, 3),)
460 def out_encode(self) -> bytes: # Number of whitelist entries
461 return pack("B", self.number)
464 class _WIFI_POSITIONING(GPS303Pkt):
465 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
467 ("dtime", bytes, b"\0\0\0\0\0\0"),
468 ("wifi_aps", list, []),
471 ("gsm_cells", list, []),
474 def in_decode(self, length: int, payload: bytes) -> None:
475 self.dtime = payload[:6]
476 if self.dtime == b"\0\0\0\0\0\0":
479 self.devtime = datetime.strptime(
480 self.dtime.hex(), "%y%m%d%H%M%S"
481 ).astimezone(tz=timezone.utc)
483 for i in range(self.length): # length has special meaning here
484 slice = payload[6 + i * 7 : 13 + i * 7]
485 self.wifi_aps.append(
486 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
488 gsm_slice = payload[6 + self.length * 7 :]
489 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
491 for i in range(ncells):
492 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
493 locac, cellid, sigstr = unpack(
494 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
496 self.gsm_cells.append((locac, cellid, -sigstr))
498 def in_encode(self) -> bytes:
499 self.length = len(self.wifi_aps)
505 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
507 for mac, sigstr in self.wifi_aps
510 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
513 pack("!HHB", locac, cellid, -sigstr)
514 for locac, cellid, sigstr in self.gsm_cells
521 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
523 RESPOND = Respond.INL
525 def out_encode(self) -> bytes:
526 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
529 class TIME(GPS303Pkt):
531 RESPOND = Respond.INL
533 def out_encode(self) -> bytes:
534 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
537 class PROHIBIT_LBS(GPS303Pkt):
539 OUT_KWARGS = (("status", int, 1),)
541 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
542 return pack("B", self.status)
545 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
549 ("gps_off", boolx, False), # Clarify the meaning of 0/1
550 ("gps_interval_set", boolx, False),
551 ("gps_interval", hhmmhhmm, "00000000"),
552 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
553 ("boot_time_set", boolx, False),
554 ("boot_time", hhmm, "0000"),
555 ("shut_time_set", boolx, False),
556 ("shut_time", hhmm, "0000"),
559 def out_encode(self) -> bytes:
561 pack("B", self.gps_off)
562 + pack("B", self.gps_interval_set)
563 + bytes.fromhex(self.gps_interval)
564 + pack("B", self.lbs_off)
565 + pack("B", self.boot_time_set)
566 + bytes.fromhex(self.boot_time)
567 + pack("B", self.shut_time_set)
568 + bytes.fromhex(self.shut_time)
572 class _SET_PHONE(GPS303Pkt):
573 OUT_KWARGS = (("phone", str, ""),)
575 def out_encode(self) -> bytes:
577 return self.phone.encode("")
580 class REMOTE_MONITOR_PHONE(_SET_PHONE):
584 class SOS_PHONE(_SET_PHONE):
588 class DAD_PHONE(_SET_PHONE):
592 class MOM_PHONE(_SET_PHONE):
596 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
600 class GPS_OFF_PERIOD(GPS303Pkt):
604 ("fm", hhmm, "0000"),
605 ("to", hhmm, "2359"),
608 def out_encode(self) -> bytes:
610 pack("B", self.onoff)
611 + bytes.fromhex(self.fm)
612 + bytes.fromhex(self.to)
616 class DND_PERIOD(GPS303Pkt):
621 ("fm1", hhmm, "0000"),
622 ("to1", hhmm, "2359"),
623 ("fm2", hhmm, "0000"),
624 ("to2", hhmm, "2359"),
627 def out_encode(self) -> bytes:
629 pack("B", self.onoff)
630 + pack("B", self.week)
631 + bytes.fromhex(self.fm1)
632 + bytes.fromhex(self.to1)
633 + bytes.fromhex(self.fm2)
634 + bytes.fromhex(self.to2)
638 class RESTART_SHUTDOWN(GPS303Pkt):
640 OUT_KWARGS = (("flag", int, 0),)
642 def out_encode(self) -> bytes:
645 return pack("B", self.flag)
648 class DEVICE(GPS303Pkt):
650 OUT_KWARGS = (("flag", int, 0),)
652 # 0 - Stop looking for equipment
653 # 1 - Start looking for equipment
654 def out_encode(self) -> bytes:
655 return pack("B", self.flag)
658 class ALARM_CLOCK(GPS303Pkt):
661 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
663 ("alarms", l3alarms, []),
666 def out_encode(self) -> bytes:
668 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
672 class STOP_ALARM(GPS303Pkt):
675 def in_decode(self, length: int, payload: bytes) -> None:
676 self.flag = payload[0]
679 class SETUP(GPS303Pkt):
681 RESPOND = Respond.EXT
683 ("uploadintervalseconds", intx, 0x0300),
684 ("binaryswitch", intx, 0b00110001),
685 ("alarms", l3int, [0, 0, 0]),
686 ("dndtimeswitch", int, 0),
687 ("dndtimes", l3int, [0, 0, 0]),
688 ("gpstimeswitch", int, 0),
689 ("gpstimestart", int, 0),
690 ("gpstimestop", int, 0),
691 ("phonenumbers", l3str, ["", "", ""]),
694 def out_encode(self) -> bytes:
695 def pack3b(x: int) -> bytes:
696 return pack("!I", x)[1:]
700 pack("!H", self.uploadintervalseconds),
701 pack("B", self.binaryswitch),
703 + [pack3b(el) for el in self.alarms]
705 pack("B", self.dndtimeswitch),
707 + [pack3b(el) for el in self.dndtimes]
709 pack("B", self.gpstimeswitch),
710 pack("!H", self.gpstimestart),
711 pack("!H", self.gpstimestop),
713 + [b";".join([el.encode() for el in self.phonenumbers])]
716 def in_encode(self) -> bytes:
720 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
724 class RESTORE_PASSWORD(GPS303Pkt):
728 class WIFI_POSITIONING(_WIFI_POSITIONING):
730 RESPOND = Respond.EXT
731 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
733 def out_encode(self) -> bytes:
734 if self.latitude is None or self.longitude is None:
736 return "{:+#010.8g},{:+#010.8g}".format(
737 self.latitude, self.longitude
740 def out_decode(self, length: int, payload: bytes) -> None:
741 lat, lon = payload.decode().split(",")
742 self.latitude = float(lat)
743 self.longitude = float(lon)
746 class MANUAL_POSITIONING(GPS303Pkt):
749 def in_decode(self, length: int, payload: bytes) -> None:
750 self.flag = payload[0] if len(payload) > 0 else -1
755 4: "LBS search > 3 times",
756 5: "Same LBS and WiFi data",
757 6: "LBS prohibited, WiFi absent",
758 7: "GPS spacing < 50 m",
759 }.get(self.flag, "Unknown")
762 class BATTERY_CHARGE(GPS303Pkt):
766 class CHARGER_CONNECTED(GPS303Pkt):
770 class CHARGER_DISCONNECTED(GPS303Pkt):
774 class VIBRATION_RECEIVED(GPS303Pkt):
778 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
780 RESPOND = Respond.EXT
781 OUT_KWARGS = (("interval", int, 10),)
783 def in_decode(self, length: int, payload: bytes) -> None:
784 self.interval = unpack("!H", payload[:2])
786 def out_encode(self) -> bytes:
787 return pack("!H", self.interval)
790 class SOS_ALARM(GPS303Pkt):
794 class UNKNOWN_B3(GPS303Pkt):
796 IN_KWARGS = (("asciidata", str, ""),)
798 def in_decode(self, length: int, payload: bytes) -> None:
799 self.asciidata = payload.decode()
802 # Build dicts protocol number -> class and class name -> protocol number
805 if True: # just to indent the code, sorry!
808 for name, cls in globals().items()
810 and issubclass(cls, GPS303Pkt)
811 and not name.startswith("_")
813 if hasattr(cls, "PROTO"):
814 CLASSES[cls.PROTO] = cls
815 PROTOS[cls.__name__] = cls.PROTO
820 ) -> Union[Type[GPS303Pkt], List[str]]:
821 if prefix.startswith(PROTO_PREFIX):
822 pname = prefix[len(PROTO_PREFIX) :]
824 raise KeyError(pname)
827 for name, proto in PROTOS.items()
828 if name.upper().startswith(prefix.upper())
831 return [name for name, _ in lst]
833 return CLASSES[proto]
836 def proto_handled(proto: str) -> bool:
837 return proto.startswith(PROTO_PREFIX)
840 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
841 return PROTO_PREFIX + (
842 obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
846 def proto_of_message(packet: bytes) -> str:
847 return proto_name(CLASSES.get(packet[1], UNKNOWN))
850 def imei_from_packet(packet: bytes) -> Optional[str]:
851 if packet[1] == LOGIN.PROTO:
852 msg = parse_message(packet)
853 if isinstance(msg, LOGIN):
858 def is_goodbye_packet(packet: bytes) -> bool:
859 return packet[1] == HIBERNATION.PROTO
862 def inline_response(packet: bytes) -> Optional[bytes]:
866 if cls.RESPOND is Respond.INL:
867 return cls.Out().packed
871 def probe_buffer(buffer: bytes) -> bool:
872 framestart = buffer.find(b"xx")
875 if len(buffer) - framestart < 6:
880 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
881 """From a packet (without framing bytes) derive the XXX.In object"""
882 length, proto = unpack("BB", packet[:2])
884 if proto not in CLASSES:
885 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
886 f"Proto {proto} is unknown"
891 return CLASSES[proto].In(length, payload)
893 return CLASSES[proto].Out(length, payload)
894 except (DecodeError, ValueError, IndexError) as e:
897 retobj = UNKNOWN.In(length, payload)
899 retobj = UNKNOWN.Out(length, payload)
900 retobj.PROTO = proto # Override class attr with object attr
905 def exposed_protos() -> List[Tuple[str, bool]]:
907 (proto_name(GPS_POSITIONING), True),
908 (proto_name(WIFI_POSITIONING), False),
909 (proto_name(STATUS), True),