144 lines
3.6 KiB
Python
144 lines
3.6 KiB
Python
import typing as t
|
|
import abc
|
|
import asyncio
|
|
import enum
|
|
|
|
# Module-level parameter-specification variable named "P".
# NOTE(review): nothing in the visible part of this module references it —
# confirm whether other code imports it before removing.
P = t.ParamSpec("P")
|
|
|
|
|
|
class Omit(enum.IntEnum):
    """Integer enumeration with exactly one member, ``OMIT``.

    The member's value is assigned by ``enum.auto()``.
    """

    OMIT = enum.auto()
|
|
|
|
|
|
# Module-level alias for the sole member of ``Omit`` so it can be referenced
# as ``OMIT`` directly.
# NOTE(review): presumably a sentinel for "value omitted" — it is not used in
# the visible part of this module; confirm against callers.
OMIT = Omit.OMIT
|
|
|
|
|
|
class Endpoint[C, I, O](t.Protocol):
|
|
def __call__(self, ctx: C, args: I, /) -> t.Awaitable[O]: ...
|
|
|
|
|
|
class Filter[C, I](Endpoint[C, I, bool], t.Protocol): ...
|
|
|
|
|
|
# C I R N O
|
|
class Handler[C, I, N, O](t.Protocol):
|
|
def __call__(self, ctx: C, args: I, next: N, /) -> t.Awaitable[O]: ...
|
|
|
|
|
|
@t.runtime_checkable
class Traversable(t.Protocol):
    """Structural type for objects that expose a ``__leafs__`` attribute.

    The protocol is runtime-checkable, so ``isinstance(obj, Traversable)``
    can be used to test for the attribute's presence.
    """

    # Names of the attributes on the object that traversal helpers read
    # via ``getattr``.
    __leafs__: tuple[str, ...]
|
|
|
|
|
|
def leafs_of(t: Traversable) -> tuple[str, ...]:
    """Return the ``__leafs__`` tuple stored on ``t``."""
    # NOTE: inside this function the parameter ``t`` shadows the module-level
    # ``typing as t`` alias; the name is kept because it is part of the
    # function's signature.
    leaf_names = t.__leafs__
    return leaf_names
|
|
|
|
|
|
def traverse[A](what: Traversable, acc: A, f: t.Callable[[A, t.Any], A]) -> A:
|
|
"""Traverse entire chain in depth."""
|
|
while True:
|
|
leafs = [(what, leafs_of(what))]
|
|
new_leafs = []
|
|
for what, leaf in leafs:
|
|
attr = getattr(what, leaf) # type: ignore
|
|
acc = f(acc, attr)
|
|
|
|
if isinstance(attr, Traversable):
|
|
children = leafs_of(attr)
|
|
if children:
|
|
new_leafs.append((attr, children))
|
|
|
|
if new_leafs:
|
|
leafs = new_leafs
|
|
else:
|
|
break
|
|
|
|
return acc
|
|
|
|
|
|
def traverse_shallow[A](what: Traversable, acc: A, f: t.Callable[[A, t.Any], A]) -> A:
|
|
"""Traverse only the first level of chain."""
|
|
for leaf in leafs_of(what):
|
|
attr = getattr(what, leaf)
|
|
acc = f(acc, attr)
|
|
|
|
return acc
|
|
|
|
|
|
class Shard(Traversable, t.Protocol):
|
|
def apply[C, I, N, O](
|
|
self: Handler[C, I, N, O], rhs: N, /
|
|
) -> "EComposable[C, I, O]":
|
|
"""Apply continuation to the handler."""
|
|
return Apply[C, I, N, O](self, rhs)
|
|
|
|
def checked[C, I, O](
|
|
self: Endpoint[C, I, O],
|
|
filter: Filter[C, I],
|
|
err: Endpoint[C, I, O],
|
|
) -> "EComposable[C, I, O]":
|
|
return EChecked[C, I, O](filter, self, err)
|
|
|
|
def then[C, I, N, O](
|
|
self, rhs: Handler[C, I, N, O], /
|
|
) -> "Then[C, I, N, O, t.Self]":
|
|
"""Connect two handlers, thus lhs will run after rhs."""
|
|
return Then[C, I, N, O, t.Self](self, rhs)
|
|
|
|
|
|
class EComposable[C, I, O](Shard, Endpoint[C, I, O], t.Protocol):
|
|
"""Composable endpoint."""
|
|
|
|
|
|
class HComposable[C, I, N, O](Shard, Handler[C, I, N, O], t.Protocol):
|
|
"""Composable handler."""
|
|
|
|
|
|
class EChecked[C, I, O](Shard):
|
|
__slots__ = ("filter", "ok", "err")
|
|
__leafs__ = ("filter", "ok", "err")
|
|
|
|
def __init__(
|
|
self,
|
|
filter: Filter[C, I],
|
|
ok: Endpoint[C, I, O],
|
|
err: Endpoint[C, I, O],
|
|
) -> None:
|
|
self.filter = filter
|
|
self.ok = ok
|
|
self.err = err
|
|
|
|
async def __call__(self, ctx: C, arg: I) -> O:
|
|
if await self.filter(ctx, arg):
|
|
return await self.ok(ctx, arg)
|
|
return await self.err(ctx, arg)
|
|
|
|
|
|
class Then[C, I, N, O, L](Shard):
|
|
__slots__ = ("lhs", "rhs")
|
|
__leafs__ = ("lhs", "rhs")
|
|
|
|
def __init__(self, lhs: L, rhs: Handler[C, I, N, O]) -> None:
|
|
self.lhs = lhs
|
|
self.rhs = rhs
|
|
|
|
def __call__[Cl, In, On](
|
|
self: "Then[C, I, N, O, Handler[Cl, In, Endpoint[C, I, O], On]]",
|
|
ctx: Cl,
|
|
args: In,
|
|
next: N,
|
|
) -> t.Awaitable[On]:
|
|
endpoint: Endpoint[C, I, O] = Apply(self.rhs, next)
|
|
return self.lhs(ctx, args, Apply(self.rhs, next))
|
|
|
|
|
|
class Apply[C, I, N, O](Shard):
|
|
__slots__ = ("handler", "endpoint")
|
|
__leafs__ = ("handler", "endpoint")
|
|
|
|
def __init__(self, handler: Handler[C, I, N, O], endpoint: N) -> None:
|
|
self.handler = handler
|
|
self.endpoint = endpoint
|
|
|
|
def __call__(self, ctx: C, args: I) -> t.Awaitable[O]:
|
|
return self.handler(ctx, args, self.endpoint)
|