refactor!

This commit is contained in:
Daylin Morgan 2024-10-24 16:44:08 -05:00
parent 83f94cd4c2
commit 2594234e76
Signed by: daylin
GPG key ID: 950D13E9719334AD

View file

@ -162,7 +162,7 @@ class Context:
return self._tasks[self._ids[name]]
class SwyddFlags:
class Flags:
def __init__(self) -> None:
self._flags: Dict[str, Any] = {}
self._flag_defs: List[Tuple[Tuple[str, ...], Any]] = []
@ -189,10 +189,10 @@ class SwyddFlags:
ctx = Context()
flags = SwyddFlags()
flags = Flags()
class SwyddSubResult:
class SubResult:
def __init__(
self,
code: int,
@ -206,7 +206,7 @@ class SwyddSubResult:
self._proces = process
@classmethod
def from_completed_process(cls, p: CompletedProcess) -> "SwyddSubResult":
def from_completed_process(cls, p: CompletedProcess) -> "SubResult":
return cls(p.returncode, p.stdout, p.stderr, p)
@classmethod
@ -215,49 +215,60 @@ class SwyddSubResult:
p: Popen,
stdout: str = "",
stderr: str = "",
) -> "SwyddSubResult":
) -> "SubResult":
return cls(p.returncode, stdout, stderr, p)
class SwyddProc:
def __init__(
self, cmd: str | None = None, output: bool = False, **kwargs: Any
) -> None:
class Proc:
"""a class"""
def __init__(self, cmd: str, **kwargs: Any) -> None:
self._cmd = cmd
if cmd:
self.cmd = shlex.split(cmd)
self.output = output
self.cmd_kwargs = kwargs
@classmethod
def __call__(cls, *args, **kwargs) -> "SwyddProc":
return cls(*args, **kwargs)
def pipe(self, proc: str | Proc) -> ProcPipe:
"""ProcPipe the output into `proc`
Parameters
----------
proc
Command to feed stdout into
Returns
-------
ProcPipe
"""
def pipe(self, proc: "str | SwyddProc") -> "SwyddPipe | SwyddProc":
if isinstance(proc, str):
if self._cmd is None:
return SwyddPipe(SwyddProc(proc))
else:
return SwyddPipe(self, proc)
elif isinstance(proc, SwyddProc):
return SwyddPipe(proc)
return ProcPipe(self, proc)
elif isinstance(proc, Proc):
return ProcPipe(proc)
def then(self, proc: "str | SwyddProc | SwyddSeq") -> "SwyddSeq":
if self._cmd:
return SwyddSeq(self, proc)
def then(self, proc: str | Proc | ProcSeq) -> ProcSeq:
"""If successful execute next `proc`
if isinstance(proc, SwyddProc):
return SwyddSeq(proc)
Parameters
----------
proc
Command to execute after
Returns
-------
ProcSeq
"""
if isinstance(proc, Proc):
return ProcSeq(proc)
# should swydd seq even be supported here?
elif isinstance(proc, SwyddSeq):
elif isinstance(proc, ProcSeq):
return proc
else:
return SwyddSeq(SwyddProc(proc))
return ProcSeq(Proc(proc))
def _build_kwargs(self) -> Dict[str, Any]:
def _build_kwargs(self, output: bool) -> Dict[str, Any]:
sub_kwargs: Dict[str, Any] = dict(env={**os.environ, **ctx._env})
if self.output:
if output:
sub_kwargs["text"] = True # assume text is the desired output
sub_kwargs["stdout"] = PIPE
@ -268,11 +279,10 @@ class SwyddProc:
if ctx.verbose:
sys.stdout.write(f"swydd exec | {self._cmd}\n")
def execute(self, output: bool = False) -> SwyddSubResult:
self.output = self.output or output
def execute(self, output: bool = False) -> SubResult:
self._show_command()
p = Popen(self.cmd, **self._build_kwargs())
p = Popen(self.cmd, **self._build_kwargs(output))
try:
out, err = p.communicate()
@ -283,27 +293,27 @@ class SwyddProc:
p.wait()
out, err = p.communicate()
return SwyddSubResult.from_popen(p, out, err)
return SubResult.from_popen(p, out, err)
def check(self) -> bool:
return self.execute().code == 0
class SwyddPipe:
def __init__(self, *procs: "str | SwyddProc | SwyddPipe") -> None:
class ProcPipe:
"""
prefer: :func:`Proc.pipe`
"""
def __init__(self, *procs: str | Proc | ProcPipe) -> None:
self._procs = []
for proc in procs:
if isinstance(proc, str):
self._procs.append(SwyddProc(proc))
elif isinstance(proc, SwyddProc):
self._procs.append(Proc(proc))
elif isinstance(proc, Proc):
self._procs.append(proc)
elif isinstance(proc, SwyddPipe):
elif isinstance(proc, ProcPipe):
self._procs.extend(proc._procs)
@classmethod
def __call__(cls, *args, **kwargs) -> "SwyddPipe":
return cls(*args, **kwargs)
def check(self) -> bool:
return self.execute().code == 0
@ -312,7 +322,7 @@ class SwyddPipe:
cmd_str = " | ".join([p._cmd for p in self._procs])
sys.stdout.write(f"swydd exec | {cmd_str}\n")
def execute(self, output: bool = False) -> SwyddSubResult:
def execute(self, output: bool = False) -> SubResult:
procs = []
sub_kwargs: Dict[str, Any] = dict(env={**os.environ, **ctx._env})
self._show_command()
@ -343,36 +353,36 @@ class SwyddPipe:
procs[-1].wait()
out, err = procs[-1].communicate()
return SwyddSubResult.from_popen(procs[-1], out, err)
return SubResult.from_popen(procs[-1], out, err)
def pipe(self, proc: "str | SwyddProc | SwyddPipe") -> "SwyddPipe":
return SwyddPipe(self, proc)
def pipe(self, proc: "str | Proc | ProcPipe") -> "ProcPipe":
return ProcPipe(self, proc)
class SwyddSeq:
def __init__(self, *procs: "str | SwyddProc | SwyddSeq") -> None:
class ProcSeq:
"""
prefer: :func:`Proc.then`
"""
def __init__(self, *procs: "str | Proc | ProcSeq") -> None:
self._procs = []
for proc in procs:
if isinstance(proc, SwyddSeq):
if isinstance(proc, ProcSeq):
self._procs.extend(self._procs)
if isinstance(proc, SwyddProc):
if isinstance(proc, Proc):
self._procs.append(proc)
elif isinstance(proc, str):
self._procs.append(SwyddProc(proc))
self._procs.append(Proc(proc))
def _show_command(self) -> None:
if ctx.verbose:
cmd_str = " && ".join([p._cmd for p in self._procs])
sys.stderr.write(f"sywdd exec | {cmd_str}\n")
@classmethod
def __call__(cls, *args, **kwargs) -> "SwyddSeq":
return cls(*args, **kwargs)
def then(self, proc: "str | Proc | ProcSeq") -> "ProcSeq":
return ProcSeq(*self._procs, proc)
def then(self, proc: "str | SwyddProc | SwyddSeq") -> "SwyddSeq":
return SwyddSeq(*self._procs, proc)
def execute(self, output: bool = False) -> "SwyddSubResult":
def execute(self, output: bool = False) -> "SubResult":
self._show_command()
results = []
@ -396,30 +406,6 @@ class SwyddSeq:
return self.run() == 0
# TODO: best interface for "get"
class SwyddGet:
def __call__(
self, proc: str | SwyddProc | SwyddPipe | SwyddSeq, stdout=True, stderr=False
) -> str:
if isinstance(proc, str):
result = SwyddProc(proc, output=True).execute()
elif isinstance(proc, SwyddPipe):
result = proc.execute(output=True)
elif isinstance(proc, SwyddProc):
result = proc.execute()
elif isinstance(proc, SwyddSeq):
result = proc.execute(output=True)
else:
raise NotImplementedError(f"not implemented for type: {type(exec)}")
output = ""
if stdout and result.stdout:
output += result.stdout.strip()
if stderr and result.stderr:
output += result.stderr.strip()
return output
def _get_caller_path() -> Path:
# NOTE: jupyter will hate this code I'm sure
for i, frame in enumerate(inspect.stack()):
@ -428,35 +414,14 @@ def _get_caller_path() -> Path:
raise ValueError("failed to find root directory of runner")
class SwyddSub:
def __call__(self, proc: str | SwyddPipe | SwyddProc | SwyddSeq) -> bool:
if isinstance(proc, str):
return SwyddProc(proc).check()
elif (
isinstance(proc, SwyddProc)
or isinstance(proc, SwyddSeq)
or isinstance(proc, SwyddPipe)
):
return proc.check()
else:
raise ValueError(f"unspported type: {type(exec)}")
# Asset(ag) / f
# Asset / a <- also support!
class SwyddPath:
_root = None
_path = None
def __init__(self, p: Path | None = None) -> None:
if p:
self._path = Path(p)
@classmethod
def __call__(cls, p: str) -> "SwyddPath":
return cls.from_str(p)
@classmethod
def from_str(cls, p: str) -> "SwyddPath":
return cls() / p
class Asset:
def __init__(self, p: str) -> None:
self._root = _get_caller_path()
self._path = self._root / p
def read(self) -> str:
if self._path:
@ -464,38 +429,28 @@ class SwyddPath:
else:
raise ValueError("path is not set")
def __truediv__(self, p: str | Path) -> "SwyddPath":
if not (root := self._root):
root = _get_caller_path()
if not self._path:
if isinstance(p, str):
return SwyddPath(root / p)
elif isinstance(p, Path):
return SwyddPath(p)
else:
og = self._path.relative_to(root)
return SwyddPath(og / p)
def __truediv__(self, p: str) -> "Asset":
self._path = self._path / p
return self
def _check(self) -> Path:
if self._path is None:
raise ValueError("todo")
return self._path
def _write_text(self, txt: str) -> "SwyddPath":
def _write_text(self, txt: str) -> "Asset":
p = self._check()
p.parent.mkdir(exist_ok=True)
p.write_text(txt + "\n")
return self
def write(self, src: "str | SwyddPath") -> "SwyddPath":
def write(self, src: "str | Asset") -> "Asset":
if isinstance(src, str):
return self._write_text(src)
elif isinstance(src, SwyddPath):
elif isinstance(src, Asset):
return self._write_text(src.read())
def _append_text(self, txt: str) -> "SwyddPath":
def _append_text(self, txt: str) -> "Asset":
p = self._check()
p.parent.mkdir(exist_ok=True)
with p.open("a") as f:
@ -503,11 +458,9 @@ class SwyddPath:
f.write("\n")
return self
def rename(self, dst: "str | SwyddPath | Path") -> None:
def rename(self, dst: "str | Asset | Path") -> None:
if isinstance(dst, str):
dst_p = SwyddPath.from_str(
dst
)._check() # <- TODO: ensure this uses self._root?
dst_p = Asset(dst)._check()
elif isinstance(dst, Path):
dst_p = dst
else:
@ -515,18 +468,18 @@ class SwyddPath:
src_p = self._check()
src_p.rename(dst_p)
def copy(self, src: "str | SwyddPath") -> None:
def copy(self, src: "str | Asset") -> None:
if isinstance(src, str):
self.write(src)
elif isinstance(src, SwyddPath):
elif isinstance(src, Asset):
dst_p = self._check()
src_p = src._check()
shutil.copyfile(src_p, dst_p)
def append(self, src: "str | SwyddPath") -> "SwyddPath":
def append(self, src: "str | Asset") -> "Asset":
if isinstance(src, str):
return self._append_text(src)
elif isinstance(src, SwyddPath):
elif isinstance(src, Asset):
return self._append_text(src.read().strip())
@ -543,6 +496,8 @@ def _inspect_wrapper(place, func):
def task(
arg: Callable[..., Any] | None = None,
):
"""decorator to convert a function into a swydd task"""
def wrapper(
func: Callable[..., Any] | None = None,
) -> Callable[..., Callable[..., None]]:
@ -604,6 +559,18 @@ def option(
short: str = "",
**help_kwargs: str,
) -> Callable[[Callable[..., Any]], Callable[..., Callable[..., None]]]:
"""add help and additional args for option
Parameters
----------
name
variable name used in wrapped function, subsitute underscores with hypens
help
help description for variable
**help_kwargs
kwargs which will be passed onto :func:`argparse.ArgumentParser.add_argument`
"""
def wrapper(func: Callable[..., Any]) -> Callable[..., Callable[..., None]]:
ctx._update_option(func, name.replace("-", "_"), help, short, **help_kwargs)
@ -726,7 +693,7 @@ def _generate_task_subparser(
def executor(*args, **kwargs):
for need in task.needs:
asset(need)._check()
Asset(need)._check()
f = (
_target_generator(target, ctx._graph.nodes[target])(task.func)
@ -771,6 +738,13 @@ def _add_targets(
def cli(default: str | None = None) -> None:
"""activate swydd cli
Parameters
----------
default
The default args passed line passed on to sywdd.
"""
ctx._generate_graph()
if len(sys.argv) > 1 and sys.argv[1] == "+swydd":
_internal_cli()
@ -834,48 +808,103 @@ def cli(default: str | None = None) -> None:
f(**args)
(
proc,
pipe,
seq,
sub,
get,
asset,
) = (
SwyddProc(),
SwyddPipe(),
SwyddSeq(),
SwyddSub(),
SwyddGet(),
SwyddPath(),
)
def get(proc: str | Proc | ProcPipe | ProcSeq, stdout=True, stderr=False) -> str:
"""execute subprocess and capture outputs
see also: :func:`geterr`
Parameters
----------
proc
Command to execute.
stdout
If true, capture stdout
stderr
If true, capture stderr
Returns
-------
str
Captured output
"""
if isinstance(proc, str):
result = Proc(proc).execute(output=True)
elif isinstance(proc, ProcPipe):
result = proc.execute(output=True)
elif isinstance(proc, Proc):
result = proc.execute()
elif isinstance(proc, ProcSeq):
result = proc.execute(output=True)
else:
raise NotImplementedError(f"not implemented for type: {type(exec)}")
output = ""
if stdout and result.stdout:
output += result.stdout.strip()
if stderr and result.stderr:
output += result.stderr.strip()
return output
def geterr(*args, **kwargs) -> str:
get_kwargs = dict(stderr=True, stdout=False)
get_kwargs.update(kwargs)
return get(*args, **get_kwargs)
def geterr(
proc: str | Proc | ProcPipe | ProcSeq,
) -> str:
"""execute subprocess and return stderr
Parameters
----------
proc : str, :class:`.Proc`, :class:`.ProcSeq`, :class:`.ProcPipe`
Command to execute.
Returns
-------
str
Output captured from stderr.
"""
return get(proc, stdout=True, stderr=False)
# TODO: think more about how "rest" should work...
def sub(proc: str | Proc | ProcPipe | ProcSeq, rest: bool = False) -> bool:
if isinstance(proc, str):
cmd = proc if not rest else (proc + " " + " ".join(ctx.rest))
return Proc(cmd).check()
elif (
isinstance(proc, Proc)
or isinstance(proc, ProcSeq)
or isinstance(proc, ProcPipe)
):
if rest:
raise ValueError("rest is only supported when passing str")
return proc.check()
else:
raise ValueError(f"unspported type: {type(exec)}")
def setenv(key: str, value: str) -> None:
"""Set environment variable shared by all Procs"""
ctx._env.update({key: value})
__all__ = [
"proc",
"pipe",
"seq",
"sub",
"get",
"asset",
"ctx",
"geterr",
"setenv",
"cli",
"task",
"targets",
"needs",
"option",
"flags",
"Proc",
"ProcPipe",
"ProcSeq",
"Asset",
]
if __name__ == "__main__":
sys.stderr.write("this module should not be invoked directly\n")
sys.exit(1)
sys.exit()