www.average.org Git - loctrkd.git/blob - loctrkd/wsgateway.py
Implement sending commands from the web interface
[loctrkd.git] / loctrkd / wsgateway.py
1 """ Websocket Gateway """
2
3 from configparser import ConfigParser
4 from datetime import datetime, timezone
5 from importlib import import_module
6 from json import dumps, loads
7 from logging import getLogger
8 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
9 from time import time
10 from typing import Any, cast, Dict, List, Optional, Set, Tuple
11 from wsproto import ConnectionType, WSConnection
12 from wsproto.events import (
13     AcceptConnection,
14     CloseConnection,
15     Event,
16     Message,
17     Ping,
18     Request,
19     TextMessage,
20 )
21 from wsproto.utilities import RemoteProtocolError
22 import zmq
23
24 from . import common
25 from .evstore import initdb, fetch, fetchpmod
26 from .protomodule import ProtoModule
27 from .zmsg import Rept, Resp, rtopic
28
# Logger shared by all functions and classes of this module.
log = getLogger("loctrkd/wsgateway")

# Path of the HTML file returned to plain HTTP GET requests. It is assigned
# by runserver() from the [wsgateway] section of the configuration and stays
# None when the "htmlfile" option is absent.
htmlfile: Optional[str] = None
32
33
def backlog(imei: str, numback: int) -> List[Dict[str, Any]]:
    """Return stored reports for `imei`, reshaped for the web client.

    Whatever `fetch(imei, numback)` yields is passed on with two changes
    made to every report (the report dicts are modified in place):

    * a "type" key with the value "location" is added;
    * the "devtime" key is renamed to "timestamp".

    NOTE(review): presumably `numback` limits how many of the latest
    reports are fetched - confirm against evstore.fetch().
    """
    reports = []
    for entry in fetch(imei, numback):
        entry["type"] = "location"
        entry["timestamp"] = entry.pop("devtime")
        reports.append(entry)
    return reports
42
43
def _plain_reply(proto: str, status: str, text: str) -> bytes:
    """Build a bare text/plain HTTP response: status line, one header, body."""
    return (
        f"{proto} {status}\r\n"
        "Content-Type: text/plain\r\n\r\n"
        f"{text}\r\n"
    ).encode()


def try_http(data: bytes, fd: int, e: Exception) -> bytes:
    """Treat `data` as a plain HTTP request and build the response for it.

    This is the fallback used when the websocket layer refused the data.
    A GET request for any resource is answered with the contents of the
    file named by the module-level `htmlfile`; any other method gets
    a "400 Bad request" reply.

    Parameters:
        data: raw bytes received from the client.
        fd: file descriptor of the client socket, used for logging only.
        e: the exception that made the caller try plain HTTP; it is
            re-raised when `data` does not look like HTTP either.

    Returns:
        The complete HTTP response (headers and body) as bytes.

    Raises:
        `e` itself, when `data` cannot be decoded or its first line does
        not consist of exactly three space-separated fields.
    """
    try:
        # UnicodeDecodeError is a subclass of ValueError, and so is the
        # error raised when the request line does not unpack into three.
        lines = data.decode().split("\r\n")
        request = lines[0]
        headers = lines[1:]
        op, resource, proto = request.split(" ")
    except ValueError:
        log.warning("Unparseable data from fd %d: %s", fd, data)
        raise e
    log.debug(
        "HTTP %s for %s, proto %s from fd %d, headers: %s",
        op,
        resource,
        proto,
        fd,
        headers,
    )
    if op != "GET":
        return _plain_reply(proto, "400 Bad request", "Bad request")
    if htmlfile is None:
        return _plain_reply(
            proto,
            "500 No data configured",
            "HTML data not configured on the server",
        )
    try:
        with open(htmlfile, "rb") as fl:
            htmldata = fl.read()
    except OSError:
        return _plain_reply(
            proto, "500 File not found", "HTML file could not be opened"
        )
    return (
        f"{proto} 200 Ok\r\n"
        f"Content-Type: text/html; charset=utf-8\r\n"
        f"Content-Length: {len(htmldata):d}\r\n\r\n"
    ).encode("utf-8") + htmldata
