]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/storage.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / storage.py
index 128a4573cbacfe550d30b5c6f757a9de56b0feeb..e1be227d8ef8b11d1b0b05586f4cba2dab19f2a1 100644 (file)
@@ -2,12 +2,13 @@
 
 from configparser import ConfigParser
 from datetime import datetime, timezone
+from json import loads
 from logging import getLogger
 import zmq
 
 from . import common
-from .evstore import initdb, stow
-from .zmsg import Bcast
+from .evstore import initdb, stow, stowloc, stowpmod
+from .zmsg import Bcast, Rept
 
 log = getLogger("loctrkd/storage")
 
@@ -18,31 +19,64 @@ def runserver(conf: ConfigParser) -> None:
     initdb(dbname)
     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
     zctx = zmq.Context()  # type: ignore
-    zsub = zctx.socket(zmq.SUB)  # type: ignore
-    zsub.connect(conf.get("collector", "publishurl"))
-    zsub.setsockopt(zmq.SUBSCRIBE, b"")
+    zraw = zctx.socket(zmq.SUB)  # type: ignore
+    zraw.connect(conf.get("collector", "publishurl"))
+    if conf.getboolean("storage", "events", fallback=False):
+        zraw.setsockopt(zmq.SUBSCRIBE, b"")
+    zrep = zctx.socket(zmq.SUB)  # type: ignore
+    zrep.connect(conf.get("rectifier", "publishurl"))
+    zrep.setsockopt(zmq.SUBSCRIBE, b"")
+    poller = zmq.Poller()  # type: ignore
+    poller.register(zraw, flags=zmq.POLLIN)
+    poller.register(zrep, flags=zmq.POLLIN)
 
     try:
         while True:
-            zmsg = Bcast(zsub.recv())
-            log.debug(
-                "%s IMEI %s from %s at %s: %s",
-                "I" if zmsg.is_incoming else "O",
-                zmsg.imei,
-                zmsg.peeraddr,
-                datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
-                zmsg.packet.hex(),
-            )
-            stow(
-                is_incoming=zmsg.is_incoming,
-                peeraddr=str(zmsg.peeraddr),
-                when=zmsg.when,
-                imei=zmsg.imei,
-                proto=zmsg.proto,
-                packet=zmsg.packet,
-            )
+            events = poller.poll(1000)
+            for sk, fl in events:
+                if sk is zraw:
+                    while True:
+                        try:
+                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        log.debug(
+                            "%s IMEI %s from %s at %s: %s",
+                            "I" if zmsg.is_incoming else "O",
+                            zmsg.imei,
+                            zmsg.peeraddr,
+                            datetime.fromtimestamp(zmsg.when).astimezone(
+                                tz=timezone.utc
+                            ),
+                            zmsg.packet.hex(),
+                        )
+                        stow(
+                            is_incoming=zmsg.is_incoming,
+                            peeraddr=str(zmsg.peeraddr),
+                            when=zmsg.when,
+                            imei=zmsg.imei,
+                            proto=zmsg.proto,
+                            packet=zmsg.packet,
+                        )
+                elif sk is zrep:
+                    while True:
+                        try:
+                            rept = Rept(zrep.recv(zmq.NOBLOCK))
+                        except zmq.Again:
+                            break
+                        if rept.imei is not None and rept.pmod is not None:
+                            stowpmod(rept.imei, rept.pmod)
+                        data = loads(rept.payload)
+                        log.debug("R IMEI %s %s", rept.imei, data)
+                        if data.pop("type") == "location":
+                            data["imei"] = rept.imei
+                            stowloc(**data)
+
+                else:
+                    log.error("Event %s on unknown socket %s", fl, sk)
     except KeyboardInterrupt:
-        zsub.close()
+        zrep.close()
+        zraw.close()
         zctx.destroy()  # type: ignore