|
|
|
@ -6,8 +6,6 @@
|
|
|
|
|
__import__("viv").use("requests", "bs4")
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
import itertools
|
|
|
|
|
import json
|
|
|
|
@ -21,37 +19,18 @@ import sys
|
|
|
|
|
import tempfile
|
|
|
|
|
import threading
|
|
|
|
|
import time
|
|
|
|
|
from urllib.request import urlopen
|
|
|
|
|
from urllib.error import HTTPError
|
|
|
|
|
import venv
|
|
|
|
|
from argparse import SUPPRESS, Action
|
|
|
|
|
from argparse import SUPPRESS
|
|
|
|
|
from argparse import ArgumentParser as StdArgParser
|
|
|
|
|
from argparse import (
|
|
|
|
|
HelpFormatter,
|
|
|
|
|
Namespace,
|
|
|
|
|
RawDescriptionHelpFormatter,
|
|
|
|
|
_SubParsersAction,
|
|
|
|
|
)
|
|
|
|
|
from argparse import HelpFormatter, RawDescriptionHelpFormatter
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from itertools import zip_longest
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from textwrap import dedent, wrap
|
|
|
|
|
from types import TracebackType
|
|
|
|
|
from typing import (
|
|
|
|
|
Any,
|
|
|
|
|
Dict,
|
|
|
|
|
List,
|
|
|
|
|
NoReturn,
|
|
|
|
|
Optional,
|
|
|
|
|
Sequence,
|
|
|
|
|
TextIO,
|
|
|
|
|
Tuple,
|
|
|
|
|
Type,
|
|
|
|
|
Generator,
|
|
|
|
|
)
|
|
|
|
|
from typing import Dict, List, Tuple
|
|
|
|
|
|
|
|
|
|
__version__ = "22.12a3-48-gf87aab9-dev"
|
|
|
|
|
__version__ = "22.12a3-35-g0d0c66d-dev"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
@ -62,7 +41,7 @@ class Config:
|
|
|
|
|
Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
|
|
|
def __post_init__(self):
|
|
|
|
|
self.venvcache.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -74,7 +53,7 @@ class Spinner:
|
|
|
|
|
https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, message: str, delay: float = 0.1) -> None:
|
|
|
|
|
def __init__(self, message, delay=0.1):
|
|
|
|
|
self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"])
|
|
|
|
|
self.delay = delay
|
|
|
|
|
self.busy = False
|
|
|
|
@ -83,42 +62,40 @@ class Spinner:
|
|
|
|
|
# sys.stdout.write(message)
|
|
|
|
|
echo(message + " ", newline=False)
|
|
|
|
|
|
|
|
|
|
def write_next(self) -> None:
|
|
|
|
|
def write_next(self):
|
|
|
|
|
with self._screen_lock:
|
|
|
|
|
if not self.spinner_visible:
|
|
|
|
|
sys.stderr.write(next(self.spinner))
|
|
|
|
|
self.spinner_visible = True
|
|
|
|
|
sys.stderr.flush()
|
|
|
|
|
|
|
|
|
|
def remove_spinner(self, cleanup: bool = False) -> None:
|
|
|
|
|
def remove_spinner(self, cleanup=False):
|
|
|
|
|
with self._screen_lock:
|
|
|
|
|
if self.spinner_visible:
|
|
|
|
|
sys.stderr.write("\b\b\b")
|
|
|
|
|
# sys.stdout.write("\b")
|
|
|
|
|
self.spinner_visible = False
|
|
|
|
|
if cleanup:
|
|
|
|
|
sys.stderr.write(" ") # overwrite spinner with blank
|
|
|
|
|
# sys.stdout.write("\r") # move to next line
|
|
|
|
|
# move back then delete the line
|
|
|
|
|
sys.stderr.write("\r\033[K")
|
|
|
|
|
sys.stderr.flush()
|
|
|
|
|
|
|
|
|
|
def spinner_task(self) -> None:
|
|
|
|
|
def spinner_task(self):
|
|
|
|
|
while self.busy:
|
|
|
|
|
self.write_next()
|
|
|
|
|
time.sleep(self.delay)
|
|
|
|
|
self.remove_spinner()
|
|
|
|
|
|
|
|
|
|
def __enter__(self) -> None:
|
|
|
|
|
def __enter__(self):
|
|
|
|
|
if sys.stderr.isatty():
|
|
|
|
|
self._screen_lock = threading.Lock()
|
|
|
|
|
self.busy = True
|
|
|
|
|
self.thread = threading.Thread(target=self.spinner_task)
|
|
|
|
|
self.thread.start()
|
|
|
|
|
|
|
|
|
|
def __exit__(
|
|
|
|
|
self,
|
|
|
|
|
exc_type: Optional[Type[BaseException]],
|
|
|
|
|
exc_val: Optional[BaseException],
|
|
|
|
|
exc_traceback: Optional[TracebackType],
|
|
|
|
|
) -> None:
|
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_traceback): # noqa
|
|
|
|
|
if sys.stderr.isatty():
|
|
|
|
|
self.busy = False
|
|
|
|
|
self.remove_spinner(cleanup=True)
|
|
|
|
@ -156,7 +133,7 @@ class Ansi:
|
|
|
|
|
option: str = yellow
|
|
|
|
|
metavar: str = "\033[33m" # normal yellow
|
|
|
|
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
|
|
|
def __post_init__(self):
|
|
|
|
|
if os.getenv("NO_COLOR") or not sys.stderr.isatty():
|
|
|
|
|
for attr in self.__dict__:
|
|
|
|
|
setattr(self, attr, "")
|
|
|
|
@ -189,7 +166,7 @@ class Ansi:
|
|
|
|
|
"""
|
|
|
|
|
return f"{getattr(self,style)}{txt}{getattr(self,'end')}"
|
|
|
|
|
|
|
|
|
|
def tagline(self) -> str:
|
|
|
|
|
def tagline(self):
|
|
|
|
|
"""generate the viv tagline!"""
|
|
|
|
|
|
|
|
|
|
return " ".join(
|
|
|
|
@ -199,78 +176,70 @@ class Ansi:
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def subprocess(self, output: str) -> None:
|
|
|
|
|
def subprocess(self, output):
|
|
|
|
|
"""generate output for subprocess error
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
output: text output from subprocess, usually from p.stdout
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
echo("subprocess output:")
|
|
|
|
|
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
|
|
|
|
|
|
|
|
|
|
sys.stdout.write("\n".join(new_output) + "\n")
|
|
|
|
|
|
|
|
|
|
def _get_column_sizes(
|
|
|
|
|
self, rows: Tuple[Tuple[str, Sequence[str]], ...]
|
|
|
|
|
) -> List[int]:
|
|
|
|
|
"""convert list of rows to list of columns sizes
|
|
|
|
|
def _get_column_size(self, sizes, row):
|
|
|
|
|
for i, length in enumerate(len(cell) for cell in row):
|
|
|
|
|
if length > sizes[i]:
|
|
|
|
|
sizes[i] = length
|
|
|
|
|
return sizes
|
|
|
|
|
|
|
|
|
|
First convert rows into list of columns,
|
|
|
|
|
then get max string length for each column.
|
|
|
|
|
"""
|
|
|
|
|
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
|
|
|
|
|
|
|
|
|
|
def _make_row(self, row: Generator[Any, None, None]) -> str:
|
|
|
|
|
def _make_row(self, row) -> str:
|
|
|
|
|
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}"
|
|
|
|
|
|
|
|
|
|
def _sanitize_row(
|
|
|
|
|
self, sizes: List[int], row: Tuple[str, Sequence[str]]
|
|
|
|
|
) -> Tuple[Tuple[str, Sequence[str]], ...]:
|
|
|
|
|
def _sanitize_row(self, sizes: List[int], row: Tuple[str]) -> Tuple[Tuple[str]]:
|
|
|
|
|
if len(row[1]) > sizes[1]:
|
|
|
|
|
return tuple(
|
|
|
|
|
zip_longest(
|
|
|
|
|
return zip_longest(
|
|
|
|
|
(row[0],),
|
|
|
|
|
wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]),
|
|
|
|
|
wrap(row[1], break_on_hyphens=False, width=sizes[1]),
|
|
|
|
|
fillvalue="",
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
else:
|
|
|
|
|
return (row,)
|
|
|
|
|
|
|
|
|
|
def table(
|
|
|
|
|
self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
|
|
|
|
|
) -> None:
|
|
|
|
|
"""generate a table with outline and styled header assumes two columns
|
|
|
|
|
def table(self, rows, header_style="cyan") -> None:
|
|
|
|
|
"""generate a table with outline and styled header
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
rows: sequence of the rows, first item assumed to be header
|
|
|
|
|
header_style: color/style for header row
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
sizes = self._get_column_sizes(rows)
|
|
|
|
|
sizes = [0] * len(rows[0])
|
|
|
|
|
for row in rows:
|
|
|
|
|
sizes = self._get_column_size(sizes, row)
|
|
|
|
|
|
|
|
|
|
col2_limit = shutil.get_terminal_size().columns - 20
|
|
|
|
|
if col2_limit < 20:
|
|
|
|
|
col1_limit = shutil.get_terminal_size().columns - 20
|
|
|
|
|
if col1_limit < 20:
|
|
|
|
|
error("increase screen size to view table", code=1)
|
|
|
|
|
elif sizes[1] > col2_limit:
|
|
|
|
|
sizes[1] = col2_limit
|
|
|
|
|
|
|
|
|
|
header, rows = rows[0], rows[1:]
|
|
|
|
|
if sizes[1] > col1_limit:
|
|
|
|
|
sizes[1] = col1_limit
|
|
|
|
|
|
|
|
|
|
# this is maybe taking comprehensions too far....
|
|
|
|
|
|
|
|
|
|
table_rows = (
|
|
|
|
|
self._make_row(row)
|
|
|
|
|
for row in (
|
|
|
|
|
# header row
|
|
|
|
|
(
|
|
|
|
|
self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
|
|
|
|
|
for i, cell in enumerate(header)
|
|
|
|
|
for i, cell in enumerate(rows[0])
|
|
|
|
|
),
|
|
|
|
|
# rest of the rows
|
|
|
|
|
*(
|
|
|
|
|
(f"{cell:<{sizes[i]}}" for i, cell in enumerate(row))
|
|
|
|
|
for row in (
|
|
|
|
|
newrow
|
|
|
|
|
for row in rows
|
|
|
|
|
for row in rows[1:]
|
|
|
|
|
for newrow in self._sanitize_row(sizes, row)
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
@ -285,21 +254,19 @@ class Ansi:
|
|
|
|
|
a = Ansi()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def error(msg: str, code: int = 0) -> None:
|
|
|
|
|
def error(msg, code: int = 0):
|
|
|
|
|
"""output error message and if code provided exit"""
|
|
|
|
|
echo(f"{a.red}error:{a.end} {msg}", style="red")
|
|
|
|
|
if code:
|
|
|
|
|
sys.exit(code)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def warn(msg: str) -> None:
|
|
|
|
|
def warn(msg):
|
|
|
|
|
"""output warning message to stdout"""
|
|
|
|
|
echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def echo(
|
|
|
|
|
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
|
|
|
|
|
) -> None:
|
|
|
|
|
def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None:
|
|
|
|
|
"""output general message to stdout"""
|
|
|
|
|
output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}"
|
|
|
|
|
if newline:
|
|
|
|
@ -307,23 +274,13 @@ def echo(
|
|
|
|
|
fd.write(output)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def confirm(question: str) -> bool:
|
|
|
|
|
while True:
|
|
|
|
|
ans = input(question + a.style(" (Y)es/(n)o: ", "yellow")).strip().lower()
|
|
|
|
|
if ans in ("y", "yes"):
|
|
|
|
|
return True
|
|
|
|
|
elif ans in ("n", "no"):
|
|
|
|
|
return False
|
|
|
|
|
sys.stdout.write("\nPlease select (Y)es or (n)o.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(
|
|
|
|
|
command: List[str],
|
|
|
|
|
command: List[str | Path],
|
|
|
|
|
spinmsg: str = "",
|
|
|
|
|
clean_up_path: Optional[Path] = None,
|
|
|
|
|
clean_up_path: Path | None = None,
|
|
|
|
|
verbose: bool = False,
|
|
|
|
|
ignore_error: bool = False,
|
|
|
|
|
check_output: bool = False,
|
|
|
|
|
check_output=False,
|
|
|
|
|
) -> str:
|
|
|
|
|
"""run a subcommand
|
|
|
|
|
|
|
|
|
@ -428,9 +385,9 @@ class ViVenv:
|
|
|
|
|
with (self.path / "pip.conf").open("w") as f:
|
|
|
|
|
f.write("[global]\ndisable-pip-version-check = true")
|
|
|
|
|
|
|
|
|
|
def install_pkgs(self) -> None:
|
|
|
|
|
cmd: List[str] = [
|
|
|
|
|
str(self.path / "bin" / "pip"),
|
|
|
|
|
def install_pkgs(self):
|
|
|
|
|
cmd: List[str | Path] = [
|
|
|
|
|
self.path / "bin" / "pip",
|
|
|
|
|
"install",
|
|
|
|
|
"--force-reinstall",
|
|
|
|
|
] + self.spec
|
|
|
|
@ -442,7 +399,7 @@ class ViVenv:
|
|
|
|
|
verbose=bool(os.getenv("VIV_VERBOSE")),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def dump_info(self, write: bool = False) -> None:
|
|
|
|
|
def dump_info(self, write=False):
|
|
|
|
|
# TODO: include associated files in 'info'
|
|
|
|
|
# means it needs to be loaded first
|
|
|
|
|
info = {
|
|
|
|
@ -460,7 +417,7 @@ class ViVenv:
|
|
|
|
|
a.table((("key", "value"), *((k, v) for k, v in info.items())))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def use(*packages: str, track_exe: bool = False, name: str = "") -> ViVenv:
|
|
|
|
|
def use(*packages: str, track_exe: bool = False, name: str = "") -> None:
|
|
|
|
|
"""create a vivenv and append to sys.path
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
@ -479,10 +436,9 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> ViVenv:
|
|
|
|
|
vivenv.dump_info(write=True)
|
|
|
|
|
|
|
|
|
|
modify_sys_path(vivenv.path)
|
|
|
|
|
return vivenv
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_spec(spec: Tuple[str, ...]) -> None:
|
|
|
|
|
def validate_spec(spec):
|
|
|
|
|
"""ensure spec is at least of sequence of strings
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
@ -494,7 +450,7 @@ def validate_spec(spec: Tuple[str, ...]) -> None:
|
|
|
|
|
error(f"check your packages definitions: {spec}", code=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def modify_sys_path(new_path: Path) -> None:
|
|
|
|
|
def modify_sys_path(new_path: Path):
|
|
|
|
|
# remove user-site
|
|
|
|
|
for i, path in enumerate(sys.path):
|
|
|
|
|
if path == site.USER_SITE:
|
|
|
|
@ -505,7 +461,7 @@ def modify_sys_path(new_path: Path) -> None:
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_venvs() -> Dict[str, ViVenv]:
|
|
|
|
|
def get_venvs():
|
|
|
|
|
vivenvs = {}
|
|
|
|
|
for p in c.venvcache.iterdir():
|
|
|
|
|
vivenv = ViVenv.load(p.name)
|
|
|
|
@ -521,7 +477,7 @@ REL_SYS_PATH_TEMPLATE = (
|
|
|
|
|
IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa"""
|
|
|
|
|
|
|
|
|
|
STANDALONE_TEMPLATE = r"""
|
|
|
|
|
# <<<<< auto-generated by daylinmorgan/viv v{version}
|
|
|
|
|
# <<<<< auto-generated by daylinmorgan/viv (v.22.12a3)
|
|
|
|
|
# fmt: off
|
|
|
|
|
{use}
|
|
|
|
|
# fmt: on
|
|
|
|
@ -552,12 +508,6 @@ _viv_use({spec})
|
|
|
|
|
1:
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
SHOW_TEMPLATE = f"""\
|
|
|
|
|
{a.style('Version', 'bold')}: {{version}}
|
|
|
|
|
{a.style('CLI', 'bold')}: {{cli}}
|
|
|
|
|
{a.style('Source File', 'bold')}: {{src}}
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def noqa(txt: str) -> str:
|
|
|
|
|
max_length = max(map(len, txt.splitlines()))
|
|
|
|
@ -569,14 +519,14 @@ def spec_to_import(spec: List[str]) -> None:
|
|
|
|
|
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]:
|
|
|
|
|
def freeze_venv(spec: List[str], path: Path | None = None):
|
|
|
|
|
vivenv = ViVenv(spec, track_exe=False, path=path)
|
|
|
|
|
|
|
|
|
|
vivenv.create()
|
|
|
|
|
# populate the environment for now use
|
|
|
|
|
# custom cmd since using requirements file
|
|
|
|
|
cmd = [
|
|
|
|
|
str(vivenv.path / "bin" / "pip"),
|
|
|
|
|
vivenv.path / "bin" / "pip",
|
|
|
|
|
"install",
|
|
|
|
|
"--force-reinstall",
|
|
|
|
|
] + spec
|
|
|
|
@ -584,7 +534,7 @@ def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]
|
|
|
|
|
run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path)
|
|
|
|
|
|
|
|
|
|
# generate a frozen environment
|
|
|
|
|
cmd = [str(vivenv.path / "bin" / "pip"), "freeze"]
|
|
|
|
|
cmd = [vivenv.path / "bin" / "pip", "freeze"]
|
|
|
|
|
resolved_spec = run(cmd, check_output=True)
|
|
|
|
|
return vivenv, resolved_spec
|
|
|
|
|
|
|
|
|
@ -592,34 +542,41 @@ def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]
|
|
|
|
|
def generate_import(
|
|
|
|
|
requirements: Path,
|
|
|
|
|
reqs: List[str],
|
|
|
|
|
vivenvs: Dict[str, ViVenv],
|
|
|
|
|
vivenvs,
|
|
|
|
|
include_path: bool,
|
|
|
|
|
keep: bool,
|
|
|
|
|
standalone: bool,
|
|
|
|
|
) -> None:
|
|
|
|
|
# TODO: make compatible with Venv class for now just use the name /tmp/
|
|
|
|
|
reqs_from_file = []
|
|
|
|
|
|
|
|
|
|
if requirements:
|
|
|
|
|
with requirements.open("r") as f:
|
|
|
|
|
reqs_from_file = f.readlines()
|
|
|
|
|
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
|
|
|
echo("generating frozen spec")
|
|
|
|
|
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file, path=Path(tmpdir))
|
|
|
|
|
|
|
|
|
|
# refactor to make the below steps context dependent with tmpdir path
|
|
|
|
|
if keep:
|
|
|
|
|
# create env again since path's are hard-coded
|
|
|
|
|
vivenv = ViVenv(resolved_spec.splitlines())
|
|
|
|
|
# TODO: remove directory if any errors occur?
|
|
|
|
|
echo("generating new vivenv")
|
|
|
|
|
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file)
|
|
|
|
|
|
|
|
|
|
if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv(
|
|
|
|
|
"VIV_FORCE"
|
|
|
|
|
):
|
|
|
|
|
vivenv.create()
|
|
|
|
|
vivenv.install_pkgs()
|
|
|
|
|
# update id and move vivenv
|
|
|
|
|
vivenv.spec = resolved_spec.splitlines()
|
|
|
|
|
vivenv.id = get_hash(resolved_spec.splitlines())
|
|
|
|
|
echo(f"updated hash -> {vivenv.id}")
|
|
|
|
|
|
|
|
|
|
if not (c.venvcache / vivenv.id).exists():
|
|
|
|
|
vivenv.path = vivenv.path.rename(c.venvcache / vivenv.id)
|
|
|
|
|
vivenv.dump_info(write=True)
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
echo("re-using existing vivenv")
|
|
|
|
|
echo("this vivenv already exists cleaning up temporary vivenv")
|
|
|
|
|
shutil.rmtree(vivenv.path)
|
|
|
|
|
else:
|
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir: #
|
|
|
|
|
echo("generating temporary vivenv ")
|
|
|
|
|
vivenv, resolved_spec = freeze_venv(
|
|
|
|
|
reqs + reqs_from_file, path=Path(tmpdir)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
echo("see below for import statements\n")
|
|
|
|
|
|
|
|
|
@ -660,10 +617,10 @@ def generate_import(
|
|
|
|
|
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
"""formatter to remove extra metavar on short opts"""
|
|
|
|
|
|
|
|
|
|
def _get_invocation_length(self, invocation: str) -> int:
|
|
|
|
|
def _get_invocation_length(self, invocation):
|
|
|
|
|
return len(a.escape(invocation))
|
|
|
|
|
|
|
|
|
|
def _format_action_invocation(self, action: Action) -> str:
|
|
|
|
|
def _format_action_invocation(self, action):
|
|
|
|
|
if not action.option_strings:
|
|
|
|
|
(metavar,) = self._metavar_formatter(action, action.dest)(1)
|
|
|
|
|
return a.style(metavar, style="option")
|
|
|
|
@ -696,7 +653,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
parts[-1] += a.style(f" {args_string}", style="metavar")
|
|
|
|
|
return (", ").join(parts)
|
|
|
|
|
|
|
|
|
|
def _format_usage(self, *args: Any, **kwargs: Any) -> str:
|
|
|
|
|
def _format_usage(self, *args, **kwargs):
|
|
|
|
|
formatted_usage = super()._format_usage(*args, **kwargs)
|
|
|
|
|
# patch usage with color formatting
|
|
|
|
|
formatted_usage = (
|
|
|
|
@ -706,7 +663,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
)
|
|
|
|
|
return formatted_usage
|
|
|
|
|
|
|
|
|
|
def _format_action(self, action: Action) -> str:
|
|
|
|
|
def _format_action(self, action):
|
|
|
|
|
# determine the required width and the entry label
|
|
|
|
|
help_position = min(self._action_max_length + 2, self._max_help_position)
|
|
|
|
|
help_width = max(self._width - help_position, 11)
|
|
|
|
@ -716,10 +673,11 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
|
|
|
|
|
# no help; start on same line and add a final newline
|
|
|
|
|
if not action.help:
|
|
|
|
|
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
|
|
|
|
|
tup = self._current_indent, "", action_header
|
|
|
|
|
action_header = "%*s%s\n" % tup
|
|
|
|
|
# short action name; start on the same line and pad two spaces
|
|
|
|
|
elif action_header_len <= action_width:
|
|
|
|
|
# tup = self._current_indent, "", action_width, action_header
|
|
|
|
|
tup = self._current_indent, "", action_width, action_header
|
|
|
|
|
action_header = (
|
|
|
|
|
f"{' '*self._current_indent}{action_header}"
|
|
|
|
|
f"{' '*(action_width+2 - action_header_len)}"
|
|
|
|
@ -728,7 +686,8 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
|
|
|
|
|
# long action name; start on the next line
|
|
|
|
|
else:
|
|
|
|
|
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
|
|
|
|
|
tup = self._current_indent, "", action_header
|
|
|
|
|
action_header = "%*s%s\n" % tup
|
|
|
|
|
indent_first = help_position
|
|
|
|
|
|
|
|
|
|
# collect the pieces of the action help
|
|
|
|
@ -754,13 +713,10 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
# return a single string
|
|
|
|
|
return self._join_parts(parts)
|
|
|
|
|
|
|
|
|
|
def start_section(self, heading: Optional[str]) -> None:
|
|
|
|
|
if heading:
|
|
|
|
|
super().start_section(a.style(heading, style="header"))
|
|
|
|
|
else:
|
|
|
|
|
super()
|
|
|
|
|
def start_section(self, heading: str) -> None:
|
|
|
|
|
return super().start_section(a.style(heading, style="header"))
|
|
|
|
|
|
|
|
|
|
def add_argument(self, action: Action) -> None:
|
|
|
|
|
def add_argument(self, action):
|
|
|
|
|
if action.help is not SUPPRESS:
|
|
|
|
|
# find all invocations
|
|
|
|
|
get_invocation = self._format_action_invocation
|
|
|
|
@ -778,14 +734,14 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ArgumentParser(StdArgParser):
|
|
|
|
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
self.formatter_class = lambda prog: CustomHelpFormatter(
|
|
|
|
|
prog, max_help_position=35
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def error(self, message: str) -> NoReturn:
|
|
|
|
|
def error(self, message):
|
|
|
|
|
error(message)
|
|
|
|
|
echo("see below for help\n", style="red")
|
|
|
|
|
self.print_help()
|
|
|
|
@ -805,12 +761,12 @@ within python script:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Viv:
|
|
|
|
|
def __init__(self) -> None:
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.vivenvs = get_venvs()
|
|
|
|
|
|
|
|
|
|
def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
|
|
|
|
|
def _match_vivenv(self, name_id: str) -> ViVenv:
|
|
|
|
|
# TODO: improve matching algorithm to favor names over id's
|
|
|
|
|
matches: List[ViVenv] = []
|
|
|
|
|
matches = []
|
|
|
|
|
for k, v in self.vivenvs.items():
|
|
|
|
|
if name_id == k or v.name == name_id:
|
|
|
|
|
matches.append(v)
|
|
|
|
@ -818,16 +774,15 @@ class Viv:
|
|
|
|
|
matches.append(v)
|
|
|
|
|
elif v.name.startswith(name_id):
|
|
|
|
|
matches.append(v)
|
|
|
|
|
|
|
|
|
|
if len(matches) == 1:
|
|
|
|
|
return matches[0]
|
|
|
|
|
if not matches:
|
|
|
|
|
error(f"no matches found for {name_id}", code=1)
|
|
|
|
|
elif len(matches) > 1:
|
|
|
|
|
echo(f"matches {','.join((match.name for match in matches))}", style="red")
|
|
|
|
|
error("too many matches maybe try a longer name?", code=1)
|
|
|
|
|
else:
|
|
|
|
|
error(f"no matches found for {name_id}", code=1)
|
|
|
|
|
return matches[0]
|
|
|
|
|
|
|
|
|
|
def remove(self, args: Namespace) -> None:
|
|
|
|
|
def remove(self, args):
|
|
|
|
|
"""\
|
|
|
|
|
remove a vivenv
|
|
|
|
|
|
|
|
|
@ -846,7 +801,7 @@ class Viv:
|
|
|
|
|
code=1,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def freeze(self, args: Namespace) -> None:
|
|
|
|
|
def freeze(self, args):
|
|
|
|
|
"""create import statement from package spec"""
|
|
|
|
|
|
|
|
|
|
if not args.reqs:
|
|
|
|
@ -862,7 +817,7 @@ class Viv:
|
|
|
|
|
args.standalone,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def list(self, args: Namespace) -> None:
|
|
|
|
|
def list(self, args):
|
|
|
|
|
"""list all vivenvs"""
|
|
|
|
|
|
|
|
|
|
if args.quiet:
|
|
|
|
@ -884,14 +839,14 @@ class Viv:
|
|
|
|
|
)
|
|
|
|
|
a.table(rows)
|
|
|
|
|
|
|
|
|
|
def exe(self, args: Namespace) -> None:
|
|
|
|
|
def exe(self, args):
|
|
|
|
|
"""run python/pip in vivenv"""
|
|
|
|
|
|
|
|
|
|
vivenv = self._match_vivenv(args.vivenv)
|
|
|
|
|
|
|
|
|
|
pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python"))
|
|
|
|
|
# todo check for vivenv
|
|
|
|
|
echo(f"executing command within {vivenv.name}")
|
|
|
|
|
echo(f"executing command within {args.vivenv}")
|
|
|
|
|
|
|
|
|
|
cmd = (
|
|
|
|
|
f"{pip_path} {' '.join(args.cmd)}"
|
|
|
|
@ -902,7 +857,7 @@ class Viv:
|
|
|
|
|
echo(f"executing {cmd}")
|
|
|
|
|
run(shlex.split(cmd), verbose=True)
|
|
|
|
|
|
|
|
|
|
def info(self, args: Namespace) -> None:
|
|
|
|
|
def info(self, args):
|
|
|
|
|
"""get metadata about a vivenv"""
|
|
|
|
|
vivenv = self._match_vivenv(args.vivenv)
|
|
|
|
|
metadata_file = vivenv.path / "viv-info.json"
|
|
|
|
@ -914,84 +869,10 @@ class Viv:
|
|
|
|
|
|
|
|
|
|
vivenv.dump_info()
|
|
|
|
|
|
|
|
|
|
def manage(self, args: Namespace) -> None:
|
|
|
|
|
"""manage viv installation"""
|
|
|
|
|
|
|
|
|
|
if args.cmd == "show":
|
|
|
|
|
# NOTE: could reuse the table output for this?
|
|
|
|
|
echo("Installation Info:")
|
|
|
|
|
sys.stdout.write(
|
|
|
|
|
SHOW_TEMPLATE.format(
|
|
|
|
|
version=__version__,
|
|
|
|
|
cli=shutil.which("viv"),
|
|
|
|
|
src=Path(__file__).resolve(),
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
elif args.cmd == "update":
|
|
|
|
|
echo(__file__)
|
|
|
|
|
echo(Path(__file__))
|
|
|
|
|
if str(Path(__file__).resolve()).startswith("/proc/"):
|
|
|
|
|
error(
|
|
|
|
|
a.style("viv manage update", "bold")
|
|
|
|
|
+ "should be used with a locally installed viv",
|
|
|
|
|
1,
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
r = urlopen(
|
|
|
|
|
"https://raw.githubusercontent.com/daylinmorgan/viv/"
|
|
|
|
|
+ args.reference
|
|
|
|
|
+ "/src/viv/viv.py"
|
|
|
|
|
)
|
|
|
|
|
except HTTPError as e:
|
|
|
|
|
error(
|
|
|
|
|
"Issue updating viv see below:"
|
|
|
|
|
+ a.style("-> ", "red").join(["\n"] + repr(e).splitlines())
|
|
|
|
|
)
|
|
|
|
|
if "404" in repr(e):
|
|
|
|
|
echo("Please check your reference is valid.", style="red")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
viv_src = r.read()
|
|
|
|
|
|
|
|
|
|
(hash := hashlib.sha256()).update(viv_src)
|
|
|
|
|
sha256 = hash.hexdigest()
|
|
|
|
|
|
|
|
|
|
(
|
|
|
|
|
src_cache := Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache"))
|
|
|
|
|
/ "viv"
|
|
|
|
|
/ "src"
|
|
|
|
|
).mkdir(exist_ok=True, parents=True)
|
|
|
|
|
cached_version = src_cache / f"{sha256}.py"
|
|
|
|
|
|
|
|
|
|
if not cached_version.is_file():
|
|
|
|
|
with cached_version.open("w") as f:
|
|
|
|
|
f.write(viv_src.decode())
|
|
|
|
|
|
|
|
|
|
sys.path.append(str(src_cache))
|
|
|
|
|
next_version = __import__(sha256).__version__
|
|
|
|
|
q = (
|
|
|
|
|
"Update source at: "
|
|
|
|
|
+ a.style(Path(__file__).resolve(), "bold")
|
|
|
|
|
+ f" \n from version {__version__} to {next_version}?"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if confirm(q):
|
|
|
|
|
print("AWAY THEN")
|
|
|
|
|
|
|
|
|
|
elif args.cmd == "install":
|
|
|
|
|
echo("not yet implemented. sorry")
|
|
|
|
|
|
|
|
|
|
def _get_subcmd_parser(
|
|
|
|
|
self,
|
|
|
|
|
subparsers: _SubParsersAction[ArgumentParser],
|
|
|
|
|
name: str,
|
|
|
|
|
attr: Optional[str] = None,
|
|
|
|
|
**kwargs: Any,
|
|
|
|
|
) -> ArgumentParser:
|
|
|
|
|
def _get_subcmd_parser(self, subparsers, name: str, **kwargs) -> ArgumentParser:
|
|
|
|
|
aliases = kwargs.pop("aliases", [name[0]])
|
|
|
|
|
cmd = getattr(self, attr if attr else name)
|
|
|
|
|
parser: ArgumentParser = subparsers.add_parser(
|
|
|
|
|
cmd = getattr(self, name)
|
|
|
|
|
parser = subparsers.add_parser(
|
|
|
|
|
name,
|
|
|
|
|
help=cmd.__doc__.splitlines()[0],
|
|
|
|
|
description=dedent(cmd.__doc__),
|
|
|
|
@ -1002,7 +883,7 @@ class Viv:
|
|
|
|
|
|
|
|
|
|
return parser
|
|
|
|
|
|
|
|
|
|
def cli(self) -> None:
|
|
|
|
|
def cli(self):
|
|
|
|
|
"""cli entrypoint"""
|
|
|
|
|
|
|
|
|
|
parser = ArgumentParser(description=description)
|
|
|
|
@ -1046,15 +927,16 @@ class Viv:
|
|
|
|
|
nargs="*",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
p_exe_sub.add_parser(
|
|
|
|
|
p_exe_python = p_exe_sub.add_parser(
|
|
|
|
|
"python",
|
|
|
|
|
help="run command with python",
|
|
|
|
|
parents=[p_vivenv_arg, p_exe_shared],
|
|
|
|
|
).set_defaults(func=self.exe, exe="python")
|
|
|
|
|
|
|
|
|
|
p_exe_sub.add_parser(
|
|
|
|
|
)
|
|
|
|
|
p_exe_pip = p_exe_sub.add_parser(
|
|
|
|
|
"pip", help="run command with pip", parents=[p_vivenv_arg, p_exe_shared]
|
|
|
|
|
).set_defaults(func=self.exe, exe="pip")
|
|
|
|
|
)
|
|
|
|
|
p_exe_python.set_defaults(func=self.exe, exe="python")
|
|
|
|
|
p_exe_pip.set_defaults(func=self.exe, exe="pip")
|
|
|
|
|
|
|
|
|
|
p_remove = self._get_subcmd_parser(
|
|
|
|
|
subparsers,
|
|
|
|
@ -1099,39 +981,12 @@ class Viv:
|
|
|
|
|
parents=[p_vivenv_arg],
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
p_manage_sub = self._get_subcmd_parser(
|
|
|
|
|
subparsers, name="manage"
|
|
|
|
|
).add_subparsers(title="subcommand", metavar="<sub-cmd>", required=True)
|
|
|
|
|
|
|
|
|
|
p_manage_sub.add_parser(
|
|
|
|
|
"install",
|
|
|
|
|
help="install viv",
|
|
|
|
|
).set_defaults(func=self.manage, cmd="install")
|
|
|
|
|
|
|
|
|
|
(
|
|
|
|
|
p_manage_update := p_manage_sub.add_parser(
|
|
|
|
|
"update",
|
|
|
|
|
help="update viv version",
|
|
|
|
|
)
|
|
|
|
|
).set_defaults(func=self.manage, cmd="update")
|
|
|
|
|
|
|
|
|
|
p_manage_update.add_argument(
|
|
|
|
|
"-r",
|
|
|
|
|
"--reference",
|
|
|
|
|
help="git reference (branch/tag/commit)",
|
|
|
|
|
default="main",
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
p_manage_sub.add_parser(
|
|
|
|
|
"show", help="show current installation info"
|
|
|
|
|
).set_defaults(func=self.manage, cmd="show")
|
|
|
|
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
args.func(args)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> None:
|
|
|
|
|
def main():
|
|
|
|
|
viv = Viv()
|
|
|
|
|
viv.cli()
|
|
|
|
|
|
|
|
|
|