import zmq
from . import common
+from .backlog import blinit, backlog
from .zmsg import LocEvt
log = getLogger("gps303/wsgateway")
def recv(self, fd):
clnt = self.by_fd[fd]
- msgs = clnt.recv()
- if msgs is None:
- return None
- result = []
- for msg in msgs:
- log.debug("Received: %s", msg)
- result.append(msg)
- return result
+ return clnt.recv()
def send(self, msg):
towrite = set()
waiting.add(fd)
return waiting
+ def subs(self):
+ result = set()
+ for clnt in self.by_fd.values():
+ result |= clnt.imeis
+ return result
+
def runserver(conf):
global htmlfile
+ blinit(conf.get("storage", "dbfn"), conf.get("opencellid", "dbfn"))
htmlfile = conf.get("wsgateway", "htmlfile")
zctx = zmq.Context()
zsub = zctx.socket(zmq.SUB)
zsub.connect(conf.get("lookaside", "publishurl"))
- zsub.setsockopt(zmq.SUBSCRIBE, b"")
tcpl = socket(AF_INET6, SOCK_STREAM)
tcpl.setblocking(False)
tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
poller.register(zsub, flags=zmq.POLLIN)
poller.register(tcpfd, flags=zmq.POLLIN)
clients = Clients()
+ activesubs = set()
try:
towait = set()
while True:
+ neededsubs = clients.subs()
+ for imei in neededsubs - activesubs:
+ zsub.setsockopt(zmq.SUBSCRIBE, imei.encode())
+ for imei in activesubs - neededsubs:
+ zsub.setsockopt(zmq.UNSUBSCRIBE, imei.encode())
+ activesubs = neededsubs
+ log.debug("Subscribed to: %s", activesubs)
tosend = []
topoll = []
tostop = []
else:
for msg in received:
log.debug("Received from %d: %s", sk, msg)
+ if msg.get("type", None) == "subscribe":
+ imei = msg.get("imei")
+ numback = msg.get("backlog", 5)
+ for elem in imei:
+ tosend.extend(backlog(elem, numback))
towrite.add(sk)
elif fl & zmq.POLLOUT:
log.debug("Write now open for fd %d", sk)