]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
move stream parser/deframer to the protocol module
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from configparser import ConfigParser
4 from logging import getLogger
5 from os import umask
6 from socket import (
7     socket,
8     AF_INET6,
9     SOCK_STREAM,
10     SOL_SOCKET,
11     SO_KEEPALIVE,
12     SO_REUSEADDR,
13 )
14 from struct import pack
15 from time import time
16 from typing import Dict, List, Optional, Tuple
17 import zmq
18
19 from . import common
20 from .gps303proto import (
21     GPS303Conn,
22     StreamError,
23     HIBERNATION,
24     LOGIN,
25     inline_response,
26     parse_message,
27     proto_of_message,
28 )
29 from .zmsg import Bcast, Resp
30
# Module-wide logger; the name doubles as the logging channel identifier.
log = getLogger("gps303/collector")

# Upper bound, in bytes, on a single read from a terminal socket (4 KiB).
MAXBUFFER: int = 4 * 1024
34
35
class Client:
    """Connected socket to the terminal plus buffer and metadata.

    Instance attributes (all assigned in ``__init__``):
      sock   -- the TCP socket connected to the terminal
      addr   -- the peer address the socket was accepted from
      stream -- a per-connection ``GPS303Conn`` used to deframe incoming
                bytes and to enframe outgoing packets
      imei   -- IMEI of the terminal, ``None`` until one is assigned
    """

    def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
        self.sock = sock
        self.addr = addr
        self.stream = GPS303Conn()
        self.imei: Optional[str] = None

    def close(self) -> None:
        """Close the socket and report bytes left undelivered in the stream."""
        # Log first: a closed socket reports fileno() as -1.
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        rest = self.stream.close()
        if rest:
            log.warning("%d bytes in buffer on close: %s", len(rest), rest)

    def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
        """Read from the socket and parse complete messages.

        Returns a list of ``(timestamp, peer address, packet)`` tuples, one
        per packet produced by the stream parser (possibly empty), or
        ``None`` when the read failed or the terminal closed the connection.
        """
        try:
            segment = self.sock.recv(MAXBUFFER)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        # All packets extracted from one read share the same timestamp.
        when = time()
        # NOTE(review): after a StreamError the same segment is fed to the
        # parser again. This terminates only if the parser's internal state
        # advances when it raises -- confirm against GPS303Conn.recv().
        while True:
            try:
                return [
                    (when, self.addr, packet)
                    for packet in self.stream.recv(segment)
                ]
            except StreamError as e:
                log.warning(
                    "%s from fd %d (IMEI %s)", e, self.sock.fileno(), self.imei
                )

    def send(self, buffer: bytes) -> None:
        """Frame ``buffer`` with the stream's enframer and transmit it.

        Socket errors are logged and swallowed; nothing is raised to the
        caller.
        """
        try:
            # sendall() keeps writing until the whole frame is out;
            # plain send() may transmit only a prefix and return the count,
            # which would silently truncate the frame.
            self.sock.sendall(self.stream.enframe(buffer))
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )

    def set_imei(self, imei: str) -> None:
        """Record the IMEI of the terminal behind this connection."""
        self.imei = imei
