]> www.average.org Git - loctrkd.git/blob - loctrkd/collector.py
26d37f33f60af39a123c8aafa043c50244125ef0
[loctrkd.git] / loctrkd / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from logging import getLogger
6 from os import umask
7 from socket import (
8     socket,
9     AF_INET6,
10     SOCK_STREAM,
11     SOL_SOCKET,
12     SO_KEEPALIVE,
13     SO_REUSEADDR,
14 )
15 from struct import pack
16 from time import time
17 from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
18 import zmq
19
20 from . import common
21 from .zmsg import Bcast, Resp
22
# Logger shared by everything in this module.
log = getLogger("loctrkd/collector")

# Maximum number of bytes requested from a client socket per recv() call.
MAXBUFFER: int = 4096
26
27
class ProtoModule:
    """Typing stub for the interface expected from a protocol module.

    Modules imported by name at server start are cast to this type; none
    of the methods here has an implementation, they all return None.
    """

    class Stream:
        """Per-connection parser that turns received segments into packets."""

        def recv(self, segment: bytes) -> List[Union[bytes, str]]:
            """Consume a segment read from the socket.

            The collector treats `bytes` elements of the result as packets
            and logs any other element as a warning.
            """

        def close(self) -> bytes:
            """Finish the stream; the collector logs a non-empty result as
            bytes that remained in the buffer."""

    @staticmethod
    def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
        """Produce the bytes that are written to the socket for `buffer`."""

    @staticmethod
    def probe_buffer(buffer: bytes) -> bool:
        """Tell if this module accepts a connection that starts with
        `buffer`."""

    @staticmethod
    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
        """Parse a packet (not called by the collector itself; presumably
        used by consumers of the published messages - TODO confirm)."""

    @staticmethod
    def inline_response(packet: bytes) -> Optional[bytes]:
        """Return a response to send right away for `packet`, or None."""

    @staticmethod
    def is_goodbye_packet(packet: bytes) -> bool:
        """Tell if the connection may be closed after `packet`."""

    @staticmethod
    def imei_from_packet(packet: bytes) -> Optional[str]:
        """Return the IMEI found in `packet`, or None."""

    @staticmethod
    def proto_of_message(packet: bytes) -> str:
        """Return the protocol label used when `packet` is published."""
