viv/viv.py
2023-05-29 18:13:10 +00:00

1514 lines
47 KiB
Python
Executable file

#!/usr/bin/env python3
"""Viv isn't venv!
viv -h
OR
__import__("viv").use("requests", "bs4")
"""
from __future__ import annotations
import hashlib
import itertools
import json
import os
import re
import shlex
import shutil
import site
import subprocess
import sys
import tempfile
import threading
import time
import venv
from argparse import (
SUPPRESS,
Action,
HelpFormatter,
Namespace,
RawDescriptionHelpFormatter,
_SubParsersAction,
)
from argparse import ArgumentParser as StdArgParser
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from pathlib import Path
from textwrap import dedent, fill, wrap
from types import TracebackType
from typing import (
Any,
Dict,
Generator,
List,
NoReturn,
Optional,
Sequence,
TextIO,
Tuple,
Type,
)
from urllib.error import HTTPError
from urllib.request import urlopen
# version string reported by `-V/--version` and embedded in generated standalone snippets
__version__ = "23.5a3-10-gf2c156d-dev"
class Config:
    """Locate the directories viv uses, creating them on first access.

    Locations honour ``XDG_CACHE_HOME``, ``XDG_DATA_HOME`` and ``VIV_BIN_DIR``
    and otherwise fall back to paths under the user's home directory.
    """

    def __init__(self) -> None:
        cache_root = os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")
        self._cache = Path(cache_root) / "viv"

    def _ensure(self, p: Path) -> Path:
        """Create ``p`` (with parents) when missing and hand it back."""
        p.mkdir(parents=True, exist_ok=True)
        return p

    @property
    def venvcache(self) -> Path:
        """Directory holding the cached vivenvs."""
        venvs = self._cache / "venvs"
        return self._ensure(venvs)

    @property
    def srccache(self) -> Path:
        """Directory holding downloaded copies of the viv source."""
        sources = self._cache / "src"
        return self._ensure(sources)

    @property
    def binparent(self) -> Path:
        """Directory that receives generated executables."""
        bin_dir = os.getenv("VIV_BIN_DIR", Path.home() / ".local" / "bin")
        return self._ensure(Path(bin_dir))

    @property
    def srcdefault(self) -> Path:
        """Default install path of ``viv.py``; only its parent is created."""
        data_root = os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")
        return self._ensure(Path(data_root) / "viv") / "viv.py"
# module-wide Config instance; the rest of the file reads cache/bin/source paths from it
c = Config()
class Spinner:
    """Context manager that animates a spinner on stderr next to a message.

    spinner modified from:
    https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py

    The animation only runs when stderr is a tty. On exit the worker thread
    is joined *before* the status line is erased, so a late frame can never
    be drawn after the cleanup.

    Args:
        message: text echoed (without newline) before the spinner.
        delay: seconds each frame stays on screen.
    """

    def __init__(self, message: str, delay: float = 0.1) -> None:
        self.spinner = itertools.cycle([f"{frame} " for frame in "⣾⣽⣻⢿⡿⣟⣯⣷"])
        self.delay = delay
        self.busy = False
        self.spinner_visible = False
        self.message = message
        # created up front so every method can rely on the lock existing,
        # even when __enter__ decided not to start the animation
        self._screen_lock = threading.Lock()
        self.thread: Optional[threading.Thread] = None
        echo(message + " ", newline=False)

    def write_next(self) -> None:
        """Draw the next frame unless one is already on screen."""
        with self._screen_lock:
            if not self.spinner_visible:
                sys.stderr.write(next(self.spinner))
                self.spinner_visible = True
                sys.stderr.flush()

    def remove_spinner(self, cleanup: bool = False) -> None:
        """Back over the current frame; with ``cleanup`` erase the whole line."""
        with self._screen_lock:
            if self.spinner_visible:
                sys.stderr.write("\b\b\b")
                self.spinner_visible = False
            if cleanup:
                # return to column 0 and erase the line; done regardless of
                # whether a frame is currently visible
                sys.stderr.write("\r\033[K")
            sys.stderr.flush()

    def spinner_task(self) -> None:
        """Worker loop: show a frame, wait ``delay`` seconds, remove it."""
        while self.busy:
            self.write_next()
            time.sleep(self.delay)
            self.remove_spinner()

    def __enter__(self) -> None:
        if sys.stderr.isatty():
            self.busy = True
            self.thread = threading.Thread(target=self.spinner_task)
            self.thread.start()

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_traceback: Optional[TracebackType],
    ) -> None:
        if self.thread is not None:
            self.busy = False
            # wait for the worker to finish its current frame before wiping
            # the line, otherwise it may redraw after the cleanup
            self.thread.join()
            self.thread = None
            self.remove_spinner(cleanup=True)
        else:
            sys.stderr.write("\r")
