]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
A hack in packet framing to avoid false stop-byte matches
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from os import umask
5 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from time import time
7 from struct import pack
8 import zmq
9
10 from . import common
11 from .gps303proto import (
12     HIBERNATION,
13     LOGIN,
14     inline_response,
15     parse_message,
16     proto_of_message,
17 )
18 from .zmsg import Bcast, Resp
19
20 log = getLogger("gps303/collector")
21
22
class Client:
    """Connected socket to the terminal plus buffer and metadata

    Instance fields:
      sock   - the connected socket object
      addr   - peer address as passed to the constructor
      buffer - bytes received but not yet split into packets
      imei   - IMEI taken from the last LOGIN packet, None before login
    """

    def __init__(self, sock, addr):
        self.sock = sock
        self.addr = addr
        self.buffer = b""
        self.imei = None

    def close(self):
        """Close the socket and drop any partially received data"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""

    @staticmethod
    def _find_frame_end(buffer):
        """Return the offset of the frame's terminating '\\r\\n', or -1

        `buffer` must start with the b"xx" frame marker and be long enough
        to contain the length byte at offset 2.

        Length field can legitimately be much less than the length of the
        packet (e.g. WiFi positioning), but it _should not_ be greater.
        Still sometimes it is. Luckily, not by too much: by maybe two or
        three bytes? So the stop bytes are expected at `length + 3`, and
        anything found up to three bytes earlier is still accepted, i.e.
        the earliest realistic offset is `length` itself. Starting the
        search there avoids an accidental match of binary data inside the
        packet against '\\r\\n', and always terminates: -1 simply means
        that the frame is not complete yet.
        """
        exp_end = buffer[2] + 3  # Expect '\r\n' here
        return buffer.find(b"\r\n", exp_end - 3)

    def recv(self):
        """Read from the socket and parse complete messages

        Returns a list of `(when, addr, packet)` tuples, one for every
        complete frame now present in the buffer (the list may be empty),
        or None when reading failed or the terminal closed the connection.
        `packet` is the frame without the b"xx" marker and the stop bytes.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        when = time()
        self.buffer += segment
        msgs = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            if len(self.buffer) < 6:  # no len and proto - cannot proceed
                break
            frameend = self._find_frame_end(self.buffer)
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append((when, self.addr, packet))
        return msgs

    def send(self, buffer):
        """Frame `buffer` with the start marker and stop bytes and send it

        Socket errors are logged and otherwise ignored.
        """
        try:
            self.sock.send(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
108
109
class Clients:
    """Registry of connected terminals, indexed by fd and by IMEI"""

    def __init__(self):
        self.by_fd = {}  # fd -> Client, for every accepted connection
        self.by_imei = {}  # IMEI -> Client, filled in when LOGIN is seen

    def add(self, clntsock, clntaddr):
        """Start tracking a newly accepted connection, return its fd"""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the connection on `fd` and forget about it

        Raises KeyError if `fd` is not a tracked connection.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # The same IMEI may have logged in again over a newer connection
        # before this one was torn down. Only drop the IMEI entry if it
        # still refers to the client that is being stopped.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(self, fd):
        """Read from the connection on `fd`

        Returns a list of `(imei, when, peeraddr, packet)` tuples, or
        None if the connection has failed or was closed by the terminal.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                self.by_imei[clnt.imei] = clnt
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp):
        """Send `resp.packet` to the terminal identified by `resp.imei`

        If no terminal with that IMEI is logged in, this is only logged.
        """
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.packet)
        else:
            log.info("Not connected (IMEI %s)", resp.imei)
152
153
def runserver(conf):
    """Run the collector main loop until interrupted from the keyboard

    `conf` must provide `get(section, option)` and `getint(section, option)`;
    the options "publishurl", "listenurl" and "port" are read from the
    "collector" section.

    The loop polls three kinds of sources:
      - the zmq PULL socket, carrying responses to be sent to terminals;
      - the listening TCP socket, for new terminal connections;
      - the sockets of connected terminals, for incoming packets.
    Every incoming and outgoing packet is published on the zmq PUB socket.
    """
    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpull = zctx.socket(zmq.PULL)
    # Restrict permissions of anything that bind() creates in the
    # filesystem to at most rw-rw---- (presumably the URLs are ipc://
    # endpoints - confirm against the configuration).
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of polled sockets are collected here
            # and applied only after all events have been processed.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        log.debug(
                            "Terminal gone from fd %d (IMEI %s)",
                            sk,
                            clients.by_fd[sk].imei,
                        )
                        if sk not in tostop:
                            tostop.append(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                # Several such packets may arrive in one
                                # read; the fd must be stopped only once.
                                if sk not in tostop:
                                    tostop.append(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)
                clients.stop(fd)
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket and the zmq resources explicitly
        # instead of leaving that to interpreter shutdown.
        tcpl.close()
        zpub.close()
        zpull.close()
        zctx.destroy()
243
244
# Start the server when this module is executed rather than imported.
# endswith() accepts the plain "__main__" as well as any dotted module
# name whose last component is "__main__".
if __name__.endswith("__main__"):
    runserver(conf=common.init(log))