2 Implementation of the protocol used by zx303 "ZhongXun Topin Locator"
3 GPS+GPRS module. Description lifted from this repository:
4 https://github.com/tobadia/petGPS/tree/master/resources
7 1. There is no security whatsoever. If you know the module's IMEI,
8 you can feed fake data to the server, including fake location.
9 2. Ad-hoc choice of framing of messages (that are transferred over
10 the TCP stream) makes it vulnerable to coincidental appearance
11 of framing bytes in the middle of the message. Most of the time
12 the server will receive one message in one TCP segment (i.e. in
13 one `recv()` operation), but relying on that would break things
14 if the path has lower MTU than the size of a message.
17 from datetime import datetime, timezone
19 from inspect import isclass
20 from struct import error, pack, unpack
22 from types import SimpleNamespace
35 from .protomodule import ProtoClass
59 def __init__(self) -> None:
62 def recv(self, segment: bytes) -> List[Union[bytes, str]]:
64 Process next segment of the stream. Return successfully deframed
65 packets as `bytes` and error messages as `str`.
68 self.buffer += segment
69 if len(self.buffer) > MAXBUFFER:
70 # We are receiving junk. Let's drop it or we run out of memory.
72 return [f"More than {MAXBUFFER} unparseable data, dropping"]
73 msgs: List[Union[bytes, str]] = []
75 framestart = self.buffer.find(b"xx")
76 if framestart == -1: # No frames, return whatever we have
78 if framestart > 0: # Should not happen, report
80 f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
82 self.buffer = self.buffer[framestart:]
83 # At this point, buffer starts with a packet
84 if len(self.buffer) < 6: # no len and proto - cannot proceed
86 exp_end = self.buffer[2] + 3 # Expect '\r\n' here
88 # Length field can legitimately be much less than the
89 # length of the packet (e.g. WiFi positioning), but
90 # it _should not_ be greater. Still sometimes it is.
91 # Luckily, not by too much: by maybe two or three bytes?
92 # Do this embarrassing hack to avoid accidental match
93 # of some binary data in the packet against '\r\n'.
95 frameend = self.buffer.find(b"\r\n", frameend + 1)
96 if frameend == -1 or frameend >= (
98 ): # Found realistic match or none
100 if frameend == -1: # Incomplete frame, return what we have
102 packet = self.buffer[2:frameend]
103 self.buffer = self.buffer[frameend + 2 :]
104 if len(packet) < 2: # frameend comes too early
105 msgs.append(f"Packet too short: {packet.hex()}")
110 def close(self) -> bytes:
def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
    """Wrap message bytes in the stream framing.

    Args:
        buffer: the message bytes to send.
        imei: accepted for interface compatibility; not used here.

    Returns:
        `buffer` preceded by the ``xx`` start marker and followed by
        a CR LF terminator.
    """
    return b"".join((b"xx", buffer, b"\r\n"))
120 ### Parser/Constructor ###
class DecodeError(Exception):
    """Raised when a packet cannot be decoded.

    The triggering exception becomes the sole positional argument of
    the base ``Exception``; every extra keyword argument is stored as
    an attribute of the same name (e.g. ``obj``).
    """

    def __init__(self, e: Exception, **kwargs: Any) -> None:
        super().__init__(e)
        for k, v in kwargs.items():
            setattr(self, k, v)
def maybe(typ: type) -> Callable[[Any], Any]:
    """Return a converter that applies `typ` but passes ``None`` through."""

    def convert(x: Any) -> Any:
        if x is None:
            return None
        return typ(x)

    return convert
def intx(x: Union[str, int]) -> int:
    """Coerce `x` to an integer.

    A string is parsed with automatic base detection, so ``"768"``,
    ``"0x300"`` and ``"0b11"`` are all accepted. Any other value is
    returned unchanged.

    Raises:
        ValueError: if a string is not a valid integer literal.
    """
    if isinstance(x, str):
        return int(x, 0)
    return x