# glyphs used by Ansi._make_row / Ansi.table to draw the table outline
# (v: vertical edge, h: horizontal edge, tl/tr/bl/br: corners, sep: column separator)
# NOTE(review): every value is an empty string in this copy — presumably the
# box-drawing characters were lost in transit; confirm against upstream.
BOX: Dict[str, str] = {
    "v": "",
    "h": "",
    "tl": "",
    "tr": "",
    "bl": "",
    "br": "",
    "sep": "",
}
@dataclass
class Ansi:
    """ANSI (VT100) escape codes and helpers for styled terminal output.

    Every code is blanked at construction time when ``NO_COLOR`` is set or
    stderr is not a tty, so callers can use the attributes unconditionally.
    """

    bold: str = "\033[1m"
    dim: str = "\033[2m"
    underline: str = "\033[4m"
    red: str = "\033[1;31m"
    green: str = "\033[1;32m"
    yellow: str = "\033[1;33m"
    magenta: str = "\033[1;35m"
    cyan: str = "\033[1;36m"
    end: str = "\033[0m"
    # for argparse help
    header: str = cyan
    option: str = yellow
    metavar: str = "\033[33m"  # normal yellow

    def __post_init__(self) -> None:
        colorless = os.getenv("NO_COLOR") or not sys.stderr.isatty()
        if colorless:
            for code_name in self.__dict__:
                setattr(self, code_name, "")
        # compiled after the blanking loop so the pattern itself is kept
        self._ansi_escape = re.compile(
            r"""
            \x1B # ESC
            (?: # 7-bit C1 Fe (except CSI)
            [@-Z\\-_]
            | # or [ for CSI, followed by a control sequence
            \[
            [0-?]* # Parameter bytes
            [ -/]* # Intermediate bytes
            [@-~] # Final byte
            )
            """,
            re.VERBOSE,
        )

    def escape(self, txt: str) -> str:
        """Return ``txt`` with every ANSI escape sequence removed."""
        return self._ansi_escape.sub("", txt)

    def style(self, txt: str, style: str = "cyan") -> str:
        """Wrap text in the escape code named by ``style``.

        Args:
            txt: text to stylize
            style: name of the attribute holding the code to apply
        Returns:
            the text prefixed by the code and followed by the reset code
        """
        prefix = getattr(self, style)
        reset = getattr(self, "end")
        return f"{prefix}{txt}{reset}"

    def tagline(self) -> str:
        """Build the styled "Viv isn't venv!" tagline."""
        words = []
        for initial, remainder in (("V", "iv"), ("i", "sn't"), ("v", "env!")):
            words.append(
                self.style(initial, "magenta") + self.style(remainder, "cyan")
            )
        return " ".join(words)

    def subprocess(self, output: str) -> None:
        """Print captured subprocess output, one marked line per source line.

        Args:
            output: text captured from the subprocess; nothing is printed
                when it is empty or ``None``
        """
        if not output:
            return
        echo("subprocess output:")
        marker = f"{self.red}->{self.end}"
        rendered = "".join(f"{marker} {line}\n" for line in output.splitlines())
        sys.stdout.write(rendered)

    def _get_column_sizes(
        self, rows: Tuple[Tuple[str, Sequence[str]], ...]
    ) -> List[int]:
        """Return the widest cell length of each column."""
        return [
            max(len(cell) for cell in column) for column in zip(*rows)
        ]  # type: ignore

    def _make_row(self, row: Generator[Any, None, None]) -> str:
        """Join already padded cells into one bordered table line."""
        edge = BOX["v"]
        divider = f" {BOX['sep']} "
        return f" {edge} " + divider.join(row) + f" {edge}"

    def _sanitize_row(
        self, sizes: List[int], row: Tuple[str, Sequence[str]]
    ) -> Tuple[Tuple[str, Sequence[str]], ...]:
        """Split a row whose second cell is too wide into several rows.

        The first cell is only kept on the first produced row; continuation
        rows carry an empty first cell.
        """
        if len(row[1]) <= sizes[1]:
            return (row,)
        chunks = wrap(str(row[1]), break_on_hyphens=False, width=sizes[1])
        return tuple(zip_longest((row[0],), chunks, fillvalue=""))

    def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str:
        """Return the ``Viv<sep>`` prefix used in front of messages."""
        sep_code = self.__dict__[style]
        return f"{self.cyan}Viv{self.end}{sep_code}{sep}{self.end}"

    def table(
        self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
    ) -> None:
        """Write a two-column outlined table to stderr.

        Args:
            rows: the table rows; the first one is rendered as the header
            header_style: name of the style applied to header cells
        """
        sizes = self._get_column_sizes(rows)
        col2_limit = shutil.get_terminal_size().columns - 20
        if col2_limit < 20:
            error("increase screen size to view table", code=1)
        elif sizes[1] > col2_limit:
            sizes[1] = col2_limit

        header, rows = rows[0], rows[1:]
        # over-long rows are wrapped before anything is written
        body = [piece for row in rows for piece in self._sanitize_row(sizes, row)]

        sys.stderr.write(f" {BOX['tl']}{BOX['h']*(sum(sizes)+5)}{BOX['tr']}\n")

        lines = [
            self._make_row(
                self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
                for i, cell in enumerate(header)
            )
        ]
        for entry in body:
            lines.append(
                self._make_row(f"{cell:<{sizes[i]}}" for i, cell in enumerate(entry))
            )
        sys.stderr.write("\n".join(lines) + "\n")

        sys.stderr.write(f" {BOX['bl']}{BOX['h']*(sum(sizes)+5)}{BOX['br']}\n")
# module-wide Ansi instance used for all styled output below
a = Ansi()
# TODO: convert the below functions into a proper file/stream logging interface
def echo(
    msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
) -> None:
    """Write a viv-prefixed message to ``fd`` (stderr unless overridden).

    Args:
        msg: text to print after the preamble
        style: style name used for the preamble separator
        newline: append a trailing newline when true
        fd: stream to write to
    """
    terminator = "\n" if newline else ""
    fd.write(f"{a.viv_preamble(style)} {msg}{terminator}")
def error(msg: str, code: int = 0) -> None:
    """Print an error message and exit when a non-zero ``code`` is given.

    Args:
        msg: description of the failure
        code: process exit status; ``0`` means "report only, keep running"
    """
    echo(f"{a.red}error:{a.end} {msg}", style="red")
    if not code:
        return
    sys.exit(code)
def warn(msg: str) -> None:
    """Print a warning message through ``echo`` using the yellow style."""
    label = f"{a.yellow}warn:{a.end}"
    echo(f"{label} {msg}", style="yellow")
