]> www.average.org Git - loctrkd.git/blob - loctrkd/common.py
abstract protocol selection in `common`
[loctrkd.git] / loctrkd / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from getopt import getopt
6 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
7 from logging.handlers import SysLogHandler
8 from pkg_resources import get_distribution, DistributionNotFound
9 from sys import argv, stderr, stdout
10 from typing import Any, cast, Dict, List, Optional, Tuple, Union
11
12 from .protomodule import ProtoModule
13
14 CONF = "/etc/loctrkd.conf"
15 pmods: List[ProtoModule] = []
16
17 try:
18     version = get_distribution("loctrkd").version
19 except DistributionNotFound:
20     version = "<local>"
21
22
23 def init_protocols(conf: ConfigParser) -> None:
24     global pmods
25     pmods = [
26         cast(ProtoModule, import_module("." + modnm, __package__))
27         for modnm in conf.get("common", "protocols").split(",")
28     ]
29
30
31 def init(
32     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
33 ) -> ConfigParser:
34     if opts is None:
35         opts, _ = getopt(argv[1:], "c:d")
36     dopts = dict(opts)
37     conf = ConfigParser()
38     conf.read(dopts["-c"] if "-c" in dopts else CONF)
39     log.setLevel(DEBUG if "-d" in dopts else INFO)
40     if stdout.isatty():
41         fhdl = StreamHandler(stderr)
42         fhdl.setFormatter(
43             Formatter("%(asctime)s - %(levelname)s - %(message)s")
44         )
45         log.addHandler(fhdl)
46         log.debug("%s starting with options: %s", version, dopts)
47     else:
48         lhdl = SysLogHandler(address="/dev/log")
49         lhdl.setFormatter(
50             Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
51         )
52         log.addHandler(lhdl)
53         log.info("%s starting with options: %s", version, dopts)
54     init_protocols(conf)
55     return conf
56
57
58 def probe_pmod(segment: bytes) -> Optional[ProtoModule]:
59     for pmod in pmods:
60         if pmod.probe_buffer(segment):
61             return pmod
62     return None
63
64
65 def pmod_for_proto(proto: str) -> Optional[ProtoModule]:
66     for pmod in pmods:
67         if pmod.proto_handled(proto):
68             return pmod
69     return None