Union,
)
+from .protomodule import ProtoClass
+
__all__ = (
"Stream",
"class_by_prefix",
return lx
-class MetaPkt(type):
- """
- For each class corresponding to a message, automatically create
- two nested classes `In` and `Out` that also inherit from their
- "nest". Class attribute `IN_KWARGS` defined in the "nest" is
- copied to the `In` nested class under the name `KWARGS`, and
- likewise, `OUT_KWARGS` of the nest class is copied as `KWARGS`
- to the nested class `Out`. In addition, method `encode` is
- defined in both classes equal to `in_encode()` and `out_encode()`
- respectively.
- """
-
- if TYPE_CHECKING:
-
- def __getattr__(self, name: str) -> Any:
- pass
-
- def __setattr__(self, name: str, value: Any) -> None:
- pass
-
- def __new__(
- cls: Type["MetaPkt"],
- name: str,
- bases: Tuple[type, ...],
- attrs: Dict[str, Any],
- ) -> "MetaPkt":
- newcls = super().__new__(cls, name, bases, attrs)
- newcls.In = super().__new__(
- cls,
- name + ".In",
- (newcls,) + bases,
- {
- "KWARGS": newcls.IN_KWARGS,
- "decode": newcls.in_decode,
- "encode": newcls.in_encode,
- },
- )
- newcls.Out = super().__new__(
- cls,
- name + ".Out",
- (newcls,) + bases,
- {
- "KWARGS": newcls.OUT_KWARGS,
- "decode": newcls.out_decode,
- "encode": newcls.out_encode,
- },
- )
- return newcls
-
-
class Respond(Enum):
NON = 0 # Incoming, no response needed
INL = 1 # Birirectional, use `inline_response()`
EXT = 2 # Birirectional, use external responder
-class GPS303Pkt(metaclass=MetaPkt):
+class GPS303Pkt(ProtoClass):
RESPOND = Respond.NON # Do not send anything back by default
PROTO: int
IN_KWARGS: Tuple[Tuple[str, Callable[[Any], Any], Any], ...] = ()
return proto.startswith(PROTO_PREFIX)
-def proto_name(obj: Union[MetaPkt, GPS303Pkt]) -> str:
+def proto_name(obj: Union[Type[GPS303Pkt], GPS303Pkt]) -> str:
return PROTO_PREFIX + (
obj.__class__.__name__ if isinstance(obj, GPS303Pkt) else obj.__name__
)