def confirm(question: str, context: str = "") -> bool:
    """Ask a yes/no question on stderr and return the user's decision.

    Args:
        question: text of the question
        context: optional text written verbatim before the question
    Returns:
        True for "y"/"yes", False for "n"/"no" (case-insensitive). The prompt
        repeats until one of those is entered. A closed stdin (EOF) is
        treated as "no" so non-interactive runs decline instead of crashing.
    """
    sys.stderr.write(context)
    sys.stderr.write(
        a.viv_preamble(sep="?? ") + question + a.style(" (Y)es/(n)o ", "yellow")
    )
    while True:
        sys.stderr.flush()
        try:
            ans = input().strip().lower()
        except EOFError:
            sys.stderr.write("\n")
            return False
        if ans in ("y", "yes"):
            return True
        if ans in ("n", "no"):
            return False
        # retry prompt goes to the same stream as the question itself
        sys.stderr.write("Please select (Y)es or (n)o. ")
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
    """Help formatter that colors output and drops the repeated metavar.

    Option invocations are rendered as ``-s, --long ARGS`` instead of
    ``-s ARGS, --long ARGS``. All width computations are done on the text
    with ANSI codes stripped, so alignment is unaffected by styling.
    """

    def _get_invocation_length(self, invocation: str) -> int:
        """Visible length of an invocation string (ANSI codes excluded)."""
        return len(a.escape(invocation))

    def _format_action_invocation(self, action: Action) -> str:
        # positional argument: just the styled metavar
        if not action.option_strings:
            (metavar,) = self._metavar_formatter(action, action.dest)(1)
            return a.style(metavar, style="option")

        parts = [a.style(option, style="option") for option in action.option_strings]
        # if the Optional doesn't take a value, format is:
        #    -s, --long
        # if the Optional takes a value, format is:
        #    -s ARGS, --long ARGS
        # change to
        #    -s, --long ARGS
        if action.nargs != 0:
            default = action.dest.upper()
            args_string = self._format_args(action, default)
            # add metavar to last string
            parts[-1] += a.style(f" {args_string}", style="metavar")
        return (", ").join(parts)

    def _format_usage(self, *args: Any, **kwargs: Any) -> str:
        formatted_usage = super()._format_usage(*args, **kwargs)
        # patch usage with color formatting (unless it is already patched)
        styled_prefix = f"{a.header}usage{a.end}:"
        if styled_prefix in formatted_usage:
            return formatted_usage
        return formatted_usage.replace("usage:", styled_prefix)

    def _format_action(self, action: Action) -> str:
        # determine the required width and the entry label
        help_position = min(self._action_max_length + 2, self._max_help_position)
        help_width = max(self._width - help_position, 11)
        action_width = help_position - self._current_indent
        action_header = self._format_action_invocation(action)
        # measured without ANSI codes so padding matches what is displayed
        action_header_len = len(a.escape(action_header))

        # no help; start on same line and add a final newline
        if not action.help:
            action_header = "%*s%s\n" % (self._current_indent, "", action_header)
        # short action name; start on the same line and pad two spaces
        elif action_header_len <= action_width:
            action_header = (
                f"{' '*self._current_indent}{action_header}"
                f"{' '*(action_width+2 - action_header_len)}"
            )
            indent_first = 0
        # long action name; start on the next line
        else:
            action_header = "%*s%s\n" % (self._current_indent, "", action_header)
            indent_first = help_position

        # collect the pieces of the action help
        parts = [action_header]

        # if there was help for the action, add lines of help text
        if action.help and action.help.strip():
            help_text = self._expand_help(action)
            if help_text:
                help_lines = self._split_lines(help_text, help_width)
                parts.append("%*s%s\n" % (indent_first, "", help_lines[0]))
                for line in help_lines[1:]:
                    parts.append("%*s%s\n" % (help_position, "", line))
        # or add a newline if the description doesn't end with one
        elif not action_header.endswith("\n"):
            parts.append("\n")

        # if there are any sub-actions, add their help as well
        for subaction in self._iter_indented_subactions(action):
            parts.append(self._format_action(subaction))

        # return a single string
        return self._join_parts(parts)

    def start_section(self, heading: Optional[str]) -> None:
        # Always open the section: argparse pairs every start_section with an
        # end_section, so skipping it for a missing heading would unbalance
        # the section stack. Only a non-empty heading gets styled.
        if heading:
            heading = a.style(heading, style="header")
        super().start_section(heading)

    def add_argument(self, action: Action) -> None:
        if action.help is not SUPPRESS:
            # find all invocations
            get_invocation = self._format_action_invocation
            invocations = [get_invocation(action)]
            for subaction in self._iter_indented_subactions(action):
                invocations.append(get_invocation(subaction))

            # update the maximum item length accounting for ansi codes
            invocation_length = max(map(self._get_invocation_length, invocations))
            action_length = invocation_length + self._current_indent
            self._action_max_length = max(self._action_max_length, action_length)

            # add the item to the list
            self._add_item(self._format_action, [action])
class ArgumentParser(StdArgParser):
    """Argument parser wired to viv's help formatter and error reporting."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)

        def _make_formatter(prog: str) -> "CustomHelpFormatter":
            return CustomHelpFormatter(
                prog,
                max_help_position=35,
            )

        # always overrides any formatter_class passed by the caller
        self.formatter_class = _make_formatter

    def error(self, message: str) -> NoReturn:
        """Report a usage error through viv's printers and exit with status 2."""
        error(message)
        hint = f"see `{self.prog} --help` for more info"
        echo(hint, style="red")
        sys.exit(2)
def run(
    command: List[str],
    spinmsg: str = "",
    clean_up_path: Optional[Path] = None,
    verbose: bool = False,
    ignore_error: bool = False,
    check_output: bool = False,
) -> str:
    """Run a command in a subprocess.

    Args:
        command: argument vector to execute.
        spinmsg: message shown with a spinner while the command runs
            (only used when not verbose).
        clean_up_path: directory removed when the command fails.
        verbose: if true, let the command write straight to the terminal
            instead of capturing its output.
        ignore_error: if true, a non-zero exit status is not treated as fatal.
        check_output: if true, return the captured output.
    Returns:
        the captured output when ``check_output`` is set, otherwise ``""``.
        On a non-ignored failure the process exits with the command's status.
    """
    # captured runs merge stderr into stdout; verbose runs inherit both streams
    out_target = None if verbose else subprocess.PIPE
    err_target = None if verbose else subprocess.STDOUT

    def _launch() -> "subprocess.CompletedProcess[str]":
        return subprocess.run(
            command,
            stdout=out_target,
            stderr=err_target,
            universal_newlines=True,
        )

    if spinmsg and not verbose:
        with Spinner(spinmsg):
            p = _launch()
    else:
        p = _launch()

    failed = p.returncode != 0 and not ignore_error
    if failed:
        error("subprocess failed")
        echo("see below for command output", style="red")
        a.subprocess(p.stdout)
        if clean_up_path and clean_up_path.is_dir():
            shutil.rmtree(str(clean_up_path))
        sys.exit(p.returncode)

    return p.stdout if check_output else ""
def get_hash(spec: Tuple[str, ...] | List[str], track_exe: bool = False) -> str:
    """Hash a package specification into a vivenv id.

    Args:
        spec: sequence of package specifications
        track_exe: if true, mix the resolved python executable into the hash
    Returns:
        hex sha256 digest of the spec's string form followed by the
        executable path (or ``"N/A"`` when the executable is not tracked)
    """
    exe_part = str(Path(sys.executable).resolve()) if track_exe else "N/A"
    payload = f"{spec}{exe_part}".encode()
    return hashlib.sha256(payload).hexdigest()
