]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
Initial multiprotocol support
[loctrkd.git] / gps303 / collector.py
index df1a474f057e0fb68c9af477fc43c714b083de01..8adaeb36fa4671b7f60f7409ecec45e320806215 100644 (file)
@@ -1,6 +1,7 @@
 """ TCP server that communicates with terminals """
 
 from configparser import ConfigParser
+from importlib import import_module
 from logging import getLogger
 from os import umask
 from socket import (
@@ -13,18 +14,10 @@ from socket import (
 )
 from struct import pack
 from time import time
-from typing import Dict, List, Optional, Tuple
+from typing import Any, cast, Dict, List, Optional, Tuple, Union
 import zmq
 
 from . import common
-from .gps303proto import (
-    GPS303Conn,
-    is_goodbye_packet,
-    imei_from_packet,
-    inline_response,
-    parse_message,
-    proto_of_message,
-)
 from .zmsg import Bcast, Resp
 
 log = getLogger("gps303/collector")
@@ -32,19 +25,67 @@ log = getLogger("gps303/collector")
 MAXBUFFER: int = 4096
 
 
+class ProtoModule:
+    class Stream:
+        @staticmethod
+        def enframe(buffer: bytes) -> bytes:
+            ...
+
+        def recv(self, segment: bytes) -> List[Union[bytes, str]]:
+            ...
+
+        def close(self) -> bytes:
+            ...
+
+    @staticmethod
+    def probe_buffer(buffer: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
+        ...
+
+    @staticmethod
+    def inline_response(packet: bytes) -> Optional[bytes]:
+        ...
+
+    @staticmethod
+    def is_goodbye_packet(packet: bytes) -> bool:
+        ...
+
+    @staticmethod
+    def imei_from_packet(packet: bytes) -> Optional[str]:
+        ...
+
+    @staticmethod
+    def proto_of_message(packet: bytes) -> int:
+        ...
+
+    @staticmethod
+    def proto_by_name(name: str) -> int:
+        ...
+
+
+pmods: List[ProtoModule] = []
+
+
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
 
     def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
         self.sock = sock
         self.addr = addr
-        self.stream = GPS303Conn()
+        self.pmod: Optional[ProtoModule] = None
+        self.stream: Optional[ProtoModule.Stream] = None
         self.imei: Optional[str] = None
 
     def close(self) -> None:
         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
-        rest = self.stream.close()
+        if self.stream:
+            rest = self.stream.close()
+        else:
+            rest = b""
         if rest:
             log.warning("%d bytes in buffer on close: %s", len(rest), rest)
 
@@ -67,6 +108,20 @@ class Client:
                 self.imei,
             )
             return None
+        if self.stream is None:
+            for pmod in pmods:
+                if pmod.probe_buffer(segment):
+                    self.pmod = pmod
+                    self.stream = pmod.Stream()
+                    break
+        if self.stream is None:
+            log.info(
+                "unrecognizable %d bytes of data %s from fd %d",
+                len(segment),
+                segment[:32].hex(),
+                self.sock.fileno(),
+            )
+            return []
         when = time()
         msgs = []
         for elem in self.stream.recv(segment):
@@ -82,6 +137,7 @@ class Client:
         return msgs
 
     def send(self, buffer: bytes) -> None:
+        assert self.stream is not None
         try:
             self.sock.send(self.stream.enframe(buffer))
         except OSError as e:
@@ -114,15 +170,18 @@ class Clients:
 
     def recv(
         self, fd: int
-    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
+    ) -> Optional[
+        List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
+    ]:
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
         if msgs is None:
             return None
         result = []
         for when, peeraddr, packet in msgs:
+            assert clnt.pmod is not None
             if clnt.imei is None:
-                imei = imei_from_packet(packet)
+                imei = clnt.pmod.imei_from_packet(packet)
                 if imei is not None:
                     log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
                     clnt.imei = imei
@@ -140,7 +199,7 @@ class Clients:
                         peeraddr,
                         packet,
                     )
-            result.append((clnt.imei, when, peeraddr, packet))
+            result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
             log.debug(
                 "Received from %s (IMEI %s): %s",
                 peeraddr,
@@ -149,14 +208,22 @@ class Clients:
             )
         return result
 
-    def response(self, resp: Resp) -> None:
+    def response(self, resp: Resp) -> Optional[ProtoModule]:
         if resp.imei in self.by_imei:
-            self.by_imei[resp.imei].send(resp.packet)
+            clnt = self.by_imei[resp.imei]
+            clnt.send(resp.packet)
+            return clnt.pmod
         else:
             log.info("Not connected (IMEI %s)", resp.imei)
+            return None
 
 
 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
+    global pmods
+    pmods = [
+        cast(ProtoModule, import_module("." + modnm, __package__))
+        for modnm in conf.get("collector", "protocols").split(",")
+    ]
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
     zpub = zctx.socket(zmq.PUB)  # type: ignore
@@ -199,8 +266,8 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                         log.debug("Terminal gone from fd %d", sk)
                         tostop.append(sk)
                     else:
-                        for imei, when, peeraddr, packet in received:
-                            proto = proto_of_message(packet)
+                        for pmod, imei, when, peeraddr, packet in received:
+                            proto = pmod.proto_of_message(packet)
                             zpub.send(
                                 Bcast(
                                     proto=proto,
@@ -210,14 +277,17 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                                     packet=packet,
                                 ).packed
                             )
-                            if is_goodbye_packet(packet) and handle_hibernate:
+                            if (
+                                pmod.is_goodbye_packet(packet)
+                                and handle_hibernate
+                            ):
                                 log.debug(
                                     "Goodbye from fd %d (IMEI %s)",
                                     sk,
                                     imei,
                                 )
                                 tostop.append(sk)
-                            respmsg = inline_response(packet)
+                            respmsg = pmod.inline_response(packet)
                             if respmsg is not None:
                                 tosend.append(
                                     Resp(imei=imei, when=when, packet=respmsg)
@@ -226,17 +296,18 @@ def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
                     log.debug("Stray event: %s on socket %s", fl, sk)
             # poll queue consumed, make changes now
             for zmsg in tosend:
-                zpub.send(
-                    Bcast(
-                        is_incoming=False,
-                        proto=proto_of_message(zmsg.packet),
-                        when=zmsg.when,
-                        imei=zmsg.imei,
-                        packet=zmsg.packet,
-                    ).packed
-                )
                 log.debug("Sending to the client: %s", zmsg)
-                clients.response(zmsg)
+                rpmod = clients.response(zmsg)
+                if rpmod is not None:
+                    zpub.send(
+                        Bcast(
+                            is_incoming=False,
+                            proto=rpmod.proto_of_message(zmsg.packet),
+                            when=zmsg.when,
+                            imei=zmsg.imei,
+                            packet=zmsg.packet,
+                        ).packed
+                    )
             for fd in tostop:
                 poller.unregister(fd)  # type: ignore
                 clients.stop(fd)