1 """ TCP server that communicates with terminals """
3 from configparser import ConfigParser
4 from importlib import import_module
5 from logging import getLogger
15 from struct import pack
17 from typing import Any, cast, Dict, List, Optional, Set, Tuple, Union
21 from .protomodule import ProtoModule
22 from .zmsg import Bcast, Resp
# Module-level logger used by the log calls throughout this file.
24 log = getLogger("loctrkd/collector")
# Protocol modules known to the collector; starts out empty.
# NOTE(review): presumably populated by runserver() from the "protocols"
# config option - the assignment is not visible in this excerpt; confirm.
29 pmods: List[ProtoModule] = []
33 """Connected socket to the terminal plus buffer and metadata"""
# Set up the per-connection state for one terminal connection.
# NOTE(review): source lines 36-37 are missing from this excerpt; presumably
# they store `sock` and `addr` on self (other methods read self.sock and
# self.addr) - confirm.
35 def __init__(self, sock: socket, addr: Any) -> None:
# Protocol module for this connection; None at creation. send() and
# Clients.recv() assert that it has been set before they use it.
38 self.pmod: Optional[ProtoModule] = None
# Stream parser instance; None until Client.recv() creates one from a module
# whose probe_buffer() accepts the incoming data.
39 self.stream: Optional[ProtoModule.Stream] = None
# Terminal identity; None at creation, Clients.recv() works with it when
# handling the LOGIN of a terminal.
40 self.imei: Optional[str] = None
# Close-time handling for this client: logs the fd and IMEI at debug level
# and reports any data the stream parser still held.
# NOTE(review): several lines of this method are missing from this excerpt
# (whatever precedes the stream close, and the log call that wraps the
# format string below).
42 def close(self) -> None:
43 log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
# Closing the stream returns its leftover content as `rest`.
46 rest = self.stream.close()
# Report the leftover size and at most its first 64 bytes, hex-encoded.
51 "%d bytes in buffer on close: %s", len(rest), rest[:64].hex()
# Read one chunk from the terminal's socket and pass it through the protocol
# stream parser. Per the annotation the result is either None or a list of
# (timestamp, peer address, packet bytes) tuples.
# NOTE(review): many lines of this method are missing from this excerpt
# (log calls, return statements, the loop over candidate protocol modules,
# the assignments of `when` and `msgs`); the notes below cover only what is
# visible.
54 def recv(self) -> Optional[List[Tuple[float, Any, bytes]]]:
55 """Read from the socket and parse complete messages"""
# A single read of at most MAXBUFFER bytes.
57 segment = self.sock.recv(MAXBUFFER)
60 "Reading from fd %d (IMEI %s): %s",
66 if not segment: # Terminal has closed connection
68 "EOF reading from fd %d (IMEI %s)",
# No stream yet means the protocol of this connection has not been
# identified; a module whose probe_buffer() accepts the data supplies the
# Stream object.
73 if self.stream is None:
75 if pmod.probe_buffer(segment):
77 self.stream = pmod.Stream()
# Still no stream after probing: the data is reported as unrecognizable.
79 if self.stream is None:
81 "unrecognizable %d bytes of data %s from fd %d",
# Feed the new segment to the stream. Elements of type bytes are queued
# together with the timestamp and this client's peer address.
# NOTE(review): non-bytes elements appear to be logged using the format
# string below - confirm against the full source.
89 for elem in self.stream.recv(segment):
90 if isinstance(elem, bytes):
91 msgs.append((when, self.addr, elem))
94 "%s from fd %d (IMEI %s)",
# Frame `buffer` with the connection's protocol module and write it to the
# terminal's socket.
# NOTE(review): some lines of this method are missing from this excerpt.
101 def send(self, buffer: bytes) -> None:
# Sending is only valid once the protocol has been identified.
102 assert self.stream is not None and self.pmod is not None
# enframe() receives the payload plus this client's IMEI and produces the
# bytes that go on the wire.
# NOTE(review): socket.send() may transmit only part of the data and its
# return value is not used on this line; sendall() may be what is wanted -
# confirm.
104 self.sock.send(self.pmod.enframe(buffer, imei=self.imei))
107 "Sending to fd %d (IMEI %s): %s",
def __init__(self) -> None:
    """Create the registry with no clients in either index."""
    # The same Client objects are reachable two ways: by the file
    # descriptor of their socket and by the IMEI of the terminal.
    self.by_fd: Dict[int, Client] = dict()
    self.by_imei: Dict[str, Client] = dict()
