X-Git-Url: http://www.average.org/gitweb/?p=loctrkd.git;a=blobdiff_plain;f=loctrkd%2Fcommon.py;h=162fe0db07abbbd5a14e0cfbc429d93d193eb9d5;hp=227611c62fa4b8cf04f6bb22f2f8e4754cd744f2;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hpb=dbdf9d63af31770ad57302e16b17a2fdc526773f diff --git a/loctrkd/common.py b/loctrkd/common.py index 227611c..162fe0d 100644 --- a/loctrkd/common.py +++ b/loctrkd/common.py @@ -1,16 +1,20 @@ """ Common housekeeping for all daemons """ -from configparser import ConfigParser, SectionProxy +from configparser import ConfigParser +from importlib import import_module from getopt import getopt +from json import dumps from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO from logging.handlers import SysLogHandler from pkg_resources import get_distribution, DistributionNotFound from sys import argv, stderr, stdout -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, cast, Dict, List, Optional, Tuple, Union +from types import SimpleNamespace + +from .protomodule import ProtoClass, ProtoModule CONF = "/etc/loctrkd.conf" -PORT = 4303 -DBFN = "/var/lib/loctrkd/loctrkd.sqlite" +pmods: List[ProtoModule] = [] try: version = get_distribution("loctrkd").version @@ -18,13 +22,22 @@ except DistributionNotFound: version = "" +def init_protocols(conf: ConfigParser) -> None: + global pmods + pmods = [ + cast(ProtoModule, import_module("." + modnm, __package__)) + for modnm in conf.get("common", "protocols").split(",") + ] + + def init( log: Logger, opts: Optional[List[Tuple[str, str]]] = None ) -> ConfigParser: if opts is None: opts, _ = getopt(argv[1:], "c:d") dopts = dict(opts) - conf = readconfig(dopts["-c"] if "-c" in dopts else CONF) + conf = ConfigParser() + conf.read(dopts["-c"] if "-c" in dopts else CONF) log.setLevel(DEBUG if "-d" in dopts else INFO) if stdout.isatty(): fhdl = StreamHandler(stderr) @@ -40,59 +53,116 @@ def init( ) log.addHandler(lhdl) log.info("%s starting with options: %s", version, dopts) + init_protocols(conf) return conf -def readconfig(fname: str) -> ConfigParser: - config = ConfigParser() - config["collector"] = { - "port": str(PORT), - } - config["storage"] = { - "dbfn": DBFN, - } - config["termconfig"] = {} - config.read(fname) - return config - - -def normconf(section: SectionProxy) -> Dict[str, Any]: - result: Dict[str, Any] = {} - for key, val in section.items(): - vals = val.split("\n") - if len(vals) > 1 and vals[0] == "": - vals = vals[1:] - lst: List[Union[str, int]] = [] - for el in vals: - try: - lst.append(int(el, 0)) - except ValueError: - if el[0] == '"' and el[-1] == '"': - el = el.strip('"').rstrip('"') - lst.append(el) - if not ( - all([isinstance(x, int) for x in lst]) - or all([isinstance(x, str) for x in lst]) - ): - raise ValueError( - "Values of %s - %s are of different type", key, vals +def probe_pmod(segment: bytes) -> Optional[ProtoModule]: + for pmod in pmods: + if pmod.probe_buffer(segment): + return pmod + return None + + +def pmod_for_proto(proto: str) -> Optional[ProtoModule]: + for pmod in pmods: + if pmod.proto_handled(proto): + return pmod + return None + + +def pmod_by_name(pmodname: str) -> Optional[ProtoModule]: + for pmod in pmods: + if pmod.__name__.split(".")[-1] == pmodname: + return pmod + return None + + +def make_response( + pmodname: str, cmd: str, imei: str, **kwargs: Any +) -> Optional[ProtoClass.Out]: + pmod = pmod_by_name(pmodname) + if pmod is None: + return None + return pmod.make_response(cmd, imei, **kwargs) + + +def parse_message(proto: str, packet: bytes, is_incoming: bool = True) -> Any: + pmod = pmod_for_proto(proto) + return pmod.parse_message(packet, is_incoming) if pmod else None + + +def exposed_protos() -> List[Tuple[str, bool]]: + return [item for pmod in pmods for item in pmod.exposed_protos()] + + +class Report: + TYPE: str + + def __repr__(self) -> str: + return ( + self.__class__.__name__ + + "(" + + ", ".join( + [f"{k}={v.__repr__()}" for k, v in self.__dict__.items()] ) - if len(lst) == 1: - result[key] = lst[0] - else: - result[key] = lst - return result + + ")" + ) + + @property + def json(self) -> str: + self.type = self.TYPE + return dumps(self.__dict__) + + +class CoordReport(Report): + TYPE = "location" + + def __init__( + self, + *, + devtime: str, + battery_percentage: Optional[int], + accuracy: Optional[float], + altitude: Optional[float], + speed: Optional[float], + direction: Optional[float], + latitude: float, + longitude: float, + ) -> None: + self.devtime = devtime + self.battery_percentage = battery_percentage + self.accuracy = accuracy + self.altitude = altitude + self.speed = speed + self.direction = direction + self.latitude = latitude + self.longitude = longitude + + +class HintReport(Report): + TYPE = "approximate_location" + def __init__( + self, + *, + devtime: str, + battery_percentage: Optional[int], + mcc: int, + mnc: int, + gsm_cells: List[Tuple[int, int, int]], + wifi_aps: List[Tuple[str, str, int]], + ) -> None: + self.devtime = devtime + self.battery_percentage = battery_percentage + self.mcc = mcc + self.mnc = mnc + self.gsm_cells = gsm_cells + self.wifi_aps = wifi_aps -if __name__ == "__main__": - from sys import argv - def _print_config(conf: ConfigParser) -> None: - for section in conf.sections(): - print("section", section) - for option in conf.options(section): - print(" ", option, conf[section][option]) +class StatusReport(Report): + TYPE = "status" - conf = readconfig(argv[1]) - _print_config(conf) - print(normconf(conf["termconfig"])) + def __init__(self, *, battery_percentage: int) -> None: + self.battery_percentage = battery_percentage