]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
initial storage service
[loctrkd.git] / gps303 / collector.py
index 28c636d33642764118067bcadf0355377999b2c2..68c95bfb77d0ddbb75e8da64d25f6972d7e94847 100644 (file)
@@ -1,36 +1,45 @@
 """ TCP server that communicates with terminals """
 
 from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging import getLogger
 from logging.handlers import SysLogHandler
 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
-import sys
+from struct import pack
 import zmq
 
-from .config import readconfig
-from .gps303proto import handle_packet, make_response, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
 
 log = getLogger("gps303/collector")
 
 
 class Bcast:
     """Zmq message to broadcast what was received from the terminal"""
+
     def __init__(self, imei, msg):
-        self.as_bytes = imei.encode() + msg.encode()
+        self.as_bytes = (
+            pack("B", proto_of_message(msg))
+            + ("0000000000000000" if imei is None else imei).encode()
+            + msg
+        )
 
 
 class Resp:
     """Zmq message received from a third party to send to the terminal"""
-    def __init__(self, msg):
-        self.imei = msg[:16].decode()
-        self.payload = msg[16:]
+
+    def __init__(self, *args, **kwargs):
+        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
+            self.imei = msg[:16].decode()
+            self.payload = msg[16:]
+        elif len(args) == 0:
+            self.imei = kwargs["imei"]
+            self.payload = kwargs["payload"]
 
 
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
+
     def __init__(self, sock, addr):
         self.sock = sock
         self.addr = addr
@@ -38,25 +47,69 @@ class Client:
         self.imei = None
 
     def close(self):
+        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
         self.sock.close()
         self.buffer = b""
         self.imei = None
 
     def recv(self):
-        segment = self.sock.recv(4096)
-        if not segment:
+        """Read from the socket and parse complete messages"""
+        try:
+            segment = self.sock.recv(4096)
+        except OSError:
+            log.warning(
+                "Reading from fd %d (IMEI %s): %s",
+                self.sock.fileno(),
+                self.imei,
+                e,
+            )
+            return None
+        if not segment:  # Terminal has closed connection
+            log.info(
+                "EOF reading from fd %d (IMEI %s)",
+                self.sock.fileno(),
+                self.imei,
+            )
             return None
         when = time()
         self.buffer += segment
-        # implement framing properly
-        msg = handle_packet(packet, self.addr, when)
-        self.buffer = self.buffer[len(packet):]
-        if isinstance(msg, LOGIN):
-            self.imei = msg.imei
-        return msg
+        msgs = []
+        while True:
+            framestart = self.buffer.find(b"xx")
+            if framestart == -1:  # No frames, return whatever we have
+                break
+            if framestart > 0:  # Should not happen, report
+                log.warning(
+                    'Undecodable data "%s" from fd %d (IMEI %s)',
+                    self.buffer[:framestart].hex(),
+                    self.sock.fileno(),
+                    self.imei,
+                )
+                self.buffer = self.buffer[framestart:]
+            # At this point, buffer starts with a packet
+            frameend = self.buffer.find(b"\r\n", 4)
+            if frameend == -1:  # Incomplete frame, return what we have
+                break
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if proto_of_message(packet) == LOGIN.PROTO:
+                self.imei = parse_message(packet).imei
+                log.info(
+                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+                )
+            msgs.append(packet)
+        return msgs
 
     def send(self, buffer):
-        self.sock.send(buffer)
+        try:
+            self.sock.send(b"xx" + buffer + b"\r\n")
+        except OSError as e:
+            log.error(
+                "Sending to fd %d (IMEI %s): %s",
+                self.sock.fileno,
+                self.imei,
+                e,
+            )
 
 
 class Clients:
@@ -70,25 +123,31 @@ class Clients:
         return fd
 
     def stop(self, fd):
-        clnt = by_fd[fd]
+        clnt = self.by_fd[fd]
+        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
         clnt.close()
         if clnt.imei:
             del self.by_imei[clnt.imei]
         del self.by_fd[fd]
 
     def recv(self, fd):
-        clnt = by_fd[fd]
-        msg = clnt.recv()
-        if isinstance(msg, LOGIN):
-            self.by_imei[clnt.imei] = clnt
-        return clnt.imei, msg
+        clnt = self.by_fd[fd]
+        msgs = clnt.recv()
+        if msgs is None:
+            return None
+        result = []
+        for msg in msgs:
+            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
+                self.by_imei[clnt.imei] = clnt
+            result.append((clnt.imei, msg))
+        return result
 
-    def response(self, zmsg):
-        if zmsg.imei in self.by_imei:
-            clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
+    def response(self, resp):
+        if resp.imei in self.by_imei:
+            self.by_imei[resp.imei].send(resp.payload)
 
 
-def runserver(opts, conf):
+def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
     zpub.bind(conf.get("collector", "publishurl"))
@@ -118,34 +177,33 @@ def runserver(opts, conf):
                         except zmq.Again:
                             break
                 elif sk == tcpfd:
-                    clntsock, clntaddr = ctlsock.accept()
+                    clntsock, clntaddr = tcpl.accept()
                     topoll.append((clntsock, clntaddr))
                 else:
-                    imei, msg = clients.recv(sk)
-                    zpub.send(Bcast(imei, msg).as_bytes)
-                    if msg is None or isinstance(msg, HIBERNATION):
-                        tostop.append(sk)
+                    for imei, msg in clients.recv(sk):
+                        zpub.send(Bcast(imei, msg).as_bytes)
+                        if (
+                            msg is None
+                            or proto_of_message(msg) == HIBERNATION.PROTO
+                        ):
+                            log.debug(
+                                "HIBERNATION from fd %d (IMEI %s)", sk, imei
+                            )
+                            tostop.append(sk)
+                        elif proto_of_message(msg) == LOGIN.PROTO:
+                            clients.response(Resp(imei=imei, payload=LOGIN.response()))
             # poll queue consumed, make changes now
             for fd in tostop:
+                poller.unregister(fd)
                 clients.stop(fd)
-                pollset.unregister(fd)
             for zmsg in tosend:
                 clients.response(zmsg)
             for clntsock, clntaddr in topoll:
                 fd = clients.add(clntsock, clntaddr)
-                pollset.register(fd)
+                poller.register(fd)
     except KeyboardInterrupt:
         pass
 
 
 if __name__.endswith("__main__"):
-    opts, _ = getopt(sys.argv[1:], "c:d")
-    opts = dict(opts)
-    conf = readconfig(opts["-c"] if "-c" in opts else CONF)
-    if sys.stdout.isatty():
-        log.addHandler(StreamHandler(sys.stderr))
-    else:
-        log.addHandler(SysLogHandler(address="/dev/log"))
-    log.setLevel(DEBUG if "-d" in opts else INFO)
-    log.info("starting with options: %s", opts)
-    runserver(opts, conf)
+    runserver(common.init(log))