]> www.average.org Git - loctrkd.git/blob - loctrkd/collector.py
136ecba99aeeb602ee85028caadc0caa0caf744d
[loctrkd.git] / loctrkd / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from logging import getLogger
6 from os import umask
7 from socket import (
8     socket,
9     AF_INET6,
10     SOCK_STREAM,
11     SOL_SOCKET,
12     SO_KEEPALIVE,
13     SO_REUSEADDR,
14 )
15 from struct import pack
16 from time import time
17 from typing import Any, cast, Dict, List, Optional, Tuple, Union
18 import zmq
19
20 from . import common
21 from .zmsg import Bcast, Resp
22
# Logger shared by everything in this module.
log = getLogger("loctrkd/collector")

# Upper bound on the number of bytes requested from a client socket in a
# single recv() call (see Client.recv).
MAXBUFFER: int = 4096
26
27
class ProtoModule:
    """Typing stub for the interface this file expects of a protocol module.

    `runserver` imports the configured protocol modules and casts each of
    them to this class, so the members below describe module-level names
    that such a module has to provide. All bodies here are placeholders.
    """

    class Stream:
        """Per-connection stream handler; one is created per `Client`."""

        @staticmethod
        def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
            """Return the bytes to put on the wire for `buffer`.

            `Client.send` passes the result straight to the socket.
            """
            ...

        def recv(self, segment: bytes) -> List[Union[bytes, str]]:
            """Consume a chunk read from the socket and return parsed items.

            `Client.recv` treats `bytes` items as received packets and logs
            every other item as a warning.
            """
            ...

        def close(self) -> bytes:
            """Finish with the stream and return the bytes still buffered.

            `Client.close` logs a warning when the result is not empty.
            """
            ...

    @staticmethod
    def probe_buffer(buffer: bytes) -> bool:
        """Tell whether `buffer` looks like this module's protocol.

        `Client.recv` calls this on incoming data while no protocol has
        been chosen yet and picks the first module that returns true.
        """
        ...

    @staticmethod
    def parse_message(packet: bytes, is_incoming: bool = True) -> Any:
        """Not called in this file; presumably decodes `packet` - TODO confirm."""
        ...

    @staticmethod
    def inline_response(packet: bytes) -> Optional[bytes]:
        """Return an immediate reply for `packet`, or None for no reply.

        `runserver` queues a non-None result for sending to the client.
        """
        ...

    @staticmethod
    def is_goodbye_packet(packet: bytes) -> bool:
        """Tell whether `packet` announces that the terminal is leaving.

        `runserver` stops serving the client when this is true and
        `handle_hibernate` is set.
        """
        ...

    @staticmethod
    def imei_from_packet(packet: bytes) -> Optional[str]:
        """Return the IMEI carried by `packet`, or None if there is none.

        `Clients.recv` calls this until the client's IMEI is known.
        """
        ...

    @staticmethod
    def proto_of_message(packet: bytes) -> str:
        """Return the protocol label that `runserver` publishes with `packet`."""
        ...

    @staticmethod
    def proto_by_name(name: str) -> int:
        """Not called in this file; presumably maps a name to a number - TODO confirm."""
        ...
