]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
921833c083be0ac8aecda838a338a4f72b5f818c
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 from struct import pack
7 import zmq
8
9 from . import common
10 from .gps303proto import (
11     HIBERNATION,
12     LOGIN,
13     inline_response,
14     parse_message,
15     proto_of_message,
16 )
17 from .zmsg import Bcast, Resp
18
19 log = getLogger("gps303/collector")
20
21
22 class Client:
23     """Connected socket to the terminal plus buffer and metadata"""
24
25     def __init__(self, sock, addr):
26         self.sock = sock
27         self.addr = addr
28         self.buffer = b""
29         self.imei = None
30
31     def close(self):
32         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
33         self.sock.close()
34         self.buffer = b""
35
36     def recv(self):
37         """Read from the socket and parse complete messages"""
38         try:
39             segment = self.sock.recv(4096)
40         except OSError:
41             log.warning(
42                 "Reading from fd %d (IMEI %s): %s",
43                 self.sock.fileno(),
44                 self.imei,
45                 e,
46             )
47             return None
48         if not segment:  # Terminal has closed connection
49             log.info(
50                 "EOF reading from fd %d (IMEI %s)",
51                 self.sock.fileno(),
52                 self.imei,
53             )
54             return None
55         when = time()
56         self.buffer += segment
57         msgs = []
58         while True:
59             framestart = self.buffer.find(b"xx")
60             if framestart == -1:  # No frames, return whatever we have
61                 break
62             if framestart > 0:  # Should not happen, report
63                 log.warning(
64                     'Undecodable data "%s" from fd %d (IMEI %s)',
65                     self.buffer[:framestart].hex(),
66                     self.sock.fileno(),
67                     self.imei,
68                 )
69                 self.buffer = self.buffer[framestart:]
70             # At this point, buffer starts with a packet
71             frameend = self.buffer.find(b"\r\n", 4)
72             if frameend == -1:  # Incomplete frame, return what we have
73                 break
74             packet = self.buffer[2:frameend]
75             self.buffer = self.buffer[frameend + 2 :]
76             if proto_of_message(packet) == LOGIN.PROTO:
77                 self.imei = parse_message(packet).imei
78                 log.info(
79                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
80                 )
81             msgs.append((when, self.addr, packet))
82         return msgs
83
84     def send(self, buffer):
85         try:
86             self.sock.send(b"xx" + buffer + b"\r\n")
87         except OSError as e:
88             log.error(
89                 "Sending to fd %d (IMEI %s): %s",
90                 self.sock.fileno,
91                 self.imei,
92                 e,
93             )
94
95
96 class Clients:
97     def __init__(self):
98         self.by_fd = {}
99         self.by_imei = {}
100
101     def add(self, clntsock, clntaddr):
102         fd = clntsock.fileno()
103         self.by_fd[fd] = Client(clntsock, clntaddr)
104         return fd
105
106     def stop(self, fd):
107         clnt = self.by_fd[fd]
108         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
109         clnt.close()
110         if clnt.imei:
111             del self.by_imei[clnt.imei]
112         del self.by_fd[fd]
113
114     def recv(self, fd):
115         clnt = self.by_fd[fd]
116         msgs = clnt.recv()
117         if msgs is None:
118             return None
119         result = []
120         for when, peeraddr, packet in msgs:
121             if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
122                 self.by_imei[clnt.imei] = clnt
123             result.append((clnt.imei, when, peeraddr, packet))
124         return result
125
126     def response(self, resp):
127         if resp.imei in self.by_imei:
128             self.by_imei[resp.imei].send(resp.packet)
129         else:
130             log.info("Not connected (IMEI %s)", resp.imei)
131
132
133 def runserver(conf):
134     zctx = zmq.Context()
135     zpub = zctx.socket(zmq.PUB)
136     zpub.bind(conf.get("collector", "publishurl"))
137     zpull = zctx.socket(zmq.PULL)
138     zpull.bind(conf.get("collector", "listenurl"))
139     tcpl = socket(AF_INET6, SOCK_STREAM)
140     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
141     tcpl.bind(("", conf.getint("collector", "port")))
142     tcpl.listen(5)
143     tcpfd = tcpl.fileno()
144     poller = zmq.Poller()
145     poller.register(zpull, flags=zmq.POLLIN)
146     poller.register(tcpfd, flags=zmq.POLLIN)
147     clients = Clients()
148     try:
149         while True:
150             tosend = []
151             topoll = []
152             tostop = []
153             events = poller.poll(1000)
154             for sk, fl in events:
155                 if sk is zpull:
156                     while True:
157                         try:
158                             msg = zpull.recv(zmq.NOBLOCK)
159                             tosend.append(Resp(msg))
160                         except zmq.Again:
161                             break
162                 elif sk == tcpfd:
163                     clntsock, clntaddr = tcpl.accept()
164                     topoll.append((clntsock, clntaddr))
165                 elif fl & zmq.POLLIN:
166                     received = clients.recv(sk)
167                     if received is None:
168                         log.debug(
169                             "Terminal gone from fd %d (IMEI %s)", sk, imei
170                         )
171                         tostop.append(sk)
172                     else:
173                         for imei, when, peeraddr, packet in received:
174                             proto = proto_of_message(packet)
175                             zpub.send(
176                                 Bcast(
177                                     proto=proto,
178                                     imei=imei,
179                                     when=when,
180                                     peeraddr=peeraddr,
181                                     packet=packet,
182                                 ).packed
183                             )
184                             if proto == HIBERNATION.PROTO:
185                                 log.debug(
186                                     "HIBERNATION from fd %d (IMEI %s)",
187                                     sk,
188                                     imei,
189                                 )
190                                 tostop.append(sk)
191                             respmsg = inline_response(packet)
192                             if respmsg is not None:
193                                 clients.response(
194                                     Resp(imei=imei, packet=respmsg)
195                                 )
196                 else:
197                     log.debug("Stray event: %s on socket %s", fl, sk)
198             # poll queue consumed, make changes now
199             for fd in tostop:
200                 poller.unregister(fd)
201                 clients.stop(fd)
202             for zmsg in tosend:
203                 log.debug("Sending to the client: %s", zmsg)
204                 clients.response(zmsg)
205             for clntsock, clntaddr in topoll:
206                 fd = clients.add(clntsock, clntaddr)
207                 poller.register(fd, flags=zmq.POLLIN)
208     except KeyboardInterrupt:
209         pass
210
211
212 if __name__.endswith("__main__"):
213     runserver(common.init(log))