class ViVenv:
    """A virtual environment managed by viv, identified by its spec hash.

    Args:
        spec: package specifications for the environment
        track_exe: if true, tie the environment to the running interpreter
        id: precomputed id; derived from ``spec`` when omitted
        name: directory name in the cache; defaults to the id
        path: explicit location; defaults to ``<venv cache>/<name>``
    """

    def __init__(
        self,
        spec: List[str],
        track_exe: bool = False,
        id: str | None = None,
        name: str = "",
        path: Path | None = None,
    ) -> None:
        self.spec = self._validate_spec(spec)
        self.exe = str(Path(sys.executable).resolve()) if track_exe else "N/A"
        # the id is computed from the spec as given, not the sorted copy
        self.id = id or get_hash(spec, track_exe)
        self.name = name or self.id
        self.path = path or c.venvcache / self.name
        self.exists = any(d.name == self.name for d in c.venvcache.iterdir())

    @classmethod
    def load(cls, name: str) -> "ViVenv":
        """Rebuild a vivenv from its ``viv-info.json`` file.

        Args:
            name: directory name to look up in the vivenv cache
        Returns:
            the recorded vivenv, or a placeholder with an empty spec (after
            a warning) when the metadata file is missing
        """
        info_file = c.venvcache / name / "viv-info.json"
        if not info_file.is_file():
            warn(f"possibly corrupted vivenv: {name}")
            return cls(name=name, spec=[""])

        with info_file.open("r") as f:
            venvconfig = json.load(f)
        vivenv = cls(name=name, spec=venvconfig["spec"], id=venvconfig["id"])
        vivenv.exe = venvconfig["exe"]
        return vivenv

    def _validate_spec(self, spec: List[str]) -> List[str]:
        """Check every item is a ``str`` and return the spec sorted.

        Args:
            spec: sequence of package specifications
        """
        item_types = {type(item) for item in spec}
        if item_types != {str}:
            error("unexepected input in package spec")
            error(f"check your packages definitions: {spec}", code=1)
        return sorted(spec)

    def create(self, quiet: bool = False) -> None:
        """Create (or wipe and re-create) the environment with pip available."""
        if not quiet:
            echo(f"new unique vivenv -> {self.name}")
        with Spinner("creating vivenv"):
            venv.EnvBuilder(with_pip=True, clear=True).create(self.path)
            # add config to ignore pip version
            pip_conf = self.path / "pip.conf"
            with pip_conf.open("w") as f:
                f.write("[global]\ndisable-pip-version-check = true")

    def install_pkgs(self) -> None:
        """Install the spec with the environment's own pip.

        The environment directory is removed if the install fails.
        """
        pip = str(self.path / "bin" / "pip")
        cmd: List[str] = [pip, "install", "--force-reinstall", *self.spec]
        run(
            cmd,
            spinmsg="installing packages in vivenv",
            clean_up_path=self.path,
            verbose=bool(os.getenv("VIV_VERBOSE")),
        )

    def dump_info(self, write: bool = False) -> None:
        """Save the metadata as json (``write=True``) or print it as a table."""
        # TODO: include associated files in 'info'
        # means it needs to be loaded first
        # or keep a separate file hash in c.share?
        info = {
            "created": str(datetime.today()),
            "id": self.id,
            "spec": self.spec,
            "exe": self.exe,
        }
        if write:
            # save metadata to json file
            with (self.path / "viv-info.json").open("w") as f:
                json.dump(info, f)
            return

        info["spec"] = ", ".join(self.spec)
        a.table((("key", "value"), *info.items()))
def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
    """Make the given packages importable through a (cached) vivenv.

    Args:
        packages: package specifications with optional version specifiers
        track_exe: if true make env python exe specific
        name: use as vivenv name, if not provided id is used
    Returns:
        path of the vivenv whose site-packages was added to ``sys.path``
    """
    vivenv = ViVenv(list(packages), track_exe=track_exe, name=name)
    # build when missing from the cache, or whenever VIV_FORCE is set
    needs_build = not vivenv.exists or os.getenv("VIV_FORCE")
    if needs_build:
        vivenv.create()
        vivenv.install_pkgs()
        vivenv.dump_info(write=True)

    env_path = vivenv.path
    modify_sys_path(env_path)
    return env_path
def modify_sys_path(new_path: Path) -> None:
    """Drop the user site directory from ``sys.path`` and add a vivenv.

    Args:
        new_path: root of the environment whose
            ``lib/python*/site-packages`` directory is appended
    Raises:
        FileNotFoundError: the environment has no site-packages directory
    """
    site_packages = next((new_path / "lib").glob("python*/site-packages"), None)
    if site_packages is None:
        raise FileNotFoundError(
            f"no site-packages directory found under {new_path / 'lib'}"
        )
    # remove user-site; filter in place rather than popping while iterating,
    # which skips the element following each removed entry
    sys.path[:] = [entry for entry in sys.path if entry != site.USER_SITE]
    sys.path.append(str(site_packages))
def get_venvs() -> Dict[str, ViVenv]:
    """Load every entry of the vivenv cache, keyed by vivenv name."""
    loaded = (ViVenv.load(entry.name) for entry in c.venvcache.iterdir())
    return {vivenv.name: vivenv for vivenv in loaded}
# TODO: make a template class?
# line that appends an absolute viv location to sys.path
SYS_PATH_TEMPLATE = """__import__("sys").path.append("{path_to_viv}") # noqa"""
# same, but the location is passed through os.path.expanduser ("~"-relative)
REL_SYS_PATH_TEMPLATE = (
    """__import__("sys").path.append(__import__("os")"""
    """.path.expanduser("{path_to_viv}")) # noqa"""
)
# one-line activation statement; {spec} is a comma separated list of quoted packages
IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa"""
# wrapper around the standalone function; {func} receives STANDALONE_TEMPLATE_FUNC
STANDALONE_TEMPLATE = r"""
# <<<<< auto-generated by viv (v{version})
# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help`
# fmt: off
{func}
# fmt: on
# >>>>> code golfed with <3
""" # noqa
# compact re-implementation of `use` that does not require viv to be importable
# NOTE(review): the body below has no leading indentation in this copy and would
# not be valid Python once emitted — presumably whitespace was lost; confirm
# against upstream before relying on it.
# NOTE(review): `_id` is only bound by the walrus when `name` is empty, so
# passing name=... looks like it would raise NameError at the json.dump — confirm.
STANDALONE_TEMPLATE_FUNC = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs]
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid")
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode()))
if ((env:=cache/(name if name else (_id:=sha256.hexdigest())))
not in cache.glob("*/")) or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n")
i("venv").EnvBuilder(with_pip=T,clear=T).create(env)
with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true")
if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True,
stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0:
if env.is_dir():i("shutil").rmtree(env)
ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode)
with (env/"viv-info.json").open("w") as f:
i("json").dump({"created":s(i("datetime").datetime.today()),
"id":_id,"spec":spec,"exe":exe},f)
sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE]
return env
""" # noqa
# summary printed by `manage show`
SHOW_TEMPLATE = f"""
{a.style('Version', 'bold')}: {{version}}
{a.style('CLI', 'bold')}: {{cli}}
{a.style('Running Source', 'bold')}: {{running_src}}
{a.style('Local Source', 'bold')}: {{local_src}}
"""
# context shown before the `manage install` confirmation prompt
INSTALL_TEMPLATE = f"""
Install viv.py to {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
"""
# context shown before the `manage update` confirmation prompt
UPDATE_TEMPLATE = f"""
Update source at {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
Version {a.bold}{{local_version}}{a.end} -> {a.bold}{{next_version}}{a.end}
"""
# script written by `shim`: activates a vivenv, then runs {bin} from it
# NOTE(review): the lines after `if __name__ == "__main__":` are unindented in
# this copy — presumably lost whitespace; confirm against upstream.
SHIM_TEMPLATE = """\
#!/usr/bin/env python3
{imports}
import subprocess
import sys
if __name__ == "__main__":
vivenv = {use}
sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode)
"""
# description used for the top-level argument parser
DESCRIPTION = f"""
{a.tagline()}
to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
"""
def noqa(txt: str) -> str:
    """Append an aligned ``# noqa`` marker to every line of ``txt``.

    Args:
        txt: source text, possibly spanning several lines
    Returns:
        the lines padded to the longest one and suffixed with `` # noqa``;
        an empty string when ``txt`` contains no lines
    """
    lines = txt.splitlines()
    # default=0 keeps empty input from raising ValueError in max()
    max_length = max(map(len, lines), default=0)
    return "\n".join(f"{line:{max_length}} # noqa" for line in lines)
