1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import (
15 from wsproto.utilities import RemoteProtocolError
19 from .zmsg import LocEvt
21 log = getLogger("gps303/wsgateway")
25 def try_http(data, fd, e):
28 lines = data.decode().split("\r\n")
31 op, resource, proto = request.split(" ")
33 "HTTP %s for %s, proto %s from fd %d, headers: %s",
43 f"{proto} 500 No data configured\r\n"
44 f"Content-Type: text/plain\r\n\r\n"
45 f"HTML data not configure on the server\r\n".encode()
48 length = len(htmldata.encode("utf-8"))
51 f"Content-Type: text/html; charset=utf-8\r\n"
52 f"Content-Length: {length:d}\r\n\r\n" + htmldata
56 f"{proto} 404 File not found\r\n"
57 f"Content-Type: text/plain\r\n\r\n"
58 f"We can only serve \"/\"\r\n".encode()
62 f"{proto} 400 Bad request\r\n"
63 "Content-Type: text/plain\r\n\r\n"
64 "Bad request\r\n".encode()
67 log.warning("Unparseable data from fd %d: %s", fd, data)
72 """Websocket connection to the client"""
74 def __init__(self, sock, addr):
77 self.ws = WSConnection(ConnectionType.SERVER)
81 log.debug("Closing fd %d", self.sock.fileno())
86 data = self.sock.recv(4096)
89 "Reading from fd %d: %s",
93 self.ws.receive_data(None)
95 if not data: # Client has closed connection
97 "EOF reading from fd %d",
100 self.ws.receive_data(None)
103 self.ws.receive_data(data)
104 except RemoteProtocolError as e:
106 "Websocket error on fd %d, try plain http (%s)",
110 self.ws_data = try_http(data, self.sock.fileno(), e)
111 log.debug("Sending HTTP response to %d", self.sock.fileno())
115 for event in self.ws.events():
116 if isinstance(event, Request):
117 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
118 # self.ws_data += self.ws.send(event.response()) # Why not?!
119 self.ws_data += self.ws.send(AcceptConnection())
120 elif isinstance(event, (CloseConnection, Ping)):
121 log.debug("%s on fd %d", event, self.sock.fileno())
122 self.ws_data += self.ws.send(event.response())
123 elif isinstance(event, TextMessage):
124 # TODO: save imei "subscription"
125 log.debug("%s on fd %d", event, self.sock.fileno())
126 msgs.append(event.data)
128 log.warning("%s on fd %d", event, self.sock.fileno())
129 if self.ws_data: # Temp hack
133 def send(self, imei, message):
134 # TODO: filter only wanted imei got from the client
135 self.ws_data += self.ws.send(Message(data=message))
139 sent = self.sock.send(self.ws_data)
140 self.ws_data = self.ws_data[sent:]
143 "Sending to fd %d: %s",
154 def add(self, clntsock, clntaddr):
155 fd = clntsock.fileno()
156 log.info("Start serving fd %d from %s", fd, clntaddr)
157 self.by_fd[fd] = Client(clntsock, clntaddr)
161 clnt = self.by_fd[fd]
162 log.info("Stop serving fd %d", clnt.sock.fileno())
167 clnt = self.by_fd[fd]
173 log.debug("Received: %s", msg)
176 def send(self, msgs):
177 for clnt in self.by_fd.values():
186 conf.get("wsgateway", "htmlfile"), encoding="utf-8"
192 zsub = zctx.socket(zmq.SUB)
193 zsub.connect(conf.get("lookaside", "publishurl"))
194 zsub.setsockopt(zmq.SUBSCRIBE, b"")
195 tcpl = socket(AF_INET6, SOCK_STREAM)
196 tcpl.setblocking(False)
197 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
198 tcpl.bind(("", conf.getint("wsgateway", "port")))
200 tcpfd = tcpl.fileno()
201 poller = zmq.Poller()
202 poller.register(zsub, flags=zmq.POLLIN)
203 poller.register(tcpfd, flags=zmq.POLLIN)
210 events = poller.poll(1000)
211 for sk, fl in events:
215 msg = zsub.recv(zmq.NOBLOCK)
216 tosend.append(LocEvt(msg))
220 clntsock, clntaddr = tcpl.accept()
221 topoll.append((clntsock, clntaddr))
222 elif fl & zmq.POLLIN:
223 received = clients.recv(sk)
225 log.debug("Client gone from fd %d", sk)
229 log.debug("Received from %d: %s", sk, msg)
231 log.debug("Stray event: %s on socket %s", fl, sk)
232 # poll queue consumed, make changes now
234 poller.unregister(fd)
237 log.debug("Sending to the client: %s", zmsg)
239 for clntsock, clntaddr in topoll:
240 fd = clients.add(clntsock, clntaddr)
241 poller.register(fd, flags=zmq.POLLIN)
242 # TODO: Handle write overruns (register for POLLOUT)
243 except KeyboardInterrupt:
247 if __name__.endswith("__main__"):
248 runserver(common.init(log))