]> www.average.org Git - loctrkd.git/blob - gps303/mock.py
mock: use persistent history for entered commands
[loctrkd.git] / gps303 / mock.py
1 """ Generate and publish locevt from the text input """
2
3 import atexit
4 from datetime import datetime, timezone
5 from logging import getLogger
6 from os import path, umask
7 from readline import read_history_file, set_history_length, write_history_file
8 from sys import argv
9 import zmq
10
11 from . import common
12 from .zmsg import LocEvt
13
# Module-level logger; it is handed to common.init() when run as a script.
# NOTE(review): the logger is named "gps303/watch" although this file is
# mock.py - possibly a copy-paste leftover; confirm before renaming.
log = getLogger("gps303/watch")

# Readline history file, located in the user's home directory, so that
# entered commands persist between sessions.
RL_HISTORY = path.join(path.expanduser("~"), ".gps303_history")
17
def _parse_line(line):
    """Split an input line into an IMEI and a dict of keyword arguments.

    The expected form is ``<imei> [key=value ...]``, with tokens separated
    by any amount of whitespace.  Only the first ``=`` in a token separates
    the key from the value, so values may themselves contain ``=``.

    Returns:
        ``(imei, kwargs)`` where all keys and values are strings.

    Raises:
        ValueError: if the line is blank, or a token after the IMEI is not
            of the form ``key=value`` with a non-empty key.
    """
    tokens = line.split()
    if not tokens:
        raise ValueError("empty input")
    imei, *pairs = tokens
    kwargs = {}
    for pair in pairs:
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"expected key=value, got {pair!r}")
        kwargs[key] = value
    return imei, kwargs


def main(conf):
    """Read location events from the terminal and publish them.

    Binds a ZeroMQ PUB socket to the URL found under the ``publishurl``
    option of the ``lookaside`` section of *conf*, then prompts for lines of
    the form ``<imei> [key=value ...]``.  Each well-formed line is turned
    into a ``LocEvt`` and its packed form is sent on the socket.  Blank
    lines are skipped and malformed lines are reported without ending the
    session.  The loop ends on end-of-file (e.g. Ctrl-D).

    Entered commands are kept in a readline history file (``RL_HISTORY``)
    that is loaded at start and written back at interpreter exit.

    Args:
        conf: configuration object providing ``get(section, option)``.
    """
    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    # Tighten the umask only around bind(), and restore it even if bind()
    # fails, so the changed mask never leaks into the rest of the process.
    oldmask = umask(0o117)
    try:
        zpub.bind(conf.get("lookaside", "publishurl"))
    finally:
        umask(oldmask)

    try:
        read_history_file(RL_HISTORY)
    except FileNotFoundError:
        # First run: there is no history yet.
        pass
    set_history_length(1000)
    atexit.register(write_history_file, RL_HISTORY)

    while True:
        try:
            line = input("> ")
        except EOFError:
            break
        if not line.strip():
            # Nothing was entered; do not publish an event with empty IMEI.
            continue
        try:
            imei, kwargs = _parse_line(line)
        except ValueError as exc:
            # A typo must not terminate the interactive session.
            print("Bad input:", exc)
            continue
        msg = LocEvt(imei=imei, **kwargs)
        print("Publishing:", msg)
        zpub.send(msg.packed)
43
44
# Script entry point.  The suffix match (rather than equality) also accepts
# module names that merely end in "__main__".
if __name__.endswith("__main__"):
    # common.init() receives the module logger; its result is what main()
    # takes as its configuration argument.
    conf = common.init(log)
    main(conf)