]> www.average.org Git - loctrkd.git/commitdiff
move stream parser/deframer to the protocol module
authorEugene Crosser <crosser@average.org>
Tue, 28 Jun 2022 20:27:31 +0000 (22:27 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 20:29:10 +0000 (22:29 +0200)
gps303/collector.py
gps303/gps303proto.py

index ce41cb57b1c91cdf7b3d1f656a1fe98057a672b7..5215d7b42289762b8b8bc79b2da0de0ec2203e09 100644 (file)
@@ -18,6 +18,8 @@ import zmq
 
 from . import common
 from .gps303proto import (
 
 from . import common
 from .gps303proto import (
+    GPS303Conn,
+    StreamError,
     HIBERNATION,
     LOGIN,
     inline_response,
     HIBERNATION,
     LOGIN,
     inline_response,
@@ -37,13 +39,15 @@ class Client:
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
-        self.buffer = b""
+        self.stream = GPS303Conn()
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
-        self.buffer = b""
+        rest = self.stream.close()
+        if rest:
+            log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
     def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
         """Read from the socket and parse complete messages"""
 
     def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
         """Read from the socket and parse complete messages"""
@@ -65,55 +69,20 @@ class Client:
             )
             return None
         when = time()
             )
             return None
         when = time()
-        self.buffer += segment
-        if len(self.buffer) > MAXBUFFER:
-            # We are receiving junk. Let's drop it or we run out of memory.
-            log.warning("More than %d unparseable data, dropping", MAXBUFFER)
-            self.buffer = b""
-        msgs = []
         while True:
         while True:
-            framestart = self.buffer.find(b"xx")
-            if framestart == -1:  # No frames, return whatever we have
-                break
-            if framestart > 0:  # Should not happen, report
+            try:
+                return [
+                    (when, self.addr, packet)
+                    for packet in self.stream.recv(segment)
+                ]
+            except StreamError as e:
                 log.warning(
                 log.warning(
-                    'Undecodable data (%d) "%s" from fd %d (IMEI %s)',
-                    framestart,
-                    self.buffer[:framestart][:64].hex(),
-                    self.sock.fileno(),
-                    self.imei,
+                    "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
                 )
                 )
-                self.buffer = self.buffer[framestart:]
-            # At this point, buffer starts with a packet
-            if len(self.buffer) < 6:  # no len and proto - cannot proceed
-                break
-            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
-            frameend = 0
-            # Length field can legitimeely be much less than the
-            # length of the packet (e.g. WiFi positioning), but
-            # it _should not_ be greater. Still sometimes it is.
-            # Luckily, not by too much: by maybe two or three bytes?
-            # Do this embarrassing hack to avoid accidental match
-            # of some binary data in the packet against '\r\n'.
-            while True:
-                frameend = self.buffer.find(b"\r\n", frameend + 1)
-                if frameend == -1 or frameend >= (
-                    exp_end - 3
-                ):  # Found realistic match or none
-                    break
-            if frameend == -1:  # Incomplete frame, return what we have
-                break
-            packet = self.buffer[2:frameend]
-            self.buffer = self.buffer[frameend + 2 :]
-            if len(packet) < 2:  # frameend comes too early
-                log.warning("Packet too short: %s", packet)
-                break
-            msgs.append((when, self.addr, packet))
-        return msgs
 
     def send(self, buffer: bytes) -> None:
         try:
 
     def send(self, buffer: bytes) -> None:
         try:
-            self.sock.send(b"xx" + buffer + b"\r\n")
+            self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
             log.error(
                 "Sending to fd %d (IMEI %s): %s",
         except OSError as e:
             log.error(
                 "Sending to fd %d (IMEI %s): %s",
@@ -122,6 +91,9 @@ class Client:
                 e,
             )
 
                 e,
             )
 
+    def set_imei(self, imei: str) -> None:
+        self.imei = imei
+
 
 class Clients:
     def __init__(self) -> None:
 
 class Clients:
     def __init__(self) -> None:
index 0a222d05634930679995e6e4807d939d35104346..e6a80fe5118f4df0c8370539290a13c44e6de821 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from time import time
 from typing import (
     Any,
     Callable,
 from typing import (
     Any,
     Callable,
@@ -31,6 +32,8 @@ from typing import (
 )
 
 __all__ = (
 )
 
 __all__ = (
+    "GPS303Conn",
+    "StreamError",
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "class_by_prefix",
     "inline_response",
     "parse_message",
@@ -77,6 +80,76 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
     "UNKNOWN_B3",
 )
 
+### Deframer ###
+
+MAXBUFFER: int = 4096
+
+
+class StreamError(Exception):
+    pass
+
+
+class GPS303Conn:
+    def __init__(self) -> None:
+        self.buffer = b""
+
+    @staticmethod
+    def enframe(buffer: bytes) -> bytes:
+        return b"xx" + buffer + b"\r\n"
+
+    def recv(self, segment: bytes) -> List[bytes]:
+        when = time()
+        self.buffer += segment
+        if len(self.buffer) > MAXBUFFER:
+            # We are receiving junk. Let's drop it or we run out of memory.
+            self.buffer = b""
+            raise StreamError(
+                f"More than {MAXBUFFER} unparseable data, dropping"
+            )
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                self.buffer = self.buffer[framestart:]
+                raise StreamError(
+                    f'Undecodable data ({framestart}) "{self.buffer[:framestart][:64].hex()}"'
+                )
+            # At this point, buffer starts with a packet
+            if len(self.buffer) < 6:  # no len and proto - cannot proceed
+                break
+            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
+            frameend = 0
+            # Length field can legitimeely be much less than the
+            # length of the packet (e.g. WiFi positioning), but
+            # it _should not_ be greater. Still sometimes it is.
+            # Luckily, not by too much: by maybe two or three bytes?
+            # Do this embarrassing hack to avoid accidental match
+            # of some binary data in the packet against '\r\n'.
+            while True:
+                frameend = self.buffer.find(b"\r\n", frameend + 1)
+                if frameend == -1 or frameend >= (
+                    exp_end - 3
+                ):  # Found realistic match or none
+                    break
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if len(packet) < 2:  # frameend comes too early
+                raise StreamError(f"Packet too short: {packet.hex()}")
+            msgs.append(packet)
+        return msgs
+
+    def close(self) -> bytes:
+        ret = self.buffer
+        self.buffer = b""
+        return ret
+
+
+### Parser/Constructor ###
+
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None:
 
 class DecodeError(Exception):
     def __init__(self, e: Exception, **kwargs: Any) -> None: