from enum import Enum
from inspect import isclass
from struct import error, pack, unpack
+from time import time
from typing import (
Any,
Callable,
)
__all__ = (
+ "Stream",
"class_by_prefix",
"inline_response",
"parse_message",
+ "probe_buffer",
"proto_by_name",
+ "proto_name",
"DecodeError",
"Respond",
"GPS303Pkt",
"UNKNOWN_B3",
)
+PROTO_PREFIX = "ZX"
+
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class Stream:
+ def __init__(self) -> None:
+ self.buffer = b""
+
+ @staticmethod
+ def enframe(buffer: bytes) -> bytes:
+ return b"xx" + buffer + b"\r\n"
+
+ def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+ """
+ Process next segment of the stream. Return successfully deframed
+ packets as `bytes` and error messages as `str`.
+ """
+ when = time()
+ self.buffer += segment
+ if len(self.buffer) > MAXBUFFER:
+ # We are receiving junk. Let's drop it or we run out of memory.
+ self.buffer = b""
+ return [f"More than {MAXBUFFER} unparseable data, dropping"]
+ msgs: List[Union[bytes, str]] = []
+ while True:
+ framestart = self.buffer.find(b"xx")
+ if framestart == -1: # No frames, return whatever we have
+ break
+ if framestart > 0: # Should not happen, report
+ msgs.append(
+ f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+ )
+ self.buffer = self.buffer[framestart:]
+ # At this point, buffer starts with a packet
+ if len(self.buffer) < 6: # no len and proto - cannot proceed
+ break
+ exp_end = self.buffer[2] + 3 # Expect '\r\n' here
+ frameend = 0
+ # Length field can legitimeely be much less than the
+ # length of the packet (e.g. WiFi positioning), but
+ # it _should not_ be greater. Still sometimes it is.
+ # Luckily, not by too much: by maybe two or three bytes?
+ # Do this embarrassing hack to avoid accidental match
+ # of some binary data in the packet against '\r\n'.
+ while True:
+ frameend = self.buffer.find(b"\r\n", frameend + 1)
+ if frameend == -1 or frameend >= (
+ exp_end - 3
+ ): # Found realistic match or none
+ break
+ if frameend == -1: # Incomplete frame, return what we have
+ break
+ packet = self.buffer[2:frameend]
+ self.buffer = self.buffer[frameend + 2 :]
+ if len(packet) < 2: # frameend comes too early
+ msgs.append(f"Packet too short: {packet.hex()}")
+ else:
+ msgs.append(packet)
+ return msgs
+
+ def close(self) -> bytes:
+ ret = self.buffer
+ self.buffer = b""
+ return ret
+
+
+### Parser/Constructor ###
+
class DecodeError(Exception):
def __init__(self, e: Exception, **kwargs: Any) -> None:
return CLASSES[proto]
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+ return (
+ PROTO_PREFIX
+ + ":"
+ + (
+ obj.__class__.__name__
+ if isinstance(obj, GPS303Pkt)
+ else obj.__name__
+ )
+ ).ljust(16, "\0")[:16]
+
+
def proto_by_name(name: str) -> int:
return PROTOS.get(name, -1)
-def proto_of_message(packet: bytes) -> int:
- return packet[1]
+def proto_of_message(packet: bytes) -> str:
+ return proto_name(CLASSES.get(packet[1], UNKNOWN))
+
+
+def imei_from_packet(packet: bytes) -> Optional[str]:
+ if packet[1] == LOGIN.PROTO:
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN):
+ return msg.imei
+ return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+ return packet[1] == HIBERNATION.PROTO
def inline_response(packet: bytes) -> Optional[bytes]:
- proto = proto_of_message(packet)
+ proto = packet[1]
if proto in CLASSES:
cls = CLASSES[proto]
if cls.RESPOND is Respond.INL:
return None
+def probe_buffer(buffer: bytes) -> bool:
+ framestart = buffer.find(b"xx")
+ if framestart < 0:
+ return False
+ if len(buffer) - framestart < 6:
+ return False
+ return True
+
+
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])