]> www.average.org Git - loctrkd.git/blob - gps303/collector.py
Initial version of zmq based architecture
[loctrkd.git] / gps303 / collector.py
1 from getopt import getopt
2 from logging import getLogger, StreamHandler, DEBUG, INFO
3 from logging.handlers import SysLogHandler
4 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
5 from time import time
6 import sys
7 import zmq
8
9 from .config import readconfig
10 from .GT06mod import handle_packet, make_response, LOGIN, set_config
11
# Default configuration file path; used when no -c option is given on the
# command line (see the __main__ section at the bottom of this file).
CONF = "/etc/gps303.conf"

# Module-wide logger. Handlers and level are attached in the __main__ section.
log = getLogger("gps303/collector")
15
16
class Bcast:
    """Outgoing broadcast frame: encoded IMEI immediately followed by the
    encoded message, exposed as ``as_bytes``."""

    def __init__(self, imei, msg):
        # Both arguments must provide .encode(); the two results are joined
        # with no separator, IMEI first.
        prefix = imei.encode()
        body = msg.encode()
        self.as_bytes = prefix + body
20
21
class Zmsg:
    """Incoming zmq frame, split into an IMEI prefix and a payload."""

    def __init__(self, msg):
        # The first 16 bytes carry the IMEI as text; everything after that
        # is kept as raw bytes.
        head, tail = msg[:16], msg[16:]
        self.imei = head.decode()
        self.payload = tail
26
27
class Client:
    """One connected terminal: its socket, peer address and (after login) IMEI."""

    def __init__(self, clntsock, clntaddr):
        self.clntsock = clntsock
        self.clntaddr = clntaddr
        self.buffer = b""  # placeholder for stream reassembly, see recv()
        self.imei = None  # filled in when a LOGIN message arrives

    def close(self):
        """Close the underlying socket."""
        self.clntsock.close()

    def recv(self):
        """Read up to 4096 bytes from the socket and parse them.

        Returns None when the read is empty (the peer closed the
        connection); otherwise returns whatever handle_packet() produced.
        When the result is a LOGIN message its ``imei`` is remembered on
        this client.
        """
        packet = self.clntsock.recv(4096)
        if not packet:
            return None
        when = time()
        self.buffer += packet
        # TODO: implement framing properly. For now every read is handed to
        # the parser as-is and then dropped from the buffer again, i.e. one
        # read is assumed to contain exactly one message.
        msg = handle_packet(packet, self.clntaddr, when)
        self.buffer = self.buffer[len(packet):]
        if isinstance(msg, LOGIN):
            self.imei = msg.imei
        return msg

    def send(self, buffer):
        """Send the whole of ``buffer`` to the peer.

        sendall() is used because plain send() may transmit only part of
        the data and leaves retrying to the caller.
        """
        self.clntsock.sendall(buffer)
53
54
class Clients:
    """Registry of connected clients, indexed by socket fd and by IMEI."""

    def __init__(self):
        self.by_fd = {}  # fd -> client, for every accepted connection
        self.by_imei = {}  # imei -> client, only for clients that logged in

    def add(self, clntsock, clntaddr):
        """Wrap a newly accepted socket in a Client and return its fd."""
        fd = clntsock.fileno()
        self.by_fd[fd] = Client(clntsock, clntaddr)
        return fd

    def stop(self, fd):
        """Close the client registered under ``fd`` and forget it.

        Raises KeyError if ``fd`` is not registered.
        """
        clnt = self.by_fd[fd]
        clnt.close()
        # Drop the IMEI mapping only if it still points at this client: a
        # newer connection that logged in with the same IMEI must stay
        # reachable, and its entry must not be deleted twice.
        if clnt.imei and self.by_imei.get(clnt.imei) is clnt:
            del self.by_imei[clnt.imei]
        del self.by_fd[fd]

    def recv(self, fd):
        """Receive one message from the client registered under ``fd``.

        Returns a ``(imei, msg)`` tuple; ``imei`` is None until the client
        has logged in and ``msg`` is whatever the client's recv() returned.
        A LOGIN message registers the client under its IMEI.
        """
        clnt = self.by_fd[fd]
        msg = clnt.recv()
        if isinstance(msg, LOGIN):
            self.by_imei[clnt.imei] = clnt
        return clnt.imei, msg

    def response(self, zmsg):
        """Forward ``zmsg.payload`` to the client logged in as ``zmsg.imei``.

        Messages addressed to an unknown IMEI are silently dropped.
        """
        clnt = self.by_imei.get(zmsg.imei)
        if clnt is not None:
            clnt.send(zmsg.payload)