91
92
class Client:
    """Websocket connection to the client

    Holds the socket, the wsproto state machine, the bytes queued for
    sending and the set of IMEIs the client has subscribed to.

    All data arriving from the client is untrusted. Malformed input leads
    to the message being ignored or the client being dropped; it must never
    raise out of `recv()`, because the caller's event loop does not catch
    such exceptions.
    """

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        self.sock = sock
        self.addr = addr
        self.ws = WSConnection(ConnectionType.SERVER)
        # Bytes produced by the websocket layer (or the http fallback)
        # that have not been written to the socket yet
        self.ws_data = b""
        # True between accepting the websocket upgrade and seeing a close
        # frame; messages are only sent while this is set
        self.ready = False
        self.imeis: Set[str] = set()

    def __str__(self) -> str:
        return f"{self.__class__.__name__}(fd={self.sock.fileno()}, addr={self.addr})"

    def close(self) -> None:
        """Close the underlying socket."""
        log.debug("Closing fd %d", self.sock.fileno())
        self.sock.close()

    def _eof(self) -> None:
        """Tell the websocket state machine that the peer is gone."""
        try:
            self.ws.receive_data(None)
        except RemoteProtocolError as e:
            # E.g. the peer left in the middle of the handshake. The client
            # is being dropped anyway, so there is nothing more to do.
            log.debug(
                "Websocket error at EOF on fd %d: %s", self.sock.fileno(), e
            )

    def recv(self) -> Optional[List[Dict[str, Any]]]:
        """Read from the socket and process whatever has arrived.

        Returns:
            None when the client must be dropped: read error, EOF, data
            that is neither websocket nor http, or a plain http request
            (which has been answered by the time this returns).
            Otherwise the list, possibly empty, of JSON objects received
            as websocket text messages. A "subscribe" message also
            replaces the set of IMEIs this client is subscribed to, and
            always carries an "imei" list when it is returned.
        """
        try:
            data = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d: %s",
                self.sock.fileno(),
                e,
            )
            self._eof()
            return None
        if not data:  # Client has closed connection
            log.info(
                "EOF reading from fd %d",
                self.sock.fileno(),
            )
            self._eof()
            return None
        try:
            self.ws.receive_data(data)
        except RemoteProtocolError as e:
            log.debug(
                "Websocket error on fd %d, try plain http (%s)",
                self.sock.fileno(),
                e,
            )
            try:
                self.ws_data = try_http(data, self.sock.fileno(), e)
            except RemoteProtocolError:
                # Neither websocket nor http (try_http has logged it).
                # Drop this client rather than let the error propagate.
                return None
            # this `write` is a hack - writing _ought_ to be done at the
            # stage when all other writes are performed. But I could not
            # arrange it so in a logical way. Let it stay this way. The
            # whole http server affair is a hack anyway.
            self.write()
            log.debug("Sending HTTP response to %d", self.sock.fileno())
            return None
        msgs: List[Dict[str, Any]] = []
        for event in self.ws.events():
            if isinstance(event, Request):
                log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
                # self.ws_data += self.ws.send(event.response())  # Why not?!
                self.ws_data += self.ws.send(AcceptConnection())
                self.ready = True
            elif isinstance(event, (CloseConnection, Ping)):
                log.debug("%s on fd %d", event, self.sock.fileno())
                self.ws_data += self.ws.send(event.response())
                if isinstance(event, CloseConnection):
                    # Nothing but the close reply may be sent from now on
                    self.ready = False
            elif isinstance(event, TextMessage):
                log.debug("%s on fd %d", event, self.sock.fileno())
                try:
                    msg = loads(event.data)
                except ValueError as e:  # includes JSONDecodeError
                    log.warning(
                        "Malformed JSON on fd %d: %s", self.sock.fileno(), e
                    )
                    continue
                if not isinstance(msg, dict):
                    log.warning(
                        "Not a JSON object on fd %d: %s",
                        self.sock.fileno(),
                        msg,
                    )
                    continue
                if msg.get("type", None) == "subscribe":
                    # Absent "imei" means empty subscription; store the
                    # default so that consumers of `msg` see a list too
                    imeis = msg.setdefault("imei", [])
                    if not (
                        isinstance(imeis, list)
                        and all(isinstance(el, str) for el in imeis)
                    ):
                        log.warning(
                            "Bad subscribe request on fd %d: %s",
                            self.sock.fileno(),
                            msg,
                        )
                        continue
                    self.imeis = set(imeis)
                    log.debug(
                        "subs list on fd %s is %s",
                        self.sock.fileno(),
                        self.imeis,
                    )
                msgs.append(msg)
            else:
                log.warning("%s on fd %d", event, self.sock.fileno())
        return msgs

    def wants(self, imei: str) -> bool:
        """Tell whether this client is subscribed to `imei`."""
        log.debug(
            "wants %s? set is %s on fd %d",
            imei,
            self.imeis,
            self.sock.fileno(),
        )
        return imei in self.imeis

    def send(self, message: Dict[str, Any]) -> None:
        """Queue `message` for sending as a websocket message.

        The message is silently dropped unless the websocket is open and
        `message["imei"]` is among the subscribed IMEIs. Nothing is written
        to the socket here; that is done by `write()`.
        """
        if self.ready and message["imei"] in self.imeis:
            self.ws_data += self.ws.send(Message(data=dumps(message)))

    def write(self) -> bool:
        """Write as much of the queued data as the socket accepts.

        Returns:
            True if unsent data remains queued, False otherwise.
        """
        if self.ws_data:
            try:
                sent = self.sock.send(self.ws_data)
            except BlockingIOError:
                # Non-blocking socket that is momentarily full: not an
                # error, keep the data for the next attempt
                pass
            except OSError as e:
                log.error(
                    "Sending to fd %d: %s",
                    self.sock.fileno(),
                    e,
                )
                self.ws_data = b""
            else:
                self.ws_data = self.ws_data[sent:]
        return bool(self.ws_data)