def spec_to_import(spec: List[str]) -> None:
    """Print the one-line ``viv.use(...)`` import statement for ``spec``."""
    quoted = [f'"{pkg}"' for pkg in spec]
    statement = IMPORT_TEMPLATE.format(spec=", ".join(quoted))
    sys.stdout.write(statement + "\n")
def combined_spec(reqs: List[str], requirements: Path) -> List[str]:
    """Merge command-line requirements with those of a requirements file.

    Args:
        reqs: requirement specifiers given directly; left unmodified
        requirements: optional path (``Path`` or ``str``) of a requirements
            file; falsy means "no file"
    Returns:
        a new list with ``reqs`` followed by the file's entries, stripped of
        surrounding whitespace; blank lines and ``#`` comment lines are
        skipped
    """
    # copy so the caller's list (e.g. the parsed argparse value) is not mutated
    spec = list(reqs)
    if requirements:
        with Path(requirements).open("r") as f:
            for line in f:
                entry = line.strip()
                if entry and not entry.startswith("#"):
                    spec.append(entry)
    return spec
def resolve_deps(args: Namespace) -> List[str]:
    """Resolve the requested packages to the output of ``pip freeze``.

    The packages are installed into a throwaway environment under a
    temporary directory, which is then frozen.

    Args:
        args: parsed arguments providing ``reqs`` and ``requirements``
    Returns:
        the lines printed by ``pip freeze`` in that environment
    """
    spec = combined_spec(args.reqs, args.requirements)

    with tempfile.TemporaryDirectory(prefix="viv-") as tmpdir:
        echo("generating frozen spec")
        vivenv = ViVenv(spec, track_exe=False, path=Path(tmpdir))
        vivenv.create(quiet=True)

        pip = str(vivenv.path / "bin" / "pip")
        # populate the environment for now use
        # custom cmd since using requirements file
        install_cmd = [pip, "install", "--force-reinstall"] + spec
        run(
            install_cmd,
            spinmsg="resolving dependencies",
            clean_up_path=vivenv.path,
        )
        resolved_spec = run([pip, "freeze"], check_output=True)

    return resolved_spec.splitlines()
def generate_import(
    args: Namespace,
) -> None:
    """Print the statements that activate a frozen spec.

    Args:
        args: parsed arguments; reads ``keep``, ``standalone`` and ``path``
            in addition to what ``resolve_deps`` needs
    """
    spec = resolve_deps(args)

    if args.keep:
        # re-create env again since path's are hard-coded
        vivenv = ViVenv(spec)
        cached_names = [d.name for d in c.venvcache.iterdir()]
        if vivenv.name not in cached_names or os.getenv("VIV_FORCE"):
            vivenv.create()
            vivenv.install_pkgs()
            vivenv.dump_info(write=True)
        else:
            echo("re-using existing vivenv")

    echo("see below for import statements\n")

    if args.standalone:
        quoted_spec = ", ".join(f'"{pkg}"' for pkg in spec)
        call = "_viv_use(" + fill(quoted_spec, width=100, subsequent_indent=" ") + ")"
        snippet = STANDALONE_TEMPLATE.format(
            version=__version__,
            func=noqa(STANDALONE_TEMPLATE_FUNC + call),
        )
        sys.stdout.write(snippet + "\n")
        return

    if args.path:
        # two levels above this file
        viv_root = Path(__file__).resolve().absolute().parent.parent
        if args.path == "abs":
            sys.stdout.write(SYS_PATH_TEMPLATE.format(path_to_viv=viv_root) + "\n")
        elif args.path == "rel":
            home_relative = str(viv_root).replace(str(Path.home()), "~")
            sys.stdout.write(
                REL_SYS_PATH_TEMPLATE.format(path_to_viv=home_relative) + "\n"
            )

    spec_to_import(spec)
def fetch_source(reference: str) -> str:
    """Download ``viv.py`` at a git reference into the source cache.

    Args:
        reference: branch, tag or commit used in the raw GitHub URL
    Returns:
        sha256 hex digest of the downloaded bytes; the file is cached as
        ``<source cache>/<digest>.py``

    Exits with status 1 when the server answers with an HTTP error.
    """
    url = (
        "https://raw.githubusercontent.com/daylinmorgan/viv/"
        + reference
        + "/src/viv/viv.py"
    )
    try:
        # context manager closes the connection once the body is read
        with urlopen(url) as response:
            src = response.read()
    except HTTPError as e:
        error(
            "Issue updating viv see below:"
            + a.style("-> ", "red").join(["\n"] + repr(e).splitlines())
        )
        if "404" in repr(e):
            echo("Please check your reference is valid.", style="red")
        sys.exit(1)

    sha256 = hashlib.sha256(src).hexdigest()
    cached_src_file = c.srccache / f"{sha256}.py"
    if not cached_src_file.is_file():
        # write the raw bytes so the cached file hashes to its own name
        cached_src_file.write_bytes(src)
    return sha256
def make_executable(path: Path) -> None:
    """Set the execute bit for every class of user that can read ``path``."""
    current_mode = os.stat(path).st_mode
    # shifting the read bits (0o444) right by two lands on the execute bits
    exec_bits = (current_mode & 0o444) >> 2
    os.chmod(path, current_mode | exec_bits)
class Viv:
def __init__(self) -> None:
    # load cached vivenvs, then work out where viv itself is running from
    self.vivenvs = get_venvs()
    self._get_sources()
    # program name shown in help output
    if self.local:
        self.name = "viv"
    else:
        self.name = "python3 <(curl -fsSL gh.dayl.in/viv/viv.py)"
