2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation, but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
34 from .protomodule import ProtoClass
58 def __init__(self) -> None:
61 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
63 Process next segment of the stream. Return successfully deframed
64 packets as `bytes` and error messages as `str`.
67 self.buffer += segment
68 if len(self.buffer) > MAXBUFFER:
69 # We are receiving junk. Let's drop it or we run out of memory.
71 return [f"More than {MAXBUFFER} unparseable data, dropping"]
72 msgs: List[Union[bytes, str]] = []
74 framestart = self.buffer.find(b"xx")
75 if framestart == -1: # No frames, return whatever we have
77 if framestart > 0: # Should not happen, report
79 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
81 self.buffer = self.buffer[framestart:]
82 # At this point, buffer starts with a packet
83 if len(self.buffer) < 6: # no len and proto - cannot proceed
85 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
87 # Length field can legitimeely be much less than the
88 # length of the packet (e.g. WiFi positioning), but
89 # it _should not_ be greater. Still sometimes it is.
90 # Luckily, not by too much: by maybe two or three bytes?
91 # Do this embarrassing hack to avoid accidental match
92 # of some binary data in the packet against '\r\n'.
94 frameend = self.buffer.find(b"\r\n", frameend + 1)
95 if frameend == -1 or frameend >= (
97 ): # Found realistic match or none
99 if frameend == -1: # Incomplete frame, return what we have
101 packet = self.buffer[2:frameend]
102 self.buffer = self.buffer[frameend + 2 :]
103 if len(packet) < 2: # frameend comes too early
104 msgs.append(f"Packet too short: {packet.hex()}")
109 def close(self) -> bytes:
115 def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
116 return b"xx" + buffer + b"\r\n"
119 ### Parser/Constructor ###
122 class DecodeError(Exception):
123 def __init__(self, e: Exception, **kwargs: Any) -> None:
125 for k, v in kwargs.items():
129 def maybe(typ: type) -> Callable[[Any], Any]:
130 return lambda x: None if x is None else typ(x)
133 def intx(x: Union[str, int]) -> int:
134 if isinstance(x, str):
139 def boolx(x: Union[str, bool]) -> bool:
140 if isinstance(x, str):
141 if x.upper() in ("ON", "TRUE", "1"):
143 if x.upper() in ("OFF", "FALSE", "0"):
145 raise ValueError(str(x) + " could not be parsed as a Boolean")
149 def hhmm(x: str) -> str:
150 """Check for the string that represents hours and minutes"""
151 if not isinstance(x, str) or len(x) != 4:
152 raise ValueError(str(x) + " is not a four-character string")
155 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
156 raise ValueError(str(x) + " does not contain valid hours and minutes")
160 def hhmmhhmm(x: str) -> str:
161 """Check for the string that represents hours and minutes twice"""
162 if not isinstance(x, str) or len(x) != 8:
163 raise ValueError(str(x) + " is not an eight-character string")
164 return hhmm(x[:4]) + hhmm(x[4:])
167 def l3str(x: Union[str, List[str]]) -> List[str]:
168 if isinstance(x, str):
172 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
173 raise ValueError(str(lx) + " is not a list of three strings")
177 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
178 def alrmspec(sub: str) -> Tuple[int, str]:
180 raise ValueError(sub + " does not represent day and time")
194 if isinstance(x, str):
195 lx = [alrmspec(sub) for sub in x.split(",")]
198 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
199 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
200 raise ValueError(str(lx) + " is a wrong alarms specification")
201 return [(d, hhmm(tm)) for d, tm in lx]
204 def l3int(x: Union[str, List[int]]) -> List[int]:
205 if isinstance(x, str):
206 lx = [int(el) for el in x.split(",")]
209 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
210 raise ValueError(str(lx) + " is not a list of three integers")
215 NON = 0 # Incoming, no response needed
216 INL = 1 # Birirectional, use `inline_response()`
217 EXT = 2 # Birirectional, use external responder
220 class GPS303Pkt(ProtoClass):
221 RESPOND = Respond.NON # Do not send anything back by default
223 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
224 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226 In: Type["GPS303Pkt"]
227 Out: Type["GPS303Pkt"]
231 def __getattr__(self, name: str) -> Any:
234 def __setattr__(self, name: str, value: Any) -> None:
237 def __init__(self, *args: Any, **kwargs: Any):
239 Construct the object _either_ from (length, payload),
240 _or_ from the values of individual fields
242 assert not args or (len(args) == 2 and not kwargs)
243 if args: # guaranteed to be two arguments at this point
244 self.length, self.payload = args
246 self.decode(self.length, self.payload)
248 raise DecodeError(e, obj=self)
250 for kw, typ, dfl in self.KWARGS:
251 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
254 self.__class__.__name__ + " stray kwargs " + str(kwargs)
257 def __repr__(self) -> str:
258 return "{}({})".format(
259 self.__class__.__name__,
263 'bytes.fromhex("{}")'.format(v.hex())
264 if isinstance(v, bytes)
267 for k, v in self.__dict__.items()
268 if not k.startswith("_")
272 decode: Callable[["GPS303Pkt", int, bytes], None]
274 def in_decode(self, length: int, packet: bytes) -> None:
275 # Overridden in subclasses, otherwise do not decode payload
278 def out_decode(self, length: int, packet: bytes) -> None:
279 # Overridden in subclasses, otherwise do not decode payload
282 encode: Callable[["GPS303Pkt"], bytes]
284 def in_encode(self) -> bytes:
285 # Necessary to emulate terminal, which is not implemented
286 raise NotImplementedError(
287 self.__class__.__name__ + ".encode() not implemented"
290 def out_encode(self) -> bytes:
291 # Overridden in subclasses, otherwise make empty payload
295 def packed(self) -> bytes:
296 payload = self.encode()
297 length = getattr(self, "length", len(payload) + 1)
298 return pack("BB", length, self.PROTO) + payload
301 class UNKNOWN(GPS303Pkt):
302 PROTO = 256 # > 255 is impossible in real packets
305 class LOGIN(GPS303Pkt):
307 RESPOND = Respond.INL
308 # Default response for ACK, can also respond with STOP_UPLOAD
309 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
311 def in_decode(self, length: int, payload: bytes) -> None:
312 self.imei = payload[:8].ljust(8, b"\0").hex()
313 self.ver = payload[8]
315 def in_encode(self) -> bytes:
316 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
321 class SUPERVISION(GPS303Pkt):
323 OUT_KWARGS = (("status", int, 1),)
325 def out_encode(self) -> bytes:
326 # 1: The device automatically answers Pickup effect
327 # 2: Automatically Answering Two-way Calls
328 # 3: Ring manually answer the two-way call
329 return pack("B", self.status)
332 class HEARTBEAT(GPS303Pkt):
334 RESPOND = Respond.INL
337 class _GPS_POSITIONING(GPS303Pkt):
338 RESPOND = Respond.INL
340 def in_decode(self, length: int, payload: bytes) -> None:
341 self.dtime = payload[:6]
342 if self.dtime == b"\0\0\0\0\0\0":
345 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
346 self.devtime = datetime(
347 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
349 self.gps_data_length = payload[6] >> 4
350 self.gps_nb_sat = payload[6] & 0x0F
351 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
352 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
353 flip_lon = bool(flags & 0b0000100000000000) # bit 4
354 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
355 self.heading = flags & 0b0000001111111111 # bits 6 - last
356 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
357 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
361 def out_encode(self) -> bytes:
362 tup = datetime.utcnow().timetuple()
363 ttup = (tup[0] % 100,) + tup[1:6]
364 return pack("BBBBBB", *ttup)
366 def rectified(self) -> Dict[str, Any]: # JSON-able dict
369 "devtime": str(self.devtime),
371 "direction": self.heading,
372 "latitude": self.latitude,
373 "longitude": self.longitude,
377 class GPS_POSITIONING(_GPS_POSITIONING):
381 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
385 class STATUS(GPS303Pkt):
387 RESPOND = Respond.EXT
391 ("timezone", int, 0),
393 ("signal", maybe(int), None),
395 OUT_KWARGS = (("upload_interval", int, 25),)
397 def in_decode(self, length: int, payload: bytes) -> None:
398 self.batt, self.ver, self.timezone, self.intvl = unpack(
402 self.signal: Optional[int] = payload[4]
406 def in_encode(self) -> bytes:
407 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
408 b"" if self.signal is None else pack("B", self.signal)
411 def out_encode(self) -> bytes: # Set interval in minutes
412 return pack("B", self.upload_interval)
415 class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep
418 def in_encode(self) -> bytes:
422 class RESET(GPS303Pkt):
423 # Device sends when it got reset SMS
424 # Server can send to initiate factory reset
428 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiage sync (0x58)
430 OUT_KWARGS = (("number", int, 3),)
432 def out_encode(self) -> bytes: # Number of whitelist entries
433 return pack("B", self.number)
436 class _WIFI_POSITIONING(GPS303Pkt):
437 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
439 ("dtime", bytes, b"\0\0\0\0\0\0"),
440 ("wifi_aps", list, []),
443 ("gsm_cells", list, []),
446 def in_decode(self, length: int, payload: bytes) -> None:
447 self.dtime = payload[:6]
448 if self.dtime == b"\0\0\0\0\0\0":
451 self.devtime = datetime.strptime(
452 self.dtime.hex(), "%y%m%d%H%M%S"
453 ).astimezone(tz=timezone.utc)
455 for i in range(self.length): # length has special meaning here
456 slice = payload[6 + i * 7 : 13 + i * 7]
457 self.wifi_aps.append(
458 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
460 gsm_slice = payload[6 + self.length * 7 :]
461 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
463 for i in range(ncells):
464 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
465 locac, cellid, sigstr = unpack(
466 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
468 self.gsm_cells.append((locac, cellid, -sigstr))
470 def in_encode(self) -> bytes:
471 self.length = len(self.wifi_aps)
477 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
479 for mac, sigstr in self.wifi_aps
482 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
485 pack("!HHB", locac, cellid, -sigstr)
486 for locac, cellid, sigstr in self.gsm_cells
492 def rectified(self) -> Dict[str, Any]: # JSON-able dict
494 "type": "approximate_location",
495 "devtime": str(self.devtime),
498 "base_stations": self.gsm_cells,
499 "wifi_aps": self.wifi_aps,
503 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
505 RESPOND = Respond.INL
507 def out_encode(self) -> bytes:
508 return bytes.fromhex(datetime.utcnow().strftime("%y%m%d%H%M%S"))
511 class TIME(GPS303Pkt):
513 RESPOND = Respond.INL
515 def out_encode(self) -> bytes:
516 return pack("!HBBBBB", *datetime.utcnow().timetuple()[:6])
519 class PROHIBIT_LBS(GPS303Pkt):
521 OUT_KWARGS = (("status", int, 1),)
523 def out_encode(self) -> bytes: # Server sent, 0-off, 1-on
524 return pack("B", self.status)
527 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
531 ("gps_off", boolx, False), # Clarify the meaning of 0/1
532 ("gps_interval_set", boolx, False),
533 ("gps_interval", hhmmhhmm, "00000000"),
534 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
535 ("boot_time_set", boolx, False),
536 ("boot_time", hhmm, "0000"),
537 ("shut_time_set", boolx, False),
538 ("shut_time", hhmm, "0000"),
541 def out_encode(self) -> bytes:
543 pack("B", self.gps_off)
544 + pack("B", self.gps_interval_set)
545 + bytes.fromhex(self.gps_interval)
546 + pack("B", self.lbs_off)
547 + pack("B", self.boot_time_set)
548 + bytes.fromhex(self.boot_time)
549 + pack("B", self.shut_time_set)
550 + bytes.fromhex(self.shut_time)
554 class _SET_PHONE(GPS303Pkt):
555 OUT_KWARGS = (("phone", str, ""),)
557 def out_encode(self) -> bytes:
559 return self.phone.encode("")
562 class REMOTE_MONITOR_PHONE(_SET_PHONE):
566 class SOS_PHONE(_SET_PHONE):
570 class DAD_PHONE(_SET_PHONE):
574 class MOM_PHONE(_SET_PHONE):
578 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
582 class GPS_OFF_PERIOD(GPS303Pkt):
586 ("fm", hhmm, "0000"),
587 ("to", hhmm, "2359"),
590 def out_encode(self) -> bytes:
592 pack("B", self.onoff)
593 + bytes.fromhex(self.fm)
594 + bytes.fromhex(self.to)
598 class DND_PERIOD(GPS303Pkt):
603 ("fm1", hhmm, "0000"),
604 ("to1", hhmm, "2359"),
605 ("fm2", hhmm, "0000"),
606 ("to2", hhmm, "2359"),
609 def out_encode(self) -> bytes:
611 pack("B", self.onoff)
612 + pack("B", self.week)
613 + bytes.fromhex(self.fm1)
614 + bytes.fromhex(self.to1)
615 + bytes.fromhex(self.fm2)
616 + bytes.fromhex(self.to2)
620 class RESTART_SHUTDOWN(GPS303Pkt):
622 OUT_KWARGS = (("flag", int, 0),)
624 def out_encode(self) -> bytes:
627 return pack("B", self.flag)
630 class DEVICE(GPS303Pkt):
632 OUT_KWARGS = (("flag", int, 0),)
634 # 0 - Stop looking for equipment
635 # 1 - Start looking for equipment
636 def out_encode(self) -> bytes:
637 return pack("B", self.flag)
640 class ALARM_CLOCK(GPS303Pkt):
643 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
645 ("alarms", l3alarms, []),
648 def out_encode(self) -> bytes:
650 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
654 class STOP_ALARM(GPS303Pkt):
657 def in_decode(self, length: int, payload: bytes) -> None:
658 self.flag = payload[0]
661 class SETUP(GPS303Pkt):
663 RESPOND = Respond.EXT
665 ("uploadintervalseconds", intx, 0x0300),
666 ("binaryswitch", intx, 0b00110001),
667 ("alarms", l3int, [0, 0, 0]),
668 ("dndtimeswitch", int, 0),
669 ("dndtimes", l3int, [0, 0, 0]),
670 ("gpstimeswitch", int, 0),
671 ("gpstimestart", int, 0),
672 ("gpstimestop", int, 0),
673 ("phonenumbers", l3str, ["", "", ""]),
676 def out_encode(self) -> bytes:
677 def pack3b(x: int) -> bytes:
678 return pack("!I", x)[1:]
682 pack("!H", self.uploadintervalseconds),
683 pack("B", self.binaryswitch),
685 + [pack3b(el) for el in self.alarms]
687 pack("B", self.dndtimeswitch),
689 + [pack3b(el) for el in self.dndtimes]
691 pack("B", self.gpstimeswitch),
692 pack("!H", self.gpstimestart),
693 pack("!H", self.gpstimestop),
695 + [b";".join([el.encode() for el in self.phonenumbers])]
698 def in_encode(self) -> bytes:
702 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
706 class RESTORE_PASSWORD(GPS303Pkt):
710 class WIFI_POSITIONING(_WIFI_POSITIONING):
712 RESPOND = Respond.EXT
713 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
715 def out_encode(self) -> bytes:
716 if self.latitude is None or self.longitude is None:
718 return "{:+#010.8g},{:+#010.8g}".format(
719 self.latitude, self.longitude
722 def out_decode(self, length: int, payload: bytes) -> None:
723 lat, lon = payload.decode().split(",")
724 self.latitude = float(lat)
725 self.longitude = float(lon)
728 class MANUAL_POSITIONING(GPS303Pkt):
731 def in_decode(self, length: int, payload: bytes) -> None:
732 self.flag = payload[0] if len(payload) > 0 else -1
737 4: "LBS search > 3 times",
738 5: "Same LBS and WiFi data",
739 6: "LBS prohibited, WiFi absent",
740 7: "GPS spacing < 50 m",
741 }.get(self.flag, "Unknown")
744 class BATTERY_CHARGE(GPS303Pkt):
748 class CHARGER_CONNECTED(GPS303Pkt):
752 class CHARGER_DISCONNECTED(GPS303Pkt):
756 class VIBRATION_RECEIVED(GPS303Pkt):
760 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
762 RESPOND = Respond.EXT
763 OUT_KWARGS = (("interval", int, 10),)
765 def in_decode(self, length: int, payload: bytes) -> None:
766 self.interval = unpack("!H", payload[:2])
768 def out_encode(self) -> bytes:
769 return pack("!H", self.interval)
772 class SOS_ALARM(GPS303Pkt):
776 class UNKNOWN_B3(GPS303Pkt):
778 IN_KWARGS = (("asciidata", str, ""),)
780 def in_decode(self, length: int, payload: bytes) -> None:
781 self.asciidata = payload.decode()
784 # Build dicts protocol number -> class and class name -> protocol number
787 if True: # just to indent the code, sorry!
790 for name, cls in globals().items()
792 and issubclass(cls, GPS303Pkt)
793 and not name.startswith("_")
795 if hasattr(cls, "PROTO"):
796 CLASSES[cls.PROTO] = cls
797 PROTOS[cls.__name__] = cls.PROTO
802 ) -> Union[Type[GPS303Pkt], List[str]]:
803 if prefix.startswith(PROTO_PREFIX):
804 pname = prefix[len(PROTO_PREFIX) :]
806 raise KeyError(pname)
809 for name, proto in PROTOS.items()
810 if name.upper().startswith(prefix.upper())
813 return [name for name, _ in lst]
815 return CLASSES[proto]
818 def proto_handled(proto: str) -> bool:
819 return proto.startswith(PROTO_PREFIX)
822 def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
823 return PROTO_PREFIX + (
824 obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
828 def proto_of_message(packet: bytes) -> str:
829 return proto_name(CLASSES.get(packet[1], UNKNOWN))
832 def imei_from_packet(packet: bytes) -> Optional[str]:
833 if packet[1] == LOGIN.PROTO:
834 msg = parse_message(packet)
835 if isinstance(msg, LOGIN):
840 def is_goodbye_packet(packet: bytes) -> bool:
841 return packet[1] == HIBERNATION.PROTO
844 def inline_response(packet: bytes) -> Optional[bytes]:
848 if cls.RESPOND is Respond.INL:
849 return cls.Out().packed
853 def probe_buffer(buffer: bytes) -> bool:
854 framestart = buffer.find(b"xx")
857 if len(buffer) - framestart < 6:
862 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
863 """From a packet (without framing bytes) derive the XXX.In object"""
864 length, proto = unpack("BB", packet[:2])
866 if proto not in CLASSES:
867 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
868 f"Proto {proto} is unknown"
873 return CLASSES[proto].In(length, payload)
875 return CLASSES[proto].Out(length, payload)
876 except (DecodeError, ValueError, IndexError) as e:
879 retobj = UNKNOWN.In(length, payload)
881 retobj = UNKNOWN.Out(length, payload)
882 retobj.PROTO = proto # Override class attr with object attr
887 def exposed_protos() -> List[Tuple[str, bool]]:
889 (proto_name(cls), cls.RESPOND is Respond.EXT)
890 for cls in CLASSES.values()
891 if hasattr(cls, "rectified")