From 7271332d13eadd3e201882288042207fceae50bd Mon Sep 17 00:00:00 2001 From: Eugene Crosser Date: Fri, 27 May 2022 22:51:09 +0200 Subject: [PATCH] typing: make zmsg.py typecheck --- gps303/gps303proto.py | 5 +-- gps303/storage.py | 7 +--- gps303/zmsg.py | 75 ++++++++++++++++++++++++++----------------- 3 files changed, 49 insertions(+), 38 deletions(-) diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 8036d65..b70cacf 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -18,6 +18,7 @@ from datetime import datetime, timezone from enum import Enum from inspect import isclass from struct import error, pack, unpack +from typing import Any, Callable, Tuple __all__ = ( "class_by_prefix", @@ -153,8 +154,8 @@ class Respond(Enum): class GPS303Pkt(metaclass=MetaPkt): RESPOND = Respond.NON # Do not send anything back by default PROTO: int - IN_KWARGS = () - OUT_KWARGS = () + IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = () + OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = () def __init__(self, *args, **kwargs): """ diff --git a/gps303/storage.py b/gps303/storage.py index 5368efc..14afc49 100644 --- a/gps303/storage.py +++ b/gps303/storage.py @@ -32,14 +32,9 @@ def runserver(conf): datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc), zmsg.packet.hex(), ) - if zmsg.peeraddr is not None: - addr, port = zmsg.peeraddr - peeraddr = str((str(addr), port)) - else: - peeraddr = None stow( is_incoming=zmsg.is_incoming, - peeraddr=peeraddr, + peeraddr=str(zmsg.peeraddr), when=zmsg.when, imei=zmsg.imei, proto=proto_of_message(zmsg.packet), diff --git a/gps303/zmsg.py b/gps303/zmsg.py index 1fe1812..73a4f80 100644 --- a/gps303/zmsg.py +++ b/gps303/zmsg.py @@ -2,36 +2,47 @@ import ipaddress as ip from struct import pack, unpack +from typing import Any, cast, Optional, Tuple, Type, Union __all__ = "Bcast", "Resp", "topic" -def pack_peer(peeraddr): - try: - if peeraddr is None: - saddr = "::" - port = 0 - else: - saddr, port, _x, _y = peeraddr - addr = ip.ip_address(saddr) - except ValueError: +def pack_peer( + peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]] +) -> bytes: + if peeraddr is None: + addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0) + port = 0 + elif len(peeraddr) == 2: + peeraddr = cast(Tuple[str, int], peeraddr) saddr, port = peeraddr - a4 = ip.ip_address(saddr) - addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed) + addr = ip.ip_address(saddr) + elif len(peeraddr) == 4: + peeraddr = cast(Tuple[str, int, Any, Any], peeraddr) + saddr, port, _x, _y = peeraddr + addr = ip.ip_address(saddr) + if isinstance(addr, ip.IPv4Address): + addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed) return addr.packed + pack("!H", port) -def unpack_peer(buffer): +def unpack_peer( + buffer: bytes, +) -> Tuple[str, int]: a6 = ip.IPv6Address(buffer[:16]) port = unpack("!H", buffer[16:])[0] - addr = a6.ipv4_mapped - if addr is None: - addr = a6 - return (addr, port) + a4 = a6.ipv4_mapped + if a4 is not None: + return (str(a4), port) + elif a6 == ip.IPv6Address("::"): + return ("", 0) + return (str(a6), port) class _Zmsg: - def __init__(self, *args, **kwargs): + KWARGS: Tuple[Tuple[str, Any], ...] + + def __init__(self, *args: Any, **kwargs: Any) -> None: if len(args) == 1: self.decode(args[0]) elif bool(kwargs): @@ -46,7 +57,7 @@ class _Zmsg: + str(kwargs) ) - def __repr__(self): + def __repr__(self) -> str: return "{}({})".format( self.__class__.__name__, ", ".join( @@ -62,24 +73,28 @@ class _Zmsg: ), ) - def __eq__(self, other): - return all( - [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] - ) + def __eq__(self, other: object) -> bool: + if isinstance(other, self.__class__): + return all( + [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS] + ) + return NotImplemented - def decode(self, buffer): + def decode(self, buffer: bytes) -> None: raise NotImplementedError( self.__class__.__name__ + "must implement `decode()` method" ) @property - def packed(self): + def packed(self) -> bytes: raise NotImplementedError( self.__class__.__name__ + "must implement `packed()` property" ) -def topic(proto, is_incoming=True, imei=None): +def topic( + proto: int, is_incoming: bool = True, imei: Optional[str] = None +) -> bytes: return pack("BB", is_incoming, proto) + ( b"" if imei is None else pack("16s", imei.encode()) ) @@ -98,7 +113,7 @@ class Bcast(_Zmsg): ) @property - def packed(self): + def packed(self) -> bytes: return ( pack( "BB16s", @@ -117,10 +132,10 @@ class Bcast(_Zmsg): + self.packet ) - def decode(self, buffer): + def decode(self, buffer: bytes) -> None: self.is_incoming = bool(buffer[0]) self.proto = buffer[1] - self.imei = buffer[2:18].decode() + self.imei: Optional[str] = buffer[2:18].decode() if self.imei == "0000000000000000": self.imei = None self.when = unpack("!d", buffer[18:26])[0] @@ -134,7 +149,7 @@ class Resp(_Zmsg): KWARGS = (("imei", None), ("when", None), ("packet", b"")) @property - def packed(self): + def packed(self) -> bytes: return ( pack( "16s", @@ -150,7 +165,7 @@ class Resp(_Zmsg): + self.packet ) - def decode(self, buffer): + def decode(self, buffer: bytes) -> None: self.imei = buffer[:16].decode() self.when = unpack("!d", buffer[16:24])[0] self.packet = buffer[24:] -- 2.39.2