www.average.org Git - loctrkd.git/blob - loctrkd/storage.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / storage.py
1 """ Store zmq broadcasts to sqlite """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from json import loads
6 from logging import getLogger
7 import zmq
8
9 from . import common
10 from .evstore import initdb, stow, stowloc, stowpmod
11 from .zmsg import Bcast, Rept
12
# Module-level logger, registered under the name "loctrkd/storage".
log = getLogger("loctrkd/storage")
14
15
def runserver(conf: ConfigParser) -> None:
    """Subscribe to collector and rectifier broadcasts and store them.

    Initializes the Sqlite database named by ``[storage] dbfn`` and then
    polls two zmq SUB sockets in an endless loop:

    * the ``[collector] publishurl`` socket: every message is decoded as
      ``Bcast`` and handed to ``stow``.  The subscription is only enabled
      when ``[storage] events`` is true (it defaults to false), so without
      it no raw events arrive on this socket;
    * the ``[rectifier] publishurl`` socket: every message is decoded as
      ``Rept``.  When both ``imei`` and ``pmod`` are set they are passed
      to ``stowpmod``; the JSON payload is then parsed and, if its
      ``"type"`` is ``"location"``, stored with ``stowloc``.

    Args:
        conf: parsed configuration with the ``storage``, ``collector`` and
            ``rectifier`` sections used above.

    A ``KeyboardInterrupt`` ends the loop and the function returns
    normally.  Any other exception propagates to the caller.  In either
    case the zmq sockets and the context are released before leaving.
    """
    dbname = conf.get("storage", "dbfn")
    log.info('Using Sqlite3 database "%s"', dbname)
    initdb(dbname)
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    try:
        zraw = zctx.socket(zmq.SUB)  # type: ignore
        zraw.connect(conf.get("collector", "publishurl"))
        if conf.getboolean("storage", "events", fallback=False):
            zraw.setsockopt(zmq.SUBSCRIBE, b"")
        zrep = zctx.socket(zmq.SUB)  # type: ignore
        zrep.connect(conf.get("rectifier", "publishurl"))
        zrep.setsockopt(zmq.SUBSCRIBE, b"")
        poller = zmq.Poller()  # type: ignore
        poller.register(zraw, flags=zmq.POLLIN)
        poller.register(zrep, flags=zmq.POLLIN)

        while True:
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zraw:
                    # Drain everything that is queued on the socket; the
                    # non-blocking recv raises zmq.Again once it is empty.
                    while True:
                        try:
                            zmsg = Bcast(zraw.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        log.debug(
                            "%s IMEI %s from %s at %s: %s",
                            "I" if zmsg.is_incoming else "O",
                            zmsg.imei,
                            zmsg.peeraddr,
                            datetime.fromtimestamp(
                                zmsg.when, tz=timezone.utc
                            ),
                            zmsg.packet.hex(),
                        )
                        stow(
                            is_incoming=zmsg.is_incoming,
                            peeraddr=str(zmsg.peeraddr),
                            when=zmsg.when,
                            imei=zmsg.imei,
                            proto=zmsg.proto,
                            packet=zmsg.packet,
                        )
                elif sk is zrep:
                    while True:
                        try:
                            rept = Rept(zrep.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        if rept.imei is not None and rept.pmod is not None:
                            stowpmod(rept.imei, rept.pmod)
                        data = loads(rept.payload)
                        log.debug("R IMEI %s %s", rept.imei, data)
                        # "type" is removed from the dict so that the rest
                        # can be passed to stowloc() as keyword arguments.
                        if data.pop("type") == "location":
                            data["imei"] = rept.imei
                            stowloc(**data)
                else:
                    log.error("Event %s on unknown socket %s", fl, sk)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to stop the service: return quietly.
        pass
    finally:
        # Runs on every exit path, not only on KeyboardInterrupt, so the
        # sockets are not leaked when a handler raises.  destroy() closes
        # all sockets created from this context and then terminates it.
        zctx.destroy()  # type: ignore
81
82
# Script entry point.  The suffix test accepts any module name that ends
# in "__main__", not only the exact string.
if __name__.endswith("__main__"):
    runserver(conf=common.init(log))