]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
08178f272bd57d4d1cd102cbd6c4de45c66fb6fc
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from logging import getLogger
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 from struct import pack
7 import zmq
8
9 from . import common
10 from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
11
12 log = getLogger("gps303/collector")
13
14
class Bcast:
    """Zmq message to broadcast what was received from the terminal"""

    def __init__(self, imei, msg):
        """Assemble the outgoing frame for `msg` received from `imei`.

        The result is stored in `self.as_bytes` and consists of one byte
        holding the protocol number of the message, the encoded IMEI text
        and finally the message itself.
        """
        proto_byte = pack("B", proto_of_message(msg))
        # No IMEI is known yet: substitute the all-zero placeholder
        imei_text = imei if imei is not None else "0000000000000000"
        self.as_bytes = proto_byte + imei_text.encode() + msg
24
25
class Resp:
    """Zmq message received from a third party to send to the terminal

    Two construction forms are supported:

    * ``Resp(raw)`` with a single positional ``bytes`` argument: the first
      16 bytes are decoded as the IMEI, the rest is kept as the payload.
    * ``Resp(imei=..., payload=...)``: both fields are passed explicitly.

    Raises:
        KeyError: the keyword form lacks ``imei`` or ``payload``.
        TypeError: the arguments match neither of the two forms.
    """

    def __init__(self, *args, **kwargs):
        if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
            raw = args[0]
            self.imei = raw[:16].decode()
            self.payload = raw[16:]
        elif len(args) == 0:
            self.imei = kwargs["imei"]
            self.payload = kwargs["payload"]
        else:
            # Refuse to build an object that has neither attribute set
            raise TypeError(
                "Resp() takes either a single bytes argument "
                "or the keyword arguments 'imei' and 'payload'"
            )
36
37
class Client:
    """Connected socket to the terminal plus buffer and metadata"""

    def __init__(self, sock, addr):
        """Wrap the connected socket `sock` whose peer address is `addr`"""
        self.sock = sock
        self.addr = addr
        self.buffer = b""  # received bytes not yet split into packets
        self.imei = None  # filled in when a LOGIN message is parsed

    def close(self):
        """Close the socket and forget the buffered data and the IMEI"""
        log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
        self.sock.close()
        self.buffer = b""
        self.imei = None

    def recv(self):
        """Read from the socket and parse complete messages

        Returns a list of packets (frames stripped of the leading "xx" and
        of the trailing CR LF); the list is empty when no complete frame is
        buffered yet.  Returns None when reading failed or when the peer
        has closed the connection.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as e:
            log.warning(
                "Reading from fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
            return None
        if not segment:  # Terminal has closed connection
            log.info(
                "EOF reading from fd %d (IMEI %s)",
                self.sock.fileno(),
                self.imei,
            )
            return None
        self.buffer += segment
        msgs = []
        while True:
            framestart = self.buffer.find(b"xx")
            if framestart == -1:  # No frames, return whatever we have
                break
            if framestart > 0:  # Should not happen, report
                log.warning(
                    'Undecodable data "%s" from fd %d (IMEI %s)',
                    self.buffer[:framestart].hex(),
                    self.sock.fileno(),
                    self.imei,
                )
                self.buffer = self.buffer[framestart:]
            # At this point, buffer starts with a packet
            # The search for the terminator starts at offset 4, past the
            # "xx" marker and the two bytes that follow it
            frameend = self.buffer.find(b"\r\n", 4)
            if frameend == -1:  # Incomplete frame, return what we have
                break
            packet = self.buffer[2:frameend]
            self.buffer = self.buffer[frameend + 2 :]
            if proto_of_message(packet) == LOGIN.PROTO:
                self.imei = parse_message(packet).imei
                log.info(
                    "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
                )
            msgs.append(packet)
        return msgs

    def send(self, buffer):
        """Frame `buffer` with "xx" and CR LF and send it to the terminal

        Errors raised by the socket are logged and not propagated.
        """
        try:
            # sendall() keeps writing until the whole frame is transmitted,
            # send() is allowed to stop after a part of it
            self.sock.sendall(b"xx" + buffer + b"\r\n")
        except OSError as e:
            log.error(
                "Sending to fd %d (IMEI %s): %s",
                self.sock.fileno(),
                self.imei,
                e,
            )
111
112
class Clients:
    """Collection of connected clients indexed by fd and by IMEI"""

    def __init__(self):
        self.by_fd = {}  # file descriptor -> Client
        self.by_imei = {}  # IMEI -> Client, populated on LOGIN

    def add(self, clntsock, clntaddr):
        """Start tracking a newly accepted connection, return its fd"""
        fd = clntsock.fileno()
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the client served on `fd` and drop it from both indexes

        Raises KeyError when `fd` is not tracked.
        """
        clnt = self.by_fd[fd]
        # Remember the IMEI now: close() resets clnt.imei to None
        imei = clnt.imei
        log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), imei)
        clnt.close()
        # Only drop the IMEI entry if it still refers to this client, a
        # later LOGIN with the same IMEI may have replaced it
        if imei and self.by_imei.get(imei) is clnt:
            del self.by_imei[imei]
        del self.by_fd[fd]

    def recv(self, fd):
        """Receive from the client on `fd`

        Returns a list of (imei, message) tuples, or None when the client
        reported a read failure or end of file.
        """
        clnt = self.by_fd[fd]
        msgs = clnt.recv()
        if msgs is None:
            return None
        result = []
        for msg in msgs:
            if proto_of_message(msg) == LOGIN.PROTO:  # Could do blindly...
                self.by_imei[clnt.imei] = clnt
            result.append((clnt.imei, msg))
        return result

    def response(self, resp):
        """Send `resp.payload` to the client logged in as `resp.imei`

        The response is silently dropped when no such client is known.
        """
        if resp.imei in self.by_imei:
            self.by_imei[resp.imei].send(resp.payload)
