]> www.average.org Git - loctrkd.git/blob - loctrkd/collector.py
788cb11ca1ee8b186838ea1ef882a5b6faf4ea83
[loctrkd.git] / loctrkd / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from importlib import import_module
5 from logging import getLogger
6 from os import umask
7 from socket import (
8     socket,
9     AF_INET6,
10     SOCK_STREAM,
11     SOL_SOCKET,
12     SO_KEEPALIVE,
13     SO_REUSEADDR,
14 )
15 from struct import pack
16 from time import time
17 from typing import Any, Dict, List, Optional, Set, Tuple, Union
18 import zmq
19
20 from . import common
21 from .protomodule import ProtoModule
22 from .zmsg import Bcast, Resp
23
# Module-level logger used by the classes and functions of this file
log = getLogger("loctrkd/collector")

# Maximum number of bytes requested from a client socket in one recv() call
MAXBUFFER: int = 4096
27
28
class Client:
    """Connected socket to the terminal plus buffer and metadata

    The protocol module (`pmod`) and its stream parser (`stream`) stay
    unset until the first received segment is recognized by
    `common.probe_pmod()`.  `imei` is never assigned by this class
    itself; it is filled in from outside.
    """

    def __init__(self, sock: socket, addr: Any) -> None:
        self.sock = sock
        self.addr = addr
        # Protocol module selected by probing the received data
        self.pmod: Optional[ProtoModule] = None
        # Stream parser instantiated from the protocol module
        self.stream: Optional[ProtoModule.Stream] = None
        # Identity of the terminal, unknown right after connect
        self.imei: Optional[str] = None

    def close(self) -> None:
        """Close the socket and report bytes left unparsed in the stream"""
        # Log first: the descriptor number is not available after close()
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        rest = self.stream.close() if self.stream else b""
        if rest:
            log.warning(
                "%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
            )

    def recv(self) -> Optional[List[Tuple[float, Any, bytes]]]:
        """Read from the socket and parse complete messages

        Returns None when the connection is unusable (read error or EOF),
        otherwise a list of `(timestamp, peer address, packet)` tuples.
        The list is empty when the data could not be attributed to any
        protocol module, and may be empty when no complete message was
        produced by the stream parser.
        """
        try:
            segment = self.sock.recv(MAXBUFFER)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        if self.stream is None:
            # Protocol not known yet, try to detect it from this segment
            self.pmod = common.probe_pmod(segment)
            if self.pmod is not None:
                self.stream = self.pmod.Stream()
        if self.stream is None:
            # Still unknown: the segment is dropped, connection is kept
            log.info(
                "unrecognizable %d bytes of data %s from fd %d",
                len(segment),
                segment[:32].hex(),
                self.sock.fileno(),
            )
            return []
        when = time()  # one timestamp for all messages of this segment
        msgs = []
        for elem in self.stream.recv(segment):
            if isinstance(elem, bytes):
                msgs.append((when, self.addr, elem))
            else:
                # Anything that is not bytes is only reported in the log
                log.warning(
                    "%s from fd %d (IMEI %s)",
                    elem,
                    self.sock.fileno(),
                    self.imei,
                )
        return msgs

    def send(self, buffer: bytes) -> None:
        """Frame `buffer` with the protocol module and send it out

        The frame is written with `sendall()` because a plain `send()`
        is allowed to transmit only a part of the data.  Socket errors
        are logged and not propagated.
        """
        assert self.stream is not None and self.pmod is not None
        try:
            self.sock.sendall(self.pmod.enframe(buffer, imei=self.imei))
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
107
108
class Clients:
    """Registry of served clients, indexed by descriptor and by IMEI"""

    def __init__(self) -> None:
        self.by_fd: Dict[int, Client] = {}
        self.by_imei: Dict[str, Client] = {}

    def fds(self) -> Set[int]:
        """Return a fresh set with descriptors of all served clients"""
        return set(self.by_fd)

    def add(self, clntsock: socket, clntaddr: Any) -> int:
        """Start serving an accepted connection and return its descriptor"""
        newfd = clntsock.fileno()
        log.info("Start serving fd %d from %s", newfd, clntaddr)
        self.by_fd[newfd] = Client(clntsock, clntaddr)
        return newfd

    def stop(self, fd: int) -> None:
        """Close the client served on `fd` and forget about it"""
        served = self.by_fd.get(fd)
        if served is None:
            log.debug("Fd %d is not served, ingore stop", fd)
            return
        log.info(
            "Stop serving fd %d (IMEI %s)", served.sock.fileno(), served.imei
        )
        served.close()
        known_imei = served.imei
        # The IMEI slot may already belong to a newer connection
        if known_imei and self.by_imei[known_imei] == served:
            del self.by_imei[known_imei]
        del self.by_fd[fd]

    def _login(self, fd: int, served: Client, packet: bytes) -> None:
        """Bind `served` to the IMEI found in `packet`, if there is one"""
        assert served.pmod is not None
        found = served.pmod.imei_from_packet(packet)
        if found is None:
            return
        log.info("LOGIN from fd %d (IMEI %s)", fd, found)
        served.imei = found
        stale = self.by_imei.get(served.imei)
        if stale is not None:
            stalefd = stale.sock.fileno()
            log.info("Removing stale connection on fd %d", stalefd)
            # Detach the IMEI first, so that stop() leaves by_imei alone
            stale.imei = None
            self.stop(stalefd)
        self.by_imei[served.imei] = served

    def recv(
        self, fd: int
    ) -> Optional[List[Tuple[ProtoModule, Optional[str], float, Any, bytes]]]:
        """Read packets from the client on `fd`

        Returns None if the descriptor is not served or the client could
        not be read, otherwise a list of
        `(protocol module, imei, timestamp, peer address, packet)`.
        """
        served = self.by_fd.get(fd)
        if served is None:
            log.debug("Client at fd %d gone, ingore event", fd)
            return None
        incoming = served.recv()
        if incoming is None:
            return None
        collected = []
        for stamp, origin, frame in incoming:
            assert served.pmod is not None
            if served.imei is None:
                self._login(fd, served, frame)
            collected.append(
                (served.pmod, served.imei, stamp, origin, frame)
            )
            log.debug(
                "Received from %s (IMEI %s): %s",
                origin,
                served.imei,
                frame.hex(),
            )
        return collected

    def response(self, resp: Resp) -> Optional[ProtoModule]:
        """Send `resp` to the client with matching IMEI

        Returns the protocol module of that client, or None when no
        client with such IMEI is registered.
        """
        target = self.by_imei.get(resp.imei)
        if target is None:
            log.info("Not connected (IMEI %s)", resp.imei)
            return None
        target.send(resp.packet)
        return target.pmod
