diff --git a/src/viv/viv.py b/src/viv/viv.py index 5a14454..744c808 100755 --- a/src/viv/viv.py +++ b/src/viv/viv.py @@ -12,6 +12,7 @@ import hashlib import inspect import itertools import json +import logging import os import re import shutil @@ -33,6 +34,7 @@ from argparse import ( from argparse import ArgumentParser as StdArgParser from dataclasses import dataclass from datetime import datetime +from logging.handlers import RotatingFileHandler from pathlib import Path from textwrap import dedent, fill from types import TracebackType @@ -50,7 +52,7 @@ from typing import ( from urllib.error import HTTPError from urllib.request import urlopen -__version__ = "23.5a5-28-g39786aa-dev" +__version__ = "23.5a5-30-g9752582-dev" class Spinner: @@ -64,8 +66,7 @@ class Spinner: self.busy = False self.spinner_visible = False self.message = message - # sys.stdout.write(message) - echo(message + " ", newline=False) + sys.stderr.write(f"{a.prefix} {a.sep} {message} ") def write_next(self) -> None: with self._screen_lock: @@ -110,6 +111,11 @@ class Spinner: sys.stderr.write("\r") +def _path_ok(p: Path) -> Path: + p.mkdir(exist_ok=True, parents=True) + return p + + class Env: defaults = dict( viv_bin_dir=Path.home() / ".local" / "bin", @@ -134,23 +140,22 @@ class Env: def _viv_spec(self) -> List[str]: return [i for i in os.getenv("VIV_SPEC", "").split(" ") if i] + @property + def _viv_log_path(self) -> Path: + return _path_ok(Path(self.xdg_data_home) / "viv") / "viv.log" + class Cache: def __init__(self) -> None: self.base = Env().viv_cache - @staticmethod - def _ensure(p: Path) -> Path: - p.mkdir(parents=True, exist_ok=True) - return p - @property def src(self) -> Path: - return self._ensure(self.base / "src") + return _path_ok(self.base / "src") @property def venv(self) -> Path: - return self._ensure(self.base / "venvs") + return _path_ok(self.base / "venvs") class Cfg: @@ -199,6 +204,9 @@ class Ansi: re.VERBOSE, ) + self.sep = f"{self.magenta}|{self.end}" + self.prefix = f"{self.cyan}viv{self.end}" + def escape(self, txt: str) -> str: return self._ansi_escape.sub("", txt) @@ -231,19 +239,94 @@ class Ansi: if not output: return - error("subprocess failed") - echo("see below for command output", style="red") - echo(f"cmd:\n {' '.join(command)}", style="red") + log.error("subprocess failed") + log.error("see below for command output") + log.error(f"cmd:\n {' '.join(command)}") new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()] - echo("subprocess output:" + "\n".join(("", *new_output, "")), style="red") - - def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str: - return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}" + log.error("subprocess output:" + "\n".join(("", *new_output, ""))) a = Ansi() +class MutlilineFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + outlines = [] + lines = (save_msg := record.msg).splitlines() + for line in lines: + record.msg = line + outlines.append(super().format(record)) + record.msg = save_msg + record.message = (output := "\n".join(outlines)) + return output + + +class CustomFormatter(logging.Formatter): + def __init__(self) -> None: + super().__init__() + self.FORMATS = { + **{ + level: " ".join( + ( + a.prefix, + f"{a.sep}{color}%(levelname)s{a.end}{a.sep}", + "%(message)s", + ) + ) + for level, color in { + logging.DEBUG: a.dim, + logging.WARNING: a.yellow, + logging.ERROR: a.red, + logging.CRITICAL: a.red, + }.items() + }, + logging.INFO: f"{a.prefix} {a.sep} %(message)s", + } + + def format(self, record: logging.LogRecord) -> str: + log_fmt = self.FORMATS.get(record.levelno) + formatter = MutlilineFormatter(log_fmt) + return formatter.format(record) + + +class CustomFileHandler(RotatingFileHandler): + """Custom logging handler to strip ansi before logging to file""" + + def emit(self, record: logging.LogRecord) -> None: + record.msg = a.escape(record.msg) + super().emit(record) + + +def gen_logger() -> logging.Logger: + logger = logging.getLogger("viv") + if not logger.handlers: + logger.setLevel(logging.DEBUG) + + ch = logging.StreamHandler() + ch.setLevel(logging.INFO if not Env().viv_debug else logging.DEBUG) + ch.setFormatter(CustomFormatter()) + + fh = CustomFileHandler( + Env().viv_log_path, maxBytes=10 * 1024 * 1024, backupCount=5 + ) + fh.setLevel(logging.DEBUG) + fh.setFormatter( + MutlilineFormatter("%(asctime)s | %(levelname)8s | %(message)s") + ) + + logger.addHandler(ch) + logger.addHandler(fh) + return logger + + +log = gen_logger() + + +def err_quit(*msg: str, code: int = 1) -> None: + log.error("\n".join(msg)) + sys.exit(code) + + class Template: _standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""): import hashlib, json, os, site, shutil, sys, venv # noqa @@ -427,28 +510,24 @@ def echo( msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr ) -> None: """output general message to stdout""" - output = f"{a.viv_preamble(style)} {msg}" - if newline: - output += "\n" + output = f"{a.prefix} {a.sep} {msg}\n" fd.write(output) -def error(msg: str, code: int = 0) -> None: +def error(*msg: str, exit: bool = True, code: int = 1) -> None: """output error message and if code provided exit""" - echo(f"{a.red}error:{a.end} {msg}", style="red") - if code: + prefix = f"{a.prefix} {a.sep}{a.red}ERROR{a.end}{a.sep} " + sys.stderr.write("\n".join((f"{prefix}{line}" for line in msg)) + "\n") + if exit: sys.exit(code) -def warn(msg: str) -> None: - """output warning message to stdout""" - echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow") - - def confirm(question: str, context: str = "", yes: bool = False) -> bool: sys.stderr.write(context) + # TODO: update this sys.stderr.write( - a.viv_preamble(sep="?? ") + question + a.style(" (Y)es/(n)o ", "yellow") + f"{a.prefix} {a.sep}{a.magenta}?{a.end}{a.sep}" + f" {question} {a.yellow}(Y)es/(n)o{a.end} " ) if yes: sys.stderr.write(f"{a.green}[FORCED YES]{a.end}\n") @@ -460,8 +539,8 @@ def confirm(question: str, context: str = "", yes: bool = False) -> bool: return True elif ans in ("n", "no"): return False - sys.stdout.write("Please select (Y)es or (n)o. ") - sys.stdout.write("\n") + sys.stderr.write("Please select (Y)es or (n)o. ") + sys.stderr.write("\n") class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): @@ -594,8 +673,7 @@ class ArgumentParser(StdArgParser): ) def error(self, message: str) -> NoReturn: - error(message) - echo(f"see `{self.prog} --help` for more info", style="red") + error(message, f"see `{self.prog} --help` for more info") sys.exit(2) @@ -678,7 +756,7 @@ class Meta: @classmethod def load(cls, name: str) -> "Meta": if not (Cache().venv / name / "vivmeta.json").exists(): - warn(f"possibly corrupted vivenv: {name}") + log.warning(f"possibly corrupted vivenv: {name}") # add empty values for corrupted vivenvs so it will still load return cls(name=name, spec=[""], files=[""], exe="", id="") else: @@ -693,6 +771,7 @@ class Meta: p.write_text(json.dumps(self.__dict__)) def addfile(self, f: Path) -> None: + log.debug(f"associating {f} with {self.name}") self.accessed = str(datetime.today()) self.files = sorted({*self.files, str(f.absolute().resolve())}) @@ -712,7 +791,7 @@ class ViVenv: spec = self._validate_spec(spec) id = id if id else get_hash(spec, track_exe) - self.name = name if name else id + self.name = name if name else id[:8] self.set_path(path) if not metadata: @@ -752,14 +831,16 @@ class ViVenv: spec: sequence of package specifications """ if not set(map(type, spec)) == {str}: - error("unexepected input in package spec") - error(f"check your packages definitions: {spec}", code=1) + err_quit( + "unexepected input in package spec", + f"check your packages definitions: {spec}", + ) return sorted(spec) def create(self, quiet: bool = False) -> None: - if not quiet: - echo(f"new unique vivenv -> {self.name}") + log.info(f"new unique vivenv: {a.bold}{self.name}{a.end}") + log.debug(f"creating new venv at {self.path}") with Spinner("creating vivenv"): venv.create( self.path, @@ -788,6 +869,7 @@ class ViVenv: def activate(self) -> None: # also add sys.path here so that it comes first + log.debug(f"activating {self.name}") path_to_add = str(*(self.path / "lib").glob("python*/site-packages")) sys.path = [p for p in (path_to_add, *sys.path) if p != site.USER_SITE] site.addsitedir(path_to_add) @@ -811,12 +893,11 @@ class ViVenv: ) def tree(self) -> None: - _id = self.meta.id if self.meta.id == self.name else self.name - items = [ f"{a.magenta}{k}{a.end}: {v}" for k, v in { **{ + "id": self.meta.id, "spec": ", ".join(self.meta.spec), "created": self.meta.created, "accessed": self.meta.accessed, @@ -825,7 +906,7 @@ class ViVenv: **({"files": ""} if self.meta.files else {}), }.items() ] - rows = [f"\n{a.bold}{a.cyan}{_id}{a.end}", self._tree_leaves(items)] + rows = [f"\n{a.bold}{a.cyan}{self.name}{a.end}", self._tree_leaves(items)] if self.meta.files: rows += (self._tree_leaves(self.meta.files, indent=" "),) @@ -855,7 +936,6 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: if not vivenv.loaded or Env().viv_force: vivenv.create() vivenv.install_pkgs() - vivenv.meta.addfile(get_caller_path()) vivenv.meta.write() @@ -905,12 +985,11 @@ def fetch_script(url: str) -> str: try: r = urlopen(url) except (HTTPError, ValueError) as e: - error(f"Failed to fetch from remote url:\n {a.bold}{url}{a.end}") - echo( + err_quit( + "Failed to fetch from remote url:", + f" {a.bold}{url}{a.end}", "see below:" + a.style("-> ", "red").join(["\n"] + repr(e).splitlines()), - style="red", ) - sys.exit(1) return r.read().decode("utf-8") @@ -1001,10 +1080,12 @@ class Viv: if len(matches) == 1: return matches[0] elif len(matches) > 1: - echo(f"matches {','.join((match.name for match in matches))}", style="red") - error("too many matches maybe try a longer name?", code=1) + err_quit( + "matches: " + ",".join((match.name for match in matches)), + "too many matches maybe try a longer name?", + ) else: - error(f"no matches found for {name_id}", code=1) + err_quit(f"no matches found for {name_id}") def remove(self, vivenvs: List[str]) -> None: """\ @@ -1017,12 +1098,11 @@ class Viv: for name in vivenvs: vivenv = self._match_vivenv(name) if vivenv.path.is_dir(): - echo(f"removing {vivenv.name}") + log.info(f"removing {vivenv.name}") shutil.rmtree(vivenv.path) else: - error( + err_quit( f"cowardly exiting because I didn't find vivenv: {name}", - code=1, ) def freeze( @@ -1045,19 +1125,19 @@ class Viv: vivenv.install_pkgs() vivenv.meta.write() else: - echo("re-using existing vivenv") + log.info("re-using existing vivenv") vivenv.touch() vivenv.meta.write() - echo("see below for import statements\n") + log.info("see below for import statements\n") if standalone: sys.stdout.write(self.t.standalone(spec)) return if path and not self.local_source: - error("No local viv found to import from", code=1) + err_quit("No local viv found to import from") sys.stdout.write(self.t.frozen_import(path, self.local_source, spec)) @@ -1067,7 +1147,7 @@ class Viv: if quiet: sys.stdout.write("\n".join(self.vivenvs) + "\n") elif len(self.vivenvs) == 0: - echo("no vivenvs setup") + log.info("no vivenvs setup") elif full: for _, vivenv in self.vivenvs.items(): vivenv.tree() @@ -1092,7 +1172,7 @@ class Viv: bin = vivenv.path / "bin" / cmd if not bin.exists(): - error(f"{cmd} does not exist in {vivenv.name}", code=1) + err_quit(f"{cmd} does not exist in {vivenv.name}") full_cmd = [str(bin), *rest] @@ -1104,7 +1184,7 @@ class Viv: metadata_file = vivenv.path / "vivmeta.json" if not metadata_file.is_file(): - error(f"Unable to find metadata for vivenv: {vivenv_id}", code=1) + err_quit(f"Unable to find metadata for vivenv: {vivenv_id}") if use_json: sys.stdout.write(json.dumps(vivenv.meta.__dict__)) @@ -1114,20 +1194,20 @@ class Viv: vivenv.tree() def _install_local_src(self, sha256: str, src: Path, cli: Path, yes: bool) -> None: - echo("updating local source copy of viv") + log.info("updating local source copy of viv") shutil.copy(Cache().src / f"{sha256}.py", src) make_executable(src) - echo("symlinking cli") + log.info("symlinking cli") if cli.is_file(): - echo(f"Existing file at {a.style(str(cli),'bold')}") + log.info(f"Existing file at {a.style(str(cli),'bold')}") if confirm("Would you like to overwrite it?", yes=yes): cli.unlink() cli.symlink_to(src) else: cli.symlink_to(src) - echo("Remember to include the following line in your shell rc file:") + log.info("Remember to include the following line in your shell rc file:") sys.stderr.write( ' export PYTHONPATH="$PYTHONPATH:$HOME/' f'{src.relative_to(Path.home()).parent}"\n' @@ -1149,7 +1229,7 @@ class Viv: if self.local and self.local_source: sys.stdout.write(str(self.local_source.parent) + "\n") else: - error("expected to find a local installation", code=1) + err_quit("expected to find a local installation") else: echo("Current:") sys.stderr.write( @@ -1170,7 +1250,7 @@ class Viv: sha256, next_version = self._get_new_version(ref) if self.local_version == next_version: - echo(f"no change between {ref} and local version") + log.info(f"no change between {ref} and local version") sys.exit(0) if confirm( @@ -1196,7 +1276,7 @@ class Viv: ) -> None: sha256, downloaded_version = self._get_new_version(ref) - echo(f"Downloaded version: {downloaded_version}") + log.info(f"Downloaded version: {downloaded_version}") # TODO: see if file is actually where # we are about to install and give more instructions @@ -1241,7 +1321,7 @@ class Viv: else: p.unlink() - echo( + log.info( "to re-install use: " "`python3 <(curl -fsSL viv.dayl.in/viv.py) manage install`" ) @@ -1272,7 +1352,7 @@ class Viv: output = Env().viv_bin_dir / default_bin if not output else output.absolute() if output.is_file(): - error(f"{output} already exists...exiting", code=1) + err_quit(f"{output} already exists...exiting") if freeze: spec = resolve_deps(reqs, requirements) @@ -1320,7 +1400,7 @@ class Viv: (tmppath / "viv.py").write_text( # TODO: use latest tag once ready fetch_script( - "https://raw.githubusercontent.com/daylinmorgan/viv/script-runner/src/viv/viv.py" + "https://raw.githubusercontent.com/daylinmorgan/viv/dev/src/viv/viv.py" ) ) @@ -1579,52 +1659,49 @@ class Cli: def _validate_args(self, args: Namespace) -> None: if args.func.__name__ in ("freeze", "shim"): if not args.reqs: - error("must specify a requirement", code=1) + error("must specify a requirement") if args.func.__name__ in ("freeze", "shim"): if not self.viv.local_source and not (args.standalone or args.path): - warn( + log.warning( "failed to find local copy of `viv` " "make sure to add it to your PYTHONPATH " "or consider using --path/--standalone" ) + # TODO: move this since this is code logic not flag logic if args.path and not self.viv.local_source: - error("No local viv found to import from", code=1) + error("No local viv found to import from") if args.path and args.standalone: - error("-p/--path and -s/--standalone are mutually exclusive", code=1) + error("-p/--path and -s/--standalone are mutually exclusive") if args.func.__name__ == "manage_install" and self.viv.local_source: - error(f"found existing viv installation at {self.viv.local_source}") - echo( + error( + f"found existing viv installation at {self.viv.local_source}", "use " + a.style("viv manage update", "bold") + " to modify current installation.", - style="red", ) - sys.exit(1) if args.func.__name__ == "manage_update": if not self.viv.local_source: error( a.style("viv manage update", "bold") + " should be used with an exisiting installation", - 1, ) if self.viv.git: error( a.style("viv manage update", "bold") + " shouldn't be used with a git-based installation", - 1, ) if args.func.__name__ == "run": if not (args.reqs or args.script): - error("must specify a requirement or --script", code=1) + error("must specify a requirement or --script") if args.func.__name__ == "info": if args.use_json and args.path: - error("--json and -p/--path are mutually exclusive", code=1) + error("--json and -p/--path are mutually exclusive") def _get_subcmd_parser( self,