from enum import Enum
from inspect import isclass
from struct import error, pack, unpack
+from time import time
from typing import (
Any,
Callable,
)
__all__ = (
+ "Stream",
"class_by_prefix",
"inline_response",
"parse_message",
+ "probe_buffer",
"proto_by_name",
+ "proto_name",
"DecodeError",
"Respond",
"GPS303Pkt",
"UNKNOWN_B3",
)
+PROTO_PREFIX = "ZX"
+
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class Stream:
+ def __init__(self) -> None:
+ self.buffer = b""
+
+ @staticmethod
+ def enframe(buffer: bytes) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+ def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+ """
+ Process next segment of the stream. Return successfully deframed
+ packets as `bytes` and error messages as `str`.
+ """
+ when = time()
+ self.buffer += segment
+ if len(self.buffer) > MAXBUFFER:
+ # We are receiving junk. Let's drop it or we run out of memory.
+ self.buffer = b""
+ return [f"More than {MAXBUFFER} unparseable data, dropping"]
+ msgs: List[Union[bytes, str]] = []
+ while True:
+ framestart = self.buffer.find(b"xx")
+ if framestart == -1: # No frames, return whatever we have
+ break
+ if framestart > 0: # Should not happen, report
+ msgs.append(
+ f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+ )
+ self.buffer = self.buffer[framestart:]
+ # At this point, buffer starts with a packet
+ if len(self.buffer) < 6: # no len and proto - cannot proceed
+ break
+ exp_end = self.buffer[2] + 3 # Expect '\r\n' here
+ frameend = 0
+ # Length field can legitimeely be much less than the
+ # length of the packet (e.g. WiFi positioning), but
+ # it _should not_ be greater. Still sometimes it is.
+ # Luckily, not by too much: by maybe two or three bytes?
+ # Do this embarrassing hack to avoid accidental match
+ # of some binary data in the packet against '\r\n'.
+ while True:
+ frameend = self.buffer.find(b"\r\n", frameend + 1)
+ if frameend == -1 or frameend >= (
+ exp_end - 3
+ ): # Found realistic match or none
+ break
+ if frameend == -1: # Incomplete frame, return what we have
+ break
+ packet = self.buffer[2:frameend]
+ self.buffer = self.buffer[frameend + 2 :]
+ if len(packet) < 2: # frameend comes too early
+ msgs.append(f"Packet too short: {packet.hex()}")
+ else:
+ msgs.append(packet)
+ return msgs
+
+ def close(self) -> bytes:
+ ret = self.buffer
+ self.buffer = b""
+ return ret
+
+
+### Parser/Constructor ###
+
class DecodeError(Exception):
def __init__(self, e: Exception, **kwargs: Any) -> None:
setattr(self, k, v)
-def maybe_int(x: Optional[int]) -> Optional[int]:
- return None if x is None else int(x)
+def maybe(typ: type) -> Callable[[Any], Any]:
+ return lambda x: None if x is None else typ(x)
def intx(x: Union[str, int]) -> int:
@property
def packed(self) -> bytes:
payload = self.encode()
- length = len(payload) + 1
+ length = getattr(self, "length", len(payload) + 1)
return pack("BB", length, self.PROTO) + payload
("ver", int, 0),
("timezone", int, 0),
("intvl", int, 0),
- ("signal", maybe_int, None),
+ ("signal", maybe(int), None),
)
OUT_KWARGS = (("upload_interval", int, 25),)
self.signal = None
def in_encode(self) -> bytes:
- return (
- pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + b""
- if self.signal is None
- else pack("B", self.signal)
+ return pack("BBBB", self.batt, self.ver, self.timezone, self.intvl) + (
+ b"" if self.signal is None else pack("B", self.signal)
)
def out_encode(self) -> bytes: # Set interval in minutes
class _WIFI_POSITIONING(GPS303Pkt):
+ IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = (
+ # IN_KWARGS = (
+ ("dtime", bytes, b"\0\0\0\0\0\0"),
+ ("wifi_aps", list, []),
+ ("mcc", int, 0),
+ ("mnc", int, 0),
+ ("gsm_cells", list, []),
+ )
+
def in_decode(self, length: int, payload: bytes) -> None:
self.dtime = payload[:6]
if self.dtime == b"\0\0\0\0\0\0":
)
self.gsm_cells.append((locac, cellid, -sigstr))
+ def in_encode(self) -> bytes:
+ self.length = len(self.wifi_aps)
+ return b"".join(
+ [
+ self.dtime,
+ b"".join(
+ [
+ bytes.fromhex(mac.replace(":", "")).ljust(6, b"\0")[:6]
+ + pack("B", -sigstr)
+ for mac, sigstr in self.wifi_aps
+ ]
+ ),
+ pack("!BHB", len(self.gsm_cells), self.mcc, self.mnc),
+ b"".join(
+ [
+ pack("!HHB", locac, cellid, -sigstr)
+ for locac, cellid, sigstr in self.gsm_cells
+ ]
+ ),
+ ]
+ )
+
class WIFI_OFFLINE_POSITIONING(_WIFI_POSITIONING):
PROTO = 0x17
+ [b";".join([el.encode() for el in self.phonenumbers])]
)
+ def in_encode(self) -> bytes:
+ return b""
+
class SYNCHRONOUS_WHITELIST(GPS303Pkt):
PROTO = 0x58
return CLASSES[proto]
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+ return (
+ PROTO_PREFIX
+ + ":"
+ + (
+ obj.__class__.__name__
+ if isinstance(obj, GPS303Pkt)
+ else obj.__name__
+ )
+ ).ljust(16, "\0")[:16]
+
+
def proto_by_name(name: str) -> int:
return PROTOS.get(name, -1)
-def proto_of_message(packet: bytes) -> int:
- return packet[1]
+def proto_of_message(packet: bytes) -> str:
+ return proto_name(CLASSES.get(packet[1], UNKNOWN))
+
+
+def imei_from_packet(packet: bytes) -> Optional[str]:
+ if packet[1] == LOGIN.PROTO:
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN):
+ return msg.imei
+ return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+ return packet[1] == HIBERNATION.PROTO
def inline_response(packet: bytes) -> Optional[bytes]:
- proto = proto_of_message(packet)
+ proto = packet[1]
if proto in CLASSES:
cls = CLASSES[proto]
if cls.RESPOND is Respond.INL:
return None
+def probe_buffer(buffer: bytes) -> bool:
+ framestart = buffer.find(b"xx")
+ if framestart < 0:
+ return False
+ if len(buffer) - framestart < 6:
+ return False
+ return True
+
+
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])