X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fcollector.py;h=9f305e55f60197f38d84e820a793192bb93abbc2;hb=01e993a49ac30b400940fc031cc94ef893fc7200;hp=385285494d7f0782bfc3c8ab20eff1ece22bbf7e;hpb=f85690956cf13e342ac02dea298fe876c4163c95;p=loctrkd.git diff --git a/gps303/collector.py b/gps303/collector.py index 3852854..9f305e5 100644 --- a/gps303/collector.py +++ b/gps303/collector.py @@ -1,7 +1,8 @@ """ TCP server that communicates with terminals """ from logging import getLogger -from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from os import umask +from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR from time import time from struct import pack import zmq @@ -14,33 +15,11 @@ from .gps303proto import ( parse_message, proto_of_message, ) +from .zmsg import Bcast, Resp log = getLogger("gps303/collector") -class Bcast: - """Zmq message to broadcast what was received from the terminal""" - - def __init__(self, imei, msg): - self.as_bytes = ( - pack("B", proto_of_message(msg)) - + ("0000000000000000" if imei is None else imei).encode() - + msg - ) - - -class Resp: - """Zmq message received from a third party to send to the terminal""" - - def __init__(self, *args, **kwargs): - if not kwargs and len(args) == 1 and isinstance(args[0], bytes): - self.imei = msg[:16].decode() - self.payload = msg[16:] - elif len(args) == 0: - self.imei = kwargs["imei"] - self.payload = kwargs["payload"] - - class Client: """Connected socket to the terminal plus buffer and metadata""" @@ -54,7 +33,6 @@ class Client: log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei) self.sock.close() self.buffer = b"" - self.imei = None def recv(self): """Read from the socket and parse complete messages""" @@ -101,7 +79,7 @@ class Client: log.info( "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei ) - msgs.append(packet) + msgs.append((when, self.addr, packet)) return msgs def send(self, buffer): @@ -123,6 +101,7 @@ class Clients: def add(self, clntsock, clntaddr): fd = clntsock.fileno() + log.info("Start serving fd %d from %s", fd, clntaddr) self.by_fd[fd] = Client(clntsock, clntaddr) return fd @@ -140,30 +119,40 @@ class Clients: if msgs is None: return None result = [] - for msg in msgs: - if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly... + for when, peeraddr, packet in msgs: + if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly... self.by_imei[clnt.imei] = clnt - result.append((clnt.imei, msg)) + result.append((clnt.imei, when, peeraddr, packet)) + log.debug( + "Received from %s (IMEI %s): %s", + peeraddr, + clnt.imei, + packet.hex(), + ) return result def response(self, resp): if resp.imei in self.by_imei: - self.by_imei[resp.imei].send(resp.payload) + self.by_imei[resp.imei].send(resp.packet) + else: + log.info("Not connected (IMEI %s)", resp.imei) def runserver(conf): zctx = zmq.Context() zpub = zctx.socket(zmq.PUB) + zpull = zctx.socket(zmq.PULL) + oldmask = umask(0o117) zpub.bind(conf.get("collector", "publishurl")) - zsub = zctx.socket(zmq.SUB) - zsub.connect(conf.get("collector", "listenurl")) - tcpl = socket(AF_INET, SOCK_STREAM) + zpull.bind(conf.get("collector", "listenurl")) + umask(oldmask) + tcpl = socket(AF_INET6, SOCK_STREAM) tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) tcpl.bind(("", conf.getint("collector", "port"))) tcpl.listen(5) tcpfd = tcpl.fileno() poller = zmq.Poller() - poller.register(zsub, flags=zmq.POLLIN) + poller.register(zpull, flags=zmq.POLLIN) poller.register(tcpfd, flags=zmq.POLLIN) clients = Clients() try: @@ -171,19 +160,19 @@ def runserver(conf): tosend = [] topoll = [] tostop = [] - events = poller.poll(10) + events = poller.poll(1000) for sk, fl in events: - if sk is zsub: + if sk is zpull: while True: try: - msg = zsub.recv(zmq.NOBLOCK) + msg = zpull.recv(zmq.NOBLOCK) tosend.append(Resp(msg)) except zmq.Again: break elif sk == tcpfd: clntsock, clntaddr = tcpl.accept() topoll.append((clntsock, clntaddr)) - else: + elif fl & zmq.POLLIN: received = clients.recv(sk) if received is None: log.debug( @@ -191,29 +180,41 @@ def runserver(conf): ) tostop.append(sk) else: - for imei, msg in received: - zpub.send(Bcast(imei, msg).as_bytes) - if proto_of_message(msg) == HIBERNATION.PROTO: + for imei, when, peeraddr, packet in received: + proto = proto_of_message(packet) + zpub.send( + Bcast( + proto=proto, + imei=imei, + when=when, + peeraddr=peeraddr, + packet=packet, + ).packed + ) + if proto == HIBERNATION.PROTO: log.debug( "HIBERNATION from fd %d (IMEI %s)", sk, imei, ) tostop.append(sk) - respmsg = inline_response(msg) + respmsg = inline_response(packet) if respmsg is not None: clients.response( - Resp(imei=imei, payload=respmsg) + Resp(imei=imei, packet=respmsg) ) + else: + log.debug("Stray event: %s on socket %s", fl, sk) # poll queue consumed, make changes now for fd in tostop: poller.unregister(fd) clients.stop(fd) for zmsg in tosend: + log.debug("Sending to the client: %s", zmsg) clients.response(zmsg) for clntsock, clntaddr in topoll: fd = clients.add(clntsock, clntaddr) - poller.register(fd) + poller.register(fd, flags=zmq.POLLIN) except KeyboardInterrupt: pass