]> www.average.org Git - loctrkd.git/blob - gps303/wsgateway.py
1843ce9b01d426aa897e4625d6ef6721bbc7421c
[loctrkd.git] / gps303 / wsgateway.py
1 """ Websocket Gateway """
2
3 from datetime import datetime, timezone
4 from json import dumps, loads
5 from logging import getLogger
6 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
7 from time import time
8 from wsproto import ConnectionType, WSConnection
9 from wsproto.events import (
10     AcceptConnection,
11     CloseConnection,
12     Message,
13     Ping,
14     Request,
15     TextMessage,
16 )
17 from wsproto.utilities import RemoteProtocolError
18 import zmq
19
20 from . import common
21 from .evstore import initdb, fetch
22 from .gps303proto import (
23     GPS_POSITIONING,
24     STATUS,
25     WIFI_POSITIONING,
26     parse_message,
27 )
28 from .zmsg import Bcast, topic
29
# Logger shared by every function and class in this module.
log = getLogger("gps303/wsgateway")
# Path of the HTML page returned to plain HTTP GET requests.
# Stays None until runserver() loads it from the configuration.
htmlfile = None
32
33
def backlog(imei, numback):
    """Build "location" messages from stored position packets of one device.

    Records are obtained from `fetch` for the given `imei`, restricted to
    incoming GPS_POSITIONING and outgoing WIFI_POSITIONING packets;
    `numback` is forwarded to `fetch` unchanged (presumably the maximum
    number of records to return - confirm against evstore).

    Returns a list of dicts with the keys "type", "imei", "timestamp",
    "longitude", "latitude" and "accuracy", in the order `fetch` yields
    the records.
    """
    wanted = (
        (True, GPS_POSITIONING.PROTO),
        (False, WIFI_POSITIONING.PROTO),
    )
    reports = []
    for is_incoming, timestamp, packet in fetch(imei, wanted, numback):
        msg = parse_message(packet, is_incoming=is_incoming)
        # The stored epoch value is rendered as an aware UTC datetime string.
        when = datetime.fromtimestamp(timestamp).astimezone(tz=timezone.utc)
        if isinstance(msg, GPS_POSITIONING):
            accuracy = "gps"
        else:
            accuracy = "approximate"
        report = {
            "type": "location",
            "imei": imei,
            "timestamp": str(when),
            "longitude": msg.longitude,
            "latitude": msg.latitude,
            "accuracy": accuracy,
        }
        reports.append(report)
    return reports
59
60
def try_http(data, fd, e):
    """Produce a plain HTTP response for data that failed websocket parsing.

    Parameters:
        data: raw bytes received from the client.
        fd:   file descriptor of the client socket (used for logging only).
        e:    the exception that made the caller fall back to plain HTTP;
              it is re-raised when `data` cannot be parsed as HTTP either.

    Returns the complete response as bytes:
        * GET with no `htmlfile` configured  -> 500 with explanatory text
        * GET and the file cannot be read    -> 500 with explanatory text
        * GET otherwise                      -> 200 with the file contents
        * any other method                   -> 400

    The same page is served regardless of the requested resource. The
    module-level `htmlfile` is only read here, never assigned.

    Raises:
        `e` when a ValueError occurs while handling the request (for
        example undecodable bytes, or a request line that does not consist
        of exactly three space-separated fields).
    """
    try:
        lines = data.decode().split("\r\n")
        request = lines[0]
        headers = lines[1:]
        op, resource, proto = request.split(" ")
        log.debug(
            "HTTP %s for %s, proto %s from fd %d, headers: %s",
            op,
            resource,
            proto,
            fd,
            headers,
        )
        if op != "GET":
            return (
                f"{proto} 400 Bad request\r\n"
                "Content-Type: text/plain\r\n\r\n"
                "Bad request\r\n".encode()
            )
        if htmlfile is None:
            return (
                f"{proto} 500 No data configured\r\n"
                "Content-Type: text/plain\r\n\r\n"
                "HTML data not configured on the server\r\n".encode()
            )
        try:
            with open(htmlfile, "rb") as fl:
                htmldata = fl.read()
        except OSError:
            return (
                f"{proto} 500 File not found\r\n"
                "Content-Type: text/plain\r\n\r\n"
                "HTML file could not be opened\r\n".encode()
            )
        return (
            f"{proto} 200 Ok\r\n"
            "Content-Type: text/html; charset=utf-8\r\n"
            f"Content-Length: {len(htmldata):d}\r\n\r\n"
        ).encode("utf-8") + htmldata
    except ValueError:
        log.warning("Unparseable data from fd %d: %s", fd, data)
        raise e
