2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
49 "GPS_OFFLINE_POSITIONING",
54 "WIFI_OFFLINE_POSITIONING",
57 "GPS_LBS_SWITCH_TIMES",
58 "REMOTE_MONITOR_PHONE",
70 "SYNCHRONOUS_WHITELIST",
76 "CHARGER_DISCONNECTED",
78 "POSITION_UPLOAD_INTERVAL",
88 class StreamError(Exception):
93 def __init__(self) -> None:
def enframe(buffer: bytes) -> bytes:
    """Wrap a raw packet in the wire framing.

    The result is the two-byte start marker ``xx``, followed by the
    packet itself, followed by a terminating CR LF.
    """
    start_marker, end_marker = b"xx", b"\r\n"
    return start_marker + buffer + end_marker
100 def recv(self, segment: bytes) -> List[bytes]:
102 self.buffer += segment
103 if len(self.buffer) > MAXBUFFER:
104 # We are receiving junk. Let's drop it or we run out of memory.
107 f"More than {MAXBUFFER} unparseable data, dropping"
111 framestart = self.buffer.find(b"xx")
112 if framestart == -1: # No frames, return whatever we have
114 if framestart > 0: # Should not happen, report
115 self.buffer = self.buffer[framestart:]
117 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
119 # At this point, buffer starts with a packet
120 if len(self.buffer) < 6: # no len and proto - cannot proceed
122 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
124 # Length field can legitimately be much less than the
125 # length of the packet (e.g. WiFi positioning), but
126 # it _should not_ be greater. Still sometimes it is.
127 # Luckily, not by too much: by maybe two or three bytes?
128 # Do this embarrassing hack to avoid accidental match
129 # of some binary data in the packet against '\r\n'.
131 frameend = self.buffer.find(b"\r\n", frameend + 1)
132 if frameend == -1 or frameend >= (
134 ): # Found realistic match or none
136 if frameend == -1: # Incomplete frame, return what we have
138 packet = self.buffer[2:frameend]
139 self.buffer = self.buffer[frameend + 2 :]
140 if len(packet) < 2: # frameend comes too early
141 raise StreamError(f"Packet too short: {packet.hex()}")
145 def close(self) -> bytes:
151 ### Parser/Constructor ###
154 class DecodeError(Exception):
155 def __init__(self, e: Exception, **kwargs: Any) -> None:
157 for k, v in kwargs.items():
def maybe(typ: type) -> Callable[[Any], Any]:
    """Build a converter that lets ``None`` pass through untouched.

    The returned callable applies `typ` to any argument except ``None``,
    for which it returns ``None`` without calling `typ`.
    """

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
165 def intx(x: Union[str, int]) -> int:
166 if isinstance(x, str):
171 def boolx(x: Union[str, bool]) -> bool:
172 if isinstance(x, str):
173 if x.upper() in ("ON", "TRUE", "1"):
175 if x.upper() in ("OFF", "FALSE", "0"):
177 raise ValueError(str(x) + " could not be parsed as a Boolean")
181 def hhmm(x: str) -> str:
182 """Check for the string that represents hours and minutes"""
183 if not isinstance(x, str) or len(x) != 4:
184 raise ValueError(str(x) + " is not a four-character string")
187 if hh < 0 or hh > 23 or mm < 0 or mm > 59:
188 raise ValueError(str(x) + " does not contain valid hours and minutes")
def hhmmhhmm(x: str) -> str:
    """Validate a string holding two consecutive hours-and-minutes values.

    The argument must be a string of exactly eight characters; each
    four-character half is checked by `hhmm()`, first half first.
    Returns the concatenation of the two checked halves.
    Raises ValueError when the argument is not an eight-character string.
    """
    if isinstance(x, str) and len(x) == 8:
        first_half, second_half = x[:4], x[4:]
        return hhmm(first_half) + hhmm(second_half)
    raise ValueError(str(x) + " is not an eight-character string")
