diff --git a/src/swydd/__init__.py b/src/swydd/__init__.py index 57ec693..86a94f5 100644 --- a/src/swydd/__init__.py +++ b/src/swydd/__init__.py @@ -162,7 +162,7 @@ class Context: return self._tasks[self._ids[name]] -class SwyddFlags: +class Flags: def __init__(self) -> None: self._flags: Dict[str, Any] = {} self._flag_defs: List[Tuple[Tuple[str, ...], Any]] = [] @@ -189,10 +189,10 @@ class SwyddFlags: ctx = Context() -flags = SwyddFlags() +flags = Flags() -class SwyddSubResult: +class SubResult: def __init__( self, code: int, @@ -206,7 +206,7 @@ class SwyddSubResult: self._proces = process @classmethod - def from_completed_process(cls, p: CompletedProcess) -> "SwyddSubResult": + def from_completed_process(cls, p: CompletedProcess) -> "SubResult": return cls(p.returncode, p.stdout, p.stderr, p) @classmethod @@ -215,49 +215,60 @@ class SwyddSubResult: p: Popen, stdout: str = "", stderr: str = "", - ) -> "SwyddSubResult": + ) -> "SubResult": return cls(p.returncode, stdout, stderr, p) -class SwyddProc: - def __init__( - self, cmd: str | None = None, output: bool = False, **kwargs: Any - ) -> None: +class Proc: + """a class""" + + def __init__(self, cmd: str, **kwargs: Any) -> None: self._cmd = cmd if cmd: self.cmd = shlex.split(cmd) - self.output = output self.cmd_kwargs = kwargs - @classmethod - def __call__(cls, *args, **kwargs) -> "SwyddProc": - return cls(*args, **kwargs) + def pipe(self, proc: str | Proc) -> ProcPipe: + """ProcPipe the output into `proc` + + Parameters + ---------- + proc + Command to feed stdout into + Returns + ------- + ProcPipe + """ - def pipe(self, proc: "str | SwyddProc") -> "SwyddPipe | SwyddProc": if isinstance(proc, str): - if self._cmd is None: - return SwyddPipe(SwyddProc(proc)) - else: - return SwyddPipe(self, proc) - elif isinstance(proc, SwyddProc): - return SwyddPipe(proc) + return ProcPipe(self, proc) + elif isinstance(proc, Proc): + return ProcPipe(proc) - def then(self, proc: "str | SwyddProc | SwyddSeq") -> "SwyddSeq": - if self._cmd: - return SwyddSeq(self, proc) + def then(self, proc: str | Proc | ProcSeq) -> ProcSeq: + """If successful execute next `proc` - if isinstance(proc, SwyddProc): - return SwyddSeq(proc) + Parameters + ---------- + proc + Command to execute after + Returns + ------- + ProcSeq + """ + + if isinstance(proc, Proc): + return ProcSeq(proc) # should swydd seq even be supported here? - elif isinstance(proc, SwyddSeq): + elif isinstance(proc, ProcSeq): return proc else: - return SwyddSeq(SwyddProc(proc)) + return ProcSeq(Proc(proc)) - def _build_kwargs(self) -> Dict[str, Any]: + def _build_kwargs(self, output: bool) -> Dict[str, Any]: sub_kwargs: Dict[str, Any] = dict(env={**os.environ, **ctx._env}) - if self.output: + if output: sub_kwargs["text"] = True # assume text is the desired output sub_kwargs["stdout"] = PIPE @@ -268,11 +279,10 @@ class SwyddProc: if ctx.verbose: sys.stdout.write(f"swydd exec | {self._cmd}\n") - def execute(self, output: bool = False) -> SwyddSubResult: - self.output = self.output or output + def execute(self, output: bool = False) -> SubResult: self._show_command() - p = Popen(self.cmd, **self._build_kwargs()) + p = Popen(self.cmd, **self._build_kwargs(output)) try: out, err = p.communicate() @@ -283,27 +293,27 @@ class SwyddProc: p.wait() out, err = p.communicate() - return SwyddSubResult.from_popen(p, out, err) + return SubResult.from_popen(p, out, err) def check(self) -> bool: return self.execute().code == 0 -class SwyddPipe: - def __init__(self, *procs: "str | SwyddProc | SwyddPipe") -> None: +class ProcPipe: + """ + prefer: :func:`Proc.pipe` + """ + + def __init__(self, *procs: str | Proc | ProcPipe) -> None: self._procs = [] for proc in procs: if isinstance(proc, str): - self._procs.append(SwyddProc(proc)) - elif isinstance(proc, SwyddProc): + self._procs.append(Proc(proc)) + elif isinstance(proc, Proc): self._procs.append(proc) - elif isinstance(proc, SwyddPipe): + elif isinstance(proc, ProcPipe): self._procs.extend(proc._procs) - @classmethod - def __call__(cls, *args, **kwargs) -> "SwyddPipe": - return cls(*args, **kwargs) - def check(self) -> bool: return self.execute().code == 0 @@ -312,7 +322,7 @@ class SwyddPipe: cmd_str = " | ".join([p._cmd for p in self._procs]) sys.stdout.write(f"swydd exec | {cmd_str}\n") - def execute(self, output: bool = False) -> SwyddSubResult: + def execute(self, output: bool = False) -> SubResult: procs = [] sub_kwargs: Dict[str, Any] = dict(env={**os.environ, **ctx._env}) self._show_command() @@ -343,36 +353,36 @@ class SwyddPipe: procs[-1].wait() out, err = procs[-1].communicate() - return SwyddSubResult.from_popen(procs[-1], out, err) + return SubResult.from_popen(procs[-1], out, err) - def pipe(self, proc: "str | SwyddProc | SwyddPipe") -> "SwyddPipe": - return SwyddPipe(self, proc) + def pipe(self, proc: "str | Proc | ProcPipe") -> "ProcPipe": + return ProcPipe(self, proc) -class SwyddSeq: - def __init__(self, *procs: "str | SwyddProc | SwyddSeq") -> None: +class ProcSeq: + """ + prefer: :func:`Proc.then` + """ + + def __init__(self, *procs: "str | Proc | ProcSeq") -> None: self._procs = [] for proc in procs: - if isinstance(proc, SwyddSeq): + if isinstance(proc, ProcSeq): self._procs.extend(self._procs) - if isinstance(proc, SwyddProc): + if isinstance(proc, Proc): self._procs.append(proc) elif isinstance(proc, str): - self._procs.append(SwyddProc(proc)) + self._procs.append(Proc(proc)) def _show_command(self) -> None: if ctx.verbose: cmd_str = " && ".join([p._cmd for p in self._procs]) sys.stderr.write(f"sywdd exec | {cmd_str}\n") - @classmethod - def __call__(cls, *args, **kwargs) -> "SwyddSeq": - return cls(*args, **kwargs) + def then(self, proc: "str | Proc | ProcSeq") -> "ProcSeq": + return ProcSeq(*self._procs, proc) - def then(self, proc: "str | SwyddProc | SwyddSeq") -> "SwyddSeq": - return SwyddSeq(*self._procs, proc) - - def execute(self, output: bool = False) -> "SwyddSubResult": + def execute(self, output: bool = False) -> "SubResult": self._show_command() results = [] @@ -396,30 +406,6 @@ class SwyddSeq: return self.run() == 0 -# TODO: best interface for "get" -class SwyddGet: - def __call__( - self, proc: str | SwyddProc | SwyddPipe | SwyddSeq, stdout=True, stderr=False - ) -> str: - if isinstance(proc, str): - result = SwyddProc(proc, output=True).execute() - elif isinstance(proc, SwyddPipe): - result = proc.execute(output=True) - elif isinstance(proc, SwyddProc): - result = proc.execute() - elif isinstance(proc, SwyddSeq): - result = proc.execute(output=True) - else: - raise NotImplementedError(f"not implemented for type: {type(exec)}") - - output = "" - if stdout and result.stdout: - output += result.stdout.strip() - if stderr and result.stderr: - output += result.stderr.strip() - return output - - def _get_caller_path() -> Path: # NOTE: jupyter will hate this code I'm sure for i, frame in enumerate(inspect.stack()): @@ -428,35 +414,14 @@ def _get_caller_path() -> Path: raise ValueError("failed to find root directory of runner") -class SwyddSub: - def __call__(self, proc: str | SwyddPipe | SwyddProc | SwyddSeq) -> bool: - if isinstance(proc, str): - return SwyddProc(proc).check() - elif ( - isinstance(proc, SwyddProc) - or isinstance(proc, SwyddSeq) - or isinstance(proc, SwyddPipe) - ): - return proc.check() - else: - raise ValueError(f"unspported type: {type(exec)}") +# Asset(ag) / f +# Asset / a <- also support! -class SwyddPath: - _root = None - _path = None - - def __init__(self, p: Path | None = None) -> None: - if p: - self._path = Path(p) - - @classmethod - def __call__(cls, p: str) -> "SwyddPath": - return cls.from_str(p) - - @classmethod - def from_str(cls, p: str) -> "SwyddPath": - return cls() / p +class Asset: + def __init__(self, p: str) -> None: + self._root = _get_caller_path() + self._path = self._root / p def read(self) -> str: if self._path: @@ -464,38 +429,28 @@ class SwyddPath: else: raise ValueError("path is not set") - def __truediv__(self, p: str | Path) -> "SwyddPath": - if not (root := self._root): - root = _get_caller_path() - - if not self._path: - if isinstance(p, str): - return SwyddPath(root / p) - elif isinstance(p, Path): - return SwyddPath(p) - else: - og = self._path.relative_to(root) - - return SwyddPath(og / p) + def __truediv__(self, p: str) -> "Asset": + self._path = self._path / p + return self def _check(self) -> Path: if self._path is None: raise ValueError("todo") return self._path - def _write_text(self, txt: str) -> "SwyddPath": + def _write_text(self, txt: str) -> "Asset": p = self._check() p.parent.mkdir(exist_ok=True) p.write_text(txt + "\n") return self - def write(self, src: "str | SwyddPath") -> "SwyddPath": + def write(self, src: "str | Asset") -> "Asset": if isinstance(src, str): return self._write_text(src) - elif isinstance(src, SwyddPath): + elif isinstance(src, Asset): return self._write_text(src.read()) - def _append_text(self, txt: str) -> "SwyddPath": + def _append_text(self, txt: str) -> "Asset": p = self._check() p.parent.mkdir(exist_ok=True) with p.open("a") as f: @@ -503,11 +458,9 @@ class SwyddPath: f.write("\n") return self - def rename(self, dst: "str | SwyddPath | Path") -> None: + def rename(self, dst: "str | Asset | Path") -> None: if isinstance(dst, str): - dst_p = SwyddPath.from_str( - dst - )._check() # <- TODO: ensure this uses self._root? + dst_p = Asset(dst)._check() elif isinstance(dst, Path): dst_p = dst else: @@ -515,18 +468,18 @@ class SwyddPath: src_p = self._check() src_p.rename(dst_p) - def copy(self, src: "str | SwyddPath") -> None: + def copy(self, src: "str | Asset") -> None: if isinstance(src, str): self.write(src) - elif isinstance(src, SwyddPath): + elif isinstance(src, Asset): dst_p = self._check() src_p = src._check() shutil.copyfile(src_p, dst_p) - def append(self, src: "str | SwyddPath") -> "SwyddPath": + def append(self, src: "str | Asset") -> "Asset": if isinstance(src, str): return self._append_text(src) - elif isinstance(src, SwyddPath): + elif isinstance(src, Asset): return self._append_text(src.read().strip()) @@ -543,6 +496,8 @@ def _inspect_wrapper(place, func): def task( arg: Callable[..., Any] | None = None, ): + """decorator to convert a function into a swydd task""" + def wrapper( func: Callable[..., Any] | None = None, ) -> Callable[..., Callable[..., None]]: @@ -604,6 +559,18 @@ def option( short: str = "", **help_kwargs: str, ) -> Callable[[Callable[..., Any]], Callable[..., Callable[..., None]]]: + """add help and additional args for option + + Parameters + ---------- + name + variable name used in wrapped function, subsitute underscores with hypens + help + help description for variable + **help_kwargs + kwargs which will be passed onto :func:`argparse.ArgumentParser.add_argument` + """ + def wrapper(func: Callable[..., Any]) -> Callable[..., Callable[..., None]]: ctx._update_option(func, name.replace("-", "_"), help, short, **help_kwargs) @@ -726,7 +693,7 @@ def _generate_task_subparser( def executor(*args, **kwargs): for need in task.needs: - asset(need)._check() + Asset(need)._check() f = ( _target_generator(target, ctx._graph.nodes[target])(task.func) @@ -771,6 +738,13 @@ def _add_targets( def cli(default: str | None = None) -> None: + """activate swydd cli + + Parameters + ---------- + default + The default args passed line passed on to sywdd. + """ ctx._generate_graph() if len(sys.argv) > 1 and sys.argv[1] == "+swydd": _internal_cli() @@ -834,48 +808,103 @@ def cli(default: str | None = None) -> None: f(**args) -( - proc, - pipe, - seq, - sub, - get, - asset, -) = ( - SwyddProc(), - SwyddPipe(), - SwyddSeq(), - SwyddSub(), - SwyddGet(), - SwyddPath(), -) +def get(proc: str | Proc | ProcPipe | ProcSeq, stdout=True, stderr=False) -> str: + """execute subprocess and capture outputs + + see also: :func:`geterr` + + Parameters + ---------- + proc + Command to execute. + stdout + If true, capture stdout + stderr + If true, capture stderr + + Returns + ------- + str + Captured output + """ + if isinstance(proc, str): + result = Proc(proc).execute(output=True) + elif isinstance(proc, ProcPipe): + result = proc.execute(output=True) + elif isinstance(proc, Proc): + result = proc.execute() + elif isinstance(proc, ProcSeq): + result = proc.execute(output=True) + else: + raise NotImplementedError(f"not implemented for type: {type(exec)}") + + output = "" + if stdout and result.stdout: + output += result.stdout.strip() + if stderr and result.stderr: + output += result.stderr.strip() + return output -def geterr(*args, **kwargs) -> str: - get_kwargs = dict(stderr=True, stdout=False) - get_kwargs.update(kwargs) - return get(*args, **get_kwargs) +def geterr( + proc: str | Proc | ProcPipe | ProcSeq, +) -> str: + """execute subprocess and return stderr + + + Parameters + ---------- + proc : str, :class:`.Proc`, :class:`.ProcSeq`, :class:`.ProcPipe` + Command to execute. + + Returns + ------- + str + Output captured from stderr. + """ + return get(proc, stdout=True, stderr=False) + + +# TODO: think more about how "rest" should work... +def sub(proc: str | Proc | ProcPipe | ProcSeq, rest: bool = False) -> bool: + if isinstance(proc, str): + cmd = proc if not rest else (proc + " " + " ".join(ctx.rest)) + return Proc(cmd).check() + elif ( + isinstance(proc, Proc) + or isinstance(proc, ProcSeq) + or isinstance(proc, ProcPipe) + ): + if rest: + raise ValueError("rest is only supported when passing str") + return proc.check() + else: + raise ValueError(f"unspported type: {type(exec)}") def setenv(key: str, value: str) -> None: + """Set environment variable shared by all Procs""" ctx._env.update({key: value}) __all__ = [ - "proc", - "pipe", - "seq", "sub", "get", - "asset", "ctx", "geterr", "setenv", "cli", "task", + "targets", + "needs", + "option", "flags", + "Proc", + "ProcPipe", + "ProcSeq", + "Asset", ] if __name__ == "__main__": sys.stderr.write("this module should not be invoked directly\n") - sys.exit(1) + sys.exit()