def fds(self) -> Set[int]:
    """Return the file descriptors of all registered clients.

    The result is a newly built set, so callers can keep it or use it in
    set arithmetic without it changing when the registry changes.
    """
    # Iterating a dict yields its keys, so no explicit .keys() is needed.
    return set(self.by_fd)
# Start serving a newly accepted connection: wrap the socket in a Client and
# index it by the socket's file descriptor.
# Per the annotation an int is returned; the caller in runserver() assigns
# it to `fd`.
# NOTE(review): the return statement is not visible in this excerpt.
122 def add(self, clntsock: socket, clntaddr: Any) -> int:
123 fd = clntsock.fileno()
124 log.info("Start serving fd %d from %s", fd, clntaddr)
125 self.by_fd[fd] = Client(clntsock, clntaddr)
# Stop serving the client registered under `fd`.
# NOTE(review): several lines are missing from this excerpt (the early
# return, closing the client, removal from by_fd); the notes below cover
# only what is visible.
128 def stop(self, fd: int) -> None:
# Unknown fd: only a debug message is visible for this case.
# NOTE(review): "ingore" in the message below is a typo for "ignore"; it is
# a runtime string and is left untouched here.
129 if fd not in self.by_fd:
130 log.debug("Fd %d is not served, ingore stop", fd)
132 clnt = self.by_fd[fd]
133 log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
# Remove the IMEI entry only if it still refers to this very client; a newer
# connection with the same IMEI may have taken over the entry.
# NOTE(review): by_imei is indexed directly, so an IMEI that is set on the
# client but absent from the index would raise KeyError - presumably that
# cannot happen; verify.
135 if clnt.imei and self.by_imei[clnt.imei] == clnt: # could be replaced
136 del self.by_imei[clnt.imei]
# Handle a readable event for the client on `fd`. Per the annotation the
# result is None or a list of
# (protocol module, IMEI or None, timestamp, peer address, packet) tuples.
# NOTE(review): the `def` line and several body lines are missing from this
# excerpt; the method name is taken from the call clients.recv(sk) in
# runserver(). `msgs` and `result` are assigned on lines not shown.
141 ) -> Optional[List[Tuple[ProtoModule, Optional[str], float, Any, bytes]]]:
# The client may already be gone by the time its event is processed.
# NOTE(review): "ingore" in the message below is a typo for "ignore"; it is
# a runtime string and is left untouched here.
142 if fd not in self.by_fd:
143 log.debug("Client at fd %d gone, ingore event", fd)
145 clnt = self.by_fd[fd]
150 for when, peeraddr, packet in msgs:
# The protocol module must be known before packets can be interpreted.
151 assert clnt.pmod is not None
# No IMEI recorded yet: ask the protocol module to extract one from this
# packet.
152 if clnt.imei is None:
153 imei = clnt.pmod.imei_from_packet(packet)
155 log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
# Another client already indexed under this IMEI is treated as a stale
# connection; the current client then takes over the IMEI entry.
157 oldclnt = self.by_imei.get(clnt.imei)
158 if oldclnt is not None:
159 oldfd = oldclnt.sock.fileno()
160 log.info("Removing stale connection on fd %d", oldfd)
163 self.by_imei[clnt.imei] = clnt
# Each packet is passed on together with the module and IMEI of its sender.
164 result.append((clnt.pmod, clnt.imei, when, peeraddr, packet))
166 "Received from %s (IMEI %s): %s",
# Deliver a response to the terminal it is addressed to, if that terminal is
# currently connected. Per the annotation a ProtoModule or None is returned.
# NOTE(review): the return statements are not visible in this excerpt.
173 def response(self, resp: Resp) -> Optional[ProtoModule]:
# The IMEI index tells whether the addressed terminal has a live connection.
174 if resp.imei in self.by_imei:
175 clnt = self.by_imei[resp.imei]
176 clnt.send(resp.packet)
# Addressee not connected: only an info message is visible for this case.
179 log.info("Not connected (IMEI %s)", resp.imei)
# Main loop of the collector: accepts terminal connections over TCP, reads
# their packets, and exchanges messages over ZeroMQ PUB and PULL sockets.
# `conf` supplies the [collector] options "protocols", "publishurl",
# "listenurl" and "port".
# NOTE(review): a large part of this function is missing from this excerpt
# (loop headers, try blocks, publishing calls); the use of
# `handle_hibernate` is not visible at all. The notes below cover only what
# is visible.
183 def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
# One protocol module is imported, relative to this package, for each name
# in the comma-separated "protocols" option.
186 cast(ProtoModule, import_module("." + modnm, __package__))
187 for modnm in conf.get("collector", "protocols").split(",")
189 # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
190 zctx = zmq.Context() # type: ignore
191 zpub = zctx.socket(zmq.PUB) # type: ignore
192 zpull = zctx.socket(zmq.PULL) # type: ignore
# With this umask, files created from here on get at most mode 0o660.
# NOTE(review): presumably this targets socket files created by the binds
# below, and `oldmask` is restored on a line not shown - confirm.
193 oldmask = umask(0o117)
194 zpub.bind(conf.get("collector", "publishurl"))
195 zpull.bind(conf.get("collector", "listenurl"))
# TCP socket for terminals: AF_INET6, address reuse enabled, bound to the
# wildcard address on the configured port.
197 tcpl = socket(AF_INET6, SOCK_STREAM)
198 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
199 tcpl.bind(("", conf.getint("collector", "port")))
# One poller watches the PULL socket and the TCP socket's fd; client fds
# are registered with it further down.
201 tcpfd = tcpl.fileno()
202 poller = zmq.Poller() # type: ignore
203 poller.register(zpull, flags=zmq.POLLIN)
204 poller.register(tcpfd, flags=zmq.POLLIN)
# Client fds currently registered with the poller.
206 pollingfds: Set[int] = set()
# Work collected while going through the poll results and applied only
# afterwards: responses to deliver and connections to start serving.
209 tosend: List[Resp] = []
210 toadd: List[Tuple[socket, Any]] = []
# The poll timeout is given in milliseconds.
211 events = poller.poll(1000)
212 for sk, fl in events:
# Non-blocking receive from the PULL socket.
216 msg = zpull.recv(zmq.NOBLOCK)
# New terminal connection: enable TCP keepalive and queue it for adding.
222 clntsock, clntaddr = tcpl.accept()
223 clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
224 toadd.append((clntsock, clntaddr))
# Readable event on some other fd: let the client registry read from it.
225 elif fl & zmq.POLLIN:
226 received = clients.recv(sk)
228 log.debug("Terminal gone from fd %d", sk)
231 for pmod, imei, when, peeraddr, packet in received:
232 proto = pmod.proto_of_message(packet)
243 pmod.is_goodbye_packet(packet)
247 "Goodbye from fd %d (IMEI %s)",
# If the protocol module produces an immediate reply for this packet, it is
# wrapped in a Resp addressed to the same IMEI.
252 respmsg = pmod.inline_response(packet)
253 if respmsg is not None:
255 Resp(imei=imei, when=when, packet=respmsg)
258 log.debug("Stray event: %s on socket %s", fl, sk)
259 # poll queue consumed, make changes now
261 log.debug("Sending to the client: %s", zmsg)
# clients.response() returns a protocol module or None; the module is used
# below to determine the protocol of the packet.
262 rpmod = clients.response(zmsg)
263 if rpmod is not None:
267 proto=rpmod.proto_of_message(zmsg.packet),
# Bring the poller in line with the client registry: unregister fds that
# are no longer served, start serving the queued connections, register fds
# that are new, then remember the current set.
273 for fd in pollingfds - clients.fds():
274 poller.unregister(fd) # type: ignore
275 for clntsock, clntaddr in toadd:
276 fd = clients.add(clntsock, clntaddr)
277 for fd in clients.fds() - pollingfds:
278 poller.register(fd, flags=zmq.POLLIN)
279 pollingfds = clients.fds()
280 except KeyboardInterrupt:
283 zctx.destroy() # type: ignore
# Script entry point. The suffix test (rather than equality) also matches
# module names that merely end in "__main__".
if __name__.endswith("__main__"):
    settings = common.init(log)
    runserver(settings)