feat(metadata)!: revamp metadata tracking

This breaks any existing cache but now allows for tracking files that
access vivenv's
This commit is contained in:
Daylin Morgan 2023-05-31 11:30:02 -05:00
parent fc30b5143b
commit fcb48daade
Signed by: daylin
GPG key ID: C1E52E7DD81DF79F
2 changed files with 179 additions and 199 deletions

View file

@ -2,4 +2,3 @@
TAG=$(git describe --tags --always --dirty=-dev --exclude 'latest') TAG=$(git describe --tags --always --dirty=-dev --exclude 'latest')
VERSION="${TAG#v}" VERSION="${TAG#v}"
sed -i "s/__version__ = \".*\"/__version__ = \"$VERSION\"/g" src/viv/viv.py sed -i "s/__version__ = \".*\"/__version__ = \"$VERSION\"/g" src/viv/viv.py
git add src/viv/viv.py

View file

@ -9,6 +9,7 @@
from __future__ import annotations from __future__ import annotations
import hashlib import hashlib
import inspect
import itertools import itertools
import json import json
import os import os
@ -31,19 +32,17 @@ from argparse import (
_SubParsersAction, _SubParsersAction,
) )
from argparse import ArgumentParser as StdArgParser from argparse import ArgumentParser as StdArgParser
from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from itertools import zip_longest
from pathlib import Path from pathlib import Path
from textwrap import dedent, fill, wrap from textwrap import dedent, fill
from types import TracebackType from types import TracebackType
from typing import ( from typing import (
Any, Any,
Dict, Dict,
Generator,
List, List,
NoReturn, NoReturn,
Optional, Optional,
Sequence,
TextIO, TextIO,
Tuple, Tuple,
Type, Type,
@ -149,16 +148,6 @@ class Spinner:
class Ansi: class Ansi:
"""control ouptut of ansi(VT100) control codes""" """control ouptut of ansi(VT100) control codes"""
BOX: Dict[str, str] = {
"v": "",
"h": "",
"tl": "",
"tr": "",
"bl": "",
"br": "",
"sep": "",
}
def __init__(self) -> None: def __init__(self) -> None:
self.bold: str = "\033[1m" self.bold: str = "\033[1m"
self.dim: str = "\033[2m" self.dim: str = "\033[2m"
@ -229,104 +218,9 @@ class Ansi:
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
sys.stdout.write("\n".join(new_output) + "\n") sys.stdout.write("\n".join(new_output) + "\n")
def _get_column_sizes(
self, rows: Tuple[Tuple[str, Sequence[str]], ...]
) -> List[int]:
"""convert list of rows to list of columns sizes
First convert rows into list of columns,
then get max string length for each column.
"""
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str:
return (
f" {self.BOX['v']} "
+ f" {self.BOX['sep']} ".join(row)
+ f" {self.BOX['v']}"
)
def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]]
) -> Tuple[Tuple[str, Sequence[str]], ...]:
if len(row[1]) > sizes[1]:
return tuple(
zip_longest(
(row[0],),
wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]),
fillvalue="",
)
)
else:
return (row,)
def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str: def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str:
return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}" return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}"
def table(
self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
) -> None:
"""generate a table with outline and styled header assumes two columns
Args:
rows: sequence of the rows, first item assumed to be header
header_style: color/style for header row
"""
sizes = self._get_column_sizes(rows)
col2_limit = shutil.get_terminal_size().columns - 20
if col2_limit < 20:
error("increase screen size to view table", code=1)
elif sizes[1] > col2_limit:
sizes[1] = col2_limit
header, rows = rows[0], rows[1:]
# this is maybe taking comprehensions too far....
table_rows = (
self._make_row(row)
for row in (
# header row
(
self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
for i, cell in enumerate(header)
),
# rest of the rows
*(
(f"{cell:<{sizes[i]}}" for i, cell in enumerate(row))
for row in (
newrow
for row in rows
for newrow in self._sanitize_row(sizes, row)
)
),
)
)
sys.stderr.write(
"".join(
(
" ",
self.BOX["tl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["tr"],
"\n",
)
)
)
sys.stderr.write("\n".join(table_rows) + "\n")
sys.stderr.write(
"".join(
(
" ",
self.BOX["bl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["br"],
"\n",
)
)
)
a = Ansi() a = Ansi()
@ -339,26 +233,45 @@ to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}` - from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')} - within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
""" """
_standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""): _standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F,N=True,False,None;i,s,m,spec=__import__,str,map,[*pkgs] import hashlib, json, os, site, shutil, sys, venv # noqa
e,w=lambda x: T if x else F,lambda p,t: p.write_text(t) from pathlib import Path # noqa
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid") from datetime import datetime # noqa
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write from subprocess import run # noqa
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+ if not {*map(type, pkgs)} == {str}:
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode())) raise ValueError(f"spec: {pkgs} is invalid")
if {env:=cache/(((_id:=sha256.hexdigest()),name)[e(name)])}-{*cache.glob("*/")} or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n") meta = dict.fromkeys(("created", "accessed"), (t := str(datetime.today())))
i("venv").EnvBuilder(with_pip=T,clear=T).create(env) runner = str(Path(__file__).absolute().resolve())
w(env/"pip.conf","[global]\ndisable-pip-version-check=true") force, verbose, xdg = map(os.getenv, ("VIV_FORCE", "VIV_VERBOSE", "XDG_CACHE_HOME"))
if (rc:=(p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=T, cache = (Path(xdg) if xdg else Path.home() / ".cache") / "viv" / "venvs"
stdout=(-1,N)[v],stderr=(-2,N)[v])).returncode)!=0: cache.mkdir(parents=True, exist_ok=True)
if env.is_dir():i("shutil").rmtree(env) exe = str(Path(sys.executable).resolve()) if track_exe else "N/A"
ew(f"pip had non zero exit ({rc})\n{p.stdout}\n");sys.exit(rc) (sha256 := hashlib.sha256()).update((str(spec := [*pkgs]) + exe).encode())
w(env/"viv-info.json",i("json").dumps( _id = sha256.hexdigest()
{"created":s(i("datetime").datetime.today()),"id":_id,"spec":spec,"exe":exe})) if (env := cache / (name if name else _id)) not in cache.glob("*/") or force:
sys.path=[p for p in (*sys.path,s(*(env/"lib").glob("py*/si*")))if p!=i("site").USER_SITE] sys.stderr.write(f"generating new vivenv -> {env.name}\n")
return env""" # noqa venv.EnvBuilder(with_pip=True, clear=True).create(env)
(env / "pip.conf").write_text("[global]\ndisable-pip-version-check=true")
run_kw = dict(zip(("stdout", "stderr"), ((None,) * 2 if verbose else (-1, 2))))
p = run([env / "bin" / "pip", "install", "--force-reinstall", *spec], **run_kw)
if (rc := p.returncode) != 0:
if env.is_dir():
shutil.rmtree(env)
sys.stderr.write(f"pip had non zero exit ({rc})\n{p.stdout.decode()}\n")
sys.exit(rc)
meta.update(dict(id=_id, spec=spec, exe=exe, name=name, files=[runner]))
else:
meta = json.loads((env / "vivmeta.json").read_text())
meta.update(dict(accessed=t, files=sorted({*meta["files"],runner})))
(env / "vivmeta.json").write_text(json.dumps(meta))
sys.path = [p for p in sys.path if not p != site.USER_SITE]
site.addsitedir(str(*(env / "lib").glob("py*/si*")))
return env
"""
def noqa(self, txt: str) -> str: def noqa(self, txt: str) -> str:
max_length = max(map(len, txt.splitlines())) max_length = max(map(len, txt.splitlines()))
@ -372,16 +285,13 @@ to create/activate a vivenv:
return f"""__import__("viv").use({spec_str})""" return f"""__import__("viv").use({spec_str})"""
def standalone(self, spec: List[str]) -> str: def standalone(self, spec: List[str]) -> str:
func_use = self.noqa( func_use = "\n".join(
"\n".join((self._standalone_func, self._use_str(spec, standalone=True))) (self._standalone_func, self.noqa(self._use_str(spec, standalone=True)))
) )
return f""" return f"""
# see `python3 <(curl -fsSL viv.dayl.in/viv.py) --help` # see `python3 <(curl -fsSL viv.dayl.in/viv.py) --help`
# <<<<< auto-generated by viv (v{__version__}) # AUTOGENERATED by viv (v{__version__})
# fmt: off
{func_use} {func_use}
# fmt: on
# >>>>> code golfed with <3
""" """
def _rel_import(self, local_source: Optional[Path]) -> str: def _rel_import(self, local_source: Optional[Path]) -> str:
@ -426,9 +336,7 @@ to create/activate a vivenv:
bin: str, bin: str,
) -> str: ) -> str:
if standalone: if standalone:
imports = "\n".join( imports = self._standalone_func
("# fmt: off", self.noqa(self._standalone_func), "# fmt: on")
)
elif path == "abs": elif path == "abs":
imports = self._absolute_import(local_source) imports = self._absolute_import(local_source)
elif path == "rel": elif path == "rel":
@ -724,37 +632,78 @@ def get_hash(spec: Tuple[str, ...] | List[str], track_exe: bool = False) -> str:
return sha256.hexdigest() return sha256.hexdigest()
@dataclass
class Meta:
name: str
id: str
spec: List[str]
files: List[str]
exe: str
created: str = ""
accessed: str = ""
@classmethod
def load(cls, name: str) -> "Meta":
if not (c.venvcache / name / "vivmeta.json").exists():
warn(f"possibly corrupted vivenv: {name}")
# add empty values for corrupted vivenvs so it will still load
return cls(name=name, spec=[""], files=[""], exe="", id="")
else:
meta = json.loads((c.venvcache / name / "vivmeta.json").read_text())
return cls(**meta)
def write(self, p: Path | None = None) -> None:
if not p:
p = (c.venvcache) / self.name / "vivmeta.json"
p.write_text(json.dumps(self.__dict__))
def addfile(self, f: Path) -> None:
self.accessed = str(datetime.today())
self.files = sorted({*self.files, str(f.absolute().resolve())})
class ViVenv: class ViVenv:
def __init__( def __init__(
self, self,
spec: List[str], spec: List[str] = [""],
track_exe: bool = False, track_exe: bool = False,
id: str | None = None, id: str | None = None,
name: str = "", name: str = "",
path: Path | None = None, path: Path | None = None,
skip_load: bool = False,
metadata: Meta | None = None,
) -> None: ) -> None:
self.spec = self._validate_spec(spec) self.loaded = False
self.exe = str(Path(sys.executable).resolve()) if track_exe else "N/A" spec = self._validate_spec(spec)
self.id = id if id else get_hash(spec, track_exe) id = id if id else get_hash(spec, track_exe)
self.name = name if name else self.id
self.name = name if name else id
self.path = path if path else c.venvcache / self.name self.path = path if path else c.venvcache / self.name
self.exists = self.name in [d.name for d in c.venvcache.iterdir()]
if not metadata:
if self.name in (d.name for d in c.venvcache.iterdir()):
self.loaded = True
self.meta = Meta.load(self.name)
else:
self.meta = Meta(
spec=spec,
name=self.name,
id=id,
files=[],
exe=str(Path(sys.executable).resolve()) if track_exe else "N/A",
)
else:
self.meta = metadata
@classmethod @classmethod
def load(cls, name: str) -> "ViVenv": def load(cls, name: str) -> "ViVenv":
"""generate a vivenv from a viv-info.json file """generate a vivenv from a vivmeta.json
Args: Args:
name: used as lookup in the vivenv cache name: used as lookup in the vivenv cache
""" """
if not (c.venvcache / name / "viv-info.json").is_file(): vivenv = cls(name=name, metadata=Meta.load(name))
warn(f"possibly corrupted vivenv: {name}")
return cls(name=name, spec=[""])
else:
with (c.venvcache / name / "viv-info.json").open("r") as f:
venvconfig = json.load(f)
vivenv = cls(name=name, spec=venvconfig["spec"], id=venvconfig["id"])
vivenv.exe = venvconfig["exe"]
return vivenv return vivenv
@ -781,12 +730,14 @@ class ViVenv:
with (self.path / "pip.conf").open("w") as f: with (self.path / "pip.conf").open("w") as f:
f.write("[global]\ndisable-pip-version-check = true") f.write("[global]\ndisable-pip-version-check = true")
self.meta.created = str(datetime.today())
def install_pkgs(self) -> None: def install_pkgs(self) -> None:
cmd: List[str] = [ cmd: List[str] = [
str(self.path / "bin" / "pip"), str(self.path / "bin" / "pip"),
"install", "install",
"--force-reinstall", "--force-reinstall",
] + self.spec ] + self.meta.spec
run( run(
cmd, cmd,
@ -795,24 +746,57 @@ class ViVenv:
verbose=bool(os.getenv("VIV_VERBOSE")), verbose=bool(os.getenv("VIV_VERBOSE")),
) )
def dump_info(self, write: bool = False) -> None: def show(self, verbose: bool = False) -> None:
# TODO: include associated files in 'info' if not verbose:
# means it needs to be loaded first _id = (
# or keep a seperate file hash in c.share? self.meta.id[:8]
info = { if self.meta.id == self.name
"created": str(datetime.today()), else (self.name[:5] + "..." if len(self.name) > 8 else self.name)
"id": self.id, )
"spec": self.spec,
"exe": self.exe,
}
# save metadata to json file sys.stdout.write(
if write: f"""{a.bold}{a.cyan}{_id}{a.end} """
with (self.path / "viv-info.json").open("w") as f: f"""{a.style(", ".join(self.meta.spec),'dim')}\n"""
json.dump(info, f) )
else: else:
info["spec"] = ", ".join(self.spec) self.tree()
a.table((("key", "value"), *((k, v) for k, v in info.items())))
def _tree_leaves(self, items: List[str], indent: str = "") -> str:
tree_chars = [""] * (len(items) - 1) + [""]
return "\n".join(
(f"{indent}{a.yellow}{c}{a.end} {i}" for c, i in zip(tree_chars, items))
)
def tree(self) -> None:
_id = self.meta.id if self.meta.id == self.name else self.name
# TODO: generarlize and loop this or make a template..
items = [
f"{a.magenta}{k}{a.end}: {v}"
for k, v in {
**{
"spec": ", ".join(self.meta.spec),
"created": self.meta.created,
"accessed": self.meta.accessed,
},
**({"exe": self.meta.exe} if self.meta.exe != "N/A" else {}),
**({"files": ""} if self.meta.files else {}),
}.items()
]
rows = [f"\n{a.bold}{a.cyan}{_id}{a.end}", self._tree_leaves(items)]
if self.meta.files:
rows += (self._tree_leaves(self.meta.files, indent=" "),)
sys.stdout.write("\n".join(rows) + "\n")
def get_caller_path() -> Path:
"""get callers callers file path"""
# viv.py is fist in stack since function is used in `viv.use()`
frame_info = inspect.stack()[2]
filepath = frame_info.filename # in python 3.5+, you can use frame_info.filename
del frame_info # drop the reference to the stack frame to avoid reference cycles
return Path(filepath).absolute()
def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
@ -823,12 +807,14 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
track_exe: if true make env python exe specific track_exe: if true make env python exe specific
name: use as vivenv name, if not provided id is used name: use as vivenv name, if not provided id is used
""" """
vivenv = ViVenv(list(packages), track_exe=track_exe, name=name)
if not vivenv.exists or os.getenv("VIV_FORCE"): vivenv = ViVenv(list(packages), track_exe=track_exe, name=name)
if not vivenv.loaded or os.getenv("VIV_FORCE"):
vivenv.create() vivenv.create()
vivenv.install_pkgs() vivenv.install_pkgs()
vivenv.dump_info(write=True)
vivenv.meta.addfile(get_caller_path())
vivenv.meta.write()
modify_sys_path(vivenv.path) modify_sys_path(vivenv.path)
return vivenv.path return vivenv.path
@ -836,7 +822,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
def modify_sys_path(new_path: Path) -> None: def modify_sys_path(new_path: Path) -> None:
sys.path = [p for p in sys.path if p is not site.USER_SITE] sys.path = [p for p in sys.path if p is not site.USER_SITE]
site.addsitedir([*(new_path / "lib").glob("python*/site-packages")][0]) site.addsitedir(str(*(new_path / "lib").glob("python*/site-packages")))
def get_venvs() -> Dict[str, ViVenv]: def get_venvs() -> Dict[str, ViVenv]:
@ -946,7 +932,9 @@ class Viv:
for k, v in self.vivenvs.items(): for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id: if name_id == k or v.name == name_id:
matches.append(v) matches.append(v)
elif k.startswith(name_id) or (v.id.startswith(name_id) and v.id == v.name): elif k.startswith(name_id) or (
v.meta.id.startswith(name_id) and v.meta.id == v.name
):
matches.append(v) matches.append(v)
elif v.name.startswith(name_id): elif v.name.startswith(name_id):
matches.append(v) matches.append(v)
@ -986,11 +974,10 @@ class Viv:
# re-create env again since path's are hard-coded # re-create env again since path's are hard-coded
vivenv = ViVenv(spec) vivenv = ViVenv(spec)
if not vivenv.exists or os.getenv("VIV_FORCE"): if not vivenv.loaded or os.getenv("VIV_FORCE"):
vivenv.create() vivenv.create()
vivenv.install_pkgs() vivenv.install_pkgs()
vivenv.dump_info(write=True) vivenv.meta.write()
else: else:
echo("re-using existing vivenv") echo("re-using existing vivenv")
@ -1013,19 +1000,8 @@ class Viv:
elif len(self.vivenvs) == 0: elif len(self.vivenvs) == 0:
echo("no vivenvs setup") echo("no vivenvs setup")
else: else:
rows = ( for _, vivenv in self.vivenvs.items():
("vivenv", "spec"), vivenv.show(args.verbose)
*(
(
f"{vivenv.name[:6]}..."
if len(vivenv.name) > 9
else vivenv.name,
", ".join(vivenv.spec),
)
for vivenv in self.vivenvs.values()
),
)
a.table(rows)
def exe(self, args: Namespace) -> None: def exe(self, args: Namespace) -> None:
"""run python/pip in existing vivenv""" """run python/pip in existing vivenv"""
@ -1048,14 +1024,12 @@ class Viv:
def info(self, args: Namespace) -> None: def info(self, args: Namespace) -> None:
"""get metadata about a vivenv""" """get metadata about a vivenv"""
vivenv = self._match_vivenv(args.vivenv) vivenv = self._match_vivenv(args.vivenv)
metadata_file = vivenv.path / "viv-info.json" metadata_file = vivenv.path / "vivmeta.json"
if not metadata_file.is_file(): if not metadata_file.is_file():
error(f"Unable to find metadata for vivenv: {args.vivenv}", code=1) error(f"Unable to find metadata for vivenv: {args.vivenv}", code=1)
echo(f"more info about {vivenv.name}:") vivenv.show(verbose=True)
vivenv.dump_info()
def _install_local_src(self, sha256: str, src: Path, cli: Path) -> None: def _install_local_src(self, sha256: str, src: Path, cli: Path) -> None:
echo("updating local source copy of viv") echo("updating local source copy of viv")
@ -1218,7 +1192,7 @@ class Viv:
# ephemeral (default), semi-ephemeral (persist inside /tmp), or # ephemeral (default), semi-ephemeral (persist inside /tmp), or
# persist (use c.cache) # persist (use c.cache)
if not vivenv.exists or os.getenv("VIV_FORCE"): if not vivenv.loaded or os.getenv("VIV_FORCE"):
if not args.keep: if not args.keep:
with tempfile.TemporaryDirectory(prefix="viv-") as tmpdir: with tempfile.TemporaryDirectory(prefix="viv-") as tmpdir:
vivenv.path = Path(tmpdir) vivenv.path = Path(tmpdir)
@ -1232,7 +1206,7 @@ class Viv:
else: else:
vivenv.create() vivenv.create()
vivenv.install_pkgs() vivenv.install_pkgs()
vivenv.dump_info(write=True) vivenv.meta.write()
sys.exit(subprocess.run([vivenv.path / "bin" / bin, *args.rest]).returncode) sys.exit(subprocess.run([vivenv.path / "bin" / bin, *args.rest]).returncode)
@ -1256,7 +1230,7 @@ class Viv:
return parser return parser
def _validate_args(self, args): def _validate_args(self, args: Namespace) -> None:
if args.func.__name__ in ("freeze", "shim", "run"): if args.func.__name__ in ("freeze", "shim", "run"):
if not args.reqs: if not args.reqs:
error("must specify a requirement", code=1) error("must specify a requirement", code=1)
@ -1317,6 +1291,13 @@ class Viv:
p_vivenv_arg.add_argument("vivenv", help="name/hash of vivenv") p_vivenv_arg.add_argument("vivenv", help="name/hash of vivenv")
p_list = self._get_subcmd_parser(subparsers, "list") p_list = self._get_subcmd_parser(subparsers, "list")
p_list.add_argument(
"-v",
"--verbose",
help="show full metadata for vivenvs",
default=False,
action="store_true",
)
p_list.add_argument( p_list.add_argument(
"-q", "-q",
"--quiet", "--quiet",