67
68
# Protocol modules that incoming data is probed against, in order.
# Empty until runserver() fills it from the "protocols" option of the
# "collector" configuration section.
pmods: List[ProtoModule] = []
70
71
class Client:
    """Connected socket to the terminal plus buffer and metadata

    The protocol module (`pmod`) and its stream handler (`stream`) are not
    known when the connection is accepted; `recv` selects them by probing
    the incoming data against the modules listed in `pmods`. The `imei`
    attribute is not set by this class; it is assigned from outside
    (see `Clients.recv`).
    """

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        """Remember the accepted socket `sock` and its peer address `addr`."""
        self.sock = sock
        self.addr = addr
        self.pmod: Optional[ProtoModule] = None
        self.stream: Optional[ProtoModule.Stream] = None
        self.imei: Optional[str] = None

    def close(self) -> None:
        """Close the socket and report any data left in the stream buffer."""
        # Log first: fileno() is not meaningful once the socket is closed.
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        if self.stream:
            rest = self.stream.close()
        else:
            rest = b""
        if rest:
            log.warning(
                "%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
            )

    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
        """Read from the socket and parse complete messages

        Returns None when the connection is unusable (read error or EOF),
        otherwise a possibly empty list of `(timestamp, peer address,
        packet)` tuples. All packets from one read share one timestamp.
        """
        try:
            segment = self.sock.recv(MAXBUFFER)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        if self.stream is None:
            # Protocol not chosen yet: the first module that recognizes
            # the data wins.
            for pmod in pmods:
                if pmod.probe_buffer(segment):
                    self.pmod = pmod
                    self.stream = pmod.Stream()
                    break
        if self.stream is None:
            # No module recognized the data. The segment is dropped, but
            # the connection is kept so that later data is probed again.
            log.info(
                "unrecognizable %d bytes of data %s from fd %d",
                len(segment),
                segment[:32].hex(),
                self.sock.fileno(),
            )
            return []
        when = time()
        msgs = []
        for elem in self.stream.recv(segment):
            if isinstance(elem, bytes):
                msgs.append((when, self.addr, elem))
            else:
                # Non-bytes items are diagnostics from the stream parser.
                log.warning(
                    "%s from fd %d (IMEI %s)",
                    elem,
                    self.sock.fileno(),
                    self.imei,
                )
        return msgs

    def send(self, buffer: bytes) -> None:
        """Frame `buffer` for the client's protocol and send all of it.

        Must only be called after `recv` has selected the protocol stream.
        Socket errors are logged and not propagated.
        """
        assert self.stream is not None
        try:
            # sendall(), not send(): send() may transmit only a part of
            # the data, which would silently truncate the frame.
            self.sock.sendall(self.stream.enframe(buffer, imei=self.imei))
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
152
153
class Clients:
    """Registry of connected clients, indexed by socket fd and by IMEI"""

    def __init__(self) -> None:
        """Start with no clients."""
        self.by_fd: Dict[int, Client] = {}
        self.by_imei: Dict[str, Client] = {}

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Wrap a newly accepted socket into a `Client`; return its fd."""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        newcomer = Client(clntsock, clntaddr)
        self.by_fd[fd] = newcomer
        return fd

    def stop(self, fd: int) -> None:
        """Close the client served on `fd` and drop it from both indexes."""
        leaving = self.by_fd[fd]
        log.info(
            "Stop serving fd %d (IMEI %s)", leaving.sock.fileno(), leaving.imei
        )
        leaving.close()
        # A client that never logged in, or was orphaned, has no IMEI entry.
        if leaving.imei:
            del self.by_imei[leaving.imei]
        del self.by_fd[fd]

    def _login(self, fd: int, clnt: Client, imei: str) -> None:
        """Bind `imei` to `clnt`, orphaning any client that held it before."""
        log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
        clnt.imei = imei
        previous = self.by_imei.get(imei)
        if previous is not None:
            log.info(
                "Orphaning fd %d with the same IMEI",
                previous.sock.fileno(),
            )
            # The old connection stays open but loses its IMEI.
            previous.imei = None
        self.by_imei[imei] = clnt

    def recv(
        self, fd: int
    ) -> Optional[
        List[Tuple[ProtoModule, Optional[str], float, Tuple[str, int], bytes]]
    ]:
        """Read packets from the client on `fd`, tracking its IMEI.

        Returns None when the client's connection is gone, otherwise a
        list of `(protocol module, IMEI or None, timestamp, peer address,
        packet)` tuples.
        """
        terminal = self.by_fd[fd]
        incoming = terminal.recv()
        if incoming is None:
            return None
        collected = []
        for stamp, origin, payload in incoming:
            assert terminal.pmod is not None
            if terminal.imei is None:
                found = terminal.pmod.imei_from_packet(payload)
                if found is None:
                    log.warning(
                        "Login message from %s: %s, but client imei unfilled",
                        origin,
                        payload,
                    )
                else:
                    self._login(fd, terminal, found)
            collected.append(
                (terminal.pmod, terminal.imei, stamp, origin, payload)
            )
            log.debug(
                "Received from %s (IMEI %s): %s",
                origin,
                terminal.imei,
                payload.hex(),
            )
        return collected

    def response(self, resp: Resp) -> Optional[ProtoModule]:
        """Send `resp.packet` to the client logged in as `resp.imei`.

        Returns that client's protocol module, or None when no client
        with this IMEI is connected.
        """
        target = self.by_imei.get(resp.imei)
        if target is None:
            log.info("Not connected (IMEI %s)", resp.imei)
            return None
        target.send(resp.packet)
        return target.pmod
221
222
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
    """Serve terminals over TCP and relay their traffic over ZeroMQ.

    Reads the "protocols", "publishurl", "listenurl" and "port" options
    from the "collector" section of `conf`. Every packet received from
    a terminal is published on the PUB socket; responses arrive on the
    PULL socket, or come from the protocol module's inline response, and
    are sent to the terminal logged in under the matching IMEI and then
    published as outgoing traffic.

    When `handle_hibernate` is true, a client that sends a goodbye packet
    is disconnected after its pending responses are sent.

    Runs until interrupted from the keyboard, which makes the function
    return normally. Any other exception propagates to the caller. In
    both cases the ZeroMQ sockets and the listening socket are closed.
    """
    global pmods
    pmods = [
        cast(ProtoModule, import_module("." + modnm, __package__))
        for modnm in conf.get("collector", "protocols").split(",")
    ]
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # Tighten the umask for the duration of the binds; presumably this is
    # for filesystem objects created by ipc:// endpoints - TODO confirm.
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes are collected while handling the events and applied
            # only after the whole event list has been processed.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket.
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        tostop.append(sk)
                    else:
                        for pmod, imei, when, peeraddr, packet in received:
                            proto = pmod.proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if (
                                pmod.is_goodbye_packet(packet)
                                and handle_hibernate
                            ):
                                log.debug(
                                    "Goodbye from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                tostop.append(sk)
                            respmsg = pmod.inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for zmsg in tosend:
                log.debug("Sending to the client: %s", zmsg)
                rpmod = clients.response(zmsg)
                if rpmod is not None:
                    zpub.send(
                        Bcast(
                            is_incoming=False,
                            proto=rpmod.proto_of_message(zmsg.packet),
                            when=zmsg.when,
                            imei=zmsg.imei,
                            packet=zmsg.packet,
                        ).packed
                    )
            # The same fd may have been queued more than once (e.g. several
            # goodbye packets in one read). Stopping it twice would raise
            # KeyError, so drop the duplicates while keeping the order.
            for fd in dict.fromkeys(tostop):
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        # The expected way to stop the server; fall through to the cleanup.
        pass
    finally:
        # Release the sockets on any exit path, not only on Ctrl-C.
        zpub.close()
        zpull.close()
        zctx.destroy()  # type: ignore
        tcpl.close()
324
325
# Script entry point: initialize via common.init() and run the server.
# NOTE(review): endswith() rather than == presumably lets this also match
# a dotted name ending in "__main__" - confirm the intended launch modes.
if __name__.endswith("__main__"):
    runserver(common.init(log))