from __future__ import annotations
import inspect
import textwrap
from collections.abc import Callable
from types import EllipsisType
from typing import NamedTuple, TypeVar, Any, Type, ParamSpec
from typing_extensions import Self
# Names exported by a star-import of this module.
__all__ = [
'DispatchCommand',
'DispatchCommandNotFound',
'dispatch_group',
'Dispatch',
]
# Generic type placeholders; they are not referenced in the part of the module visible here.
T = TypeVar('T')
P = ParamSpec('P')
R = TypeVar('R')
# Attribute name under which a DispatchCommand record is looked up on class members
# (read by Dispatch.list_commands and Dispatch.find_command).
ARGCLZ_DISPATCH_COMMAND = '__argclz_dispatch_command__'
[docs]
class DispatchCommand(NamedTuple):
    """
    The information of :func:`~argclz.dispatch.annotations.dispatch` function.

    Use :func:`~argclz.dispatch.annotations.dispatch` instead.
    Do not create this class directly.
    """

    group: str | None
    """dispatch group"""

    command: str
    """primary command"""

    aliases: tuple[str, ...]
    """secondary command"""

    order: float
    """order of this command shown in the help document"""

    usage: str | None
    """usage line of this command """

    func: Callable[..., Any]
    """target function"""

    validators: dict[str, Callable[[str], Any]]
    """parameter validators"""

    hidden: bool = False
    """Is it hidden?"""

    @property
    def commands(self) -> list[str]:
        """all acceptable commands (primary command first, then the aliases)"""
        return [self.command, *self.aliases]

    def parameters(self) -> list[CommandParameter]:
        """information of command's parameters.

        The first parameter of the target function (the instance) is skipped.
        """
        declared = list(inspect.signature(self.func).parameters.items())
        return [CommandParameter.of(name, para) for name, para in declared[1:]]

    @property
    def doc(self) -> str | None:
        """document of the command (the target function's ``__doc__``)."""
        return self.func.__doc__

    def __call__(self, zelf: Any, *args: Any, **kwargs: Any) -> Any:
        """
        invoke commands.

        **parameter pre-processing**

        If any argument in *args* is a str that matches the ``name=value`` pattern,
        it is parsed into a keyword argument. It allows using ``key=value``
        on the commandline without knowing the position of the parameter.

        A str is only split when *name* is a valid identifier, or when the target
        function declares a ``**kwargs`` parameter (which can receive any key).
        Other strings that merely contain ``=`` (URLs, expressions, ...) are passed
        through unchanged as positional arguments.

        **parameter post-processing**

        If any bound argument has a validator (by :func:`~argclz.dispatch.annotations.validator_for`),
        the validator is called with the argument and its return value replaces it.
        A `ValueError` is raised when the validator fails.

        :param zelf: instance of the target function
        :param args: positional arguments of the target function
        :param kwargs: keyword arguments of the target function
        :return: target function's return
        :raise ValueError: a validator raised ValueError, TypeError, IndexError or KeyError
        :raise TypeError: the arguments cannot be bound to the target function
        """
        signature = inspect.signature(self.func)
        accepts_any_keyword = any(
            para.kind is inspect.Parameter.VAR_KEYWORD
            for para in signature.parameters.values()
        )

        positional = []
        keywords = dict(kwargs)
        for value in args:
            if isinstance(value, str):
                name, sep, text = value.partition('=')
                # A non-identifier name can only ever be consumed by **kwargs;
                # without one, keep the raw text as a positional value.
                if sep and (accepts_any_keyword or name.isidentifier()):
                    keywords[name] = text
                    continue
            positional.append(value)

        bound = signature.bind_partial(zelf, *positional, **keywords)

        for par, validator in self.validators.items():
            # Parameters left to their defaults are not bound and so not validated.
            if par not in bound.arguments:
                continue
            try:
                checked = validator(bound.arguments[par])
            except (ValueError, TypeError, IndexError, KeyError) as e:
                raise ValueError(f'command {self.command} argument "{par}" : {e}') from e
            bound.arguments[par] = checked

        return self.func(*bound.args, **bound.kwargs)
