]> www.average.org Git - loctrkd.git/commitdiff
initial storage service
authorEugene Crosser <crosser@average.org>
Mon, 18 Apr 2022 22:09:44 +0000 (00:09 +0200)
committerEugene Crosser <crosser@average.org>
Mon, 18 Apr 2022 22:09:44 +0000 (00:09 +0200)
gps303.conf
gps303/__main__.py
gps303/collector.py
gps303/common.py [new file with mode: 0644]
gps303/config.py [deleted file]
gps303/evstore.py
gps303/gps303proto.py
gps303/storage.py [new file with mode: 0644]

index cdad9c7a3987ee24966c20c6fa911260befbd067..1a4f1813e783245cbc0ce7556b0da292a08c78a7 100644 (file)
@@ -1,10 +1,9 @@
 [collector]
 port = 4303
 publishurl = ipc:///tmp/collected
-listenurl = ipc:///responses
+listenurl = ipc:///tmp/responses
 
-[daemon]
-port = 4303
+[storage]
 dbfn = gps303.sqlite
 
 [opencellid]
index 51e462c3fcdf07b68fa5024bf7334dcdb83af522..461c4504e32ceae9e2cc2ccb576866dfc2d78049 100755 (executable)
@@ -27,12 +27,12 @@ if __name__.endswith("__main__"):
     log.setLevel(DEBUG if "-d" in opts else INFO)
     log.info("starting with options: %s", opts)
 
-    initdb(conf.get("daemon", "dbfn"))
+    initdb(conf.get("storage", "dbfn"))
     set_config(conf)
 
     ctlsock = socket(AF_INET, SOCK_STREAM)
     ctlsock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
-    ctlsock.bind(("", conf.getint("daemon", "port")))
+    ctlsock.bind(("", conf.getint("collector", "port")))
     ctlsock.listen(5)
     ctlfd = ctlsock.fileno()
     pollset = poll()
index 345d0b99a0de7048fc75f8a1a515007d2233318d..68c95bfb77d0ddbb75e8da64d25f6972d7e94847 100644 (file)
@@ -1,36 +1,45 @@
 """ TCP server that communicates with terminals """
 
 from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging import getLogger
 from logging.handlers import SysLogHandler
 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
 from time import time
-import sys
+from struct import pack
 import zmq
 
-from .config import readconfig
-from .gps303proto import parse_message, HIBERNATION, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
 
 log = getLogger("gps303/collector")
 
 
 class Bcast:
     """Zmq message to broadcast what was received from the terminal"""
+
     def __init__(self, imei, msg):
-        self.as_bytes = imei.encode() + msg.to_packet()
+        self.as_bytes = (
+            pack("B", proto_of_message(msg))
+            + ("0000000000000000" if imei is None else imei).encode()
+            + msg
+        )
 
 
 class Resp:
     """Zmq message received from a third party to send to the terminal"""
-    def __init__(self, msg):
-        self.imei = msg[:16].decode()
-        self.payload = msg[16:]
+
+    def __init__(self, *args, **kwargs):
+        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
+            self.imei = msg[:16].decode()
+            self.payload = msg[16:]
+        elif len(args) == 0:
+            self.imei = kwargs["imei"]
+            self.payload = kwargs["payload"]
 
 
 class Client:
     """Connected socket to the terminal plus buffer and metadata"""
+
     def __init__(self, sock, addr):
         self.sock = sock
         self.addr = addr
@@ -44,16 +53,23 @@ class Client:
         self.imei = None
 
     def recv(self):
-        """ Read from the socket and parse complete messages """
+        """Read from the socket and parse complete messages"""
         try:
             segment = self.sock.recv(4096)
         except OSError:
-            log.warning("Reading from fd %d (IMEI %s): %s",
-                    self.sock.fileno(), self.imei, e)
+            log.warning(
+                "Reading from fd %d (IMEI %s): %s",
+                self.sock.fileno(),
+                self.imei,
+                e,
+            )
             return None
         if not segment:  # Terminal has closed connection
-            log.info("EOF reading from fd %d (IMEI %s)",
-                    self.sock.fileno(), self.imei)
+            log.info(
+                "EOF reading from fd %d (IMEI %s)",
+                self.sock.fileno(),
+                self.imei,
+            )
             return None
         when = time()
         self.buffer += segment
@@ -63,28 +79,38 @@ class Client:
             if framestart == -1:  # No frames, return whatever we have
                 break
             if framestart > 0:  # Should not happen, report
-                log.warning("Undecodable data \"%s\" from fd %d (IMEI %s)",
-                        self.buffer[:framestart].hex(), self.sock.fileno(), self.imei)
+                log.warning(
+                    'Undecodable data "%s" from fd %d (IMEI %s)',
+                    self.buffer[:framestart].hex(),
+                    self.sock.fileno(),
+                    self.imei,
+                )
                 self.buffer = self.buffer[framestart:]
             # At this point, buffer starts with a packet
             frameend = self.buffer.find(b"\r\n", 4)
             if frameend == -1:  # Incomplete frame, return what we have
                 break
