]> www.average.org Git - loctrkd.git/commitdiff
Initial multiprotocol support
authorEugene Crosser <crosser@average.org>
Fri, 1 Jul 2022 23:33:07 +0000 (01:33 +0200)
committerEugene Crosser <crosser@average.org>
Thu, 14 Jul 2022 20:38:10 +0000 (22:38 +0200)
Protocol module is loaded dynamically

debian/gps303.conf
gps303/collector.py
gps303/gps303proto.py
test/common.py
test/test_storage.py

index 9d5ed3ca161c977f4018153d85765309cf502303..8bfff8e140f28faa3b271475585a116fb6cc39e4 100644 (file)
@@ -4,6 +4,8 @@
 port = 4303
 publishurl = ipc:///var/lib/gps303/collected
 listenurl = ipc:///var/lib/gps303/responses
 port = 4303
 publishurl = ipc:///var/lib/gps303/collected
 listenurl = ipc:///var/lib/gps303/responses
+# comma-separated list of tracker protocols to accept
+protocols = gps303proto
 
 [wsgateway]
 port = 5049
 
 [wsgateway]
 port = 5049
index df1a474f057e0fb68c9af477fc43c714b083de01..8adaeb36fa4671b7f60f7409ecec45e320806215 100644 (file)
@@ -1,6 +1,7 @@
 """ TCP server that communicates with terminals """
 
 from configparser import ConfigParser
 """ TCP server that communicates with terminals """
 
 from configparser import ConfigParser
+from importlib import import_module
 from logging import getLogger
 from os import umask
 from socket import (
 from logging import getLogger
 from os import umask
 from socket import (
@@ -13,18 +14,10 @@ from socket import (
 )
 from struct import pack
 from time import time
 )
 from struct import pack
 from time import time
-from typing import Dict, List, Optional, Tuple
+from typing import Any, cast, Dict, List, Optional, Tuple, Union
 import zmq
 
 from . import common
 import zmq
 
 from . import common
-from .gps303proto import (
-    GPS303Conn,
-    is_goodbye_packet,
-    imei_from_packet,
-    inline_response,
-    parse_message,
-    proto_of_message,
-)
 from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
 from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
@@ -32,19 +25,67 @@ log = getLogger("gps303/collector")
 MAXBUFFER: int = 4096
 
 
 MAXBUFFER: int = 4096
 
 
+class ProtoModule:
+    class Stream:
+        @staticmethod
+        def enframe(buffer: bytes) -> bytes:
+            ...
+
+        def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+            ...
+
+        def close(self) -> bytes:
+            ...
+
+    @staticmethod
+    def probe_buffer(buffer: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
+        ...
+
+    @staticmethod
+    def inline_response(packet: bytes) -> Optional[bytes]:
+        ...
+
+    @staticmethod
+    def is_goodbye_packet(packet: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def imei_from_packet(packet: bytes) -> Optional[str]:
+        ...
+
+    @staticmethod
+    def proto_of_message(packet: bytes) -> int:
+        ...
+
+    @staticmethod
+    def proto_by_name(name: str) -> int:
+        ...
+
+
+pmods: List[ProtoModule] = []
+
+
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
-        self.stream = GPS303Conn()
+        self.pmod: Optional[ProtoModule] = None
+        self.stream: Optional[ProtoModule.Stream] = None
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
-        rest = self.stream.close()
+        if self.stream:
+            rest = self.stream.close()
+        else:
+            rest = b""
         if rest:
             log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
         if rest:
             log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
@@ -67,6 +108,20 @@ class Client:
                 self.imei,
             )
             return None
                 self.imei,
             )
             return None
+        if self.stream is None:
+            for pmod in pmods:
+                if pmod.probe_buffer(segment):
+                    self.pmod = pmod
+                    self.stream = pmod.Stream()
+                    break
+        if self.stream is None:
+            log.info(
+                "unrecognizable %d bytes of data %s from fd %d",
+                len(segment),
+                segment[:32].hex(),
+                self.sock.fileno(),
+            )
+            return []
         when = time()
         msgs = []
         for elem in self.stream.recv(segment):
         when = time()
         msgs = []
         for elem in self.stream.recv(segment):
@@ -82,6 +137,7 @@ class Client:
         return msgs
 
     def send(self, buffer: bytes) -> None:
         return msgs
 
     def send(self, buffer: bytes) -> None:
+        assert self.stream is not None
         try:
             self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
         try:
             self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
@@ -114,15 +170,18 @@ class Clients:
 
     def recv(
         self, fd: int
 
     def recv(
         self, fd: int
-    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
+    ) -> Optional[
+        List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
+    ]:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
         if msgs is None:
             return None
         result = []
         for when, peeraddr, packet in msgs:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
         if msgs is None:
             return None
         result = []
         for when, peeraddr, packet in msgs:
+            assert clnt.pmod is not None
             if clnt.imei is None:
             if clnt.imei is None:
-                imei = imei_from_packet(packet)
+                imei = clnt.pmod.imei_from_packet(packet)
                 if imei is not None:
                     log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
                     clnt.imei = imei
                 if imei is not None:
                     log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
                     clnt.imei = imei
@@ -140,7 +199,7 @@ class Clients:
                         peeraddr,
                         packet,
                     )
                         peeraddr,
                         packet,
                     )
