www.average.org Git - loctrkd.git/blob - gps303/collector.py
rename protocol module to "gps303proto"
[loctrkd.git] / gps303 / collector.py
1 """ TCP server that communicates with terminals """
2
3 from getopt import getopt
4 from logging import getLogger, StreamHandler, DEBUG, INFO
5 from logging.handlers import SysLogHandler
6 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
7 from time import time
8 import sys
9 import zmq
10
11 from .config import readconfig
12 from .gps303proto import handle_packet, make_response, LOGIN, set_config
13
14 CONF = "/etc/gps303.conf"
15
16 log = getLogger("gps303/collector")
17
18
class Bcast:
    """Zmq message to broadcast what was received from the terminal.

    The wire form is the encoded IMEI immediately followed by the encoded
    message, exposed as the ``as_bytes`` attribute.

    ``imei`` may be None when the terminal has not logged in yet; it is
    then sent as 16 ASCII zeros so that the prefix keeps the 16-byte width
    that ``Resp`` slices off on the receiving path.
    NOTE(review): assumes real IMEIs are 16 characters long, as ``Resp``
    does - confirm against the protocol module.

    ``msg`` may be None when the terminal has disconnected; the payload is
    then empty. Otherwise ``msg`` must provide ``encode()`` returning bytes.
    """

    def __init__(self, imei, msg):
        imei_bytes = ("0" * 16 if imei is None else imei).encode()
        payload = b"" if msg is None else msg.encode()
        self.as_bytes = imei_bytes + payload
23
24
class Resp:
    """Zmq message received from a third party to send to the terminal.

    The first 16 bytes of the raw message are decoded to text and stored
    as ``imei``; everything after them is kept untouched as ``payload``.
    """

    def __init__(self, msg):
        head, tail = msg[:16], msg[16:]
        self.imei = head.decode()
        self.payload = tail
30
31
class Client:
    """Connected socket to the terminal plus buffer and metadata"""

    def __init__(self, sock, addr):
        """Wrap an accepted socket `sock` whose peer address is `addr`."""
        self.sock = sock
        self.addr = addr
        self.buffer = b""  # bytes received but not yet handed to the parser
        self.imei = None  # set once a LOGIN message has been received

    def close(self):
        """Close the socket and forget buffered data and identity."""
        self.sock.close()
        self.buffer = b""
        self.imei = None

    def recv(self):
        """Read from the socket and parse what was received.

        Returns the object produced by `handle_packet`, or None when the
        peer has closed the connection or reading from it failed.
        """
        try:
            segment = self.sock.recv(4096)
        except OSError as exc:
            # A broken connection must not take the whole server down;
            # report it to the caller the same way as an orderly close.
            log.warning(
                "Reading from %s (IMEI %s): %s", self.addr, self.imei, exc
            )
            return None
        if not segment:
            return None
        when = time()
        self.buffer += segment
        # TODO: implement framing properly. Until then everything that is
        # buffered is treated as exactly one packet.
        packet = self.buffer
        msg = handle_packet(packet, self.addr, when)
        self.buffer = self.buffer[len(packet):]
        if isinstance(msg, LOGIN):
            self.imei = msg.imei
        return msg

    def send(self, buffer):
        """Send all of `buffer` to the terminal; failures are only logged."""
        try:
            # sendall() keeps writing until every byte is out, whereas
            # send() may transmit only a part of the buffer.
            self.sock.sendall(buffer)
        except OSError as exc:
            log.warning(
                "Sending to %s (IMEI %s): %s", self.addr, self.imei, exc
            )
