]> www.average.org Git - loctrkd.git/blob - loctrkd/common.py
941a93e008dbf793cad604d0df1fd86a81809b63
[loctrkd.git] / loctrkd / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from getopt import getopt
6 from json import dumps
7 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
8 from logging.handlers import SysLogHandler
9 from pkg_resources import get_distribution, DistributionNotFound
10 from sys import argv, stderr, stdout
11 from typing import Any, cast, Dict, List, Optional, Tuple, Union
12 from types import SimpleNamespace
13
14 from .protomodule import ProtoModule
15
16 CONF = "/etc/loctrkd.conf"
17 pmods: List[ProtoModule] = []
18
19 try:
20     version = get_distribution("loctrkd").version
21 except DistributionNotFound:
22     version = "<local>"
23
24
25 def init_protocols(conf: ConfigParser) -> None:
26     global pmods
27     pmods = [
28         cast(ProtoModule, import_module("." + modnm, __package__))
29         for modnm in conf.get("common", "protocols").split(",")
30     ]
31
32
33 def init(
34     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
35 ) -> ConfigParser:
36     if opts is None:
37         opts, _ = getopt(argv[1:], "c:d")
38     dopts = dict(opts)
39     conf = ConfigParser()
40     conf.read(dopts["-c"] if "-c" in dopts else CONF)
41     log.setLevel(DEBUG if "-d" in dopts else INFO)
42     if stdout.isatty():
43         fhdl = StreamHandler(stderr)
44         fhdl.setFormatter(
45             Formatter("%(asctime)s - %(levelname)s - %(message)s")
46         )
47         log.addHandler(fhdl)
48         log.debug("%s starting with options: %s", version, dopts)
49     else:
50         lhdl = SysLogHandler(address="/dev/log")
51         lhdl.setFormatter(
52             Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
53         )
54         log.addHandler(lhdl)
55         log.info("%s starting with options: %s", version, dopts)
56     init_protocols(conf)
57     return conf
58
59
60 def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
61     for pmod in pmods:
62         if pmod.probe_buffer(segment):
63             return pmod
64     return None
65
66
67 def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
68     for pmod in pmods:
69         if pmod.proto_handled(proto):
70             return pmod
71     return None
72
73
74 def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
75     pmod = pmod_for_proto(proto)
76     return pmod.parse_message(packet, is_incoming) if pmod else None
77
78
79 def exposed_protos() -> List[Tuple[str, bool]]:
80     return [item for pmod in pmods for item in pmod.exposed_protos()]
81
82
83 class Report:
84     TYPE: str
85
86     def __repr__(self) -> str:
87         return (
88             self.__class__.__name__
89             + "("
90             + ", ".join(
91                 [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]
92             )
93             + ")"
94         )
95
96     @property
97     def json(self) -> str:
98         self.type = self.TYPE
99         return dumps(self.__dict__)
100
101
102 class CoordReport(Report):
103     TYPE = "location"
104
105     def __init__(
106         self,
107         *,
108         devtime: str,
109         battery_percentage: Optional[int],
110         accuracy: Optional[float],
111         altitude: Optional[float],
112         speed: Optional[float],
113         direction: Optional[float],
114         latitude: float,
115         longitude: float,
116     ) -> None:
117         self.devtime = devtime
118         self.battery_percentage = battery_percentage
119         self.accuracy = accuracy
120         self.altitude = altitude
121         self.speed = speed
122         self.direction = direction
123         self.latitude = latitude
124         self.longitude = longitude
125
126
127 class HintReport(Report):
128     TYPE = "approximate_location"
129
130     def __init__(
131         self,
132         *,
133         devtime: str,
134         battery_percentage: Optional[int],
135         mcc: int,
136         mnc: int,
137         gsm_cells: List[Tuple[int, int, int]],
138         wifi_aps: List[Tuple[str, str, int]],
139     ) -> None:
140         self.devtime = devtime
141         self.battery_percentage = battery_percentage
142         self.mcc = mcc
143         self.mnc = mnc
144         self.gsm_cells = gsm_cells
145         self.wifi_aps = wifi_aps
146
147
148 class StatusReport(Report):
149     TYPE = "status"
150
151     def __init__(self, *, battery_percentage: int) -> None:
152         self.battery_percentage = battery_percentage