-            msg = parse_message(self.buffer[2:frameend])
-            self.buffer = self.buffer[frameend+2:]
-            if isinstance(msg, LOGIN):
-                self.imei = msg.imei
-                log.info("LOGIN from fd %d: IMEI %s",
-                        self.sock.fileno(), self.imei)
-            msgs.append(msg)
+            packet = self.buffer[2:frameend]
+            self.buffer = self.buffer[frameend + 2 :]
+            if proto_of_message(packet) == LOGIN.PROTO:
+                self.imei = parse_message(packet).imei
+                log.info(
+                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+                )
+            msgs.append(packet)
         return msgs
 
     def send(self, buffer):
         try:
             self.sock.send(b"xx" + buffer + b"\r\n")
         except OSError as e:
-            log.error("Sending to fd %d (IMEI %s): %s",
-                    self.sock.fileno, self.imei, e)
+            log.error(
+                "Sending to fd %d (IMEI %s): %s",
+                self.sock.fileno,
+                self.imei,
+                e,
+            )
+
 
 class Clients:
     def __init__(self):
@@ -107,9 +133,11 @@ class Clients:
     def recv(self, fd):
         clnt = self.by_fd[fd]
         msgs = clnt.recv()
+        if msgs is None:
+            return None
         result = []
         for msg in msgs:
-            if isinstance(msg, LOGIN):
+            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
                 self.by_imei[clnt.imei] = clnt
             result.append((clnt.imei, msg))
         return result
@@ -119,7 +147,7 @@ class Clients:
             self.by_imei[resp.imei].send(resp.payload)
 
 
-def runserver(opts, conf):
+def runserver(conf):
     zctx = zmq.Context()
     zpub = zctx.socket(zmq.PUB)
     zpub.bind(conf.get("collector", "publishurl"))
@@ -154,9 +182,16 @@ def runserver(opts, conf):
                 else:
                     for imei, msg in clients.recv(sk):
                         zpub.send(Bcast(imei, msg).as_bytes)
-                        if msg is None or isinstance(msg, HIBERNATION):
-                            log.debug("HIBERNATION from fd %d", sk)
+                        if (
+                            msg is None
+                            or proto_of_message(msg) == HIBERNATION.PROTO
+                        ):
+                            log.debug(
+                                "HIBERNATION from fd %d (IMEI %s)", sk, imei
+                            )
                             tostop.append(sk)
+                        elif proto_of_message(msg) == LOGIN.PROTO:
+                            clients.response(Resp(imei=imei, payload=LOGIN.response()))
             # poll queue consumed, make changes now
             for fd in tostop:
                 poller.unregister(fd)
@@ -171,13 +206,4 @@ def runserver(opts, conf):
 
 
 if __name__.endswith("__main__"):
-    opts, _ = getopt(sys.argv[1:], "c:d")
-    opts = dict(opts)
-    conf = readconfig(opts["-c"] if "-c" in opts else CONF)
-    if sys.stdout.isatty():
-        log.addHandler(StreamHandler(sys.stderr))
-    else:
-        log.addHandler(SysLogHandler(address="/dev/log"))
-    log.setLevel(DEBUG if "-d" in opts else INFO)
-    log.info("starting with options: %s", opts)
-    runserver(opts, conf)
+    runserver(common.init(log))
diff --git a/gps303/common.py b/gps303/common.py
new file mode 100644 (file)
index 0000000..0e7aac1
--- /dev/null
@@ -0,0 +1,50 @@
+""" Common housekeeping for all daemons """
+
+from configparser import ConfigParser
+from getopt import getopt
+from logging import getLogger, StreamHandler, DEBUG, INFO
+from sys import argv, stderr, stdout
+
+CONF = "/etc/gps303.conf"
+PORT = 4303
+DBFN = "/var/lib/gps303/gps303.sqlite"
+
+def init(log):
+    opts, _ = getopt(argv[1:], "c:d")
+    opts = dict(opts)
+    conf = readconfig(opts["-c"] if "-c" in opts else CONF)
+    if stdout.isatty():
+        log.addHandler(StreamHandler(stderr))
+    else:
+        log.addHandler(SysLogHandler(address="/dev/log"))
+    log.setLevel(DEBUG if "-d" in opts else INFO)
+    log.info("starting with options: %s", opts)
+    return conf
+
+def readconfig(fname):
+    config = ConfigParser()
+    config["collector"] = {
+        "port": PORT,
+    }
+    config["storage"] = {
+        "dbfn": DBFN,
+    }
+    config["device"] = {}
+    #_print_config(config)
+    #print("now reading", fname)
+    config.read(fname)
+    #_print_config(config)
+    return config
+
+if __name__ == "__main__":
+    from sys import argv
+
+    def _print_config(conf):
+        for section in conf.sections():
+            print("section", section)
+            for option in conf.options(section):
+                print("    ", option, conf[section][option])
+
+    conf = readconfig(argv[1])
+    _print_config(conf)
+    print("binaryswitch", int(conf.get("device", "binaryswitch"), 0))
diff --git a/gps303/config.py b/gps303/config.py
deleted file mode 100644 (file)
index fe4f696..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-from configparser import ConfigParser
-
-PORT = 4303
-DBFN = "/var/lib/gps303/gps303.sqlite"
-
-def readconfig(fname):
-    config = ConfigParser()
-    config["daemon"] = {
-        "port": PORT,
-        "dbfn": DBFN,
-    }
-    config["device"] = {}
-    #_print_config(config)
-    #print("now reading", fname)
-    config.read(fname)
-    #_print_config(config)
-    return config
-
-def _print_config(conf):
-    for section in conf.sections():
-        print("section", section)
-        for option in conf.options(section):
-            print("    ", option, conf[section][option])
-
-if __name__ == "__main__":
-    from sys import argv
-    conf = readconfig(argv[1])
-    _print_config(conf)
-    print("binaryswitch", int(conf.get("device", "binaryswitch"), 0))
index 9bc60d5594a1f7656360fff741cbe1470ed346ab..b1950210555adb532e0cf645941857ca95bc3131 100644 (file)
@@ -1,10 +1,7 @@
-from logging import getLogger
 from sqlite3 import connect
 
 __all__ = ("initdb", "stow")
 