[docs]
class DispatchGroup(NamedTuple):
    """dispatch group.

    Calling an instance produces a :func:`~argclz.dispatch.annotations.dispatch`
    decorator tied to this group; reading it as an attribute of a
    :class:`Dispatch` class or instance yields a :class:`BoundDispatchGroup`.
    """

    group: str
    """group name"""

    def __call__(self, command: str,
                 *alias: str,
                 order: float = 5,
                 usage: str | None = None,
                 hidden=False):
        """
        A decorator that marks a function as a dispatch target function.

        All functions decorated in the same dispatch group should have the same
        function signature (at least for non-default parameters). For example:

        **Example**

        >>> class D(Dispatch):
        ...     command_group = dispatch_group('A')
        ...     @command_group('A')
        ...     def function_a(self, a, b, c=None):
        ...         pass

        :param command: primary command name
        :param alias: secondary command names
        :param order: order of this command shown in the :meth:`~argclz.dispatch.core.Dispatch.build_command_usages()`
        :param usage: usage line of this command shown in the :meth:`~argclz.dispatch.core.Dispatch.build_command_usages()`
        :param hidden: hide this command from :meth:`~argclz.dispatch.core.Dispatch.list_commands()`
        """
        # Imported lazily, inside the call, as in the original implementation.
        from .annotations import dispatch

        options = dict(group=self.group, order=order, usage=usage, hidden=hidden)
        return dispatch(command, *alias, **options)

    def __set_name__(self, owner, name):
        """Reject being assigned as an attribute of a class that is not a :class:`Dispatch`."""
        if issubclass(owner, Dispatch):
            return
        raise TypeError('owner not Dispatch')

    def __get__(self, instance: Dispatch | None, owner: Type[Dispatch]) -> BoundDispatchGroup:
        """Bind this group to *instance*, or to *owner* when accessed on the class."""
        target = owner if instance is None else instance
        return BoundDispatchGroup(target, self.group)
[docs]
class BoundDispatchGroup(NamedTuple):
    """A dispatch group bound to a :class:`Dispatch` class or instance.

    Every method forwards to the bound object with the group name filled in.
    """

    zelf: Dispatch | Type[Dispatch]
    # the Dispatch class, or an instance of it, this group is bound to
    group: str
    # group name

    def list_commands(self, *,
                      all: bool = False) -> list[DispatchCommand]:
        """list all :func:`~argclz.dispatch.annotations.dispatch` info in this group.

        :param all: including hidden commands
        :return: list of DispatchCommand
        """
        return self.zelf.list_commands(self.group, all=all)

    def find_command(self, command: str) -> DispatchCommand | None:
        """find :func:`~argclz.dispatch.annotations.dispatch` function according to *command* in this group.

        :param command: command or one of command's aliases
        :return: found DispatchCommand, or None
        """
        return self.zelf.find_command(command, group=self.group)

    def invoke_command(self, command: str, *args, **kwargs) -> Any:
        """invoke a :func:`~argclz.dispatch.annotations.dispatch` function in this group.

        :param command: command or one of command's aliases
        :param args: positional arguments of the target function
        :param kwargs: keyword arguments of the target function
        :return: function's return
        :raise TypeError: this group is bound to a class instead of an instance
        :raise DispatchCommandNotFound: no command in this group matches *command*
        """
        # Invoking needs an instance to pass to the target function.
        if isinstance(self.zelf, type):
            raise TypeError(
                f'cannot invoke command {command!r} of group {self.group!r} through the class '
                f'{self.zelf.__name__}; access the group from an instance instead'
            )

        if (info := self.find_command(command)) is None:
            raise DispatchCommandNotFound(command, self.group)

        return info(self.zelf, *args, **kwargs)
[docs]
def dispatch_group(group: str) -> DispatchGroup:
    """
    Create a dispatch group.

    **Example**

    >>> class D(Dispatch):
    ...     command_group = dispatch_group('A')
    ...     @command_group('A')
    ...     def function_a(self, a, b, c=None):
    ...         pass

    dispatch_group can be assigned inside a :class:`Dispatch` (like the example above) or
    at the global level.

    :param group: group name.
    :return: the new :class:`DispatchGroup`
    :raise TypeError: If it is assigned in a non- :class:`Dispatch` class.
    """
    return DispatchGroup(group=group)
