]> www.average.org Git - loctrkd.git/blob - loctrkd/common.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from getopt import getopt
6 from json import dumps
7 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
8 from logging.handlers import SysLogHandler
9 from pkg_resources import get_distribution, DistributionNotFound
10 from sys import argv, stderr, stdout
11 from typing import Any, cast, Dict, List, Optional, Tuple, Union
12 from types import SimpleNamespace
13
14 from .protomodule import ProtoClass, ProtoModule
15
16 CONF = "/etc/loctrkd.conf"
17 pmods: List[ProtoModule] = []
18
19 try:
20     version = get_distribution("loctrkd").version
21 except DistributionNotFound:
22     version = "<local>"
23
24
25 def init_protocols(conf: ConfigParser) -> None:
26     global pmods
27     pmods = [
28         cast(ProtoModule, import_module("." + modnm, __package__))
29         for modnm in conf.get("common", "protocols").split(",")
30     ]
31
32
33 def init(
34     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
35 ) -> ConfigParser:
36     if opts is None:
37         opts, _ = getopt(argv[1:], "c:d")
38     dopts = dict(opts)
39     conf = ConfigParser()
40     conf.read(dopts["-c"] if "-c" in dopts else CONF)
41     log.setLevel(DEBUG if "-d" in dopts else INFO)
42     if stdout.isatty():
43         fhdl = StreamHandler(stderr)
44         fhdl.setFormatter(
45             Formatter("%(asctime)s - %(levelname)s - %(message)s")
46         )
47         log.addHandler(fhdl)
48         log.debug("%s starting with options: %s", version, dopts)
49     else:
50         lhdl = SysLogHandler(address="/dev/log")
51         lhdl.setFormatter(
52             Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
53         )
54         log.addHandler(lhdl)
55         log.info("%s starting with options: %s", version, dopts)
56     init_protocols(conf)
57     return conf
58
59
60 def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
61     for pmod in pmods:
62         if pmod.probe_buffer(segment):
63             return pmod
64     return None
65
66
67 def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
68     for pmod in pmods:
69         if pmod.proto_handled(proto):
70             return pmod
71     return None
72
73
74 def pmod_by_name(pmodname: str) -> Optional[ProtoModule]:
75     for pmod in pmods:
76         if pmod.__name__.split(".")[-1] == pmodname:
77             return pmod
78     return None
79
80
81 def make_response(
82     pmodname: str, cmd: str, imei: str, **kwargs: Any
83 ) -> Optional[ProtoClass.Out]:
84     pmod = pmod_by_name(pmodname)
85     if pmod is None:
86         return None
87     return pmod.make_response(cmd, imei, **kwargs)
88
89
90 def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
91     pmod = pmod_for_proto(proto)
92     return pmod.parse_message(packet, is_incoming) if pmod else None
93
94
95 def exposed_protos() -> List[Tuple[str, bool]]:
96     return [item for pmod in pmods for item in pmod.exposed_protos()]
97
98
99 class Report:
100     TYPE: str
101
102     def __repr__(self) -> str:
103         return (
104             self.__class__.__name__
105             + "("
106             + ", ".join(
107                 [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]
108             )
109             + ")"
110         )
111
112     @property
113     def json(self) -> str:
114         self.type = self.TYPE
115         return dumps(self.__dict__)
116
117
118 class CoordReport(Report):
119     TYPE = "location"
120
121     def __init__(
122         self,
123         *,
124         devtime: str,
125         battery_percentage: Optional[int],
126         accuracy: Optional[float],
127         altitude: Optional[float],
128         speed: Optional[float],
129         direction: Optional[float],
130         latitude: float,
131         longitude: float,
132     ) -> None:
133         self.devtime = devtime
134         self.battery_percentage = battery_percentage
135         self.accuracy = accuracy
136         self.altitude = altitude
137         self.speed = speed
138         self.direction = direction
139         self.latitude = latitude
140         self.longitude = longitude
141
142
143 class HintReport(Report):
144     TYPE = "approximate_location"
145
146     def __init__(
147         self,
148         *,
149         devtime: str,
150         battery_percentage: Optional[int],
151         mcc: int,
152         mnc: int,
153         gsm_cells: List[Tuple[int, int, int]],
154         wifi_aps: List[Tuple[str, str, int]],
155     ) -> None:
156         self.devtime = devtime
157         self.battery_percentage = battery_percentage
158         self.mcc = mcc
159         self.mnc = mnc
160         self.gsm_cells = gsm_cells
161         self.wifi_aps = wifi_aps
162
163
164 class StatusReport(Report):
165     TYPE = "status"
166
167     def __init__(self, *, battery_percentage: int) -> None:
168         self.battery_percentage = battery_percentage