96
97
class Clients:
    """Registry of connected terminals, indexed by socket fd and by IMEI.

    ``by_fd`` holds every connection being served; ``by_imei`` holds only
    those connections for which a LOGIN message has been parsed.
    """

    def __init__(self) -> None:
        self.by_fd: Dict[int, Client] = {}
        self.by_imei: Dict[str, Client] = {}

    def add(self, clntsock: socket, clntaddr: Tuple[str, int]) -> int:
        """Start tracking a freshly accepted connection; return its fd."""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd: int) -> None:
        """Close the connection on ``fd`` and drop it from both indexes.

        Raises ``KeyError`` if ``fd`` is not being served.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # Remove the IMEI entry only when it really points at this client.
        # An identity check (rather than truthiness of the IMEI) also cleans
        # up an empty-string IMEI and cannot raise KeyError or evict another
        # connection that has since claimed the same IMEI.
        if clnt.imei is not None and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(
        self, fd: int
    ) -> Optional[List[Tuple[Optional[str], float, Tuple[str, int], bytes]]]:
        """Read and deframe pending data from the connection on ``fd``.

        Returns a list of ``(imei, timestamp, peer address, packet)`` tuples,
        or ``None`` when the underlying client reported a failed read or a
        closed connection. ``imei`` is ``None`` for packets seen before a
        LOGIN message has been parsed on this connection.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:
                msg = parse_message(packet)
                if isinstance(msg, LOGIN):  # Can be unparseable
                    # Only the first LOGIN on a connection assigns the IMEI.
                    if clnt.imei is None:
                        clnt.imei = msg.imei
                        log.info(
                            "LOGIN from fd %d (IMEI %s)",
                            clnt.sock.fileno(),
                            clnt.imei,
                        )
                        # An earlier connection with the same IMEI stays
                        # open but loses its IMEI, so responses are routed
                        # to the newest connection only.
                        oldclnt = self.by_imei.get(clnt.imei)
                        if oldclnt is not None:
                            log.info(
                                "Orphaning fd %d with the same IMEI",
                                oldclnt.sock.fileno(),
                            )
                            oldclnt.imei = None
                    self.by_imei[clnt.imei] = clnt
                else:
                    log.warning(
                        "Login message from %s: %s, but client imei unfilled",
                        peeraddr,
                        packet,
                    )
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp: Resp) -> None:
        """Send ``resp.packet`` to the terminal registered under ``resp.imei``.

        If no connection is registered for that IMEI the response is dropped
        and the fact is logged.
        """
        clnt = self.by_imei.get(resp.imei)
        if clnt is None:
            log.info("Not connected (IMEI %s)", resp.imei)
        else:
            clnt.send(resp.packet)
164
165
def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
    """Run the collector event loop until interrupted.

    Reads ``publishurl``, ``listenurl`` and ``port`` from the ``[collector]``
    section of ``conf``. Binds a ZeroMQ PUB socket, a ZeroMQ PULL socket and
    a TCP listener, then polls them together with all terminal connections:

    * every packet received from a terminal is published on the PUB socket;
    * responses arriving on the PULL socket, and inline responses generated
      by ``inline_response()``, are published and sent to the terminal;
    * connections that hit EOF or a read error are closed, as are those
      that sent a HIBERNATION packet when ``handle_hibernate`` is true.

    Returns normally on ``KeyboardInterrupt``; any other exception
    propagates. In both cases the ZeroMQ sockets, the ZeroMQ context and the
    TCP listener are released first.
    """
    # Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
    zctx = zmq.Context()  # type: ignore
    zpub = zctx.socket(zmq.PUB)  # type: ignore
    zpull = zctx.socket(zmq.PULL)  # type: ignore
    # Tighten the umask only for the duration of the binds (presumably so
    # that ipc:// endpoint files are not accessible to "other" -- confirm).
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()  # type: ignore
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of polled sockets are collected here and
            # applied only after the whole event list has been consumed.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything queued on the PULL socket.
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        tostop.append(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO and handle_hibernate:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                tostop.append(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            # One read can yield several HIBERNATION packets, which puts the
            # same fd into tostop more than once. De-duplicate (keeping
            # order) so the fd is unregistered and stopped exactly once;
            # a second attempt would raise KeyError.
            for fd in dict.fromkeys(tostop):
                poller.unregister(fd)  # type: ignore
                clients.stop(fd)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        # Normal way to stop the server; fall through to the cleanup.
        pass
    finally:
        # Release the endpoints on every exit path, not only on Ctrl-C.
        zpub.close()
        zpull.close()
        zctx.destroy()  # type: ignore
        tcpl.close()
258
259
# Entry point: matches any module name that ends in "__main__".
if __name__.endswith("__main__"):
    config = common.init(log)
    runserver(config)