X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fwsgateway.py;h=d66070e2f57a7781fe76a8249ac176536e3192d4;hb=0c28fa6306f252934fe7e2c04522e8540c414387;hp=f9f5c6a2ad936f33c9961419245c65de14414dcd;hpb=c41f98b94321d35cf7f54a9c659fc36b13990f25;p=loctrkd.git diff --git a/gps303/wsgateway.py b/gps303/wsgateway.py index f9f5c6a..d66070e 100644 --- a/gps303/wsgateway.py +++ b/gps303/wsgateway.py @@ -1,6 +1,7 @@ """ Websocket Gateway """ -from json import loads +from datetime import datetime, timezone +from json import dumps, loads from logging import getLogger from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time @@ -17,7 +18,13 @@ from wsproto.utilities import RemoteProtocolError import zmq from . import common -from .zmsg import LocEvt +from .backlog import blinit, backlog +from .gps303proto import ( + GPS_POSITIONING, + WIFI_POSITIONING, + parse_message, +) +from .zmsg import Bcast, topic log = getLogger("gps303/wsgateway") htmlfile = None @@ -156,12 +163,17 @@ class Client: return msgs def wants(self, imei): - log.debug("wants %s? set is %s on fd %d", imei, self.imeis, self.sock.fileno()) + log.debug( + "wants %s? set is %s on fd %d", + imei, + self.imeis, + self.sock.fileno(), + ) return True # TODO: check subscriptions def send(self, message): - if self.ready and message.imei in self.imeis: - self.ws_data += self.ws.send(Message(data=message.json)) + if self.ready and message["imei"] in self.imeis: + self.ws_data += self.ws.send(Message(data=dumps(message))) def write(self): if self.ws_data: @@ -196,19 +208,12 @@ class Clients: def recv(self, fd): clnt = self.by_fd[fd] - msgs = clnt.recv() - if msgs is None: - return None - result = [] - for msg in msgs: - log.debug("Received: %s", msg) - result.append(msg) - return result + return clnt.recv() def send(self, msg): towrite = set() for fd, clnt in self.by_fd.items(): - if clnt.wants(msg.imei): + if clnt.wants(msg["imei"]): clnt.send(msg) towrite.add(fd) return towrite @@ -230,10 +235,11 @@ class Clients: def runserver(conf): global htmlfile + blinit(conf.get("storage", "dbfn")) htmlfile = conf.get("wsgateway", "htmlfile") zctx = zmq.Context() zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("lookaside", "publishurl")) + zsub.connect(conf.get("collector", "publishurl")) tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setblocking(False) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -250,9 +256,23 @@ def runserver(conf): while True: neededsubs = clients.subs() for imei in neededsubs - activesubs: - zsub.setsockopt(zmq.SUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.SUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), + ) for imei in activesubs - neededsubs: - zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode()) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(GPS_POSITIONING.PROTO, True, imei), + ) + zsub.setsockopt( + zmq.UNSUBSCRIBE, + topic(WIFI_POSITIONING.PROTO, False, imei), + ) activesubs = neededsubs log.debug("Subscribed to: %s", activesubs) tosend = [] @@ -264,8 +284,21 @@ def runserver(conf): if sk is zsub: while True: try: - zmsg = LocEvt(zsub.recv(zmq.NOBLOCK)) - tosend.append(zmsg) + zmsg = Bcast(zsub.recv(zmq.NOBLOCK)) + msg = parse_message(zmsg.packet, zmsg.is_incoming) + log.debug("Got %s with %s", zmsg, msg) + tosend.append( + { + "imei": zmsg.imei, + "timestamp": str( + datetime.fromtimestamp( + zmsg.when + ).astimezone(tz=timezone.utc) + ), + "longitude": msg.longitude, + "latitude": msg.latitude, + } + ) except zmq.Again: break elif sk == tcpfd: @@ -280,6 +313,11 @@ def runserver(conf): else: for msg in received: log.debug("Received from %d: %s", sk, msg) + if msg.get("type", None) == "subscribe": + imei = msg.get("imei") + numback = msg.get("backlog", 5) + for elem in imei: + tosend.extend(backlog(elem, numback)) towrite.add(sk) elif fl & zmq.POLLOUT: log.debug("Write now open for fd %d", sk)