diff --git a/src/viv/viv.py b/src/viv/viv.py index 599596e..4dba671 100755 --- a/src/viv/viv.py +++ b/src/viv/viv.py @@ -51,7 +51,7 @@ from typing import ( from urllib.error import HTTPError from urllib.request import urlopen -__version__ = "23.5a4-6-g27d7952-dev" +__version__ = "23.5a4-7-g8a5f63d-dev" class Config: @@ -146,20 +146,19 @@ class Spinner: sys.stderr.write("\r") -BOX: Dict[str, str] = { - "v": "│", - "h": "─", - "tl": "╭", - "tr": "╮", - "bl": "╰", - "br": "╯", - "sep": "┆", -} - - class Ansi: """control ouptut of ansi(VT100) control codes""" + BOX: Dict[str, str] = { + "v": "│", + "h": "─", + "tl": "╭", + "tr": "╮", + "bl": "╰", + "br": "╯", + "sep": "┆", + } + def __init__(self) -> None: self.bold: str = "\033[1m" self.dim: str = "\033[2m" @@ -241,7 +240,11 @@ class Ansi: return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore def _make_row(self, row: Generator[Any, None, None]) -> str: - return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}" + return ( + f" {self.BOX['v']} " + + f" {self.BOX['sep']} ".join(row) + + f" {self.BOX['v']}" + ) def _sanitize_row( self, sizes: List[int], row: Tuple[str, Sequence[str]] @@ -300,14 +303,169 @@ class Ansi: ) ) - sys.stderr.write(f" {BOX['tl']}{BOX['h']*(sum(sizes)+5)}{BOX['tr']}\n") + sys.stderr.write( + "".join( + ( + " ", + self.BOX["tl"], + self.BOX["h"] * (sum(sizes) + 5), + self.BOX["tr"], + "\n", + ) + ) + ) sys.stderr.write("\n".join(table_rows) + "\n") - sys.stderr.write(f" {BOX['bl']}{BOX['h']*(sum(sizes)+5)}{BOX['br']}\n") + sys.stderr.write( + "".join( + ( + " ", + self.BOX["bl"], + self.BOX["h"] * (sum(sizes) + 5), + self.BOX["br"], + "\n", + ) + ) + ) a = Ansi() +class Template: + description = f""" + +{a.tagline()} +to create/activate a vivenv: +- from command line: `{a.style("viv -h","bold")}` +- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')} +""" + + _standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""): + T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs] + if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid") + ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write + (cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T) + ((sha256:=i("hashlib").sha256()).update((s(spec)+ + (((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode())) + if ((env:=cache/(name if name else (_id:=sha256.hexdigest()))) + not in cache.glob("*/")) or ge("VIV_FORCE"): + v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n") + i("venv").EnvBuilder(with_pip=T,clear=T).create(env) + with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true") + if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True, + stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0: + if env.is_dir():i("shutil").rmtree(env) + ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode) + with (env/"viv-info.json").open("w") as f: + i("json").dump({"created":s(i("datetime").datetime.today()), + "id":_id,"spec":spec,"exe":exe},f) + sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE] + return env""" # noqa + + def noqa(self, txt: str) -> str: + max_length = max(map(len, txt.splitlines())) + return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines())) + + def _use_str(self, spec: List[str], standalone: bool = False) -> str: + spec = ", ".join(f'"{req}"' for req in spec) + if standalone: + return f"""_viv_use({fill(spec,width=90,subsequent_indent=" ",)})""" + else: + return f"""__import("viv").use({spec})""" + + def standalone(self, spec: List[str]) -> str: + func_use = self.noqa( + "\n".join((self._standalone_func, self._use_str(spec, standalone=True))) + ) + return f""" +# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help` +# <<<<< auto-generated by viv (v{__version__}) +# fmt: off +{func_use} +# fmt: on +# >>>>> code golfed with <3 +""" + + def _rel_import(self, local_source: Path): + path_to_viv = path_to_viv = str( + local_source.resolve().absolute().parent.parent + ).replace(str(Path.home()), "~") + return ( + """__import__("sys").path.append(__import__("os")""" + f""".path.expanduser("{path_to_viv}")) # noqa""" + ) + + def _absolute_import(self, local_source: Path): + path_to_viv = local_source.resolve().absolute().parent.parent + return f"""__import__("sys").path.append("{path_to_viv}") # noqa""" + + def frozen_import(self, path: str, local_source: Path, spec: List[str]) -> str: + if path == "abs": + imports = self._absolute_import(local_source) + elif path == "rel": + imports = self._rel_import(local_source) + else: + imports = "" + return f"""\ +{imports} +{self.noqa(self._use_str(spec))} +""" + + def shim( + self, path: str, local_source: Path, standalone: bool, spec: List[str], bin: str + ) -> str: + if standalone: + imports = self._standalone_func + elif path == "abs": + imports = self._absolute_import(local_source) + elif path == "rel": + imports = self._rel_import(local_source) + else: + imports = "" + return f"""\ +#!/usr/bin/env python3 +{imports} +import subprocess +import sys + +if __name__ == "__main__": + vivenv = {self._use_str(spec, standalone)} + sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode) +""" + + def update(self, src, cli, local_version, next_version): + return f""" + Update source at {a.green}{src}{a.end} + Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end} + Version {a.bold}{local_version}{a.end} -> {a.bold}{next_version}{a.end} + +""" + + def install(self, src, cli): + return f""" + Install viv.py to {a.green}{src}{a.end} + Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end} + +""" + + def show(self, cli, running, local): + return ( + "\n".join( + f" {a.bold}{k}{a.end}: {v}" + for k, v in ( + ("Version", __version__), + ("CLI", cli), + ("Running Source", running), + ("Local Source", local), + ) + ) + + "\n" + ) + + +t = Template() + + # TODO: convet the below functions into a proper file/stream logging interface def echo( msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr @@ -678,103 +836,10 @@ def get_venvs() -> Dict[str, ViVenv]: return vivenvs -# TODO: make a template class? - -SYS_PATH_TEMPLATE = """__import__("sys").path.append("{path_to_viv}") # noqa""" -REL_SYS_PATH_TEMPLATE = ( - """__import__("sys").path.append(__import__("os")""" - """.path.expanduser("{path_to_viv}")) # noqa""" -) -IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa""" - -STANDALONE_TEMPLATE = r""" -# <<<<< auto-generated by viv (v{version}) -# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help` -# fmt: off -{func} -# fmt: on -# >>>>> code golfed with <3 -""" # noqa - -STANDALONE_TEMPLATE_FUNC = r"""def _viv_use(*pkgs, track_exe=False, name=""): - T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs] - if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid") - ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write - (cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T) - ((sha256:=i("hashlib").sha256()).update((s(spec)+ - (((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode())) - if ((env:=cache/(name if name else (_id:=sha256.hexdigest()))) - not in cache.glob("*/")) or ge("VIV_FORCE"): - v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n") - i("venv").EnvBuilder(with_pip=T,clear=T).create(env) - with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true") - if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True, - stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0: - if env.is_dir():i("shutil").rmtree(env) - ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode) - with (env/"viv-info.json").open("w") as f: - i("json").dump({"created":s(i("datetime").datetime.today()), - "id":_id,"spec":spec,"exe":exe},f) - sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE] - return env -""" # noqa - -SHOW_TEMPLATE = f""" - {a.style('Version', 'bold')}: {{version}} - {a.style('CLI', 'bold')}: {{cli}} - {a.style('Running Source', 'bold')}: {{running_src}} - {a.style('Local Source', 'bold')}: {{local_src}} -""" - -INSTALL_TEMPLATE = f""" - Install viv.py to {a.green}{{src_location}}{a.end} - Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end} - -""" - -UPDATE_TEMPLATE = f""" - Update source at {a.green}{{src_location}}{a.end} - Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end} - Version {a.bold}{{local_version}}{a.end} -> {a.bold}{{next_version}}{a.end} - -""" - -SHIM_TEMPLATE = """\ -#!/usr/bin/env python3 - -{imports} -import subprocess -import sys - -if __name__ == "__main__": - vivenv = {use} - sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode) -""" - -DESCRIPTION = f""" - -{a.tagline()} -to create/activate a vivenv: -- from command line: `{a.style("viv -h","bold")}` -- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')} -""" - - -def noqa(txt: str) -> str: - max_length = max(map(len, txt.splitlines())) - return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines())) - - -def spec_to_import(spec: List[str]) -> None: - spec_str = ", ".join(f'"{pkg}"' for pkg in spec) - sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n") - - def combined_spec(reqs: List[str], requirements: Path) -> List[str]: if requirements: with requirements.open("r") as f: reqs += f.readlines() - return reqs @@ -801,66 +866,6 @@ def resolve_deps(args: Namespace) -> List[str]: return resolved_spec.splitlines() -def generate_import( - args: Namespace, -) -> None: - spec = resolve_deps(args) - if args.keep: - # re-create env again since path's are hard-coded - vivenv = ViVenv(spec) - - if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv( - "VIV_FORCE" - ): - vivenv.create() - vivenv.install_pkgs() - vivenv.dump_info(write=True) - - else: - echo("re-using existing vivenv") - - echo("see below for import statements\n") - - if args.standalone: - sys.stdout.write( - STANDALONE_TEMPLATE.format( - version=__version__, - func=noqa( - STANDALONE_TEMPLATE_FUNC - + "_viv_use(" - + fill( - ", ".join(f'"{pkg}"' for pkg in spec), - width=100, - subsequent_indent=" ", - ) - + ")" - ), - ) - + "\n" - ) - return - - if args.path: - if args.path == "abs": - sys.stdout.write( - SYS_PATH_TEMPLATE.format( - path_to_viv=Path(__file__).resolve().absolute().parent.parent - ) - + "\n" - ) - elif args.path == "rel": - sys.stdout.write( - REL_SYS_PATH_TEMPLATE.format( - path_to_viv=str( - Path(__file__).resolve().absolute().parent.parent - ).replace(str(Path.home()), "~") - ) - + "\n" - ) - - spec_to_import(spec) - - def fetch_source(reference: str) -> str: try: r = urlopen( @@ -982,9 +987,29 @@ class Viv: if args.path and args.standalone: error("-p/--path and -s/--standalone are mutually exclusive", code=1) - generate_import( - args, - ) + spec = resolve_deps(args) + if args.keep: + # re-create env again since path's are hard-coded + vivenv = ViVenv(spec) + + if not vivenv.exists or os.getenv("VIV_FORCE"): + vivenv.create() + vivenv.install_pkgs() + vivenv.dump_info(write=True) + + else: + echo("re-using existing vivenv") + + echo("see below for import statements\n") + + if args.standalone: + sys.stdout.write(t.standalone(spec)) + return + + if args.path and not self.local_source: + error("No local viv found to import from", code=1) + + sys.stdout.write(t.frozen_import(args.path, self.local_source, spec)) def list(self, args: Namespace) -> None: """list all vivenvs""" @@ -1071,11 +1096,10 @@ class Viv: else: echo("Current:") sys.stderr.write( - SHOW_TEMPLATE.format( - version=__version__, + t.show( cli=shutil.which("viv"), - running_src=self.running_source, - local_src=self.local_source, + running=self.running_source, + local=self.local_source, ) ) @@ -1103,12 +1127,7 @@ class Viv: if confirm( "Would you like to perform the above installation steps?", - UPDATE_TEMPLATE.format( - src_location=self.local_source, - local_version=self.local_version, - cli_location=args.cli, - next_version=next_version, - ), + t.update(self.local_source, args.cli, self.local_version, next_version), ): self._install_local_src( sha256, @@ -1139,10 +1158,7 @@ class Viv: if confirm( "Would you like to perform the above installation steps?", - INSTALL_TEMPLATE.format( - src_location=args.src, - cli_location=args.cli, - ), + t.install(args.src, args.cli), ): self._install_local_src(sha256, args.src, args.cli) elif args.cmd == "purge": @@ -1208,37 +1224,16 @@ class Viv: else: spec = combined_spec(args.reqs, args.requirements) - spec_str = ", ".join(f'"{req}"' for req in spec) - if args.standalone: - imports = STANDALONE_TEMPLATE.format( - version=__version__, func=noqa(STANDALONE_TEMPLATE_FUNC) - ) - use = f"_viv_use({spec_str})" - elif args.path: - if not self.local_source: - error("No local viv found to import from", code=1) - else: - imports = ( - REL_SYS_PATH_TEMPLATE.format( - path_to_viv=str( - self.local_source.resolve().absolute().parent.parent - ).replace(str(Path.home()), "~") - ) - if args.path == "abs" - else SYS_PATH_TEMPLATE.format( - path_to_viv=self.local_source.resolve().absolute().parent.parent - ) - ) - use = IMPORT_TEMPLATE.format(spec=spec_str) - else: - imports = "" - use = IMPORT_TEMPLATE.format(spec=spec_str) + if args.path and not self.local_source: + error("No local viv found to import from", code=1) if confirm( f"Write shim for {a.style(bin,'bold')} to {a.style(output,'green')}?" ): with output.open("w") as f: - f.write(SHIM_TEMPLATE.format(imports=imports, use=use, bin=bin)) + f.write( + t.shim(args.path, self.local_source, args.standalone, spec, bin) + ) make_executable(output) @@ -1302,7 +1297,7 @@ class Viv: def cli(self) -> None: """cli entrypoint""" - parser = ArgumentParser(prog=self.name, description=DESCRIPTION) + parser = ArgumentParser(prog=self.name, description=t.description) parser.add_argument( "-V", "--version",