]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
formatting: revive black formatting
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from logging import getLogger
5 from os import umask
6 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
7 from struct import pack
8 from time import time
9 from typing import Dict, List, Optional, Tuple
10 import zmq
11
12 from . import common
13 from .gps303proto import (
14     HIBERNATION,
15     LOGIN,
16     inline_response,
17     parse_message,
18     proto_of_message,
19 )
20 from .zmsg import Bcast, Resp
21
# Module-level logger used by every class and function in this file.
log = getLogger("gps303/collector")
23
24
class Client:
    """Connected socket to the terminal plus buffer and metadata"""

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        self.sock = sock
        self.addr = addr
        self.buffer = b""  # Received bytes not yet split into frames
        self.imei: Optional[str] = None  # Filled in when LOGIN is seen

    def close(self) -> None:
        """Close the socket and drop any unparsed input"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""

    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
        """Read from the socket and parse complete messages

        Returns None when reading failed or the peer closed the
        connection. Otherwise returns a (possibly empty) list of
        (receive time, peer address, packet) tuples, where packet is the
        frame stripped of the leading "xx" and the trailing "\\r\\n".
        An incomplete trailing frame stays in self.buffer until more
        data arrives.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        when = time()
        self.buffer += segment
        msgs: List[Tuple[float, Tuple[str, int], bytes]] = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            if len(self.buffer) < 6:  # no len and proto - cannot proceed
                break
            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
            # Length field can legitimately be much less than the
            # length of the packet (e.g. WiFi positioning), but
            # it _should not_ be greater. Still sometimes it is.
            # Luckily, not by too much: by maybe two or three bytes?
            # So accept the first '\r\n' located no earlier than three
            # bytes before the expected position; anything before that
            # is binary payload that merely looks like a terminator.
            # A single bounded search cannot spin forever, neither on
            # an early false match nor on a missing terminator.
            frameend = self.buffer.find(b"\r\n", exp_end - 3)
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append((when, self.addr, packet))
        return msgs

    def send(self, buffer: bytes) -> None:
        """Frame the payload with "xx" ... "\\r\\n" and send it

        Send errors are logged, not raised.
        """
        try:
            self.sock.send(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),  # must be called: %d needs an int
                self.imei,
                e,
            )
110
111
class Clients:
    """Registry of connected terminals, indexed by fd and by IMEI"""

    def __init__(self) -> None:
        self.by_fd: Dict[int, Client] = {}
        self.by_imei: Dict[str, Client] = {}

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Register a newly accepted connection, return its fd"""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd: int) -> None:
        """Close the connection on `fd` and forget about it

        Raises KeyError if `fd` is not a registered client.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # The same terminal may have logged in again over a newer
        # connection while this one was still lingering. In that case
        # the IMEI entry belongs to the newer client and must survive;
        # it may also be gone already. Remove it only if it is ours.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(
        self, fd: int
    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
        """Read and parse what is available on `fd`

        Returns None when the client's recv() reports a closed or
        failed connection, otherwise a list of
        (imei, receive time, peer address, packet) tuples. A LOGIN
        packet makes the client reachable by its IMEI.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                if clnt.imei:
                    self.by_imei[clnt.imei] = clnt
                else:
                    log.warning(
                        "Login message from %s: %s, but client imei unfilled",
                        peeraddr,
                        packet,
                    )
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp: Resp) -> None:
        """Send `resp.packet` to the terminal with IMEI `resp.imei`

        If no such terminal is connected, this is only logged.
        """
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.packet)
        else:
            log.info("Not connected (IMEI %s)", resp.imei)
163
164
def runserver(conf: ConfigParser) -> None:
    """Accept terminals over TCP and relay their traffic via zeromq

    Reads "publishurl", "listenurl" and "port" from the "collector"
    section of `conf`. Every packet received from a terminal is
    published on the PUB socket; responses arrive on the PULL socket (or
    are produced by inline_response()) and are sent to the terminal
    with the matching IMEI. Runs until interrupted from the keyboard.
    """
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # The umask is only changed around the two bind() calls and
    # restored right after.
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of polled sockets are collected here
            # and applied only after all events have been handled.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        tostop.append(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                # One read may carry several HIBERNATION
                                # packets; stopping the same fd twice
                                # would raise KeyError and kill the
                                # server, so schedule it only once.
                                if sk not in tostop:
                                    tostop.append(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
253
254
# Start the server when the module name ends with "__main__", i.e. when
# this file is executed rather than imported. The result of
# common.init(log) is passed to runserver() as its configuration
# (presumably a ConfigParser - verify against common.init).
if __name__.endswith("__main__"):
    runserver(common.init(log))