from configparser import ConfigParser
from logging import getLogger
from os import umask
-from socket import socket, AF_INET6, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
+from socket import (
+ socket,
+ AF_INET6,
+ SOCK_STREAM,
+ SOL_SOCKET,
+ SO_KEEPALIVE,
+ SO_REUSEADDR,
+)
from struct import pack
from time import time
from typing import Dict, List, Optional, Tuple
from . import common
from .gps303proto import (
- HIBERNATION,
- LOGIN,
+ GPS303Conn,
+ is_goodbye_packet,
+ imei_from_packet,
inline_response,
parse_message,
proto_of_message,
log = getLogger("gps303/collector")
+MAXBUFFER: int = 4096
+
class Client:
"""Connected socket to the terminal plus buffer and metadata"""
def __init__(self, sock: socket, addr: Tuple[str, int]) -> None:
self.sock = sock
self.addr = addr
- self.buffer = b""
+ self.stream = GPS303Conn()
self.imei: Optional[str] = None
def close(self) -> None:
log.debug("Closing fd %d (IMEI %s)", self.sock.fileno(), self.imei)
self.sock.close()
- self.buffer = b""
+ rest = self.stream.close()
+ if rest:
+ log.warning("%d bytes in buffer on close: %s", len(rest), rest)
def recv(self) -> Optional[List[Tuple[float, Tuple[str, int], bytes]]]:
"""Read from the socket and parse complete messages"""
try:
- segment = self.sock.recv(4096)
+ segment = self.sock.recv(MAXBUFFER)
except OSError as e:
log.warning(
"Reading from fd %d (IMEI %s): %s",
)
return None
when = time()
- self.buffer += segment
msgs = []
- while True:
- framestart = self.buffer.find(b"xx")
- if framestart == -1: # No frames, return whatever we have
- break
- if framestart > 0: # Should not happen, report
+ for elem in self.stream.recv(segment):
+ if isinstance(elem, bytes):
+ msgs.append((when, self.addr, elem))
+ else:
log.warning(
- 'Undecodable data "%s" from fd %d (IMEI %s)',
- self.buffer[:framestart].hex(),
+ "%s from fd %d (IMEI %s)",
+ elem,
self.sock.fileno(),
self.imei,
)
- self.buffer = self.buffer[framestart:]
- # At this point, buffer starts with a packet
- if len(self.buffer) < 6: # no len and proto - cannot proceed
- break
- exp_end = self.buffer[2] + 3 # Expect '\r\n' here
- frameend = 0
- # Length field can legitimeely be much less than the
- # length of the packet (e.g. WiFi positioning), but
- # it _should not_ be greater. Still sometimes it is.
- # Luckily, not by too much: by maybe two or three bytes?
- # Do this embarrassing hack to avoid accidental match
- # of some binary data in the packet against '\r\n'.
- while True:
- frameend = self.buffer.find(b"\r\n", frameend)
- if frameend >= (exp_end - 3): # Found realistic match
- break
- if frameend == -1: # Incomplete frame, return what we have
- break
- packet = self.buffer[2:frameend]
- self.buffer = self.buffer[frameend + 2 :]
- if proto_of_message(packet) == LOGIN.PROTO:
- self.imei = parse_message(packet).imei
- log.info(
- "LOGIN from fd %d (IMEI %s)", self.sock.fileno(), self.imei
- )
- msgs.append((when, self.addr, packet))
return msgs
def send(self, buffer: bytes) -> None:
try:
- self.sock.send(b"xx" + buffer + b"\r\n")
+ self.sock.send(self.stream.enframe(buffer))
except OSError as e:
log.error(
"Sending to fd %d (IMEI %s): %s",
- self.sock.fileno,
+ self.sock.fileno(),
self.imei,
e,
)
return None
result = []
for when, peeraddr, packet in msgs:
- if proto_of_message(packet) == LOGIN.PROTO: # Could do blindly...
- if clnt.imei:
+ if clnt.imei is None:
+ imei = imei_from_packet(packet)
+ if imei is not None:
+ log.info("LOGIN from fd %d (IMEI %s)", fd, imei)
+ clnt.imei = imei
+ oldclnt = self.by_imei.get(clnt.imei)
+ if oldclnt is not None:
+ log.info(
+ "Orphaning fd %d with the same IMEI",
+ oldclnt.sock.fileno(),
+ )
+ oldclnt.imei = None
self.by_imei[clnt.imei] = clnt
else:
log.warning(
log.info("Not connected (IMEI %s)", resp.imei)
-def runserver(conf: ConfigParser) -> None:
+def runserver(conf: ConfigParser, handle_hibernate: bool = True) -> None:
# Is this https://github.com/zeromq/pyzmq/issues/1627 still not fixed?!
zctx = zmq.Context() # type: ignore
zpub = zctx.socket(zmq.PUB) # type: ignore
break
elif sk == tcpfd:
clntsock, clntaddr = tcpl.accept()
+ clntsock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, 1)
topoll.append((clntsock, clntaddr))
elif fl & zmq.POLLIN:
received = clients.recv(sk)
packet=packet,
).packed
)
- if proto == HIBERNATION.PROTO:
+ if is_goodbye_packet(packet) and handle_hibernate:
log.debug(
- "HIBERNATION from fd %d (IMEI %s)",
+ "Goodbye from fd %d (IMEI %s)",
sk,
imei,
)
else:
log.debug("Stray event: %s on socket %s", fl, sk)
# poll queue consumed, make changes now
- for fd in tostop:
- poller.unregister(fd) # type: ignore
- clients.stop(fd)
for zmsg in tosend:
zpub.send(
Bcast(
)
log.debug("Sending to the client: %s", zmsg)
clients.response(zmsg)
+ for fd in tostop:
+ poller.unregister(fd) # type: ignore
+ clients.stop(fd)
for clntsock, clntaddr in topoll:
fd = clients.add(clntsock, clntaddr)
poller.register(fd, flags=zmq.POLLIN)
except KeyboardInterrupt:
- pass
+ zpub.close()
+ zpull.close()
+ zctx.destroy() # type: ignore
+ tcpl.close()
if __name__.endswith("__main__"):