)
__all__ = (
- "GPS303Conn",
- "StreamError",
+ "Stream",
"class_by_prefix",
"inline_response",
"parse_message",
+ "probe_buffer",
"proto_by_name",
+ "proto_name",
"DecodeError",
"Respond",
"GPS303Pkt",
"UNKNOWN_B3",
)
+PROTO_PREFIX = "ZX"
+
### Deframer ###
MAXBUFFER: int = 4096
-class StreamError(Exception):
- pass
-
-
-class GPS303Conn:
+class Stream:
def __init__(self) -> None:
self.buffer = b""
def enframe(buffer: bytes) -> bytes:
return b"xx" + buffer + b"\r\n"
- def recv(self, segment: bytes) -> List[bytes]:
+ def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+ """
+ Process next segment of the stream. Return successfully deframed
+ packets as `bytes` and error messages as `str`.
+ """
when = time()
self.buffer += segment
if len(self.buffer) > MAXBUFFER:
# We are receiving junk. Let's drop it or we run out of memory.
self.buffer = b""
- raise StreamError(
- f"More than {MAXBUFFER} unparseable data, dropping"
- )
- msgs = []
+ return [f"More than {MAXBUFFER} unparseable data, dropping"]
+ msgs: List[Union[bytes, str]] = []
while True:
framestart = self.buffer.find(b"xx")
if framestart == -1: # No frames, return whatever we have
break
if framestart > 0: # Should not happen, report
- self.buffer = self.buffer[framestart:]
- raise StreamError(
+ msgs.append(
f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
)
+ self.buffer = self.buffer[framestart:]
# At this point, buffer starts with a packet
if len(self.buffer) < 6: # no len and proto - cannot proceed
break
packet = self.buffer[2:frameend]
self.buffer = self.buffer[frameend + 2 :]
if len(packet) < 2: # frameend comes too early
- raise StreamError(f"Packet too short: {packet.hex()}")
- msgs.append(packet)
+ msgs.append(f"Packet too short: {packet.hex()}")
+ else:
+ msgs.append(packet)
return msgs
def close(self) -> bytes:
return CLASSES[proto]
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+ return (
+ PROTO_PREFIX
+ + ":"
+ + (
+ obj.__class__.__name__
+ if isinstance(obj, GPS303Pkt)
+ else obj.__name__
+ )
+ ).ljust(16, "\0")[:16]
+
+
def proto_by_name(name: str) -> int:
return PROTOS.get(name, -1)
-def proto_of_message(packet: bytes) -> int:
- return packet[1]
+def proto_of_message(packet: bytes) -> str:
+ return proto_name(CLASSES.get(packet[1], UNKNOWN))
+
+
+def imei_from_packet(packet: bytes) -> Optional[str]:
+ if packet[1] == LOGIN.PROTO:
+ msg = parse_message(packet)
+ if isinstance(msg, LOGIN):
+ return msg.imei
+ return None
+
+
+def is_goodbye_packet(packet: bytes) -> bool:
+ return packet[1] == HIBERNATION.PROTO
def inline_response(packet: bytes) -> Optional[bytes]:
- proto = proto_of_message(packet)
+ proto = packet[1]
if proto in CLASSES:
cls = CLASSES[proto]
if cls.RESPOND is Respond.INL:
return None
+def probe_buffer(buffer: bytes) -> bool:
+ framestart = buffer.find(b"xx")
+ if framestart < 0:
+ return False
+ if len(buffer) - framestart < 6:
+ return False
+ return True
+
+
def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
"""From a packet (without framing bytes) derive the XXX.In object"""
length, proto = unpack("BB", packet[:2])