]> www.average.org Git - loctrkd.git/blob - loctrkd/evstore.py
storage: save both raw and rectified reports
[loctrkd.git] / loctrkd / evstore.py
1 """ sqlite event store """
2
3 from datetime import datetime
4 from json import dumps
5 from sqlite3 import connect, OperationalError
6 from typing import Any, Dict, List, Tuple
7
8 __all__ = "fetch", "initdb", "stow", "stowloc"
9
10 DB = None
11
12 SCHEMA = (
13     """create table if not exists events (
14     tstamp real not null,
15     imei text,
16     peeraddr text not null,
17     is_incoming int not null default TRUE,
18     proto text not null,
19     packet blob
20 )""",
21     """create table if not exists reports (
22     imei text,
23     devtime text not null,
24     accuracy real,
25     latitude real,
26     longitude real,
27     remainder text
28 )""",
29 )
30
31
32 def initdb(dbname: str) -> None:
33     global DB
34     DB = connect(dbname)
35     try:
36         DB.execute(
37             """alter table events add column
38                 is_incoming int not null default TRUE"""
39         )
40     except OperationalError:
41         for stmt in SCHEMA:
42             DB.execute(stmt)
43
44
45 def stow(**kwargs: Any) -> None:
46     assert DB is not None
47     parms = {
48         k: kwargs[k] if k in kwargs else v
49         for k, v in (
50             ("is_incoming", True),
51             ("peeraddr", None),
52             ("when", 0.0),
53             ("imei", None),
54             ("proto", "UNKNOWN"),
55             ("packet", b""),
56         )
57     }
58     assert len(kwargs) <= len(parms)
59     DB.execute(
60         """insert or ignore into events
61                 (tstamp, imei, peeraddr, proto, packet, is_incoming)
62                 values
63                 (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
64         """,
65         parms,
66     )
67     DB.commit()
68
69
70 def stowloc(**kwargs: Dict[str, Any]) -> None:
71     assert DB is not None
72     parms = {
73         k: kwargs.pop(k) if k in kwargs else v
74         for k, v in (
75             ("imei", None),
76             ("devtime", str(datetime.now())),
77             ("accuracy", None),
78             ("latitude", None),
79             ("longitude", None),
80         )
81     }
82     parms["remainder"] = dumps(kwargs)
83     DB.execute(
84         """insert or ignore into reports
85                 (imei, devtime, accuracy, latitude, longitude, remainder)
86                 values
87                 (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
88         """,
89         parms,
90     )
91     DB.commit()
92
93
94 def fetch(
95     imei: str, matchlist: List[Tuple[bool, str]], backlog: int
96 ) -> List[Tuple[bool, float, str, bytes]]:
97     # matchlist is a list of tuples (is_incoming, proto)
98     # returns a list of tuples (is_incoming, timestamp, packet)
99     assert DB is not None
100     selector = " or ".join(
101         (f"(is_incoming = ? and proto = ?)" for _ in range(len(matchlist)))
102     )
103     cur = DB.cursor()
104     cur.execute(
105         f"""select is_incoming, tstamp, proto, packet from events
106                     where ({selector}) and imei = ?
107                     order by tstamp desc limit ?""",
108         tuple(item for sublist in matchlist for item in sublist)
109         + (imei, backlog),
110     )
111     result = list(cur)
112     cur.close()
113     return list(reversed(result))