]> www.average.org Git - loctrkd.git/blob - gps303/common.py
typechecking: annotate common.py
[loctrkd.git] / gps303 / common.py
1 """ Common housekeeping for all daemons """
2
3 from configparser import ConfigParser, SectionProxy
4 from getopt import getopt
5 from logging import Formatter, getLogger, Logger, StreamHandler, DEBUG, INFO
6 from logging.handlers import SysLogHandler
7 from pkg_resources import get_distribution, DistributionNotFound
8 from sys import argv, stderr, stdout
9 from typing import Any, Dict, List, Optional, Tuple, Union
10
11 CONF = "/etc/gps303.conf"
12 PORT = 4303
13 DBFN = "/var/lib/gps303/gps303.sqlite"
14
15 try:
16     version = get_distribution("gps303").version
17 except DistributionNotFound:
18     version = "<local>"
19
20
21 def init(
22     log: Logger, opts: Optional[List[Tuple[str, str]]] = None
23 ) -> ConfigParser:
24     if opts is None:
25         opts, _ = getopt(argv[1:], "c:d")
26     dopts = dict(opts)
27     conf = readconfig(dopts["-c"] if "-c" in dopts else CONF)
28     log.setLevel(DEBUG if "-d" in dopts else INFO)
29     if stdout.isatty():
30         fhdl = StreamHandler(stderr)
31         fhdl.setFormatter(
32             Formatter("%(asctime)s - %(levelname)s - %(message)s")
33         )
34         log.addHandler(fhdl)
35         log.debug("%s starting with options: %s", version, dopts)
36     else:
37         lhdl = SysLogHandler(address="/dev/log")
38         lhdl.setFormatter(
39             Formatter("%(name)s[%(process)d]: %(levelname)s - %(message)s")
40         )
41         log.addHandler(lhdl)
42         log.info("%s starting with options: %s", version, dopts)
43     return conf
44
45
46 def readconfig(fname: str) -> ConfigParser:
47     config = ConfigParser()
48     config["collector"] = {
49         "port": str(PORT),
50     }
51     config["storage"] = {
52         "dbfn": DBFN,
53     }
54     config["termconfig"] = {}
55     config.read(fname)
56     return config
57
58
59 def normconf(section: SectionProxy) -> Dict[str, Any]:
60     result: Dict[str, Any] = {}
61     for key, val in section.items():
62         vals = val.split("\n")
63         if len(vals) > 1 and vals[0] == "":
64             vals = vals[1:]
65         lst: List[Union[str, int]] = []
66         for el in vals:
67             try:
68                 lst.append(int(el, 0))
69             except ValueError:
70                 if el[0] == '"' and el[-1] == '"':
71                     el = el.strip('"').rstrip('"')
72                 lst.append(el)
73         if not (
74             all([isinstance(x, int) for x in lst])
75             or all([isinstance(x, str) for x in lst])
76         ):
77             raise ValueError(
78                 "Values of %s - %s are of different type", key, vals
79             )
80         if len(lst) == 1:
81             result[key] = lst[0]
82         else:
83             result[key] = lst
84     return result
85
86
87 if __name__ == "__main__":
88     from sys import argv
89
90     def _print_config(conf: ConfigParser) -> None:
91         for section in conf.sections():
92             print("section", section)
93             for option in conf.options(section):
94                 print("    ", option, conf[section][option])
95
96     conf = readconfig(argv[1])
97     _print_config(conf)
98     print(normconf(conf["termconfig"]))