def boolx(x: Union[str, bool]) -> bool:
    """Coerce `x` to a Boolean.

    Strings are matched case-insensitively: ``ON``/``TRUE``/``1`` give
    True, ``OFF``/``FALSE``/``0`` give False. A non-string value is
    returned unchanged.

    Raises:
        ValueError: if a string is none of the recognised words.
    """
    if not isinstance(x, str):
        return x
    word = x.upper()
    if word in ("ON", "TRUE", "1"):
        return True
    if word in ("OFF", "FALSE", "0"):
        return False
    raise ValueError(str(x) + " could not be parsed as a Boolean")
def hhmm(x: str) -> str:
    """Check for the string that represents hours and minutes.

    Returns:
        `x` unchanged when it is four ASCII digits ``HHMM`` with
        ``00 <= HH <= 23`` and ``00 <= MM <= 59``.

    Raises:
        ValueError: for any other value.
    """
    if not isinstance(x, str) or len(x) != 4:
        raise ValueError(str(x) + " is not a four-character string")
    # int() alone would accept values such as "+100" or " 1 5"; those
    # break bytes.fromhex() when the field is later encoded.
    if not (x.isascii() and x.isdigit()):
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    hh, mm = int(x[:2]), int(x[2:])
    if hh < 0 or hh > 23 or mm < 0 or mm > 59:
        raise ValueError(str(x) + " does not contain valid hours and minutes")
    return x
def hhmmhhmm(x: str) -> str:
    """Check for the string that represents hours and minutes twice.

    Each four-character half is validated with `hhmm()`.

    Returns:
        The two validated halves concatenated.

    Raises:
        ValueError: if `x` is not an eight-character string, or if
            `hhmm()` rejects either half.
    """
    if not isinstance(x, str) or len(x) != 8:
        raise ValueError(str(x) + " is not an eight-character string")
    first, second = x[:4], x[4:]
    return hhmm(first) + hhmm(second)
def l3str(x: Union[str, List[str]]) -> List[str]:
    """Coerce `x` to a list of exactly three strings.

    A string is split on commas; any other value is used as given.

    Raises:
        ValueError: if the result does not hold exactly three strings.
    """
    if isinstance(x, str):
        lx = x.split(",")
    else:
        lx = x
    # Validate the elements of the resulting list, not of the raw input
    if len(lx) != 3 or not all(isinstance(el, str) for el in lx):
        raise ValueError(str(lx) + " is not a list of three strings")
    return lx
178 def l3alarms(x: Union[str, List[Tuple[int, str]]]) -> List[Tuple[int, str]]:
179 def alrmspec(sub: str) -> Tuple[int, str]:
181 raise ValueError(sub + " does not represent day and time")
195 if isinstance(x, str):
196 lx = [alrmspec(sub) for sub in x.split(",")]
199 lx.extend([(0, "0000") for _ in range(3 - len(lx))])
200 if len(lx) != 3 or any(d < 0 or d > 7 for d, tm in lx):
201 raise ValueError(str(lx) + " is a wrong alarms specification")
202 return [(d, hhmm(tm)) for d, tm in lx]
def l3int(x: Union[str, List[int]]) -> List[int]:
    """Coerce `x` to a list of exactly three integers.

    A string is split on commas and every piece is converted with
    ``int()``; any other value is used as given.

    Raises:
        ValueError: if a piece is not an integer literal or the result
            does not hold exactly three integers.
    """
    if isinstance(x, str):
        lx = [int(el) for el in x.split(",")]
    else:
        lx = x
    if len(lx) != 3 or not all(isinstance(el, int) for el in lx):
        raise ValueError(str(lx) + " is not a list of three integers")
    return lx
