X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=loctrkd%2Fstorage.py;h=e1be227d8ef8b11d1b0b05586f4cba2dab19f2a1;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hp=128a4573cbacfe550d30b5c6f757a9de56b0feeb;hpb=63a086cf3956b93f760b1a0344afd757e0d0392f;p=loctrkd.git diff --git a/loctrkd/storage.py b/loctrkd/storage.py index 128a457..e1be227 100644 --- a/loctrkd/storage.py +++ b/loctrkd/storage.py @@ -2,12 +2,13 @@ from configparser import ConfigParser from datetime import datetime, timezone +from json import loads from logging import getLogger import zmq from . import common -from .evstore import initdb, stow -from .zmsg import Bcast +from .evstore import initdb, stow, stowloc, stowpmod +from .zmsg import Bcast, Rept log = getLogger("loctrkd/storage") @@ -18,31 +19,64 @@ def runserver(conf: ConfigParser) -> None: initdb(dbname) # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?! zctx = zmq.Context() # type: ignore - zsub = zctx.socket(zmq.SUB) # type: ignore - zsub.connect(conf.get("collector", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zraw = zctx.socket(zmq.SUB) # type: ignore + zraw.connect(conf.get("collector", "publishurl")) + if conf.getboolean("storage", "events", fallback=False): + zraw.setsockopt(zmq.SUBSCRIBE, b"") + zrep = zctx.socket(zmq.SUB) # type: ignore + zrep.connect(conf.get("rectifier", "publishurl")) + zrep.setsockopt(zmq.SUBSCRIBE, b"") + poller = zmq.Poller() # type: ignore + poller.register(zraw, flags=zmq.POLLIN) + poller.register(zrep, flags=zmq.POLLIN) try: while True: - zmsg = Bcast(zsub.recv()) - log.debug( - "%s IMEI %s from %s at %s: %s", - "I" if zmsg.is_incoming else "O", - zmsg.imei, - zmsg.peeraddr, - datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), - zmsg.packet.hex(), - ) - stow( - is_incoming=zmsg.is_incoming, - peeraddr=str(zmsg.peeraddr), - when=zmsg.when, - imei=zmsg.imei, - proto=zmsg.proto, - packet=zmsg.packet, - ) + events = poller.poll(1000) + for sk, fl in events: + if sk is zraw: + while True: + try: + zmsg = Bcast(zraw.recv(zmq.NOBLOCK)) + except zmq.Again: + break + log.debug( + "%s IMEI %s from %s at %s: %s", + "I" if zmsg.is_incoming else "O", + zmsg.imei, + zmsg.peeraddr, + datetime.fromtimestamp(zmsg.when).astimezone( + tz=timezone.utc + ), + zmsg.packet.hex(), + ) + stow( + is_incoming=zmsg.is_incoming, + peeraddr=str(zmsg.peeraddr), + when=zmsg.when, + imei=zmsg.imei, + proto=zmsg.proto, + packet=zmsg.packet, + ) + elif sk is zrep: + while True: + try: + rept = Rept(zrep.recv(zmq.NOBLOCK)) + except zmq.Again: + break + if rept.imei is not None and rept.pmod is not None: + stowpmod(rept.imei, rept.pmod) + data = loads(rept.payload) + log.debug("R IMEI %s %s", rept.imei, data) + if data.pop("type") == "location": + data["imei"] = rept.imei + stowloc(**data) + + else: + log.error("Event %s on unknown socket %s", fl, sk) except KeyboardInterrupt: - zsub.close() + zrep.close() + zraw.close() zctx.destroy() # type: ignore