From fcb48daadeb95c3e69ca6eb901954f7bba2aa632 Mon Sep 17 00:00:00 2001 From: Daylin Morgan Date: Wed, 31 May 2023 11:30:02 -0500 Subject: [PATCH] feat(metadata)!: revamp metadata tracking This breaks any existing cache but now allows for tracking files that access vivenv's --- scripts/bump-dev.sh | 1 - src/viv/viv.py | 377 +++++++++++++++++++++----------------------- 2 files changed, 179 insertions(+), 199 deletions(-) diff --git a/scripts/bump-dev.sh b/scripts/bump-dev.sh index fea0dac..4c25a17 100755 --- a/scripts/bump-dev.sh +++ b/scripts/bump-dev.sh @@ -2,4 +2,3 @@ TAG=$(git describe --tags --always --dirty=-dev --exclude 'latest') VERSION="${TAG#v}" sed -i "s/__version__ = \".*\"/__version__ = \"$VERSION\"/g" src/viv/viv.py -git add src/viv/viv.py diff --git a/src/viv/viv.py b/src/viv/viv.py index 723486f..d1369ab 100755 --- a/src/viv/viv.py +++ b/src/viv/viv.py @@ -9,6 +9,7 @@ from __future__ import annotations import hashlib +import inspect import itertools import json import os @@ -31,19 +32,17 @@ from argparse import ( _SubParsersAction, ) from argparse import ArgumentParser as StdArgParser +from dataclasses import dataclass from datetime import datetime -from itertools import zip_longest from pathlib import Path -from textwrap import dedent, fill, wrap +from textwrap import dedent, fill from types import TracebackType from typing import ( Any, Dict, - Generator, List, NoReturn, Optional, - Sequence, TextIO, Tuple, Type, @@ -149,16 +148,6 @@ class Spinner: class Ansi: """control ouptut of ansi(VT100) control codes""" - BOX: Dict[str, str] = { - "v": "│", - "h": "─", - "tl": "╭", - "tr": "╮", - "bl": "╰", - "br": "╯", - "sep": "┆", - } - def __init__(self) -> None: self.bold: str = "\033[1m" self.dim: str = "\033[2m" @@ -229,104 +218,9 @@ class Ansi: new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] sys.stdout.write("\n".join(new_output) + "\n") - def _get_column_sizes( - self, rows: Tuple[Tuple[str, Sequence[str]], ...] - ) -> List[int]: - """convert list of rows to list of columns sizes - - First convert rows into list of columns, - then get max string length for each column. - """ - return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore - - def _make_row(self, row: Generator[Any, None, None]) -> str: - return ( - f" {self.BOX['v']} " - + f" {self.BOX['sep']} ".join(row) - + f" {self.BOX['v']}" - ) - - def _sanitize_row( - self, sizes: List[int], row: Tuple[str, Sequence[str]] - ) -> Tuple[Tuple[str, Sequence[str]], ...]: - if len(row[1]) > sizes[1]: - return tuple( - zip_longest( - (row[0],), - wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]), - fillvalue="", - ) - ) - else: - return (row,) - def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str: return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}" - def table( - self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan" - ) -> None: - """generate a table with outline and styled header assumes two columns - - Args: - rows: sequence of the rows, first item assumed to be header - header_style: color/style for header row - """ - - sizes = self._get_column_sizes(rows) - - col2_limit = shutil.get_terminal_size().columns - 20 - if col2_limit < 20: - error("increase screen size to view table", code=1) - elif sizes[1] > col2_limit: - sizes[1] = col2_limit - - header, rows = rows[0], rows[1:] - # this is maybe taking comprehensions too far.... - table_rows = ( - self._make_row(row) - for row in ( - # header row - ( - self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end - for i, cell in enumerate(header) - ), - # rest of the rows - *( - (f"{cell:<{sizes[i]}}" for i, cell in enumerate(row)) - for row in ( - newrow - for row in rows - for newrow in self._sanitize_row(sizes, row) - ) - ), - ) - ) - - sys.stderr.write( - "".join( - ( - " ", - self.BOX["tl"], - self.BOX["h"] * (sum(sizes) + 5), - self.BOX["tr"], - "\n", - ) - ) - ) - sys.stderr.write("\n".join(table_rows) + "\n") - sys.stderr.write( - "".join( - ( - " ", - self.BOX["bl"], - self.BOX["h"] * (sum(sizes) + 5), - self.BOX["br"], - "\n", - ) - ) - ) - a = Ansi() @@ -339,26 +233,45 @@ to create/activate a vivenv: - from command line: `{a.style("viv -h","bold")}` - within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')} """ + _standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""): - T,F,N=True,False,None;i,s,m,spec=__import__,str,map,[*pkgs] - e,w=lambda x: T if x else F,lambda p,t: p.write_text(t) - if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid") - ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write - (cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T) - ((sha256:=i("hashlib").sha256()).update((s(spec)+ - (((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode())) - if {env:=cache/(((_id:=sha256.hexdigest()),name)[e(name)])}-{*cache.glob("*/")} or ge("VIV_FORCE"): - v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n") - i("venv").EnvBuilder(with_pip=T,clear=T).create(env) - w(env/"pip.conf","[global]\ndisable-pip-version-check=true") - if (rc:=(p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=T, - stdout=(-1,N)[v],stderr=(-2,N)[v])).returncode)!=0: - if env.is_dir():i("shutil").rmtree(env) - ew(f"pip had non zero exit ({rc})\n{p.stdout}\n");sys.exit(rc) - w(env/"viv-info.json",i("json").dumps( - {"created":s(i("datetime").datetime.today()),"id":_id,"spec":spec,"exe":exe})) - sys.path=[p for p in (*sys.path,s(*(env/"lib").glob("py*/si*")))if p!=i("site").USER_SITE] - return env""" # noqa + import hashlib, json, os, site, shutil, sys, venv # noqa + from pathlib import Path # noqa + from datetime import datetime # noqa + from subprocess import run # noqa + + if not {*map(type, pkgs)} == {str}: + raise ValueError(f"spec: {pkgs} is invalid") + + meta = dict.fromkeys(("created", "accessed"), (t := str(datetime.today()))) + runner = str(Path(__file__).absolute().resolve()) + force, verbose, xdg = map(os.getenv, ("VIV_FORCE", "VIV_VERBOSE", "XDG_CACHE_HOME")) + cache = (Path(xdg) if xdg else Path.home() / ".cache") / "viv" / "venvs" + cache.mkdir(parents=True, exist_ok=True) + exe = str(Path(sys.executable).resolve()) if track_exe else "N/A" + (sha256 := hashlib.sha256()).update((str(spec := [*pkgs]) + exe).encode()) + _id = sha256.hexdigest() + if (env := cache / (name if name else _id)) not in cache.glob("*/") or force: + sys.stderr.write(f"generating new vivenv -> {env.name}\n") + venv.EnvBuilder(with_pip=True, clear=True).create(env) + (env / "pip.conf").write_text("[global]\ndisable-pip-version-check=true") + run_kw = dict(zip(("stdout", "stderr"), ((None,) * 2 if verbose else (-1, 2)))) + p = run([env / "bin" / "pip", "install", "--force-reinstall", *spec], **run_kw) + if (rc := p.returncode) != 0: + if env.is_dir(): + shutil.rmtree(env) + sys.stderr.write(f"pip had non zero exit ({rc})\n{p.stdout.decode()}\n") + sys.exit(rc) + meta.update(dict(id=_id, spec=spec, exe=exe, name=name, files=[runner])) + else: + meta = json.loads((env / "vivmeta.json").read_text()) + meta.update(dict(accessed=t, files=sorted({*meta["files"],runner}))) + + (env / "vivmeta.json").write_text(json.dumps(meta)) + sys.path = [p for p in sys.path if not p != site.USER_SITE] + site.addsitedir(str(*(env / "lib").glob("py*/si*"))) + return env +""" def noqa(self, txt: str) -> str: max_length = max(map(len, txt.splitlines())) @@ -372,16 +285,13 @@ to create/activate a vivenv: return f"""__import__("viv").use({spec_str})""" def standalone(self, spec: List[str]) -> str: - func_use = self.noqa( - "\n".join((self._standalone_func, self._use_str(spec, standalone=True))) + func_use = "\n".join( + (self._standalone_func, self.noqa(self._use_str(spec, standalone=True))) ) return f""" # see `python3 <(curl -fsSL viv.dayl.in/viv.py) --help` -# <<<<< auto-generated by viv (v{__version__}) -# fmt: off +# AUTOGENERATED by viv (v{__version__}) {func_use} -# fmt: on -# >>>>> code golfed with <3 """ def _rel_import(self, local_source: Optional[Path]) -> str: @@ -426,9 +336,7 @@ to create/activate a vivenv: bin: str, ) -> str: if standalone: - imports = "\n".join( - ("# fmt: off", self.noqa(self._standalone_func), "# fmt: on") - ) + imports = self._standalone_func elif path == "abs": imports = self._absolute_import(local_source) elif path == "rel": @@ -724,37 +632,78 @@ def get_hash(spec: Tuple[str, ...] | List[str], track_exe: bool = False) -> str: return sha256.hexdigest() +@dataclass +class Meta: + name: str + id: str + spec: List[str] + files: List[str] + exe: str + created: str = "" + accessed: str = "" + + @classmethod + def load(cls, name: str) -> "Meta": + if not (c.venvcache / name / "vivmeta.json").exists(): + warn(f"possibly corrupted vivenv: {name}") + # add empty values for corrupted vivenvs so it will still load + return cls(name=name, spec=[""], files=[""], exe="", id="") + else: + meta = json.loads((c.venvcache / name / "vivmeta.json").read_text()) + + return cls(**meta) + + def write(self, p: Path | None = None) -> None: + if not p: + p = (c.venvcache) / self.name / "vivmeta.json" + + p.write_text(json.dumps(self.__dict__)) + + def addfile(self, f: Path) -> None: + self.accessed = str(datetime.today()) + self.files = sorted({*self.files, str(f.absolute().resolve())}) + + class ViVenv: def __init__( self, - spec: List[str], + spec: List[str] = [""], track_exe: bool = False, id: str | None = None, name: str = "", path: Path | None = None, + skip_load: bool = False, + metadata: Meta | None = None, ) -> None: - self.spec = self._validate_spec(spec) - self.exe = str(Path(sys.executable).resolve()) if track_exe else "N/A" - self.id = id if id else get_hash(spec, track_exe) - self.name = name if name else self.id + self.loaded = False + spec = self._validate_spec(spec) + id = id if id else get_hash(spec, track_exe) + + self.name = name if name else id self.path = path if path else c.venvcache / self.name - self.exists = self.name in [d.name for d in c.venvcache.iterdir()] + + if not metadata: + if self.name in (d.name for d in c.venvcache.iterdir()): + self.loaded = True + self.meta = Meta.load(self.name) + else: + self.meta = Meta( + spec=spec, + name=self.name, + id=id, + files=[], + exe=str(Path(sys.executable).resolve()) if track_exe else "N/A", + ) + else: + self.meta = metadata @classmethod def load(cls, name: str) -> "ViVenv": - """generate a vivenv from a viv-info.json file + """generate a vivenv from a vivmeta.json Args: name: used as lookup in the vivenv cache """ - if not (c.venvcache / name / "viv-info.json").is_file(): - warn(f"possibly corrupted vivenv: {name}") - return cls(name=name, spec=[""]) - else: - with (c.venvcache / name / "viv-info.json").open("r") as f: - venvconfig = json.load(f) - - vivenv = cls(name=name, spec=venvconfig["spec"], id=venvconfig["id"]) - vivenv.exe = venvconfig["exe"] + vivenv = cls(name=name, metadata=Meta.load(name)) return vivenv @@ -781,12 +730,14 @@ class ViVenv: with (self.path / "pip.conf").open("w") as f: f.write("[global]\ndisable-pip-version-check = true") + self.meta.created = str(datetime.today()) + def install_pkgs(self) -> None: cmd: List[str] = [ str(self.path / "bin" / "pip"), "install", "--force-reinstall", - ] + self.spec + ] + self.meta.spec run( cmd, @@ -795,24 +746,57 @@ class ViVenv: verbose=bool(os.getenv("VIV_VERBOSE")), ) - def dump_info(self, write: bool = False) -> None: - # TODO: include associated files in 'info' - # means it needs to be loaded first - # or keep a seperate file hash in c.share? - info = { - "created": str(datetime.today()), - "id": self.id, - "spec": self.spec, - "exe": self.exe, - } + def show(self, verbose: bool = False) -> None: + if not verbose: + _id = ( + self.meta.id[:8] + if self.meta.id == self.name + else (self.name[:5] + "..." if len(self.name) > 8 else self.name) + ) - # save metadata to json file - if write: - with (self.path / "viv-info.json").open("w") as f: - json.dump(info, f) + sys.stdout.write( + f"""{a.bold}{a.cyan}{_id}{a.end} """ + f"""{a.style(", ".join(self.meta.spec),'dim')}\n""" + ) else: - info["spec"] = ", ".join(self.spec) - a.table((("key", "value"), *((k, v) for k, v in info.items()))) + self.tree() + + def _tree_leaves(self, items: List[str], indent: str = "") -> str: + tree_chars = ["├"] * (len(items) - 1) + ["╰"] + return "\n".join( + (f"{indent}{a.yellow}{c}─{a.end} {i}" for c, i in zip(tree_chars, items)) + ) + + def tree(self) -> None: + _id = self.meta.id if self.meta.id == self.name else self.name + # TODO: generarlize and loop this or make a template.. + items = [ + f"{a.magenta}{k}{a.end}: {v}" + for k, v in { + **{ + "spec": ", ".join(self.meta.spec), + "created": self.meta.created, + "accessed": self.meta.accessed, + }, + **({"exe": self.meta.exe} if self.meta.exe != "N/A" else {}), + **({"files": ""} if self.meta.files else {}), + }.items() + ] + rows = [f"\n{a.bold}{a.cyan}{_id}{a.end}", self._tree_leaves(items)] + if self.meta.files: + rows += (self._tree_leaves(self.meta.files, indent=" "),) + + sys.stdout.write("\n".join(rows) + "\n") + + +def get_caller_path() -> Path: + """get callers callers file path""" + # viv.py is fist in stack since function is used in `viv.use()` + frame_info = inspect.stack()[2] + filepath = frame_info.filename # in python 3.5+, you can use frame_info.filename + del frame_info # drop the reference to the stack frame to avoid reference cycles + + return Path(filepath).absolute() def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: @@ -823,12 +807,14 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: track_exe: if true make env python exe specific name: use as vivenv name, if not provided id is used """ - vivenv = ViVenv(list(packages), track_exe=track_exe, name=name) - if not vivenv.exists or os.getenv("VIV_FORCE"): + vivenv = ViVenv(list(packages), track_exe=track_exe, name=name) + if not vivenv.loaded or os.getenv("VIV_FORCE"): vivenv.create() vivenv.install_pkgs() - vivenv.dump_info(write=True) + + vivenv.meta.addfile(get_caller_path()) + vivenv.meta.write() modify_sys_path(vivenv.path) return vivenv.path @@ -836,7 +822,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: def modify_sys_path(new_path: Path) -> None: sys.path = [p for p in sys.path if p is not site.USER_SITE] - site.addsitedir([*(new_path / "lib").glob("python*/site-packages")][0]) + site.addsitedir(str(*(new_path / "lib").glob("python*/site-packages"))) def get_venvs() -> Dict[str, ViVenv]: @@ -946,7 +932,9 @@ class Viv: for k, v in self.vivenvs.items(): if name_id == k or v.name == name_id: matches.append(v) - elif k.startswith(name_id) or (v.id.startswith(name_id) and v.id == v.name): + elif k.startswith(name_id) or ( + v.meta.id.startswith(name_id) and v.meta.id == v.name + ): matches.append(v) elif v.name.startswith(name_id): matches.append(v) @@ -986,11 +974,10 @@ class Viv: # re-create env again since path's are hard-coded vivenv = ViVenv(spec) - if not vivenv.exists or os.getenv("VIV_FORCE"): + if not vivenv.loaded or os.getenv("VIV_FORCE"): vivenv.create() vivenv.install_pkgs() - vivenv.dump_info(write=True) - + vivenv.meta.write() else: echo("re-using existing vivenv") @@ -1013,19 +1000,8 @@ class Viv: elif len(self.vivenvs) == 0: echo("no vivenvs setup") else: - rows = ( - ("vivenv", "spec"), - *( - ( - f"{vivenv.name[:6]}..." - if len(vivenv.name) > 9 - else vivenv.name, - ", ".join(vivenv.spec), - ) - for vivenv in self.vivenvs.values() - ), - ) - a.table(rows) + for _, vivenv in self.vivenvs.items(): + vivenv.show(args.verbose) def exe(self, args: Namespace) -> None: """run python/pip in existing vivenv""" @@ -1048,14 +1024,12 @@ class Viv: def info(self, args: Namespace) -> None: """get metadata about a vivenv""" vivenv = self._match_vivenv(args.vivenv) - metadata_file = vivenv.path / "viv-info.json" + metadata_file = vivenv.path / "vivmeta.json" if not metadata_file.is_file(): error(f"Unable to find metadata for vivenv: {args.vivenv}", code=1) - echo(f"more info about {vivenv.name}:") - - vivenv.dump_info() + vivenv.show(verbose=True) def _install_local_src(self, sha256: str, src: Path, cli: Path) -> None: echo("updating local source copy of viv") @@ -1218,7 +1192,7 @@ class Viv: # ephemeral (default), semi-ephemeral (persist inside /tmp), or # persist (use c.cache) - if not vivenv.exists or os.getenv("VIV_FORCE"): + if not vivenv.loaded or os.getenv("VIV_FORCE"): if not args.keep: with tempfile.TemporaryDirectory(prefix="viv-") as tmpdir: vivenv.path = Path(tmpdir) @@ -1232,7 +1206,7 @@ class Viv: else: vivenv.create() vivenv.install_pkgs() - vivenv.dump_info(write=True) + vivenv.meta.write() sys.exit(subprocess.run([vivenv.path / "bin" / bin, *args.rest]).returncode) @@ -1256,7 +1230,7 @@ class Viv: return parser - def _validate_args(self, args): + def _validate_args(self, args: Namespace) -> None: if args.func.__name__ in ("freeze", "shim", "run"): if not args.reqs: error("must specify a requirement", code=1) @@ -1317,6 +1291,13 @@ class Viv: p_vivenv_arg.add_argument("vivenv", help="name/hash of vivenv") p_list = self._get_subcmd_parser(subparsers, "list") + p_list.add_argument( + "-v", + "--verbose", + help="show full metadata for vivenvs", + default=False, + action="store_true", + ) p_list.add_argument( "-q", "--quiet",