[docs]
class CommandParameter(NamedTuple):
    """Summary of one parameter of a command's target function."""

    name: str
    # parameter name as declared on the function
    optional: bool
    # True when the parameter declares a default value
    kind: inspect._ParameterKind
    # the parameter kind reported by inspect

    @classmethod
    def of(cls, name: str, para: inspect.Parameter) -> Self:
        """Build the summary from an :class:`inspect.Parameter`.

        :param name: parameter name
        :param para: the inspected parameter
        :return: a new CommandParameter
        """
        has_default = para.default is not inspect.Parameter.empty
        return cls(name, has_default, para.kind)

    def usage(self):
        """Render the parameter for a usage line.

        The name is upper-cased. ``**NAME`` and ``*NAME`` are returned for the
        variadic kinds, keyword-only parameters get a trailing ``=``, and
        optional (non-variadic) parameters are wrapped in square brackets.
        """
        label = self.name.upper()

        # Variadic parameters are never wrapped in brackets.
        if self.kind == inspect.Parameter.VAR_KEYWORD:
            return f'**{label}'
        if self.kind == inspect.Parameter.VAR_POSITIONAL:
            return f'*{label}'

        if self.kind == inspect.Parameter.KEYWORD_ONLY:
            label = f'{label}='

        return f'[{label}]' if self.optional else label
[docs]
class DispatchCommandNotFound(RuntimeError):
    """Error raised when no dispatch command matches the requested name."""

    def __init__(self, command: str, group: str | None = None):
        """
        :param command: the command name that was looked up
        :param group: the group searched; omitted from the message when None
        """
        target = command if group is None else f'{group}:{command}'
        super().__init__(f'command {target} not found')
class CommandHelps(NamedTuple):
    """Help-text material collected from one :class:`DispatchCommand`."""

    commands: list[str]
    # primary command followed by its aliases
    order: float
    # sort key used when listing commands
    usage: str | None
    # explicit usage line; when set it replaces the generated one
    params: list[CommandParameter]
    # parameters of the target function
    doc: str
    # raw docstring of the target function ('' when it has none)

    @classmethod
    def of(cls, command: DispatchCommand) -> Self:
        """Collect the help material of *command*.

        :param command: the dispatch command to describe
        :return: a new CommandHelps
        """
        return cls(command.commands, command.order, command.usage, command.parameters(), command.doc or '')

    def build_command_usage(self, show_para: bool = False) -> str:
        """Build the usage header of the command.

        :param show_para: append the usage text of each parameter
        :return: the explicit usage line when one is set; otherwise
            ``command`` or ``command (alias, ...)``, followed by the parameters
            when *show_para* is true
        :raise RuntimeError: the command list is empty
        """
        if self.usage is not None:
            return self.usage

        if not self.commands:
            raise RuntimeError('command has no name')

        primary, *aliases = self.commands
        header = primary + ' (' + ', '.join(aliases) + ')' if aliases else primary

        if show_para:
            # Joining through one list avoids a trailing space when there are no parameters.
            header = ' '.join([header, *(it.usage() for it in self.params)])
        return header

    def brief_doc(self) -> str:
        """Extract the first sentence of the docstring.

        Leading blank lines are skipped and lines are joined with single spaces.
        Collection stops at the first ``'. '`` inside a line, at a line ending
        with ``'.'``, or at the first blank line after some text.

        :return: the brief description, or '' when there is no text
        """
        picked = []
        for line in textwrap.dedent(self.doc).split('\n'):
            line = line.strip()
            if line == '':
                if picked:
                    break  # end of the first paragraph
                continue  # still skipping leading blank lines

            # Look for a sentence end inside the line first, so a line holding
            # several sentences contributes only the first one.
            end = line.find('. ')
            if end >= 0:
                picked.append(line[:end + 1])
                break

            picked.append(line)
            if line.endswith('.'):
                break

        return ' '.join(picked)