def _get_sources(self) -> None:
    """Record the running source, any local installation, and its version.

    Sets ``running_source``, ``local``, ``local_source``, ``local_version``
    and ``git`` on the instance.
    """
    self.local_source: Optional[Path] = None
    self.running_source = Path(__file__).resolve()
    # a running source under /proc/ is treated as "not a local file"
    self.local = not str(self.running_source).startswith("/proc/")

    if self.local:
        self.local_source = self.running_source
        self.local_version = __version__
    else:
        try:
            _local_viv = __import__("viv")
            module_file = _local_viv.__file__
            self.local_source = Path(module_file) if module_file else None
            self.local_version = _local_viv.__version__
        except ImportError:
            self.local_version = "Not Found"

    # git-based when a .git directory sits three levels above the source file
    self.git = (
        (self.local_source.parent.parent.parent / ".git").is_dir()
        if self.local_source
        else False
    )
def _check_local_source(self, args: Namespace) -> None:
if not self.local_source and not (args.standalone or args.path):
warn(
"failed to find local copy of `viv` "
"make sure to add it to your PYTHONPATH "
"or consider using --path/--standalone"
)
def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
# TODO: improve matching algorithm to favor names over id's
matches: List[ViVenv] = []
for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id:
matches.append(v)
elif k.startswith(name_id) or v.id.startswith(name_id):
matches.append(v)
elif v.name.startswith(name_id):
matches.append(v)
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
echo(f"matches {','.join((match.name for match in matches))}", style="red")
error("too many matches maybe try a longer name?", code=1)
else:
error(f"no matches found for {name_id}", code=1)
def remove(self, args: Namespace) -> None:
    """\
    remove a vivenv
    To remove all viv venvs:
    `viv rm $(viv l -q)`
    """
    # the docstring above is the sub-command's help text; keep it verbatim
    for requested in args.vivenv:
        vivenv = self._match_vivenv(requested)
        if not vivenv.path.is_dir():
            error(
                f"cowardly exiting because I didn't find vivenv: {requested}",
                code=1,
            )
        else:
            echo(f"removing {vivenv.name}")
            shutil.rmtree(vivenv.path)
def freeze(self, args: Namespace) -> None:
    """create import statement from package spec"""
    # the docstring above is the sub-command's help text; keep it verbatim
    self._check_local_source(args)

    # validate the arguments before doing any work
    if not args.reqs:
        error("must specify a requirement", code=1)
    if args.path and args.standalone:
        error("-p/--path and -s/--standalone are mutually exclusive", code=1)

    generate_import(args)
def list(self, args: Namespace) -> None:
"""list all vivenvs"""
if args.quiet:
sys.stdout.write("\n".join(self.vivenvs) + "\n")
elif len(self.vivenvs) == 0:
echo("no vivenvs setup")
else:
rows = (
("vivenv", "spec"),
*(
(
f"{vivenv.name[:6]}..."
if len(vivenv.name) > 9
else vivenv.name,
", ".join(vivenv.spec),
)
for vivenv in self.vivenvs.values()
),
)
a.table(rows)
def exe(self, args: Namespace) -> None:
    """run python/pip in vivenv"""
    # the docstring above is the sub-command's help text; keep it verbatim
    vivenv = self._match_vivenv(args.vivenv)
    tool = "pip" if args.exe == "pip" else "python"
    executable = vivenv.path / "bin" / tool
    # todo check for vivenv
    echo(f"executing command within {vivenv.name}")

    # Join cmd and rest with a separator (they used to be concatenated
    # directly, fusing the last `cmd` token with the first `rest` token).
    # User input is still shlex-split so a quoted command string keeps
    # working, but the executable path is no longer re-split.
    user_args = shlex.split(" ".join([*args.cmd, *args.rest]))
    cmd = [str(executable), *user_args]
    echo(f"executing {' '.join(cmd)}")
    run(cmd, verbose=True)
def info(self, args: Namespace) -> None:
    """get metadata about a vivenv"""
    # the docstring above is the sub-command's help text; keep it verbatim
    vivenv = self._match_vivenv(args.vivenv)
    if not (vivenv.path / "viv-info.json").is_file():
        error(f"Unable to find metadata for vivenv: {args.vivenv}", code=1)
    echo(f"more info about {vivenv.name}:")
    vivenv.dump_info()
def _install_local_src(self, sha256: str, src: Path, cli: Path) -> None:
    """Copy a cached source file into place and link the cli to it.

    Args:
        sha256: digest naming the file in the source cache
        src: destination of the viv source file
        cli: location of the symlink pointing at ``src``
    """
    echo("updating local source copy of viv")
    shutil.copy(c.srccache / f"{sha256}.py", src)
    make_executable(src)

    echo("symlinking cli")
    # is_symlink() also detects dangling links, which is_file() reports as absent
    if cli.is_file() or cli.is_symlink():
        if confirm(
            f"Existing file at {a.style(str(cli),'bold')}, "
            "would you like to overwrite it?"
        ):
            cli.unlink()
            cli.symlink_to(src)
        else:
            # declining must not fall through to symlink_to, which would
            # raise FileExistsError on the file the user chose to keep
            warn(f"leaving existing file at {cli} untouched")
    else:
        cli.symlink_to(src)

    echo("Remember to include the following line in your shell rc file:")
    try:
        location = f"$HOME/{src.relative_to(Path.home()).parent}"
    except ValueError:
        # src lives outside the home directory: fall back to the plain path
        location = str(src.parent)
    sys.stderr.write(f' export PYTHONPATH="$PYTHONPATH:{location}"\n')
