]> www.average.org Git - loctrkd.git/blob - loctrkd/common.py
Cleanup some of the types
[loctrkd.git] / loctrkd / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from getopt import getopt
6 from json import dumps
7 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
8 from logging.handlers import SysLogHandler
9 from pkg_resources import get_distribution, DistributionNotFound
10 from sys import argv, stderr, stdout
11 from typing import Any, cast, Dict, List, Optional, Tuple, Union
12 from types import SimpleNamespace
13
14 from .protomodule import ProtoModule
15
16 CONF = "/etc/loctrkd.conf"
17 pmods: List[ProtoModule] = []
18
19 try:
20     version = get_distribution("loctrkd").version
21 except DistributionNotFound:
22     version = "<local>"
23
24
25 def init_protocols(conf: ConfigParser) -> None:
26     global pmods
27     pmods = [
28         cast(ProtoModule, import_module("." + modnm, __package__))
29         for modnm in conf.get("common", "protocols").split(",")
30     ]
31
32
33 def init(
34     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
35 ) -> ConfigParser:
36     if opts is None:
37         opts, _ = getopt(argv[1:], "c:d")
38     dopts = dict(opts)
39     conf = ConfigParser()
40     conf.read(dopts["-c"] if "-c" in dopts else CONF)
41     log.setLevel(DEBUG if "-d" in dopts else INFO)
42     if stdout.isatty():
43         fhdl = StreamHandler(stderr)
44         fhdl.setFormatter(
45             Formatter("%(asctime)s - %(levelname)s - %(message)s")
46         )
47         log.addHandler(fhdl)
48         log.debug("%s starting with options: %s", version, dopts)
49     else:
50         lhdl = SysLogHandler(address="/dev/log")
51         lhdl.setFormatter(
52             Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
53         )
54         log.addHandler(lhdl)
55         log.info("%s starting with options: %s", version, dopts)
56     init_protocols(conf)
57     return conf
58
59
60 def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
61     for pmod in pmods:
62         if pmod.probe_buffer(segment):
63             return pmod
64     return None
65
66
67 def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
68     for pmod in pmods:
69         if pmod.proto_handled(proto):
70             return pmod
71     return None
72
73
74 def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
75     pmod = pmod_for_proto(proto)
76     return pmod.parse_message(packet, is_incoming) if pmod else None
77
78
79 def exposed_protos() -> List[Tuple[str, bool]]:
80     return [item for pmod in pmods for item in pmod.exposed_protos()]
81
82
83 class Report:
84     TYPE: str
85
86     def __repr__(self) -> str:
87         return (
88             self.__class__.__name__
89             + "("
90             + ", ".join([f"{k}={v}" for k, v in self.__dict__.items()])
91             + ")"
92         )
93
94     @property
95     def json(self) -> str:
96         self.type = self.TYPE
97         return dumps(self.__dict__)
98
99
100 class CoordReport(Report):
101     TYPE = "location"
102
103     def __init__(
104         self,
105         *,
106         devtime: str,
107         battery_percentage: int,
108         accuracy: float,
109         altitude: float,
110         speed: float,
111         direction: float,
112         latitude: float,
113         longitude: float,
114     ) -> None:
115         self.devtime = devtime
116         self.battery_percentage = battery_percentage
117         self.accuracy = accuracy
118         self.altitude = altitude
119         self.speed = speed
120         self.direction = direction
121         self.latitude = latitude
122         self.longitude = longitude
123
124
125 class HintReport(Report):
126     TYPE = "approximate_location"
127
128     def __init__(
129         self,
130         *,
131         devtime: str,
132         battery_percentage: int,
133         mcc: int,
134         mnc: int,
135         gsm_cells: List[Tuple[int, int, int]],
136         wifi_aps: List[Tuple[str, str, int]],
137     ) -> None:
138         self.devtime = devtime
139         self.battery_percentage = battery_percentage
140         self.mcc = mcc
141         self.mnc = mnc
142         self.gsm_cells = gsm_cells
143         self.wifi_aps = wifi_aps
144
145
146 class StatusReport(Report):
147     TYPE = "status"
148
149     def __init__(self, *, battery_percentage: int) -> None:
150         self.battery_percentage = battery_percentage