]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
do not respond to hibernation; minor cleanup
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from os import umask
5 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
6 from time import time
7 from struct import pack
8 import zmq
9
10 from . import common
11 from .gps303proto import (
12     HIBERNATION,
13     LOGIN,
14     inline_response,
15     parse_message,
16     proto_of_message,
17 )
18 from .zmsg import Bcast, Resp
19
20 log = getLogger("gps303/collector")
21
22
class Client:
    """Connected socket to the terminal plus buffer and metadata

    Attributes:
        sock: connected stream socket of the terminal.
        addr: peer address that came with the socket.
        buffer: bytes received so far that do not yet form a whole frame.
        imei: IMEI taken from the last parsed LOGIN message, None until
            such a message is seen.
    """

    def __init__(self, sock, addr):
        self.sock = sock
        self.addr = addr
        self.buffer = b""
        self.imei = None

    def close(self):
        """Close the socket and discard unparsed input"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""

    def recv(self):
        """Read from the socket and parse complete messages

        Returns None if reading failed or the peer closed the connection,
        otherwise a (possibly empty) list of (when, addr, packet) tuples,
        one per complete frame found in the buffer. Frames start with
        b"xx" and end with b"\\r\\n"; both markers are stripped from the
        returned packet. Incomplete trailing data stays in the buffer.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:  # must be bound, it is logged below
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        when = time()
        self.buffer += segment
        msgs = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            # The search for the terminator starts at offset 4, i.e. it
            # skips the two marker bytes and the two bytes after them.
            frameend = self.buffer.find(b"\r\n", 4)
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append((when, self.addr, packet))
        return msgs

    def send(self, buffer):
        """Frame the payload and send it to the terminal

        Socket errors are logged and swallowed; nothing is returned.
        """
        try:
            # sendall() keeps writing until the whole frame is out,
            # plain send() may stop after a part of it.
            self.sock.sendall(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
95
96
class Clients:
    """Registry of connected terminals, indexed by fd and by IMEI

    Attributes:
        by_fd: maps socket file descriptor to its Client.
        by_imei: maps IMEI to the Client that most recently sent a LOGIN
            message carrying that IMEI.
    """

    def __init__(self):
        self.by_fd = {}
        self.by_imei = {}

    def add(self, clntsock, clntaddr):
        """Start tracking a newly accepted connection, return its fd"""
        fd = clntsock.fileno()
        log.info("Start serving fd %d from %s", fd, clntaddr)
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the client on this fd and forget about it

        Raises KeyError if the fd is not being served.
        """
        clnt = self.by_fd[fd]
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
        clnt.close()
        # The IMEI entry may already belong to a newer connection of the
        # same terminal, or may never have been created. Only remove it
        # when it still refers to the client that is being stopped.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(self, fd):
        """Receive from the client on this fd

        Returns None when the client reports that the connection is
        gone, otherwise a list of (imei, when, peeraddr, packet) tuples.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for when, peeraddr, packet in msgs:
            if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
                self.by_imei[clnt.imei] = clnt
            result.append((clnt.imei, when, peeraddr, packet))
            log.debug(
                "Received from %s (IMEI %s): %s",
                peeraddr,
                clnt.imei,
                packet.hex(),
            )
        return result

    def response(self, resp):
        """Send resp.packet to the terminal identified by resp.imei

        If no client is registered under that IMEI, this is only logged.
        """
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.packet)
        else:
            log.info("Not connected (IMEI %s)", resp.imei)
139
140
def runserver(conf):
    """Serve terminals over TCP and exchange their traffic over zeromq

    conf must offer get(section, option) and getint(section, option);
    the options "publishurl", "listenurl" and "port" are read from the
    "collector" section.

    Every packet received from a terminal is published on the PUB socket.
    Responses, either pulled from the PULL socket or produced by
    inline_response(), are published too (marked as not incoming) and
    sent to the terminal with the matching IMEI. The loop runs until
    KeyboardInterrupt.
    """
    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpull = zctx.socket(zmq.PULL)
    # Temporary umask applies while the zeromq endpoints are bound
    oldmask = umask(0o117)
    zpub.bind(conf.get("collector", "publishurl"))
    zpull.bind(conf.get("collector", "listenurl"))
    umask(oldmask)
    tcpl = socket(AF_INET6, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zpull, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            tosend = []
            topoll = []
            # A set, because the same fd can be flagged more than once in
            # a single pass, and must be unregistered and stopped once.
            tostop = set()
            events = poller.poll(1000)
            for sk, fl in events:
                if sk is zpull:
                    # Drain everything that is queued on the PULL socket
                    while True:
                        try:
                            msg = zpull.recv(zmq.NOBLOCK)
                            zmsg = Resp(msg)
                            tosend.append(zmsg)
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                elif fl & zmq.POLLIN:
                    received = clients.recv(sk)
                    if received is None:
                        # IMEI comes from the registry: no packet was
                        # received, so there is no per-packet value here.
                        log.debug(
                            "Terminal gone from fd %d (IMEI %s)",
                            sk,
                            clients.by_fd[sk].imei,
                        )
                        tostop.add(sk)
                    else:
                        for imei, when, peeraddr, packet in received:
                            proto = proto_of_message(packet)
                            zpub.send(
                                Bcast(
                                    proto=proto,
                                    imei=imei,
                                    when=when,
                                    peeraddr=peeraddr,
                                    packet=packet,
                                ).packed
                            )
                            if proto == HIBERNATION.PROTO:
                                log.debug(
                                    "HIBERNATION from fd %d (IMEI %s)",
                                    sk,
                                    imei,
                                )
                                tostop.add(sk)
                            respmsg = inline_response(packet)
                            if respmsg is not None:
                                tosend.append(
                                    Resp(imei=imei, when=when, packet=respmsg)
                                )
                else:
                    log.debug("Stray event: %s on socket %s", fl, sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)
                clients.stop(fd)
            for zmsg in tosend:
                zpub.send(
                    Bcast(
                        is_incoming=False,
                        proto=proto_of_message(zmsg.packet),
                        when=zmsg.when,
                        imei=zmsg.imei,
                        packet=zmsg.packet,
                    ).packed
                )
                log.debug("Sending to the client: %s", zmsg)
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
230
231
# Entry point: initialise through common.init() and run the server with
# whatever it returns.
if __name__.endswith("__main__"):
    conf = common.init(log)
    runserver(conf)