]> www.average.org Git - loctrkd.git/blob - loctrkd/evstore.py
Ignore pmod registrations older than an hour
[loctrkd.git] / loctrkd / evstore.py
1 """ sqlite event store """
2
3 from datetime import datetime
4 from json import dumps, loads
5 from sqlite3 import connect, OperationalError, Row
6 from typing import Any, Dict, List, Optional, Tuple
7
8 __all__ = "fetch", "initdb", "stow", "stowloc"
9
10 DB = None
11
12 SCHEMA = (
13     """create table if not exists events (
14     tstamp real not null,
15     imei text,
16     peeraddr text not null,
17     is_incoming int not null default TRUE,
18     proto text not null,
19     packet blob
20 )""",
21     """create table if not exists reports (
22     imei text,
23     devtime text not null,
24     accuracy real,
25     latitude real,
26     longitude real,
27     remainder text
28 )""",
29     """create table if not exists pmodmap (
30     imei text not null unique,
31     pmod text not null,
32     tstamp real not null default (unixepoch())
33 )""",
34 )
35
36
37 def initdb(dbname: str) -> None:
38     global DB
39     DB = connect(dbname)
40     DB.row_factory = Row
41     need_populate_pmodmap = False
42     try:
43         DB.execute("select count(pmod) from pmodmap")
44         try:
45             DB.execute("select count(tstamp) from pmodmap")
46         except OperationalError:
47             need_populate_pmodmap = True
48             DB.execute("alter table pmodmap rename to old_pmodmap")
49     except OperationalError:
50         pass  # DB was empty
51     for stmt in SCHEMA:
52         DB.execute(stmt)
53     if need_populate_pmodmap:
54         DB.execute(
55             """insert into pmodmap(imei, pmod)
56                select imei, pmod from old_pmodmap"""
57         )
58         DB.execute("drop table old_pmodmap")
59         DB.commit()
60
61
62 def stow(**kwargs: Any) -> None:
63     assert DB is not None
64     parms = {
65         k: kwargs[k] if k in kwargs else v
66         for k, v in (
67             ("is_incoming", True),
68             ("peeraddr", None),
69             ("when", 0.0),
70             ("imei", None),
71             ("proto", "UNKNOWN"),
72             ("packet", b""),
73         )
74     }
75     assert len(kwargs) <= len(parms)
76     DB.execute(
77         """insert or ignore into events
78                 (tstamp, imei, peeraddr, proto, packet, is_incoming)
79                 values
80                 (:when, :imei, :peeraddr, :proto, :packet, :is_incoming)
81         """,
82         parms,
83     )
84     DB.commit()
85
86
87 def stowloc(**kwargs: Dict[str, Any]) -> None:
88     assert DB is not None
89     parms = {
90         k: kwargs.pop(k) if k in kwargs else v
91         for k, v in (
92             ("imei", None),
93             ("devtime", str(datetime.now())),
94             ("accuracy", None),
95             ("latitude", None),
96             ("longitude", None),
97         )
98     }
99     parms["remainder"] = dumps(kwargs)
100     DB.execute(
101         """insert or ignore into reports
102                 (imei, devtime, accuracy, latitude, longitude, remainder)
103                 values
104                 (:imei, :devtime, :accuracy, :latitude, :longitude, :remainder)
105         """,
106         parms,
107     )
108     DB.commit()
109
110
111 def stowpmod(imei: str, pmod: str) -> None:
112     assert DB is not None
113     DB.execute(
114         """insert or replace into pmodmap
115                 (imei, pmod) values (:imei, :pmod)
116         """,
117         {"imei": imei, "pmod": pmod},
118     )
119     DB.commit()
120
121
122 def fetch(imei: str, backlog: int) -> List[Dict[str, Any]]:
123     assert DB is not None
124     cur = DB.cursor()
125     cur.execute(
126         """select imei, devtime, accuracy, latitude, longitude, remainder
127                     from reports where imei = ?
128                     order by devtime desc limit ?""",
129         (imei, backlog),
130     )
131     result = []
132     for row in cur:
133         dic = dict(row)
134         remainder = loads(dic.pop("remainder"))
135         dic.update(remainder)
136         result.append(dic)
137     cur.close()
138     return list(reversed(result))
139
140
141 def fetchpmod(imei: str) -> Optional[Any]:
142     assert DB is not None
143     ret = None
144     cur = DB.cursor()
145     cur.execute(
146         """select pmod from pmodmap where imei = ?
147            and tstamp > unixepoch() - 3600.0""",
148         (imei,),
149     )
150     result = cur.fetchone()
151     if result:
152         ret = result[0]
153     cur.close()
154     return ret