199 def l3str(x: Union[str, List[str]]) -> List[str]:
200 if isinstance(x, str):
204 if len(lx) != 3 or not all(isinstance(el, str) for el in x):
205 raise ValueError(str(lx) + " is not a list of three strings")
209 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
210 def alrmspec(sub: str) -> Tuple[int, str]:
212 raise ValueError(sub + " does not represent day and time")
226 if isinstance(x, str):
227 lx = [alrmspec(sub) for sub in x.split(",")]
230 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
231 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
232 raise ValueError(str(lx) + " is a wrong alarms specification")
233 return [(d, hhmm(tm)) for d, tm in lx]
236 def l3int(x: Union[str, List[int]]) -> List[int]:
237 if isinstance(x, str):
238 lx = [int(el) for el in x.split(",")]
241 if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
242 raise ValueError(str(lx) + " is not a list of three integers")
248 For each class corresponding to a message, automatically create
249 two nested classes `In` and `Out` that also inherit from their
250 "nest". Class attribute `IN_KWARGS` defined in the "nest" is
251 copied to the `In` nested class under the name `KWARGS`, and
252 likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
253 to the nested class `Out`. In addition, method `encode` is
254 defined in both classes equal to `in_encode()` and `out_encode()`
260 def __getattr__(self, name: str) -> Any:
263 def __setattr__(self, name: str, value: Any) -> None:
267 cls: Type["MetaPkt"],
269 bases: Tuple[type, ...],
270 attrs: Dict[str, Any],
272 newcls = super().__new__(cls, name, bases, attrs)
273 newcls.In = super().__new__(
278 "KWARGS": newcls.IN_KWARGS,
279 "decode": newcls.in_decode,
280 "encode": newcls.in_encode,
283 newcls.Out = super().__new__(
288 "KWARGS": newcls.OUT_KWARGS,
289 "decode": newcls.out_decode,
290 "encode": newcls.out_encode,
297 NON = 0 # Incoming, no response needed
298 INL = 1 # Bidirectional, use `inline_response()`
299 EXT = 2 # Bidirectional, use external responder
302 class GPS303Pkt(metaclass=MetaPkt):
303 RESPOND = Respond.NON # Do not send anything back by default
305 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
306 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
307 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
308 In: Type["GPS303Pkt"]
309 Out: Type["GPS303Pkt"]
313 def __getattr__(self, name: str) -> Any:
316 def __setattr__(self, name: str, value: Any) -> None:
319 def __init__(self, *args: Any, **kwargs: Any):
321 Construct the object _either_ from (length, payload),
322 _or_ from the values of individual fields
324 assert not args or (len(args) == 2 and not kwargs)
325 if args: # guaranteed to be two arguments at this point
326 self.length, self.payload = args
328 self.decode(self.length, self.payload)
330 raise DecodeError(e, obj=self)
332 for kw, typ, dfl in self.KWARGS:
333 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
336 self.__class__.__name__ + " stray kwargs " + str(kwargs)
339 def __repr__(self) -> str:
340 return "{}({})".format(
341 self.__class__.__name__,
345 'bytes.fromhex("{}")'.format(v.hex())
346 if isinstance(v, bytes)
349 for k, v in self.__dict__.items()
350 if not k.startswith("_")
354 decode: Callable[["GPS303Pkt", int, bytes], None]
356 def in_decode(self, length: int, packet: bytes) -> None:
357 # Overridden in subclasses, otherwise do not decode payload
360 def out_decode(self, length: int, packet: bytes) -> None:
361 # Overridden in subclasses, otherwise do not decode payload
364 encode: Callable[["GPS303Pkt"], bytes]
366 def in_encode(self) -> bytes:
367 # Necessary to emulate terminal, which is not implemented
368 raise NotImplementedError(
369 self.__class__.__name__ + ".encode() not implemented"
372 def out_encode(self) -> bytes:
373 # Overridden in subclasses, otherwise make empty payload
def packed(self) -> bytes:
    """Serialize the packet: length byte, protocol byte, then payload.

    The payload comes from `encode()`. If the object carries a `length`
    attribute it is used verbatim; otherwise the length byte is the
    payload size plus one.
    """
    body = self.encode()
    header = pack("BB", getattr(self, "length", len(body) + 1), self.PROTO)
    return header + body
383 class UNKNOWN(GPS303Pkt):
384 PROTO = 256 # > 255 is impossible in real packets
387 class LOGIN(GPS303Pkt):
389 RESPOND = Respond.INL
390 # Default response for ACK, can also respond with STOP_UPLOAD
391 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract the IMEI and the version byte from a LOGIN payload.

    The first eight payload bytes (right-padded with zero bytes if there
    are fewer) are stored in `imei` as a hex string. The byte at index 8
    is stored in `ver`; a payload shorter than nine bytes raises
    IndexError after `imei` has been set.
    """
    imei_raw = payload[:8].ljust(8, b"\0")
    self.imei = imei_raw.hex()
    self.ver = payload[8]
397 def in_encode(self) -> bytes:
398 return bytes.fromhex(self.imei).ljust(8, b"\0")[:8] + pack(
403 class SUPERVISION(GPS303Pkt):
405 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Serialize `status` as a single unsigned byte.

    Status values:
    1: The device automatically answers Pickup effect
    2: Automatically Answering Two-way Calls
    3: Ring manually answer the two-way call
    """
    status_byte = pack("B", self.status)
    return status_byte
414 class HEARTBEAT(GPS303Pkt):
416 RESPOND = Respond.INL
419 class _GPS_POSITIONING(GPS303Pkt):
420 RESPOND = Respond.INL
422 def in_decode(self, length: int, payload: bytes) -> None:
423 self.dtime = payload[:6]
424 if self.dtime == b"\0\0\0\0\0\0":
427 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
428 self.devtime = datetime(
429 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
431 self.gps_data_length = payload[6] >> 4
432 self.gps_nb_sat = payload[6] & 0x0F
433 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
434 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
435 flip_lon = bool(flags & 0b0000100000000000) # bit 4
436 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
437 self.heading = flags & 0b0000001111111111 # bits 6 - last
438 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
439 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The bytes are, in order: year modulo 100, month, day, hour, minute
    and second, each packed as an unsigned byte.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
449 class GPS_POSITIONING(_GPS_POSITIONING):
453 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
457 class STATUS(GPS303Pkt):
459 RESPOND = Respond.EXT
463 ("timezone", int, 0),
465 ("signal", maybe(int), None),
467 OUT_KWARGS = (("upload_interval", int, 25),)
469 def in_decode(self, length: int, payload: bytes) -> None:
470 self.batt, self.ver, self.timezone, self.intvl = unpack(
474 self.signal: Optional[int] = payload[4]
478 def in_encode(self) -> bytes:
479 return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
480 b"" if self.signal is None else pack("B", self.signal)
def out_encode(self) -> bytes:
    """Serialize `upload_interval` (interval in minutes) as one unsigned byte."""
    interval_byte = pack("B", self.upload_interval)
    return interval_byte
487 class HIBERNATION(GPS303Pkt): # Server can send this to put the device to sleep
490 def in_encode(self) -> bytes:
494 class RESET(GPS303Pkt):
495 # Device sends when it got reset SMS
496 # Server can send to initiate factory reset
500 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
502 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:
    """Serialize `number` (number of whitelist entries) as one unsigned byte."""
    count_byte = pack("B", self.number)
    return count_byte
508 class _WIFI_POSITIONING(GPS303Pkt):
509 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
511 ("dtime", bytes, b"\0\0\0\0\0\0"),
512 ("wifi_aps", list, []),
515 ("gsm_cells", list, []),
518 def in_decode(self, length: int, payload: bytes) -> None:
519 self.dtime = payload[:6]
520 if self.dtime == b"\0\0\0\0\0\0":
523 self.devtime = datetime.strptime(
524 self.dtime.hex(), "%y%m%d%H%M%S"
525 ).astimezone(tz=timezone.utc)
527 for i in range(self.length): # length has special meaning here
528 slice = payload[6 + i * 7 : 13 + i * 7]
529 self.wifi_aps.append(
530 (":".join([format(b, "02X") for b in slice[:6]]), -slice[6])
532 gsm_slice = payload[6 + self.length * 7 :]
533 ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
535 for i in range(ncells):
536 slice = gsm_slice[4 + i * 5 : 9 + i * 5]
537 locac, cellid, sigstr = unpack(
538 "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
540 self.gsm_cells.append((locac, cellid, -sigstr))
542 def in_encode(self) -> bytes:
543 self.length = len(self.wifi_aps)
549 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
551 for mac, sigstr in self.wifi_aps
554 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
557 pack("!HHB", locac, cellid, -sigstr)
558 for locac, cellid, sigstr in self.gsm_cells
565 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
567 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as six bytes.

    The time is formatted as the twelve decimal digits ``yymmddHHMMSS``
    and those digits are then read as hexadecimal, so every byte holds
    two decimal digits.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC formats to the same digits.
    now = datetime.now(timezone.utc)
    return bytes.fromhex(now.strftime("%y%m%d%H%M%S"))
573 class TIME(GPS303Pkt):
575 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Build the response payload: the current UTC time as seven bytes.

    The layout is a big-endian unsigned 16-bit year followed by month,
    day, hour, minute and second as one unsigned byte each.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware "now"
    # in UTC yields the same calendar fields.
    now = datetime.now(timezone.utc)
    return pack(
        "!HBBBBB",
        now.year,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
581 class PROHIBIT_LBS(GPS303Pkt):
583 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Serialize `status` as one unsigned byte (server sent, 0-off, 1-on)."""
    status_byte = pack("B", self.status)
    return status_byte
589 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
593 ("gps_off", boolx, False), # Clarify the meaning of 0/1
594 ("gps_interval_set", boolx, False),
595 ("gps_interval", hhmmhhmm, "00000000"),
596 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
597 ("boot_time_set", boolx, False),
598 ("boot_time", hhmm, "0000"),
599 ("shut_time_set", boolx, False),
600 ("shut_time", hhmm, "0000"),
603 def out_encode(self) -> bytes:
605 pack("B", self.gps_off)
606 + pack("B", self.gps_interval_set)
607 + bytes.fromhex(self.gps_interval)
608 + pack("B", self.lbs_off)
609 + pack("B", self.boot_time_set)
610 + bytes.fromhex(self.boot_time)
611 + pack("B", self.shut_time_set)
612 + bytes.fromhex(self.shut_time)
616 class _SET_PHONE(GPS303Pkt):
617 OUT_KWARGS = (("phone", str, ""),)
619 def out_encode(self) -> bytes:
621 return self.phone.encode("")
624 class REMOTE_MONITOR_PHONE(_SET_PHONE):
628 class SOS_PHONE(_SET_PHONE):
632 class DAD_PHONE(_SET_PHONE):
636 class MOM_PHONE(_SET_PHONE):
640 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
644 class GPS_OFF_PERIOD(GPS303Pkt):
648 ("fm", hhmm, "0000"),
649 ("to", hhmm, "2359"),
652 def out_encode(self) -> bytes:
654 pack("B", self.onoff)
655 + bytes.fromhex(self.fm)
656 + bytes.fromhex(self.to)
660 class DND_PERIOD(GPS303Pkt):
665 ("fm1", hhmm, "0000"),
666 ("to1", hhmm, "2359"),
667 ("fm2", hhmm, "0000"),
668 ("to2", hhmm, "2359"),
671 def out_encode(self) -> bytes:
673 pack("B", self.onoff)
674 + pack("B", self.week)
675 + bytes.fromhex(self.fm1)
676 + bytes.fromhex(self.to1)
677 + bytes.fromhex(self.fm2)
678 + bytes.fromhex(self.to2)
682 class RESTART_SHUTDOWN(GPS303Pkt):
684 OUT_KWARGS = (("flag", int, 0),)
686 def out_encode(self) -> bytes:
689 return pack("B", self.flag)
692 class DEVICE(GPS303Pkt):
694 OUT_KWARGS = (("flag", int, 0),)
696 # 0 - Stop looking for equipment
697 # 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Serialize `flag` as one unsigned byte.

    0 stops and 1 starts looking for equipment.
    """
    flag_byte = pack("B", self.flag)
    return flag_byte
702 class ALARM_CLOCK(GPS303Pkt):
705 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
707 ("alarms", l3alarms, []),
710 def out_encode(self) -> bytes:
712 pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
716 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte in `flag`.

    An empty payload raises IndexError.
    """
    first_byte = payload[0]
    self.flag = first_byte
723 class SETUP(GPS303Pkt):
725 RESPOND = Respond.EXT
727 ("uploadintervalseconds", intx, 0x0300),
728 ("binaryswitch", intx, 0b00110001),
729 ("alarms", l3int, [0, 0, 0]),
730 ("dndtimeswitch", int, 0),
731 ("dndtimes", l3int, [0, 0, 0]),
732 ("gpstimeswitch", int, 0),
733 ("gpstimestart", int, 0),
734 ("gpstimestop", int, 0),
735 ("phonenumbers", l3str, ["", "", ""]),
738 def out_encode(self) -> bytes:
739 def pack3b(x: int) -> bytes:
740 return pack("!I", x)[1:]
744 pack("!H", self.uploadintervalseconds),
745 pack("B", self.binaryswitch),
747 + [pack3b(el) for el in self.alarms]
749 pack("B", self.dndtimeswitch),
751 + [pack3b(el) for el in self.dndtimes]
753 pack("B", self.gpstimeswitch),
754 pack("!H", self.gpstimestart),
755 pack("!H", self.gpstimestop),
757 + [b";".join([el.encode() for el in self.phonenumbers])]
760 def in_encode(self) -> bytes:
764 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
768 class RESTORE_PASSWORD(GPS303Pkt):
772 class WIFI_POSITIONING(_WIFI_POSITIONING):
774 RESPOND = Respond.EXT
775 OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
777 def out_encode(self) -> bytes:
778 if self.latitude is None or self.longitude is None:
780 return "{:+#010.8g},{:+#010.8g}".format(
781 self.latitude, self.longitude
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse a ``<latitude>,<longitude>`` text payload into floats.

    The payload is decoded as text and split on commas; it must yield
    exactly two fields, otherwise ValueError is raised. The fields are
    converted with `float()` and stored in `latitude` and `longitude`.
    """
    fields = payload.decode().split(",")
    lat_text, lon_text = fields
    self.latitude = float(lat_text)
    self.longitude = float(lon_text)
790 class MANUAL_POSITIONING(GPS303Pkt):
793 def in_decode(self, length: int, payload: bytes) -> None:
794 self.flag = payload[0] if len(payload) > 0 else -1
799 4: "LBS search > 3 times",
800 5: "Same LBS and WiFi data",
801 6: "LBS prohibited, WiFi absent",
802 7: "GPS spacing < 50 m",
803 }.get(self.flag, "Unknown")
806 class BATTERY_CHARGE(GPS303Pkt):
810 class CHARGER_CONNECTED(GPS303Pkt):
814 class CHARGER_DISCONNECTED(GPS303Pkt):
818 class VIBRATION_RECEIVED(GPS303Pkt):
822 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
824 RESPOND = Respond.EXT
825 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read the interval from the first two payload bytes.

    The value is a big-endian unsigned 16-bit integer and is stored in
    `interval`. A payload shorter than two bytes raises struct.error.
    """
    # unpack() always returns a tuple; take its only element so that
    # `interval` is a plain int that out_encode() is able to pack again.
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Serialize `interval` as a big-endian unsigned 16-bit integer."""
    interval_bytes = pack("!H", self.interval)
    return interval_bytes
834 class SOS_ALARM(GPS303Pkt):
838 class UNKNOWN_B3(GPS303Pkt):
840 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the whole payload, decoded to text, in `asciidata`.

    The default codec of `bytes.decode()` is used; undecodable bytes
    raise UnicodeDecodeError.
    """
    text = payload.decode()
    self.asciidata = text
846 # Build dicts protocol number -> class and class name -> protocol number
849 if True: # just to indent the code, sorry!
852 for name, cls in globals().items()
854 and issubclass(cls, GPS303Pkt)
855 and not name.startswith("_")
857 if hasattr(cls, "PROTO"):
858 CLASSES[cls.PROTO] = cls
859 PROTOS[cls.__name__] = cls.PROTO
864 ) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
867 for name, proto in PROTOS.items()
868 if name.upper().startswith(prefix.upper())
873 return CLASSES[proto]
def proto_by_name(name: str) -> int:
    """Look up the protocol number registered under a class name.

    Returns the value stored in `PROTOS` for `name`, or -1 when the
    name is not present.
    """
    if name in PROTOS:
        return PROTOS[name]
    return -1
880 def proto_of_message(packet: bytes) -> int:
884 def inline_response(packet: bytes) -> Optional[bytes]:
885 proto = proto_of_message(packet)
888 if cls.RESPOND is Respond.INL:
889 return cls.Out().packed
893 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
894 """From a packet (without framing bytes) derive the XXX.In (or XXX.Out) object"""
895 length, proto = unpack("BB", packet[:2])
897 if proto not in CLASSES:
898 cause: Union[DecodeError, ValueError, IndexError] = ValueError(
899 f"Proto {proto} is unknown"
904 return CLASSES[proto].In(length, payload)
906 return CLASSES[proto].Out(length, payload)
907 except (DecodeError, ValueError, IndexError) as e:
910 retobj = UNKNOWN.In(length, payload)
912 retobj = UNKNOWN.Out(length, payload)
913 retobj.PROTO = proto # Override class attr with object attr