Compare commits

...

12 commits

8 changed files with 312 additions and 138 deletions

2
.gitignore vendored
View file

@ -111,7 +111,7 @@ ipython_config.py
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control. # in version control.
# https://pdm.fming.dev/#use-with-ide # https://pdm.fming.dev/#use-with-ide
.pdm.toml .pdm-python
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/ __pypackages__/

View file

@ -1,25 +1,16 @@
# See https://pre-commit.com for more information # See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks # See https://pre-commit.com/hooks.html for more hooks
repos: repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
- repo: https://github.com/psf/black - repo: https://github.com/psf/black
rev: 23.1.0 rev: 23.3.0
hooks: hooks:
- id: black - id: black
language_version: python language_version: python
- repo: https://github.com/charliermarsh/ruff-pre-commit - repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.0.245' rev: 'v0.0.270'
hooks: hooks:
- id: ruff - id: ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
- repo: local - repo: local
hooks: hooks:
- id: set-version - id: set-version

View file

@ -91,9 +91,10 @@ It can be auto-generated with for example: `viv freeze <spec> --standalone`.
The only part necessary to modify if copied verbatim from below is the call to `_viv_use`. The only part necessary to modify if copied verbatim from below is the call to `_viv_use`.
output of `viv freeze rich --standalone`: Output of `viv freeze rich --standalone`:
```python ```python
# <<<<< auto-generated by daylinmorgan/viv (v.22.12a3) # <<<<< auto-generated by daylinmorgan/viv (v22.12a3-35-g0d0c66d-dev)
# fmt: off # fmt: off
def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None: # noqa def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None: # noqa
i,s,m,e,spec=__import__,str,map,lambda x: True if x else False,[*pkgs] # noqa i,s,m,e,spec=__import__,str,map,lambda x: True if x else False,[*pkgs] # noqa
@ -116,6 +117,8 @@ def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None:
_viv_use("markdown-it-py==2.2.0", "mdurl==0.1.2", "Pygments==2.14.0", "rich==13.3.2") # noqa _viv_use("markdown-it-py==2.2.0", "mdurl==0.1.2", "Pygments==2.14.0", "rich==13.3.2") # noqa
# fmt: on # fmt: on
# >>>>> code golfed with <3 # >>>>> code golfed with <3
``` ```
## Alternatives ## Alternatives

33
examples/black Executable file
View file

