1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import (
15 from wsproto.utilities import RemoteProtocolError
19 from .zmsg import LocEvt
21 log = getLogger("gps303/wsgateway")
25 def try_http(data, fd, e):
28 lines = data.decode().split("\r\n")
31 op, resource, proto = request.split(" ")
33 "HTTP %s for %s, proto %s from fd %d, headers: %s",
41 pos = resource.index("?")
42 resource = resource[:pos]
48 f"{proto} 500 No data configured\r\n"
49 f"Content-Type: text/plain\r\n\r\n"
50 f"HTML data not configure on the server\r\n".encode()
53 length = len(htmldata.encode("utf-8"))
56 f"Content-Type: text/html; charset=utf-8\r\n"
57 f"Content-Length: {length:d}\r\n\r\n" + htmldata
61 f"{proto} 404 File not found\r\n"
62 f"Content-Type: text/plain\r\n\r\n"
63 f'We can only serve "/"\r\n'.encode()
67 f"{proto} 400 Bad request\r\n"
68 "Content-Type: text/plain\r\n\r\n"
69 "Bad request\r\n".encode()
72 log.warning("Unparseable data from fd %d: %s", fd, data)
77 """Websocket connection to the client"""
79 def __init__(self, sock, addr):
82 self.ws = WSConnection(ConnectionType.SERVER)
86 log.debug("Closing fd %d", self.sock.fileno())
91 data = self.sock.recv(4096)
94 "Reading from fd %d: %s",
98 self.ws.receive_data(None)
100 if not data: # Client has closed connection
102 "EOF reading from fd %d",
105 self.ws.receive_data(None)
108 self.ws.receive_data(data)
109 except RemoteProtocolError as e:
111 "Websocket error on fd %d, try plain http (%s)",
115 self.ws_data = try_http(data, self.sock.fileno(), e)
116 log.debug("Sending HTTP response to %d", self.sock.fileno())
120 for event in self.ws.events():
121 if isinstance(event, Request):
122 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
123 # self.ws_data += self.ws.send(event.response()) # Why not?!
124 self.ws_data += self.ws.send(AcceptConnection())
125 elif isinstance(event, (CloseConnection, Ping)):
126 log.debug("%s on fd %d", event, self.sock.fileno())
127 self.ws_data += self.ws.send(event.response())
128 elif isinstance(event, TextMessage):
129 # TODO: save imei "subscription"
130 log.debug("%s on fd %d", event, self.sock.fileno())
131 msgs.append(event.data)
133 log.warning("%s on fd %d", event, self.sock.fileno())
134 if self.ws_data: # Temp hack
138 def wants(self, imei):
139 return True # TODO: check subscriptions
141 def send(self, message):
142 # TODO: filter only wanted imei got from the client
143 self.ws_data += self.ws.send(Message(data=message.json))
148 sent = self.sock.send(self.ws_data)
149 self.ws_data = self.ws_data[sent:]
152 "Sending to fd %d: %s",
157 return bool(self.ws_data)
164 def add(self, clntsock, clntaddr):
165 fd = clntsock.fileno()
166 log.info("Start serving fd %d from %s", fd, clntaddr)
167 self.by_fd[fd] = Client(clntsock, clntaddr)
171 clnt = self.by_fd[fd]
172 log.info("Stop serving fd %d", clnt.sock.fileno())
177 clnt = self.by_fd[fd]
183 log.debug("Received: %s", msg)
188 for fd, clnt in self.by_fd.items():
189 if clnt.wants(msg.imei):
194 def write(self, towrite):
196 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
205 with open(conf.get("wsgateway", "htmlfile"), encoding="utf-8") as fl:
210 zsub = zctx.socket(zmq.SUB)
211 zsub.connect(conf.get("lookaside", "publishurl"))
212 zsub.setsockopt(zmq.SUBSCRIBE, b"")
213 tcpl = socket(AF_INET6, SOCK_STREAM)
214 tcpl.setblocking(False)
215 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
216 tcpl.bind(("", conf.getint("wsgateway", "port")))
218 tcpfd = tcpl.fileno()
219 poller = zmq.Poller()
220 poller.register(zsub, flags=zmq.POLLIN)
221 poller.register(tcpfd, flags=zmq.POLLIN)
230 events = poller.poll(5000)
231 for sk, fl in events:
235 zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
240 clntsock, clntaddr = tcpl.accept()
241 topoll.append((clntsock, clntaddr))
242 elif fl & zmq.POLLIN:
243 received = clients.recv(sk)
245 log.debug("Client gone from fd %d", sk)
249 log.debug("Received from %d: %s", sk, msg)
250 elif fl & zmq.POLLOUT:
251 log.debug("Write now open for fd %d", sk)
255 log.debug("Stray event: %s on socket %s", fl, sk)
256 # poll queue consumed, make changes now
258 poller.unregister(fd)
261 log.debug("Sending to the clients: %s", zmsg)
262 towrite |= clients.send(zmsg)
263 for clntsock, clntaddr in topoll:
264 fd = clients.add(clntsock, clntaddr)
265 poller.register(fd, flags=zmq.POLLIN)
266 # Deal with actually writing the data out
267 trywrite = towrite - towait
268 morewait = clients.write(trywrite)
270 "towait %s, tried %s, still busy %s",
275 for fd in morewait - trywrite: # new fds waiting for write
276 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
277 for fd in trywrite - morewait: # no longer waiting for write
278 poller.modify(fd, flags=zmq.POLLIN)
281 except KeyboardInterrupt:
285 if __name__.endswith("__main__"):
286 runserver(common.init(log))