]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
0efc669fd908e83a480c3e86839efda8b2254d45
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from os import umask
5 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from time import time
7 from struct import pack
8 import zmq
9
10 from . import common
11 from .gps303proto import (
12     HIBERNATION,
13     LOGIN,
14     inline_response,
15     parse_message,
16     proto_of_message,
17 )
18 from .zmsg import Bcast, Resp
19
# Module-level logger used by every class and function in this file.
log = getLogger("gps303/collector")
21
22
class Client:
    """Connected socket to the terminal plus buffer and metadata

    Instance fields:
      sock   - the connected socket object handed to the constructor
      addr   - peer address handed to the constructor
      buffer - bytes received so far that are not yet split into packets
      imei   - taken from the LOGIN packet; None until one is seen
    """

    def __init__(self, sock, addr):
        self.sock = sock
        self.addr = addr
        self.buffer = b""
        self.imei = None

    def close(self):
        """Close the socket and drop any unparsed data"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""

    def recv(self):
        """Read from the socket and parse complete messages

        Returns None on read error or when the peer has closed the
        connection. Otherwise returns a (possibly empty) list of
        (when, addr, packet) tuples, where `packet` is the frame
        content without the leading b"xx" and the trailing b"\\r\\n".
        Incomplete trailing data stays in `self.buffer` for the next call.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        when = time()
        self.buffer += segment
        msgs = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            if len(self.buffer) < 6:  # no len and proto - cannot proceed
                break
            exp_end = self.buffer[2] + 3  # Expect '\r\n' here
            frameend = 0
            # Length field can legitimately be much less than the
            # length of the packet (e.g. WiFi positioning), but
            # it _should not_ be greater. Still sometimes it is.
            # Luckily, not by too much: by maybe two or three bytes?
            # Do this embarrassing hack to avoid accidental match
            # of some binary data in the packet against '\r\n'.
            while True:
                # Always advance past the previous candidate, otherwise
                # a premature match would be found again forever.
                frameend = self.buffer.find(b"\r\n", frameend + 1)
                if frameend == -1 or frameend >= (exp_end - 3):
                    # No terminator yet, or found a realistic match
                    break
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append((when, self.addr, packet))
        return msgs

    def send(self, buffer):
        """Frame `buffer` with b"xx" ... b"\\r\\n" and write it to the socket

        Send errors are logged and otherwise ignored.
        """
        try:
            self.sock.send(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
108
109
class Clients:
    """Registry of connected terminals

    `by_fd` maps a socket file descriptor to its Client; `by_imei` maps
    an IMEI to the Client that most recently sent a LOGIN with it.
    """

    def __init__(self):
        self.by_fd = {}
        self.by_imei = {}

    def add(self, clntsock, clntaddr):
        """Start tracking a newly accepted connection, return its fd"""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the connection on `fd` and forget about it

        Raises KeyError if `fd` is not being served.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # The same IMEI may have logged in again over a newer connection;
        # only drop the mapping if it still points at this client.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(self, fd):
        """Read and parse whatever is available on `fd`

        Returns None when the underlying Client reports a read error or
        EOF, otherwise a list of (imei, when, peeraddr, packet) tuples.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                self.by_imei[clnt.imei] = clnt
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp):
        """Send `resp.packet` to the terminal identified by `resp.imei`

        If no connection is registered for that IMEI, this is only logged.
        """
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.packet)
        else:
            log.info("Not connected (IMEI %s)", resp.imei)
152
153
def runserver(conf):
    """Run the collector event loop until interrupted from the keyboard

    `conf` must provide `get(section, option)` and
    `getint(section, option)`; the options "publishurl", "listenurl"
    and "port" of the "collector" section are read.

    The loop polls the ZeroMQ PULL socket (responses to deliver to
    terminals), the listening TCP socket (new terminals) and all
    accepted terminal sockets. Every packet received from or sent to a
    terminal is published on the ZeroMQ PUB socket.
    """
    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpull = zctx.socket(zmq.PULL)
    # Bind with a temporarily changed umask, then restore it
    # (presumably for the permissions of ipc:// socket files - confirm).
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            tosend = []
            topoll = []
            # A set: the same fd may qualify for stopping more than once
            # in a single pass (e.g. several HIBERNATION packets at once).
            tostop = set()
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug("Terminal gone from fd %d", sk)
                        tostop.add(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                tostop.add(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)
                clients.stop(fd)
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket and the ZeroMQ resources on any exit
        tcpl.close()
        zpub.close()
        zpull.close()
        zctx.destroy()
241
242
if __name__.endswith("__main__"):
    # Entry point: common.init() is given the module logger and its
    # result is handed to the server loop as the configuration object.
    conf = common.init(log)
    runserver(conf)