1 """ Websocket Gateway """
4 from logging import getLogger
5 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
7 from wsproto import ConnectionType, WSConnection
8 from wsproto.events import (
16 from wsproto.utilities import RemoteProtocolError
20 from .backlog import blinit, backlog
21 from .gps303proto import (
26 from .zmsg import Bcast, topic
28 log = getLogger("gps303/wsgateway")
32 def try_http(data, fd, e):
35 lines = data.decode().split("\r\n")
38 op, resource, proto = request.split(" ")
40 "HTTP %s for %s, proto %s from fd %d, headers: %s",
48 pos = resource.index("?")
49 resource = resource[:pos]
55 f"{proto} 500 No data configured\r\n"
56 f"Content-Type: text/plain\r\n\r\n"
57 f"HTML data not configure on the server\r\n".encode()
61 with open(htmlfile, "rb") as fl:
63 length = len(htmldata)
66 f"Content-Type: text/html; charset=utf-8\r\n"
67 f"Content-Length: {len(htmldata):d}\r\n\r\n"
68 ).encode("utf-8") + htmldata
71 f"{proto} 500 File not found\r\n"
72 f"Content-Type: text/plain\r\n\r\n"
73 f"HTML file could not be opened\r\n".encode()
77 f"{proto} 404 File not found\r\n"
78 f"Content-Type: text/plain\r\n\r\n"
79 f'We can only serve "/"\r\n'.encode()
83 f"{proto} 400 Bad request\r\n"
84 "Content-Type: text/plain\r\n\r\n"
85 "Bad request\r\n".encode()
88 log.warning("Unparseable data from fd %d: %s", fd, data)
93 """Websocket connection to the client"""
95 def __init__(self, sock, addr):
98 self.ws = WSConnection(ConnectionType.SERVER)
104 log.debug("Closing fd %d", self.sock.fileno())
109 data = self.sock.recv(4096)
112 "Reading from fd %d: %s",
116 self.ws.receive_data(None)
118 if not data: # Client has closed connection
120 "EOF reading from fd %d",
123 self.ws.receive_data(None)
126 self.ws.receive_data(data)
127 except RemoteProtocolError as e:
129 "Websocket error on fd %d, try plain http (%s)",
133 self.ws_data = try_http(data, self.sock.fileno(), e)
134 self.write() # TODO this is a hack
135 log.debug("Sending HTTP response to %d", self.sock.fileno())
139 for event in self.ws.events():
140 if isinstance(event, Request):
141 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
142 # self.ws_data += self.ws.send(event.response()) # Why not?!
143 self.ws_data += self.ws.send(AcceptConnection())
145 elif isinstance(event, (CloseConnection, Ping)):
146 log.debug("%s on fd %d", event, self.sock.fileno())
147 self.ws_data += self.ws.send(event.response())
148 elif isinstance(event, TextMessage):
149 # TODO: save imei "subscription"
150 log.debug("%s on fd %d", event, self.sock.fileno())
151 msg = loads(event.data)
153 if msg.get("type", None) == "subscribe":
154 self.imeis = set(msg.get("imei", []))
156 "subs list on fd %s is %s",
161 log.warning("%s on fd %d", event, self.sock.fileno())
164 def wants(self, imei):
166 "wants %s? set is %s on fd %d",
171 return True # TODO: check subscriptions
173 def send(self, message):
174 if self.ready and message.imei in self.imeis:
175 self.ws_data += self.ws.send(Message(data=message.json))
180 sent = self.sock.send(self.ws_data)
181 self.ws_data = self.ws_data[sent:]
184 "Sending to fd %d: %s",
189 return bool(self.ws_data)
196 def add(self, clntsock, clntaddr):
197 fd = clntsock.fileno()
198 log.info("Start serving fd %d from %s", fd, clntaddr)
199 self.by_fd[fd] = Client(clntsock, clntaddr)
203 clnt = self.by_fd[fd]
204 log.info("Stop serving fd %d", clnt.sock.fileno())
209 clnt = self.by_fd[fd]
214 for fd, clnt in self.by_fd.items():
215 if clnt.wants(msg.imei):
220 def write(self, towrite):
222 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
229 for clnt in self.by_fd.values():
237 blinit(conf.get("storage", "dbfn"))
238 htmlfile = conf.get("wsgateway", "htmlfile")
240 zsub = zctx.socket(zmq.SUB)
241 zsub.connect(conf.get("collector", "publishurl"))
242 tcpl = socket(AF_INET6, SOCK_STREAM)
243 tcpl.setblocking(False)
244 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
245 tcpl.bind(("", conf.getint("wsgateway", "port")))
247 tcpfd = tcpl.fileno()
248 poller = zmq.Poller()
249 poller.register(zsub, flags=zmq.POLLIN)
250 poller.register(tcpfd, flags=zmq.POLLIN)
256 neededsubs = clients.subs()
257 for imei in neededsubs - activesubs:
260 topic(GPS_POSITIONING.PROTO, True),
264 topic(WIFI_POSITIONING.PROTO, False),
266 for imei in activesubs - neededsubs:
269 topic(GPS_POSITIONING.PROTO, True),
273 topic(WIFI_POSITIONING.PROTO, False),
275 activesubs = neededsubs
276 log.debug("Subscribed to: %s", activesubs)
281 events = poller.poll()
282 for sk, fl in events:
286 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
287 msg = parse_message(zmsg.packet)
289 log.debug("Got %s", zmsg)
293 clntsock, clntaddr = tcpl.accept()
294 topoll.append((clntsock, clntaddr))
295 elif fl & zmq.POLLIN:
296 received = clients.recv(sk)
298 log.debug("Client gone from fd %d", sk)
303 log.debug("Received from %d: %s", sk, msg)
304 if msg.get("type", None) == "subscribe":
305 imei = msg.get("imei")
306 numback = msg.get("backlog", 5)
308 tosend.extend(backlog(elem, numback))
310 elif fl & zmq.POLLOUT:
311 log.debug("Write now open for fd %d", sk)
315 log.debug("Stray event: %s on socket %s", fl, sk)
316 # poll queue consumed, make changes now
318 poller.unregister(fd)
321 log.debug("Sending to the clients: %s", zmsg)
322 towrite |= clients.send(zmsg)
323 for clntsock, clntaddr in topoll:
324 fd = clients.add(clntsock, clntaddr)
325 poller.register(fd, flags=zmq.POLLIN)
326 # Deal with actually writing the data out
327 trywrite = towrite - towait
328 morewait = clients.write(trywrite)
330 "towait %s, tried %s, still busy %s",
335 for fd in morewait - trywrite: # new fds waiting for write
336 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
337 for fd in trywrite - morewait: # no longer waiting for write
338 poller.modify(fd, flags=zmq.POLLIN)
341 except KeyboardInterrupt:
345 if __name__.endswith("__main__"):
346 runserver(common.init(log))