import typing as t import abc import asyncio import enum P = t.ParamSpec("P") class Omit(enum.IntEnum): OMIT = enum.auto() OMIT = Omit.OMIT class Endpoint[C, I, O](t.Protocol): def __call__(self, ctx: C, args: I, /) -> t.Awaitable[O]: ... class Filter[C, I](Endpoint[C, I, bool], t.Protocol): ... # C I R N O class Handler[C, I, N, O](t.Protocol): def __call__(self, ctx: C, args: I, next: N, /) -> t.Awaitable[O]: ... @t.runtime_checkable class Traversable(t.Protocol): __leafs__: tuple[str, ...] def leafs_of(t: Traversable) -> tuple[str, ...]: return t.__leafs__ def traverse[A](what: Traversable, acc: A, f: t.Callable[[A, t.Any], A]) -> A: """Traverse entire chain in depth.""" while True: leafs = [(what, leafs_of(what))] new_leafs = [] for what, leaf in leafs: attr = getattr(what, leaf) # type: ignore acc = f(acc, attr) if isinstance(attr, Traversable): children = leafs_of(attr) if children: new_leafs.append((attr, children)) if new_leafs: leafs = new_leafs else: break return acc def traverse_shallow[A](what: Traversable, acc: A, f: t.Callable[[A, t.Any], A]) -> A: """Traverse only the first level of chain.""" for leaf in leafs_of(what): attr = getattr(what, leaf) acc = f(acc, attr) return acc class Shard(Traversable, t.Protocol): def apply[C, I, N, O]( self: Handler[C, I, N, O], rhs: N, / ) -> "EComposable[C, I, O]": """Apply continuation to the handler.""" return Apply[C, I, N, O](self, rhs) def checked[C, I, O]( self: Endpoint[C, I, O], filter: Filter[C, I], err: Endpoint[C, I, O], ) -> "EComposable[C, I, O]": return EChecked[C, I, O](filter, self, err) def then[C, I, N, O]( self, rhs: Handler[C, I, N, O], / ) -> "Then[C, I, N, O, t.Self]": """Connect two handlers, thus lhs will run after rhs.""" return Then[C, I, N, O, t.Self](self, rhs) class EComposable[C, I, O](Shard, Endpoint[C, I, O], t.Protocol): """Composable endpoint.""" class HComposable[C, I, N, O](Shard, Handler[C, I, N, O], t.Protocol): """Composable handler.""" class EChecked[C, I, O](Shard): __slots__ = ("filter", "ok", "err") __leafs__ = ("filter", "ok", "err") def __init__( self, filter: Filter[C, I], ok: Endpoint[C, I, O], err: Endpoint[C, I, O], ) -> None: self.filter = filter self.ok = ok self.err = err async def __call__(self, ctx: C, arg: I) -> O: if await self.filter(ctx, arg): return await self.ok(ctx, arg) return await self.err(ctx, arg) class Then[C, I, N, O, L](Shard): __slots__ = ("lhs", "rhs") __leafs__ = ("lhs", "rhs") def __init__(self, lhs: L, rhs: Handler[C, I, N, O]) -> None: self.lhs = lhs self.rhs = rhs def __call__[Cl, In, On]( self: "Then[C, I, N, O, Handler[Cl, In, Endpoint[C, I, O], On]]", ctx: Cl, args: In, next: N, ) -> t.Awaitable[On]: endpoint: Endpoint[C, I, O] = Apply(self.rhs, next) return self.lhs(ctx, args, Apply(self.rhs, next)) class Apply[C, I, N, O](Shard): __slots__ = ("handler", "endpoint") __leafs__ = ("handler", "endpoint") def __init__(self, handler: Handler[C, I, N, O], endpoint: N) -> None: self.handler = handler self.endpoint = endpoint def __call__(self, ctx: C, args: I) -> t.Awaitable[O]: return self.handler(ctx, args, self.endpoint)