diff --git a/.gitignore b/.gitignore index ff3b0df..a02593c 100644 --- a/.gitignore +++ b/.gitignore @@ -111,7 +111,7 @@ ipython_config.py # pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it # in version control. # https://pdm.fming.dev/#use-with-ide -.pdm.toml +.pdm-python # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cae23bc..9ad90db 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,25 +1,16 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks repos: - - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 - hooks: - - id: trailing-whitespace - - id: end-of-file-fixer - - id: check-added-large-files - - repo: https://github.com/pycqa/isort - rev: 5.12.0 - hooks: - - id: isort - repo: https://github.com/psf/black - rev: 23.1.0 + rev: 23.3.0 hooks: - id: black language_version: python - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: 'v0.0.245' + rev: 'v0.0.270' hooks: - id: ruff + args: [--fix, --exit-non-zero-on-fix, --show-fixes] - repo: local hooks: - id: set-version diff --git a/pyproject.toml b/pyproject.toml index 044ef2e..a870435 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,4 +34,5 @@ ignore = ["E402"] [tool.mypy] warn_return_any = true check_untyped_defs = true +disallow_untyped_defs = true warn_unused_configs = true diff --git a/src/viv/viv.py b/src/viv/viv.py index fb5b44c..2e1aaea 100755 --- a/src/viv/viv.py +++ b/src/viv/viv.py @@ -6,6 +6,8 @@ __import__("viv").use("requests", "bs4") """ +from __future__ import annotations + import hashlib import itertools import json @@ -20,17 +22,34 @@ import tempfile import threading import time import venv -from argparse import SUPPRESS +from argparse import SUPPRESS, Action from argparse import ArgumentParser as StdArgParser -from argparse import HelpFormatter, RawDescriptionHelpFormatter +from argparse import ( + HelpFormatter, + Namespace, + RawDescriptionHelpFormatter, + _SubParsersAction, +) from dataclasses import dataclass from datetime import datetime from itertools import zip_longest from pathlib import Path from textwrap import dedent, wrap -from typing import Dict, List, Tuple +from types import TracebackType +from typing import ( + Any, + Dict, + List, + NoReturn, + Optional, + Sequence, + TextIO, + Tuple, + Type, + Generator, +) -__version__ = "22.12a3-37-gbfc2592-dev" +__version__ = "22.12a3-49-g5084208-dev" @dataclass @@ -41,7 +60,7 @@ class Config: Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs" ) - def __post_init__(self): + def __post_init__(self) -> None: self.venvcache.mkdir(parents=True, exist_ok=True) @@ -53,7 +72,7 @@ class Spinner: https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py """ - def __init__(self, message, delay=0.1): + def __init__(self, message: str, delay: float = 0.1) -> None: self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"]) self.delay = delay self.busy = False @@ -62,40 +81,42 @@ class Spinner: # sys.stdout.write(message) echo(message + " ", newline=False) - def write_next(self): + def write_next(self) -> None: with self._screen_lock: if not self.spinner_visible: sys.stderr.write(next(self.spinner)) self.spinner_visible = True sys.stderr.flush() - def remove_spinner(self, cleanup=False): + def remove_spinner(self, cleanup: bool = False) -> None: with self._screen_lock: if self.spinner_visible: sys.stderr.write("\b\b\b") - # sys.stdout.write("\b") self.spinner_visible = False if cleanup: sys.stderr.write(" ") # overwrite spinner with blank - # sys.stdout.write("\r") # move to next line - # move back then delete the line sys.stderr.write("\r\033[K") sys.stderr.flush() - def spinner_task(self): + def spinner_task(self) -> None: while self.busy: self.write_next() time.sleep(self.delay) self.remove_spinner() - def __enter__(self): + def __enter__(self) -> None: if sys.stderr.isatty(): self._screen_lock = threading.Lock() self.busy = True self.thread = threading.Thread(target=self.spinner_task) self.thread.start() - def __exit__(self, exc_type, exc_val, exc_traceback): # noqa + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_traceback: Optional[TracebackType], + ) -> None: if sys.stderr.isatty(): self.busy = False self.remove_spinner(cleanup=True) @@ -133,7 +154,7 @@ class Ansi: option: str = yellow metavar: str = "\033[33m" # normal yellow - def __post_init__(self): + def __post_init__(self) -> None: if os.getenv("NO_COLOR") or not sys.stderr.isatty(): for attr in self.__dict__: setattr(self, attr, "") @@ -166,7 +187,7 @@ class Ansi: """ return f"{getattr(self,style)}{txt}{getattr(self,'end')}" - def tagline(self): + def tagline(self) -> str: """generate the viv tagline!""" return " ".join( @@ -176,70 +197,78 @@ class Ansi: ) ) - def subprocess(self, output): + def subprocess(self, output: str) -> None: """generate output for subprocess error Args: output: text output from subprocess, usually from p.stdout """ + echo("subprocess output:") new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] - sys.stdout.write("\n".join(new_output) + "\n") - def _get_column_size(self, sizes, row): - for i, length in enumerate(len(cell) for cell in row): - if length > sizes[i]: - sizes[i] = length - return sizes + def _get_column_sizes( + self, rows: Tuple[Tuple[str, Sequence[str]], ...] + ) -> List[int]: + """convert list of rows to list of columns sizes - def _make_row(self, row) -> str: + First convert rows into list of columns, + then get max string length for each column. + """ + return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore + + def _make_row(self, row: Generator[Any, None, None]) -> str: return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}" - def _sanitize_row(self, sizes: List[int], row: Tuple[str]) -> Tuple[Tuple[str]]: + def _sanitize_row( + self, sizes: List[int], row: Tuple[str, Sequence[str]] + ) -> Tuple[Tuple[str, Sequence[str]], ...]: if len(row[1]) > sizes[1]: - return zip_longest( - (row[0],), - wrap(row[1], break_on_hyphens=False, width=sizes[1]), - fillvalue="", + return tuple( + zip_longest( + (row[0],), + wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]), + fillvalue="", + ) ) else: return (row,) - def table(self, rows, header_style="cyan") -> None: - """generate a table with outline and styled header + def table( + self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan" + ) -> None: + """generate a table with outline and styled header assumes two columns Args: rows: sequence of the rows, first item assumed to be header header_style: color/style for header row """ - sizes = [0] * len(rows[0]) - for row in rows: - sizes = self._get_column_size(sizes, row) + sizes = self._get_column_sizes(rows) - col1_limit = shutil.get_terminal_size().columns - 20 - if col1_limit < 20: + col2_limit = shutil.get_terminal_size().columns - 20 + if col2_limit < 20: error("increase screen size to view table", code=1) + elif sizes[1] > col2_limit: + sizes[1] = col2_limit - if sizes[1] > col1_limit: - sizes[1] = col1_limit - + header, rows = rows[0], rows[1:] # this is maybe taking comprehensions too far.... - table_rows = ( self._make_row(row) for row in ( # header row ( self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end - for i, cell in enumerate(rows[0]) + for i, cell in enumerate(header) ), + # rest of the rows *( (f"{cell:<{sizes[i]}}" for i, cell in enumerate(row)) for row in ( newrow - for row in rows[1:] + for row in rows for newrow in self._sanitize_row(sizes, row) ) ), @@ -254,19 +283,21 @@ class Ansi: a = Ansi() -def error(msg, code: int = 0): +def error(msg: str, code: int = 0) -> None: """output error message and if code provided exit""" echo(f"{a.red}error:{a.end} {msg}", style="red") if code: sys.exit(code) -def warn(msg): +def warn(msg: str) -> None: """output warning message to stdout""" echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow") -def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None: +def echo( + msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr +) -> None: """output general message to stdout""" output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}" if newline: @@ -275,12 +306,12 @@ def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None: def run( - command: List[str | Path], + command: List[str], spinmsg: str = "", - clean_up_path: Path | None = None, + clean_up_path: Optional[Path] = None, verbose: bool = False, ignore_error: bool = False, - check_output=False, + check_output: bool = False, ) -> str: """run a subcommand @@ -385,9 +416,9 @@ class ViVenv: with (self.path / "pip.conf").open("w") as f: f.write("[global]\ndisable-pip-version-check = true") - def install_pkgs(self): - cmd: List[str | Path] = [ - self.path / "bin" / "pip", + def install_pkgs(self) -> None: + cmd: List[str] = [ + str(self.path / "bin" / "pip"), "install", "--force-reinstall", ] + self.spec @@ -399,7 +430,7 @@ class ViVenv: verbose=bool(os.getenv("VIV_VERBOSE")), ) - def dump_info(self, write=False): + def dump_info(self, write: bool = False) -> None: # TODO: include associated files in 'info' # means it needs to be loaded first info = { @@ -438,7 +469,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> None: modify_sys_path(vivenv.path) -def validate_spec(spec): +def validate_spec(spec: Tuple[str, ...]) -> None: """ensure spec is at least of sequence of strings Args: @@ -450,7 +481,7 @@ def validate_spec(spec): error(f"check your packages definitions: {spec}", code=1) -def modify_sys_path(new_path: Path): +def modify_sys_path(new_path: Path) -> None: # remove user-site for i, path in enumerate(sys.path): if path == site.USER_SITE: @@ -461,7 +492,7 @@ def modify_sys_path(new_path: Path): ) -def get_venvs(): +def get_venvs() -> Dict[str, ViVenv]: vivenvs = {} for p in c.venvcache.iterdir(): vivenv = ViVenv.load(p.name) @@ -519,14 +550,14 @@ def spec_to_import(spec: List[str]) -> None: sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n") -def freeze_venv(spec: List[str], path: Path | None = None): +def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]: vivenv = ViVenv(spec, track_exe=False, path=path) vivenv.create() # populate the environment for now use # custom cmd since using requirements file cmd = [ - vivenv.path / "bin" / "pip", + str(vivenv.path / "bin" / "pip"), "install", "--force-reinstall", ] + spec @@ -534,7 +565,7 @@ def freeze_venv(spec: List[str], path: Path | None = None): run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path) # generate a frozen environment - cmd = [vivenv.path / "bin" / "pip", "freeze"] + cmd = [str(vivenv.path / "bin" / "pip"), "freeze"] resolved_spec = run(cmd, check_output=True) return vivenv, resolved_spec @@ -542,7 +573,7 @@ def freeze_venv(spec: List[str], path: Path | None = None): def generate_import( requirements: Path, reqs: List[str], - vivenvs, + vivenvs: Dict[str, ViVenv], include_path: bool, keep: bool, standalone: bool, @@ -617,10 +648,10 @@ def generate_import( class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): """formatter to remove extra metavar on short opts""" - def _get_invocation_length(self, invocation): + def _get_invocation_length(self, invocation: str) -> int: return len(a.escape(invocation)) - def _format_action_invocation(self, action): + def _format_action_invocation(self, action: Action) -> str: if not action.option_strings: (metavar,) = self._metavar_formatter(action, action.dest)(1) return a.style(metavar, style="option") @@ -653,7 +684,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): parts[-1] += a.style(f" {args_string}", style="metavar") return (", ").join(parts) - def _format_usage(self, *args, **kwargs): + def _format_usage(self, *args: Any, **kwargs: Any) -> str: formatted_usage = super()._format_usage(*args, **kwargs) # patch usage with color formatting formatted_usage = ( @@ -663,7 +694,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): ) return formatted_usage - def _format_action(self, action): + def _format_action(self, action: Action) -> str: # determine the required width and the entry label help_position = min(self._action_max_length + 2, self._max_help_position) help_width = max(self._width - help_position, 11) @@ -673,11 +704,10 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): # no help; start on same line and add a final newline if not action.help: - tup = self._current_indent, "", action_header - action_header = "%*s%s\n" % tup + action_header = "%*s%s\n" % (self._current_indent, "", action_header) # short action name; start on the same line and pad two spaces elif action_header_len <= action_width: - tup = self._current_indent, "", action_width, action_header + # tup = self._current_indent, "", action_width, action_header action_header = ( f"{' '*self._current_indent}{action_header}" f"{' '*(action_width+2 - action_header_len)}" @@ -686,8 +716,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): # long action name; start on the next line else: - tup = self._current_indent, "", action_header - action_header = "%*s%s\n" % tup + action_header = "%*s%s\n" % (self._current_indent, "", action_header) indent_first = help_position # collect the pieces of the action help @@ -713,10 +742,13 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): # return a single string return self._join_parts(parts) - def start_section(self, heading: str) -> None: - return super().start_section(a.style(heading, style="header")) + def start_section(self, heading: Optional[str]) -> None: + if heading: + super().start_section(a.style(heading, style="header")) + else: + super() - def add_argument(self, action): + def add_argument(self, action: Action) -> None: if action.help is not SUPPRESS: # find all invocations get_invocation = self._format_action_invocation @@ -734,14 +766,14 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): class ArgumentParser(StdArgParser): - def __init__(self, *args, **kwargs): + def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.formatter_class = lambda prog: CustomHelpFormatter( prog, max_help_position=35 ) - def error(self, message): + def error(self, message: str) -> NoReturn: error(message) echo("see below for help\n", style="red") self.print_help() @@ -761,12 +793,12 @@ within python script: class Viv: - def __init__(self): + def __init__(self) -> None: self.vivenvs = get_venvs() - def _match_vivenv(self, name_id: str) -> ViVenv: + def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return] # TODO: improve matching algorithm to favor names over id's - matches = [] + matches: List[ViVenv] = [] for k, v in self.vivenvs.items(): if name_id == k or v.name == name_id: matches.append(v) @@ -774,15 +806,16 @@ class Viv: matches.append(v) elif v.name.startswith(name_id): matches.append(v) - if not matches: - error(f"no matches found for {name_id}", code=1) + + if len(matches) == 1: + return matches[0] elif len(matches) > 1: echo(f"matches {','.join((match.name for match in matches))}", style="red") error("too many matches maybe try a longer name?", code=1) else: - return matches[0] + error(f"no matches found for {name_id}", code=1) - def remove(self, args): + def remove(self, args: Namespace) -> None: """\ remove a vivenv @@ -801,7 +834,7 @@ class Viv: code=1, ) - def freeze(self, args): + def freeze(self, args: Namespace) -> None: """create import statement from package spec""" if not args.reqs: @@ -817,7 +850,7 @@ class Viv: args.standalone, ) - def list(self, args): + def list(self, args: Namespace) -> None: """list all vivenvs""" if args.quiet: @@ -839,14 +872,14 @@ class Viv: ) a.table(rows) - def exe(self, args): + def exe(self, args: Namespace) -> None: """run python/pip in vivenv""" vivenv = self._match_vivenv(args.vivenv) pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python")) # todo check for vivenv - echo(f"executing command within {args.vivenv}") + echo(f"executing command within {vivenv.name}") cmd = ( f"{pip_path} {' '.join(args.cmd)}" @@ -857,7 +890,7 @@ class Viv: echo(f"executing {cmd}") run(shlex.split(cmd), verbose=True) - def info(self, args): + def info(self, args: Namespace) -> None: """get metadata about a vivenv""" vivenv = self._match_vivenv(args.vivenv) metadata_file = vivenv.path / "viv-info.json" @@ -869,10 +902,12 @@ class Viv: vivenv.dump_info() - def _get_subcmd_parser(self, subparsers, name: str, **kwargs) -> ArgumentParser: + def _get_subcmd_parser( + self, subparsers: _SubParsersAction[ArgumentParser], name: str, **kwargs: Any + ) -> ArgumentParser: aliases = kwargs.pop("aliases", [name[0]]) cmd = getattr(self, name) - parser = subparsers.add_parser( + parser: ArgumentParser = subparsers.add_parser( name, help=cmd.__doc__.splitlines()[0], description=dedent(cmd.__doc__), @@ -883,7 +918,7 @@ class Viv: return parser - def cli(self): + def cli(self) -> None: """cli entrypoint""" parser = ArgumentParser(description=description) @@ -986,7 +1021,7 @@ class Viv: args.func(args) -def main(): +def main() -> None: viv = Viv() viv.cli() diff --git a/todo.md b/todo.md index 488735b..5d5aa2c 100644 --- a/todo.md +++ b/todo.md @@ -5,5 +5,6 @@ - [ ] use config file (probably ini or toml for python>=3.11) - [ ] enable a garbage collection based on time or file existence (configurable) - [ ] unit tests (v important) +- [ ] update command - [ ] add more options to filter `viv list`