def manage(self, args: Namespace) -> None:
    """manage viv itself"""
    # NOTE: the docstring above doubles as the sub-command's help text.
    # Dispatch on the `manage` sub-command stored in args.cmd.
    if args.cmd == "show":
        if args.pythonpath:
            # print only the directory holding the local source (stdout)
            if self.local and self.local_source:
                sys.stdout.write(str(self.local_source.parent) + "\n")
            else:
                error("expected to find a local installation", code=1)
        else:
            echo("Current:")
            sys.stderr.write(
                SHOW_TEMPLATE.format(
                    version=__version__,
                    cli=shutil.which("viv"),
                    running_src=self.running_source,
                    local_src=self.local_source,
                )
            )
    elif args.cmd == "update":
        # both guards call error() with a non-zero code, which exits
        if not self.local_source:
            error(
                a.style("viv manage update", "bold")
                + " should be used with an exisiting installation",
                1,
            )
        if self.git:
            error(
                a.style("viv manage update", "bold")
                + " shouldn't be used with a git-based installation",
                1,
            )
        sha256 = fetch_source(args.ref)
        # import the downloaded file (named after its digest) to read its version
        sys.path.append(str(c.srccache))
        next_version = __import__(sha256).__version__
        if self.local_version == next_version:
            echo(f"no change between {args.ref} and local version")
            sys.exit(0)
        if confirm(
            "Would you like to perform the above installation steps?",
            UPDATE_TEMPLATE.format(
                src_location=self.local_source,
                local_version=self.local_version,
                cli_location=args.cli,
                next_version=next_version,
            ),
        ):
            self._install_local_src(
                sha256,
                Path(
                    args.src if not self.local_source else self.local_source,
                ),
                args.cli,
            )
    elif args.cmd == "install":
        # refuse to install over an existing local copy
        if self.local_source:
            error(f"found existing viv installation at {self.local_source}")
            echo(
                "use "
                + a.style("viv manage update", "bold")
                + " to modify current installation.",
                style="red",
            )
            sys.exit(1)
        sha256 = fetch_source(args.ref)
        # import the downloaded file (named after its digest) to read its version
        sys.path.append(str(c.srccache))
        downloaded_version = __import__(sha256).__version__
        echo(f"Downloaded version: {downloaded_version}")
        # TODO: see if file is actually where
        # we are about to install and give more instructions
        if confirm(
            "Would you like to perform the above installation steps?",
            INSTALL_TEMPLATE.format(
                src_location=args.src,
                cli_location=args.cli,
            ),
        ):
            self._install_local_src(sha256, args.src, args.cli)
    elif args.cmd == "purge":
        # collect everything viv may have written, then ask before deleting
        to_remove = []
        if c._cache.is_dir():
            to_remove.append(c._cache)
        if args.src.is_file():
            # the default location gets its whole parent directory removed
            to_remove.append(
                args.src.parent if args.src == c.srcdefault else args.src
            )
        if self.local_source and self.local_source.is_file():
            if self.local_source.parent.name == "viv":
                to_remove.append(self.local_source.parent)
            else:
                to_remove.append(self.local_source)
        if args.cli.is_file():
            to_remove.append(args.cli)
        # drop duplicates (the same path can be collected more than once)
        to_remove = list(set(to_remove))
        if confirm(
            "Remove the above files/directories?",
            "\n".join(f" - {a.red}{p}{a.end}" for p in to_remove) + "\n",
        ):
            for p in to_remove:
                if p.is_dir():
                    shutil.rmtree(p)
                else:
                    p.unlink()
            # NOTE(review): nesting of this hint (inside the confirm branch)
            # is reconstructed; indentation was lost in this copy — confirm.
            echo(
                "to re-install use: "
                "`python3 <(curl -fsSL gh.dayl.in/viv/viv.py) manage install`"
            )
def _pick_bin(self, args: Namespace) -> Tuple[str, str]:
default = re.split(r"[=><~!*]+", args.reqs[0])[0]
return default, (default if not args.bin else args.bin)
def shim(self, args: Namespace) -> None:
    """\
    generate viv-powered cli apps
    examples:
    viv shim black
    viv shim yartsu -o ~/bin/yartsu --standalone
    """
    # the docstring above is the sub-command's help text; keep it verbatim
    self._check_local_source(args)
    if not args.reqs:
        error("please specify at lease one dependency", code=1)

    default_bin, bin_name = self._pick_bin(args)
    output = (
        c.binparent / default_bin if not args.output else args.output.absolute()
    )
    # never overwrite an existing file
    if output.is_file():
        error(f"{output} already exists...exiting", code=1)

    if args.freeze:
        spec = resolve_deps(args)
    else:
        spec = combined_spec(args.reqs, args.requirements)
    spec_str = ", ".join(f'"{req}"' for req in spec)

    if args.standalone:
        imports = STANDALONE_TEMPLATE.format(
            version=__version__, func=noqa(STANDALONE_TEMPLATE_FUNC)
        )
        use = f"_viv_use({spec_str})"
    elif args.path:
        if not self.local_source:
            error("No local viv found to import from", code=1)
        else:
            viv_root = self.local_source.resolve().absolute().parent.parent
            # "abs" embeds the absolute path and "rel" the "~"-relative one,
            # matching generate_import (the two were previously swapped here)
            if args.path == "abs":
                imports = SYS_PATH_TEMPLATE.format(path_to_viv=viv_root)
            else:
                imports = REL_SYS_PATH_TEMPLATE.format(
                    path_to_viv=str(viv_root).replace(str(Path.home()), "~")
                )
            use = IMPORT_TEMPLATE.format(spec=spec_str)
    else:
        imports = ""
        use = IMPORT_TEMPLATE.format(spec=spec_str)

    if confirm(
        f"Write shim for {a.style(bin_name,'bold')} to {a.style(output,'green')}?"
    ):
        with output.open("w") as f:
            f.write(SHIM_TEMPLATE.format(imports=imports, use=use, bin=bin_name))
        make_executable(output)
def run(self, args: Namespace) -> None:
    """\
    run an app w/ an on-demand venv
    examples:
    viv r pycowsay -- "Viv isn't venv\\!"
    viv r rich -b python -- -m rich
    """
    # The docstring above is the sub-command's help text. The backslash is
    # doubled so the text is unchanged while avoiding the invalid "\!"
    # escape sequence.
    if not args.reqs:
        error("please specify at lease one dependency", code=1)

    _, bin_name = self._pick_bin(args)
    spec = combined_spec(args.reqs, args.requirements)
    vivenv = ViVenv(spec)

    # TODO: respect a VIV_RUN_MODE env variable as the same as keep i.e.
    # ephemeral (default), semi-ephemeral (persist inside /tmp), or
    # persist (use c.cache)
    if not vivenv.exists or os.getenv("VIV_FORCE"):
        if not args.keep:
            # throwaway environment: build it under a temporary directory
            # and exit from inside the block, before the directory is removed
            with tempfile.TemporaryDirectory(prefix="viv-") as tmpdir:
                vivenv.path = Path(tmpdir)
                vivenv.create()
                vivenv.install_pkgs()
                sys.exit(
                    subprocess.run(
                        [vivenv.path / "bin" / bin_name, *args.rest]
                    ).returncode
                )
        else:
            vivenv.create()
            vivenv.install_pkgs()
            vivenv.dump_info(write=True)

    # cached (or just persisted) environment
    sys.exit(
        subprocess.run([vivenv.path / "bin" / bin_name, *args.rest]).returncode
    )
