]> www.average.org Git - loctrkd.git/blob - loctrkd/mkgpx.py
mkgpx: quick temp switch to beesure proto
[loctrkd.git] / loctrkd / mkgpx.py
1 """ Example that produces gpx from events in evstore """
2
3 # run as:
4 # python -m loctrkd.mkgpx <IMEI>
5 # Generated gpx is emitted to stdout
6
7 from configparser import ConfigParser
8 from datetime import datetime, timezone
9 from getopt import getopt
10 from importlib import import_module
11 from logging import getLogger
12 from sqlite3 import connect
13 from sys import argv
14 from typing import Any, cast, List, Tuple
15
16 from . import common
17
# Module-level logger; handed to common.init() when this file runs as a script.
log = getLogger("loctrkd/mkgpx")
19
20
class ProtoModule:
    """Typing stand-in for the protocol modules loaded via import_module().

    main() casts every dynamically imported module to this class so that
    calls to the two functions below can be type-checked.  The methods are
    placeholders with no implementation of their own.
    """

    @staticmethod
    def proto_handled(proto: str) -> bool:
        """Placeholder: whether the module handles protocol id `proto`."""

    @staticmethod
    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
        """Placeholder: turn a stored `packet` into a message object."""
29
30
# Protocol modules in use; empty at import time, rebound by main() from the
# "protocols" option of the [collector] config section.
pmods: List[ProtoModule] = []
32
33
def _gpx_time(tstamp: float) -> str:
    """Format a POSIX timestamp as a whole-second UTC time string.

    The result looks like ``2023-01-02T03:04:05Z``.  Fractional seconds are
    dropped.  strftime() is used instead of slicing isoformat() at the last
    "." because isoformat() omits the fraction when microseconds are zero,
    and slicing at rfind() == -1 would then chop the string's last character.
    """
    return datetime.fromtimestamp(tstamp, tz=timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )


def main(
    conf: ConfigParser, opts: List[Tuple[str, str]], args: List[str]
) -> None:
    """Print a GPX track built from the stored events of one device.

    Parameters:
        conf: configuration; the "protocols" option of section [collector]
            (comma-separated module names, imported relative to this
            package) and the "dbfn" option of section [storage] (sqlite
            database file) are read.
        opts: parsed command line options; not used by this function.
        args: positional arguments; args[0] is the IMEI to select.

    Raises:
        IndexError: if `args` is empty.

    Side effects: rebinds the module-level `pmods` and writes the GPX
    document to stdout.
    """
    global pmods
    pmods = [
        cast(ProtoModule, import_module("." + modnm, __package__))
        for modnm in conf.get("collector", "protocols").split(",")
    ]
    db = connect(conf.get("storage", "dbfn"))
    try:
        c = db.cursor()
        # Only incoming packets of the two hard-coded protocol ids are used.
        c.execute(
            """select tstamp, is_incoming, proto, packet from events
           where imei = ?  and is_incoming = true
           and proto in (?, ?)
           order by tstamp""",
            (args[0], "BS:UD", "BS:UD2"),
        )
        print(
            """<?xml version="1.0"?>
    <gpx version="1.1"
    creator="loctrkd"
    xmlns="http://www.topografix.com/GPX/1/1">
      <name>Location Data</name>
      <trk>
        <name>Location Data</name>
        <trkseg>
    """
        )

        for tstamp, is_incoming, proto, packet in c:
            # Pick the module that claims this protocol.  Without one there
            # is nothing to parse, so skip the row instead of failing on an
            # unbound `msg` or silently reusing the previous row's message.
            pmod = next((m for m in pmods if m.proto_handled(proto)), None)
            if pmod is None:
                log.warning(
                    "No protocol module handles %s, event skipped", proto
                )
                continue
            msg = pmod.parse_message(packet, is_incoming=is_incoming)
            trkpt = """      <trkpt lat="{}" lon="{}">
              <time>{}</time>
          </trkpt>""".format(
                msg.latitude, msg.longitude, _gpx_time(tstamp)
            )
            print(trkpt)
        print(
            """    </trkseg>
      </trk>
    </gpx>"""
        )
    finally:
        # Release the database handle even if parsing or printing fails.
        db.close()
92
93
# Script entry point; the guard matches any module name ending in "__main__".
if __name__.endswith("__main__"):
    # getopt spec: -o and -c take a value, -d is a bare flag.  Their meaning
    # is decided by common.init(), which is not visible in this file.
    opts, args = getopt(argv[1:], "o:c:d")
    main(conf=common.init(log, opts=opts), opts=opts, args=args)