X-Git-Url: http://www.average.org/gitweb/?p=loctrkd.git;a=blobdiff_plain;f=loctrkd%2Fwsgateway.py;fp=loctrkd%2Fwsgateway.py;h=c5dcf5a785a03d13e8748bbfb441afec5b21492d;hp=e94845eec4215e04797bde4e0dce43b05e8780ec;hb=2cf0fd9d215dda17eae4261ab7967367f6aa0028;hpb=6eff65f7b03bc66a479df0fd694250e1e0b7c5ae diff --git a/loctrkd/wsgateway.py b/loctrkd/wsgateway.py index e94845e..c5dcf5a 100644 --- a/loctrkd/wsgateway.py +++ b/loctrkd/wsgateway.py @@ -22,9 +22,9 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .evstore import initdb, fetch +from .evstore import initdb, fetch, fetchpmod from .protomodule import ProtoModule -from .zmsg import Rept, rtopic +from .zmsg import Rept, Resp, rtopic log = getLogger("loctrkd/wsgateway") @@ -101,6 +101,9 @@ class Client: self.ready = False self.imeis: Set[str] = set() + def __str__(self) -> str: + return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})" + def close(self) -> None: log.debug("Closing fd %d", self.sock.fileno()) self.sock.close() @@ -247,6 +250,42 @@ class Clients: return result +def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]: + imei = wsmsg.pop("imei", None) + cmd = wsmsg.pop("type", None) + if imei is None or cmd is None: + log.info("Unhandled message %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": "Did not get imei or cmd", + } + pmod = fetchpmod(imei) + if pmod is None: + log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": "Type of the terminal is unknown", + } + tmsg = common.make_response(pmod, cmd, imei, **wsmsg) + if tmsg is None: + log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg) + return { + "type": "cmdresult", + "imei": imei, + "result": f"{cmd} unimplemented for terminal protocol {pmod}", + } + resp = Resp(imei=imei, when=time(), packet=tmsg.packed) + log.debug("Response: %s", resp) + zpush.send(resp.packed) + return { + "type": "cmdresult", + "imei": imei, + "result": f"{cmd} sent to {imei}", + } + + def runserver(conf: ConfigParser) -> None: global htmlfile initdb(conf.get("storage", "dbfn")) @@ -255,6 +294,8 @@ def runserver(conf: ConfigParser) -> None: zctx = zmq.Context() # type: ignore zsub = zctx.socket(zmq.SUB) # type: ignore zsub.connect(conf.get("rectifier", "publishurl")) + zpush = zctx.socket(zmq.PUSH) # type: ignore + zpush.connect(conf.get("collector", "listenurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -315,6 +356,8 @@ def runserver(conf: ConfigParser) -> None: for msg in backlog(imei, numback) ] ) + else: + tosend.append((clnt, sendcmd(zpush, wsmsg))) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)