def _get_subcmd_parser(
self,
subparsers: _SubParsersAction[ArgumentParser],
name: str,
attr: Optional[str] = None,
**kwargs: Any,
) -> ArgumentParser:
aliases = kwargs.pop("aliases", [name[0]])
cmd = getattr(self, attr if attr else name)
parser: ArgumentParser = subparsers.add_parser(
name,
help=cmd.__doc__.splitlines()[0],
description=dedent(cmd.__doc__),
aliases=aliases,
**kwargs,
)
parser.set_defaults(func=cmd)
return parser
def cli(self) -> None:
    """cli entrypoint

    Builds the top-level parser and one parser per sub-command, parses
    ``sys.argv`` and calls the handler stored as ``func`` on the resulting
    namespace with that namespace.

    Everything after a literal ``--`` on the command line is withheld from
    argparse and stored verbatim as the list ``args.rest``; without a
    ``--``, ``args.rest`` is an empty list.
    """
    parser = ArgumentParser(prog=self.name, description=DESCRIPTION)
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version=f"{a.bold}viv{a.end}, version {a.cyan}{__version__}{a.end}",
    )
    subparsers = parser.add_subparsers(
        metavar="<sub-cmd>", title="subcommands", required=True
    )

    # Parent parser: the positional shared by sub-commands that act on a
    # single vivenv.
    p_vivenv_arg = ArgumentParser(add_help=False)
    p_vivenv_arg.add_argument("vivenv", help="name/hash of vivenv")

    # --- list -----------------------------------------------------------
    p_list = self._get_subcmd_parser(subparsers, "list")
    p_list.add_argument(
        "-q",
        "--quiet",
        help="suppress non-essential output",
        action="store_true",
        default=False,
    )

    # --- exe (python | pip) ---------------------------------------------
    p_exe = self._get_subcmd_parser(
        subparsers,
        "exe",
    )
    p_exe_sub = p_exe.add_subparsers(
        title="subcommand", metavar="<sub-cmd>", required=True
    )
    p_exe_shared = ArgumentParser(add_help=False)
    p_exe_shared.add_argument(
        "cmd",
        help="command to execute",
        nargs="*",
    )
    # Parent order fixes positional order: `vivenv` first, then `cmd`.
    p_exe_sub.add_parser(
        "python",
        help="run command with python",
        parents=[p_vivenv_arg, p_exe_shared],
    ).set_defaults(func=self.exe, exe="python")
    p_exe_sub.add_parser(
        "pip", help="run command with pip", parents=[p_vivenv_arg, p_exe_shared]
    ).set_defaults(func=self.exe, exe="pip")

    # --- remove ---------------------------------------------------------
    p_remove = self._get_subcmd_parser(
        subparsers,
        "remove",
        aliases=["rm"],
    )
    p_remove.add_argument("vivenv", help="name/hash of vivenv", nargs="*")

    # --- options shared by freeze and shim ------------------------------
    p_freeze_shim_shared = ArgumentParser(add_help=False)
    p_freeze_shim_shared.add_argument(
        "-p",
        "--path",
        help="generate line to add viv to sys.path",
        choices=["abs", "rel"],
    )
    p_freeze_shim_shared.add_argument(
        "-r",
        "--requirements",
        help="path/to/requirements.txt file",
        metavar="<path>",
    )
    p_freeze_shim_shared.add_argument(
        "-k",
        "--keep",
        help="preserve environment",
        action="store_true",
    )
    p_freeze_shim_shared.add_argument(
        "-s",
        "--standalone",
        help="generate standalone activation function",
        action="store_true",
    )
    p_freeze_shim_shared.add_argument(
        "reqs", help="requirements specifiers", nargs="*"
    )

    # --- freeze / info --------------------------------------------------
    self._get_subcmd_parser(subparsers, "freeze", parents=[p_freeze_shim_shared])
    self._get_subcmd_parser(
        subparsers,
        "info",
        parents=[p_vivenv_arg],
    )

    # --- manage (install | update | show | purge) -----------------------
    p_manage_shared = ArgumentParser(add_help=False)
    p_manage_shared.add_argument(
        "-r",
        "--ref",
        help="git reference (branch/tag/commit)",
        default="latest",
        metavar="<ref>",
    )
    p_manage_shared.add_argument(
        "-s",
        "--src",
        help="path/to/source_file",
        default=c.srcdefault,
        metavar="<src>",
    )
    p_manage_shared.add_argument(
        "-c",
        "--cli",
        help="path/to/cli (symlink to src)",
        default=Path.home() / ".local" / "bin" / "viv",
        metavar="<cli>",
    )
    p_manage_sub = self._get_subcmd_parser(
        subparsers,
        name="manage",
    ).add_subparsers(title="subcommand", metavar="<sub-cmd>", required=True)
    # `aliases` is given as a list, the form argparse documents, rather
    # than a bare one-character string.
    p_manage_sub.add_parser(
        "install", help="install viv", aliases=["i"], parents=[p_manage_shared]
    ).set_defaults(func=self.manage, cmd="install")
    p_manage_sub.add_parser(
        "update",
        help="update viv version",
        aliases=["u"],
        parents=[p_manage_shared],
    ).set_defaults(func=self.manage, cmd="update")
    p_manage_show = p_manage_sub.add_parser(
        "show", help="show current installation info", aliases=["s"]
    )
    p_manage_show.set_defaults(func=self.manage, cmd="show")
    p_manage_show.add_argument(
        "-p", "--pythonpath", help="show the path/to/install", action="store_true"
    )
    p_manage_sub.add_parser(
        "purge",
        help="remove traces of viv",
        aliases=["p"],
        parents=[p_manage_shared],
    ).set_defaults(func=self.manage, cmd="purge")

    # --- shim -----------------------------------------------------------
    p_shim = self._get_subcmd_parser(
        subparsers, "shim", parents=[p_freeze_shim_shared]
    )
    p_shim.add_argument(
        "-f",
        "--freeze",
        help="freeze/resolve all dependencies",
        action="store_true",
    )
    p_shim.add_argument(
        "-o",
        "--output",
        help="path/to/output file",
        type=Path,
        metavar="<path>",
    )
    p_shim.add_argument(
        "-b", "--bin", help="console_script/script to invoke", metavar="<bin>"
    )

    # --- run ------------------------------------------------------------
    p_run = self._get_subcmd_parser(subparsers, "run")
    p_run.add_argument(
        "-r",
        "--requirements",
        help="path/to/requirements.txt file",
        metavar="<path>",
    )
    p_run.add_argument(
        "-k",
        "--keep",
        help="preserve environment",
        action="store_true",
    )
    p_run.add_argument("reqs", help="requirements specifiers", nargs="*")
    p_run.add_argument(
        "-b", "--bin", help="console_script/script to invoke", metavar="<bin>"
    )

    # Split the command line at the first "--": the left side is parsed by
    # argparse, the right side is kept untouched for the handler.
    if "--" in sys.argv:
        i = sys.argv.index("--")
        args = parser.parse_args(sys.argv[1:i])
        args.rest = sys.argv[i + 1 :]
    else:
        args = parser.parse_args()
        args.rest = []

    # Dispatch to the handler registered through set_defaults(func=...).
    args.func(args)
def main() -> None:
    """Create a ``Viv`` instance and hand control to its command-line interface."""
    Viv().cli()


# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()