]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
WIP converting wsgateway to multiprotocols
[loctrkd.git] / loctrkd / zx303proto.py
index efb02d249f6c118185ce2a66b23675c9acf85efd..bd19e108c770221e3a0132dfc7c863e23181622f 100755 (executable)
@@ -34,10 +34,12 @@ from typing import (
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
+    "proto_handled",
     "parse_message",
     "probe_buffer",
-    "proto_by_name",
     "proto_name",
     "DecodeError",
     "Respond",
@@ -81,7 +83,7 @@ __all__ = (
     "UNKNOWN_B3",
 )
 
-PROTO_PREFIX = "ZX"
+PROTO_PREFIX = "ZX:"
 
 ### Deframer ###
 
@@ -92,10 +94,6 @@ class Stream:
     def __init__(self) -> None:
         self.buffer = b""
 
-    @staticmethod
-    def enframe(buffer: bytes) -> bytes:
-        return b"xx" + buffer + b"\r\n"
-
     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
         """
         Process next segment of the stream. Return successfully deframed
@@ -150,6 +148,10 @@ class Stream:
         return ret
 
 
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+    return b"xx" + buffer + b"\r\n"
+
+
 ### Parser/Constructor ###
 
 
@@ -863,32 +865,30 @@ if True:  # just to indent the code, sorry!
 
 def class_by_prefix(
     prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+    if prefix.startswith(PROTO_PREFIX):
+        pname = prefix[len(PROTO_PREFIX) :]
+    else:
+        raise KeyError(pname)
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
-        return lst
+        return [name for name, _ in lst]
     _, proto = lst[0]
     return CLASSES[proto]
 
 
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
-    return (
-        PROTO_PREFIX
-        + ":"
-        + (
-            obj.__class__.__name__
-            if isinstance(obj, GPS303Pkt)
-            else obj.__name__
-        )
-    ).ljust(16, "\0")[:16]
+def proto_handled(proto: str) -> bool:
+    return proto.startswith(PROTO_PREFIX)
 
 
-def proto_by_name(name: str) -> int:
-    return PROTOS.get(name, -1)
+def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+    return PROTO_PREFIX + (
+        obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
+    )
 
 
 def proto_of_message(packet: bytes) -> str:
@@ -948,3 +948,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (proto_name(GPS_POSITIONING), True),
+        (proto_name(WIFI_POSITIONING), False),
+        (proto_name(STATUS), True),
+    ]