]> www.average.org Git - loctrkd.git/commitdiff
typing: make zmsg.py typecheck
authorEugene Crosser <crosser@average.org>
Fri, 27 May 2022 20:51:09 +0000 (22:51 +0200)
committerEugene Crosser <crosser@average.org>
Fri, 27 May 2022 21:18:40 +0000 (23:18 +0200)
gps303/gps303proto.py
gps303/storage.py
gps303/zmsg.py

index 8036d65e92a1a2e789b16b68774817c7a303a13c..b70cacf1eb328571171272657c92c97d0f04b2f1 100755 (executable)
@@ -18,6 +18,7 @@ from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
 from enum import Enum
 from inspect import isclass
 from struct import error, pack, unpack
+from typing import Any, Callable, Tuple
 
 __all__ = (
     "class_by_prefix",
 
 __all__ = (
     "class_by_prefix",
@@ -153,8 +154,8 @@ class Respond(Enum):
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
-    IN_KWARGS = ()
-    OUT_KWARGS = ()
+    IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
+    OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
 
     def __init__(self, *args, **kwargs):
         """
 
     def __init__(self, *args, **kwargs):
         """
index 5368efc575e9910bae5da767c0d1adcede239624..14afc49e1fde494f5aebcf96bb2582f572fe1de7 100644 (file)
@@ -32,14 +32,9 @@ def runserver(conf):
                 datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
                 zmsg.packet.hex(),
             )
                 datetime.fromtimestamp(zmsg.when).astimezone(tz=timezone.utc),
                 zmsg.packet.hex(),
             )
-            if zmsg.peeraddr is not None:
-                addr, port = zmsg.peeraddr
-                peeraddr = str((str(addr), port))
-            else:
-                peeraddr = None
             stow(
                 is_incoming=zmsg.is_incoming,
             stow(
                 is_incoming=zmsg.is_incoming,
-                peeraddr=peeraddr,
+                peeraddr=str(zmsg.peeraddr),
                 when=zmsg.when,
                 imei=zmsg.imei,
                 proto=proto_of_message(zmsg.packet),
                 when=zmsg.when,
                 imei=zmsg.imei,
                 proto=proto_of_message(zmsg.packet),
index 1fe18123c441739a9891b1433ec8bc276b5c876a..73a4f801aa273079540ef9a83441b5d166feecbb 100644 (file)
@@ -2,36 +2,47 @@
 
 import ipaddress as ip
 from struct import pack, unpack
 
 import ipaddress as ip
 from struct import pack, unpack
+from typing import Any, cast, Optional, Tuple, Type, Union
 
 __all__ = "Bcast", "Resp", "topic"
 
 
 
 __all__ = "Bcast", "Resp", "topic"
 
 
-def pack_peer(peeraddr):
-    try:
-        if peeraddr is None:
-            saddr = "::"
-            port = 0
-        else:
-            saddr, port, _x, _y = peeraddr
-        addr = ip.ip_address(saddr)
-    except ValueError:
+def pack_peer(
+    peeraddr: Union[None, Tuple[str, int], Tuple[str, int, Any, Any]]
+) -> bytes:
+    if peeraddr is None:
+        addr: Union[ip.IPv4Address, ip.IPv6Address] = ip.IPv6Address(0)
+        port = 0
+    elif len(peeraddr) == 2:
+        peeraddr = cast(Tuple[str, int], peeraddr)
         saddr, port = peeraddr
         saddr, port = peeraddr
-        a4 = ip.ip_address(saddr)
-        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + a4.packed)
+        addr = ip.ip_address(saddr)
+    elif len(peeraddr) == 4:
+        peeraddr = cast(Tuple[str, int, Any, Any], peeraddr)
+        saddr, port, _x, _y = peeraddr
+        addr = ip.ip_address(saddr)
+    if isinstance(addr, ip.IPv4Address):
+        addr = ip.IPv6Address(b"\0\0\0\0\0\0\0\0\0\0\xff\xff" + addr.packed)
     return addr.packed + pack("!H", port)
 
 
     return addr.packed + pack("!H", port)
 
 
-def unpack_peer(buffer):
+def unpack_peer(
+    buffer: bytes,
+) -> Tuple[str, int]:
     a6 = ip.IPv6Address(buffer[:16])
     port = unpack("!H", buffer[16:])[0]
     a6 = ip.IPv6Address(buffer[:16])
     port = unpack("!H", buffer[16:])[0]
-    addr = a6.ipv4_mapped
-    if addr is None:
-        addr = a6
-    return (addr, port)
+    a4 = a6.ipv4_mapped
+    if a4 is not None:
+        return (str(a4), port)
+    elif a6 == ip.IPv6Address("::"):
+        return ("", 0)
+    return (str(a6), port)
 
 
 class _Zmsg:
 
 
 class _Zmsg:
-    def __init__(self, *args, **kwargs):
+    KWARGS: Tuple[Tuple[str, Any], ...]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         if len(args) == 1:
             self.decode(args[0])
         elif bool(kwargs):
         if len(args) == 1:
             self.decode(args[0])
         elif bool(kwargs):
@@ -46,7 +57,7 @@ class _Zmsg:
                 + str(kwargs)
             )
 
                 + str(kwargs)
             )
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({})".format(
             self.__class__.__name__,
             ", ".join(
         return "{}({})".format(
             self.__class__.__name__,
             ", ".join(
@@ -62,24 +73,28 @@ class _Zmsg:
             ),
         )
 
             ),
         )
 