108
109
class Client:
    """Websocket connection to the client"""

    def __init__(self, sock, addr):
        """Wrap an accepted socket `sock` (peer address `addr`)."""
        self.sock = sock
        self.addr = addr
        self.ws = WSConnection(ConnectionType.SERVER)
        self.ws_data = b""  # bytes queued for sending, drained by write()
        self.ready = False  # True once the websocket upgrade was accepted
        self.imeis = set()  # IMEIs this client has subscribed to

    def close(self):
        """Close the underlying socket."""
        log.debug("Closing fd %d", self.sock.fileno())
        self.sock.close()

    def recv(self):
        """Read from the socket and process whatever arrived.

        Returns:
            None when the connection is finished (read error, EOF, or the
            data was answered as a plain HTTP request), otherwise the list
            of decoded JSON objects received as text messages (possibly
            empty).

        Raises:
            Whatever `try_http` raises when the data is neither valid
            websocket traffic nor a parseable HTTP request.
        """
        try:
            data = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d: %s",
                self.sock.fileno(),
                e,
            )
            self.ws.receive_data(None)
            return None
        if not data:  # Client has closed connection
            log.info(
                "EOF reading from fd %d",
                self.sock.fileno(),
            )
            self.ws.receive_data(None)
            return None
        try:
            self.ws.receive_data(data)
        except RemoteProtocolError as e:
            log.debug(
                "Websocket error on fd %d, try plain http (%s)",
                self.sock.fileno(),
                e,
            )
            self.ws_data = try_http(data, self.sock.fileno(), e)
            # Writing from the read path is a hack, but returning None makes
            # the server loop close this connection right away, so anything
            # not pushed out here would be lost.  Keep writing until the
            # buffer is empty; write() clears it on a socket error, which
            # also ends the loop.
            while self.write():
                pass
            log.debug("Sending HTTP response to %d", self.sock.fileno())
            return None
        msgs = []
        for event in self.ws.events():
            if isinstance(event, Request):
                log.debug("WebSocket upgrade on fd %d", self.sock.fileno())
                self.ws_data += self.ws.send(AcceptConnection())
                self.ready = True
            elif isinstance(event, (CloseConnection, Ping)):
                log.debug("%s on fd %d", event, self.sock.fileno())
                self.ws_data += self.ws.send(event.response())
            elif isinstance(event, TextMessage):
                log.debug("%s on fd %d", event, self.sock.fileno())
                # The payload comes from a remote peer: a malformed message
                # must not take the whole server down.
                try:
                    msg = loads(event.data)
                except ValueError as e:
                    log.warning(
                        "Malformed JSON on fd %d: %s",
                        self.sock.fileno(),
                        e,
                    )
                    continue
                if not isinstance(msg, dict):
                    log.warning(
                        "Non-object JSON on fd %d: %s",
                        self.sock.fileno(),
                        msg,
                    )
                    continue
                msgs.append(msg)
                if msg.get("type", None) == "subscribe":
                    # A new subscription replaces the previous one.
                    self.imeis = set(msg.get("imei", []))
                    log.debug(
                        "subs list on fd %s is %s",
                        self.sock.fileno(),
                        self.imeis,
                    )
            else:
                log.warning("%s on fd %d", event, self.sock.fileno())
        return msgs

    def wants(self, imei):
        """Tell whether this client is subscribed to `imei`."""
        log.debug(
            "wants %s? set is %s on fd %d",
            imei,
            self.imeis,
            self.sock.fileno(),
        )
        return imei in self.imeis

    def send(self, message):
        """Queue `message` (a dict with an "imei" key) as a JSON frame.

        Nothing is queued unless the websocket handshake is complete and
        the client is subscribed to the message's IMEI.
        """
        if self.ready and message["imei"] in self.imeis:
            self.ws_data += self.ws.send(Message(data=dumps(message)))

    def write(self):
        """Try to send queued bytes; return True if some are still pending.

        On a socket error the queue is discarded and False is returned.
        """
        if self.ws_data:
            try:
                sent = self.sock.send(self.ws_data)
                self.ws_data = self.ws_data[sent:]
            except OSError as e:
                log.error(
                    "Sending to fd %d: %s",
                    self.sock.fileno(),
                    e,
                )
                self.ws_data = b""
        return bool(self.ws_data)
207
208
class Clients:
    """Collection of connected clients, keyed by socket file descriptor."""

    def __init__(self):
        self.by_fd = {}  # fd -> Client

    def add(self, clntsock, clntaddr):
        """Start tracking a newly accepted socket; return its fd."""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the client registered under `fd` and forget it.

        Raises KeyError if `fd` is not registered.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d", clnt.sock.fileno())
        clnt.close()
        del self.by_fd[fd]

    def recv(self, fd):
        """Let the client registered under `fd` read from its socket."""
        return self.by_fd[fd].recv()

    def send(self, msg):
        """Queue `msg` for every client subscribed to msg["imei"].

        Returns the set of fds that now have data to write.
        """
        towrite = set()
        for fd, clnt in self.by_fd.items():
            if clnt.wants(msg["imei"]):
                clnt.send(msg)
                towrite.add(fd)
        return towrite

    def write(self, towrite):
        """Flush pending output of the clients whose fds are in `towrite`.

        Returns the subset of `towrite` that still has unsent data.  Fds
        that are not (or no longer) registered are skipped instead of
        failing on a missing client.
        """
        waiting = set()
        for fd in towrite:
            clnt = self.by_fd.get(fd)
            if clnt is None:  # client already gone, nothing to flush
                continue
            if clnt.write():
                waiting.add(fd)
        return waiting

    def subs(self):
        """Return the union of the IMEI subscriptions of all clients."""
        result = set()
        for clnt in self.by_fd.values():
            result |= clnt.imeis
        return result