@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""
Proof of concept output for a 'shim generator'
that would essentially emulate pipx
A possible cli signature
viv shim black -o ~/bin/black
"""
import sys
sys.path.append("/home/daylin/.viv/src") # noqa
import subprocess
import viv
if __name__ == "__main__":
vivenv = viv.use(
"black==23.3.0",
"click==8.1.3",
"mypy-extensions==1.0.0",
"packaging==23.1",
"pathspec==0.11.1",
"platformdirs==3.5.1",
)
sys.exit(
subprocess.run(
[
vivenv.path / "bin" / "black",
*sys.argv[1:],
]
).returncode
)

View file

@ -5,7 +5,7 @@ With this function it's not necessary for
`viv` to exist anywhere on the system. `viv` to exist anywhere on the system.
""" """
# <<<<< auto-generated by daylinmorgan/viv (v.22.12a3) # <<<<< auto-generated by daylinmorgan/viv (v22.12a3-35-g0d0c66d-dev)
# fmt: off # fmt: off
def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None: # noqa def _viv_use(*pkgs: str, track_exe: bool = False, name: str = "") -> None: # noqa
i,s,m,e,spec=__import__,str,map,lambda x: True if x else False,[*pkgs] # noqa i,s,m,e,spec=__import__,str,map,lambda x: True if x else False,[*pkgs] # noqa

View file

@ -34,4 +34,5 @@ ignore = ["E402"]
[tool.mypy] [tool.mypy]
warn_return_any = true warn_return_any = true
check_untyped_defs = true check_untyped_defs = true
disallow_untyped_defs = true
warn_unused_configs = true warn_unused_configs = true

View file

@ -6,6 +6,8 @@
__import__("viv").use("requests", "bs4") __import__("viv").use("requests", "bs4")
""" """
from __future__ import annotations
import hashlib import hashlib
import itertools import itertools
import json import json
@ -19,18 +21,37 @@ import sys
import tempfile import tempfile
import threading import threading
import time import time
from urllib.request import urlopen
from urllib.error import HTTPError
import venv import venv
from argparse import SUPPRESS from argparse import SUPPRESS, Action
from argparse import ArgumentParser as StdArgParser from argparse import ArgumentParser as StdArgParser
from argparse import HelpFormatter, RawDescriptionHelpFormatter from argparse import (
HelpFormatter,
Namespace,
RawDescriptionHelpFormatter,
_SubParsersAction,
)
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from itertools import zip_longest from itertools import zip_longest
from pathlib import Path from pathlib import Path
from textwrap import dedent, wrap from textwrap import dedent, wrap
from typing import Dict, List, Tuple from types import TracebackType
from typing import (
Any,
Dict,
List,
NoReturn,
Optional,
Sequence,
TextIO,
Tuple,
Type,
Generator,
)
__version__ = "22.12a3-35-g0d0c66d-dev" __version__ = "22.12a3-48-gf87aab9-dev"
@dataclass @dataclass
@ -41,7 +62,7 @@ class Config:
Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs" Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs"
) )
def __post_init__(self): def __post_init__(self) -> None:
self.venvcache.mkdir(parents=True, exist_ok=True) self.venvcache.mkdir(parents=True, exist_ok=True)
@ -53,7 +74,7 @@ class Spinner:
https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py
""" """
def __init__(self, message, delay=0.1): def __init__(self, message: str, delay: float = 0.1) -> None:
self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"]) self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"])
self.delay = delay self.delay = delay
self.busy = False self.busy = False
@ -62,40 +83,42 @@ class Spinner:
# sys.stdout.write(message) # sys.stdout.write(message)
echo(message + " ", newline=False) echo(message + " ", newline=False)
def write_next(self): def write_next(self) -> None:
with self._screen_lock: with self._screen_lock:
if not self.spinner_visible: if not self.spinner_visible:
sys.stderr.write(next(self.spinner)) sys.stderr.write(next(self.spinner))
self.spinner_visible = True self.spinner_visible = True
sys.stderr.flush() sys.stderr.flush()
def remove_spinner(self, cleanup=False): def remove_spinner(self, cleanup: bool = False) -> None:
with self._screen_lock: with self._screen_lock:
if self.spinner_visible: if self.spinner_visible:
sys.stderr.write("\b\b\b") sys.stderr.write("\b\b\b")
# sys.stdout.write("\b")
self.spinner_visible = False self.spinner_visible = False
if cleanup: if cleanup:
sys.stderr.write(" ") # overwrite spinner with blank sys.stderr.write(" ") # overwrite spinner with blank
# sys.stdout.write("\r") # move to next line
# move back then delete the line
sys.stderr.write("\r\033[K") sys.stderr.write("\r\033[K")
sys.stderr.flush() sys.stderr.flush()
def spinner_task(self): def spinner_task(self) -> None:
while self.busy: while self.busy:
self.write_next() self.write_next()
time.sleep(self.delay) time.sleep(self.delay)
self.remove_spinner() self.remove_spinner()
def __enter__(self): def __enter__(self) -> None:
if sys.stderr.isatty(): if sys.stderr.isatty():
self._screen_lock = threading.Lock() self._screen_lock = threading.Lock()
self.busy = True self.busy = True
self.thread = threading.Thread(target=self.spinner_task) self.thread = threading.Thread(target=self.spinner_task)
self.thread.start() self.thread.start()
def __exit__(self, exc_type, exc_val, exc_traceback): # noqa def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
if sys.stderr.isatty(): if sys.stderr.isatty():
self.busy = False self.busy = False
self.remove_spinner(cleanup=True) self.remove_spinner(cleanup=True)
@ -133,7 +156,7 @@ class Ansi:
option: str = yellow option: str = yellow
metavar: str = "\033[33m" # normal yellow metavar: str = "\033[33m" # normal yellow
def __post_init__(self): def __post_init__(self) -> None:
if os.getenv("NO_COLOR") or not sys.stderr.isatty(): if os.getenv("NO_COLOR") or not sys.stderr.isatty():
for attr in self.__dict__: for attr in self.__dict__:
setattr(self, attr, "") setattr(self, attr, "")
@ -166,7 +189,7 @@ class Ansi:
""" """
return f"{getattr(self,style)}{txt}{getattr(self,'end')}" return f"{getattr(self,style)}{txt}{getattr(self,'end')}"
def tagline(self): def tagline(self) -> str:
"""generate the viv tagline!""" """generate the viv tagline!"""
return " ".join( return " ".join(
@ -176,70 +199,78 @@ class Ansi:
) )
) )
def subprocess(self, output): def subprocess(self, output: str) -> None:
"""generate output for subprocess error """generate output for subprocess error
Args: Args:
output: text output from subprocess, usually from p.stdout output: text output from subprocess, usually from p.stdout
""" """
echo("subprocess output:")
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
sys.stdout.write("\n".join(new_output) + "\n") sys.stdout.write("\n".join(new_output) + "\n")
def _get_column_size(self, sizes, row): def _get_column_sizes(
for i, length in enumerate(len(cell) for cell in row): self, rows: Tuple[Tuple[str, Sequence[str]], ...]
if length > sizes[i]: ) -> List[int]:
sizes[i] = length """convert list of rows to list of columns sizes
return sizes
def _make_row(self, row) -> str: First convert rows into list of columns,
then get max string length for each column.
"""
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str:
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}" return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}"
def _sanitize_row(self, sizes: List[int], row: Tuple[str]) -> Tuple[Tuple[str]]: def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]]
) -> Tuple[Tuple[str, Sequence[str]], ...]:
if len(row[1]) > sizes[1]: if len(row[1]) > sizes[1]:
return zip_longest( return tuple(
zip_longest(
(row[0],), (row[0],),
wrap(row[1], break_on_hyphens=False, width=sizes[1]), wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]),
fillvalue="", fillvalue="",
) )
)
else: else:
return (row,) return (row,)
def table(self, rows, header_style="cyan") -> None: def table(
"""generate a table with outline and styled header self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
) -> None:
"""generate a table with outline and styled header assumes two columns
Args: Args:
rows: sequence of the rows, first item assumed to be header rows: sequence of the rows, first item assumed to be header
header_style: color/style for header row header_style: color/style for header row
""" """
sizes = [0] * len(rows[0]) sizes = self._get_column_sizes(rows)
for row in rows:
sizes = self._get_column_size(sizes, row)
col1_limit = shutil.get_terminal_size().columns - 20 col2_limit = shutil.get_terminal_size().columns - 20
if col1_limit < 20: if col2_limit < 20:
error("increase screen size to view table", code=1) error("increase screen size to view table", code=1)
elif sizes[1] > col2_limit:
sizes[1] = col2_limit
if sizes[1] > col1_limit: header, rows = rows[0], rows[1:]
sizes[1] = col1_limit
# this is maybe taking comprehensions too far.... # this is maybe taking comprehensions too far....
table_rows = ( table_rows = (
self._make_row(row) self._make_row(row)
for row in ( for row in (
# header row # header row
( (
self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
for i, cell in enumerate(rows[0]) for i, cell in enumerate(header)
), ),
# rest of the rows
*( *(
(f"{cell:<{sizes[i]}}" for i, cell in enumerate(row)) (f"{cell:<{sizes[i]}}" for i, cell in enumerate(row))
for row in ( for row in (
newrow newrow
for row in rows[1:] for row in rows
for newrow in self._sanitize_row(sizes, row) for newrow in self._sanitize_row(sizes, row)
) )
), ),
@ -254,19 +285,21 @@ class Ansi:
a = Ansi() a = Ansi()
def error(msg, code: int = 0): def error(msg: str, code: int = 0) -> None:
"""output error message and if code provided exit""" """output error message and if code provided exit"""
echo(f"{a.red}error:{a.end} {msg}", style="red") echo(f"{a.red}error:{a.end} {msg}", style="red")
if code: if code:
sys.exit(code) sys.exit(code)
def warn(msg): def warn(msg: str) -> None:
"""output warning message to stdout""" """output warning message to stdout"""
echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow") echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow")
def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None: def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
) -> None:
"""output general message to stdout""" """output general message to stdout"""
output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}" output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}"
if newline: if newline:
@ -274,13 +307,23 @@ def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None:
fd.write(output) fd.write(output)
def confirm(question: str) -> bool:
while True:
ans = input(question + a.style(" (Y)es/(n)o: ", "yellow")).strip().lower()
if ans in ("y", "yes"):
return True
elif ans in ("n", "no"):
return False
sys.stdout.write("\nPlease select (Y)es or (n)o.")
def run( def run(
command: List[str | Path], command: List[str],
spinmsg: str = "", spinmsg: str = "",
clean_up_path: Path | None = None, clean_up_path: Optional[Path] = None,
verbose: bool = False, verbose: bool = False,
ignore_error: bool = False, ignore_error: bool = False,
check_output=False, check_output: bool = False,
) -> str: ) -> str:
"""run a subcommand """run a subcommand
@ -385,9 +428,9 @@ class ViVenv:
with (self.path / "pip.conf").open("w") as f: with (self.path / "pip.conf").open("w") as f:
f.write("[global]\ndisable-pip-version-check = true") f.write("[global]\ndisable-pip-version-check = true")
def install_pkgs(self): def install_pkgs(self) -> None:
cmd: List[str | Path] = [ cmd: List[str] = [
self.path / "bin" / "pip", str(self.path / "bin" / "pip"),
"install", "install",
"--force-reinstall", "--force-reinstall",
] + self.spec ] + self.spec
@ -399,7 +442,7 @@ class ViVenv:
verbose=bool(os.getenv("VIV_VERBOSE")), verbose=bool(os.getenv("VIV_VERBOSE")),
) )
def dump_info(self, write=False): def dump_info(self, write: bool = False) -> None:
# TODO: include associated files in 'info' # TODO: include associated files in 'info'
# means it needs to be loaded first # means it needs to be loaded first
info = { info = {
@ -417,7 +460,7 @@ class ViVenv:
a.table((("key", "value"), *((k, v) for k, v in info.items()))) a.table((("key", "value"), *((k, v) for k, v in info.items())))
def use(*packages: str, track_exe: bool = False, name: str = "") -> None: def use(*packages: str, track_exe: bool = False, name: str = "") -> ViVenv:
"""create a vivenv and append to sys.path """create a vivenv and append to sys.path
Args: Args:
@ -436,9 +479,10 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> None:
vivenv.dump_info(write=True) vivenv.dump_info(write=True)
modify_sys_path(vivenv.path) modify_sys_path(vivenv.path)
return vivenv
def validate_spec(spec): def validate_spec(spec: Tuple[str, ...]) -> None:
"""ensure spec is at least of sequence of strings """ensure spec is at least of sequence of strings
Args: Args:
@ -450,7 +494,7 @@ def validate_spec(spec):
error(f"check your packages definitions: {spec}", code=1) error(f"check your packages definitions: {spec}", code=1)
def modify_sys_path(new_path: Path): def modify_sys_path(new_path: Path) -> None:
# remove user-site # remove user-site
for i, path in enumerate(sys.path): for i, path in enumerate(sys.path):
if path == site.USER_SITE: if path == site.USER_SITE:
@ -461,7 +505,7 @@ def modify_sys_path(new_path: Path):
) )
def get_venvs(): def get_venvs() -> Dict[str, ViVenv]:
vivenvs = {} vivenvs = {}
for p in c.venvcache.iterdir(): for p in c.venvcache.iterdir():
vivenv = ViVenv.load(p.name) vivenv = ViVenv.load(p.name)
@ -477,7 +521,7 @@ REL_SYS_PATH_TEMPLATE = (
IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa""" IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa"""
STANDALONE_TEMPLATE = r""" STANDALONE_TEMPLATE = r"""
# <<<<< auto-generated by daylinmorgan/viv (v.22.12a3) # <<<<< auto-generated by daylinmorgan/viv v{version}
# fmt: off # fmt: off
{use} {use}
# fmt: on # fmt: on
@ -508,6 +552,12 @@ _viv_use({spec})
1: 1:
] ]
SHOW_TEMPLATE = f"""\
{a.style('Version', 'bold')}: {{version}}
{a.style('CLI', 'bold')}: {{cli}}
{a.style('Source File', 'bold')}: {{src}}
"""
def noqa(txt: str) -> str: def noqa(txt: str) -> str:
max_length = max(map(len, txt.splitlines())) max_length = max(map(len, txt.splitlines()))
@ -519,14 +569,14 @@ def spec_to_import(spec: List[str]) -> None:
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n") sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
def freeze_venv(spec: List[str], path: Path | None = None): def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]:
vivenv = ViVenv(spec, track_exe=False, path=path) vivenv = ViVenv(spec, track_exe=False, path=path)
vivenv.create() vivenv.create()
# populate the environment for now use # populate the environment for now use
# custom cmd since using requirements file # custom cmd since using requirements file
cmd = [ cmd = [
vivenv.path / "bin" / "pip", str(vivenv.path / "bin" / "pip"),
"install", "install",
"--force-reinstall", "--force-reinstall",
] + spec ] + spec
@ -534,7 +584,7 @@ def freeze_venv(spec: List[str], path: Path | None = None):
run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path) run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path)
# generate a frozen environment # generate a frozen environment
cmd = [vivenv.path / "bin" / "pip", "freeze"] cmd = [str(vivenv.path / "bin" / "pip"), "freeze"]
resolved_spec = run(cmd, check_output=True) resolved_spec = run(cmd, check_output=True)
return vivenv, resolved_spec return vivenv, resolved_spec
@ -542,41 +592,34 @@ def freeze_venv(spec: List[str], path: Path | None = None):
def generate_import( def generate_import(
requirements: Path, requirements: Path,
reqs: List[str], reqs: List[str],
vivenvs, vivenvs: Dict[str, ViVenv],
include_path: bool, include_path: bool,
keep: bool, keep: bool,
standalone: bool, standalone: bool,
) -> None: ) -> None:
# TODO: make compatible with Venv class for now just use the name /tmp/
reqs_from_file = [] reqs_from_file = []
if requirements: if requirements:
with requirements.open("r") as f: with requirements.open("r") as f:
reqs_from_file = f.readlines() reqs_from_file = f.readlines()
# refactor to make the below steps context dependent with tmpdir path with tempfile.TemporaryDirectory() as tmpdir:
echo("generating frozen spec")
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file, path=Path(tmpdir))
if keep: if keep:
# TODO: remove directory if any errors occur? # create env again since path's are hard-coded
echo("generating new vivenv") vivenv = ViVenv(resolved_spec.splitlines())
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file)
# update id and move vivenv if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv(
vivenv.spec = resolved_spec.splitlines() "VIV_FORCE"
vivenv.id = get_hash(resolved_spec.splitlines()) ):
echo(f"updated hash -> {vivenv.id}") vivenv.create()
vivenv.install_pkgs()
if not (c.venvcache / vivenv.id).exists():
vivenv.path = vivenv.path.rename(c.venvcache / vivenv.id)
vivenv.dump_info(write=True) vivenv.dump_info(write=True)
else: else:
echo("this vivenv already exists cleaning up temporary vivenv") echo("re-using existing vivenv")
shutil.rmtree(vivenv.path)
else:
with tempfile.TemporaryDirectory() as tmpdir: #
echo("generating temporary vivenv ")
vivenv, resolved_spec = freeze_venv(
reqs + reqs_from_file, path=Path(tmpdir)
)
echo("see below for import statements\n") echo("see below for import statements\n")
@ -617,10 +660,10 @@ def generate_import(
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
"""formatter to remove extra metavar on short opts""" """formatter to remove extra metavar on short opts"""
def _get_invocation_length(self, invocation): def _get_invocation_length(self, invocation: str) -> int:
return len(a.escape(invocation)) return len(a.escape(invocation))
def _format_action_invocation(self, action): def _format_action_invocation(self, action: Action) -> str:
if not action.option_strings: if not action.option_strings:
(metavar,) = self._metavar_formatter(action, action.dest)(1) (metavar,) = self._metavar_formatter(action, action.dest)(1)
return a.style(metavar, style="option") return a.style(metavar, style="option")
@ -653,7 +696,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
parts[-1] += a.style(f" {args_string}", style="metavar") parts[-1] += a.style(f" {args_string}", style="metavar")
return (", ").join(parts) return (", ").join(parts)
def _format_usage(self, *args, **kwargs): def _format_usage(self, *args: Any, **kwargs: Any) -> str:
formatted_usage = super()._format_usage(*args, **kwargs) formatted_usage = super()._format_usage(*args, **kwargs)
# patch usage with color formatting # patch usage with color formatting
formatted_usage = ( formatted_usage = (
@ -663,7 +706,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
) )
return formatted_usage return formatted_usage
def _format_action(self, action): def _format_action(self, action: Action) -> str:
# determine the required width and the entry label # determine the required width and the entry label
help_position = min(self._action_max_length + 2, self._max_help_position) help_position = min(self._action_max_length + 2, self._max_help_position)
help_width = max(self._width - help_position, 11) help_width = max(self._width - help_position, 11)
@ -673,11 +716,10 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# no help; start on same line and add a final newline # no help; start on same line and add a final newline
if not action.help: if not action.help:
tup = self._current_indent, "", action_header action_header = "%*s%s\n" % (self._current_indent, "", action_header)
action_header = "%*s%s\n" % tup
# short action name; start on the same line and pad two spaces # short action name; start on the same line and pad two spaces
elif action_header_len <= action_width: elif action_header_len <= action_width:
tup = self._current_indent, "", action_width, action_header # tup = self._current_indent, "", action_width, action_header
action_header = ( action_header = (
f"{' '*self._current_indent}{action_header}" f"{' '*self._current_indent}{action_header}"
f"{' '*(action_width+2 - action_header_len)}" f"{' '*(action_width+2 - action_header_len)}"
@ -686,8 +728,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# long action name; start on the next line # long action name; start on the next line
else: else:
tup = self._current_indent, "", action_header action_header = "%*s%s\n" % (self._current_indent, "", action_header)
action_header = "%*s%s\n" % tup
indent_first = help_position indent_first = help_position
# collect the pieces of the action help # collect the pieces of the action help
@ -713,10 +754,13 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# return a single string # return a single string
return self._join_parts(parts) return self._join_parts(parts)
def start_section(self, heading: str) -> None: def start_section(self, heading: Optional[str]) -> None:
return super().start_section(a.style(heading, style="header")) if heading:
super().start_section(a.style(heading, style="header"))
else:
super()
def add_argument(self, action): def add_argument(self, action: Action) -> None:
if action.help is not SUPPRESS: if action.help is not SUPPRESS:
# find all invocations # find all invocations
get_invocation = self._format_action_invocation get_invocation = self._format_action_invocation
@ -734,14 +778,14 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
class ArgumentParser(StdArgParser): class ArgumentParser(StdArgParser):
def __init__(self, *args, **kwargs): def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.formatter_class = lambda prog: CustomHelpFormatter( self.formatter_class = lambda prog: CustomHelpFormatter(
prog, max_help_position=35 prog, max_help_position=35
) )
def error(self, message): def error(self, message: str) -> NoReturn:
error(message) error(message)
echo("see below for help\n", style="red") echo("see below for help\n", style="red")
self.print_help() self.print_help()
@ -761,12 +805,12 @@ within python script:
class Viv: class Viv:
def __init__(self): def __init__(self) -> None:
self.vivenvs = get_venvs() self.vivenvs = get_venvs()
def _match_vivenv(self, name_id: str) -> ViVenv: def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
# TODO: improve matching algorithm to favor names over id's # TODO: improve matching algorithm to favor names over id's
matches = [] matches: List[ViVenv] = []
for k, v in self.vivenvs.items(): for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id: if name_id == k or v.name == name_id:
matches.append(v) matches.append(v)
@ -774,15 +818,16 @@ class Viv:
matches.append(v) matches.append(v)
elif v.name.startswith(name_id): elif v.name.startswith(name_id):
matches.append(v) matches.append(v)
if not matches:
error(f"no matches found for {name_id}", code=1) if len(matches) == 1:
return matches[0]
elif len(matches) > 1: elif len(matches) > 1:
echo(f"matches {','.join((match.name for match in matches))}", style="red") echo(f"matches {','.join((match.name for match in matches))}", style="red")
error("too many matches maybe try a longer name?", code=1) error("too many matches maybe try a longer name?", code=1)
else: else:
return matches[0] error(f"no matches found for {name_id}", code=1)
def remove(self, args): def remove(self, args: Namespace) -> None:
"""\ """\
remove a vivenv remove a vivenv
@ -801,7 +846,7 @@ class Viv:
code=1, code=1,
) )
def freeze(self, args): def freeze(self, args: Namespace) -> None:
"""create import statement from package spec""" """create import statement from package spec"""
if not args.reqs: if not args.reqs:
@ -817,7 +862,7 @@ class Viv:
args.standalone, args.standalone,
) )
def list(self, args): def list(self, args: Namespace) -> None:
"""list all vivenvs""" """list all vivenvs"""
if args.quiet: if args.quiet:
@ -839,14 +884,14 @@ class Viv:
) )
a.table(rows) a.table(rows)
def exe(self, args): def exe(self, args: Namespace) -> None:
"""run python/pip in vivenv""" """run python/pip in vivenv"""
vivenv = self._match_vivenv(args.vivenv) vivenv = self._match_vivenv(args.vivenv)
pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python")) pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python"))
# todo check for vivenv # todo check for vivenv
echo(f"executing command within {args.vivenv}") echo(f"executing command within {vivenv.name}")
cmd = ( cmd = (
f"{pip_path} {' '.join(args.cmd)}" f"{pip_path} {' '.join(args.cmd)}"
@ -857,7 +902,7 @@ class Viv:
echo(f"executing {cmd}") echo(f"executing {cmd}")
run(shlex.split(cmd), verbose=True) run(shlex.split(cmd), verbose=True)
def info(self, args): def info(self, args: Namespace) -> None:
"""get metadata about a vivenv""" """get metadata about a vivenv"""
vivenv = self._match_vivenv(args.vivenv) vivenv = self._match_vivenv(args.vivenv)
metadata_file = vivenv.path / "viv-info.json" metadata_file = vivenv.path / "viv-info.json"
@ -869,10 +914,84 @@ class Viv:
vivenv.dump_info() vivenv.dump_info()
def _get_subcmd_parser(self, subparsers, name: str, **kwargs) -> ArgumentParser: def manage(self, args: Namespace) -> None:
"""manage viv installation"""
if args.cmd == "show":
# NOTE: could reuse the table output for this?
echo("Installation Info:")
sys.stdout.write(
SHOW_TEMPLATE.format(
version=__version__,
cli=shutil.which("viv"),
src=Path(__file__).resolve(),
)
)
elif args.cmd == "update":
echo(__file__)
echo(Path(__file__))
if str(Path(__file__).resolve()).startswith("/proc/"):
error(
a.style("viv manage update", "bold")
+ "should be used with a locally installed viv",
1,
)
try:
r = urlopen(
"https://raw.githubusercontent.com/daylinmorgan/viv/"
+ args.reference
+ "/src/viv/viv.py"
)
except HTTPError as e:
error(
"Issue updating viv see below:"
+ a.style("-> ", "red").join(["\n"] + repr(e).splitlines())
)
if "404" in repr(e):
echo("Please check your reference is valid.", style="red")
sys.exit(1)
viv_src = r.read()
(hash := hashlib.sha256()).update(viv_src)
sha256 = hash.hexdigest()
(
src_cache := Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache"))
/ "viv"
/ "src"
).mkdir(exist_ok=True, parents=True)
cached_version = src_cache / f"{sha256}.py"
if not cached_version.is_file():
with cached_version.open("w") as f:
f.write(viv_src.decode())
sys.path.append(str(src_cache))
next_version = __import__(sha256).__version__
q = (
"Update source at: "
+ a.style(Path(__file__).resolve(), "bold")
+ f" \n from version {__version__} to {next_version}?"
)
if confirm(q):
print("AWAY THEN")
elif args.cmd == "install":
echo("not yet implemented. sorry")
def _get_subcmd_parser(
self,
subparsers: _SubParsersAction[ArgumentParser],
name: str,
attr: Optional[str] = None,
**kwargs: Any,
) -> ArgumentParser:
aliases = kwargs.pop("aliases", [name[0]]) aliases = kwargs.pop("aliases", [name[0]])
cmd = getattr(self, name) cmd = getattr(self, attr if attr else name)
parser = subparsers.add_parser( parser: ArgumentParser = subparsers.add_parser(
name, name,
help=cmd.__doc__.splitlines()[0], help=cmd.__doc__.splitlines()[0],
description=dedent(cmd.__doc__), description=dedent(cmd.__doc__),
@ -883,7 +1002,7 @@ class Viv:
return parser return parser
def cli(self): def cli(self) -> None:
"""cli entrypoint""" """cli entrypoint"""
parser = ArgumentParser(description=description) parser = ArgumentParser(description=description)
@ -927,16 +1046,15 @@ class Viv:
nargs="*", nargs="*",
) )
p_exe_python = p_exe_sub.add_parser( p_exe_sub.add_parser(
"python", "python",
help="run command with python", help="run command with python",
parents=[p_vivenv_arg, p_exe_shared], parents=[p_vivenv_arg, p_exe_shared],
) ).set_defaults(func=self.exe, exe="python")
p_exe_pip = p_exe_sub.add_parser(
p_exe_sub.add_parser(
"pip", help="run command with pip", parents=[p_vivenv_arg, p_exe_shared] "pip", help="run command with pip", parents=[p_vivenv_arg, p_exe_shared]
) ).set_defaults(func=self.exe, exe="pip")
p_exe_python.set_defaults(func=self.exe, exe="python")
p_exe_pip.set_defaults(func=self.exe, exe="pip")
p_remove = self._get_subcmd_parser( p_remove = self._get_subcmd_parser(
subparsers, subparsers,
@ -981,12 +1099,39 @@ class Viv:
parents=[p_vivenv_arg], parents=[p_vivenv_arg],
) )
p_manage_sub = self._get_subcmd_parser(
subparsers, name="manage"
).add_subparsers(title="subcommand", metavar="<sub-cmd>", required=True)
p_manage_sub.add_parser(
"install",
help="install viv",
).set_defaults(func=self.manage, cmd="install")
(
p_manage_update := p_manage_sub.add_parser(
"update",
help="update viv version",
)
).set_defaults(func=self.manage, cmd="update")
p_manage_update.add_argument(
"-r",
"--reference",
help="git reference (branch/tag/commit)",
default="main",
)
p_manage_sub.add_parser(
"show", help="show current installation info"
).set_defaults(func=self.manage, cmd="show")
args = parser.parse_args() args = parser.parse_args()
args.func(args) args.func(args)
def main(): def main() -> None:
viv = Viv() viv = Viv()
viv.cli() viv.cli()

View file

@ -5,5 +5,6 @@
- [ ] use config file (probably ini or toml for python>=3.11) - [ ] use config file (probably ini or toml for python>=3.11)
- [ ] enable a garbage collection based on time or file existence (configurable) - [ ] enable a garbage collection based on time or file existence (configurable)
- [ ] unit tests (v important) - [ ] unit tests (v important)
- [ ] update command
- [ ] add more options to filter `viv list` - [ ] add more options to filter `viv list`