X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=d3c0541158941dedd017aba669e40042fd11f928;hb=3e82a6ab6541a8f0bdb091210766e14b50228c79;hp=15a6517d0c61ac14535f346545b1c03607abcc1a;hpb=3cfd06a9124bc710153520bc7f623c71051b2fb9;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index 15a6517..d3c0541 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,5 +1,7 @@ """ Websocket Gateway """ +from datetime import datetime, timezone +from json import dumps, loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -16,14 +18,43 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .zmsg import LocEvt +from .evstore import initdb, fetch +from .gps303proto import ( + GPS_POSITIONING, + WIFI_POSITIONING, + parse_message, +) +from .zmsg import Bcast, topic log = getLogger("gps303/wsgateway") -htmldata = None +htmlfile = None + + +def backlog(imei, numback): + result = [] + for is_incoming, timestamp, packet in fetch( + imei, + ((True, GPS_POSITIONING.PROTO), (False, WIFI_POSITIONING.PROTO)), + numback, + ): + msg = parse_message(packet, is_incoming=is_incoming) + result.append( + { + "imei": imei, + "timestamp": str( + datetime.fromtimestamp(timestamp).astimezone( + tz=timezone.utc + ) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + } + ) + return result def try_http(data, fd, e): - global htmldata + global htmlfile try: lines = data.decode().split("\r\n") request = lines[0] @@ -37,25 +68,39 @@ def try_http(data, fd, e): fd, headers, ) + try: + pos = resource.index("?") + resource = resource[:pos] + except ValueError: + pass if op == "GET": - if htmldata is None: + if htmlfile is None: return ( f"{proto} 500 No data configured\r\n" f"Content-Type: text/plain\r\n\r\n" f"HTML data not configure on the server\r\n".encode() ) elif resource == "/": - length = len(htmldata.encode("utf-8")) - return ( - f"{proto} 200 Ok\r\n" - f"Content-Type: text/html; charset=utf-8\r\n" - f"Content-Length: {length:d}\r\n\r\n" + htmldata - ).encode("utf-8") + try: + with open(htmlfile, "rb") as fl: + htmldata = fl.read() + length = len(htmldata) + return ( + f"{proto} 200 Ok\r\n" + f"Content-Type: text/html; charset=utf-8\r\n" + f"Content-Length: {len(htmldata):d}\r\n\r\n" + ).encode("utf-8") + htmldata + except OSError: + return ( + f"{proto} 500 File not found\r\n" + f"Content-Type: text/plain\r\n\r\n" + f"HTML file could not be opened\r\n".encode() + ) else: return ( f"{proto} 404 File not found\r\n" f"Content-Type: text/plain\r\n\r\n" - f"We can only serve \"/\"\r\n".encode() + f'We can only serve "/"\r\n'.encode() ) else: return ( @@ -76,6 +121,8 @@ class Client: self.addr = addr self.ws = WSConnection(ConnectionType.SERVER) self.ws_data = b"" + self.ready = False + self.imeis = set() def close(self): log.debug("Closing fd %d", self.sock.fileno()) @@ -108,6 +155,7 @@ class Client: e, ) self.ws_data = try_http(data, self.sock.fileno(), e) + self.write() # TODO this is a hack log.debug("Sending HTTP response to %d", self.sock.fileno()) msgs = None else: @@ -117,34 +165,51 @@ class Client: log.debug("WebSocket upgrade on fd %d", self.sock.fileno()) # self.ws_data += self.ws.send(event.response()) # Why not?! self.ws_data += self.ws.send(AcceptConnection()) + self.ready = True elif isinstance(event, (CloseConnection, Ping)): log.debug("%s on fd %d", event, self.sock.fileno()) self.ws_data += self.ws.send(event.response()) elif isinstance(event, TextMessage): - # TODO: save imei "subscription" log.debug("%s on fd %d", event, self.sock.fileno()) - msgs.append(event.data) + msg = loads(event.data) + msgs.append(msg) + if msg.get("type", None) == "subscribe": + self.imeis = set(msg.get("imei", [])) + log.debug( + "subs list on fd %s is %s", + self.sock.fileno(), + self.imeis, + ) else: log.warning("%s on fd %d", event, self.sock.fileno()) - if self.ws_data: # Temp hack - self.write() return msgs - def send(self, imei, message): - # TODO: filter only wanted imei got from the client - self.ws_data += self.ws.send(Message(data=message)) + def wants(self, imei): + log.debug( + "wants %s? set is %s on fd %d", + imei, + self.imeis, + self.sock.fileno(), + ) + return imei in self.imeis + + def send(self, message): + if self.ready and message["imei"] in self.imeis: + self.ws_data += self.ws.send(Message(data=dumps(message))) def write(self): - try: - sent = self.sock.send(self.ws_data) - self.ws_data = self.ws_data[sent:] - except OSError as e: - log.error( - "Sending to fd %d: %s", - self.sock.fileno(), - e, - ) - self.ws_data = b"" + if self.ws_data: + try: + sent = self.sock.send(self.ws_data) + self.ws_data = self.ws_data[sent:] + except OSError as e: + log.error( + "Sending to fd %d: %s", + self.sock.fileno(), + e, + ) + self.ws_data = b"" + return bool(self.ws_data) class Clients: @@ -165,33 +230,38 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] - msgs = clnt.recv() - if msgs is None: - return None - result = [] - for msg in msgs: - log.debug("Received: %s", msg) - return result + return clnt.recv() + + def send(self, msg): + towrite = set() + for fd, clnt in self.by_fd.items(): + if clnt.wants(msg["imei"]): + clnt.send(msg) + towrite.add(fd) + return towrite + + def write(self, towrite): + waiting = set() + for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]: + if clnt.write(): + waiting.add(fd) + return waiting - def send(self, msgs): + def subs(self): + result = set() for clnt in self.by_fd.values(): - clnt.send(msgs) - clnt.write() + result |= clnt.imeis + return result def runserver(conf): - global htmldata - try: - with open( - conf.get("wsgateway", "htmlfile"), encoding="utf-8" - ) as fl: - htmldata = fl.read() - except OSError: - pass + global htmlfile + + initdb(conf.get("storage", "dbfn")) + htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("lookaside", "publishurl")) - zsub.setsockopt(zmq.SUBSCRIBE, b"") + zsub.connect(conf.get("collector", "publishurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -202,18 +272,55 @@ def runserver(conf): poller.register(zsub, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() + activesubs = set() try: + towait = set() while True: + neededsubs = clients.subs() + for imei in neededsubs - activesubs: + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), + ) + for imei in activesubs - neededsubs: + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), + ) + activesubs = neededsubs + log.debug("Subscribed to: %s", activesubs) tosend = [] topoll = [] tostop = [] - events = poller.poll(1000) + towrite = set() + events = poller.poll() for sk, fl in events: if sk is zsub: while True: try: - msg = zsub.recv(zmq.NOBLOCK) - tosend.append(LocEvt(msg)) + zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) + msg = parse_message(zmsg.packet, zmsg.is_incoming) + log.debug("Got %s with %s", zmsg, msg) + tosend.append( + { + "imei": zmsg.imei, + "timestamp": str( + datetime.fromtimestamp( + zmsg.when + ).astimezone(tz=timezone.utc) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + } + ) except zmq.Again: break elif sk == tcpfd: @@ -224,9 +331,20 @@ def runserver(conf): if received is None: log.debug("Client gone from fd %d", sk) tostop.append(sk) + towait.discard(fd) else: for msg in received: log.debug("Received from %d: %s", sk, msg) + if msg.get("type", None) == "subscribe": + imeis = msg.get("imei") + numback = msg.get("backlog", 5) + for imei in imeis: + tosend.extend(backlog(imei, numback)) + towrite.add(sk) + elif fl & zmq.POLLOUT: + log.debug("Write now open for fd %d", sk) + towrite.add(sk) + towait.discard(sk) else: log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now @@ -234,12 +352,26 @@ def runserver(conf): poller.unregister(fd) clients.stop(fd) for zmsg in tosend: - log.debug("Sending to the client: %s", zmsg) - clients.send(zmsg) + log.debug("Sending to the clients: %s", zmsg) + towrite |= clients.send(zmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) poller.register(fd, flags=zmq.POLLIN) - # TODO: Handle write overruns (register for POLLOUT) + # Deal with actually writing the data out + trywrite = towrite - towait + morewait = clients.write(trywrite) + log.debug( + "towait %s, tried %s, still busy %s", + towait, + trywrite, + morewait, + ) + for fd in morewait - trywrite: # new fds waiting for write + poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) + for fd in trywrite - morewait: # no longer waiting for write + poller.modify(fd, flags=zmq.POLLIN) + towait &= trywrite + towait |= morewait except KeyboardInterrupt: pass