]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/common.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / common.py
index 227611c62fa4b8cf04f6bb22f2f8e4754cd744f2..162fe0db07abbbd5a14e0cfbc429d93d193eb9d5 100644 (file)
@@ -1,16 +1,20 @@
 """ Common housekeeping for all daemons """
 
-from configparser import ConfigParser, SectionProxy
+from configparser import ConfigParser
+from importlib import import_module
 from getopt import getopt
+from json import dumps
 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
 from logging.handlers import SysLogHandler
 from pkg_resources import get_distribution, DistributionNotFound
 from sys import argv, stderr, stdout
-from typing import Any, Dict, List, Optional, Tuple, Union
+from typing import Any, cast, Dict, List, Optional, Tuple, Union
+from types import SimpleNamespace
+
+from .protomodule import ProtoClass, ProtoModule
 
 CONF = "/etc/loctrkd.conf"
-PORT = 4303
-DBFN = "/var/lib/loctrkd/loctrkd.sqlite"
+pmods: List[ProtoModule] = []
 
 try:
     version = get_distribution("loctrkd").version
@@ -18,13 +22,22 @@ except DistributionNotFound:
     version = "<local>"
 
 
+def init_protocols(conf: ConfigParser) -> None:
+    global pmods
+    pmods = [
+        cast(ProtoModule, import_module("." + modnm, __package__))
+        for modnm in conf.get("common", "protocols").split(",")
+    ]
+
+
 def init(
     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
 ) -> ConfigParser:
     if opts is None:
         opts, _ = getopt(argv[1:], "c:d")
     dopts = dict(opts)
-    conf = readconfig(dopts["-c"] if "-c" in dopts else CONF)
+    conf = ConfigParser()
+    conf.read(dopts["-c"] if "-c" in dopts else CONF)
     log.setLevel(DEBUG if "-d" in dopts else INFO)
     if stdout.isatty():
         fhdl = StreamHandler(stderr)
@@ -40,59 +53,116 @@ def init(
         )
         log.addHandler(lhdl)
         log.info("%s starting with options: %s", version, dopts)
+    init_protocols(conf)
     return conf
 
 
-def readconfig(fname: str) -> ConfigParser:
-    config = ConfigParser()
-    config["collector"] = {
-        "port": str(PORT),
-    }
-    config["storage"] = {
-        "dbfn": DBFN,
-    }
-    config["termconfig"] = {}
-    config.read(fname)
-    return config
-
-
-def normconf(section: SectionProxy) -> Dict[str, Any]:
-    result: Dict[str, Any] = {}
-    for key, val in section.items():
-        vals = val.split("\n")
-        if len(vals) > 1 and vals[0] == "":
-            vals = vals[1:]
-        lst: List[Union[str, int]] = []
-        for el in vals:
-            try:
-                lst.append(int(el, 0))
-            except ValueError:
-                if el[0] == '"' and el[-1] == '"':
-                    el = el.strip('"').rstrip('"')
-                lst.append(el)
-        if not (
-            all([isinstance(x, int) for x in lst])
-            or all([isinstance(x, str) for x in lst])
-        ):
-            raise ValueError(
-                "Values of %s - %s are of different type", key, vals
+def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
+    for pmod in pmods:
+        if pmod.probe_buffer(segment):
+            return pmod
+    return None
+
+
+def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
+    for pmod in pmods:
+        if pmod.proto_handled(proto):
+            return pmod
+    return None
+
+
+def pmod_by_name(pmodname: str) -> Optional[ProtoModule]:
+    for pmod in pmods:
+        if pmod.__name__.split(".")[-1] == pmodname:
+            return pmod
+    return None
+
+
+def make_response(
+    pmodname: str, cmd: str, imei: str, **kwargs: Any
+) -> Optional[ProtoClass.Out]:
+    pmod = pmod_by_name(pmodname)
+    if pmod is None:
+        return None
+    return pmod.make_response(cmd, imei, **kwargs)
+
+
+def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any:
+    pmod = pmod_for_proto(proto)
+    return pmod.parse_message(packet, is_incoming) if pmod else None
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [item for pmod in pmods for item in pmod.exposed_protos()]
+
+
+class Report:
+    TYPE: str
+
+    def __repr__(self) -> str:
+        return (
+            self.__class__.__name__
+            + "("
+            + ", ".join(
+                [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()]
             )
-        if len(lst) == 1:
-            result[key] = lst[0]
-        else:
-            result[key] = lst
-    return result
+            + ")"
+        )
+
+    @property
+    def json(self) -> str:
+        self.type = self.TYPE
+        return dumps(self.__dict__)
+
+
+class CoordReport(Report):
+    TYPE = "location"
+
+    def __init__(
+        self,
+        *,
+        devtime: str,
+        battery_percentage: Optional[int],
+        accuracy: Optional[float],
+        altitude: Optional[float],
+        speed: Optional[float],
+        direction: Optional[float],
+        latitude: float,
+        longitude: float,
+    ) -> None:
+        self.devtime = devtime
+        self.battery_percentage = battery_percentage
+        self.accuracy = accuracy
+        self.altitude = altitude
+        self.speed = speed
+        self.direction = direction
+        self.latitude = latitude
+        self.longitude = longitude
+
+
+class HintReport(Report):
+    TYPE = "approximate_location"
 
+    def __init__(
+        self,
+        *,
+        devtime: str,
+        battery_percentage: Optional[int],
+        mcc: int,
+        mnc: int,
+        gsm_cells: List[Tuple[int, int, int]],
+        wifi_aps: List[Tuple[str, str, int]],
+    ) -> None:
+        self.devtime = devtime
+        self.battery_percentage = battery_percentage
+        self.mcc = mcc
+        self.mnc = mnc
+        self.gsm_cells = gsm_cells
+        self.wifi_aps = wifi_aps
 
-if __name__ == "__main__":
-    from sys import argv
 
-    def _print_config(conf: ConfigParser) -> None:
-        for section in conf.sections():
-            print("section", section)
-            for option in conf.options(section):
-                print("    ", option, conf[section][option])
+class StatusReport(Report):
+    TYPE = "status"
 
-    conf = readconfig(argv[1])
-    _print_config(conf)
-    print(normconf(conf["termconfig"]))
+    def __init__(self, *, battery_percentage: int) -> None:
+        self.battery_percentage = battery_percentage