Compare commits

..

No commits in common. "ed8a4458c9819f09a172629c17a90b9cd9d66c7c" and "454349ea23f8d492cfffe2600d988525e7ed71b3" have entirely different histories.

6 changed files with 122 additions and 176 deletions

2
.gitignore vendored
View file

@ -111,7 +111,7 @@ ipython_config.py
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm-python
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

View file

@ -1,16 +1,25 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 23.3.0
rev: 23.1.0
hooks:
- id: black
language_version: python
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.0.270'
rev: 'v0.0.245'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
- repo: local
hooks:
- id: set-version

View file

@ -1,33 +0,0 @@
#!/usr/bin/env python3
"""
Proof of concept output for a 'shim generator'
that would essentially emulate pipx
A possible cli signature
viv shim black -o ~/bin/black
"""
import sys
sys.path.append("/home/daylin/.viv/src") # noqa
import subprocess
import viv
if __name__ == "__main__":
vivenv = viv.use(
"black==23.3.0",
"click==8.1.3",
"mypy-extensions==1.0.0",
"packaging==23.1",
"pathspec==0.11.1",
"platformdirs==3.5.1",
)
sys.exit(
subprocess.run(
[
vivenv.path / "bin" / "black",
*sys.argv[1:],
]
).returncode
)

View file

@ -34,5 +34,4 @@ ignore = ["E402"]
[tool.mypy]
warn_return_any = true
check_untyped_defs = true
disallow_untyped_defs = true
warn_unused_configs = true

View file