82
83
def runserver(opts, conf):
    """Run the collector event loop until interrupted with Ctrl-C.

    Accepts TCP connections from terminals, publishes every message
    received from them on a zmq PUB socket, and forwards messages arriving
    on a zmq SUB socket to the terminal whose IMEI they are addressed to.

    opts -- parsed command line options (not used by this function).
    conf -- configuration object; the "collector" section must provide
            "publishurl", "listenurl" and an integer "port".
    """
    # Imported here because the module-level import list does not include
    # it. NOTE(review): assumes GT06mod defines HIBERNATION -- confirm.
    from .GT06mod import HIBERNATION

    zctx = zmq.Context()
    zpub = zctx.socket(zmq.PUB)
    zpub.bind(conf.get("collector", "publishurl"))
    zsub = zctx.socket(zmq.SUB)
    zsub.connect(conf.get("collector", "listenurl"))
    # A SUB socket delivers nothing until a subscription is set; the empty
    # prefix subscribes to every message.
    zsub.setsockopt(zmq.SUBSCRIBE, b"")
    tcpl = socket(AF_INET, SOCK_STREAM)
    tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    tcpl.bind(("", conf.getint("collector", "port")))
    tcpl.listen(5)
    tcpfd = tcpl.fileno()
    poller = zmq.Poller()
    poller.register(zsub, flags=zmq.POLLIN)
    poller.register(tcpfd, flags=zmq.POLLIN)
    clients = Clients()
    try:
        while True:
            # Changes to the client set and outgoing traffic are collected
            # while walking the poll results and applied afterwards.
            tosend = []
            topoll = []
            tostop = []
            events = poller.poll(10)
            for sk, _flags in events:
                if sk is zsub:
                    # Drain everything that is queued on the SUB socket.
                    while True:
                        try:
                            msg = zsub.recv(zmq.NOBLOCK)
                            tosend.append(Zmsg(msg))
                        except zmq.Again:
                            break
                elif sk == tcpfd:
                    clntsock, clntaddr = tcpl.accept()
                    topoll.append((clntsock, clntaddr))
                else:
                    imei, msg = clients.recv(sk)
                    if msg is None:
                        # Nothing to publish: the connection is gone.
                        tostop.append(sk)
                        continue
                    zpub.send(Bcast(imei, msg).as_bytes)
                    if isinstance(msg, HIBERNATION):
                        tostop.append(sk)
            # poll queue consumed, make changes now
            for fd in tostop:
                clients.stop(fd)
                poller.unregister(fd)
            for zmsg in tosend:
                clients.response(zmsg)
            for clntsock, clntaddr in topoll:
                fd = clients.add(clntsock, clntaddr)
                # Read-readiness only: the default flags include POLLOUT,
                # which is almost always set and would make recv() block.
                poller.register(fd, flags=zmq.POLLIN)
    except KeyboardInterrupt:
        pass
132
133
if __name__.endswith("__main__"):
    # Command line: -c <config file> selects the configuration,
    # -d switches logging to DEBUG.
    opts, _ = getopt(sys.argv[1:], "c:d")
    opts = dict(opts)
    conf = readconfig(opts.get("-c", CONF))
    # Log to stderr when stdout is a terminal, otherwise to the local syslog.
    log.addHandler(
        StreamHandler(sys.stderr)
        if sys.stdout.isatty()
        else SysLogHandler(address="/dev/log")
    )
    log.setLevel(INFO if "-d" not in opts else DEBUG)
    log.info("starting with options: %s", opts)
    runserver(opts, conf)