]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/storage.py
Update changelog for 2.00 release
[loctrkd.git] / loctrkd / storage.py
index 128a4573cbacfe550d30b5c6f757a9de56b0feeb..843991502772c69693e77d34c5eba6737ebb2681 100644 (file)
@@ -2,47 +2,83 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from json import loads
 from logging import getLogger
 import zmq
 
 from . import common
-from .evstore import initdb, stow
-from .zmsg import Bcast
+from .evstore import initdb, stow, stowloc, stowpmod
+from .zmsg import Bcast, Rept
 
 log = getLogger("loctrkd/storage")
 
 
 def runserver(conf: ConfigParser) -> None:
+    stowevents = conf.getboolean("storage", "events", fallback=False)
     dbname = conf.get("storage", "dbfn")
     log.info('Using Sqlite3 database "%s"', dbname)
     initdb(dbname)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
-    zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+    zraw = zctx.socket(zmq.SUB)  # type: ignore
+    zraw.connect(conf.get("collector", "publishurl"))
+    zraw.setsockopt(zmq.SUBSCRIBE, b"")
+    zrep = zctx.socket(zmq.SUB)  # type: ignore
+    zrep.connect(conf.get("rectifier", "publishurl"))
+    zrep.setsockopt(zmq.SUBSCRIBE, b"")
+    poller = zmq.Poller()  # type: ignore
+    poller.register(zraw, flags=zmq.POLLIN)
+    poller.register(zrep, flags=zmq.POLLIN)
 
     try:
         while True:
-            zmsg = Bcast(zsub.recv())
-            log.debug(
-                "%s IMEI %s from %s at %s: %s",
-                "I" if zmsg.is_incoming else "O",
-                zmsg.imei,
-                zmsg.peeraddr,
-                datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
-                zmsg.packet.hex(),
-            )
-            stow(
-                is_incoming=zmsg.is_incoming,
-                peeraddr=str(zmsg.peeraddr),
-                when=zmsg.when,
-                imei=zmsg.imei,
-                proto=zmsg.proto,
-                packet=zmsg.packet,
-            )
+            events = poller.poll(1000)
+            for sk, fl in events:
+                if sk is zraw:
+                    while True:
+                        try:
+                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        log.debug(
+                            "%s IMEI %s from %s at %s %s: %s",
+                            "I" if zmsg.is_incoming else "O",
+                            zmsg.imei,
+                            zmsg.peeraddr,
+                            zmsg.pmod,
+                            datetime.fromtimestamp(zmsg.when).astimezone(
+                                tz=timezone.utc
+                            ),
+                            zmsg.packet.hex(),
+                        )
+                        if zmsg.imei is not None and zmsg.pmod is not None:
+                            stowpmod(zmsg.imei, zmsg.pmod)
+                        if stowevents:
+                            stow(
+                                is_incoming=zmsg.is_incoming,
+                                peeraddr=str(zmsg.peeraddr),
+                                when=zmsg.when,
+                                imei=zmsg.imei,
+                                proto=zmsg.proto,
+                                packet=zmsg.packet,
+                            )
+                elif sk is zrep:
+                    while True:
+                        try:
+                            rept = Rept(zrep.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        data = loads(rept.payload)
+                        log.debug("R IMEI %s %s", rept.imei, data)
+                        if data.pop("type") == "location":
+                            data["imei"] = rept.imei
+                            stowloc(**data)
+
+                else:
+                    log.error("Event %s on unknown socket %s", fl, sk)
     except KeyboardInterrupt:
-        zsub.close()
+        zrep.close()
+        zraw.close()
         zctx.destroy()  # type: ignore