[docs]
class Dispatch:
    """
    A :func:`~argclz.dispatch.annotations.dispatch` functions container that
    is able to find and run the target function by its corresponding name (``command`` here).

    **Example**

    >>> from argclz.dispatch import Dispatch, dispatch
    ... class Main(Dispatch):
    ...     @dispatch('A')
    ...     def run_a(self): ...
    ... Main().invoke_command('A')
    """

    @classmethod
    def list_commands(cls, group: str | DispatchGroup | BoundDispatchGroup | None | EllipsisType = ..., *,
                      all: bool = False) -> list[DispatchCommand]:
        """list all :func:`~argclz.dispatch.annotations.dispatch` functions.

        :param group: dispatch group. ``...`` (the default) matches every group.
        :param all: including hidden commands
        :return: list of DispatchCommand
        """
        if isinstance(group, (DispatchGroup, BoundDispatchGroup)):
            group = group.group

        any_group = group is ...
        found = []
        for attr_name in dir(cls):
            member = getattr(cls, attr_name)
            info: DispatchCommand | None = getattr(member, ARGCLZ_DISPATCH_COMMAND, None)
            if info is None:
                continue
            if not (any_group or group == info.group):
                continue
            if not all and info.hidden:
                continue
            found.append(info)
        return found

    @classmethod
    def find_command(cls, command: str,
                     group: str | DispatchGroup | BoundDispatchGroup | None | EllipsisType = ...) -> DispatchCommand | None:
        """find :func:`~argclz.dispatch.annotations.dispatch` function according to *command*.

        :param command: command or one of command's aliases
        :param group: dispatch group. ``...`` (the default) matches every group.
        :return: found :class:`DispatchCommand`, or None when nothing matches
        """
        if isinstance(group, (DispatchGroup, BoundDispatchGroup)):
            group = group.group

        any_group = group is ...
        for attr_name in dir(cls):
            member = getattr(cls, attr_name)
            info: DispatchCommand | None = getattr(member, ARGCLZ_DISPATCH_COMMAND, None)
            if info is None:
                continue
            if not (any_group or group == info.group):
                continue
            if command == info.command or command in info.aliases:
                return info
        return None

    def invoke_command(self, command: str, *args, **kwargs) -> Any:
        """invoke a :func:`~argclz.dispatch.annotations.dispatch` function in the default group.

        :param command: command or one of command's aliases
        :param args: positional arguments of the target function
        :param kwargs: keyword arguments of the target function
        :return: target function's return
        :raise DispatchCommandNotFound:
        """
        # The default group is the one whose group name is None.
        info = self.find_command(command, None)
        if info is None:
            raise DispatchCommandNotFound(command)
        return info(self, *args, **kwargs)

    def invoke_group_command(self, group: str | DispatchGroup | BoundDispatchGroup, command: str, *args,
                             **kwargs) -> Any:
        """invoke a :func:`~argclz.dispatch.annotations.dispatch` function in a certain group.

        :param group: dispatch group
        :param command: command or one of command's aliases
        :param args: positional arguments of the target function
        :param kwargs: keyword arguments of the target function
        :return: target function's return
        :raise DispatchCommandNotFound:
        """
        info = self.find_command(command, group)
        if info is not None:
            return info(self, *args, **kwargs)

        # Report the plain group name, whichever form the caller passed.
        if isinstance(group, (DispatchGroup, BoundDispatchGroup)):
            group_name = group.group
        else:
            group_name = group
        raise DispatchCommandNotFound(command, group_name)

    @classmethod
    def build_command_usages(cls, group: str | None = None, *,
                             show_para: bool = False,
                             width: int = 120,
                             doc_indent: int = 20) -> str:
        """
        Build a help document for :func:`~argclz.dispatch.annotations.dispatch` functions
        in this class.

        :param group: for functions in the group.
        :param show_para: show parameters.
        :param width: text-wrap width.
        :param doc_indent: description indent.
        :return: help document.
        """
        margin = ' ' * doc_indent
        lines = []

        for command in sorted(cls.list_commands(group), key=lambda it: it.order):
            helps = CommandHelps.of(command)
            header = helps.build_command_usage(show_para=show_para)
            brief = helps.brief_doc()

            if len(header) < doc_indent:
                # Short header: pad it to the description column and share the first line.
                text = header.ljust(doc_indent) + brief
                first_indent = ''
            else:
                # Long header: it gets its own line, the description starts below it.
                lines.append(header)
                text = brief
                first_indent = margin

            lines.extend(textwrap.wrap(text, width,
                                       initial_indent=first_indent,
                                       subsequent_indent=margin,
                                       break_long_words=True,
                                       break_on_hyphens=True))

        return '\n'.join(lines)