]> www.average.org Git - loctrkd.git/blob - loctrkd/storage.py
Change reporting of pmod to events
[loctrkd.git] / loctrkd / storage.py
1 """ Store zmq broadcasts to sqlite """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from json import loads
6 from logging import getLogger
7 import zmq
8
9 from . import common
10 from .evstore import initdb, stow, stowloc, stowpmod
11 from .zmsg import Bcast, Rept
12
13 log = getLogger("loctrkd/storage")
14
15
def runserver(conf: ConfigParser) -> None:
    """Store collector and rectifier zmq broadcasts in the Sqlite database.

    Reads these settings from *conf*:

    * ``[storage] dbfn`` - database file name, passed to ``initdb()``;
    * ``[storage] events`` - boolean, default ``False``; when true every
      raw broadcast is also saved with ``stow()``;
    * ``[collector] publishurl`` - endpoint publishing raw packets;
    * ``[rectifier] publishurl`` - endpoint publishing reports.

    The function loops until interrupted with ``KeyboardInterrupt``, at
    which point it returns normally.  Any other exception propagates to
    the caller.  In both cases the sockets and the zmq context are
    released before leaving.
    """
    stowevents = conf.getboolean("storage", "events", fallback=False)
    dbname = conf.get("storage", "dbfn")
    log.info('Using Sqlite3 database "%s"', dbname)
    initdb(dbname)
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    # Both subscriptions use an empty prefix, i.e. receive every message.
    zraw = zctx.socket(zmq.SUB)  # type: ignore
    zraw.connect(conf.get("collector", "publishurl"))
    zraw.setsockopt(zmq.SUBSCRIBE, b"")
    zrep = zctx.socket(zmq.SUB)  # type: ignore
    zrep.connect(conf.get("rectifier", "publishurl"))
    zrep.setsockopt(zmq.SUBSCRIBE, b"")
    poller = zmq.Poller()  # type: ignore
    poller.register(zraw, flags=zmq.POLLIN)
    poller.register(zrep, flags=zmq.POLLIN)

    try:
        while True:
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zraw:
                    # Drain everything that is queued on the socket:
                    # non-blocking reads until zmq reports "try again".
                    while True:
                        try:
                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        log.debug(
                            "%s IMEI %s from %s at %s %s: %s",
                            "I" if zmsg.is_incoming else "O",
                            zmsg.imei,
                            zmsg.peeraddr,
                            zmsg.pmod,
                            # Build the aware UTC timestamp directly,
                            # without a detour through naive local time.
                            datetime.fromtimestamp(
                                zmsg.when, tz=timezone.utc
                            ),
                            zmsg.packet.hex(),
                        )
                        # Record the pmod for the IMEI whenever the
                        # message carries both values.
                        if zmsg.imei is not None and zmsg.pmod is not None:
                            stowpmod(zmsg.imei, zmsg.pmod)
                        if stowevents:
                            stow(
                                is_incoming=zmsg.is_incoming,
                                peeraddr=str(zmsg.peeraddr),
                                when=zmsg.when,
                                imei=zmsg.imei,
                                proto=zmsg.proto,
                                packet=zmsg.packet,
                            )
                elif sk is zrep:
                    while True:
                        try:
                            rept = Rept(zrep.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        data = loads(rept.payload)
                        log.debug("R IMEI %s %s", rept.imei, data)
                        # "type" is removed because it only selects the
                        # handler; the remaining keys are passed on as
                        # keyword arguments.  A report that has no "type"
                        # is skipped instead of raising KeyError and
                        # taking the whole loop down.
                        if data.pop("type", None) == "location":
                            data["imei"] = rept.imei
                            stowloc(**data)

                else:
                    log.error("Event %s on unknown socket %s", fl, sk)
    except KeyboardInterrupt:
        # Interruption is the regular way to stop; return normally.
        pass
    finally:
        # Release zmq resources on every exit path, not only on
        # KeyboardInterrupt, so that an unexpected error does not leave
        # the sockets and the context open.
        zrep.close()
        zraw.close()
        zctx.destroy()  # type: ignore
83
84
# Entry point.  The suffix test (rather than strict equality) also matches
# a package-qualified name that ends in "__main__".
if __name__.endswith("__main__"):
    runserver(
        # common.init() is handed the module logger and its result is
        # used as the configuration for the server.
        common.init(log),
    )