146
147
def runserver(conf):
    """Run the collector loop until interrupted from the keyboard

    `conf` must provide `get(section, option)` and `getint(section, option)`;
    the options "publishurl", "listenurl" and "port" are read from the
    "collector" section.

    Every message received from a terminal is published on the zmq PUB
    socket.  Messages arriving on the zmq SUB socket are forwarded to the
    terminal that is logged in with the matching IMEI.  LOGIN messages are
    answered right away, and a connection is dropped after a HIBERNATION
    message, a read error or end of file.
    """
    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpub.bind(conf.get("collector", "publishurl"))
    zsub = zctx.socket(zmq.SUB)
    # NOTE(review): no zmq.SUBSCRIBE option is set on this socket.  The
    # ZeroMQ documentation says a SUB socket without subscriptions filters
    # out every message - confirm whether that is intended.
    zsub.connect(conf.get("collector", "listenurl"))
    tcpl = socket(AF_INET, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zsub, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            tosend = []
            topoll = []
            # A set, because one batch may ask to stop the same fd twice
            tostop = set()
            events = poller.poll(10)
            for sk, _flags in events:
                if sk is zsub:
                    # Drain everything that is queued on the zmq socket
                    while True:
                        try:
                            msg = zsub.recv(zmq.NOBLOCK)
                            tosend.append(Resp(msg))
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                else:
                    received = clients.recv(sk)
                    if received is None:
                        # Read error or EOF: the connection is finished
                        tostop.add(sk)
                        continue
                    for imei, msg in received:
                        zpub.send(Bcast(imei, msg).as_bytes)
                        proto = proto_of_message(msg)
                        if proto == HIBERNATION.PROTO:
                            log.debug(
                                "HIBERNATION from fd %d (IMEI %s)", sk, imei
                            )
                            tostop.add(sk)
                        elif proto == LOGIN.PROTO:
                            clients.response(
                                Resp(imei=imei, payload=LOGIN.response())
                            )
            # poll queue consumed, make changes now
            for fd in tostop:
                poller.unregister(fd)
                clients.stop(fd)
            for zmsg in tosend:
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                # Ask for readability only: the default flags include
                # POLLOUT, which would report an idle socket as ready and
                # lead to a blocking recv()
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
204
205
# Script entry point: obtain the configuration, then serve until interrupted
if __name__.endswith("__main__"):
    conf = common.init(log)
    runserver(conf)