1 """ Store zmq broadcasts to sqlite """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
6 from logging import getLogger
10 from .evstore import initdb, stow, stowloc, stowpmod
11 from .zmsg import Bcast, Rept
# Module-level logger; its name is the fixed string "loctrkd/storage" rather than __name__.
13 log = getLogger("loctrkd/storage")
# runserver: connect SUB sockets to the collector and rectifier publish URLs
# taken from `conf`, poll them, and hand received messages to the evstore helpers.
# NOTE(review): this listing omits several lines of the function (the loop, the
# per-socket branching and the `try:` header are not visible), so the comments
# below describe only the statements that can actually be seen.
16 def runserver(conf: ConfigParser) -> None:
# [storage] "events" is read as a boolean and defaults to False when absent.
17 stowevents = conf.getboolean("storage", "events", fallback=False)
# No fallback is given here, so [storage] "dbfn" must be present in the config.
18 dbname = conf.get("storage", "dbfn")
19 log.info('Using Sqlite3 database "%s"', dbname)
21 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
22 zctx = zmq.Context() # type: ignore
# SUB socket for the collector's publish URL; the empty prefix b"" subscribes
# to every message.
23 zraw = zctx.socket(zmq.SUB) # type: ignore
24 zraw.connect(conf.get("collector", "publishurl"))
25 zraw.setsockopt(zmq.SUBSCRIBE, b"")
# Second SUB socket, same setup, for the rectifier's publish URL.
26 zrep = zctx.socket(zmq.SUB) # type: ignore
27 zrep.connect(conf.get("rectifier", "publishurl"))
28 zrep.setsockopt(zmq.SUBSCRIBE, b"")
# Both sockets are watched for incoming data through a single poller.
29 poller = zmq.Poller() # type: ignore
30 poller.register(zraw, flags=zmq.POLLIN)
31 poller.register(zrep, flags=zmq.POLLIN)
# The poll timeout is given in milliseconds, i.e. wait at most one second.
35 events = poller.poll(1000)
# Non-blocking receive from the collector socket; the raw bytes are wrapped in Bcast.
40 zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
44 "%s IMEI %s from %s at %s %s: %s",
45 "I" if zmsg.is_incoming else "O",
49 datetime.fromtimestamp(zmsg.when).astimezone(
# stowpmod is called only when both the IMEI and the pmod are not None.
54 if zmsg.imei is not None and zmsg.pmod is not None:
55 stowpmod(zmsg.imei, zmsg.pmod)
58 is_incoming=zmsg.is_incoming,
59 peeraddr=str(zmsg.peeraddr),
# Non-blocking receive from the rectifier socket; the raw bytes are wrapped in Rept.
68 rept = Rept(zrep.recv(zmq.NOBLOCK))
71 data = loads(rept.payload)
72 log.debug("R IMEI %s %s", rept.imei, data)
# pop() removes "type" from data before the comparison; assuming data is a
# dict, a payload without a "type" key raises KeyError here — TODO confirm
# payloads always carry it.
73 if data.pop("type") == "location":
74 data["imei"] = rept.imei
# NOTE(review): presumably the fallback for a polled socket that is neither
# zraw nor zrep; the branch structure is not visible in this listing.
78 log.error("Event %s on unknown socket %s", fl, sk)
79 except KeyboardInterrupt:
# destroy() closes the sockets created from this context and terminates it.
82 zctx.destroy() # type: ignore
# endswith() matches a bare "__main__" as well as dotted names ending in it
# (e.g. "pkg.__main__").
85 if __name__.endswith("__main__"):
# The result of common.init(log) is passed to runserver as its `conf`.
# NOTE(review): `common` is not imported in the visible lines — confirm the import exists.
86 runserver(common.init(log))