1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import AcceptConnection, CloseConnection, Message, Ping, Request, TextMessage
11 from .zmsg import LocEvt
# Module-wide logger under the "gps303/wsgateway" name; the log calls
# below all go through it.
13 log = getLogger("gps303/wsgateway")
17 """Websocket connection to the client"""
# Set up per-client state from a socket and the address it came from.
19 def __init__(self, sock, addr):
# wsproto connection object acting in the SERVER role for this client.
22 self.ws = WSConnection(ConnectionType.SERVER)
# Record, at debug level, which file descriptor is being closed.
26 log.debug("Closing fd %d", self.sock.fileno())
# Read at most 4096 bytes from the client socket.
31 data = self.sock.recv(4096)
34 "Reading from fd %d: %s",
# NOTE(review): wsproto documents receive_data(None) as the way to signal
# that the peer connection is gone - confirm for the installed version.
38 self.ws.receive_data(None)
40 if not data: # Client has closed connection
42 "EOF reading from fd %d",
45 self.ws.receive_data(None)
# Hand the raw bytes to wsproto, then walk the events it produced.
47 self.ws.receive_data(data)
49 for event in self.ws.events():
# Upgrade request: accept it and queue the bytes of the reply.
50 if isinstance(event, Request):
51 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
# NOTE(review): as far as wsproto's docs show, only Ping and
# CloseConnection offer response(); Request does not, which would explain
# why the line below was abandoned in favour of AcceptConnection - verify.
52 #self.ws_data += self.ws.send(event.response()) # Why not?!
53 self.ws_data += self.ws.send(AcceptConnection())
# Close and ping are answered with the reply the event itself builds.
54 elif isinstance(event, (CloseConnection, Ping)):
55 log.debug("%s on fd %d", event, self.sock.fileno())
56 self.ws_data += self.ws.send(event.response())
# Text payloads from the client are collected into msgs.
57 elif isinstance(event, TextMessage):
58 # TODO: save imei "subscription"
59 log.debug("%s on fd %d", event, self.sock.fileno())
60 msgs.append(event.data)
# Anything else is only logged, at warning level.
62 log.warning("%s on fd %d", event, self.sock.fileno())
# Only acts when there are reply bytes queued in ws_data.
63 if self.ws_data: # Temp hack
def send(self, imei, message):
    """Queue one outgoing message for this client.

    The payload is wrapped in a wsproto ``Message`` event, turned into
    wire bytes by the connection object, and appended to the pending
    output buffer ``self.ws_data``. Nothing is written to the socket
    here.

    ``imei`` is accepted but not consulted yet (see the TODO below).
    """
    # TODO: filter only wanted imei got from the client
    encoded = self.ws.send(Message(data=message))
    self.ws_data += encoded
# Push pending output to the socket; send() reports how many bytes it
# took, and only the remaining tail is kept in the buffer.
73 sent = self.sock.send(self.ws_data)
74 self.ws_data = self.ws_data[sent:]
77 "Sending to fd %d (IMEI %s): %s",
def add(self, clntsock, clntaddr):
    """Start serving a newly accepted client connection.

    Wraps the socket in a ``Client`` and indexes it in ``self.by_fd``
    under the socket's file descriptor.

    Args:
        clntsock: the accepted client socket.
        clntaddr: the peer address, used for logging and passed on to
            ``Client``.

    Returns:
        The client's file descriptor. The server loop assigns the result
        of this call to ``fd`` and registers it with its poller, so the
        descriptor must be handed back rather than ``None``.
    """
    fd = clntsock.fileno()
    log.info("Start serving fd %d from %s", fd, clntaddr)
    self.by_fd[fd] = Client(clntsock, clntaddr)
    return fd
# Announce that the client's descriptor is no longer being served.
97 log.info("Stop serving fd %d", clnt.sock.fileno())
# Find the client object registered under this file descriptor.
102 clnt = self.by_fd[fd]
# Trace a received message at debug level.
108 log.debug("Received: %s", msg)
# Visits every client currently held in by_fd.
# NOTE(review): presumably each entry of msgs is handed to every client -
# confirm against the loop body.
111 def send(self, msgs):
112 for clnt in self.by_fd.values():
# ZeroMQ SUB socket connected to the URL configured as
# [lookaside] publishurl; the empty subscription prefix matches every
# published message.
118 zsub = zctx.socket(zmq.SUB)
119 zsub.connect(conf.get("lookaside", "publishurl"))
120 zsub.setsockopt(zmq.SUBSCRIBE, b"")
# Non-blocking IPv6 TCP socket bound to the configured [wsgateway] port
# with an empty host; SO_REUSEADDR is set before bind.
121 tcpl = socket(AF_INET6, SOCK_STREAM)
122 tcpl.setblocking(False)
123 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
124 tcpl.bind(("", conf.getint("wsgateway", "port")))
# A single poller watches the ZeroMQ subscription and the TCP socket's
# file descriptor for readability.
126 tcpfd = tcpl.fileno()
127 poller = zmq.Poller()
128 poller.register(zsub, flags=zmq.POLLIN)
129 poller.register(tcpfd, flags=zmq.POLLIN)
# Wait for activity; the timeout argument to poll() is in milliseconds.
136 events = poller.poll(1000)
137 for sk, fl in events:
# Non-blocking read from the subscription; each message is wrapped in
# LocEvt and queued in tosend.
141 msg = zsub.recv(zmq.NOBLOCK)
142 tosend.append(LocEvt(msg))
# A newly accepted connection is only queued in topoll here; it is added
# to the client set and the poller further down.
146 clntsock, clntaddr = tcpl.accept()
147 topoll.append((clntsock, clntaddr))
# Readable event on some other descriptor: let the client set read it.
148 elif fl & zmq.POLLIN:
149 received = clients.recv(sk)
152 "Client gone from fd %d", sk
157 log.debug("Received from %d: %s", sk, msg)
159 log.debug("Stray event: %s on socket %s", fl, sk)
160 # poll queue consumed, make changes now
162 poller.unregister(fd)
165 log.debug("Sending to the client: %s", zmsg)
# Register each queued connection under the value clients.add() returns.
167 for clntsock, clntaddr in topoll:
168 fd = clients.add(clntsock, clntaddr)
169 poller.register(fd, flags=zmq.POLLIN)
170 # TODO: Handle write overruns (register for POLLOUT)
171 except KeyboardInterrupt:
# Script entry point: build the configuration through common.init (which
# is handed this module's logger) and start the server with it.
if __name__.endswith("__main__"):
    conf = common.init(log)
    runserver(conf)