-log = getLogger("gps303")
-
 DB = None
 
 SCHEMA = """create table if not exists events (
@@ -19,7 +16,6 @@ SCHEMA = """create table if not exists events (
 
 def initdb(dbname):
     global DB
-    log.info('Using Sqlite3 database "%s"', dbname)
     DB = connect(dbname)
     DB.execute(SCHEMA)
 
index 1280fe15ebb6106b8890f6139077e7c4879d8485..6fa76550db1031ea901ff55d71998d1aeb9fd640 100755 (executable)
@@ -87,7 +87,8 @@ class GPS303Pkt:
     def to_packet(self):
         return pack("BB", self.length, self.PROTO) + self.payload
 
-    def response(self, *args):
+    @classmethod
+    def response(cls, *args):
         if len(args) == 0:
             return None
         assert len(args) == 1 and isinstance(args[0], bytes)
@@ -95,7 +96,7 @@ class GPS303Pkt:
         length = len(payload) + 1
         if length > 6:
             length -= 6
-        return pack("BB", length, self.PROTO) + payload
+        return pack("BB", length, cls.PROTO) + payload
 
 
 class UNKNOWN(GPS303Pkt):
@@ -112,14 +113,16 @@ class LOGIN(GPS303Pkt):
         self.ver = unpack("B", payload[-1:])[0]
         return self
 
-    def response(self):
+    @classmethod
+    def response(cls):
         return super().response(b"")
 
 
 class SUPERVISION(GPS303Pkt):  # Server sends supervision number status
     PROTO = 0x05
 
-    def response(self, supnum=0):
+    @classmethod
+    def response(cls, supnum=0):
         # 1: The device automatically answers Pickup effect
         # 2: Automatically Answering Two-way Calls
         # 3: Ring manually answer the two-way call
@@ -145,9 +148,9 @@ class _GPS_POSITIONING(GPS303Pkt):
         self.gps_nb_sat = payload[6] & 0x0F
         lat, lon, speed, flags = unpack("!IIBH", payload[7:18])
         self.gps_is_valid = bool(flags & 0b0001000000000000)  # bit 3
-        flip_lon = bool(flags & 0b0000100000000000)  # bit 4
-        flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
-        self.heading = flags & 0b0000001111111111  # bits 6 - last
+        flip_lon =          bool(flags & 0b0000100000000000)  # bit 4
+        flip_lat = not      bool(flags & 0b0000010000000000)  # bit 5
+        self.heading =           flags & 0b0000001111111111   # bits 6 - last
         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
         self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
         self.speed = speed
@@ -392,7 +395,7 @@ def proto_by_name(name):
 
 
 def proto_of_message(packet):
-    return unpack("B", packet[1:2])
+    return unpack("B", packet[1:2])[0]
 
 
 def make_object(length, proto, payload):
diff --git a/gps303/storage.py b/gps303/storage.py
new file mode 100644 (file)
index 0000000..9da1d6d
--- /dev/null
@@ -0,0 +1,38 @@
+""" Store zmq broadcasts to sqlite """
+
+from getopt import getopt
+from logging import getLogger
+from logging.handlers import SysLogHandler
+import sys
+from time import time
+import zmq
+
+from . import common
+from .evstore import initdb, stow
+from .gps303proto import parse_message
+
+log = getLogger("gps303/storage")
+
+def runserver(conf):
+    dbname = conf.get("storage", "dbfn")
+    log.info('Using Sqlite3 database "%s"', dbname)
+    initdb(dbname)
+    zctx = zmq.Context()
+    zsub = zctx.socket(zmq.SUB)
+    zsub.connect(conf.get("collector", "publishurl"))
+    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+
+    try:
+        while True:
+            zmsg = zsub.recv()
+            imei = zmsg[1:17].decode()
+            packet = zmsg[17:]
+            msg = parse_message(packet)
+            log.debug("From IMEI %s: %s", imei, msg)
+            stow("", time(), imei, msg.length, msg.PROTO, msg.payload)
+    except KeyboardInterrupt:
+        pass
+
+
+if __name__.endswith("__main__"):
+    runserver(common.init(log))