style: finish adding proper types

This commit is contained in:
Daylin Morgan 2023-03-16 13:52:55 -05:00
parent 454349ea23
commit ee7fe645c0
Signed by: daylin
GPG key ID: C1E52E7DD81DF79F
5 changed files with 130 additions and 102 deletions

2
.gitignore vendored
View file

@ -111,7 +111,7 @@ ipython_config.py
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
.pdm-python
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

View file

@ -1,25 +1,16 @@
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-added-large-files
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 23.3.0
hooks:
- id: black
language_version: python
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: 'v0.0.245'
rev: 'v0.0.270'
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix, --show-fixes]
- repo: local
hooks:
- id: set-version

View file

@ -34,4 +34,5 @@ ignore = ["E402"]
[tool.mypy]
warn_return_any = true
check_untyped_defs = true
disallow_untyped_defs = true
warn_unused_configs = true

View file

@ -6,6 +6,8 @@
__import__("viv").use("requests", "bs4")
"""
from __future__ import annotations
import hashlib
import itertools
import json
@ -20,17 +22,34 @@ import tempfile
import threading
import time
import venv
from argparse import SUPPRESS
from argparse import SUPPRESS, Action
from argparse import ArgumentParser as StdArgParser
from argparse import HelpFormatter, RawDescriptionHelpFormatter
from argparse import (
HelpFormatter,
Namespace,
RawDescriptionHelpFormatter,
_SubParsersAction,
)
from dataclasses import dataclass
from datetime import datetime
from itertools import zip_longest
from pathlib import Path
from textwrap import dedent, wrap
from typing import Dict, List, Tuple
from types import TracebackType
from typing import (
Any,
Dict,
List,
NoReturn,
Optional,
Sequence,
TextIO,
Tuple,
Type,
Generator,
)
__version__ = "22.12a3-37-gbfc2592-dev"
__version__ = "22.12a3-49-g5084208-dev"
@dataclass
@ -41,7 +60,7 @@ class Config:
Path(os.getenv("XDG_CACHE_HOME", Path.home() / ".cache")) / "viv" / "venvs"
)
def __post_init__(self):
def __post_init__(self) -> None:
self.venvcache.mkdir(parents=True, exist_ok=True)
@ -53,7 +72,7 @@ class Spinner:
https://raw.githubusercontent.com/Tagar/stuff/master/spinner.py
"""
def __init__(self, message, delay=0.1):
def __init__(self, message: str, delay: float = 0.1) -> None:
self.spinner = itertools.cycle([f"{c} " for c in "⣾⣽⣻⢿⡿⣟⣯⣷"])
self.delay = delay
self.busy = False
@ -62,40 +81,42 @@ class Spinner:
# sys.stdout.write(message)
echo(message + " ", newline=False)
def write_next(self):
def write_next(self) -> None:
with self._screen_lock:
if not self.spinner_visible:
sys.stderr.write(next(self.spinner))
self.spinner_visible = True
sys.stderr.flush()
def remove_spinner(self, cleanup=False):
def remove_spinner(self, cleanup: bool = False) -> None:
with self._screen_lock:
if self.spinner_visible:
sys.stderr.write("\b\b\b")
# sys.stdout.write("\b")
self.spinner_visible = False
if cleanup:
sys.stderr.write(" ") # overwrite spinner with blank
# sys.stdout.write("\r") # move to next line
# move back then delete the line
sys.stderr.write("\r\033[K")
sys.stderr.flush()
def spinner_task(self):
def spinner_task(self) -> None:
while self.busy:
self.write_next()
time.sleep(self.delay)
self.remove_spinner()
def __enter__(self):
def __enter__(self) -> None:
if sys.stderr.isatty():
self._screen_lock = threading.Lock()
self.busy = True
self.thread = threading.Thread(target=self.spinner_task)
self.thread.start()
def __exit__(self, exc_type, exc_val, exc_traceback): # noqa
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_traceback: Optional[TracebackType],
) -> None:
if sys.stderr.isatty():
self.busy = False
self.remove_spinner(cleanup=True)
@ -133,7 +154,7 @@ class Ansi:
option: str = yellow
metavar: str = "\033[33m" # normal yellow
def __post_init__(self):
def __post_init__(self) -> None:
if os.getenv("NO_COLOR") or not sys.stderr.isatty():
for attr in self.__dict__:
setattr(self, attr, "")
@ -166,7 +187,7 @@ class Ansi:
"""
return f"{getattr(self,style)}{txt}{getattr(self,'end')}"
def tagline(self):
def tagline(self) -> str:
"""generate the viv tagline!"""
return " ".join(
@ -176,70 +197,78 @@ class Ansi:
)
)
def subprocess(self, output):
def subprocess(self, output: str) -> None:
"""generate output for subprocess error
Args:
output: text output from subprocess, usually from p.stdout
"""
echo("subprocess output:")
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
sys.stdout.write("\n".join(new_output) + "\n")
def _get_column_size(self, sizes, row):
for i, length in enumerate(len(cell) for cell in row):
if length > sizes[i]:
sizes[i] = length
return sizes
def _get_column_sizes(
self, rows: Tuple[Tuple[str, Sequence[str]], ...]
) -> List[int]:
"""convert list of rows to list of columns sizes
def _make_row(self, row) -> str:
First convert rows into list of columns,
then get max string length for each column.
"""
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str:
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}"
def _sanitize_row(self, sizes: List[int], row: Tuple[str]) -> Tuple[Tuple[str]]:
def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]]
) -> Tuple[Tuple[str, Sequence[str]], ...]:
if len(row[1]) > sizes[1]:
return zip_longest(
(row[0],),
wrap(row[1], break_on_hyphens=False, width=sizes[1]),
fillvalue="",
return tuple(
zip_longest(
(row[0],),
wrap(str(row[1]), break_on_hyphens=False, width=sizes[1]),
fillvalue="",
)
)
else:
return (row,)
def table(self, rows, header_style="cyan") -> None:
"""generate a table with outline and styled header
def table(
self, rows: Tuple[Tuple[str, Sequence[str]], ...], header_style: str = "cyan"
) -> None:
"""generate a table with outline and styled header assumes two columns
Args:
rows: sequence of the rows, first item assumed to be header
header_style: color/style for header row
"""
sizes = [0] * len(rows[0])
for row in rows:
sizes = self._get_column_size(sizes, row)
sizes = self._get_column_sizes(rows)
col1_limit = shutil.get_terminal_size().columns - 20
if col1_limit < 20:
col2_limit = shutil.get_terminal_size().columns - 20
if col2_limit < 20:
error("increase screen size to view table", code=1)
elif sizes[1] > col2_limit:
sizes[1] = col2_limit
if sizes[1] > col1_limit:
sizes[1] = col1_limit
header, rows = rows[0], rows[1:]
# this is maybe taking comprehensions too far....
table_rows = (
self._make_row(row)
for row in (
# header row
(
self.__dict__[header_style] + f"{cell:<{sizes[i]}}" + self.end
for i, cell in enumerate(rows[0])
for i, cell in enumerate(header)
),
# rest of the rows
*(
(f"{cell:<{sizes[i]}}" for i, cell in enumerate(row))
for row in (
newrow
for row in rows[1:]
for row in rows
for newrow in self._sanitize_row(sizes, row)
)
),
@ -254,19 +283,21 @@ class Ansi:
a = Ansi()
def error(msg, code: int = 0):
def error(msg: str, code: int = 0) -> None:
"""output error message and if code provided exit"""
echo(f"{a.red}error:{a.end} {msg}", style="red")
if code:
sys.exit(code)
def warn(msg):
def warn(msg: str) -> None:
"""output warning message to stdout"""
echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow")
def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None:
def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
) -> None:
"""output general message to stdout"""
output = f"{a.cyan}Viv{a.end}{a.__dict__[style]}::{a.end} {msg}"
if newline:
@ -275,12 +306,12 @@ def echo(msg: str, style="magenta", newline=True, fd=sys.stderr) -> None:
def run(
command: List[str | Path],
command: List[str],
spinmsg: str = "",
clean_up_path: Path | None = None,
clean_up_path: Optional[Path] = None,
verbose: bool = False,
ignore_error: bool = False,
check_output=False,
check_output: bool = False,
) -> str:
"""run a subcommand
@ -385,9 +416,9 @@ class ViVenv:
with (self.path / "pip.conf").open("w") as f:
f.write("[global]\ndisable-pip-version-check = true")
def install_pkgs(self):
cmd: List[str | Path] = [
self.path / "bin" / "pip",
def install_pkgs(self) -> None:
cmd: List[str] = [
str(self.path / "bin" / "pip"),
"install",
"--force-reinstall",
] + self.spec
@ -399,7 +430,7 @@ class ViVenv:
verbose=bool(os.getenv("VIV_VERBOSE")),
)
def dump_info(self, write=False):
def dump_info(self, write: bool = False) -> None:
# TODO: include associated files in 'info'
# means it needs to be loaded first
info = {
@ -438,7 +469,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> None:
modify_sys_path(vivenv.path)
def validate_spec(spec):
def validate_spec(spec: Tuple[str, ...]) -> None:
"""ensure spec is at least of sequence of strings
Args:
@ -450,7 +481,7 @@ def validate_spec(spec):
error(f"check your packages definitions: {spec}", code=1)
def modify_sys_path(new_path: Path):
def modify_sys_path(new_path: Path) -> None:
# remove user-site
for i, path in enumerate(sys.path):
if path == site.USER_SITE:
@ -461,7 +492,7 @@ def modify_sys_path(new_path: Path):
)
def get_venvs():
def get_venvs() -> Dict[str, ViVenv]:
vivenvs = {}
for p in c.venvcache.iterdir():
vivenv = ViVenv.load(p.name)
@ -519,14 +550,14 @@ def spec_to_import(spec: List[str]) -> None:
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
def freeze_venv(spec: List[str], path: Path | None = None):
def freeze_venv(spec: List[str], path: Path | None = None) -> Tuple[ViVenv, str]:
vivenv = ViVenv(spec, track_exe=False, path=path)
vivenv.create()
# populate the environment for now use
# custom cmd since using requirements file
cmd = [
vivenv.path / "bin" / "pip",
str(vivenv.path / "bin" / "pip"),
"install",
"--force-reinstall",
] + spec
@ -534,7 +565,7 @@ def freeze_venv(spec: List[str], path: Path | None = None):
run(cmd, spinmsg="resolving dependencies", clean_up_path=vivenv.path)
# generate a frozen environment
cmd = [vivenv.path / "bin" / "pip", "freeze"]
cmd = [str(vivenv.path / "bin" / "pip"), "freeze"]
resolved_spec = run(cmd, check_output=True)
return vivenv, resolved_spec
@ -542,7 +573,7 @@ def freeze_venv(spec: List[str], path: Path | None = None):
def generate_import(
requirements: Path,
reqs: List[str],
vivenvs,
vivenvs: Dict[str, ViVenv],
include_path: bool,
keep: bool,
standalone: bool,
@ -617,10 +648,10 @@ def generate_import(
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
"""formatter to remove extra metavar on short opts"""
def _get_invocation_length(self, invocation):
def _get_invocation_length(self, invocation: str) -> int:
return len(a.escape(invocation))
def _format_action_invocation(self, action):
def _format_action_invocation(self, action: Action) -> str:
if not action.option_strings:
(metavar,) = self._metavar_formatter(action, action.dest)(1)
return a.style(metavar, style="option")
@ -653,7 +684,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
parts[-1] += a.style(f" {args_string}", style="metavar")
return (", ").join(parts)
def _format_usage(self, *args, **kwargs):
def _format_usage(self, *args: Any, **kwargs: Any) -> str:
formatted_usage = super()._format_usage(*args, **kwargs)
# patch usage with color formatting
formatted_usage = (
@ -663,7 +694,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
)
return formatted_usage
def _format_action(self, action):
def _format_action(self, action: Action) -> str:
# determine the required width and the entry label
help_position = min(self._action_max_length + 2, self._max_help_position)
help_width = max(self._width - help_position, 11)
@ -673,11 +704,10 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# no help; start on same line and add a final newline
if not action.help:
tup = self._current_indent, "", action_header
action_header = "%*s%s\n" % tup
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
# short action name; start on the same line and pad two spaces
elif action_header_len <= action_width:
tup = self._current_indent, "", action_width, action_header
# tup = self._current_indent, "", action_width, action_header
action_header = (
f"{' '*self._current_indent}{action_header}"
f"{' '*(action_width+2 - action_header_len)}"
@ -686,8 +716,7 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# long action name; start on the next line
else:
tup = self._current_indent, "", action_header
action_header = "%*s%s\n" % tup
action_header = "%*s%s\n" % (self._current_indent, "", action_header)
indent_first = help_position
# collect the pieces of the action help
@ -713,10 +742,13 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
# return a single string
return self._join_parts(parts)
def start_section(self, heading: str) -> None:
return super().start_section(a.style(heading, style="header"))
def start_section(self, heading: Optional[str]) -> None:
if heading:
super().start_section(a.style(heading, style="header"))
else:
super()
def add_argument(self, action):
def add_argument(self, action: Action) -> None:
if action.help is not SUPPRESS:
# find all invocations
get_invocation = self._format_action_invocation
@ -734,14 +766,14 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
class ArgumentParser(StdArgParser):
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.formatter_class = lambda prog: CustomHelpFormatter(
prog, max_help_position=35
)
def error(self, message):
def error(self, message: str) -> NoReturn:
error(message)
echo("see below for help\n", style="red")
self.print_help()
@ -761,12 +793,12 @@ within python script:
class Viv:
def __init__(self):
def __init__(self) -> None:
self.vivenvs = get_venvs()
def _match_vivenv(self, name_id: str) -> ViVenv:
def _match_vivenv(self, name_id: str) -> ViVenv: # type: ignore[return]
# TODO: improve matching algorithm to favor names over id's
matches = []
matches: List[ViVenv] = []
for k, v in self.vivenvs.items():
if name_id == k or v.name == name_id:
matches.append(v)
@ -774,15 +806,16 @@ class Viv:
matches.append(v)
elif v.name.startswith(name_id):
matches.append(v)
if not matches:
error(f"no matches found for {name_id}", code=1)
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
echo(f"matches {','.join((match.name for match in matches))}", style="red")
error("too many matches maybe try a longer name?", code=1)
else:
return matches[0]
error(f"no matches found for {name_id}", code=1)
def remove(self, args):
def remove(self, args: Namespace) -> None:
"""\
remove a vivenv
@ -801,7 +834,7 @@ class Viv:
code=1,
)
def freeze(self, args):
def freeze(self, args: Namespace) -> None:
"""create import statement from package spec"""
if not args.reqs:
@ -817,7 +850,7 @@ class Viv:
args.standalone,
)
def list(self, args):
def list(self, args: Namespace) -> None:
"""list all vivenvs"""
if args.quiet:
@ -839,14 +872,14 @@ class Viv:
)
a.table(rows)
def exe(self, args):
def exe(self, args: Namespace) -> None:
"""run python/pip in vivenv"""
vivenv = self._match_vivenv(args.vivenv)
pip_path, python_path = (vivenv.path / "bin" / cmd for cmd in ("pip", "python"))
# todo check for vivenv
echo(f"executing command within {args.vivenv}")
echo(f"executing command within {vivenv.name}")
cmd = (
f"{pip_path} {' '.join(args.cmd)}"
@ -857,7 +890,7 @@ class Viv:
echo(f"executing {cmd}")
run(shlex.split(cmd), verbose=True)
def info(self, args):
def info(self, args: Namespace) -> None:
"""get metadata about a vivenv"""
vivenv = self._match_vivenv(args.vivenv)
metadata_file = vivenv.path / "viv-info.json"
@ -869,10 +902,12 @@ class Viv:
vivenv.dump_info()
def _get_subcmd_parser(self, subparsers, name: str, **kwargs) -> ArgumentParser:
def _get_subcmd_parser(
self, subparsers: _SubParsersAction[ArgumentParser], name: str, **kwargs: Any
) -> ArgumentParser:
aliases = kwargs.pop("aliases", [name[0]])
cmd = getattr(self, name)
parser = subparsers.add_parser(
parser: ArgumentParser = subparsers.add_parser(
name,
help=cmd.__doc__.splitlines()[0],
description=dedent(cmd.__doc__),
@ -883,7 +918,7 @@ class Viv:
return parser
def cli(self):
def cli(self) -> None:
"""cli entrypoint"""
parser = ArgumentParser(description=description)
@ -986,7 +1021,7 @@ class Viv:
args.func(args)
def main():
def main() -> None:
viv = Viv()
viv.cli()

View file

@ -5,5 +5,6 @@
- [ ] use config file (probably ini or toml for python>=3.11)
- [ ] enable a garbage collection based on time or file existence (configurable)
- [ ] unit tests (v important)
- [ ] update command
- [ ] add more options to filter `viv list`