This commit is contained in:
daylinmorgan 2023-05-31 15:37:20 +00:00
parent abc7850a2a
commit 38f5365fdd
2 changed files with 293 additions and 286 deletions

2
CNAME
View file

@ -1 +1 @@
viv.dayl.in
viv.dayl.in

577
viv.py
View file

@ -1,5 +1,5 @@
#!/usr/bin/env python3
"""Viv isn't venv!
"""viv isn't venv!
viv -h
OR
@ -31,7 +31,6 @@ from argparse import (
_SubParsersAction,
)
from argparse import ArgumentParser as StdArgParser
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from pathlib import Path
@ -52,7 +51,7 @@ from typing import (
from urllib.error import HTTPError
from urllib.request import urlopen
__version__ = "23.5a4"
__version__ = "23.5a4-13-ga6bd81d-dev"
class Config:
@ -147,37 +146,35 @@ class Spinner:
sys.stderr.write("\r")
BOX: Dict[str, str] = {
"v": "",
"h": "",
"tl": "",
"tr": "",
"bl": "",
"br": "",
"sep": "",
}
@dataclass
class Ansi:
"""control ouptut of ansi(VT100) control codes"""
bold: str = "\033[1m"
dim: str = "\033[2m"
underline: str = "\033[4m"
red: str = "\033[1;31m"
green: str = "\033[1;32m"
yellow: str = "\033[1;33m"
magenta: str = "\033[1;35m"
cyan: str = "\033[1;36m"
end: str = "\033[0m"
BOX: Dict[str, str] = {
"v": "",
"h": "",
"tl": "",
"tr": "",
"bl": "",
"br": "",
"sep": "",
}
# for argparse help
header: str = cyan
option: str = yellow
metavar: str = "\033[33m" # normal yellow
def __init__(self) -> None:
self.bold: str = "\033[1m"
self.dim: str = "\033[2m"
self.underline: str = "\033[4m"
self.red: str = "\033[1;31m"
self.green: str = "\033[1;32m"
self.yellow: str = "\033[1;33m"
self.magenta: str = "\033[1;35m"
self.cyan: str = "\033[1;36m"
self.end: str = "\033[0m"
# for argparse help
self.header: str = self.cyan
self.option: str = self.yellow
self.metavar: str = "\033[33m" # normal yellow
def __post_init__(self) -> None:
if os.getenv("NO_COLOR") or not sys.stderr.isatty():
for attr in self.__dict__:
setattr(self, attr, "")
@ -216,7 +213,7 @@ class Ansi:
return " ".join(
(
self.style(f, "magenta") + self.style(rest, "cyan")
for f, rest in (("V", "iv"), ("i", "sn't"), ("v", "env!"))
for f, rest in (("v", "iv"), ("i", "sn't"), ("v", "env!"))
)
)
@ -243,7 +240,11 @@ class Ansi:
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str:
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}"
return (
f" {self.BOX['v']} "
+ f" {self.BOX['sep']} ".join(row)
+ f" {self.BOX['v']}"
)
def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]]
@ -260,7 +261,7 @@ class Ansi:
return (row,)
def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str:
return f"{self.cyan}Viv{self.end}{self.__dict__[style]}{sep}{self.end}"
return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}"
def table(
self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
@ -302,14 +303,186 @@ class Ansi:
)
)
sys.stderr.write(f" {BOX['tl']}{BOX['h']*(sum(sizes)+5)}{BOX['tr']}\n")
sys.stderr.write(
"".join(
(
" ",
self.BOX["tl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["tr"],
"\n",
)
)
)
sys.stderr.write("\n".join(table_rows) + "\n")
sys.stderr.write(f" {BOX['bl']}{BOX['h']*(sum(sizes)+5)}{BOX['br']}\n")
sys.stderr.write(
"".join(
(
" ",
self.BOX["bl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["br"],
"\n",
)
)
)
a = Ansi()
class Template:
description = f"""
{a.tagline()}
to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
"""
_standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F,N=True,False,None;i,s,m,spec=__import__,str,map,[*pkgs]
e,w=lambda x: T if x else F,lambda p,t: p.write_text(t)
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid")
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode()))
if {env:=cache/(((_id:=sha256.hexdigest()),name)[e(name)])}-{*cache.glob("*/")} or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n")
i("venv").EnvBuilder(with_pip=T,clear=T).create(env)
w(env/"pip.conf","[global]\ndisable-pip-version-check=true")
if (rc:=(p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=T,
stdout=(-1,N)[v],stderr=(-2,N)[v])).returncode)!=0:
if env.is_dir():i("shutil").rmtree(env)
ew(f"pip had non zero exit ({rc})\n{p.stdout}\n");sys.exit(rc)
w(env/"viv-info.json",i("json").dumps(
{"created":s(i("datetime").datetime.today()),"id":_id,"spec":spec,"exe":exe}))
sys.path=[p for p in (*sys.path,s(*(env/"lib").glob("py*/si*")))if p!=i("site").USER_SITE]
return env""" # noqa
def noqa(self, txt: str) -> str:
max_length = max(map(len, txt.splitlines()))
return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines()))
def _use_str(self, spec: List[str], standalone: bool = False) -> str:
spec_str = ", ".join(f'"{req}"' for req in spec)
if standalone:
return f"""_viv_use({fill(spec_str,width=90,subsequent_indent=" ",)})"""
else:
return f"""__import__("viv").use({spec_str})"""
def standalone(self, spec: List[str]) -> str:
func_use = self.noqa(
"\n".join((self._standalone_func, self._use_str(spec, standalone=True)))
)
return f"""
# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help`
# <<<<< auto-generated by viv (v{__version__})
# fmt: off
{func_use}
# fmt: on
# >>>>> code golfed with <3
"""
def _rel_import(self, local_source: Optional[Path]) -> str:
if not local_source:
raise ValueError("local source must exist")
path_to_viv = path_to_viv = str(
local_source.resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
return (
"""__import__("sys").path.append(__import__("os")"""
f""".path.expanduser("{path_to_viv}")) # noqa"""
)
def _absolute_import(self, local_source: Optional[Path]) -> str:
if not local_source:
raise ValueError("local source must exist")
path_to_viv = local_source.resolve().absolute().parent.parent
return f"""__import__("sys").path.append("{path_to_viv}") # noqa"""
def frozen_import(
self, path: str, local_source: Optional[Path], spec: List[str]
) -> str:
if path == "abs":
imports = self._absolute_import(local_source)
elif path == "rel":
imports = self._rel_import(local_source)
else:
imports = ""
return f"""\
{imports}
{self.noqa(self._use_str(spec))}
"""
def shim(
self,
path: str,
local_source: Optional[Path],
standalone: bool,
spec: List[str],
bin: str,
) -> str:
if standalone:
imports = "\n".join(
("# fmt: off", self.noqa(self._standalone_func), "# fmt: on")
)
elif path == "abs":
imports = self._absolute_import(local_source)
elif path == "rel":
imports = self._rel_import(local_source)
else:
imports = ""
return f"""\
#!/usr/bin/env python3
{imports}
import subprocess
import sys
if __name__ == "__main__":
vivenv = {self.noqa(self._use_str(spec, standalone))}
sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode)
"""
def update(
self, src: Optional[Path], cli: Path, local_version: str, next_version: str
) -> str:
return f"""
Update source at {a.green}{src}{a.end}
Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end}
Version {a.bold}{local_version}{a.end} -> {a.bold}{next_version}{a.end}
"""
def install(self, src: Path, cli: Path) -> str:
return f"""
Install viv.py to {a.green}{src}{a.end}
Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end}
"""
def show(
self, cli: Optional[Path | str], running: Path, local: Optional[Path | str]
) -> str:
return (
"\n".join(
f" {a.bold}{k}{a.end}: {v}"
for k, v in (
("Version", __version__),
("CLI", cli),
("Running Source", running),
("Local Source", local),
)
)
+ "\n"
)
t = Template()
# TODO: convet the below functions into a proper file/stream logging interface
def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
@ -680,103 +853,10 @@ def get_venvs() -> Dict[str, ViVenv]:
return vivenvs
# TODO: make a template class?
SYS_PATH_TEMPLATE = """__import__("sys").path.append("{path_to_viv}") # noqa"""
REL_SYS_PATH_TEMPLATE = (
"""__import__("sys").path.append(__import__("os")"""
""".path.expanduser("{path_to_viv}")) # noqa"""
)
IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa"""
STANDALONE_TEMPLATE = r"""
# <<<<< auto-generated by viv (v{version})
# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help`
# fmt: off
{func}
# fmt: on
# >>>>> code golfed with <3
""" # noqa
STANDALONE_TEMPLATE_FUNC = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs]
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid")
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode()))
if ((env:=cache/(name if name else (_id:=sha256.hexdigest())))
not in cache.glob("*/")) or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n")
i("venv").EnvBuilder(with_pip=T,clear=T).create(env)
with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true")
if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True,
stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0:
if env.is_dir():i("shutil").rmtree(env)
ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode)
with (env/"viv-info.json").open("w") as f:
i("json").dump({"created":s(i("datetime").datetime.today()),
"id":_id,"spec":spec,"exe":exe},f)
sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE]
return env
""" # noqa
SHOW_TEMPLATE = f"""
{a.style('Version', 'bold')}: {{version}}
{a.style('CLI', 'bold')}: {{cli}}
{a.style('Running Source', 'bold')}: {{running_src}}
{a.style('Local Source', 'bold')}: {{local_src}}
"""
INSTALL_TEMPLATE = f"""
Install viv.py to {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
"""
UPDATE_TEMPLATE = f"""
Update source at {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
Version {a.bold}{{local_version}}{a.end} -> {a.bold}{{next_version}}{a.end}
"""
SHIM_TEMPLATE = """\
#!/usr/bin/env python3
{imports}
import subprocess
import sys
if __name__ == "__main__":
vivenv = {use}
sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode)
"""
DESCRIPTION = f"""
{a.tagline()}
to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
"""
def noqa(txt: str) -> str:
max_length = max(map(len, txt.splitlines()))
return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines()))
def spec_to_import(spec: List[str]) -> None:
spec_str = ", ".join(f'"{pkg}"' for pkg in spec)
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
def combined_spec(reqs: List[str], requirements: Path) -> List[str]:
if requirements:
with requirements.open("r") as f:
reqs += f.readlines()
return reqs
@ -803,66 +883,6 @@ def resolve_deps(args: Namespace) -> List[str]:
return resolved_spec.splitlines()
def generate_import(
args: Namespace,
) -> None:
spec = resolve_deps(args)
if args.keep:
# re-create env again since path's are hard-coded
vivenv = ViVenv(spec)
if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv(
"VIV_FORCE"
):
vivenv.create()
vivenv.install_pkgs()
vivenv.dump_info(write=True)
else:
echo("re-using existing vivenv")
echo("see below for import statements\n")
if args.standalone:
sys.stdout.write(
STANDALONE_TEMPLATE.format(
version=__version__,
func=noqa(
STANDALONE_TEMPLATE_FUNC
+ "_viv_use("
+ fill(
", ".join(f'"{pkg}"' for pkg in spec),
width=100,
subsequent_indent=" ",
)
+ ")"
),
)
+ "\n"
)
return
if args.path:
if args.path == "abs":
sys.stdout.write(
SYS_PATH_TEMPLATE.format(
path_to_viv=Path(__file__).resolve().absolute().parent.parent
)
+ "\n"
)
elif args.path == "rel":
sys.stdout.write(
REL_SYS_PATH_TEMPLATE.format(
path_to_viv=str(
Path(__file__).resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
)
+ "\n"
)
spec_to_import(spec)
def fetch_source(reference: str) -> str:
try:
r = urlopen(
@ -929,21 +949,12 @@ class Viv:
else:
self.git = False
def _check_local_source(self, args: Namespace) -> None:
if not self.local_source and not (args.standalone or args.path):
warn(
"failed to find local copy of `viv` "
"make sure to add it to your PYTHONPATH "
"or consider using --path/--standalone"
)
def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
# TODO: improve matching algorithm to favor names over id's
matches: List[ViVenv] = []
for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id:
matches.append(v)
elif k.startswith(name_id) or v.id.startswith(name_id):
elif k.startswith(name_id) or (v.id.startswith(name_id) and v.id == v.name):
matches.append(v)
elif v.name.startswith(name_id):
matches.append(v)
@ -978,16 +989,29 @@ class Viv:
def freeze(self, args: Namespace) -> None:
"""create import statement from package spec"""
self._check_local_source(args)
spec = resolve_deps(args)
if args.keep:
# re-create env again since path's are hard-coded
vivenv = ViVenv(spec)
if not args.reqs:
error("must specify a requirement", code=1)
if args.path and args.standalone:
error("-p/--path and -s/--standalone are mutually exclusive", code=1)
if not vivenv.exists or os.getenv("VIV_FORCE"):
vivenv.create()
vivenv.install_pkgs()
vivenv.dump_info(write=True)
generate_import(
args,
)
else:
echo("re-using existing vivenv")
echo("see below for import statements\n")
if args.standalone:
sys.stdout.write(t.standalone(spec))
return
if args.path and not self.local_source:
error("No local viv found to import from", code=1)
sys.stdout.write(t.frozen_import(args.path, self.local_source, spec))
def list(self, args: Namespace) -> None:
"""list all vivenvs"""
@ -1012,7 +1036,7 @@ class Viv:
a.table(rows)
def exe(self, args: Namespace) -> None:
"""run python/pip in vivenv"""
"""run python/pip in existing vivenv"""
vivenv = self._match_vivenv(args.vivenv)
@ -1062,6 +1086,10 @@ class Viv:
f'{src.relative_to(Path.home()).parent}"\n'
)
def _get_new_version(self, ref: str) -> Tuple[str, str]:
sys.path.append(str(c.srccache))
return (sha256 := fetch_source(ref)), __import__(sha256).__version__
def manage(self, args: Namespace) -> None:
"""manage viv itself"""
@ -1074,31 +1102,15 @@ class Viv:
else:
echo("Current:")
sys.stderr.write(
SHOW_TEMPLATE.format(
version=__version__,
t.show(
cli=shutil.which("viv"),
running_src=self.running_source,
local_src=self.local_source,
running=self.running_source,
local=self.local_source,
)
)
elif args.cmd == "update":
if not self.local_source:
error(
a.style("viv manage update", "bold")
+ " should be used with an exisiting installation",
1,
)
if self.git:
error(
a.style("viv manage update", "bold")
+ " shouldn't be used with a git-based installation",
1,
)
sha256 = fetch_source(args.ref)
sys.path.append(str(c.srccache))
next_version = __import__(sha256).__version__
sha256, next_version = self._get_new_version(args.ref)
if self.local_version == next_version:
echo(f"no change between {args.ref} and local version")
@ -1106,12 +1118,7 @@ class Viv:
if confirm(
"Would you like to perform the above installation steps?",
UPDATE_TEMPLATE.format(
src_location=self.local_source,
local_version=self.local_version,
cli_location=args.cli,
next_version=next_version,
),
t.update(self.local_source, args.cli, self.local_version, next_version),
):
self._install_local_src(
sha256,
@ -1122,19 +1129,8 @@ class Viv:
)
elif args.cmd == "install":
if self.local_source:
error(f"found existing viv installation at {self.local_source}")
echo(
"use "
+ a.style("viv manage update", "bold")
+ " to modify current installation.",
style="red",
)
sys.exit(1)
sha256, downloaded_version = self._get_new_version(args.ref)
sha256 = fetch_source(args.ref)
sys.path.append(str(c.srccache))
downloaded_version = __import__(sha256).__version__
echo(f"Downloaded version: {downloaded_version}")
# TODO: see if file is actually where
@ -1142,12 +1138,10 @@ class Viv:
if confirm(
"Would you like to perform the above installation steps?",
INSTALL_TEMPLATE.format(
src_location=args.src,
cli_location=args.cli,
),
t.install(args.src, args.cli),
):
self._install_local_src(sha256, args.src, args.cli)
elif args.cmd == "purge":
to_remove = []
if c._cache.is_dir():
@ -1193,11 +1187,6 @@ class Viv:
viv shim black
viv shim yartsu -o ~/bin/yartsu --standalone
"""
self._check_local_source(args)
if not args.reqs:
error("please specify at lease one dependency", code=1)
default_bin, bin = self._pick_bin(args)
output = (
c.binparent / default_bin if not args.output else args.output.absolute()
@ -1211,50 +1200,24 @@ class Viv:
else:
spec = combined_spec(args.reqs, args.requirements)
spec_str = ", ".join(f'"{req}"' for req in spec)
if args.standalone:
imports = STANDALONE_TEMPLATE.format(
version=__version__, func=noqa(STANDALONE_TEMPLATE_FUNC)
)
use = f"_viv_use({spec_str})"
elif args.path:
if not self.local_source:
error("No local viv found to import from", code=1)
else:
imports = (
REL_SYS_PATH_TEMPLATE.format(
path_to_viv=str(
self.local_source.resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
)
if args.path == "abs"
else SYS_PATH_TEMPLATE.format(
path_to_viv=self.local_source.resolve().absolute().parent.parent
)
)
use = IMPORT_TEMPLATE.format(spec=spec_str)
else:
imports = ""
use = IMPORT_TEMPLATE.format(spec=spec_str)
if confirm(
f"Write shim for {a.style(bin,'bold')} to {a.style(output,'green')}?"
):
with output.open("w") as f:
f.write(SHIM_TEMPLATE.format(imports=imports, use=use, bin=bin))
f.write(
t.shim(args.path, self.local_source, args.standalone, spec, bin)
)
make_executable(output)
def run(self, args: Namespace) -> None:
"""\
run an app w/ an on-demand venv
run an app with an on-demand venv
examples:
viv r pycowsay -- "Viv isn't venv\!"
viv r pycowsay -- "viv isn't venv\!"
viv r rich -b python -- -m rich
"""
if not args.reqs:
error("please specify at lease one dependency", code=1)
_, bin = self._pick_bin(args)
spec = combined_spec(args.reqs, args.requirements)
@ -1302,10 +1265,53 @@ class Viv:
return parser
def _validate_args(self, args):
if args.func.__name__ in ("freeze", "shim", "run"):
if not args.reqs:
error("must specify a requirement", code=1)
if args.func.__name__ in ("freeze", "shim"):
if not self.local_source and not (args.standalone or args.path):
warn(
"failed to find local copy of `viv` "
"make sure to add it to your PYTHONPATH "
"or consider using --path/--standalone"
)
if args.path and not self.local_source:
error("No local viv found to import from", code=1)
if args.path and args.standalone:
error("-p/--path and -s/--standalone are mutually exclusive", code=1)
if args.func.__name__ == "manage":
if args.cmd == "install" and self.local_source:
error(f"found existing viv installation at {self.local_source}")
echo(
"use "
+ a.style("viv manage update", "bold")
+ " to modify current installation.",
style="red",
)
sys.exit(1)
if args.cmd == "update":
if not self.local_source:
error(
a.style("viv manage update", "bold")
+ " should be used with an exisiting installation",
1,
)
if self.git:
error(
a.style("viv manage update", "bold")
+ " shouldn't be used with a git-based installation",
1,
)
def cli(self) -> None:
"""cli entrypoint"""
parser = ArgumentParser(prog=self.name, description=DESCRIPTION)
parser = ArgumentParser(prog=self.name, description=t.description)
parser.add_argument(
"-V",
"--version",
@ -1500,6 +1506,7 @@ class Viv:
args = parser.parse_args()
args.rest = []
self._validate_args(args)
args.func(
args,
)