www.average.org Git - loctrkd.git/blob - loctrkd/common.py
Convert rectifier to multiprotocol support
[loctrkd.git] / loctrkd / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from getopt import getopt
6 from json import dumps
7 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
8 from logging.handlers import SysLogHandler
9 from pkg_resources import get_distribution, DistributionNotFound
10 from sys import argv, stderr, stdout
11 from typing import Any, cast, Dict, List, Optional, Tuple, Union
12 from types import SimpleNamespace
13
14 from .protomodule import ProtoModule
15
# Configuration file read by init() when no "-c" option is supplied.
CONF = "/etc/loctrkd.conf"
# Protocol modules currently loaded; replaced wholesale by init_protocols().
pmods: List[ProtoModule] = []

# Version string reported in the startup log message.
try:
    version = get_distribution("loctrkd").version
except DistributionNotFound:
    # No "loctrkd" distribution is known to pkg_resources.
    version = "<local>"
23
24
def init_protocols(conf: ConfigParser) -> None:
    """Import the protocol modules named in the configuration.

    Reads the comma-separated ``protocols`` option from the ``[common]``
    section, imports every named module relative to this package and
    stores the resulting list in the module-level ``pmods``.

    Whitespace around each name is ignored and empty items (for example
    the one produced by a trailing comma) are skipped, so ``"a, b,"`` is
    handled the same way as ``"a,b"``.

    Exceptions raised by ``conf.get`` (missing section or option) and by
    ``import_module`` (module cannot be imported) are propagated.
    """
    global pmods
    listed = conf.get("common", "protocols").split(",")
    # Normalise first so that "a, b" does not try to import ". b".
    names = [raw.strip() for raw in listed]
    pmods = [
        cast(ProtoModule, import_module("." + modnm, __package__))
        for modnm in names
        if modnm
    ]
31
32
def init(
    log: Logger, opts: Optional[List[Tuple[str, str]]] = None
) -> ConfigParser:
    """Read the configuration, set up logging and load protocol modules.

    When *opts* is ``None`` the options are parsed from ``argv[1:]`` with
    the getopt spec ``"c:d"``.  ``-c`` selects the configuration file
    (``CONF`` otherwise) and ``-d`` switches *log* to ``DEBUG`` level
    (``INFO`` otherwise).

    If stdout is a terminal, a stderr stream handler is attached to *log*
    and the startup line is logged at debug level; otherwise a syslog
    handler on ``/dev/log`` is attached and the startup line is logged at
    info level.  Finally ``init_protocols`` is called with the parsed
    configuration, which is then returned.
    """
    if opts is None:
        opts, _ = getopt(argv[1:], "c:d")
    dopts = dict(opts)

    conf = ConfigParser()
    conf.read(dopts.get("-c", CONF))

    log.setLevel(DEBUG if "-d" in dopts else INFO)

    startup = "%s starting with options: %s"
    if not stdout.isatty():
        # Not attached to a terminal: send records to the local syslog.
        syslog_handler = SysLogHandler(address="/dev/log")
        syslog_handler.setFormatter(
            Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
        )
        log.addHandler(syslog_handler)
        log.info(startup, version, dopts)
    else:
        # Interactive run: timestamped records on stderr.
        stream_handler = StreamHandler(stderr)
        stream_handler.setFormatter(
            Formatter("%(asctime)s - %(levelname)s - %(message)s")
        )
        log.addHandler(stream_handler)
        log.debug(startup, version, dopts)

    init_protocols(conf)
    return conf
58
59
def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
    """Return the first loaded module whose ``probe_buffer`` accepts *segment*.

    Modules are tried in ``pmods`` order; ``None`` is returned when no
    module gives a truthy answer.
    """
    return next(
        (mod for mod in pmods if mod.probe_buffer(segment)),
        None,
    )
65
66
def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
    """Return the first loaded module whose ``proto_handled`` accepts *proto*.

    Modules are tried in ``pmods`` order; ``None`` is returned when no
    module gives a truthy answer.
    """
    return next(
        (mod for mod in pmods if mod.proto_handled(proto)),
        None,
    )
72
73
def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
    """Parse *packet* with the module that handles *proto*.

    Looks the module up via ``pmod_for_proto`` and returns the result of
    its ``parse_message(packet, is_incoming)``.  Returns ``None`` when no
    module is found for *proto*.
    """
    handler = pmod_for_proto(proto)
    if not handler:
        return None
    return handler.parse_message(packet, is_incoming)
77
78
def exposed_protos() -> List[Tuple[str, bool]]:
    """Concatenate ``exposed_protos()`` of every loaded protocol module.

    Items appear in ``pmods`` order, and within one module in the order
    that module returns them.
    """
    collected: List[Tuple[str, bool]] = []
    for pmod in pmods:
        collected.extend(pmod.exposed_protos())
    return collected
81
82
class Report(SimpleNamespace):
    """Base class for reports that can be serialised to JSON.

    Subclasses set the class attribute ``TYPE``; the keyword arguments
    given to the constructor become the instance attributes.
    """

    TYPE: str

    @property
    def json(self) -> str:
        """Return the instance attributes as a JSON object string.

        Reading this property also stores ``TYPE`` on the instance as the
        attribute ``type``, so the output carries a ``"type"`` key.
        """
        self.type = self.TYPE
        return dumps(vars(self))
90
91
class CoordReport(Report):
    """Report of type ``"location"`` carrying a coordinate fix."""

    TYPE = "location"

    def __init__(
        self,
        *,
        devtime: str,
        battery_percentage: int,
        accuracy: float,
        altitude: float,
        speed: float,
        direction: float,
        latitude: float,
        longitude: float
    ) -> None:
        """Store every keyword argument as an attribute of the same name."""
        # Key order here is the attribute (and thus JSON key) order.
        fields = {
            "devtime": devtime,
            "battery_percentage": battery_percentage,
            "accuracy": accuracy,
            "altitude": altitude,
            "speed": speed,
            "direction": direction,
            "latitude": latitude,
            "longitude": longitude,
        }
        super().__init__(**fields)
117
118
class HintReport(Report):
    """Report of type ``"approximate_location"`` carrying cell/WiFi data."""

    TYPE = "approximate_location"

    def __init__(
        self,
        *,
        devtime: str,
        battery_percentage: int,
        mcc: int,
        mnc: int,
        gsm_cells: List[Tuple[int, int, int]],
        wifi_aps: List[Tuple[str, str, int]]
    ) -> None:
        """Store every keyword argument as an attribute of the same name."""
        # Key order here is the attribute (and thus JSON key) order.
        fields = {
            "devtime": devtime,
            "battery_percentage": battery_percentage,
            "mcc": mcc,
            "mnc": mnc,
            "gsm_cells": gsm_cells,
            "wifi_aps": wifi_aps,
        }
        super().__init__(**fields)
140
141
class StatusReport(Report):
    """Report of type ``"status"`` carrying only the battery level."""

    TYPE = "status"

    def __init__(self, *, battery_percentage: int) -> None:
        """Store *battery_percentage* as the only attribute."""
        Report.__init__(self, battery_percentage=battery_percentage)