176
177
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
    """Run the collector event loop until interrupted from the keyboard

    A ZeroMQ PUB socket is bound to `publishurl`, a ZeroMQ PULL socket to
    `listenurl`, and a TCP listening socket to `port`; all three values
    are read from the `collector` section of `conf`.

    Every packet received from a terminal is published as a `Bcast`.
    Every `Resp` taken from the PULL socket, and every inline response
    produced by the protocol module, is passed to the client with the
    matching IMEI; if such client is connected, the response is also
    published as an outgoing `Bcast`.

    When `handle_hibernate` is true, a client is dropped as soon as the
    protocol module reports its packet to be a goodbye packet.

    KeyboardInterrupt ends the loop and is not propagated.  The sockets
    and the zmq context are released on any exit from the loop, also
    when it is left because of an unexpected exception.
    """
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # Restrictive umask is in effect only while the zmq endpoints are bound
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    # Client descriptors that are currently registered with the poller
    pollingfds: Set[int] = set()
    try:
        while True:
            # Sends and new connections are queued here and handled only
            # after all events of this poll round have been consumed
            tosend: List[Resp] = []
            toadd: List[Tuple[socket, Any]] = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
                    toadd.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        clients.stop(sk)
                    else:
                        for pmod, imei, when, peeraddr, packet in received:
                            proto = pmod.proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if (
                                pmod.is_goodbye_packet(packet)
                                and handle_hibernate
                            ):
                                log.debug(
                                    "Goodbye from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                clients.stop(sk)
                            respmsg = pmod.inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for zmsg in tosend:
                log.debug("Sending to the client: %s", zmsg)
                rpmod = clients.response(zmsg)
                if rpmod is not None:
                    zpub.send(
                        Bcast(
                            is_incoming=False,
                            proto=rpmod.proto_of_message(zmsg.packet),
                            when=zmsg.when,
                            imei=zmsg.imei,
                            packet=zmsg.packet,
                        ).packed
                    )
            # Stop polling descriptors of the clients that were dropped
            for fd in pollingfds - clients.fds():
                poller.unregister(fd)  # type: ignore
            for clntsock, clntaddr in toadd:
                clients.add(clntsock, clntaddr)
            # Start polling descriptors of the clients that are new
            for fd in clients.fds() - pollingfds:
                poller.register(fd, flags=zmq.POLLIN)
            pollingfds = clients.fds()
    except KeyboardInterrupt:
        pass  # regular way to stop the server
    finally:
        # Release the endpoints regardless of why the loop has ended
        zpub.close()
        zpull.close()
        zctx.destroy()  # type: ignore
        tcpl.close()
275
276
# Start the server when this file is run as a program.  `endswith` also
# accepts module names that merely end in "__main__".
# NOTE(review): common.init() presumably returns the parsed configuration
# that runserver() expects - confirm against loctrkd/common.py.
if __name__.endswith("__main__"):
    runserver(common.init(log))