-    def __eq__(self, other):
-        return all(
-            [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
-        )
+    def __eq__(self, other: object) -> bool:
+        if isinstance(other, self.__class__):
+            return all(
+                [getattr(self, k) == getattr(other, k) for k, _ in self.KWARGS]
+            )
+        return NotImplemented
 
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `decode()` method"
         )
 
     @property
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `decode()` method"
         )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `packed()` property"
         )
 
 
         raise NotImplementedError(
             self.__class__.__name__ + "must implement `packed()` property"
         )
 
 
-def topic(proto, is_incoming=True, imei=None):
+def topic(
+    proto: int, is_incoming: bool = True, imei: Optional[str] = None
+) -> bytes:
     return pack("BB", is_incoming, proto) + (
         b"" if imei is None else pack("16s", imei.encode())
     )
     return pack("BB", is_incoming, proto) + (
         b"" if imei is None else pack("16s", imei.encode())
     )
@@ -98,7 +113,7 @@ class Bcast(_Zmsg):
     )
 
     @property
     )
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
             pack(
                 "BB16s",
         return (
             pack(
                 "BB16s",
@@ -117,10 +132,10 @@ class Bcast(_Zmsg):
             + self.packet
         )
 
             + self.packet
         )
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         self.is_incoming = bool(buffer[0])
         self.proto = buffer[1]
         self.is_incoming = bool(buffer[0])
         self.proto = buffer[1]
-        self.imei = buffer[2:18].decode()
+        self.imei: Optional[str] = buffer[2:18].decode()
         if self.imei == "0000000000000000":
             self.imei = None
         self.when = unpack("!d", buffer[18:26])[0]
         if self.imei == "0000000000000000":
             self.imei = None
         self.when = unpack("!d", buffer[18:26])[0]
@@ -134,7 +149,7 @@ class Resp(_Zmsg):
     KWARGS = (("imei", None), ("when", None), ("packet", b""))
 
     @property
     KWARGS = (("imei", None), ("when", None), ("packet", b""))
 
     @property
-    def packed(self):
+    def packed(self) -> bytes:
         return (
             pack(
                 "16s",
         return (
             pack(
                 "16s",
@@ -150,7 +165,7 @@ class Resp(_Zmsg):
             + self.packet
         )
 
             + self.packet
         )
 
-    def decode(self, buffer):
+    def decode(self, buffer: bytes) -> None:
         self.imei = buffer[:16].decode()
         self.when = unpack("!d", buffer[16:24])[0]
         self.packet = buffer[24:]
         self.imei = buffer[:16].decode()
         self.when = unpack("!d", buffer[16:24])[0]
         self.packet = buffer[24:]