249
250
def runserver(conf):
    """Run the websocket gateway main loop until interrupted.

    `conf` must provide `get(section, option)` and `getint(section, option)`;
    the options read are storage/dbfn, wsgateway/htmlfile, wsgateway/port
    and collector/publishurl.

    The loop multiplexes, with a single zmq poller:
      * the zmq SUB socket delivering device messages,
      * the listening TCP socket (new client connections),
      * every connected client socket.
    Changes to the poll set and all writes are deferred until the events of
    one poll round have been consumed.

    Side effect: sets the module-level `htmlfile`.
    Returns None when a KeyboardInterrupt is received.
    """
    global htmlfile

    initdb(conf.get("storage", "dbfn"))
    htmlfile = conf.get("wsgateway", "htmlfile")
    zctx = zmq.Context()
    zsub = zctx.socket(zmq.SUB)
    zsub.connect(conf.get("collector", "publishurl"))
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setblocking(False)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("wsgateway", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zsub, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    activesubs = set()
    # (protocol, is_incoming) pairs subscribed to for every wanted IMEI.
    feeds = (
        (GPS_POSITIONING.PROTO, True),
        (WIFI_POSITIONING.PROTO, False),
        (STATUS.PROTO, True),
    )
    try:
        towait = set()  # fds with unsent data, waiting to become writable
        while True:
            # Bring zmq subscriptions in line with what the clients want.
            neededsubs = clients.subs()
            for imei in neededsubs - activesubs:
                for proto, is_incoming in feeds:
                    zsub.setsockopt(
                        zmq.SUBSCRIBE,
                        topic(proto, is_incoming, imei),
                    )
            for imei in activesubs - neededsubs:
                for proto, is_incoming in feeds:
                    zsub.setsockopt(
                        zmq.UNSUBSCRIBE,
                        topic(proto, is_incoming, imei),
                    )
            activesubs = neededsubs
            log.debug("Subscribed to: %s", activesubs)
            tosend = []
            topoll = []
            tostop = []
            towrite = set()
            events = poller.poll()
            for sk, fl in events:
                if sk is zsub:
                    # Drain everything that is queued on the SUB socket.
                    while True:
                        try:
                            zmsg = Bcast(zsub.recv(zmq.NOBLOCK))
                        except zmq.Again:
                            break
                        msg = parse_message(zmsg.packet, zmsg.is_incoming)
                        log.debug("Got %s with %s", zmsg, msg)
                        when = str(
                            datetime.fromtimestamp(zmsg.when).astimezone(
                                tz=timezone.utc
                            )
                        )
                        if isinstance(msg, STATUS):
                            tosend.append(
                                {
                                    "type": "status",
                                    "imei": zmsg.imei,
                                    "timestamp": when,
                                    "battery": msg.batt,
                                }
                            )
                        else:
                            tosend.append(
                                {
                                    "type": "location",
                                    "imei": zmsg.imei,
                                    "timestamp": when,
                                    "longitude": msg.longitude,
                                    "latitude": msg.latitude,
                                    "accuracy": "gps"
                                    if zmsg.is_incoming
                                    else "approximate",
                                }
                            )
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Client gone from fd %d", sk)
                        tostop.append(sk)
                        # The fd that went away is `sk`; forget any pending
                        # write for it.
                        towait.discard(sk)
                    else:
                        for msg in received:
                            log.debug("Received from %d: %s", sk, msg)
                            if msg.get("type", None) == "subscribe":
                                # Same default as Client.recv: a missing
                                # "imei" means an empty subscription.
                                imeis = msg.get("imei", [])
                                numback = msg.get("backlog", 5)
                                for imei in imeis:
                                    tosend.extend(backlog(imei, numback))
                        towrite.add(sk)
                elif fl & zmq.POLLOUT:
                    log.debug("Write now open for fd %d", sk)
                    towrite.add(sk)
                    towait.discard(sk)
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)
                clients.stop(fd)
            for zmsg in tosend:
                log.debug("Sending to the clients: %s", zmsg)
                towrite |= clients.send(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
            # Deal with actually writing the data out.  Fds still waiting
            # for POLLOUT are left alone until the poller reports them
            # writable.
            trywrite = towrite - towait
            morewait = clients.write(trywrite)
            log.debug(
                "towait %s, tried %s, still busy %s",
                towait,
                trywrite,
                morewait,
            )
            # `morewait` is a subset of `trywrite`, which is disjoint from
            # `towait`: every fd in it is newly waiting for write.
            for fd in morewait:
                poller.modify(fd, flags=zmq.POLLIN | zmq.POLLOUT)
            for fd in trywrite - morewait:  # no longer waiting for write
                poller.modify(fd, flags=zmq.POLLIN)
            towait |= morewait
    except KeyboardInterrupt:
        pass
397
398
# Script entry point: build the configuration via common.init() and serve.
if __name__.endswith("__main__"):
    config = common.init(log)
    runserver(config)