From 6b8d2fdeeabf1e307f1d30276331eb5da480d3d7 Mon Sep 17 00:00:00 2001 From: daylinmorgan Date: Fri, 26 May 2023 23:42:22 +0000 Subject: [PATCH] deploy: e3ebca86d2b13f84e70cd93aa5cf5b5cd9def48b --- .nojekyll | 0 index.html | 9 + viv/viv.py | 1274 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1283 insertions(+) create mode 100644 .nojekyll create mode 100644 index.html create mode 100755 viv/viv.py diff --git a/.nojekyll b/.nojekyll new file mode 100644 index 0000000..e69de29 diff --git a/index.html b/index.html new file mode 100644 index 0000000..b4ab837 --- /dev/null +++ b/index.html @@ -0,0 +1,9 @@ + + + + + + + + + diff --git a/viv/viv.py b/viv/viv.py new file mode 100755 index 0000000..363146d --- /dev/null +++ b/viv/viv.py @@ -0,0 +1,1274 @@ +#!/usr/bin/env python3 +"""Viv isn't venv! + + viv -h + OR + __import__("viv").use("requests", "bs4") +""" + +from __future__ import annotations + +import hashlib +import itertools +import json +import os +import re +import shlex +import shutil +import site +import subprocess +import sys +import tempfile +import threading +import time +from urllib.request import urlopen +from urllib.error import HTTPError +import venv +from argparse import SUPPRESS, Action +from argparse import ArgumentParser as StdArgParser +from argparse import ( + HelpFormatter, + Namespace, + RawDescriptionHelpFormatter, + _SubParsersAction, +) +from dataclasses import dataclass +from datetime import datetime +from itertools import zip_longest +from pathlib import Path +from textwrap import dedent, wrap +from types import TracebackType +from typing import ( + Any, + Dict, + Generator, + List, + NoReturn, + Optional, + Sequence, + TextIO, + Tuple, + Type, +) + +__version__ = "22.12a3-64-gf27cdf8-dev" + + +@dataclass +class Config: + """viv config manager""" + + venvcache: Path = ( + Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs" + ) + srccache: Path = ( + Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "src" + ) + srcdefault: Path = ( + Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) + / "viv" + / "viv.py" + ) + + def __post_init__(self) -> None: + self.venvcache.mkdir(parents=True, exist_ok=True) + self.srccache.mkdir( + parents=True, + exist_ok=True, + ) + self.srcdefault.parent.mkdir(parents=True, exist_ok=True) + + +c = Config() + + +class Spinner: + """spinner modified from: + https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py + """ + + def __init__(self, message: str, delay: float = 0.1) -> None: + self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"]) + self.delay = delay + self.busy = False + self.spinner_visible = False + self.message = message + # sys.stdout.write(message) + echo(message + " ", newline=False) + + def write_next(self) -> None: + with self._screen_lock: + if not self.spinner_visible: + sys.stderr.write(next(self.spinner)) + self.spinner_visible = True + sys.stderr.flush() + + def remove_spinner(self, cleanup: bool = False) -> None: + with self._screen_lock: + if self.spinner_visible: + sys.stderr.write("\b\b\b") + self.spinner_visible = False + if cleanup: + sys.stderr.write(" ") # overwrite spinner with blank + sys.stderr.write("\r\033[K") + sys.stderr.flush() + + def spinner_task(self) -> None: + while self.busy: + self.write_next() + time.sleep(self.delay) + self.remove_spinner() + + def __enter__(self) -> None: + if sys.stderr.isatty(): + self._screen_lock = threading.Lock() + self.busy = True + self.thread = threading.Thread(target=self.spinner_task) + self.thread.start() + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_traceback: Optional[TracebackType], + ) -> None: + if sys.stderr.isatty(): + self.busy = False + self.remove_spinner(cleanup=True) + else: + sys.stderr.write("\r") + + +BOX: Dict[str, str] = { + "v": "│", + "h": "─", + "tl": "╭", + "tr": "╮", + "bl": "╰", + "br": "╯", + "sep": "┆", +} + + +@dataclass +class Ansi: + """control ouptut of ansi(VT100) control codes""" + + bold: str = "\033[1m" + dim: str = "\033[2m" + underline: str = "\033[4m" + red: str = "\033[1;31m" + green: str = "\033[1;32m" + yellow: str = "\033[1;33m" + magenta: str = "\033[1;35m" + cyan: str = "\033[1;36m" + end: str = "\033[0m" + + # for argparse help + header: str = cyan + option: str = yellow + metavar: str = "\033[33m" # normal yellow + + def __post_init__(self) -> None: + if os.getenv("NO_COLOR") or not sys.stderr.isatty(): + for attr in self.__dict__: + setattr(self, attr, "") + + self._ansi_escape = re.compile( + r""" + \x1B # ESC + (?: # 7-bit C1 Fe (except CSI) + [@-Z\\-_] + | # or [ for CSI, followed by a control sequence + \[ + [0-?]* # Parameter bytes + [ -/]* # Intermediate bytes + [@-~] # Final byte + ) + """, + re.VERBOSE, + ) + + def escape(self, txt: str) -> str: + return self._ansi_escape.sub("", txt) + + def style(self, txt: str, style: str = "cyan") -> str: + """style text with given style + Args: + txt: text to stylize + style: color/style to apply to text + Returns: + ansi escape code stylized text + """ + return f"{getattr(self,style)}{txt}{getattr(self,'end')}" + + def tagline(self) -> str: + """generate the viv tagline!""" + + return " ".join( + ( + self.style(f, "magenta") + self.style(rest, "cyan") + for f, rest in (("V", "iv"), ("i", "sn't"), ("v", "env!")) + ) + ) + + def subprocess(self, output: str) -> None: + """generate output for subprocess error + + Args: + output: text output from subprocess, usually from p.stdout + """ + + echo("subprocess output:") + new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] + sys.stdout.write("\n".join(new_output) + "\n") + + def _get_column_sizes( + self, rows: Tuple[Tuple[str, Sequence[str]], ...] + ) -> List[int]: + """convert list of rows to list of columns sizes + + First convert rows into list of columns, + then get max string length for each column. + """ + return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore + + def _make_row(self, row: Generator[Any, None, None]) -> str: + return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}" + + def _sanitize_row( + self, sizes: List[int], row: Tuple[str, Sequence[str]] + ) -> Tuple[Tuple[str, Sequence[str]], ...]: + if len(row[1]) > sizes[1]: + return tuple( + zip_longest( + (row[0],), + wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]), + fillvalue="", + ) + ) + else: + return (row,) + + def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str: + return f"{self.cyan}Viv{self.end}{self.__dict__[style]}{sep}{self.end}" + + def table( + self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan" + ) -> None: + """generate a table with outline and styled header assumes two columns + + Args: + rows: sequence of the rows, first item assumed to be header + header_style: color/style for header row + """ + + sizes = self._get_column_sizes(rows) + + col2_limit = shutil.get_terminal_size().columns - 20 + if col2_limit < 20: + error("increase screen size to view table", code=1) + elif sizes[1] > col2_limit: + sizes[1] = col2_limit + + header, rows = rows[0], rows[1:] + # this is maybe taking comprehensions too far.... + table_rows = ( + self._make_row(row) + for row in ( + # header row + ( + self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end + for i, cell in enumerate(header) + ), + # rest of the rows + *( + (f"{cell:<{sizes[i]}}" for i, cell in enumerate(row)) + for row in ( + newrow + for row in rows + for newrow in self._sanitize_row(sizes, row) + ) + ), + ) + ) + + sys.stderr.write(f" {BOX['tl']}{BOX['h']*(sum(sizes)+5)}{BOX['tr']}\n") + sys.stderr.write("\n".join(table_rows) + "\n") + sys.stderr.write(f" {BOX['bl']}{BOX['h']*(sum(sizes)+5)}{BOX['br']}\n") + + +a = Ansi() + + +def error(msg: str, code: int = 0) -> None: + """output error message and if code provided exit""" + echo(f"{a.red}error:{a.end} {msg}", style="red") + if code: + sys.exit(code) + + +def warn(msg: str) -> None: + """output warning message to stdout""" + echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow") + + +def echo( + msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr +) -> None: + """output general message to stdout""" + output = f"{a.viv_preamble(style)} {msg}" + if newline: + output += "\n" + fd.write(output) + + +def confirm(question: str, context: str = "") -> bool: + sys.stderr.write(context) + sys.stderr.write( + a.viv_preamble(sep="?? ") + question + a.style(" (Y)es/(n)o: ", "yellow") + ) + while True: + ans = input().strip().lower() + if ans in ("y", "yes"): + return True + elif ans in ("n", "no"): + return False + sys.stdout.write("Please select (Y)es or (n)o. ") + sys.stdout.write("\n") + + +def run( + command: List[str], + spinmsg: str = "", + clean_up_path: Optional[Path] = None, + verbose: bool = False, + ignore_error: bool = False, + check_output: bool = False, +) -> str: + """run a subcommand + + Args: + command: Subcommand to be run in subprocess. + verbose: If true, print subcommand output. + """ + + if spinmsg and not verbose: + with Spinner(spinmsg): + p = subprocess.run( + command, + stdout=None if verbose else subprocess.PIPE, + stderr=None if verbose else subprocess.STDOUT, + universal_newlines=True, + ) + else: + p = subprocess.run( + command, + stdout=None if verbose else subprocess.PIPE, + stderr=None if verbose else subprocess.STDOUT, + universal_newlines=True, + ) + + if p.returncode != 0 and not ignore_error: + error("subprocess failed") + echo("see below for command output", style="red") + a.subprocess(p.stdout) + + if clean_up_path and clean_up_path.is_dir(): + shutil.rmtree(str(clean_up_path)) + + sys.exit(p.returncode) + + elif check_output: + return p.stdout + + else: + return "" + + +def get_hash(spec: Tuple[str, ...] | List[str], track_exe: bool = False) -> str: + """generate a hash of package specifications + + Args: + spec: sequence of package specifications + track_exe: if true add python executable to hash + Returns: + sha256 representation of dependencies for vivenv + """ + + sha256 = hashlib.sha256() + sha256.update( + ( + str(spec) + (str(Path(sys.executable).resolve()) if track_exe else "N/A") + ).encode() + ) + + return sha256.hexdigest() + + +class ViVenv: + def __init__( + self, + spec: List[str], + track_exe: bool = False, + id: str | None = None, + name: str = "", + path: Path | None = None, + ) -> None: + self.spec = spec + self.exe = str(Path(sys.executable).resolve()) if track_exe else "N/A" + self.id = id if id else get_hash(spec, track_exe) + self.name = name if name else self.id + self.path = path if path else c.venvcache / self.name + + @classmethod + def load(cls, name: str) -> "ViVenv": + """generate a vivenv from a viv-info.json file + Args: + name: used as lookup in the vivenv cache + """ + if not (c.venvcache / name / "viv-info.json").is_file(): + warn(f"possibly corrupted vivenv: {name}") + return cls(name=name, spec=[""]) + else: + with (c.venvcache / name / "viv-info.json").open("r") as f: + venvconfig = json.load(f) + + vivenv = cls(name=name, spec=venvconfig["spec"], id=venvconfig["id"]) + vivenv.exe = venvconfig["exe"] + + return vivenv + + def create(self) -> None: + echo(f"new unique vivenv -> {self.name}") + with Spinner("creating vivenv"): + builder = venv.EnvBuilder(with_pip=True, clear=True) + builder.create(self.path) + + # add config to ignore pip version + with (self.path / "pip.conf").open("w") as f: + f.write("[global]\ndisable-pip-version-check = true") + + def install_pkgs(self) -> None: + cmd: List[str] = [ + str(self.path / "bin" / "pip"), + "install", + "--force-reinstall", + ] + self.spec + + run( + cmd, + spinmsg="installing packages in vivenv", + clean_up_path=self.path, + verbose=bool(os.getenv("VIV_VERBOSE")), + ) + + def dump_info(self, write: bool = False) -> None: + # TODO: include associated files in 'info' + # means it needs to be loaded first + info = { + "created": str(datetime.today()), + "id": self.id, + "spec": self.spec, + "exe": self.exe, + } + # save metadata to json file + if write: + with (self.path / "viv-info.json").open("w") as f: + json.dump(info, f) + else: + info["spec"] = ", ".join(self.spec) + a.table((("key", "value"), *((k, v) for k, v in info.items()))) + + +def use(*packages: str, track_exe: bool = False, name: str = "") -> ViVenv: + """create a vivenv and append to sys.path + + Args: + packages: package specifications with optional version specifiers + track_exe: if true make env python exe specific + name: use as vivenv name, if not provided id is used + """ + validate_spec(packages) + vivenv = ViVenv(list(packages), track_exe=track_exe, name=name) + + if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv( + "VIV_FORCE" + ): + vivenv.create() + vivenv.install_pkgs() + vivenv.dump_info(write=True) + + modify_sys_path(vivenv.path) + return vivenv + + +def validate_spec(spec: Tuple[str, ...]) -> None: + """ensure spec is at least of sequence of strings + + Args: + spec: sequence of package specifications + """ + # ? make this a part of ViVenv? + if not set(map(type, spec)) == {str}: + error("unexepected input in package spec") + error(f"check your packages definitions: {spec}", code=1) + + +def modify_sys_path(new_path: Path) -> None: + # remove user-site + for i, path in enumerate(sys.path): + if path == site.USER_SITE: + sys.path.pop(i) + + sys.path.append( + str([p for p in (new_path / "lib").glob("python*/site-packages")][0]) + ) + + +def get_venvs() -> Dict[str, ViVenv]: + vivenvs = {} + for p in c.venvcache.iterdir(): + vivenv = ViVenv.load(p.name) + vivenvs[vivenv.name] = vivenv + return vivenvs + + +SYS_PATH_TEMPLATE = """__import__("sys").path.append("{path_to_viv}") # noqa""" +REL_SYS_PATH_TEMPLATE = ( + """__import__("sys").path.append(__import__("os")""" + """.path.expanduser("{path_to_viv}")) # noqa""" +) +IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa""" + +STANDALONE_TEMPLATE = r""" +# <<<<< auto-generated by daylinmorgan/viv v{version} +# fmt: off +{use} +# fmt: on +# >>>>> code golfed with <3 +""" # noqa + +STANDALONE_TEMPLATE_USE = r"""def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None: + i,s,m,e,spec=__import__,str,map,lambda x: True if x else False,[*pkgs] + if not {{*m(type,pkgs)}}=={{s}}: raise ValueError(f"spec: {{pkgs}} is invalid") + ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write + (cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=True,exist_ok=True) + ((sha256:=i("hashlib").sha256()).update((s(spec)+ + (((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode())) + if (env:=cache/(name if name else (_id:=sha256.hexdigest()))) not in cache.glob("*/") or ge("VIV_FORCE"): + v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {{env.name}}\n") + i("venv").EnvBuilder(with_pip=True,clear=True).create(env) + with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true") + if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True, + stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0: + if env.is_dir():i("shutil").rmtree(env) + ew(f"pip had non zero exit ({{p.returncode}})\n{{p.stdout}}\n");sys.exit(p.returncode) + with (env/"viv-info.json").open("w") as f: + i("json").dump({{"created":s(i("datetime").datetime.today()),"id":_id,"spec":spec,"exe":exe}},f) + sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE] +_viv_use({spec}) +""" # noqa + +SHOW_TEMPLATE = f""" + {a.style('Version', 'bold')}: {{version}} + {a.style('CLI', 'bold')}: {{cli}} + {a.style('Running Source', 'bold')}: {{running_src}} + {a.style('Local Source', 'bold')}: {{local_src}} +""" + +INSTALL_TEMPLATE = f""" + Install viv.py to {a.green}{{src_location}}{a.end} + Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end} + +""" + +UPDATE_TEMPLATE = f""" + Update source at {a.green}{{src_location}}{a.end} + Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end} + Version {a.bold}{{local_version}}{a.end} -> {a.bold}{{next_version}}{a.end} + +""" + + +def noqa(txt: str) -> str: + max_length = max(map(len, txt.splitlines())) + return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines())) + + +def spec_to_import(spec: List[str]) -> None: + spec_str = ", ".join(f'"{pkg}"' for pkg in spec) + sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n") + + +def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]: + vivenv = ViVenv(spec, track_exe=False, path=path) + + vivenv.create() + # populate the environment for now use + # custom cmd since using requirements file + cmd = [ + str(vivenv.path / "bin" / "pip"), + "install", + "--force-reinstall", + ] + spec + + run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path) + + # generate a frozen environment + cmd = [str(vivenv.path / "bin" / "pip"), "freeze"] + resolved_spec = run(cmd, check_output=True) + return vivenv, resolved_spec + + +def generate_import( + requirements: Path, + reqs: List[str], + vivenvs: Dict[str, ViVenv], + include_path: bool, + keep: bool, + standalone: bool, +) -> None: + reqs_from_file = [] + + if requirements: + with requirements.open("r") as f: + reqs_from_file = f.readlines() + + with tempfile.TemporaryDirectory() as tmpdir: + echo("generating frozen spec") + vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file, path=Path(tmpdir)) + + if keep: + # create env again since path's are hard-coded + vivenv = ViVenv(resolved_spec.splitlines()) + + if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv( + "VIV_FORCE" + ): + vivenv.create() + vivenv.install_pkgs() + vivenv.dump_info(write=True) + + else: + echo("re-using existing vivenv") + + echo("see below for import statements\n") + + if standalone: + sys.stdout.write( + STANDALONE_TEMPLATE.format( + version=__version__, + use=noqa( + STANDALONE_TEMPLATE_USE.format( + spec=", ".join(f'"{pkg}"' for pkg in resolved_spec.splitlines()) + ) + ), + ) + + "\n" + ) + return + + if include_path == "absolute": + sys.stdout.write( + SYS_PATH_TEMPLATE.format( + path_to_viv=Path(__file__).resolve().absolute().parent.parent + ) + + "\n" + ) + elif include_path == "relative": + sys.stdout.write( + REL_SYS_PATH_TEMPLATE.format( + path_to_viv=str( + Path(__file__).resolve().absolute().parent.parent + ).replace(str(Path.home()), "~") + ) + + "\n" + ) + + spec_to_import(resolved_spec.splitlines()) + + +class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): + """formatter to remove extra metavar on short opts""" + + def _get_invocation_length(self, invocation: str) -> int: + return len(a.escape(invocation)) + + def _format_action_invocation(self, action: Action) -> str: + if not action.option_strings: + (metavar,) = self._metavar_formatter(action, action.dest)(1) + return a.style(metavar, style="option") + else: + parts = [] + # if the Optional doesn't take a value, format is: + # -s, --long + if action.nargs == 0: + parts.extend( + [ + a.style(option, style="option") + for option in action.option_strings + ] + ) + + # if the Optional takes a value, format is: + # -s ARGS, --long ARGS + # change to + # -s, --long ARGS + else: + default = action.dest.upper() + args_string = self._format_args(action, default) + parts.extend( + [ + a.style(option, style="option") + for option in action.option_strings + ] + ) + # add metavar to last string + parts[-1] += a.style(f" {args_string}", style="metavar") + return (", ").join(parts) + + def _format_usage(self, *args: Any, **kwargs: Any) -> str: + formatted_usage = super()._format_usage(*args, **kwargs) + # patch usage with color formatting + formatted_usage = ( + formatted_usage + if f"{a.header}usage{a.end}:" in formatted_usage + else formatted_usage.replace("usage:", f"{a.header}usage{a.end}:") + ) + return formatted_usage + + def _format_action(self, action: Action) -> str: + # determine the required width and the entry label + help_position = min(self._action_max_length + 2, self._max_help_position) + help_width = max(self._width - help_position, 11) + action_width = help_position - self._current_indent + action_header = self._format_action_invocation(action) + action_header_len = len(a.escape(action_header)) + + # no help; start on same line and add a final newline + if not action.help: + action_header = "%*s%s\n" % (self._current_indent, "", action_header) + # short action name; start on the same line and pad two spaces + elif action_header_len <= action_width: + # tup = self._current_indent, "", action_width, action_header + action_header = ( + f"{' '*self._current_indent}{action_header}" + f"{' '*(action_width+2 - action_header_len)}" + ) + indent_first = 0 + + # long action name; start on the next line + else: + action_header = "%*s%s\n" % (self._current_indent, "", action_header) + indent_first = help_position + + # collect the pieces of the action help + parts = [action_header] + + # if there was help for the action, add lines of help text + if action.help and action.help.strip(): + help_text = self._expand_help(action) + if help_text: + help_lines = self._split_lines(help_text, help_width) + parts.append("%*s%s\n" % (indent_first, "", help_lines[0])) + for line in help_lines[1:]: + parts.append("%*s%s\n" % (help_position, "", line)) + + # or add a newline if the description doesn't end with one + elif not action_header.endswith("\n"): + parts.append("\n") + + # if there are any sub-actions, add their help as well + for subaction in self._iter_indented_subactions(action): + parts.append(self._format_action(subaction)) + + # return a single string + return self._join_parts(parts) + + def start_section(self, heading: Optional[str]) -> None: + if heading: + super().start_section(a.style(heading, style="header")) + else: + super() + + def add_argument(self, action: Action) -> None: + if action.help is not SUPPRESS: + # find all invocations + get_invocation = self._format_action_invocation + invocations = [get_invocation(action)] + for subaction in self._iter_indented_subactions(action): + invocations.append(get_invocation(subaction)) + + # update the maximum item length accounting for ansi codes + invocation_length = max(map(self._get_invocation_length, invocations)) + action_length = invocation_length + self._current_indent + self._action_max_length = max(self._action_max_length, action_length) + + # add the item to the list + self._add_item(self._format_action, [action]) + + +class ArgumentParser(StdArgParser): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + self.formatter_class = lambda prog: CustomHelpFormatter( + prog, + max_help_position=35, + ) + + def error(self, message: str) -> NoReturn: + error(message) + echo("see below for help\n", style="red") + self.print_help() + sys.exit(2) + + +description = f""" + +{a.tagline()} +to create/activate a vivenv: +- from command line: `{a.style("viv -h","bold")}` +- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')} +""" + + +def fetch_source(reference: str) -> str: + try: + r = urlopen( + "https://raw.githubusercontent.com/daylinmorgan/viv/" + + reference + + "/src/viv/viv.py" + ) + except HTTPError as e: + error( + "Issue updating viv see below:" + + a.style("-> ", "red").join(["\n"] + repr(e).splitlines()) + ) + if "404" in repr(e): + echo("Please check your reference is valid.", style="red") + sys.exit(1) + + src = r.read() + (hash := hashlib.sha256()).update(src) + sha256 = hash.hexdigest() + + cached_src_file = c.srccache / f"{sha256}.py" + + if not cached_src_file.is_file(): + with cached_src_file.open("w") as f: + f.write(src.decode()) + + return sha256 + + +def make_executable(path: Path) -> None: + """apply an executable bit for all users with read access""" + mode = os.stat(path).st_mode + mode |= (mode & 0o444) >> 2 # copy R bits to X + os.chmod(path, mode) + + +class Viv: + def __init__(self) -> None: + self.vivenvs = get_venvs() + self._get_sources() + self.name = ( + "viv" if self.local else "python3 <(curl -fsSL gh.dayl.in/viv/viv.py)" + ) + + def _get_sources(self) -> None: + self.local_source: Path | str + self.running_source = Path(__file__).resolve() + self.local = not str(self.running_source).startswith("/proc/") + if self.local: + self.local_source = self.running_source + self.local_version = __version__ + else: + try: + _local_viv = __import__("viv") + self.local_source = ( + _local_viv.__file__ if _local_viv.__file__ else "Not Found" + ) + self.local_version = _local_viv.__version__ + except ImportError: + self.local_source = self.local_version = "Not Found" + + self.git = self.local_source != "Not Found" and str(self.local_source).endswith( + "src/viv/__init__.py" + ) + + def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return] + # TODO: improve matching algorithm to favor names over id's + matches: List[ViVenv] = [] + for k, v in self.vivenvs.items(): + if name_id == k or v.name == name_id: + matches.append(v) + elif k.startswith(name_id) or v.id.startswith(name_id): + matches.append(v) + elif v.name.startswith(name_id): + matches.append(v) + + if len(matches) == 1: + return matches[0] + elif len(matches) > 1: + echo(f"matches {','.join((match.name for match in matches))}", style="red") + error("too many matches maybe try a longer name?", code=1) + else: + error(f"no matches found for {name_id}", code=1) + + def remove(self, args: Namespace) -> None: + """\ + remove a vivenv + + To remove all viv venvs: + `viv rm $(viv l -q)` + """ + + for name in args.vivenv: + vivenv = self._match_vivenv(name) + if vivenv.path.is_dir(): + echo(f"removing {vivenv.name}") + shutil.rmtree(vivenv.path) + else: + error( + f"cowardly exiting because I didn't find vivenv: {name}", + code=1, + ) + + def freeze(self, args: Namespace) -> None: + """create import statement from package spec""" + + if not args.reqs: + error("must specify a requirement") + sys.exit(1) + + generate_import( + args.requirements, + args.reqs, + self.vivenvs, + args.path, + args.keep, + args.standalone, + ) + + def list(self, args: Namespace) -> None: + """list all vivenvs""" + + if args.quiet: + sys.stdout.write("\n".join(self.vivenvs) + "\n") + elif len(self.vivenvs) == 0: + echo("no vivenvs setup") + else: + rows = ( + ("vivenv", "spec"), + *( + ( + f"{vivenv.name[:6]}..." + if len(vivenv.name) > 9 + else vivenv.name, + ", ".join(vivenv.spec), + ) + for vivenv in self.vivenvs.values() + ), + ) + a.table(rows) + + def exe(self, args: Namespace) -> None: + """run python/pip in vivenv""" + + vivenv = self._match_vivenv(args.vivenv) + + pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python")) + # todo check for vivenv + echo(f"executing command within {vivenv.name}") + + cmd = ( + f"{pip_path} {' '.join(args.cmd)}" + if args.exe == "pip" + else f"{python_path} {' '.join(args.cmd)}" + ) + + echo(f"executing {cmd}") + run(shlex.split(cmd), verbose=True) + + def info(self, args: Namespace) -> None: + """get metadata about a vivenv""" + vivenv = self._match_vivenv(args.vivenv) + metadata_file = vivenv.path / "viv-info.json" + + if not metadata_file.is_file(): + error(f"Unable to find metadata for vivenv: {args.vivenv}", code=1) + + echo(f"more info about {vivenv.name}:") + + vivenv.dump_info() + + def _install_local_src(self, sha256: str, src: Path, cli: Path) -> None: + echo("updating local source copy of viv") + shutil.copy(c.srccache / f"{sha256}.py", src) + make_executable(src) + echo("symlinking cli") + + if not cli.is_file(): + cli.symlink_to(src) + else: + cli.unlink() + cli.symlink_to(src) + + echo("Remember to include the following line in your shell rc file:") + sys.stderr.write( + ' export PYTHONPATH="$PYTHONPATH:$HOME/' + f'{src.relative_to(Path.home())}"\n' + ) + + def manage(self, args: Namespace) -> None: + """manage viv itself""" + + if args.cmd == "show": + echo("Current:") + sys.stdout.write( + SHOW_TEMPLATE.format( + version=__version__, + cli=shutil.which("viv"), + running_src=self.running_source, + local_src=self.local_source, + ) + ) + + elif args.cmd == "update": + if self.local_source == "Not Found": + error( + a.style("viv manage update", "bold") + + " should be used with an exisiting installation", + 1, + ) + + if self.git: + error( + a.style("viv manage update", "bold") + + " shouldn't be used with a git-based installation", + 1, + ) + sha256 = fetch_source(args.reference) + sys.path.append(str(c.srccache)) + next_version = __import__(sha256).__version__ + + if self.local_version == next_version: + echo(f"no change between {args.reference} and local version") + sys.exit(0) + + if confirm( + "Would you like to perform the above installation steps?", + UPDATE_TEMPLATE.format( + src_location=self.local_source, + local_version=self.local_version, + cli_location=args.cli, + next_version=next_version, + ), + ): + self._install_local_src( + sha256, + Path( + args.src + if self.local_source == "Not Found" + else self.local_source, + ), + args.cli, + ) + + elif args.cmd == "install": + if not self.local_source == "Not Found": + error(f"found existing viv installation at {self.local_source}") + echo( + "use " + + a.style("viv manage update", "bold") + + " to modify current installation.", + style="red", + ) + sys.exit(1) + + sha256 = fetch_source(args.reference) + sys.path.append(str(c.srccache)) + downloaded_version = __import__(sha256).__version__ + echo(f"Downloaded version: {downloaded_version}") + + # TODO: see if file is actually where + # we are about to install and give more instructions + + if confirm( + "Would you like to perform the above installation steps?", + INSTALL_TEMPLATE.format( + src_location=args.src, + cli_location=args.cli, + ), + ): + self._install_local_src(sha256, args.src, args.cli) + + def _get_subcmd_parser( + self, + subparsers: _SubParsersAction[ArgumentParser], + name: str, + attr: Optional[str] = None, + **kwargs: Any, + ) -> ArgumentParser: + aliases = kwargs.pop("aliases", [name[0]]) + cmd = getattr(self, attr if attr else name) + parser: ArgumentParser = subparsers.add_parser( + name, + help=cmd.__doc__.splitlines()[0], + description=dedent(cmd.__doc__), + aliases=aliases, + **kwargs, + ) + parser.set_defaults(func=cmd) + + return parser + + def cli(self) -> None: + """cli entrypoint""" + + parser = ArgumentParser(prog=self.name, description=description) + parser.add_argument( + "-V", + "--version", + action="version", + version=f"{a.bold}viv{a.end}, version {a.cyan}{__version__}{a.end}", + ) + + subparsers = parser.add_subparsers( + metavar="", title="subcommands", required=True + ) + p_vivenv_arg = ArgumentParser(add_help=False) + p_vivenv_arg.add_argument("vivenv", help="name/hash of vivenv") + p_list = self._get_subcmd_parser(subparsers, "list") + + p_list.add_argument( + "-q", + "--quiet", + help="suppress non-essential output", + action="store_true", + default=False, + ) + + p_exe = self._get_subcmd_parser( + subparsers, + "exe", + ) + p_exe_sub = p_exe.add_subparsers( + title="subcommand", metavar="", required=True + ) + + p_exe_shared = ArgumentParser(add_help=False) + p_exe_shared.add_argument( + "cmd", + help="command to to execute", + nargs="*", + ) + + p_exe_sub.add_parser( + "python", + help="run command with python", + parents=[p_vivenv_arg, p_exe_shared], + ).set_defaults(func=self.exe, exe="python") + + p_exe_sub.add_parser( + "pip", help="run command with pip", parents=[p_vivenv_arg, p_exe_shared] + ).set_defaults(func=self.exe, exe="pip") + + p_remove = self._get_subcmd_parser( + subparsers, + "remove", + aliases=["rm"], + ) + + p_remove.add_argument("vivenv", help="name/hash of vivenv", nargs="*") + p_freeze = self._get_subcmd_parser( + subparsers, + "freeze", + ) + p_freeze.add_argument( + "-p", + "--path", + help="generate line to add viv to sys.path", + choices=["absolute", "relative"], + ) + p_freeze.add_argument( + "-r", + "--requirements", + help="path to requirements.txt file", + metavar="", + ) + p_freeze.add_argument( + "-k", + "--keep", + help="preserve environment", + action="store_true", + ) + p_freeze.add_argument( + "-s", + "--standalone", + help="generate standalone activation function", + action="store_true", + ) + p_freeze.add_argument("reqs", help="requirements specifiers", nargs="*") + + self._get_subcmd_parser( + subparsers, + "info", + parents=[p_vivenv_arg], + ) + p_manage_shared = ArgumentParser(add_help=False) + p_manage_shared.add_argument( + "-r", + "--reference", + help="git reference (branch/tag/commit)", + default="main", + ) + + p_manage_shared.add_argument( + "-s", + "--src", + help="path/to/source_file", + default=c.srcdefault, + ) + p_manage_shared.add_argument( + "-c", + "--cli", + help="path/to/cli (symlink to src)", + default=Path.home() / "bin" / "viv", + ) + + p_manage_sub = self._get_subcmd_parser( + subparsers, + name="manage", + ).add_subparsers(title="subcommand", metavar="", required=True) + + p_manage_sub.add_parser( + "install", help="install viv", aliases="i", parents=[p_manage_shared] + ).set_defaults(func=self.manage, cmd="install") + + p_manage_sub.add_parser( + "update", + help="update viv version", + aliases="u", + parents=[p_manage_shared], + ).set_defaults(func=self.manage, cmd="update") + + p_manage_sub.add_parser( + "show", help="show current installation info", aliases="s" + ).set_defaults(func=self.manage, cmd="show") + + args = parser.parse_args() + + args.func(args) + + +def main() -> None: + viv = Viv() + viv.cli() + + +if __name__ == "__main__": + main()