]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
typechecking: annotate collector.py
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from logging import getLogger
5 from os import umask
6 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
7 from struct import pack
8 from time import time
9 from typing import Dict, List, Optional, Tuple
10 import zmq
11
12 from . import common
13 from .gps303proto import (
14     HIBERNATION,
15     LOGIN,
16     inline_response,
17     parse_message,
18     proto_of_message,
19 )
20 from .zmsg import Bcast, Resp
21
22 log = getLogger("gps303/collector")
23
24
class Client:
    """Connected socket to the terminal plus buffer and metadata"""

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        """Wrap an already connected socket.

        :param sock: connected socket to the terminal
        :param addr: peer address, stored and returned with every packet
        """
        self.sock = sock
        self.addr = addr
        # Bytes received from the socket but not yet consumed as packets
        self.buffer = b""
        # Filled in when a LOGIN packet is seen in recv()
        self.imei: Optional[str] = None

    def close(self) -> None:
        """Close the socket and drop any buffered, unparsed data"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""

    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
        """Read from the socket and parse complete messages

        :return: None on read error or EOF (the caller should stop
            serving this client), otherwise a possibly empty list of
            (receive time, peer address, packet) tuples. A packet is
            the frame contents between the leading "xx" and the
            terminating "\\r\\n". Incomplete trailing data stays in
            self.buffer until the next call.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        when = time()
        self.buffer += segment
        msgs = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            if len(self.buffer) < 6:  # no len and proto - cannot proceed
                break
            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
            frameend = 0
            # Length field can legitimately be much less than the
            # length of the packet (e.g. WiFi positioning), but
            # it _should not_ be greater. Still sometimes it is.
            # Luckily, not by too much: by maybe two or three bytes?
            # Do this embarrassing hack to avoid accidental match
            # of some binary data in the packet against '\r\n'.
            while True:
                # Always advance past the previous match, otherwise
                # find() returns the same offset again and we never
                # leave the loop.
                frameend = self.buffer.find(b"\r\n", frameend + 1)
                if frameend == -1 or frameend >= (exp_end - 3):
                    # No terminator (yet), or found realistic match
                    break
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append((when, self.addr, packet))
        return msgs

    def send(self, buffer: bytes) -> None:
        """Frame the packet with "xx" and "\\r\\n" and send it

        Send errors are logged and not propagated to the caller.

        :param buffer: packet contents without the framing
        """
        try:
            # sendall(): a partially written frame would corrupt the stream
            self.sock.sendall(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
110
111
class Clients:
    """Registry of connected terminals, indexed by fd and by IMEI"""

    def __init__(self) -> None:
        # Every connected client, keyed by the fileno of its socket
        self.by_fd: Dict[int, Client] = {}
        # Clients that have sent a LOGIN packet, keyed by their IMEI
        self.by_imei: Dict[str, Client] = {}

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Start tracking a newly accepted connection

        :return: the fd under which the new client is registered
        """
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd: int) -> None:
        """Close the client registered under fd and forget about it

        :raises KeyError: if no client is registered under fd
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # The same IMEI may have logged in again over a newer connection,
        # in which case by_imei points at that newer client. Only drop
        # the mapping if it is still ours.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(
        self, fd: int
    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
        """Read and parse whatever the client registered under fd has sent

        :return: None if the client's recv() returned None (read error
            or EOF), otherwise a list of
            (IMEI or None, receive time, peer address, packet) tuples
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                if clnt.imei:
                    self.by_imei[clnt.imei] = clnt
                else:
                    log.warning(
                        "Login message from %s: %s, but client imei unfilled",
                        peeraddr,
                        packet,
                    )
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp: Resp) -> None:
        """Send resp.packet to the client logged in as resp.imei, if any"""
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.packet)
        else:
            log.info("Not connected (IMEI %s)", resp.imei)
157
158
def runserver(conf: ConfigParser) -> None:
    """Run the collector event loop until interrupted from the keyboard

    Binds a zmq PUB socket to "publishurl" and a zmq PULL socket to
    "listenurl", and listens for TCP connections on "port" (all from
    the "collector" section of conf). Packets received from terminals
    are published on the PUB socket. Responses, either pulled from the
    PULL socket or produced by inline_response(), are published too
    and sent to the terminal with the matching IMEI.

    :param conf: configuration with a "collector" section
    """
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # umask is changed only for the duration of the two bind() calls
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of polled sockets are collected here
            # and applied only after all events have been looked at
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        if sk not in tostop:
                            tostop.append(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                # One read may carry several such packets;
                                # stopping the same fd twice would raise
                                # KeyError and bring the server down.
                                if sk not in tostop:
                                    tostop.append(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
247
248
# Script entry point: start the server with whatever common.init(log)
# returns (presumably the parsed configuration - confirm in common.py).
# NOTE(review): endswith() matches any module name that ends in
# "__main__", not only the exact name "__main__".
if __name__.endswith("__main__"):
    runserver(common.init(log))