@ -6,8 +6,6 @@
__import__("viv").use("requests", "bs4")
"""
from __future__ import annotations
import hashlib
import itertools
import json
@ -22,34 +20,17 @@ import tempfile
import threading
import time
import venv
from argparse import SUPPRESS, Action
from argparse import SUPPRESS
from argparse import ArgumentParser as StdArgParser
from argparse import (
HelpFormatter,
Namespace,
RawDescriptionHelpFormatter,
_SubParsersAction,
)
from argparse import HelpFormatter, RawDescriptionHelpFormatter
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from pathlib import Path
from textwrap import dedent, wrap
from types import TracebackType
from typing import (
Any,
Dict,
List,
NoReturn,
Optional,
Sequence,
TextIO,
Tuple,
Type,
Generator,
)
from typing import Dict, List, Tuple
__version__ = "22.12a3-41-g5c40210-dev"
__version__ = "22.12a3-37-gbfc2592-dev"
@dataclass
@ -60,7 +41,7 @@ class Config:
Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs"
)
def __post_init__(self) -> None:
def __post_init__(self):
self.venvcache.mkdir(parents=True, exist_ok=True)
@ -72,7 +53,7 @@ class Spinner:
https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py
"""
def __init__(self, message: str, delay: float = 0.1) -> None:
def __init__(self, message, delay=0.1):
self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"])
self.delay = delay
self.busy = False
@ -81,42 +62,40 @@ class Spinner:
# sys.stdout.write(message)
echo(message + " ", newline=False)
def write_next(self) -> None:
def write_next(self):
with self._screen_lock:
if not self.spinner_visible:
sys.stderr.write(next(self.spinner))
self.spinner_visible = True
sys.stderr.flush()
def remove_spinner(self, cleanup: bool = False) -> None:
def remove_spinner(self, cleanup=False):
with self._screen_lock:
if self.spinner_visible:
sys.stderr.write("\b\b\b")
# sys.stdout.write("\b")
self.spinner_visible = False
if cleanup:
sys.stderr.write(" ") # overwrite spinner with blank
# sys.stdout.write("\r") # move to next line
# move back then delete the line
sys.stderr.write("\r\033[K")
sys.stderr.flush()
def spinner_task(self) -> None:
def spinner_task(self):
while self.busy:
self.write_next()
time.sleep(self.delay)
self.remove_spinner()
def __enter__(self) -> None:
def __enter__(self):
if sys.stderr.isatty():
self._screen_lock = threading.Lock()
self.busy = True
self.thread = threading.Thread(target=self.spinner_task)
self.thread.start()
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
def __exit__(self, exc_type, exc_val, exc_traceback): # noqa
if sys.stderr.isatty():
self.busy = False
self.remove_spinner(cleanup=True)
@ -154,7 +133,7 @@ class Ansi:
option: str = yellow
metavar: str = "\033[33m" # normal yellow
def __post_init__(self) -> None:
def __post_init__(self):
if os.getenv("NO_COLOR") or not sys.stderr.isatty():
for attr in self.__dict__:
setattr(self, attr, "")
@ -187,7 +166,7 @@ class Ansi:
"""
return f"{getattr(self,style)}{txt}{getattr(self,'end')}"
def tagline(self) -> str:
def tagline(self):
"""generate the viv tagline!"""
return " ".join(
@ -197,78 +176,70 @@ class Ansi:
)
)
def subprocess(self, output: str) -> None:
def subprocess(self, output):
"""generate output for subprocess error
Args:
output: text output from subprocess, usually from p.stdout
"""
echo("subprocess output:")
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
sys.stdout.write("\n".join(new_output) + "\n")
def _get_column_sizes(
self, rows: Tuple[Tuple[str, Sequence[str]], ...]
) -> List[int]:
"""convert list of rows to list of columns sizes
def _get_column_size(self, sizes, row):
for i, length in enumerate(len(cell) for cell in row):
if length > sizes[i]:
sizes[i] = length
return sizes
First convert rows into list of columns,
then get max string length for each column.
"""
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str:
def _make_row(self, row) -> str:
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}"
def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]]
) -> Tuple[Tuple[str, Sequence[str]], ...]:
def _sanitize_row(self, sizes: List[int], row: Tuple[str]) -> Tuple[Tuple[str]]:
if len(row[1]) > sizes[1]:
return tuple(
zip_longest(
return zip_longest(
(row[0],),
wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]),
wrap(row[1], break_on_hyphens=False, width=sizes[1]),
fillvalue="",
)
)
else:
return (row,)
def table(
self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
) -> None:
"""generate a table with outline and styled header assumes two columns
def table(self, rows, header_style="cyan") -> None:
"""generate a table with outline and styled header
Args:
rows: sequence of the rows, first item assumed to be header
header_style: color/style for header row
"""
sizes = self._get_column_sizes(rows)
sizes = [0] * len(rows[0])
for row in rows:
sizes = self._get_column_size(sizes, row)
col2_limit = shutil.get_terminal_size().columns - 20
if col2_limit < 20:
col1_limit = shutil.get_terminal_size().columns - 20
if col1_limit < 20:
error("increase screen size to view table", code=1)
elif sizes[1] > col2_limit:
sizes[1] = col2_limit
header, rows = rows[0], rows[1:]
if sizes[1] > col1_limit:
sizes[1] = col1_limit
# this is maybe taking comprehensions too far....
table_rows = (
self._make_row(row)
for row in (
# header row
(
self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
for i, cell in enumerate(header)
for i, cell in enumerate(rows[0])
),
# rest of the rows
*(
(f"{cell:<{sizes[i]}}" for i, cell in enumerate(row))
for row in (
newrow
for row in rows
for row in rows[1:]
for newrow in self._sanitize_row(sizes, row)
)
),
@ -283,21 +254,19 @@ class Ansi:
a = Ansi()
def error(msg: str, code: int = 0) -> None:
def error(msg, code: int = 0):
"""output error message and if code provided exit"""
echo(f"{a.red}error:{a.end} {msg}", style="red")
if code:
sys.exit(code)
def warn(msg: str) -> None:
def warn(msg):
"""output warning message to stdout"""
echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow")
def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
) -> None:
def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None:
"""output general message to stdout"""
output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}"
if newline:
@ -306,12 +275,12 @@ def echo(
def run(
command: List[str],
command: List[str | Path],
spinmsg: str = "",
clean_up_path: Optional[Path] = None,
clean_up_path: Path | None = None,
verbose: bool = False,
ignore_error: bool = False,
check_output: bool = False,
check_output=False,
) -> str:
"""run a subcommand
@ -416,9 +385,9 @@ class ViVenv:
with (self.path / "pip.conf").open("w") as f:
f.write("[global]\ndisable-pip-version-check = true")
def install_pkgs(self) -> None:
cmd: List[str] = [
str(self.path / "bin" / "pip"),
def install_pkgs(self):
cmd: List[str | Path] = [
self.path / "bin" / "pip",
"install",
"--force-reinstall",
] + self.spec
@ -430,7 +399,7 @@ class ViVenv:
verbose=bool(os.getenv("VIV_VERBOSE")),
)
def dump_info(self, write: bool = False) -> None:
def dump_info(self, write=False):
# TODO: include associated files in 'info'
# means it needs to be loaded first
info = {
@ -469,7 +438,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> None:
modify_sys_path(vivenv.path)
def validate_spec(spec: Tuple[str, ...]) -> None:
def validate_spec(spec):
"""ensure spec is at least of sequence of strings
Args:
@ -481,7 +450,7 @@ def validate_spec(spec: Tuple[str, ...]) -> None:
error(f"check your packages definitions: {spec}", code=1)
def modify_sys_path(new_path: Path) -> None:
def modify_sys_path(new_path: Path):
# remove user-site
for i, path in enumerate(sys.path):
if path == site.USER_SITE:
@ -492,7 +461,7 @@ def modify_sys_path(new_path: Path) -> None:
)
def get_venvs() -> Dict[str, ViVenv]:
def get_venvs():
vivenvs = {}
for p in c.venvcache.iterdir():
vivenv = ViVenv.load(p.name)
@ -550,14 +519,14 @@ def spec_to_import(spec: List[str]) -> None:
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]:
def freeze_venv(spec: List[str], path: Path | None = None):
vivenv = ViVenv(spec, track_exe=False, path=path)
vivenv.create()
# populate the environment for now use
# custom cmd since using requirements file
cmd = [
str(vivenv.path / "bin" / "pip"),
vivenv.path / "bin" / "pip",
"install",
"--force-reinstall",
] + spec
@ -565,7 +534,7 @@ def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]
run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path)
# generate a frozen environment
cmd = [str(vivenv.path / "bin" / "pip"), "freeze"]
cmd = [vivenv.path / "bin" / "pip", "freeze"]
resolved_spec = run(cmd, check_output=True)
return vivenv, resolved_spec
@ -573,34 +542,41 @@ def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]
def generate_import(
requirements: Path,
reqs: List[str],
vivenvs: Dict[str, ViVenv],
vivenvs,
include_path: bool,
keep: bool,
standalone: bool,
) -> None:
# TODO: make compatible with Venv class for now just use the name /tmp/
reqs_from_file = []
if requirements:
with requirements.open("r") as f:
reqs_from_file = f.readlines()
with tempfile.TemporaryDirectory() as tmpdir:
echo("generating frozen spec")
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file, path=Path(tmpdir))
# refactor to make the below steps context dependent with tmpdir path
if keep:
# create env again since path's are hard-coded
vivenv = ViVenv(resolved_spec.splitlines())
# TODO: remove directory if any errors occur?
echo("generating new vivenv")
vivenv, resolved_spec = freeze_venv(reqs + reqs_from_file)
if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv(
"VIV_FORCE"
):
vivenv.create()
vivenv.install_pkgs()
# update id and move vivenv
vivenv.spec = resolved_spec.splitlines()
vivenv.id = get_hash(resolved_spec.splitlines())
echo(f"updated hash -> {vivenv.id}")
if not (c.venvcache / vivenv.id).exists():
vivenv.path = vivenv.path.rename(c.venvcache / vivenv.id)
vivenv.dump_info(write=True)
else:
echo("re-using existing vivenv")
echo("this vivenv already exists cleaning up temporary vivenv")
shutil.rmtree(vivenv.path)
else:
with tempfile.TemporaryDirectory() as tmpdir: #
echo("generating temporary vivenv ")
vivenv, resolved_spec = freeze_venv(
reqs + reqs_from_file, path=Path(tmpdir)
)
echo("see below for import statements\n")
@ -641,10 +617,10 @@ def generate_import(
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
"""formatter to remove extra metavar on short opts"""
def _get_invocation_length(self, invocation: str) -> int:
def _get_invocation_length(self, invocation):
return len(a.escape(invocation))
def _format_action_invocation(self, action: Action) -> str:
def _format_action_invocation(self, action):
if not action.option_strings:
(metavar,) = self._metavar_formatter(action, action.dest)(1)
return a.style(metavar, style="option")
@ -677,7 +653,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
parts[-1] += a.style(f" {args_string}", style="metavar")
return (", ").join(parts)
def _format_usage(self, *args: Any, **kwargs: Any) -> str:
def _format_usage(self, *args, **kwargs):
formatted_usage = super()._format_usage(*args, **kwargs)
# patch usage with color formatting
formatted_usage = (
@ -687,7 +663,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
)
return formatted_usage
def _format_action(self, action: Action) -> str:
def _format_action(self, action):
# determine the required width and the entry label
help_position = min(self._action_max_length + 2, self._max_help_position)
help_width = max(self._width - help_position, 11)
@ -697,10 +673,11 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# no help; start on same line and add a final newline
if not action.help:
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
tup = self._current_indent, "", action_header
action_header = "%*s%s\n" % tup
# short action name; start on the same line and pad two spaces
elif action_header_len <= action_width:
# tup = self._current_indent, "", action_width, action_header
tup = self._current_indent, "", action_width, action_header
action_header = (
f"{' '*self._current_indent}{action_header}"
f"{' '*(action_width+2 - action_header_len)}"
@ -709,7 +686,8 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# long action name; start on the next line
else:
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
tup = self._current_indent, "", action_header
action_header = "%*s%s\n" % tup
indent_first = help_position
# collect the pieces of the action help
@ -735,13 +713,10 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# return a single string
return self._join_parts(parts)
def start_section(self, heading: Optional[str]) -> None:
if heading:
super().start_section(a.style(heading, style="header"))
else:
super()
def start_section(self, heading: str) -> None:
return super().start_section(a.style(heading, style="header"))
def add_argument(self, action: Action) -> None:
def add_argument(self, action):
if action.help is not SUPPRESS:
# find all invocations
get_invocation = self._format_action_invocation
@ -759,14 +734,14 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
class ArgumentParser(StdArgParser):
def __init__(self, *args: Any, **kwargs: Any) -> None:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.formatter_class = lambda prog: CustomHelpFormatter(
prog, max_help_position=35
)
def error(self, message: str) -> NoReturn:
def error(self, message):
error(message)
echo("see below for help\n", style="red")
self.print_help()
@ -786,12 +761,12 @@ within python script:
class Viv:
def __init__(self) -> None:
def __init__(self):
self.vivenvs = get_venvs()
def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
def _match_vivenv(self, name_id: str) -> ViVenv:
# TODO: improve matching algorithm to favor names over id's
matches: List[ViVenv] = []
matches = []
for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id:
matches.append(v)
@ -799,16 +774,15 @@ class Viv:
matches.append(v)
elif v.name.startswith(name_id):
matches.append(v)
if len(matches) == 1:
return matches[0]
if not matches:
error(f"no matches found for {name_id}", code=1)
elif len(matches) > 1:
echo(f"matches {','.join((match.name for match in matches))}", style="red")
error("too many matches maybe try a longer name?", code=1)
else:
error(f"no matches found for {name_id}", code=1)
return matches[0]
def remove(self, args: Namespace) -> None:
def remove(self, args):
"""\
remove a vivenv
@ -827,7 +801,7 @@ class Viv:
code=1,
)
def freeze(self, args: Namespace) -> None:
def freeze(self, args):
"""create import statement from package spec"""
if not args.reqs:
@ -843,7 +817,7 @@ class Viv:
args.standalone,
)
def list(self, args: Namespace) -> None:
def list(self, args):
"""list all vivenvs"""
if args.quiet:
@ -865,14 +839,14 @@ class Viv:
)
a.table(rows)
def exe(self, args: Namespace) -> None:
def exe(self, args):
"""run python/pip in vivenv"""
vivenv = self._match_vivenv(args.vivenv)
pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python"))
# todo check for vivenv
echo(f"executing command within {vivenv.name}")
echo(f"executing command within {args.vivenv}")
cmd = (
f"{pip_path} {' '.join(args.cmd)}"
@ -883,7 +857,7 @@ class Viv:
echo(f"executing {cmd}")
run(shlex.split(cmd), verbose=True)
def info(self, args: Namespace) -> None:
def info(self, args):
"""get metadata about a vivenv"""
vivenv = self._match_vivenv(args.vivenv)
metadata_file = vivenv.path / "viv-info.json"
@ -895,12 +869,10 @@ class Viv:
vivenv.dump_info()
def _get_subcmd_parser(
self, subparsers: _SubParsersAction[ArgumentParser], name: str, **kwargs: Any
) -> ArgumentParser:
def _get_subcmd_parser(self, subparsers, name: str, **kwargs) -> ArgumentParser:
aliases = kwargs.pop("aliases", [name[0]])
cmd = getattr(self, name)
parser: ArgumentParser = subparsers.add_parser(
parser = subparsers.add_parser(
name,
help=cmd.__doc__.splitlines()[0],
description=dedent(cmd.__doc__),
@ -911,7 +883,7 @@ class Viv:
return parser
def cli(self) -> None:
def cli(self):
"""cli entrypoint"""
parser = ArgumentParser(description=description)
@ -1014,7 +986,7 @@ class Viv:
args.func(args)
def main() -> None:
def main():
viv = Viv()
viv.cli()

View file

@ -5,6 +5,5 @@
- [ ] use config file (probably ini or toml for python>=3.11)
- [ ] enable a garbage collection based on time or file existence (configurable)
- [ ] unit tests (v important)
- [ ] update command
- [ ] add more options to filter `viv list`