-            result.append((clnt.imei, when, peeraddr, packet))
+            result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
             log.debug(
                 "Received from %s (IMEI %s): %s",
                 peeraddr,
             log.debug(
                 "Received from %s (IMEI %s): %s",
                 peeraddr,
@@ -149,14 +208,22 @@ class Clients:
             )
         return result
 
             )
         return result
 
-    def response(self, resp: Resp) -> None:
+    def response(self, resp: Resp) -> Optional[ProtoModule]:
         if resp.imei in self.by_imei:
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.packet)
+            clnt = self.by_imei[resp.imei]
+            clnt.send(resp.packet)
+            return clnt.pmod
         else:
             log.info("Not connected (IMEI %s)", resp.imei)
         else:
             log.info("Not connected (IMEI %s)", resp.imei)
+            return None
 
 
 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
 
 
 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
+    global pmods
+    pmods = [
+        cast(ProtoModule, import_module("." + modnm, __package__))
+        for modnm in conf.get("collector", "protocols").split(",")
+    ]
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zpub = zctx.socket(zmq.PUB)  # type: ignore
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zpub = zctx.socket(zmq.PUB)  # type: ignore
@@ -199,8 +266,8 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                         log.debug("Terminal gone from fd %d", sk)
                         tostop.append(sk)
                     else:
                         log.debug("Terminal gone from fd %d", sk)
                         tostop.append(sk)
                     else:
-                        for imei, when, peeraddr, packet in received:
-                            proto = proto_of_message(packet)
+                        for pmod, imei, when, peeraddr, packet in received:
+                            proto = pmod.proto_of_message(packet)
                             zpub.send(
                                 Bcast(
                                     proto=proto,
                             zpub.send(
                                 Bcast(
                                     proto=proto,
@@ -210,14 +277,17 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                                     packet=packet,
                                 ).packed
                             )
                                     packet=packet,
                                 ).packed
                             )