60
61
class Clients:
    """Registry of connected terminals, by file descriptor and by IMEI"""

    def __init__(self):
        self.by_fd = {}  # socket file descriptor -> Client
        self.by_imei = {}  # IMEI -> Client, filled in when LOGIN is seen

    def add(self, clntsock, clntaddr):
        """Register a newly accepted connection and return its descriptor."""
        fd = clntsock.fileno()
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the client registered under `fd` and forget it.

        Raises KeyError if `fd` is not registered.
        """
        clnt = self.by_fd.pop(fd)
        # Client.close() resets the imei attribute, so it has to be read
        # before closing or the by_imei entry would never be removed.
        imei = clnt.imei
        clnt.close()
        # Drop the entry only if it still refers to this very client: the
        # same IMEI may have logged in again over a newer connection.
        if imei and self.by_imei.get(imei) is clnt:
            del self.by_imei[imei]

    def recv(self, fd):
        """Receive from the client registered under `fd`.

        Returns a tuple (imei, msg) where `msg` is whatever the client's
        recv() returned and `imei` is the client's IMEI (None if unknown).
        """
        clnt = self.by_fd[fd]
        msg = clnt.recv()
        if isinstance(msg, LOGIN):
            self.by_imei[clnt.imei] = clnt
        return clnt.imei, msg

    def response(self, zmsg):
        """Forward `zmsg.payload` to the terminal identified by `zmsg.imei`.

        The message is silently dropped if no such terminal is logged in.
        """
        clnt = self.by_imei.get(zmsg.imei)
        if clnt is not None:
            clnt.send(zmsg.payload)
89
90
def runserver(opts, conf):
    """Run the collector event loop until interrupted from the keyboard.

    Accepts TCP connections from terminals, publishes what they send on a
    zmq PUB socket, and forwards messages arriving on a zmq SUB socket to
    the terminal with the matching IMEI.

    `opts` is the dictionary of command line options (not used here).
    `conf` must provide get() and getint(); the "collector" section is
    read for "publishurl", "listenurl" and "port".
    """
    # Imported locally so that this function brings the name into scope
    # by itself.
    # NOTE(review): assumes HIBERNATION is exported by the protocol module
    # next to LOGIN - confirm.
    from .gps303proto import HIBERNATION

    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpub.bind(conf.get("collector", "publishurl"))
    zsub = zctx.socket(zmq.SUB)
    # A SUB socket delivers nothing until a subscription is set; the empty
    # prefix subscribes to every message.
    zsub.setsockopt(zmq.SUBSCRIBE, b"")
    zsub.connect(conf.get("collector", "listenurl"))
    tcpl = socket(AF_INET, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zsub, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the set of clients are collected here and applied
            # only after all events of this round have been processed.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(10)
            for sk, _ in events:
                if sk is zsub:
                    # Drain everything that is queued on the zmq socket.
                    while True:
                        try:
                            msg = zsub.recv(zmq.NOBLOCK)
                            tosend.append(Resp(msg))
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                else:
                    imei, msg = clients.recv(sk)
                    if msg is None:
                        # The terminal is gone, there is nothing to publish.
                        tostop.append(sk)
                        continue
                    if imei is None:
                        # Without an IMEI the message cannot be attributed
                        # to any terminal.
                        log.warning(
                            "dropping message from fd %s received"
                            " before login",
                            sk,
                        )
                    else:
                        zpub.send(Bcast(imei, msg).as_bytes)
                    if isinstance(msg, HIBERNATION):
                        tostop.append(sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                clients.stop(fd)
                poller.unregister(fd)
            for zmsg in tosend:
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                # Only readability matters; the default flags also include
                # POLLOUT, which would make poll() return all the time.
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
139
140
if __name__.endswith("__main__"):
    # Command line: -c <config file> (defaults to CONF), -d for debug logs.
    opts, _ = getopt(sys.argv[1:], "c:d")
    opts = dict(opts)
    conf = readconfig(opts.get("-c", CONF))
    # Log to stderr when stdout is a terminal, to syslog otherwise.
    log.addHandler(
        StreamHandler(sys.stderr)
        if sys.stdout.isatty()
        else SysLogHandler(address="/dev/log")
    )
    if "-d" in opts:
        log.setLevel(DEBUG)
    else:
        log.setLevel(INFO)
    log.info("starting with options: %s", opts)
    runserver(opts, conf)