1 """ TCP server that communicates with terminals """
3 from getopt import getopt
4 from logging import getLogger, StreamHandler, DEBUG, INFO
5 from logging.handlers import SysLogHandler
6 from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
11 from .config import readconfig
12 from .gps303proto import handle_packet, make_response, LOGIN, set_config
14 CONF = "/etc/gps303.conf"
16 log = getLogger("gps303/collector")
20 """Zmq message to broadcast what was received from the terminal"""
21 def __init__(self, imei, msg):
22 self.as_bytes = imei.encode() + msg.encode()
26 """Zmq message received from a third party to send to the terminal"""
27 def __init__(self, msg):
28 self.imei = msg[:16].decode()
29 self.payload = msg[16:]
33 """Connected socket to the terminal plus buffer and metadata"""
34 def __init__(self, sock, addr):
46 segment = self.sock.recv(4096)
50 self.buffer += segment
51 # implement framing properly
52 msg = handle_packet(packet, self.addr, when)
53 self.buffer = self.buffer[len(packet):]
54 if isinstance(msg, LOGIN):
58 def send(self, buffer):
59 self.sock.send(buffer)
67 def add(self, clntsock, clntaddr):
68 fd = clntsock.fileno()
69 self.by_fd[fd] = Client(clntsock, clntaddr)
76 del self.by_imei[clnt.imei]
82 if isinstance(msg, LOGIN):
83 self.by_imei[clnt.imei] = clnt
86 def response(self, zmsg):
87 if zmsg.imei in self.by_imei:
88 clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
91 def runserver(opts, conf):
93 zpub = zctx.socket(zmq.PUB)
94 zpub.bind(conf.get("collector", "publishurl"))
95 zsub = zctx.socket(zmq.SUB)
96 zsub.connect(conf.get("collector", "listenurl"))
97 tcpl = socket(AF_INET, SOCK_STREAM)
98 tcpl.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
99 tcpl.bind(("", conf.getint("collector", "port")))
101 tcpfd = tcpl.fileno()
102 poller = zmq.Poller()
103 poller.register(zsub, flags=zmq.POLLIN)
104 poller.register(tcpfd, flags=zmq.POLLIN)
111 events = poller.poll(10)
112 for sk, fl in events:
116 msg = zsub.recv(zmq.NOBLOCK)
117 tosend.append(Resp(msg))
121 clntsock, clntaddr = ctlsock.accept()
122 topoll.append((clntsock, clntaddr))
124 imei, msg = clients.recv(sk)
125 zpub.send(Bcast(imei, msg).as_bytes)
126 if msg is None or isinstance(msg, HIBERNATION):
128 # poll queue consumed, make changes now
131 pollset.unregister(fd)
133 clients.response(zmsg)
134 for clntsock, clntaddr in topoll:
135 fd = clients.add(clntsock, clntaddr)
137 except KeyboardInterrupt:
141 if __name__.endswith("__main__"):
142 opts, _ = getopt(sys.argv[1:], "c:d")
144 conf = readconfig(opts["-c"] if "-c" in opts else CONF)
145 if sys.stdout.isatty():
146 log.addHandler(StreamHandler(sys.stderr))
148 log.addHandler(SysLogHandler(address="/dev/log"))
149 log.setLevel(DEBUG if "-d" in opts else INFO)
150 log.info("starting with options: %s", opts)
151 runserver(opts, conf)