]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
full encoder/decoder for zmq messages
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 from struct import pack
7 import zmq
8
9 from . import common
10 from .gps303proto import (
11     HIBERNATION,
12     LOGIN,
13     inline_response,
14     parse_message,
15     proto_of_message,
16 )
17 from .zmsg import Bcast, Resp
18
19 log = getLogger("gps303/collector")
20
21
22 class Client:
23     """Connected socket to the terminal plus buffer and metadata"""
24
25     def __init__(self, sock, addr):
26         self.sock = sock
27         self.addr = addr
28         self.buffer = b""
29         self.imei = None
30
31     def close(self):
32         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
33         self.sock.close()
34         self.buffer = b""
35         self.imei = None
36
37     def recv(self):
38         """Read from the socket and parse complete messages"""
39         try:
40             segment = self.sock.recv(4096)
41         except OSError:
42             log.warning(
43                 "Reading from fd %d (IMEI %s): %s",
44                 self.sock.fileno(),
45                 self.imei,
46                 e,
47             )
48             return None
49         if not segment:  # Terminal has closed connection
50             log.info(
51                 "EOF reading from fd %d (IMEI %s)",
52                 self.sock.fileno(),
53                 self.imei,
54             )
55             return None
56         when = time()
57         self.buffer += segment
58         msgs = []
59         while True:
60             framestart = self.buffer.find(b"xx")
61             if framestart == -1:  # No frames, return whatever we have
62                 break
63             if framestart > 0:  # Should not happen, report
64                 log.warning(
65                     'Undecodable data "%s" from fd %d (IMEI %s)',
66                     self.buffer[:framestart].hex(),
67                     self.sock.fileno(),
68                     self.imei,
69                 )
70                 self.buffer = self.buffer[framestart:]
71             # At this point, buffer starts with a packet
72             frameend = self.buffer.find(b"\r\n", 4)
73             if frameend == -1:  # Incomplete frame, return what we have
74                 break
75             packet = self.buffer[2:frameend]
76             self.buffer = self.buffer[frameend + 2 :]
77             if proto_of_message(packet) == LOGIN.PROTO:
78                 self.imei = parse_message(packet).imei
79                 log.info(
80                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
81                 )
82             msgs.append((when, self.addr, packet))
83         return msgs
84
85     def send(self, buffer):
86         try:
87             self.sock.send(b"xx" + buffer + b"\r\n")
88         except OSError as e:
89             log.error(
90                 "Sending to fd %d (IMEI %s): %s",
91                 self.sock.fileno,
92                 self.imei,
93                 e,
94             )
95
96
97 class Clients:
98     def __init__(self):
99         self.by_fd = {}
100         self.by_imei = {}
101
102     def add(self, clntsock, clntaddr):
103         fd = clntsock.fileno()
104         self.by_fd[fd] = Client(clntsock, clntaddr)
105         return fd
106
107     def stop(self, fd):
108         clnt = self.by_fd[fd]
109         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
110         clnt.close()
111         if clnt.imei:
112             del self.by_imei[clnt.imei]
113         del self.by_fd[fd]
114
115     def recv(self, fd):
116         clnt = self.by_fd[fd]
117         msgs = clnt.recv()
118         if msgs is None:
119             return None
120         result = []
121         for when, peeraddr, packet in msgs:
122             if proto_of_message(packet) == LOGIN.PROTO:  # Could do blindly...
123                 self.by_imei[clnt.imei] = clnt
124             result.append((clnt.imei, when, peeraddr, packet))
125         return result
126
127     def response(self, resp):
128         if resp.imei in self.by_imei:
129             self.by_imei[resp.imei].send(resp.packet)
130
131
132 def runserver(conf):
133     zctx = zmq.Context()
134     zpub = zctx.socket(zmq.PUB)
135     zpub.bind(conf.get("collector", "publishurl"))
136     zsub = zctx.socket(zmq.SUB)
137     zsub.connect(conf.get("collector", "listenurl"))
138     tcpl = socket(AF_INET, SOCK_STREAM)
139     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
140     tcpl.bind(("", conf.getint("collector", "port")))
141     tcpl.listen(5)
142     tcpfd = tcpl.fileno()
143     poller = zmq.Poller()
144     poller.register(zsub, flags=zmq.POLLIN)
145     poller.register(tcpfd, flags=zmq.POLLIN)
146     clients = Clients()
147     try:
148         while True:
149             tosend = []
150             topoll = []
151             tostop = []
152             events = poller.poll(10)
153             for sk, fl in events:
154                 if sk is zsub:
155                     while True:
156                         try:
157                             msg = zsub.recv(zmq.NOBLOCK)
158                             tosend.append(Resp(msg))
159                         except zmq.Again:
160                             break
161                 elif sk == tcpfd:
162                     clntsock, clntaddr = tcpl.accept()
163                     topoll.append((clntsock, clntaddr))
164                 else:
165                     received = clients.recv(sk)
166                     if received is None:
167                         log.debug(
168                             "Terminal gone from fd %d (IMEI %s)", sk, imei
169                         )
170                         tostop.append(sk)
171                     else:
172                         for imei, when, peeraddr, packet in received:
173                             proto = proto_of_message(packet)
174                             zpub.send(
175                                 Bcast(
176                                     proto=proto,
177                                     imei=imei,
178                                     when=when,
179                                     peeraddr=peeraddr,
180                                     packet=packet,
181                                 ).packed
182                             )
183                             if proto == HIBERNATION.PROTO:
184                                 log.debug(
185                                     "HIBERNATION from fd %d (IMEI %s)",
186                                     sk,
187                                     imei,
188                                 )
189                                 tostop.append(sk)
190                             respmsg = inline_response(packet)
191                             if respmsg is not None:
192                                 clients.response(
193                                     Resp(imei=imei, packet=respmsg)
194                                 )
195             # poll queue consumed, make changes now
196             for fd in tostop:
197                 poller.unregister(fd)
198                 clients.stop(fd)
199             for zmsg in tosend:
200                 clients.response(zmsg)
201             for clntsock, clntaddr in topoll:
202                 fd = clients.add(clntsock, clntaddr)
203                 poller.register(fd)
204     except KeyboardInterrupt:
205         pass
206
207
208 if __name__.endswith("__main__"):
209     runserver(common.init(log))