63
64
# Protocol modules that incoming data is probed against, in order.
# Empty at import time; runserver() replaces it with the configured modules.
pmods: List[ProtoModule] = []
66
67
class Client:
    """Connected socket to the terminal plus buffer and metadata"""

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        """Wrap a connected socket `sock` whose peer address is `addr`."""
        self.sock = sock
        self.addr = addr
        # Protocol module and its stream parser. Both stay None until
        # recv() sees data that one of the modules in `pmods` recognizes.
        self.pmod: Optional[ProtoModule] = None
        self.stream: Optional[ProtoModule.Stream] = None
        # Never assigned inside this class; set from outside once known.
        self.imei: Optional[str] = None

    def close(self) -> None:
        """Close the socket and report bytes left in the stream parser."""
        # Log first: fileno() is no longer meaningful after close().
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        if self.stream:
            rest = self.stream.close()
        else:
            rest = b""
        if rest:
            log.warning(
                "%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
            )

    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
        """Read from the socket and parse complete messages

        Returns None when reading failed or the peer closed the
        connection, otherwise a (possibly empty) list of
        `(timestamp, peer address, packet)` tuples.
        """
        try:
            segment = self.sock.recv(MAXBUFFER)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        if self.stream is None:
            # Protocol not chosen yet: the first module whose probe
            # accepts this segment handles the connection from now on.
            for pmod in pmods:
                if pmod.probe_buffer(segment):
                    self.pmod = pmod
                    self.stream = pmod.Stream()
                    break
        if self.stream is None:
            # No module recognized the data; the segment is dropped and
            # the connection is left open.
            log.info(
                "unrecognizable %d bytes of data %s from fd %d",
                len(segment),
                segment[:32].hex(),
                self.sock.fileno(),
            )
            return []
        when = time()  # one timestamp for all packets of this segment
        msgs = []
        for elem in self.stream.recv(segment):
            if isinstance(elem, bytes):
                msgs.append((when, self.addr, elem))
            else:
                # Non-bytes elements are diagnostics from the parser
                log.warning(
                    "%s from fd %d (IMEI %s)",
                    elem,
                    self.sock.fileno(),
                    self.imei,
                )
        return msgs

    def send(self, buffer: bytes) -> None:
        """Frame `buffer` with the protocol module and write it out.

        Must only be called after the protocol has been detected.
        Socket errors are logged, not raised.
        """
        assert self.stream is not None and self.pmod is not None
        try:
            # sendall() rather than send(): send() may write only a part
            # of the data, which would silently truncate the packet.
            self.sock.sendall(self.pmod.enframe(buffer, imei=self.imei))
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
148
149
class Clients:
    """Collection of connected clients, indexed by descriptor and by IMEI"""

    def __init__(self) -> None:
        self.by_fd: Dict[int, Client] = {}
        self.by_imei: Dict[str, Client] = {}
        # Descriptors that the caller is expected to stop serving
        self.tostop: Set[int] = set()

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Register a new connection and return its descriptor"""
        newfd = clntsock.fileno()
        log.info("Start serving fd %d from %s", newfd, clntaddr)
        self.by_fd[newfd] = Client(clntsock, clntaddr)
        return newfd

    def stop(self, fd: int) -> None:
        """Close the client on descriptor `fd` and forget about it"""
        victim = self.by_fd[fd]
        log.info(
            "Stop serving fd %d (IMEI %s)",
            victim.sock.fileno(),
            victim.imei,
        )
        victim.close()
        # The IMEI entry could have been taken over by a newer connection;
        # only drop it when it still refers to this client.
        if victim.imei and self.by_imei[victim.imei] == victim:
            del self.by_imei[victim.imei]
        del self.by_fd[fd]

    def recv(
        self, fd: int
    ) -> Optional[
        List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
    ]:
        """Read packets from descriptor `fd`, handling login on the way

        Returns None if the client's read returned None, otherwise a list
        of `(protocol module, IMEI, timestamp, peer address, packet)`.
        """
        clnt = self.by_fd[fd]
        incoming = clnt.recv()
        if incoming is None:
            return None
        collected = []
        for stamp, origin, payload in incoming:
            assert clnt.pmod is not None
            # Only look for an IMEI while the client has none yet
            found = (
                clnt.pmod.imei_from_packet(payload)
                if clnt.imei is None
                else None
            )
            if found is not None:
                log.info("LOGIN from fd %d (IMEI %s)", fd, found)
                clnt.imei = found
                previous = self.by_imei.get(clnt.imei)
                if previous is not None:
                    # Same IMEI on an older connection: schedule it for
                    # closing and detach the IMEI from it.
                    stalefd = previous.sock.fileno()
                    log.info("Removing stale connection on fd %d", stalefd)
                    previous.imei = None
                    self.tostop.add(stalefd)
                self.by_imei[clnt.imei] = clnt
            collected.append((clnt.pmod, clnt.imei, stamp, origin, payload))
            log.debug(
                "Received from %s (IMEI %s): %s",
                origin,
                clnt.imei,
                payload.hex(),
            )
        return collected

    def response(self, resp: Resp) -> Optional[ProtoModule]:
        """Send `resp.packet` to the client logged in with `resp.imei`

        Returns that client's protocol module, or None if no client with
        this IMEI is connected.
        """
        target = self.by_imei.get(resp.imei)
        if target is None:
            log.info("Not connected (IMEI %s)", resp.imei)
            return None
        target.send(resp.packet)
        return target.pmod
211
212
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
    """Run the collector's event loop until interrupted from the keyboard.

    Settings are read from the "collector" section of `conf`:
    `protocols` (comma separated names of modules of this package),
    `publishurl` (ZeroMQ PUB socket), `listenurl` (ZeroMQ PULL socket)
    and `port` (TCP listener).

    Every packet received from a terminal is published on the PUB socket.
    Responses arriving on the PULL socket, and inline responses produced
    by the protocol module, are sent to the terminal with the matching
    IMEI and published as outgoing messages.

    If `handle_hibernate` is true, a connection is closed after a packet
    that its protocol module reports as a goodbye packet.

    KeyboardInterrupt ends the loop and the function returns normally.
    Any other exception propagates to the caller. In both cases the
    ZeroMQ sockets, the context and the TCP listener are released.
    """
    global pmods
    pmods = [
        cast(ProtoModule, import_module("." + modnm, __package__))
        for modnm in conf.get("collector", "protocols").split(",")
    ]
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # umask is tightened only for the duration of the two bind() calls;
    # presumably this restricts access to ipc:// socket files - TODO confirm
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of polled descriptors and all sends are
            # deferred until the events of this round have been consumed.
            tosend = []
            topoll = []
            clients.tostop = set()
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        clients.tostop.add(sk)
                    else:
                        for pmod, imei, when, peeraddr, packet in received:
                            proto = pmod.proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if (
                                pmod.is_goodbye_packet(packet)
                                and handle_hibernate
                            ):
                                log.debug(
                                    "Goodbye from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                clients.tostop.add(sk)
                            respmsg = pmod.inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for zmsg in tosend:
                log.debug("Sending to the client: %s", zmsg)
                rpmod = clients.response(zmsg)
                if rpmod is not None:
                    zpub.send(
                        Bcast(
                            is_incoming=False,
                            proto=rpmod.proto_of_message(zmsg.packet),
                            when=zmsg.when,
                            imei=zmsg.imei,
                            packet=zmsg.packet,
                        ).packed
                    )
            # Stop after sending, so that responses queued in this round
            # still reach clients that are about to be closed.
            for fd in clients.tostop:
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass  # the regular way to stop the server
    finally:
        # Release the endpoints on any exit path, not only on Ctrl-C, so
        # that a failure inside the loop does not leave them bound.
        zpub.close()
        zpull.close()
        zctx.destroy()  # type: ignore
        tcpl.close()
314
315
# Start the server when the module is executed rather than imported.
# common.init() is defined elsewhere; presumably it sets up logging for
# `log` and returns the parsed configuration - TODO confirm.
if __name__.endswith("__main__"):
    runserver(common.init(log))