1 """ Websocket Gateway """
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from wsproto import ConnectionType, WSConnection
7 from wsproto.events import (
15 from wsproto.utilities import RemoteProtocolError
19 from .zmsg import LocEvt
21 log = getLogger("gps303/wsgateway")
25 def try_http(data, fd, e):
28 lines = data.decode().split("\r\n")
31 op, resource, proto = request.split(" ")
33 "HTTP %s for %s, proto %s from fd %d, headers: %s",
41 pos = resource.index("?")
42 resource = resource[:pos]
48 f"{proto} 500 No data configured\r\n"
49 f"Content-Type: text/plain\r\n\r\n"
50 f"HTML data not configure on the server\r\n".encode()
53 length = len(htmldata.encode("utf-8"))
56 f"Content-Type: text/html; charset=utf-8\r\n"
57 f"Content-Length: {length:d}\r\n\r\n" + htmldata
61 f"{proto} 404 File not found\r\n"
62 f"Content-Type: text/plain\r\n\r\n"
63 f"We can only serve \"/\"\r\n".encode()
67 f"{proto} 400 Bad request\r\n"
68 "Content-Type: text/plain\r\n\r\n"
69 "Bad request\r\n".encode()
72 log.warning("Unparseable data from fd %d: %s", fd, data)
77 """Websocket connection to the client"""
79 def __init__(self, sock, addr):
82 self.ws = WSConnection(ConnectionType.SERVER)
86 log.debug("Closing fd %d", self.sock.fileno())
91 data = self.sock.recv(4096)
94 "Reading from fd %d: %s",
98 self.ws.receive_data(None)
100 if not data: # Client has closed connection
102 "EOF reading from fd %d",
105 self.ws.receive_data(None)
108 self.ws.receive_data(data)
109 except RemoteProtocolError as e:
111 "Websocket error on fd %d, try plain http (%s)",
115 self.ws_data = try_http(data, self.sock.fileno(), e)
116 log.debug("Sending HTTP response to %d", self.sock.fileno())
120 for event in self.ws.events():
121 if isinstance(event, Request):
122 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
123 # self.ws_data += self.ws.send(event.response()) # Why not?!
124 self.ws_data += self.ws.send(AcceptConnection())
125 elif isinstance(event, (CloseConnection, Ping)):
126 log.debug("%s on fd %d", event, self.sock.fileno())
127 self.ws_data += self.ws.send(event.response())
128 elif isinstance(event, TextMessage):
129 # TODO: save imei "subscription"
130 log.debug("%s on fd %d", event, self.sock.fileno())
131 msgs.append(event.data)
133 log.warning("%s on fd %d", event, self.sock.fileno())
134 if self.ws_data: # Temp hack
138 def wants(self, imei):
139 return True # TODO: check subscriptions
141 def send(self, message):
142 # TODO: filter only wanted imei got from the client
143 self.ws_data += self.ws.send(Message(data=message.json))
147 sent = self.sock.send(self.ws_data)
148 self.ws_data = self.ws_data[sent:]
151 "Sending to fd %d: %s",
162 def add(self, clntsock, clntaddr):
163 fd = clntsock.fileno()
164 log.info("Start serving fd %d from %s", fd, clntaddr)
165 self.by_fd[fd] = Client(clntsock, clntaddr)
169 clnt = self.by_fd[fd]
170 log.info("Stop serving fd %d", clnt.sock.fileno())
175 clnt = self.by_fd[fd]
181 log.debug("Received: %s", msg)
185 for clnt in self.by_fd.values():
186 if clnt.wants(msg.imei):
195 conf.get("wsgateway", "htmlfile"), encoding="utf-8"
201 zsub = zctx.socket(zmq.SUB)
202 zsub.connect(conf.get("lookaside", "publishurl"))
203 zsub.setsockopt(zmq.SUBSCRIBE, b"")
204 tcpl = socket(AF_INET6, SOCK_STREAM)
205 tcpl.setblocking(False)
206 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
207 tcpl.bind(("", conf.getint("wsgateway", "port")))
209 tcpfd = tcpl.fileno()
210 poller = zmq.Poller()
211 poller.register(zsub, flags=zmq.POLLIN)
212 poller.register(tcpfd, flags=zmq.POLLIN)
219 events = poller.poll(5000)
220 for sk, fl in events:
224 zmsg = LocEvt(zsub.recv(zmq.NOBLOCK))
229 clntsock, clntaddr = tcpl.accept()
230 topoll.append((clntsock, clntaddr))
231 elif fl & zmq.POLLIN:
232 received = clients.recv(sk)
234 log.debug("Client gone from fd %d", sk)
238 log.debug("Received from %d: %s", sk, msg)
240 log.debug("Stray event: %s on socket %s", fl, sk)
241 # poll queue consumed, make changes now
243 poller.unregister(fd)
246 log.debug("Sending to the client: %s", zmsg)
248 for clntsock, clntaddr in topoll:
249 fd = clients.add(clntsock, clntaddr)
250 poller.register(fd, flags=zmq.POLLIN)
251 # TODO: Handle write overruns (register for POLLOUT)
252 except KeyboardInterrupt:
256 if __name__.endswith("__main__"):
257 runserver(common.init(log))