]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
8bcca303a627e75b9220fa89d0cdf5f81f761357
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 from struct import pack
7 import zmq
8
9 from . import common
10 from .gps303proto import (
11     HIBERNATION,
12     LOGIN,
13     inline_response,
14     parse_message,
15     proto_of_message,
16 )
17 from .zmsg import Bcast, Resp
18
19 log = getLogger("gps303/collector")
20
21
22 class Client:
23     """Connected socket to the terminal plus buffer and metadata"""
24
25     def __init__(self, sock, addr):
26         self.sock = sock
27         self.addr = addr
28         self.buffer = b""
29         self.imei = None
30
31     def close(self):
32         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
33         self.sock.close()
34         self.buffer = b""
35
36     def recv(self):
37         """Read from the socket and parse complete messages"""
38         try:
39             segment = self.sock.recv(4096)
40         except OSError:
41             log.warning(
42                 "Reading from fd %d (IMEI %s): %s",
43                 self.sock.fileno(),
44                 self.imei,
45                 e,
46             )
47             return None
48         if not segment:  # Terminal has closed connection
49             log.info(
50                 "EOF reading from fd %d (IMEI %s)",
51                 self.sock.fileno(),
52                 self.imei,
53             )
54             return None
55         when = time()
56         self.buffer += segment
57         msgs = []
58         while True:
59             framestart = self.buffer.find(b"xx")
60             if framestart == -1:  # No frames, return whatever we have
61                 break
62             if framestart > 0:  # Should not happen, report
63                 log.warning(
64                     'Undecodable data "%s" from fd %d (IMEI %s)',
65                     self.buffer[:framestart].hex(),
66                     self.sock.fileno(),
67                     self.imei,
68                 )
69                 self.buffer = self.buffer[framestart:]
70             # At this point, buffer starts with a packet
71             frameend = self.buffer.find(b"\r\n", 4)
72             if frameend == -1:  # Incomplete frame, return what we have
73                 break
74             packet = self.buffer[2:frameend]
75             self.buffer = self.buffer[frameend + 2 :]
76             if proto_of_message(packet) == LOGIN.PROTO:
77                 self.imei = parse_message(packet).imei
78                 log.info(
79                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
80                 )
81             msgs.append((when, self.addr, packet))
82         return msgs
83
84     def send(self, buffer):
85         try:
86             self.sock.send(b"xx" + buffer + b"\r\n")
87         except OSError as e:
88             log.error(
89                 "Sending to fd %d (IMEI %s): %s",
90                 self.sock.fileno,
91                 self.imei,
92                 e,
93             )
94
95
96 class Clients:
97     def __init__(self):
98         self.by_fd = {}
99         self.by_imei = {}
100
101     def add(self, clntsock, clntaddr):
102         fd = clntsock.fileno()
103         log.info("Start serving fd %d from %s", fd, clntaddr)
104         self.by_fd[fd] = Client(clntsock, clntaddr)
105         return fd
106
107     def stop(self, fd):
108         clnt = self.by_fd[fd]
109         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
110         clnt.close()
111         if clnt.imei:
112             del self.by_imei[clnt.imei]
113         del self.by_fd[fd]
114
115     def recv(self, fd):
116         clnt = self.by_fd[fd]
117         msgs = clnt.recv()
118         if msgs is None:
119             return None
120         result = []
121         for when, peeraddr, packet in msgs:
122             if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
123                 self.by_imei[clnt.imei] = clnt
124             result.append((clnt.imei, when, peeraddr, packet))
125             log.debug(
126                 "Received from %s (IMEI %s): %s",
127                 peeraddr,
128                 clnt.imei,
129                 packet.hex(),
130             )
131         return result
132
133     def response(self, resp):
134         if resp.imei in self.by_imei:
135             self.by_imei[resp.imei].send(resp.packet)
136         else:
137             log.info("Not connected (IMEI %s)", resp.imei)
138
139
140 def runserver(conf):
141     zctx = zmq.Context()
142     zpub = zctx.socket(zmq.PUB)
143     zpub.bind(conf.get("collector", "publishurl"))
144     zpull = zctx.socket(zmq.PULL)
145     zpull.bind(conf.get("collector", "listenurl"))
146     tcpl = socket(AF_INET6, SOCK_STREAM)
147     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
148     tcpl.bind(("", conf.getint("collector", "port")))
149     tcpl.listen(5)
150     tcpfd = tcpl.fileno()
151     poller = zmq.Poller()
152     poller.register(zpull, flags=zmq.POLLIN)
153     poller.register(tcpfd, flags=zmq.POLLIN)
154     clients = Clients()
155     try:
156         while True:
157             tosend = []
158             topoll = []
159             tostop = []
160             events = poller.poll(1000)
161             for sk, fl in events:
162                 if sk is zpull:
163                     while True:
164                         try:
165                             msg = zpull.recv(zmq.NOBLOCK)
166                             tosend.append(Resp(msg))
167                         except zmq.Again:
168                             break
169                 elif sk == tcpfd:
170                     clntsock, clntaddr = tcpl.accept()
171                     topoll.append((clntsock, clntaddr))
172                 elif fl & zmq.POLLIN:
173                     received = clients.recv(sk)
174                     if received is None:
175                         log.debug(
176                             "Terminal gone from fd %d (IMEI %s)", sk, imei
177                         )
178                         tostop.append(sk)
179                     else:
180                         for imei, when, peeraddr, packet in received:
181                             proto = proto_of_message(packet)
182                             zpub.send(
183                                 Bcast(
184                                     proto=proto,
185                                     imei=imei,
186                                     when=when,
187                                     peeraddr=peeraddr,
188                                     packet=packet,
189                                 ).packed
190                             )
191                             if proto == HIBERNATION.PROTO:
192                                 log.debug(
193                                     "HIBERNATION from fd %d (IMEI %s)",
194                                     sk,
195                                     imei,
196                                 )
197                                 tostop.append(sk)
198                             respmsg = inline_response(packet)
199                             if respmsg is not None:
200                                 clients.response(
201                                     Resp(imei=imei, packet=respmsg)
202                                 )
203                 else:
204                     log.debug("Stray event: %s on socket %s", fl, sk)
205             # poll queue consumed, make changes now
206             for fd in tostop:
207                 poller.unregister(fd)
208                 clients.stop(fd)
209             for zmsg in tosend:
210                 log.debug("Sending to the client: %s", zmsg)
211                 clients.response(zmsg)
212             for clntsock, clntaddr in topoll:
213                 fd = clients.add(clntsock, clntaddr)
214                 poller.register(fd, flags=zmq.POLLIN)
215     except KeyboardInterrupt:
216         pass
217
218
219 if __name__.endswith("__main__"):
220     runserver(common.init(log))