197
198
class Clients:
    """Collection of the clients in service, keyed by socket descriptor."""

    def __init__(self) -> None:
        self.by_fd: Dict[int, Client] = {}

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Start serving a newly accepted connection, return its fd."""
        newfd = clntsock.fileno()
        log.info("Start serving fd %d from %s", newfd, clntaddr)
        self.by_fd[newfd] = Client(clntsock, clntaddr)
        return newfd

    def stop(self, fd: int) -> None:
        """Close the client registered under `fd` and forget about it."""
        leaving = self.by_fd[fd]
        log.info("Stop serving fd %d", leaving.sock.fileno())
        leaving.close()
        del self.by_fd[fd]

    def recv(self, fd: int) -> Tuple[Client, Optional[List[Dict[str, Any]]]]:
        """Let the client at `fd` read its socket.

        Returns the client together with whatever its `recv()` returned.
        """
        reader = self.by_fd[fd]
        return (reader, reader.recv())

    def send(self, clnt: Optional[Client], msg: Dict[str, Any]) -> Set[int]:
        """Hand `msg` over to the client(s) it is meant for.

        With `clnt` being None, the message goes to every client that
        wants `msg["imei"]`. Otherwise it goes to `clnt` alone, provided
        that it is still the client registered under its descriptor.

        Returns the set of descriptors that the message was handed to.
        """
        if clnt is not None:
            fd = clnt.sock.fileno()
            if self.by_fd.get(fd) is clnt:
                clnt.send(msg)
                return {fd}
            log.info(
                "Trying to send %s to client at %d, not in service",
                msg,
                fd,
            )
            return set()
        reached = set()
        for fd, member in self.by_fd.items():
            if member.wants(msg["imei"]):
                member.send(msg)
                reached.add(fd)
        return reached

    def write(self, towrite: Set[int]) -> Set[int]:
        """Make the clients listed in `towrite` write out queued data.

        Descriptors that are not in service are skipped. Returns the
        subset of `towrite` whose clients report that data remains unsent.
        """
        pending = set()
        for fd in towrite:
            writer = self.by_fd.get(fd)
            if writer is None:
                continue
            if writer.write():
                pending.add(fd)
        return pending

    def subs(self) -> Set[str]:
        """Return the union of IMEIs that the clients are subscribed to."""
        return set().union(*(member.imeis for member in self.by_fd.values()))
251
252
def sendcmd(zpush: Any, wsmsg: Dict[str, Any]) -> Dict[str, Any]:
    """Turn a message from the web client into a command for a terminal.

    The "imei" and "type" keys are removed from `wsmsg` (the dict is
    modified in place); what is left is passed as keyword arguments to
    `common.make_response()`. When a packet could be made, it is wrapped
    in a `Resp` and sent through `zpush`.

    Parameters:
        zpush: object with a `send()` method; the caller passes
            a zmq PUSH socket.
        wsmsg: decoded message received from the web client.

    Returns:
        A "cmdresult" message with the "imei" and a human readable
        "result" that tells whether the command was sent, or why not.
    """
    imei = wsmsg.pop("imei", None)
    cmd = wsmsg.pop("type", None)

    def outcome(text: str) -> Dict[str, Any]:
        # All replies differ in the "result" text only
        return {"type": "cmdresult", "imei": imei, "result": text}

    if imei is None or cmd is None:
        log.info("Unhandled message %s %s %s", cmd, imei, wsmsg)
        return outcome("Did not get imei or cmd")

    pmod = fetchpmod(imei)
    if pmod is None:
        log.info("Uknown type of recipient for %s %s %s", cmd, imei, wsmsg)
        return outcome("Type of the terminal is unknown")

    tmsg = common.make_response(pmod, cmd, imei, **wsmsg)
    if tmsg is None:
        log.info("Could not make packet for %s %s %s", cmd, imei, wsmsg)
        return outcome(f"{cmd} unimplemented for terminal protocol {pmod}")

    resp = Resp(imei=imei, when=time(), packet=tmsg.packed)
    log.debug("Response: %s", resp)
    zpush.send(resp.packed)
    return outcome(f"{cmd} sent to {imei}")
287
288
def runserver(conf: ConfigParser) -> None:
    """Run the gateway event loop until interrupted from the keyboard.

    One poller watches the zmq SUB socket (reports to be relayed to the
    web clients), the listening TCP socket and all client sockets.
    Every iteration first consumes all events, collecting what has to be
    done, then stops clients that are gone, queues outgoing messages,
    registers new clients, and finally writes the queued data out.

    Configuration read: "dbfn" in [storage], "htmlfile" (optional) and
    "port" in [wsgateway], "publishurl" in [rectifier], "listenurl"
    in [collector].

    Sockets are closed when the loop is left for any reason.
    KeyboardInterrupt is absorbed, other exceptions propagate.
    """
    global htmlfile
    initdb(conf.get("storage", "dbfn"))
    htmlfile = conf.get("wsgateway", "htmlfile", fallback=None)
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zsub = zctx.socket(zmq.SUB)  # type: ignore
    zsub.connect(conf.get("rectifier", "publishurl"))
    zpush = zctx.socket(zmq.PUSH)  # type: ignore
    zpush.connect(conf.get("collector", "listenurl"))
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setblocking(False)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("wsgateway", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zsub, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    activesubs: Set[str] = set()
    try:
        # fds that have unsent data and are polled for POLLOUT
        towait: Set[int] = set()
        while True:
            # Keep zmq subscriptions in sync with what the clients want
            neededsubs = clients.subs()
            for imei in neededsubs - activesubs:
                zsub.setsockopt(zmq.SUBSCRIBE, rtopic(imei))
            for imei in activesubs - neededsubs:
                zsub.setsockopt(zmq.UNSUBSCRIBE, rtopic(imei))
            activesubs = neededsubs
            log.debug("Subscribed to: %s", activesubs)
            tosend: List[Tuple[Optional[Client], Dict[str, Any]]] = []
            topoll = []
            tostop = []
            towrite = set()
            events = poller.poll()
            for sk, fl in events:
                if sk is zsub:
                    # Drain everything that is available right now
                    while True:
                        try:
                            zmsg = Rept(zsub.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        msg = loads(zmsg.payload)
                        msg["imei"] = zmsg.imei
                        log.debug("Got %s, sending %s", zmsg, msg)
                        tosend.append((None, msg))
                elif sk == tcpfd:
                    try:
                        clntsock, clntaddr = tcpl.accept()
                    except OSError as e:
                        # The listener is non-blocking: the connection may
                        # have gone away between poll() and accept()
                        log.warning("Accepting connection: %s", e)
                    else:
                        topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    clnt, received = clients.recv(sk)
                    if received is None:
                        log.debug("Client gone from fd %d", sk)
                        tostop.append(sk)
                        towait.discard(sk)
                    else:
                        for wsmsg in received:
                            log.debug("Received from %d: %s", sk, wsmsg)
                            if wsmsg.get("type", None) == "subscribe":
                                # Have to live w/o typechecking from json.
                                # Absent or null "imei" means no backlog.
                                imeis = cast(
                                    List[str], wsmsg.get("imei") or []
                                )
                                numback: int = wsmsg.get("backlog", 5)
                                for imei in imeis:
                                    tosend.extend(
                                        (clnt, msg)
                                        for msg in backlog(imei, numback)
                                    )
                            else:
                                tosend.append((clnt, sendcmd(zpush, wsmsg)))
                        towrite.add(sk)
                elif fl & zmq.POLLOUT:
                    log.debug("Write now open for fd %d", sk)
                    towrite.add(sk)
                    towait.discard(sk)
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for towhom, wsmsg in tosend:
                log.debug("Sending to the client %s: %s", towhom, wsmsg)
                towrite |= clients.send(towhom, wsmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
            # Deal with actually writing the data out. Those still waiting
            # for POLLOUT are left alone until the poller reports them.
            trywrite = towrite - towait
            morewait = clients.write(trywrite)
            log.debug(
                "towait %s, tried %s, still busy %s",
                towait,
                trywrite,
                morewait,
            )
            # `morewait` is a subset of `trywrite`, and `trywrite` has
            # nothing in common with `towait`
            for fd in morewait:  # have unsent data, wait for write
                poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)  # type: ignore
            for fd in trywrite - morewait:  # no longer waiting for write
                poller.modify(fd, flags=zmq.POLLIN)  # type: ignore
            towait |= morewait
    except KeyboardInterrupt:
        pass  # the regular way to stop the server
    finally:
        zsub.close()
        zctx.destroy()  # type: ignore
        tcpl.close()
397
398
if __name__.endswith("__main__"):
    # Whatever common.init() returns is what runserver() takes as its
    # configuration
    startconf = common.init(log)
    runserver(startconf)