2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has a lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
51 "GPS_OFFLINE_POSITIONING",
56 "WIFI_OFFLINE_POSITIONING",
59 "GPS_LBS_SWITCH_TIMES",
60 "REMOTE_MONITOR_PHONE",
72 "SYNCHRONOUS_WHITELIST",
78 "CHARGER_DISCONNECTED",
80 "POSITION_UPLOAD_INTERVAL",
93 def __init__(self) -> None:
96 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
98 Process next segment of the stream. Return successfully deframed
99 packets as `bytes` and error messages as `str`.
102 self.buffer += segment
103 if len(self.buffer) > MAXBUFFER:
104 # We are receiving junk. Let's drop it or we run out of memory.
106 return [f"More than {MAXBUFFER} unparseable data, dropping"]
107 msgs: List[Union[bytes, str]] = []
109 framestart = self.buffer.find(b"xx")
110 if framestart == -1: # No frames, return whatever we have
112 if framestart > 0: # Should not happen, report
114 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
116 self.buffer = self.buffer[framestart:]
117 # At this point, buffer starts with a packet
118 if len(self.buffer) < 6: # no len and proto - cannot proceed
120 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
122 # Length field can legitimately be much less than the
123 # length of the packet (e.g. WiFi positioning), but
124 # it _should not_ be greater. Still sometimes it is.
125 # Luckily, not by too much: by maybe two or three bytes?
126 # Do this embarrassing hack to avoid accidental match
127 # of some binary data in the packet against '\r\n'.
129 frameend = self.buffer.find(b"\r\n", frameend + 1)
130 if frameend == -1 or frameend >= (
132 ): # Found realistic match or none
134 if frameend == -1: # Incomplete frame, return what we have
136 packet = self.buffer[2:frameend]
137 self.buffer = self.buffer[frameend + 2 :]
138 if len(packet) < 2: # frameend comes too early
139 msgs.append(f"Packet too short: {packet.hex()}")
144 def close(self) -> bytes:
def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
    """
    Wrap a packet into its on-the-wire frame.

    The frame is the two-byte marker `xx`, then `buffer` unchanged,
    then a CR LF terminator. `imei` is accepted but not used here.
    """
    return b"".join((b"xx", buffer, b"\r\n"))
154 ### Parser/Constructor ###
157 class DecodeError(Exception):
158 def __init__(self, e: Exception, **kwargs: Any) -> None:
160 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """
    Make a converter that lets `None` pass through untouched.

    The returned callable gives back `None` when called with `None`,
    and `typ(x)` for any other argument `x`.
    """

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
168 def intx(x: Union[str, int]) -> int:
169 if isinstance(x, str):
174 def boolx(x: Union[str, bool]) -> bool:
175 if isinstance(x, str):
176 if x.upper() in ("ON", "TRUE", "1"):
178 if x.upper() in ("OFF", "FALSE", "0"):
180 raise ValueError(str(x) + " could not be parsed as a Boolean")
184 def hhmm(x: str) -> str:
185 """Check for the string that represents hours and minutes"""
186 if not isinstance(x, str) or len(x) != 4:
187 raise ValueError(str(x) + " is not a four-character string")
190 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
191 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """
    Check a string that holds two hours-and-minutes values back to back.

    The argument must be a string of exactly eight characters. Each
    four-character half is passed through `hhmm()`, first half first,
    and the two results are returned joined together.

    Raises ValueError when the argument is not an eight-character string.
    """
    if isinstance(x, str) and len(x) == 8:
        first, second = x[:4], x[4:]
        return hhmm(first) + hhmm(second)
    raise ValueError(str(x) + " is not an eight-character string")
