X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=7756fe9d52ae10d0bcdb8012869ac7e78f3393b9;hb=96538346bd332d76d2cac5d6a0ef2b4e4a40de30;hp=127dbb4b002fc8c62dce6be2cd42e35b6df54e3e;hpb=7dfa156ec288ef302e945f1ad2a66b91f7ce5934;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 127dbb4..7756fe9 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,5 +1,6 @@ """ Websocket Gateway """ +from json import loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -16,14 +17,15 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common +from .backlog import blinit, backlog from .zmsg import LocEvt log = getLogger("gps303/wsgateway") -htmldata = None +htmlfile = None def try_http(data, fd, e): - global htmldata + global htmlfile try: lines = data.decode().split("\r\n") request = lines[0] @@ -43,19 +45,28 @@ def try_http(data, fd, e): except ValueError: pass if op == "GET": - if htmldata is None: + if htmlfile is None: return ( f"{proto} 500 No data configured\r\n" f"Content-Type: text/plain\r\n\r\n" f"HTML data not configure on the server\r\n".encode() ) elif resource == "/": - length = len(htmldata.encode("utf-8")) - return ( - f"{proto} 200 Ok\r\n" - f"Content-Type: text/html; charset=utf-8\r\n" - f"Content-Length: {length:d}\r\n\r\n" + htmldata - ).encode("utf-8") + try: + with open(htmlfile, "rb") as fl: + htmldata = fl.read() + length = len(htmldata) + return ( + f"{proto} 200 Ok\r\n" + f"Content-Type: text/html; charset=utf-8\r\n" + f"Content-Length: {len(htmldata):d}\r\n\r\n" + ).encode("utf-8") + htmldata + except OSError: + return ( + f"{proto} 500 File not found\r\n" + f"Content-Type: text/plain\r\n\r\n" + f"HTML file could not be opened\r\n".encode() + ) else: return ( f"{proto} 404 File not found\r\n" @@ -81,6 +92,8 @@ class Client: self.addr = addr self.ws = WSConnection(ConnectionType.SERVER) self.ws_data = b"" + self.ready = False + self.imeis = set() def close(self): log.debug("Closing fd %d", self.sock.fileno()) @@ -113,6 +126,7 @@ class Client: e, ) self.ws_data = try_http(data, self.sock.fileno(), e) + self.write() # TODO this is a hack log.debug("Sending HTTP response to %d", self.sock.fileno()) msgs = None else: @@ -122,25 +136,33 @@ class Client: log.debug("WebSocket upgrade on fd %d", self.sock.fileno()) # self.ws_data += self.ws.send(event.response()) # Why not?! self.ws_data += self.ws.send(AcceptConnection()) + self.ready = True elif isinstance(event, (CloseConnection, Ping)): log.debug("%s on fd %d", event, self.sock.fileno()) self.ws_data += self.ws.send(event.response()) elif isinstance(event, TextMessage): # TODO: save imei "subscription" log.debug("%s on fd %d", event, self.sock.fileno()) - msgs.append(event.data) + msg = loads(event.data) + msgs.append(msg) + if msg.get("type", None) == "subscribe": + self.imeis = set(msg.get("imei", [])) + log.debug( + "subs list on fd %s is %s", + self.sock.fileno(), + self.imeis, + ) else: log.warning("%s on fd %d", event, self.sock.fileno()) - if self.ws_data: # Temp hack - self.write() return msgs def wants(self, imei): + log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno()) return True # TODO: check subscriptions def send(self, message): - # TODO: filter only wanted imei got from the client - self.ws_data += self.ws.send(Message(data=message.json)) + if self.ready and message.imei in self.imeis: + self.ws_data += self.ws.send(Message(data=message.json)) def write(self): if self.ws_data: @@ -175,13 +197,7 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] - msgs = clnt.recv() - if msgs is None: - return None - result = [] - for msg in msgs: - log.debug("Received: %s", msg) - return result + return clnt.recv() def send(self, msg): towrite = set() @@ -198,18 +214,21 @@ class Clients: waiting.add(fd) return waiting + def subs(self): + result = set() + for clnt in self.by_fd.values(): + result |= clnt.imeis + return result + def runserver(conf): - global htmldata - try: - with open(conf.get("wsgateway", "htmlfile"), encoding="utf-8") as fl: - htmldata = fl.read() - except OSError: - pass + global htmlfile + + blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn")) + htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) zsub.connect(conf.get("lookaside", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -220,14 +239,22 @@ def runserver(conf): poller.register(zsub, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() + activesubs = set() try: towait = set() while True: + neededsubs = clients.subs() + for imei in neededsubs - activesubs: + zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + for imei in activesubs - neededsubs: + zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + activesubs = neededsubs + log.debug("Subscribed to: %s", activesubs) tosend = [] topoll = [] tostop = [] towrite = set() - events = poller.poll(5000) + events = poller.poll() for sk, fl in events: if sk is zsub: while True: @@ -244,9 +271,15 @@ def runserver(conf): if received is None: log.debug("Client gone from fd %d", sk) tostop.append(sk) + towait.discard(fd) else: for msg in received: log.debug("Received from %d: %s", sk, msg) + if msg.get("type", None) == "subscribe": + imei = msg.get("imei") + if imei: + tosend.extend(backlog(imei[0], 5)) + towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk) towrite.add(sk)