]> www.average.org Git - loctrkd.git/blob - gps303/__main__.py
typeckeck: annotate __main__ and fix gps303proto
[loctrkd.git] / gps303 / __main__.py
1 """ Command line tool for sending requests to the terminal """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from getopt import getopt
6 from logging import getLogger
7 from sys import argv
8 from time import time
9 from typing import List, Tuple
10 import zmq
11
12 from . import common
13 from .gps303proto import *
14 from .zmsg import Bcast, Resp
15
16 log = getLogger("gps303")
17
18
19 def main(
20     conf: ConfigParser, opts: List[Tuple[str, str]], args: List[str]
21 ) -> None:
22     # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
23     zctx = zmq.Context()  # type: ignore
24     zpush = zctx.socket(zmq.PUSH)  # type: ignore
25     zpush.connect(conf.get("collector", "listenurl"))
26
27     if len(args) < 2:
28         raise ValueError(
29             "Too few args, need IMEI and command min: " + str(args)
30         )
31     imei = args[0]
32     cmd = args[1]
33     args = args[2:]
34     cls = class_by_prefix(cmd)
35     if isinstance(cls, list):
36         raise ValueError("Prefix does not select a single class: " + str(cls))
37     kwargs = dict([arg.split("=") for arg in args])
38     for arg in args:
39         k, v = arg.split("=")
40         kwargs[k] = v
41     resp = Resp(imei=imei, when=time(), packet=cls.Out(**kwargs).packed)
42     log.debug("Response: %s", resp)
43     zpush.send(resp.packed)
44
45
46 if __name__.endswith("__main__"):
47     opts, args = getopt(argv[1:], "c:d")
48     main(common.init(log, opts=opts), opts, args)