202 def l3str(x: Union[str, List[str]]) -> List[str]:
203 if isinstance(x, str):
207 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
208 raise ValueError(str(lx) + " is not a list of three strings")
212 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
213 def alrmspec(sub: str) -> Tuple[int, str]:
215 raise ValueError(sub + " does not represent day and time")
229 if isinstance(x, str):
230 lx = [alrmspec(sub) for sub in x.split(",")]
233 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
234 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
235 raise ValueError(str(lx) + " is a wrong alarms specification")
236 return [(d, hhmm(tm)) for d, tm in lx]
239 def l3int(x: Union[str, List[int]]) -> List[int]:
240 if isinstance(x, str):
241 lx = [int(el) for el in x.split(",")]
244 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
245 raise ValueError(str(lx) + " is not a list of three integers")
251 For each class corresponding to a message, automatically create
252 two nested classes `In` and `Out` that also inherit from their
253 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
254 copied to the `In` nested class under the name `KWARGS`, and
255 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
256 to the nested class `Out`. In addition, method `encode` is
257 defined in both classes equal to `in_encode()` and `out_encode()`
263 def __getattr__(self, name: str) -> Any:
266 def __setattr__(self, name: str, value: Any) -> None:
270 cls: Type["MetaPkt"],
272 bases: Tuple[type, ...],
273 attrs: Dict[str, Any],
275 newcls = super().__new__(cls, name, bases, attrs)
276 newcls.In = super().__new__(
281 "KWARGS": newcls.IN_KWARGS,
282 "decode": newcls.in_decode,
283 "encode": newcls.in_encode,
286 newcls.Out = super().__new__(
291 "KWARGS": newcls.OUT_KWARGS,
292 "decode": newcls.out_decode,
293 "encode": newcls.out_encode,
300 NON = 0 # Incoming, no response needed
301 INL = 1 # Birirectional, use `inline_response()`
302 EXT = 2 # Birirectional, use external responder
305 class GPS303Pkt(metaclass=MetaPkt):
306 RESPOND = Respond.NON # Do not send anything back by default
308 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
309 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
310 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
311 In: Type["GPS303Pkt"]
312 Out: Type["GPS303Pkt"]
316 def __getattr__(self, name: str) -> Any:
319 def __setattr__(self, name: str, value: Any) -> None:
322 def __init__(self, *args: Any, **kwargs: Any):
324 Construct the object _either_ from (length, payload),
325 _or_ from the values of individual fields
327 assert not args or (len(args) == 2 and not kwargs)
328 if args: # guaranteed to be two arguments at this point
329 self.length, self.payload = args
331 self.decode(self.length, self.payload)
333 raise DecodeError(e, obj=self)
335 for kw, typ, dfl in self.KWARGS:
336 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
339 self.__class__.__name__ + " stray kwargs " + str(kwargs)
342 def __repr__(self) -> str:
343 return "{}({})".format(
344 self.__class__.__name__,
348 'bytes.fromhex("{}")'.format(v.hex())
349 if isinstance(v, bytes)
352 for k, v in self.__dict__.items()
353 if not k.startswith("_")
357 decode: Callable[["GPS303Pkt", int, bytes], None]
359 def in_decode(self, length: int, packet: bytes) -> None:
360 # Overridden in subclasses, otherwise do not decode payload
363 def out_decode(self, length: int, packet: bytes) -> None:
364 # Overridden in subclasses, otherwise do not decode payload
367 encode: Callable[["GPS303Pkt"], bytes]
def in_encode(self) -> bytes:
    """
    Default encoder for the incoming direction; it always raises.

    Encoding incoming packets is only necessary to emulate a terminal,
    which is not implemented.
    """
    message = f"{self.__class__.__name__}.encode() not implemented"
    raise NotImplementedError(message)
375 def out_encode(self) -> bytes:
376 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """
    Serialize the packet: length byte, protocol byte, then payload.

    The payload comes from `self.encode()` and the protocol byte from
    `self.PROTO`. The length byte is the object's `length` attribute
    when it has one, otherwise the payload size plus one.
    """
    body = self.encode()
    declared = getattr(self, "length", len(body) + 1)
    header = pack("BB", declared, self.PROTO)
    return header + body
386 class UNKNOWN(GPS303Pkt):
387 PROTO = 256 # > 255 is impossible in real packets
390 class LOGIN(GPS303Pkt):
392 RESPOND = Respond.INL
393 # Default response for ACK, can also respond with STOP_UPLOAD
394 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """
    Extract `imei` and `ver` from the payload.

    The first eight bytes (zero-padded on the right when the payload is
    shorter) are stored as a hex string in `imei`; the byte at index 8
    is stored in `ver`.
    """
    imei_bytes = payload[:8].ljust(8, b"\0")
    self.imei = imei_bytes.hex()
    self.ver = payload[8]
400 def in_encode(self) -> bytes:
401 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
406 class SUPERVISION(GPS303Pkt):
408 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as the single unsigned byte of the payload."""
    # Meaning of the status values:
    # 1: The device automatically answers Pickup effect
    # 2: Automatically Answering Two-way Calls
    # 3: Ring manually answer the two-way call
    mode = self.status
    return pack("B", mode)
417 class HEARTBEAT(GPS303Pkt):
419 RESPOND = Respond.INL
422 class _GPS_POSITIONING(GPS303Pkt):
423 RESPOND = Respond.INL
425 def in_decode(self, length: int, payload: bytes) -> None:
426 self.dtime = payload[:6]
427 if self.dtime == b"\0\0\0\0\0\0":
430 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
431 self.devtime = datetime(
432 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
434 self.gps_data_length = payload[6] >> 4
435 self.gps_nb_sat = payload[6] & 0x0F
436 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
437 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
438 flip_lon = bool(flags & 0b0000100000000000) # bit 4
439 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
440 self.heading = flags & 0b0000001111111111 # bits 6 - last
441 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
442 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """
    Build the response payload: the current UTC time in six bytes.

    The bytes are, in order: year modulo 100, month, day, hour,
    minute, second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC carries the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
452 class GPS_POSITIONING(_GPS_POSITIONING):
456 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
460 class STATUS(GPS303Pkt):
462 RESPOND = Respond.EXT
466 ("timezone", int, 0),
468 ("signal", maybe(int), None),
470 OUT_KWARGS = (("upload_interval", int, 25),)
472 def in_decode(self, length: int, payload: bytes) -> None:
473 self.batt, self.ver, self.timezone, self.intvl = unpack(
477 self.signal: Optional[int] = payload[4]
def in_encode(self) -> bytes:
    """
    Encode the status report as sent by the terminal.

    Four unsigned bytes (`batt`, `ver`, `timezone`, `intvl`) are always
    present; a fifth byte with `signal` is appended unless it is `None`.
    """
    head = pack("BBBB", self.batt, self.ver, self.timezone, self.intvl)
    if self.signal is None:
        return head
    return head + pack("B", self.signal)
def out_encode(self) -> bytes:
    """Encode `upload_interval` as one unsigned byte."""
    # Set interval in minutes
    minutes = self.upload_interval
    return pack("B", minutes)
490 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
493 def in_encode(self) -> bytes:
497 class RESET(GPS303Pkt):
498 # Device sends when it got reset SMS
499 # Server can send to initiate factory reset
503 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
505 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Encode `number` as one unsigned byte."""
    # Number of whitelist entries
    entries = self.number
    return pack("B", entries)
511 class _WIFI_POSITIONING(GPS303Pkt):
512 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
514 ("dtime", bytes, b"\0\0\0\0\0\0"),
515 ("wifi_aps", list, []),
518 ("gsm_cells", list, []),
521 def in_decode(self, length: int, payload: bytes) -> None:
522 self.dtime = payload[:6]
523 if self.dtime == b"\0\0\0\0\0\0":
526 self.devtime = datetime.strptime(
527 self.dtime.hex(), "%y%m%d%H%M%S"
528 ).astimezone(tz=timezone.utc)
530 for i in range(self.length): # length has special meaning here
531 slice = payload[6 + i * 7 : 13 + i * 7]
532 self.wifi_aps.append(
533 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
535 gsm_slice = payload[6 + self.length * 7 :]
536 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
538 for i in range(ncells):
539 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
540 locac, cellid, sigstr = unpack(
541 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
543 self.gsm_cells.append((locac, cellid, -sigstr))
545 def in_encode(self) -> bytes:
546 self.length = len(self.wifi_aps)
552 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
554 for mac, sigstr in self.wifi_aps
557 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
560 pack("!HHB", locac, cellid, -sigstr)
561 for locac, cellid, sigstr in self.gsm_cells
568 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
570 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """
    Build the response payload: the current UTC time in six bytes.

    The time is formatted as the twelve decimal digits `yymmddHHMMSS`
    and those digits are then read as hexadecimal, so every byte shows
    its two-digit field when printed in hex.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC formats to the same digits.
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
576 class TIME(GPS303Pkt):
578 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """
    Build the response payload: the current UTC time in seven bytes.

    The year is a big-endian 16-bit integer, followed by one byte each
    for month, day, hour, minute and second.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12; an aware
    # "now" in UTC carries the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
584 class PROHIBIT_LBS(GPS303Pkt):
586 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as one unsigned byte."""
    # Server sent, 0-off, 1-on
    switch = self.status
    return pack("B", switch)
592 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
596 ("gps_off", boolx, False), # Clarify the meaning of 0/1
597 ("gps_interval_set", boolx, False),
598 ("gps_interval", hhmmhhmm, "00000000"),
599 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
600 ("boot_time_set", boolx, False),
601 ("boot_time", hhmm, "0000"),
602 ("shut_time_set", boolx, False),
603 ("shut_time", hhmm, "0000"),
606 def out_encode(self) -> bytes:
608 pack("B", self.gps_off)
609 + pack("B", self.gps_interval_set)
610 + bytes.fromhex(self.gps_interval)
611 + pack("B", self.lbs_off)
612 + pack("B", self.boot_time_set)
613 + bytes.fromhex(self.boot_time)
614 + pack("B", self.shut_time_set)
615 + bytes.fromhex(self.shut_time)
619 class _SET_PHONE(GPS303Pkt):
620 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """
    Encode the `phone` string as the payload bytes.

    An empty codec name is rejected by `str.encode()` with LookupError,
    so the default codec is used instead, the same way the SETUP packet
    encodes its phone numbers.
    """
    self.phone: str
    return self.phone.encode()
627 class REMOTE_MONITOR_PHONE(_SET_PHONE):
631 class SOS_PHONE(_SET_PHONE):
635 class DAD_PHONE(_SET_PHONE):
639 class MOM_PHONE(_SET_PHONE):
643 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
647 class GPS_OFF_PERIOD(GPS303Pkt):
651 ("fm", hhmm, "0000"),
652 ("to", hhmm, "2359"),
655 def out_encode(self) -> bytes:
657 pack("B", self.onoff)
658 + bytes.fromhex(self.fm)
659 + bytes.fromhex(self.to)
663 class DND_PERIOD(GPS303Pkt):
668 ("fm1", hhmm, "0000"),
669 ("to1", hhmm, "2359"),
670 ("fm2", hhmm, "0000"),
671 ("to2", hhmm, "2359"),
674 def out_encode(self) -> bytes:
676 pack("B", self.onoff)
677 + pack("B", self.week)
678 + bytes.fromhex(self.fm1)
679 + bytes.fromhex(self.to1)
680 + bytes.fromhex(self.fm2)
681 + bytes.fromhex(self.to2)
685 class RESTART_SHUTDOWN(GPS303Pkt):
687 OUT_KWARGS = (("flag", int, 0),)
689 def out_encode(self) -> bytes:
692 return pack("B", self.flag)
695 class DEVICE(GPS303Pkt):
697 OUT_KWARGS = (("flag", int, 0),)
699 # 0 - Stop looking for equipment
700 # 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Encode `flag` as the single unsigned byte of the payload."""
    search = self.flag
    return pack("B", search)
705 class ALARM_CLOCK(GPS303Pkt):
708 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
710 ("alarms", l3alarms, []),
713 def out_encode(self) -> bytes:
715 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
719 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `flag`."""
    first_byte = payload[0]
    self.flag = first_byte
726 class SETUP(GPS303Pkt):
728 RESPOND = Respond.EXT
730 ("uploadintervalseconds", intx, 0x0300),
731 ("binaryswitch", intx, 0b00110001),
732 ("alarms", l3int, [0, 0, 0]),
733 ("dndtimeswitch", int, 0),
734 ("dndtimes", l3int, [0, 0, 0]),
735 ("gpstimeswitch", int, 0),
736 ("gpstimestart", int, 0),
737 ("gpstimestop", int, 0),
738 ("phonenumbers", l3str, ["", "", ""]),
741 def out_encode(self) -> bytes:
742 def pack3b(x: int) -> bytes:
743 return pack("!I", x)[1:]
747 pack("!H", self.uploadintervalseconds),
748 pack("B", self.binaryswitch),
750 + [pack3b(el) for el in self.alarms]
752 pack("B", self.dndtimeswitch),
754 + [pack3b(el) for el in self.dndtimes]
756 pack("B", self.gpstimeswitch),
757 pack("!H", self.gpstimestart),
758 pack("!H", self.gpstimestop),
760 + [b";".join([el.encode() for el in self.phonenumbers])]
763 def in_encode(self) -> bytes:
767 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
771 class RESTORE_PASSWORD(GPS303Pkt):
775 class WIFI_POSITIONING(_WIFI_POSITIONING):
777 RESPOND = Respond.EXT
778 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
780 def out_encode(self) -> bytes:
781 if self.latitude is None or self.longitude is None:
783 return "{:+#010.8g},{:+#010.8g}".format(
784 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """
    Parse a payload of two comma-separated decimal numbers.

    The payload is decoded as text and split on the comma; the first
    number is stored in `latitude`, the second in `longitude`.
    """
    fields = payload.decode().split(",")
    lat, lon = fields  # exactly two fields are expected
    self.latitude = float(lat)
    self.longitude = float(lon)
793 class MANUAL_POSITIONING(GPS303Pkt):
796 def in_decode(self, length: int, payload: bytes) -> None:
797 self.flag = payload[0] if len(payload) > 0 else -1
802 4: "LBS search > 3 times",
803 5: "Same LBS and WiFi data",
804 6: "LBS prohibited, WiFi absent",
805 7: "GPS spacing < 50 m",
806 }.get(self.flag, "Unknown")
809 class BATTERY_CHARGE(GPS303Pkt):
813 class CHARGER_CONNECTED(GPS303Pkt):
817 class CHARGER_DISCONNECTED(GPS303Pkt):
821 class VIBRATION_RECEIVED(GPS303Pkt):
825 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
827 RESPOND = Respond.EXT
828 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """
    Read `interval` from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer. `unpack()`
    always returns a tuple, so the single element is unpacked to keep
    `interval` an int that `out_encode()` can pack again.
    """
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as a big-endian unsigned 16-bit integer."""
    period = self.interval
    return pack("!H", period)
837 class SOS_ALARM(GPS303Pkt):
841 class UNKNOWN_B3(GPS303Pkt):
843 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode the whole payload as text and store it in `asciidata`."""
    text = payload.decode()
    self.asciidata = text
849 # Build dicts protocol number -> class and class name -> protocol number
852 if True: # just to indent the code, sorry!
855 for name, cls in globals().items()
857 and issubclass(cls, GPS303Pkt)
858 and not name.startswith("_")
860 if hasattr(cls, "PROTO"):
861 CLASSES[cls.PROTO] = cls
862 PROTOS[cls.__name__] = cls.PROTO
867 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
870 for name, proto in PROTOS.items()
871 if name.upper().startswith(prefix.upper())
876 return CLASSES[proto]
def proto_handled(proto: str) -> bool:
    """Tell whether the protocol name `proto` starts with `PROTO_PREFIX`."""
    has_prefix = proto.startswith(PROTO_PREFIX)
    return has_prefix
def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
    """
    Return `PROTO_PREFIX` followed by a class name.

    For a `GPS303Pkt` instance the name of its class is used; for
    anything else the object's own `__name__` is used.
    """
    if isinstance(obj, GPS303Pkt):
        bare = obj.__class__.__name__
    else:
        bare = obj.__name__
    return PROTO_PREFIX + bare
def proto_by_name(name: str) -> int:
    """Look `name` up in `PROTOS`; return -1 when it is not there."""
    if name in PROTOS:
        return PROTOS[name]
    return -1
def proto_of_message(packet: bytes) -> str:
    """
    Return the prefixed protocol name for a packet.

    The second byte of `packet` selects the class from `CLASSES`;
    `UNKNOWN` stands in when that byte is not registered.
    """
    proto = packet[1]
    pkt_class = CLASSES.get(proto, UNKNOWN)
    return proto_name(pkt_class)
897 def imei_from_packet(packet: bytes) -> Optional[str]:
898 if packet[1] == LOGIN.PROTO:
899 msg = parse_message(packet)
900 if isinstance(msg, LOGIN):
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's second byte equals `HIBERNATION.PROTO`."""
    proto = packet[1]
    return proto == HIBERNATION.PROTO
909 def inline_response(packet: bytes) -> Optional[bytes]:
913 if cls.RESPOND is Respond.INL:
914 return cls.Out().packed
918 def probe_buffer(buffer: bytes) -> bool:
919 framestart = buffer.find(b"xx")
922 if len(buffer) - framestart < 6:
927 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
928 """From a packet (without framing bytes) derive the XXX.In object"""
929 length, proto = unpack("BB", packet[:2])
931 if proto not in CLASSES:
932 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
933 f"Proto {proto} is unknown"
938 return CLASSES[proto].In(length, payload)
940 return CLASSES[proto].Out(length, payload)
941 except (DecodeError, ValueError, IndexError) as e:
944 retobj = UNKNOWN.In(length, payload)
946 retobj = UNKNOWN.Out(length, payload)
947 retobj.PROTO = proto # Override class attr with object attr