""" TCP server that communicates with terminals """
from getopt import getopt
-from logging import getLogger, StreamHandler, DEBUG, INFO
+from logging import getLogger
from logging.handlers import SysLogHandler
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
-import sys
+from struct import pack
import zmq
-from .config import readconfig
-from .gps303proto import handle_packet, make_response, LOGIN, set_config
-
-CONF = "/etc/gps303.conf"
+from . import common
+from .gps303proto import HIBERNATION, LOGIN, parse_message, proto_of_message
log = getLogger("gps303/collector")
class Bcast:
"""Zmq message to broadcast what was received from the terminal"""
+
def __init__(self, imei, msg):
- self.as_bytes = imei.encode() + msg.encode()
+ self.as_bytes = (
+ pack("B", proto_of_message(msg))
+ + ("0000000000000000" if imei is None else imei).encode()
+ + msg
+ )
class Resp:
"""Zmq message received from a third party to send to the terminal"""
- def __init__(self, msg):
- self.imei = msg[:16].decode()
- self.payload = msg[16:]
+
+ def __init__(self, *args, **kwargs):
+ if not kwargs and len(args) == 1 and isinstance(args[0], bytes):
+ self.imei = msg[:16].decode()
+ self.payload = msg[16:]
+ elif len(args) == 0:
+ self.imei = kwargs["imei"]
+ self.payload = kwargs["payload"]
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
+
def __init__(self, sock, addr):
self.sock = sock
self.addr = addr
self.imei = None
def close(self):
+ log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
self.sock.close()
self.buffer = b""
self.imei = None
def recv(self):
- segment = self.sock.recv(4096)
- if not segment:
+ """Read from the socket and parse complete messages"""
+ try:
+ segment = self.sock.recv(4096)
+ except OSError:
+ log.warning(
+ "Reading from fd %d (IMEI %s): %s",
+ self.sock.fileno(),
+ self.imei,
+ e,
+ )
+ return None
+ if not segment: # Terminal has closed connection
+ log.info(
+ "EOF reading from fd %d (IMEI %s)",
+ self.sock.fileno(),
+ self.imei,
+ )
return None
when = time()
self.buffer += segment
- # implement framing properly
- msg = handle_packet(packet, self.addr, when)
- self.buffer = self.buffer[len(packet):]
- if isinstance(msg, LOGIN):
- self.imei = msg.imei
- return msg
+ msgs = []
+ while True:
+ framestart = self.buffer.find(b"xx")
+ if framestart == -1: # No frames, return whatever we have
+ break
+ if framestart > 0: # Should not happen, report
+ log.warning(
+ 'Undecodable data "%s" from fd %d (IMEI %s)',
+ self.buffer[:framestart].hex(),
+ self.sock.fileno(),
+ self.imei,
+ )
+ self.buffer = self.buffer[framestart:]
+ # At this point, buffer starts with a packet
+ frameend = self.buffer.find(b"\r\n", 4)
+ if frameend == -1: # Incomplete frame, return what we have
+ break
+ packet = self.buffer[2:frameend]
+ self.buffer = self.buffer[frameend + 2 :]
+ if proto_of_message(packet) == LOGIN.PROTO:
+ self.imei = parse_message(packet).imei
+ log.info(
+ "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
+ )
+ msgs.append(packet)
+ return msgs
def send(self, buffer):
- self.sock.send(buffer)
+ try:
+ self.sock.send(b"xx" + buffer + b"\r\n")
+ except OSError as e:
+ log.error(
+ "Sending to fd %d (IMEI %s): %s",
+ self.sock.fileno,
+ self.imei,
+ e,
+ )
class Clients:
return fd
def stop(self, fd):
- clnt = by_fd[fd]
+ clnt = self.by_fd[fd]
+ log.info("Stop serving fd %d (IMEI %s)", clnt.sock.fileno(), clnt.imei)
clnt.close()
if clnt.imei:
del self.by_imei[clnt.imei]
del self.by_fd[fd]
def recv(self, fd):
- clnt = by_fd[fd]
- msg = clnt.recv()
- if isinstance(msg, LOGIN):
- self.by_imei[clnt.imei] = clnt
- return clnt.imei, msg
+ clnt = self.by_fd[fd]
+ msgs = clnt.recv()
+ if msgs is None:
+ return None
+ result = []
+ for msg in msgs:
+ if proto_of_message(msg) == LOGIN.PROTO: # Could do blindly...
+ self.by_imei[clnt.imei] = clnt
+ result.append((clnt.imei, msg))
+ return result
- def response(self, zmsg):
- if zmsg.imei in self.by_imei:
- clnt = self.by_imei[zmsg.imei].send(zmsg.payload)
+ def response(self, resp):
+ if resp.imei in self.by_imei:
+ self.by_imei[resp.imei].send(resp.payload)
-def runserver(opts, conf):
+def runserver(conf):
zctx = zmq.Context()
zpub = zctx.socket(zmq.PUB)
zpub.bind(conf.get("collector", "publishurl"))
except zmq.Again:
break
elif sk == tcpfd:
- clntsock, clntaddr = ctlsock.accept()
+ clntsock, clntaddr = tcpl.accept()
topoll.append((clntsock, clntaddr))
else:
- imei, msg = clients.recv(sk)
- zpub.send(Bcast(imei, msg).as_bytes)
- if msg is None or isinstance(msg, HIBERNATION):
- tostop.append(sk)
+ for imei, msg in clients.recv(sk):
+ zpub.send(Bcast(imei, msg).as_bytes)
+ if (
+ msg is None
+ or proto_of_message(msg) == HIBERNATION.PROTO
+ ):
+ log.debug(
+ "HIBERNATION from fd %d (IMEI %s)", sk, imei
+ )
+ tostop.append(sk)
+ elif proto_of_message(msg) == LOGIN.PROTO:
+ clients.response(Resp(imei=imei, payload=LOGIN.response()))
# poll queue consumed, make changes now
for fd in tostop:
+ poller.unregister(fd)
clients.stop(fd)
- pollset.unregister(fd)
for zmsg in tosend:
clients.response(zmsg)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
- pollset.register(fd)
+ poller.register(fd)
except KeyboardInterrupt:
pass
if __name__.endswith("__main__"):
- opts, _ = getopt(sys.argv[1:], "c:d")
- opts = dict(opts)
- conf = readconfig(opts["-c"] if "-c" in opts else CONF)
- if sys.stdout.isatty():
- log.addHandler(StreamHandler(sys.stderr))
- else:
- log.addHandler(SysLogHandler(address="/dev/log"))
- log.setLevel(DEBUG if "-d" in opts else INFO)
- log.info("starting with options: %s", opts)
- runserver(opts, conf)
+ runserver(common.init(log))