216 NON = 0 # Incoming, no response needed
217 INL = 1 # Bidirectional, use `inline_response()`
218 EXT = 2 # Bidirectional, use external responder
221 class GPS303Pkt(ProtoClass):
222 RESPOND = Respond.NON # Do not send anything back by default
224 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
225 OUT_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
226 KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
227 In: Type["GPS303Pkt"]
228 Out: Type["GPS303Pkt"]
232 def __getattr__(self, name: str) -> Any:
235 def __setattr__(self, name: str, value: Any) -> None:
238 def __init__(self, *args: Any, **kwargs: Any):
240 Construct the object _either_ from (length, payload),
241 _or_ from the values of individual fields
243 assert not args or (len(args) == 2 and not kwargs)
244 if args: # guaranteed to be two arguments at this point
245 self.length, self.payload = args
247 self.decode(self.length, self.payload)
249 raise DecodeError(e, obj=self)
251 for kw, typ, dfl in self.KWARGS:
252 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
255 self.__class__.__name__ + " stray kwargs " + str(kwargs)
258 def __repr__(self) -> str:
259 return "{}({})".format(
260 self.__class__.__name__,
264 'bytes.fromhex("{}")'.format(v.hex())
265 if isinstance(v, bytes)
268 for k, v in self.__dict__.items()
269 if not k.startswith("_")
273 decode: Callable[["GPS303Pkt", int, bytes], None]
def in_decode(self, length: int, packet: bytes) -> None:
    """Decode an incoming payload; this default decodes nothing."""
    # Overridden in subclasses, otherwise do not decode payload
    return None
def out_decode(self, length: int, packet: bytes) -> None:
    """Decode an outgoing payload; this default decodes nothing."""
    # Overridden in subclasses, otherwise do not decode payload
    return None
283 encode: Callable[["GPS303Pkt"], bytes]
def in_encode(self) -> bytes:
    """Encode an incoming-direction packet; not supported by default.

    Raises:
        NotImplementedError: always, naming the concrete class.
    """
    # Necessary to emulate terminal, which is not implemented
    raise NotImplementedError(
        self.__class__.__name__ + ".encode() not implemented"
    )
def out_encode(self) -> bytes:
    """Build the payload of an outgoing packet; the default is empty."""
    # Overridden in subclasses, otherwise make empty payload
    return b""
296 def packed(self) -> bytes:
297 payload = self.encode()
298 length = getattr(self, "length", len(payload) + 1)
299 return pack("BB", length, self.PROTO) + payload
302 class UNKNOWN(GPS303Pkt):
303 PROTO = 256 # > 255 is impossible in real packets
306 class LOGIN(GPS303Pkt):
308 RESPOND = Respond.INL
309 # Default response for ACK, can also respond with STOP_UPLOAD
310 IN_KWARGS = (("imei", str, "0000000000000000"), ("ver", int, 0))
def in_decode(self, length: int, payload: bytes) -> None:
    """Extract `imei` and `ver` from a login payload.

    `imei` is the hex text of the first eight payload bytes (padded
    with zero bytes if fewer are present); `ver` is the ninth byte.

    Raises:
        IndexError: if the payload has no ninth byte.
    """
    imei_raw = payload[:8].ljust(8, b"\0")
    self.imei = imei_raw.hex()
    self.ver = payload[8]
def in_encode(self) -> bytes:
    """Encode `imei` (hex text) as eight bytes followed by the `ver` byte."""
    imei_raw = bytes.fromhex(self.imei).ljust(8, b"\0")[:8]
    return imei_raw + pack("B", self.ver)
322 class SUPERVISION(GPS303Pkt):
324 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:
    """Encode `status` as the single payload byte."""
    # 1: The device automatically answers Pickup effect
    # 2: Automatically Answering Two-way Calls
    # 3: Ring manually answer the two-way call
    return pack("B", self.status)
333 class HEARTBEAT(GPS303Pkt):
335 RESPOND = Respond.INL
338 class _GPS_POSITIONING(GPS303Pkt):
339 RESPOND = Respond.INL
341 def in_decode(self, length: int, payload: bytes) -> None:
342 self.dtime = payload[:6]
343 if self.dtime == b"\0\0\0\0\0\0":
346 yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
347 self.devtime = datetime(
348 2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
350 self.gps_data_length = payload[6] >> 4
351 self.gps_nb_sat = payload[6] & 0x0F
352 lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
353 self.gps_is_valid = bool(flags & 0b0001000000000000) # bit 3
354 flip_lon = bool(flags & 0b0000100000000000) # bit 4
355 flip_lat = not bool(flags & 0b0000010000000000) # bit 5
356 self.heading = flags & 0b0000001111111111 # bits 6 - last
357 self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
358 self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
def out_encode(self) -> bytes:
    """Return the current UTC time as six bytes.

    The bytes are year modulo 100, month, day, hour, minute, second.
    """
    # datetime.utcnow() is deprecated; an aware "now" yields the same fields
    now = datetime.now(timezone.utc)
    return pack(
        "BBBBBB",
        now.year % 100,
        now.month,
        now.day,
        now.hour,
        now.minute,
        now.second,
    )
367 def rectified(self) -> SimpleNamespace: # JSON-able dict
368 return SimpleNamespace(
370 devtime=str(self.devtime),
372 direction=self.heading,
373 latitude=self.latitude,
374 longitude=self.longitude,
378 class GPS_POSITIONING(_GPS_POSITIONING):
382 class GPS_OFFLINE_POSITIONING(_GPS_POSITIONING):
386 class STATUS(GPS303Pkt):
388 RESPOND = Respond.EXT
392 ("timezone", int, 0),
394 ("signal", maybe(int), None),
396 OUT_KWARGS = (("upload_interval", int, 25),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode a status payload.

    The first four bytes are `batt`, `ver`, `timezone` and `intvl`.
    A fifth byte, when present, is `signal`; otherwise `signal` is None.
    """
    self.batt, self.ver, self.timezone, self.intvl = unpack(
        "BBBB", payload[:4]
    )
    self.signal = payload[4] if len(payload) > 4 else None
def in_encode(self) -> bytes:
    """Encode `batt`, `ver`, `timezone`, `intvl` and, if set, `signal`."""
    head = pack("BBBB", self.batt, self.ver, self.timezone, self.intvl)
    if self.signal is None:
        return head
    return head + pack("B", self.signal)
def out_encode(self) -> bytes:  # Set interval in minutes
    """Encode `upload_interval` as the single payload byte."""
    return pack("B", self.upload_interval)
416 class HIBERNATION(GPS303Pkt): # Server can send to send device to sleep
419 def in_encode(self) -> bytes:
423 class RESET(GPS303Pkt):
424 # Device sends when it got reset SMS
425 # Server can send to initiate factory reset
429 class WHITELIST_TOTAL(GPS303Pkt): # Server sends to initiate sync (0x58)
431 OUT_KWARGS = (("number", int, 3),)
def out_encode(self) -> bytes:  # Number of whitelist entries
    """Encode `number` as the single payload byte."""
    return pack("B", self.number)
437 class _WIFI_POSITIONING(GPS303Pkt):
438 IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
440 ("dtime", bytes, b"\0\0\0\0\0\0"),
441 ("wifi_aps", list, []),
444 ("gsm_cells", list, []),
def in_decode(self, length: int, payload: bytes) -> None:
    """Decode a WiFi/cell positioning payload.

    Layout as read here: six bytes of date and time written as decimal
    digits in hex (yymmddHHMMSS), then `self.length` records of seven
    bytes (six MAC bytes and a signal byte), then a cell header
    (count, MCC, MNC) and `count` records of five bytes (location
    area code, cell id, signal). Signal values are stored negated.

    Sets `dtime`, `devtime`, `wifi_aps`, `mcc`, `mnc` and `gsm_cells`.
    """
    self.dtime = payload[:6]
    if self.dtime == b"\0\0\0\0\0\0":
        self.devtime = None
    else:
        # NOTE(review): astimezone() treats the parsed naive time as
        # local time - confirm that matches the device clock.
        self.devtime = datetime.strptime(
            self.dtime.hex(), "%y%m%d%H%M%S"
        ).astimezone(tz=timezone.utc)
    self.wifi_aps = []
    for i in range(self.length):  # length has special meaning here
        ap = payload[6 + i * 7 : 13 + i * 7]
        mac = ":".join([format(b, "02X") for b in ap[:6]])
        self.wifi_aps.append((mac, -ap[6]))
    gsm_slice = payload[6 + self.length * 7 :]
    ncells, self.mcc, self.mnc = unpack("!BHB", gsm_slice[:4])
    self.gsm_cells = []
    for i in range(ncells):
        locac, cellid, sigstr = unpack(
            "!HHB", gsm_slice[4 + i * 5 : 9 + i * 5]
        )
        self.gsm_cells.append((locac, cellid, -sigstr))
471 def in_encode(self) -> bytes:
472 self.length = len(self.wifi_aps)
478 bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
480 for mac, sigstr in self.wifi_aps
483 pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
486 pack("!HHB", locac, cellid, -sigstr)
487 for locac, cellid, sigstr in self.gsm_cells
493 def rectified(self) -> SimpleNamespace: # JSON-able dict
494 return SimpleNamespace(
495 type="approximate_location",
496 devtime=str(self.devtime),
499 base_stations=self.gsm_cells,
500 wifi_aps=self.wifi_aps,
504 class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
506 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Return the current UTC time as six bytes of decimal digits in hex.

    The digits spell yymmddHHMMSS, two per byte.
    """
    # datetime.utcnow() is deprecated; an aware "now" formats identically
    stamp = datetime.now(timezone.utc).strftime("%y%m%d%H%M%S")
    return bytes.fromhex(stamp)
512 class TIME(GPS303Pkt):
514 RESPOND = Respond.INL
def out_encode(self) -> bytes:
    """Return the current UTC time as seven bytes.

    A two-byte big-endian year is followed by one byte each for
    month, day, hour, minute and second.
    """
    # datetime.utcnow() is deprecated; an aware "now" yields the same fields
    now = datetime.now(timezone.utc)
    return pack("!HBBBBB", *now.timetuple()[:6])
520 class PROHIBIT_LBS(GPS303Pkt):
522 OUT_KWARGS = (("status", int, 1),)
def out_encode(self) -> bytes:  # Server sent, 0-off, 1-on
    """Encode `status` as the single payload byte."""
    return pack("B", self.status)
528 class GPS_LBS_SWITCH_TIMES(GPS303Pkt):
532 ("gps_off", boolx, False), # Clarify the meaning of 0/1
533 ("gps_interval_set", boolx, False),
534 ("gps_interval", hhmmhhmm, "00000000"),
535 ("lbs_off", boolx, False), # Clarify the meaning of 0/1
536 ("boot_time_set", boolx, False),
537 ("boot_time", hhmm, "0000"),
538 ("shut_time_set", boolx, False),
539 ("shut_time", hhmm, "0000"),
def out_encode(self) -> bytes:
    """Encode the GPS/LBS switch settings.

    Flags take one byte each; time strings are written as the bytes
    their digits spell in hex. Field order: gps_off, gps_interval_set,
    gps_interval, lbs_off, boot_time_set, boot_time, shut_time_set,
    shut_time.
    """
    return b"".join(
        (
            pack("B", self.gps_off),
            pack("B", self.gps_interval_set),
            bytes.fromhex(self.gps_interval),
            pack("B", self.lbs_off),
            pack("B", self.boot_time_set),
            bytes.fromhex(self.boot_time),
            pack("B", self.shut_time_set),
            bytes.fromhex(self.shut_time),
        )
    )
555 class _SET_PHONE(GPS303Pkt):
556 OUT_KWARGS = (("phone", str, ""),)
def out_encode(self) -> bytes:
    """Encode `phone` as the payload bytes.

    The previous ``encode("")`` named an empty codec and raised
    ``LookupError`` on every call; the default codec is used instead,
    as is done for the phone numbers of the SETUP packet.
    """
    return self.phone.encode()
563 class REMOTE_MONITOR_PHONE(_SET_PHONE):
567 class SOS_PHONE(_SET_PHONE):
571 class DAD_PHONE(_SET_PHONE):
575 class MOM_PHONE(_SET_PHONE):
579 class STOP_UPLOAD(GPS303Pkt): # Server response to LOGIN to thwart the device
583 class GPS_OFF_PERIOD(GPS303Pkt):
587 ("fm", hhmm, "0000"),
588 ("to", hhmm, "2359"),
def out_encode(self) -> bytes:
    """Encode `onoff` as one byte, then `fm` and `to` as hex-spelled bytes."""
    return b"".join(
        (
            pack("B", self.onoff),
            bytes.fromhex(self.fm),
            bytes.fromhex(self.to),
        )
    )
599 class DND_PERIOD(GPS303Pkt):
604 ("fm1", hhmm, "0000"),
605 ("to1", hhmm, "2359"),
606 ("fm2", hhmm, "0000"),
607 ("to2", hhmm, "2359"),
def out_encode(self) -> bytes:
    """Encode the do-not-disturb settings.

    `onoff` and `week` take one byte each; `fm1`, `to1`, `fm2`, `to2`
    follow as the bytes their digits spell in hex.
    """
    return b"".join(
        (
            pack("B", self.onoff),
            pack("B", self.week),
            bytes.fromhex(self.fm1),
            bytes.fromhex(self.to1),
            bytes.fromhex(self.fm2),
            bytes.fromhex(self.to2),
        )
    )
621 class RESTART_SHUTDOWN(GPS303Pkt):
623 OUT_KWARGS = (("flag", int, 0),)
def out_encode(self) -> bytes:
    """Encode `flag` as the single payload byte."""
    return pack("B", self.flag)
631 class DEVICE(GPS303Pkt):
633 OUT_KWARGS = (("flag", int, 0),)
# 0 - Stop looking for equipment
# 1 - Start looking for equipment
def out_encode(self) -> bytes:
    """Encode `flag` as the single payload byte."""
    return pack("B", self.flag)
641 class ALARM_CLOCK(GPS303Pkt):
644 Tuple[str, Callable[[Any], Any], List[Tuple[int, str]]], ...
646 ("alarms", l3alarms, []),
def out_encode(self) -> bytes:
    """Encode every alarm as a day byte plus the hex-spelled time bytes."""
    return b"".join(
        pack("B", day) + bytes.fromhex(tm) for day, tm in self.alarms
    )
655 class STOP_ALARM(GPS303Pkt):
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the first payload byte as `flag`.

    Raises:
        IndexError: if the payload is empty.
    """
    self.flag = payload[0]
662 class SETUP(GPS303Pkt):
664 RESPOND = Respond.EXT
666 ("uploadintervalseconds", intx, 0x0300),
667 ("binaryswitch", intx, 0b00110001),
668 ("alarms", l3int, [0, 0, 0]),
669 ("dndtimeswitch", int, 0),
670 ("dndtimes", l3int, [0, 0, 0]),
671 ("gpstimeswitch", int, 0),
672 ("gpstimestart", int, 0),
673 ("gpstimestop", int, 0),
674 ("phonenumbers", l3str, ["", "", ""]),
def out_encode(self) -> bytes:
    """Encode the device setup parameters.

    Order: upload interval (2 bytes, big-endian), binary switch
    (1 byte), three alarms (3 bytes each), DND switch (1 byte), three
    DND times (3 bytes each), GPS time switch (1 byte), GPS start and
    stop (2 bytes each, big-endian), then the phone numbers joined
    with ``;``.
    """

    def pack3b(x: int) -> bytes:
        # Big-endian 32-bit value with the most significant byte dropped
        return pack("!I", x)[1:]

    parts = [
        pack("!H", self.uploadintervalseconds),
        pack("B", self.binaryswitch),
    ]
    parts.extend(pack3b(el) for el in self.alarms)
    parts.append(pack("B", self.dndtimeswitch))
    parts.extend(pack3b(el) for el in self.dndtimes)
    parts.extend(
        (
            pack("B", self.gpstimeswitch),
            pack("!H", self.gpstimestart),
            pack("!H", self.gpstimestop),
        )
    )
    parts.append(b";".join([el.encode() for el in self.phonenumbers]))
    return b"".join(parts)
699 def in_encode(self) -> bytes:
703 class SYNCHRONOUS_WHITELIST(GPS303Pkt):
707 class RESTORE_PASSWORD(GPS303Pkt):
711 class WIFI_POSITIONING(_WIFI_POSITIONING):
RESPOND = Respond.EXT
# The converter is applied to the default as well, and plain `float`
# raises TypeError on None; `maybe(float)` lets the "unknown
# coordinates" default through so that out_encode() can test for it.
OUT_KWARGS = (
    ("latitude", maybe(float), None),
    ("longitude", maybe(float), None),
)
def out_encode(self) -> bytes:
    """Encode the coordinates as ``<lat>,<lon>`` text.

    Each number is signed, zero-padded to ten characters and shown
    with eight significant digits. If either coordinate is None the
    payload is empty.
    """
    if self.latitude is None or self.longitude is None:
        return b""
    text = "{:+#010.8g},{:+#010.8g}".format(self.latitude, self.longitude)
    return text.encode()
def out_decode(self, length: int, payload: bytes) -> None:
    """Parse ``<lat>,<lon>`` text into float `latitude` and `longitude`.

    Raises:
        ValueError: if the payload is not two comma-separated numbers.
    """
    lat, lon = payload.decode().split(",")
    self.latitude = float(lat)
    self.longitude = float(lon)
729 class MANUAL_POSITIONING(GPS303Pkt):
732 def in_decode(self, length: int, payload: bytes) -> None:
733 self.flag = payload[0] if len(payload) > 0 else -1
738 4: "LBS search > 3 times",
739 5: "Same LBS and WiFi data",
740 6: "LBS prohibited, WiFi absent",
741 7: "GPS spacing < 50 m",
742 }.get(self.flag, "Unknown")
745 class BATTERY_CHARGE(GPS303Pkt):
749 class CHARGER_CONNECTED(GPS303Pkt):
753 class CHARGER_DISCONNECTED(GPS303Pkt):
757 class VIBRATION_RECEIVED(GPS303Pkt):
761 class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
763 RESPOND = Respond.EXT
764 OUT_KWARGS = (("interval", int, 10),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Read `interval` from the first two payload bytes (big-endian).

    `unpack()` always returns a tuple; storing the tuple itself would
    make the later ``pack("!H", self.interval)`` fail, so the single
    element is extracted.
    """
    (self.interval,) = unpack("!H", payload[:2])
def out_encode(self) -> bytes:
    """Encode `interval` as two bytes, big-endian."""
    return pack("!H", self.interval)
773 class SOS_ALARM(GPS303Pkt):
777 class UNKNOWN_B3(GPS303Pkt):
779 IN_KWARGS = (("asciidata", str, ""),)
def in_decode(self, length: int, payload: bytes) -> None:
    """Store the payload, decoded as text, in `asciidata`.

    Raises:
        UnicodeDecodeError: if the payload is not valid UTF-8.
    """
    self.asciidata = payload.decode()
785 # Build dicts protocol number -> class and class name -> protocol number
788 if True: # just to indent the code, sorry!
791 for name, cls in globals().items()
793 and issubclass(cls, GPS303Pkt)
794 and not name.startswith("_")
796 if hasattr(cls, "PROTO"):
797 CLASSES[cls.PROTO] = cls
798 PROTOS[cls.__name__] = cls.PROTO
803 ) -> Union[Type[GPS303Pkt], List[str]]:
804 if prefix.startswith(PROTO_PREFIX):
805 pname = prefix[len(PROTO_PREFIX) :]
807 raise KeyError(pname)
810 for name, proto in PROTOS.items()
811 if name.upper().startswith(prefix.upper())
814 return [name for name, _ in lst]
816 return CLASSES[proto]
def proto_handled(proto: str) -> bool:
    """Tell whether the protocol name `proto` starts with `PROTO_PREFIX`."""
    return proto.startswith(PROTO_PREFIX)
def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
    """Return `PROTO_PREFIX` plus the class name of a packet class or instance."""
    if isinstance(obj, GPS303Pkt):
        cls_name = obj.__class__.__name__
    else:
        cls_name = obj.__name__
    return PROTO_PREFIX + cls_name
def proto_of_message(packet: bytes) -> str:
    """Return the protocol name for a deframed packet.

    The second byte is looked up in `CLASSES`; numbers that are not
    registered map to the name of `UNKNOWN`.
    """
    cls = CLASSES.get(packet[1], UNKNOWN)
    return proto_name(cls)
def imei_from_packet(packet: bytes) -> Optional[str]:
    """Return the IMEI carried by a LOGIN packet, or None.

    None is returned when the second byte is not the LOGIN protocol
    number or when the packet does not parse into a LOGIN object.
    """
    if packet[1] != LOGIN.PROTO:
        return None
    msg = parse_message(packet)
    if isinstance(msg, LOGIN):
        return msg.imei
    return None
def is_goodbye_packet(packet: bytes) -> bool:
    """Tell whether the packet's protocol byte is that of HIBERNATION."""
    return packet[1] == HIBERNATION.PROTO
def inline_response(packet: bytes) -> Optional[bytes]:
    """Return the packed default reply for a packet answered inline.

    None is returned when the protocol byte is not registered in
    `CLASSES` or the class does not use `Respond.INL`.
    """
    proto = packet[1]
    if proto in CLASSES:
        cls = CLASSES[proto]
        if cls.RESPOND is Respond.INL:
            return cls.Out().packed
    return None
def probe_buffer(buffer: bytes) -> bool:
    """Tell whether `buffer` could hold a frame of this protocol.

    True when the ``xx`` start marker is present and at least six
    bytes are available from the marker onwards.
    """
    framestart = buffer.find(b"xx")
    if framestart < 0:
        return False
    if len(buffer) - framestart < 6:
        return False
    return True
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
    """From a packet (without framing bytes) derive the XXX.In object.

    With `is_incoming` false the XXX.Out class is used instead.

    If the protocol number is not registered, or decoding raises
    DecodeError, ValueError or IndexError, an UNKNOWN object is
    returned; its `PROTO` is set to the received number and `cause`
    holds the exception that describes the failure.
    """
    length, proto = unpack("BB", packet[:2])
    payload = packet[2:]
    if proto not in CLASSES:
        cause: Union[DecodeError, ValueError, IndexError] = ValueError(
            f"Proto {proto} is unknown"
        )
    else:
        try:
            if is_incoming:
                return CLASSES[proto].In(length, payload)
            return CLASSES[proto].Out(length, payload)
        except (DecodeError, ValueError, IndexError) as e:
            cause = e
    if is_incoming:
        retobj = UNKNOWN.In(length, payload)
    else:
        retobj = UNKNOWN.Out(length, payload)
    retobj.PROTO = proto  # Override class attr with object attr
    retobj.cause = cause
    return retobj
def exposed_protos() -> List[Tuple[str, bool]]:
    """List the registered classes that define `rectified`.

    Returns:
        One ``(protocol name, uses Respond.EXT)`` pair per class.
    """
    return [
        (proto_name(cls), cls.RESPOND is Respond.EXT)
        for cls in CLASSES.values()
        if hasattr(cls, "rectified")
    ]