]> www.average.org Git - loctrkd.git/blobdiff - loctrkd/zx303proto.py
WIP converting wsgateway to multiprotocols
[loctrkd.git] / loctrkd / zx303proto.py
index ebead92e0e28c88f64517547ed8ac1828fa5d301..bd19e108c770221e3a0132dfc7c863e23181622f 100755 (executable)
@@ -34,10 +34,12 @@ from typing import (
 __all__ = (
     "Stream",
     "class_by_prefix",
+    "enframe",
+    "exposed_protos",
     "inline_response",
+    "proto_handled",
     "parse_message",
     "probe_buffer",
-    "proto_by_name",
     "proto_name",
     "DecodeError",
     "Respond",
@@ -92,10 +94,6 @@ class Stream:
     def __init__(self) -> None:
         self.buffer = b""
 
-    @staticmethod
-    def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
-        return b"xx" + buffer + b"\r\n"
-
     def recv(self, segment: bytes) -> List[Union[bytes, str]]:
         """
         Process next segment of the stream. Return successfully deframed
@@ -150,6 +148,10 @@ class Stream:
         return ret
 
 
+def enframe(buffer: bytes, imei: Optional[str] = None) -> bytes:
+    return b"xx" + buffer + b"\r\n"
+
+
 ### Parser/Constructor ###
 
 
@@ -863,28 +865,32 @@ if True:  # just to indent the code, sorry!
 
 def class_by_prefix(
     prefix: str,
-) -> Union[Type[GPS303Pkt], List[Tuple[str, int]]]:
+) -> Union[Type[GPS303Pkt], List[str]]:
+    if prefix.startswith(PROTO_PREFIX):
+        pname = prefix[len(PROTO_PREFIX) :]
+    else:
+        raise KeyError(pname)
     lst = [
         (name, proto)
         for name, proto in PROTOS.items()
         if name.upper().startswith(prefix.upper())
     ]
     if len(lst) != 1:
-        return lst
+        return [name for name, _ in lst]
     _, proto = lst[0]
     return CLASSES[proto]
 
 
+def proto_handled(proto: str) -> bool:
+    return proto.startswith(PROTO_PREFIX)
+
+
 def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
     return PROTO_PREFIX + (
         obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
     )
 
 
-def proto_by_name(name: str) -> int:
-    return PROTOS.get(name, -1)
-
-
 def proto_of_message(packet: bytes) -> str:
     return proto_name(CLASSES.get(packet[1], UNKNOWN))
 
@@ -942,3 +948,11 @@ def parse_message(packet: bytes, is_incoming: bool = True) -> GPS303Pkt:
     retobj.PROTO = proto  # Override class attr with object attr
     retobj.cause = cause
     return retobj
+
+
+def exposed_protos() -> List[Tuple[str, bool]]:
+    return [
+        (proto_name(GPS_POSITIONING), True),
+        (proto_name(WIFI_POSITIONING), False),
+        (proto_name(STATUS), True),
+    ]