]> www.average.org Git - loctrkd.git/blob - gps303/storage.py
typechecking: annotate storage.py
[loctrkd.git] / gps303 / storage.py
1 """ Store zmq broadcasts to sqlite """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from logging import getLogger
6 import zmq
7
8 from . import common
9 from .evstore import initdb, stow
10 from .gps303proto import proto_of_message
11 from .zmsg import Bcast
12
13 log = getLogger("gps303/storage")
14
15
def runserver(conf: ConfigParser) -> None:
    """Subscribe to the collector's zmq broadcasts and store each one.

    The database file name is read from option ``dbfn`` of section
    ``storage``; the zmq endpoint to subscribe to is read from option
    ``publishurl`` of section ``collector``.  Every received message is
    decoded with ``Bcast``, logged at debug level, and handed to ``stow``.

    The loop runs until ``KeyboardInterrupt`` is raised, which is treated
    as a normal shutdown request.  The zmq socket and context are released
    on every exit path, including unexpected exceptions (which still
    propagate to the caller).
    """
    dbname = conf.get("storage", "dbfn")
    log.info('Using Sqlite3 database "%s"', dbname)
    initdb(dbname)
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zsub = zctx.socket(zmq.SUB)  # type: ignore

    try:
        zsub.connect(conf.get("collector", "publishurl"))
        # An empty prefix subscribes to every published message.
        zsub.setsockopt(zmq.SUBSCRIBE, b"")
        while True:
            zmsg = Bcast(zsub.recv())
            log.debug(
                "%s IMEI %s from %s at %s: %s",
                "I" if zmsg.is_incoming else "O",
                zmsg.imei,
                zmsg.peeraddr,
                # Build the aware UTC timestamp directly instead of going
                # through a naive local-time value first.
                datetime.fromtimestamp(zmsg.when, tz=timezone.utc),
                zmsg.packet.hex(),
            )
            stow(
                is_incoming=zmsg.is_incoming,
                peeraddr=str(zmsg.peeraddr),
                when=zmsg.when,
                imei=zmsg.imei,
                proto=proto_of_message(zmsg.packet),
                packet=zmsg.packet,
            )
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the service; exit quietly.
        pass
    finally:
        # Release zmq resources explicitly rather than relying on garbage
        # collection; linger=0 ensures term() cannot block on the socket.
        zsub.close(linger=0)
        zctx.term()
47
48
# Start the service when this module is the program entry point.  The suffix
# test accepts "__main__" itself as well as any module name ending in it.
if __name__.endswith("__main__"):
    runserver(conf=common.init(log))