]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
385285494d7f0782bfc3c8ab20eff1ece22bbf7e
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 from struct import pack
7 import zmq
8
9 from . import common
10 from .gps303proto import (
11     HIBERNATION,
12     LOGIN,
13     inline_response,
14     parse_message,
15     proto_of_message,
16 )
17
18 log = getLogger("gps303/collector")
19
20
21 class Bcast:
22     """Zmq message to broadcast what was received from the terminal"""
23
24     def __init__(self, imei, msg):
25         self.as_bytes = (
26             pack("B", proto_of_message(msg))
27             + ("0000000000000000" if imei is None else imei).encode()
28             + msg
29         )
30
31
32 class Resp:
33     """Zmq message received from a third party to send to the terminal"""
34
35     def __init__(self, *args, **kwargs):
36         if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
37             self.imei = msg[:16].decode()
38             self.payload = msg[16:]
39         elif len(args) == 0:
40             self.imei = kwargs["imei"]
41             self.payload = kwargs["payload"]
42
43
44 class Client:
45     """Connected socket to the terminal plus buffer and metadata"""
46
47     def __init__(self, sock, addr):
48         self.sock = sock
49         self.addr = addr
50         self.buffer = b""
51         self.imei = None
52
53     def close(self):
54         log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
55         self.sock.close()
56         self.buffer = b""
57         self.imei = None
58
59     def recv(self):
60         """Read from the socket and parse complete messages"""
61         try:
62             segment = self.sock.recv(4096)
63         except OSError:
64             log.warning(
65                 "Reading from fd %d (IMEI %s): %s",
66                 self.sock.fileno(),
67                 self.imei,
68                 e,
69             )
70             return None
71         if not segment:  # Terminal has closed connection
72             log.info(
73                 "EOF reading from fd %d (IMEI %s)",
74                 self.sock.fileno(),
75                 self.imei,
76             )
77             return None
78         when = time()
79         self.buffer += segment
80         msgs = []
81         while True:
82             framestart = self.buffer.find(b"xx")
83             if framestart == -1:  # No frames, return whatever we have
84                 break
85             if framestart > 0:  # Should not happen, report
86                 log.warning(
87                     'Undecodable data "%s" from fd %d (IMEI %s)',
88                     self.buffer[:framestart].hex(),
89                     self.sock.fileno(),
90                     self.imei,
91                 )
92                 self.buffer = self.buffer[framestart:]
93             # At this point, buffer starts with a packet
94             frameend = self.buffer.find(b"\r\n", 4)
95             if frameend == -1:  # Incomplete frame, return what we have
96                 break
97             packet = self.buffer[2:frameend]
98             self.buffer = self.buffer[frameend + 2 :]
99             if proto_of_message(packet) == LOGIN.PROTO:
100                 self.imei = parse_message(packet).imei
101                 log.info(
102                     "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
103                 )
104             msgs.append(packet)
105         return msgs
106
107     def send(self, buffer):
108         try:
109             self.sock.send(b"xx" + buffer + b"\r\n")
110         except OSError as e:
111             log.error(
112                 "Sending to fd %d (IMEI %s): %s",
113                 self.sock.fileno,
114                 self.imei,
115                 e,
116             )
117
118
119 class Clients:
120     def __init__(self):
121         self.by_fd = {}
122         self.by_imei = {}
123
124     def add(self, clntsock, clntaddr):
125         fd = clntsock.fileno()
126         self.by_fd[fd] = Client(clntsock, clntaddr)
127         return fd
128
129     def stop(self, fd):
130         clnt = self.by_fd[fd]
131         log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
132         clnt.close()
133         if clnt.imei:
134             del self.by_imei[clnt.imei]
135         del self.by_fd[fd]
136
137     def recv(self, fd):
138         clnt = self.by_fd[fd]
139         msgs = clnt.recv()
140         if msgs is None:
141             return None
142         result = []
143         for msg in msgs:
144             if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
145                 self.by_imei[clnt.imei] = clnt
146             result.append((clnt.imei, msg))
147         return result
148
149     def response(self, resp):
150         if resp.imei in self.by_imei:
151             self.by_imei[resp.imei].send(resp.payload)
152
153
154 def runserver(conf):
155     zctx = zmq.Context()
156     zpub = zctx.socket(zmq.PUB)
157     zpub.bind(conf.get("collector", "publishurl"))
158     zsub = zctx.socket(zmq.SUB)
159     zsub.connect(conf.get("collector", "listenurl"))
160     tcpl = socket(AF_INET, SOCK_STREAM)
161     tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
162     tcpl.bind(("", conf.getint("collector", "port")))
163     tcpl.listen(5)
164     tcpfd = tcpl.fileno()
165     poller = zmq.Poller()
166     poller.register(zsub, flags=zmq.POLLIN)
167     poller.register(tcpfd, flags=zmq.POLLIN)
168     clients = Clients()
169     try:
170         while True:
171             tosend = []
172             topoll = []
173             tostop = []
174             events = poller.poll(10)
175             for sk, fl in events:
176                 if sk is zsub:
177                     while True:
178                         try:
179                             msg = zsub.recv(zmq.NOBLOCK)
180                             tosend.append(Resp(msg))
181                         except zmq.Again:
182                             break
183                 elif sk == tcpfd:
184                     clntsock, clntaddr = tcpl.accept()
185                     topoll.append((clntsock, clntaddr))
186                 else:
187                     received = clients.recv(sk)
188                     if received is None:
189                         log.debug(
190                             "Terminal gone from fd %d (IMEI %s)", sk, imei
191                         )
192                         tostop.append(sk)
193                     else:
194                         for imei, msg in received:
195                             zpub.send(Bcast(imei, msg).as_bytes)
196                             if proto_of_message(msg) == HIBERNATION.PROTO:
197                                 log.debug(
198                                     "HIBERNATION from fd %d (IMEI %s)",
199                                     sk,
200                                     imei,
201                                 )
202                                 tostop.append(sk)
203                             respmsg = inline_response(msg)
204                             if respmsg is not None:
205                                 clients.response(
206                                     Resp(imei=imei, payload=respmsg)
207                                 )
208             # poll queue consumed, make changes now
209             for fd in tostop:
210                 poller.unregister(fd)
211                 clients.stop(fd)
212             for zmsg in tosend:
213                 clients.response(zmsg)
214             for clntsock, clntaddr in topoll:
215                 fd = clients.add(clntsock, clntaddr)
216                 poller.register(fd)
217     except KeyboardInterrupt:
218         pass
219
220
221 if __name__.endswith("__main__"):
222     runserver(common.init(log))