-                            if is_goodbye_packet(packet) and handle_hibernate:
+                            if (
+                                pmod.is_goodbye_packet(packet)
+                                and handle_hibernate
+                            ):
                                 log.debug(
                                     "Goodbye from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
                                 log.debug(
                                     "Goodbye from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(packet)
+                            respmsg = pmod.inline_response(packet)
                             if respmsg is not None:
                                 tosend.append(
                                     Resp(imei=imei, when=when, packet=respmsg)
                             if respmsg is not None:
                                 tosend.append(
                                     Resp(imei=imei, when=when, packet=respmsg)
@@ -226,17 +296,18 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                     log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for zmsg in tosend:
                     log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for zmsg in tosend:
-                zpub.send(
-                    Bcast(
-                        is_incoming=False,
-                        proto=proto_of_message(zmsg.packet),
-                        when=zmsg.when,
-                        imei=zmsg.imei,
-                        packet=zmsg.packet,
-                    ).packed
-                )
                 log.debug("Sending to the client: %s", zmsg)
                 log.debug("Sending to the client: %s", zmsg)
-                clients.response(zmsg)
+                rpmod = clients.response(zmsg)
+                if rpmod is not None:
+                    zpub.send(
+                        Bcast(
+                            is_incoming=False,
+                            proto=rpmod.proto_of_message(zmsg.packet),
+                            when=zmsg.when,
+                            imei=zmsg.imei,
+                            packet=zmsg.packet,
+                        ).packed
+                    )
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)
index e597b2bbc564da0e24e37f39aa69b5e725b68e2b..e3776c3a529276fd333d619ea1946a8264f8337b 100755 (executable)
@@ -32,10 +32,11 @@ from typing import (
 )
 
 __all__ = (
 )
 
 __all__ = (
-    "GPS303Conn",
+    "Stream",
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "class_by_prefix",
     "inline_response",
     "parse_message",
+    "probe_buffer",
     "proto_by_name",
     "DecodeError",
     "Respond",
     "proto_by_name",
     "DecodeError",
     "Respond",
@@ -84,7 +85,7 @@ __all__ = (
 MAXBUFFER: int = 4096
 
 
 MAXBUFFER: int = 4096
 
 
-class GPS303Conn:
+class Stream:
     def __init__(self) -> None:
         self.buffer = b""
 
     def __init__(self) -> None:
         self.buffer = b""
 
@@ -900,6 +901,15 @@ def inline_response(packet: bytes) -> Optional[bytes]:
     return None
 
 
     return None
 
 
+def probe_buffer(buffer: bytes) -> bool:
+    framestart = buffer.find(b"xx")
+    if framestart < 0:
+        return False
+    if len(buffer) - framestart < 6:
+        return False
+    return True
+
+
 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
 def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
index 1672f46029c663eb79436d0add7bbc5ef9259227..ba4c12ade61f3bd19e42cbbdbd2646978377b363 100644 (file)
@@ -4,6 +4,7 @@ from configparser import ConfigParser, SectionProxy
 from contextlib import closing, ExitStack
 from http.server import HTTPServer, SimpleHTTPRequestHandler
 from importlib import import_module
 from contextlib import closing, ExitStack
 from http.server import HTTPServer, SimpleHTTPRequestHandler
 from importlib import import_module
+from logging import DEBUG, StreamHandler
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
 from multiprocessing import Process
 from os import kill, unlink
 from signal import SIGINT
@@ -16,7 +17,7 @@ from socket import (
     socket,
     SocketType,
 )
     socket,
     SocketType,
 )
-from sys import exit
+from sys import exit, stderr
 from tempfile import mkstemp
 from time import sleep
 from typing import Optional
 from tempfile import mkstemp
 from time import sleep
 from typing import Optional
@@ -26,7 +27,9 @@ NUMPORTS = 3
 
 
 class TestWithServers(TestCase):
 
 
 class TestWithServers(TestCase):
-    def setUp(self, *args: str, httpd: bool = False) -> None:
+    def setUp(
+        self, *args: str, httpd: bool = False, verbose: bool = False
+    ) -> None:
         freeports = []
         with ExitStack() as stack:
             for _ in range(NUMPORTS):
         freeports = []
         with ExitStack() as stack:
             for _ in range(NUMPORTS):
@@ -40,6 +43,7 @@ class TestWithServers(TestCase):
             "port": str(freeports[0]),
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
             "port": str(freeports[0]),
             "publishurl": "ipc://" + self.tmpfilebase + ".pub",
             "listenurl": "ipc://" + self.tmpfilebase + ".pul",
+            "protocols": "gps303proto",
         }
         self.conf["storage"] = {
             "dbfn": self.tmpfilebase + ".storage.sqlite",
         }
         self.conf["storage"] = {
             "dbfn": self.tmpfilebase + ".storage.sqlite",
@@ -61,6 +65,9 @@ class TestWithServers(TestCase):
             else:
                 kwargs = {}
             cls = import_module("gps303." + srvname, package=".")
             else:
                 kwargs = {}
             cls = import_module("gps303." + srvname, package=".")
+            if verbose:
+                cls.log.addHandler(StreamHandler(stderr))
+                cls.log.setLevel(DEBUG)
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
             p = Process(target=cls.runserver, args=(self.conf,), kwargs=kwargs)
             p.start()
             self.children.append((srvname, p))
index 449aa514c8686baaaed232f33aef1f420a76052a..17477379edd6b91f8dd7b6c426d4ab04e777305b 100644 (file)
@@ -13,7 +13,9 @@ from gps303.ocid_dload import SCHEMA
 
 class Storage(TestWithServers):
     def setUp(self, *args: str, **kwargs: Any) -> None:
 
 class Storage(TestWithServers):
     def setUp(self, *args: str, **kwargs: Any) -> None:
-        super().setUp("collector", "storage", "lookaside", "termconfig")
+        super().setUp(
+            "collector", "storage", "lookaside", "termconfig", verbose=True
+        )
         with connect(self.conf.get("opencellid", "dbfn")) as ldb:
             ldb.execute(SCHEMA)
             ldb.executemany(
         with connect(self.conf.get("opencellid", "dbfn")) as ldb:
             ldb.execute(SCHEMA)
             ldb.executemany(
@@ -184,8 +186,8 @@ class Storage(TestWithServers):
         super().tearDown()
 
     def test_storage(self) -> None:
         super().tearDown()
 
     def test_storage(self) -> None:
-        for buf in (
-            LOGIN.In(imei="9999123456780000", ver=9).packed,
+        for msg in (
+            LOGIN.In(imei="9999123456780000", ver=9),
             WIFI_POSITIONING.In(
                 mnc=3,
                 mcc=262,
             WIFI_POSITIONING.In(
                 mnc=3,
                 mcc=262,
@@ -198,21 +200,22 @@ class Storage(TestWithServers):
                     (24420, 36243, -78),
                     (24420, 17012, -44),
                 ],
                     (24420, 36243, -78),
                     (24420, 17012, -44),
                 ],
-            ).packed,
-            SETUP.In().packed,
-            STATUS.In(signal=87).packed,
-            HIBERNATION.In().packed,
+            ),
+            SETUP.In(),
+            STATUS.In(signal=87),
+            HIBERNATION.In(),
         ):
         ):
-            send_and_drain(self.sock, b"xx" + buf + b"\r\n")
-        self.sock.close()
+            print("Send:", msg)
+            send_and_drain(self.sock, b"xx" + msg.packed + b"\r\n")
         sleep(1)
         sleep(1)
+        self.sock.close()
         got = set()
         with connect(self.conf.get("storage", "dbfn")) as db:
             for is_incoming, packet in db.execute(
                 "select is_incoming, packet from events"
             ):
                 msg = parse_message(packet, is_incoming=is_incoming)
         got = set()
         with connect(self.conf.get("storage", "dbfn")) as db:
             for is_incoming, packet in db.execute(
                 "select is_incoming, packet from events"
             ):
                 msg = parse_message(packet, is_incoming=is_incoming)
-                print(msg)
+                print("Stored:", msg)
                 got.add(type(msg))
         self.assertEqual(
             got,
                 got.add(type(msg))
         self.assertEqual(
             got,