refactor!: allow native datatypes to be strings

This commit is contained in:
Daylin Morgan 2024-10-24 15:44:58 -05:00
parent d7fa879eb9
commit 83f94cd4c2
Signed by: daylin
GPG key ID: 950D13E9719334AD

View file

@ -12,7 +12,6 @@ from argparse import (
RawDescriptionHelpFormatter, RawDescriptionHelpFormatter,
) )
from functools import wraps from functools import wraps
from inspect import Parameter
from pathlib import Path from pathlib import Path
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -81,6 +80,16 @@ class Task:
def _mark(self) -> None: def _mark(self) -> None:
self.show = True self.show = True
def _task_repr(self) -> str:
return (
"\n".join(
f" {line}"
for line in inspect.getsource(self.func).splitlines()
if not line.startswith("@")
)
+ "\n"
)
class Graph: class Graph:
def __init__(self) -> None: def __init__(self) -> None:
@ -105,8 +114,6 @@ class Context:
self._ids: Dict[str, str] = {} self._ids: Dict[str, str] = {}
self.targets: Dict[str, Any] = {} self.targets: Dict[str, Any] = {}
self.data: Any = None self.data: Any = None
self.flags: Dict[str, Any] = {}
self._flag_defs: List[Tuple[Tuple[str, ...], Any]] = []
self.show_targets = True self.show_targets = True
self._graph = Graph() self._graph = Graph()
self.rest = [] # remaining positional args self.rest = [] # remaining positional args
@ -154,13 +161,35 @@ class Context:
assert name in self._ids assert name in self._ids
return self._tasks[self._ids[name]] return self._tasks[self._ids[name]]
def add_flag(self, *args: str, **kwargs: Any) -> None:
class SwyddFlags:
def __init__(self) -> None:
self._flags: Dict[str, Any] = {}
self._flag_defs: List[Tuple[Tuple[str, ...], Any]] = []
def __getattr__(self, name: str):
if name in self._flags:
return self._flags[name]
else:
# TODO: better error/sys.exit
raise AttributeError
def add(self, *args: str, **kwargs: Any) -> None:
name = max(args, key=len).split("-")[-1] name = max(args, key=len).split("-")[-1]
self.flags[name] = None self._flags[name] = None
self._flag_defs.append((args, kwargs)) self._flag_defs.append((args, kwargs))
def _parse_args(self, args):
for name in self._flags:
self._flags[name] = args.pop(name)
def _add_arguments(self, shared):
for flag_args, flag_kwargs in self._flag_defs:
shared.add_argument(*flag_args, **flag_kwargs)
ctx = Context() ctx = Context()
flags = SwyddFlags()
class SwyddSubResult: class SwyddSubResult:
@ -230,7 +259,7 @@ class SwyddProc:
if self.output: if self.output:
sub_kwargs["text"] = True # assume text is the desired output sub_kwargs["text"] = True # assume text is the desired output
sub_kwargs["capture_output"] = True sub_kwargs["stdout"] = PIPE
sub_kwargs.update(**self.cmd_kwargs) sub_kwargs.update(**self.cmd_kwargs)
return sub_kwargs return sub_kwargs
@ -512,11 +541,17 @@ def _inspect_wrapper(place, func):
def task( def task(
arg=None, arg: Callable[..., Any] | None = None,
): ):
def wrapper(func: Callable[..., Any]) -> Callable[..., Callable[..., None]]: def wrapper(
func: Callable[..., Any] | None = None,
) -> Callable[..., Callable[..., None]]:
# added this to prevent lsp errors
# about missing argument on wrapped functions
if func is None:
raise ValueError
ctx._add_task(func, show=not func.__name__.startswith("_")) ctx._add_task(func, show=not func.__name__.startswith("_"))
# _inspect_wrapper("task", func)
@wraps(func) @wraps(func)
def inner(*args: Any, **kwargs: Any) -> Callable[..., None]: def inner(*args: Any, **kwargs: Any) -> Callable[..., None]:
@ -581,12 +616,6 @@ def option(
return wrapper return wrapper
def manage(version: bool = False) -> None:
"""internal cli"""
if version:
print("current version", __version__)
def noop(*args, **kwargs) -> Any: def noop(*args, **kwargs) -> Any:
_ = args, kwargs _ = args, kwargs
@ -617,7 +646,27 @@ def _target_generator(
return wrapper return wrapper
# TODO: reduce how the load bearing on this function def _internal_cli():
name = "+swydd"
parser = ArgumentParser(
usage=f"%(prog)s {name} [opts]",
description="""internal cli""",
prog=os.path.basename(sys.argv[0]),
)
parser.add_argument("-V", "--version", help="show version", action="store_true")
parser.add_argument("--which", help="show path to swydd", action="store_true")
args = parser.parse_args(sys.argv[2:])
if args.version:
print("swydd, v" + __version__)
if args.which:
print("current swydd:", __file__)
sys.exit(0)
# TODO: reduce the load bearing on this function
# seperate by subparser for tasks vs subparser for target # seperate by subparser for tasks vs subparser for target
def _generate_task_subparser( def _generate_task_subparser(
shared: ArgumentParser, shared: ArgumentParser,
@ -642,21 +691,28 @@ def _generate_task_subparser(
usage=f"%(prog)s {name} [opts]", usage=f"%(prog)s {name} [opts]",
prog=prog, prog=prog,
) )
for name, info in task.params.items(): for name, info in task.params.items():
param = info.get("Parameter") # must check signature for args? param = info.get("Parameter") # must check signature for args?
annotation = param.annotation
args = [] args = []
if "short" in info: if "short" in info:
args.append("-" + info["short"]) args.append("-" + info["short"])
args.append(f"--{name.replace('_','-')}") args.append(f"--{name.replace('_','-')}")
kwargs = {"help": info.get("help", "")} kwargs = {"help": info.get("help", "")}
if param.annotation is bool: str_lit_types = {"bool": bool, "int": int, "float": float, "Path": Path}
annotation = str_lit_types.get(annotation, annotation)
if annotation is bool:
kwargs.update({"default": False, "action": "store_true"}) kwargs.update({"default": False, "action": "store_true"})
elif param.annotation != Parameter.empty: elif (annotation != inspect._empty) and (type(annotations) is not str):
kwargs.update({"type": param.annotation}) kwargs.update({"type": annotation})
kwargs.update( kwargs.update(
{"required": True} {"required": True}
if param.default == Parameter.empty if param.default == inspect._empty
else {"default": param.default} else {"default": param.default}
) )
@ -692,7 +748,7 @@ def _target_status(target: str) -> str:
for need in needs: for need in needs:
if not (p := Path(need)).is_file(): if not (p := Path(need)).is_file():
return "missing inputs!" return "missing inputs!"
needs_stats.append(p) needs_stats.append(p.stat())
if any((stat.st_mtime > target_stat.st_mtime for stat in needs_stats)): if any((stat.st_mtime > target_stat.st_mtime for stat in needs_stats)):
return "out of date" return "out of date"
@ -714,27 +770,17 @@ def _add_targets(
) )
def _task_repr(func: Callable) -> str:
return (
"\n".join(
f" {line}"
for line in inspect.getsource(func).splitlines()
if not line.startswith("@")
)
+ "\n"
)
def cli(default: str | None = None) -> None: def cli(default: str | None = None) -> None:
ctx._generate_graph() ctx._generate_graph()
if len(sys.argv) > 1 and sys.argv[1] == "+swydd":
_internal_cli()
parser = ArgumentParser( parser = ArgumentParser(
formatter_class=SubcommandHelpFormatter, usage="%(prog)s <task/target> [opts]" formatter_class=SubcommandHelpFormatter, usage="%(prog)s <task/target> [opts]"
) )
shared = ArgumentParser(add_help=False) shared = ArgumentParser(add_help=False)
for flag_args, flag_kwargs in ctx._flag_defs: flags._add_arguments(shared)
shared.add_argument(*flag_args, **flag_kwargs)
shared.add_argument( shared.add_argument(
"-v", "--verbose", help="use verbose output", action="store_true" "-v", "--verbose", help="use verbose output", action="store_true"
@ -747,11 +793,6 @@ def cli(default: str | None = None) -> None:
title="tasks", required=True, dest="pos-arg", metavar="<task/target>" title="tasks", required=True, dest="pos-arg", metavar="<task/target>"
) )
if len(sys.argv) > 1 and sys.argv[1] == "+swydd":
_generate_task_subparser(
shared, subparsers, Task(manage, name="+swydd", show=True)
)
_add_targets(shared, subparsers, ctx) _add_targets(shared, subparsers, ctx)
for task in ctx._tasks.values(): for task in ctx._tasks.values():
@ -776,15 +817,14 @@ def cli(default: str | None = None) -> None:
ctx.dry = args.pop("dry_run", False) ctx.dry = args.pop("dry_run", False)
ctx.dag = args.pop("dag", False) ctx.dag = args.pop("dag", False)
ctx.force = args.pop("force", False) ctx.force = args.pop("force", False)
for name in ctx.flags: flags._parse_args(args)
ctx.flags[name] = args.pop(name)
if f := args.pop("func", None): if f := args.pop("func", None):
if ctx.dry: if ctx.dry:
sys.stderr.write("dry run >>>\n" f" args: {args}\n") sys.stderr.write("dry run >>>\n" f" args: {args}\n")
if ctx._env: if ctx._env:
sys.stderr.write(f" env: {ctx._env}\n") sys.stderr.write(f" env: {ctx._env}\n")
sys.stderr.write(_task_repr(ctx._get_task(pos_arg).func)) sys.stderr.write(ctx._get_task(pos_arg)._task_repr())
elif ctx.dag: elif ctx.dag:
sys.stderr.write( sys.stderr.write(
"currently --dag is a noop\n" "currently --dag is a noop\n"
@ -833,6 +873,7 @@ __all__ = [
"setenv", "setenv",
"cli", "cli",
"task", "task",
"flags",
] ]
if __name__ == "__main__": if __name__ == "__main__":