]> www.average.org Git - loctrkd.git/blobdiff - gps303/collector.py
initial storage service
[loctrkd.git] / gps303 / collector.py
index 345d0b99a0de7048fc75f8a1a515007d2233318d..68c95bfb77d0ddbb75e8da64d25f6972d7e94847 100644 (file)
@@ -1,36 +1,45 @@
 """ TCP server that communicates with terminals """
 
 from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging import getLogger
 from logging.handlers import SysLogHandler
 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
-import sys
+from struct import pack
 import zmq
 
-from .config import readconfig
-from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
 
 log = getLogger("gps303/collector")
 
 
 class Bcast:
     """Zmq message to broadcast what was received from the terminal"""
+
     def __init__(self, imei, msg):
-        self.as_bytes = imei.encode() + msg.to_packet()
+        self.as_bytes = (
+            pack("B", proto_of_message(msg))
+            + ("0000000000000000" if imei is None else imei).encode()
+            + msg
+        )
 
 
 class Resp:
     """Zmq message received from a third party to send to the terminal"""
-    def __init__(self, msg):
-        self.imei = msg[:16].decode()
-        self.payload = msg[16:]
+
+    def __init__(self, *args, **kwargs):
+        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
+            self.imei = msg[:16].decode()
+            self.payload = msg[16:]
+        elif len(args) == 0:
+            self.imei = kwargs["imei"]
+            self.payload = kwargs["payload"]
 
 
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
+
     def __init__(self, sock, addr):
         self.sock = sock
         self.addr = addr
@@ -44,16 +53,23 @@ class Client:
         self.imei = None
 
     def recv(self):
-        """ Read from the socket and parse complete messages """
+        """Read from the socket and parse complete messages"""
         try:
             segment = self.sock.recv(4096)
         except OSError:
-            log.warning("Reading from fd %d (IMEI %s): %s",
-                    self.sock.fileno(), self.imei, e)
+            log.warning(
+                "Reading from fd %d (IMEI %s): %s",
+                self.sock.fileno(),
+                self.imei,
+                e,
+            )
             return None
         if not segment:  # Terminal has closed connection
-            log.info("EOF reading from fd %d (IMEI %s)",
-                    self.sock.fileno(), self.imei)
+            log.info(
+                "EOF reading from fd %d (IMEI %s)",
+                self.sock.fileno(),
+                self.imei,
+            )
             return None
         when = time()
         self.buffer += segment
@@ -63,28 +79,38 @@ class Client:
             if framestart == -1:  # No frames, return whatever we have
                 break
             if framestart > 0:  # Should not happen, report
-                log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
-                        self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+                log.warning(
+                    'Undecodable data "%s" from fd %d (IMEI %s)',
+                    self.buffer[:framestart].hex(),
+                    self.sock.fileno(),
+                    self.imei,
+                )
                 self.buffer = self.buffer[framestart:]
             # At this point, buffer starts with a packet
             frameend = self.buffer.find(b"\r\n", 4)
             if frameend == -1:  # Incomplete frame, return what we have
                 break
-            msg = parse_message(self.buffer[2:frameend])
-            self.buffer = self.buffer[frameend+2:]
-            if isinstance(msg, LOGIN):
-                self.imei = msg.imei
-                log.info("LOGIN from fd %d: IMEI %s",
-                        self.sock.fileno(), self.imei)
-            msgs.append(msg)
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if proto_of_message(packet) == LOGIN.PROTO:
+                self.imei = parse_message(packet).imei
+                log.info(
+                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+                )
+            msgs.append(packet)
         return msgs
 
     def send(self, buffer):
         try:
             self.sock.send(b"xx" + buffer + b"\r\n")
         except OSError as e:
-            log.error("Sending to fd %d (IMEI %s): %s",
-                    self.sock.fileno, self.imei, e)
+            log.error(
+                "Sending to fd %d (IMEI %s): %s",
+                self.sock.fileno,
+                self.imei,
+                e,
+            )
+
 
 class Clients:
     def __init__(self):
@@ -107,9 +133,11 @@ class Clients:
     def recv(self, fd):
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
+        if msgs is None:
+            return None
         result = []
         for msg in msgs:
-            if isinstance(msg, LOGIN):
+            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
                 self.by_imei[clnt.imei] = clnt
             result.append((clnt.imei, msg))
         return result
@@ -119,7 +147,7 @@ class Clients:
             self.by_imei[resp.imei].send(resp.payload)
 
 
-def runserver(opts, conf):
+def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
     zpub.bind(conf.get("collector", "publishurl"))
@@ -154,9 +182,16 @@ def runserver(opts, conf):
                 else:
                     for imei, msg in clients.recv(sk):
                         zpub.send(Bcast(imei, msg).as_bytes)
-                        if msg is None or isinstance(msg, HIBERNATION):
-                            log.debug("HIBERNATION from fd %d", sk)
+                        if (
+                            msg is None
+                            or proto_of_message(msg) == HIBERNATION.PROTO
+                        ):
+                            log.debug(
+                                "HIBERNATION from fd %d (IMEI %s)", sk, imei
+                            )
                             tostop.append(sk)
+                        elif proto_of_message(msg) == LOGIN.PROTO:
+                            clients.response(Resp(imei=imei, payload=LOGIN.response()))
             # poll queue consumed, make changes now
             for fd in tostop:
                 poller.unregister(fd)
@@ -171,13 +206,4 @@ def runserver(opts, conf):
 
 
 if __name__.endswith("__main__"):
-    opts, _ = getopt(sys.argv[1:], "c:d")
-    opts = dict(opts)
-    conf = readconfig(opts["-c"] if "-c" in opts else CONF)
-    if sys.stdout.isatty():
-        log.addHandler(StreamHandler(sys.stderr))
-    else:
-        log.addHandler(SysLogHandler(address="/dev/log"))
-    log.setLevel(DEBUG if "-d" in opts else INFO)
-    log.info("starting with options: %s", opts)
-    runserver(opts, conf)
+    runserver(common.init(log))