1 """ Websocket Gateway """
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from json import dumps, loads
7 from logging import getLogger
8 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
10 from typing import Any, cast, Dict, List, Optional, Set, Tuple
11 from wsproto import ConnectionType, WSConnection
12 from wsproto.events import (
21 from wsproto.utilities import RemoteProtocolError
25 from .evstore import initdb, fetch
26 from .protomodule import ProtoModule
27 from .zmsg import Bcast, topic
# Module-level logger; every log call in this file goes through it.
29 log = getLogger("loctrkd/wsgateway")
# Protocol modules. Declared `global` in runserver(), which builds the list
# from the "protocols" option of the "collector" config section.
33 pmods: List[ProtoModule] = []
# (is_incoming, proto) pairs; runserver() appends one entry for every protocol
# exposed by a protocol module, except "ZX:STATUS".
34 selector: List[Tuple[bool, str]] = []
# Collect previously stored messages for one device.
# `imei` identifies the device, `numback` is the amount of backlog requested
# by the subscriber (runserver() defaults it to 5). Returns a list of
# dictionaries, which runserver() queues for sending to the websocket clients.
37 def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
# Each row produced by evstore.fetch() unpacks into the direction flag, the
# timestamp, the protocol name and the raw packet.
39 for is_incoming, timestamp, proto, packet in fetch(
# Only a protocol module that claims `proto` is asked to parse the packet.
45 if pmod.proto_handled(proto):
46 msg = pmod.parse_message(packet, is_incoming=is_incoming)
52 datetime.fromtimestamp(timestamp).astimezone(
56 "longitude": msg.longitude,
57 "latitude": msg.latitude,
# NOTE(review): the filter is a placeholder that accepts every message; the
# TODO suggests it should only accept positioning messages - confirm.
59 if True # TODO isinstance(msg, GPS_POSITIONING)
# Fallback used when incoming data is not acceptable to the websocket layer:
# interpret `data` as a plain HTTP request and return the raw bytes of an HTTP
# response.
# `fd` is the descriptor number used in log messages; `e` is the exception
# that made the caller fall back here (Client.recv passes the
# RemoteProtocolError it caught).
66 def try_http(data: bytes, fd: int, e: Exception) -> bytes:
# Decode with the default codec and split into lines on CRLF.
69 lines = data.decode().split("\r\n")
# The request line must consist of exactly three space-separated fields,
# otherwise this unpacking raises ValueError.
72 op, resource, proto = request.split(" ")
74 "HTTP %s for %s, proto %s from fd %d, headers: %s",
# Response for the case when no HTML data is configured
# (presumably when the module-level `htmlfile` is unset - confirm).
84 f"{proto} 500 No data configured\r\n"
85 f"Content-Type: text/plain\r\n\r\n"
86 f"HTML data not configured on the server\r\n".encode()
# The HTML page is read in binary mode, so it is appended to the encoded
# headers below without any re-encoding.
90 with open(htmlfile, "rb") as fl:
# NOTE(review): `length` does not appear to be used; the Content-Length header
# below calls len(htmldata) again - confirm and consider removing.
92 length = len(htmldata)
95 f"Content-Type: text/html; charset=utf-8\r\n"
96 f"Content-Length: {len(htmldata):d}\r\n\r\n"
97 ).encode("utf-8") + htmldata
# Response for the case when the HTML file could not be opened.
100 f"{proto} 500 File not found\r\n"
101 f"Content-Type: text/plain\r\n\r\n"
102 f"HTML file could not be opened\r\n".encode()
# Response for a request that is not accepted. Adjacent string literals are
# concatenated first, so .encode() applies to the whole response text.
106 f"{proto} 400 Bad request\r\n"
107 "Content-Type: text/plain\r\n\r\n"
108 "Bad request\r\n".encode()
# Data that could not be handled as HTTP either is logged together with its fd.
111 log.warning("Unparseable data from fd %d: %s", fd, data)
116 """Websocket connection to the client"""
# Set up the per-connection state for an accepted client socket `sock`
# with peer address `addr`.
118 def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
# wsproto state machine acting as the server side of the connection.
121 self.ws = WSConnection(ConnectionType.SERVER)
# IMEIs this client is subscribed to; starts empty and is replaced as a whole
# when recv() sees a "subscribe" message.
124 self.imeis: Set[str] = set()
# Close the connection to this client; the descriptor number is logged first.
126 def close(self) -> None:
127 log.debug("Closing fd %d", self.sock.fileno())
# Read pending data from the client socket, feed it to the websocket state
# machine and process the resulting events. Replies produced along the way are
# accumulated in self.ws_data.
# Returns a list of decoded client messages, or None
# (presumably None means the client is gone - runserver() logs "Client gone"
# after calling this; confirm).
130 def recv(self) -> Optional[List[Dict[str, Any]]]:
# Read at most 4096 bytes per call.
132 data = self.sock.recv(4096)
135 "Reading from fd %d: %s",
# Passing None to wsproto's receive_data() reports that the peer connection
# is closed.
139 self.ws.receive_data(None)
141 if not data: # Client has closed connection
143 "EOF reading from fd %d",
146 self.ws.receive_data(None)
149 self.ws.receive_data(data)
# Data that wsproto rejects is handed to try_http(); its return value replaces
# whatever was pending in self.ws_data.
150 except RemoteProtocolError as e:
152 "Websocket error on fd %d, try plain http (%s)",
156 self.ws_data = try_http(data, self.sock.fileno(), e)
157 # this `write` is a hack - writing _ought_ to be done at the
158 # stage when all other writes are performed. But I could not
159 # arrange it so in a logical way. Let it stay this way. The
160 # whole http server affair is a hack anyway.
162 log.debug("Sending HTTP response to %d", self.sock.fileno())
# Process every event that wsproto produced from the received data.
166 for event in self.ws.events():
# Handshake request: queue the acceptance of the websocket upgrade.
167 if isinstance(event, Request):
168 log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
169 # self.ws_data += self.ws.send(event.response()) # Why not?!
170 self.ws_data += self.ws.send(AcceptConnection())
# Close and ping events are answered with the reply that the event itself
# provides.
172 elif isinstance(event, (CloseConnection, Ping)):
173 log.debug("%s on fd %d", event, self.sock.fileno())
174 self.ws_data += self.ws.send(event.response())
# Text messages carry JSON.
175 elif isinstance(event, TextMessage):
176 log.debug("%s on fd %d", event, self.sock.fileno())
177 msg = loads(event.data)
# A "subscribe" message replaces the whole subscription set; a missing "imei"
# key results in an empty set.
179 if msg.get("type", None) == "subscribe":
180 self.imeis = set(msg.get("imei", []))
182 "subs list on fd %s is %s",
# Any other event type is only logged.
187 log.warning("%s on fd %d", event, self.sock.fileno())
# Tell whether this client is subscribed to the device `imei`.
190 def wants(self, imei: str) -> bool:
192 "wants %s? set is %s on fd %d",
# Plain membership test against the current subscription set.
197 return imei in self.imeis
# Queue `message` for delivery to this client. Nothing is written to the
# socket here: the encoded frame is only appended to self.ws_data.
# `message` must contain the key "imei" (KeyError otherwise, when self.ready
# is true).
199 def send(self, message: Dict[str, Any]) -> None:
# The message is dropped silently unless self.ready is true and the client is
# subscribed to the message's IMEI.
200 if self.ready and message["imei"] in self.imeis:
# The dictionary is serialized to JSON and wrapped into a websocket message.
201 self.ws_data += self.ws.send(Message(data=dumps(message)))
# Try to push the pending output buffer to the socket.
# Returns True when unsent data remains in self.ws_data, False when the buffer
# is empty.
203 def write(self) -> bool:
# socket.send() may accept only a part of the buffer; it returns the number of
# bytes actually sent, and only those are removed from the buffer.
206 sent = self.sock.send(self.ws_data)
207 self.ws_data = self.ws_data[sent:]
210 "Sending to fd %d: %s",
215 return bool(self.ws_data)
# Start with no connected clients.
219 def __init__(self) -> None:
# Registry of connected clients, keyed by the socket's file descriptor number.
220 self.by_fd: Dict[int, Client] = {}
# Register a newly accepted connection: wrap the socket `clntsock` (peer
# address `clntaddr`) into a Client and store it under the socket's file
# descriptor number. runserver() registers the returned value with the poller.
222 def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
223 fd = clntsock.fileno()
224 log.info("Start serving fd %d from %s", fd, clntaddr)
# An existing entry with the same descriptor number is overwritten.
225 self.by_fd[fd] = Client(clntsock, clntaddr)
# Stop serving the client registered under descriptor `fd`.
# Raises KeyError if no client is registered under `fd`.
228 def stop(self, fd: int) -> None:
229 clnt = self.by_fd[fd]
230 log.info("Stop serving fd %d", clnt.sock.fileno())
# Receive from the client registered under descriptor `fd`
# (presumably delegates to Client.recv() and returns its result - confirm).
# Raises KeyError if no client is registered under `fd`.
234 def recv(self, fd: int) -> Optional[List[Dict[str, Any]]]:
235 clnt = self.by_fd[fd]
# Offer `msg` to every connected client. `msg` must contain the key "imei".
# Returns a set of descriptor numbers; runserver() merges it into the set of
# descriptors it will try to write to.
238 def send(self, msg: Dict[str, Any]) -> Set[int]:
240 for fd, clnt in self.by_fd.items():
# Only clients subscribed to the message's IMEI are considered.
241 if clnt.wants(msg["imei"]):
# Flush pending output of the clients whose descriptors are in `towrite`.
# Returns a set of descriptor numbers
# (presumably those whose Client.write() reported unsent data - confirm).
246 def write(self, towrite: Set[int]) -> Set[int]:
# Lookup with .get(): a descriptor that is no longer registered yields None
# instead of raising KeyError.
248 for fd, clnt in [(fd, self.by_fd.get(fd)) for fd in towrite]:
# Client.write() is called only for descriptors that are still registered; it
# returns True when some data could not be sent yet.
249 if clnt and clnt.write():
# Return a set of strings computed over all connected clients
# (presumably the union of their subscribed IMEIs - runserver() uses the result
# as the set of needed subscriptions; confirm).
253 def subs(self) -> Set[str]:
255 for clnt in self.by_fd.values():
# Main entry point of the websocket gateway.
# `conf` is the parsed configuration; the options read here are
# collector/protocols, collector/publishurl, storage/dbfn, wsgateway/htmlfile
# and wsgateway/port.
# Sets the module-level names htmlfile, pmods and selector, then serves
# websocket clients from a zmq poller loop until KeyboardInterrupt.
260 def runserver(conf: ConfigParser) -> None:
261 global htmlfile, pmods, selector
# One protocol module is imported, relative to this package, for every
# comma-separated name in the "protocols" option.
263 cast(ProtoModule, import_module("." + modnm, __package__))
264 for modnm in conf.get("collector", "protocols").split(",")
# Remember every exposed (direction, protocol) pair except the status protocol.
267 for proto, is_incoming in pmod.exposed_protos():
268 if proto != "ZX:STATUS": # TODO make it better
269 selector.append((is_incoming, proto))
# Open the event store that backlog() reads from.
270 initdb(conf.get("storage", "dbfn"))
# Path of the HTML page served by try_http(); None when not configured.
271 htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
272 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
273 zctx = zmq.Context() # type: ignore
# SUB socket connected to the collector's publish URL.
274 zsub = zctx.socket(zmq.SUB) # type: ignore
275 zsub.connect(conf.get("collector", "publishurl"))
# Non-blocking IPv6 TCP socket with SO_REUSEADDR set, bound to the configured
# port on the wildcard address.
276 tcpl = socket(AF_INET6, SOCK_STREAM)
277 tcpl.setblocking(False)
278 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
279 tcpl.bind(("", conf.getint("wsgateway", "port")))
281 tcpfd = tcpl.fileno()
# Both the zmq socket and the TCP listener are watched by the same poller.
282 poller = zmq.Poller() # type: ignore
283 poller.register(zsub, flags=zmq.POLLIN)
284 poller.register(tcpfd, flags=zmq.POLLIN)
# IMEIs for which zmq subscriptions are currently in place.
286 activesubs: Set[str] = set()
# Descriptors whose output did not fit and that wait to become writable.
288 towait: Set[int] = set()
# Bring the zmq subscriptions in line with what the clients need: the
# difference sets select the IMEIs that were newly requested and the ones that
# nobody needs anymore.
290 neededsubs = clients.subs()
292 for proto, is_incoming in pmod.exposed_protos():
293 for imei in neededsubs - activesubs:
296 topic(proto, is_incoming, imei),
298 for imei in activesubs - neededsubs:
301 topic(proto, is_incoming, imei),
303 activesubs = neededsubs
304 log.debug("Subscribed to: %s", activesubs)
# Block until at least one registered socket has an event.
309 events = poller.poll()
310 for sk, fl in events:
# Message published by the collector; read without blocking.
314 zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
316 if pmod.proto_handled(zmsg.proto):
317 msg = pmod.parse_message(
318 zmsg.packet, zmsg.is_incoming
320 log.debug("Got %s with %s", zmsg, msg)
# Status messages get their own handling, separate from the one that produces
# longitude and latitude below.
321 if zmsg.proto == "ZX:STATUS":
327 datetime.fromtimestamp(
329 ).astimezone(tz=timezone.utc)
340 datetime.fromtimestamp(
342 ).astimezone(tz=timezone.utc)
344 "longitude": msg.longitude,
345 "latitude": msg.latitude,
# New TCP connection: it is only remembered here and gets registered with the
# poller after the event loop, see below.
354 clntsock, clntaddr = tcpl.accept()
355 topoll.append((clntsock, clntaddr))
# Input is available from a connected client.
356 elif fl & zmq.POLLIN:
357 received = clients.recv(sk)
359 log.debug("Client gone from fd %d", sk)
363 for wsmsg in received:
364 log.debug("Received from %d: %s", sk, wsmsg)
# A subscription request also triggers sending of the stored backlog.
365 if wsmsg.get("type", None) == "subscribe":
366 # Have to live w/o typechecking from json
# NOTE(review): unlike Client.recv(), no default is given here, so a subscribe
# message without "imei" yields None - confirm that this is handled.
367 imeis = cast(List[str], wsmsg.get("imei"))
# The client may request a backlog size; the default is 5.
368 numback: int = wsmsg.get("backlog", 5)
370 tosend.extend(backlog(imei, numback))
# A client socket that was waiting for output space became writable.
372 elif fl & zmq.POLLOUT:
373 log.debug("Write now open for fd %d", sk)
377 log.debug("Stray event: %s on socket %s", fl, sk)
378 # poll queue consumed, make changes now
380 poller.unregister(fd) # type: ignore
# Queue every collected message on the interested clients; clients.send()
# returns the descriptors to be written to.
383 log.debug("Sending to the clients: %s", wsmsg)
384 towrite |= clients.send(wsmsg)
# Connections accepted during this iteration start being polled for input.
385 for clntsock, clntaddr in topoll:
386 fd = clients.add(clntsock, clntaddr)
387 poller.register(fd, flags=zmq.POLLIN)
388 # Deal with actually writing the data out
# Descriptors that already wait for writability are not tried again.
389 trywrite = towrite - towait
390 morewait = clients.write(trywrite)
392 "towait %s, tried %s, still busy %s",
# NOTE(review): if clients.write() only returns descriptors taken from its
# argument, `morewait - trywrite` is always empty and POLLOUT is never
# enabled; `morewait - towait` may have been intended - confirm.
397 for fd in morewait - trywrite: # new fds waiting for write
398 poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT) # type: ignore
399 for fd in trywrite - morewait: # no longer waiting for write
400 poller.modify(fd, flags=zmq.POLLIN) # type: ignore
# Ctrl-C ends the server; the zmq context is destroyed on the way out.
403 except KeyboardInterrupt:
405 zctx.destroy() # type: ignore
# Script entry point. endswith() also matches a dotted module name that ends
# in "__main__", not only the bare "__main__".
409 if __name__.endswith("__main__"):
# common.init() receives the module logger; its result is passed to
# runserver() as the configuration.
410 runserver(common.init(log))