feat: add logging

This commit is contained in:
Daylin Morgan 2023-06-12 21:10:17 -05:00
parent 095e1017b2
commit 838b585a52
Signed by: daylin
GPG key ID: C1E52E7DD81DF79F

View file

@ -12,6 +12,7 @@ import hashlib
import inspect
import itertools
import json
import logging
import os
import re
import shutil
@ -33,6 +34,7 @@ from argparse import (
from argparse import ArgumentParser as StdArgParser
from dataclasses import dataclass
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from textwrap import dedent, fill
from types import TracebackType
@ -50,7 +52,7 @@ from typing import (
from urllib.error import HTTPError
from urllib.request import urlopen
__version__ = "23.5a5-28-g39786aa-dev"
__version__ = "23.5a5-30-g9752582-dev"
class Spinner:
@ -64,8 +66,7 @@ class Spinner:
self.busy = False
self.spinner_visible = False
self.message = message
# sys.stdout.write(message)
echo(message + " ", newline=False)
sys.stderr.write(f"{a.prefix} {a.sep} {message} ")
def write_next(self) -> None:
with self._screen_lock:
@ -110,6 +111,11 @@ class Spinner:
sys.stderr.write("\r")
def _path_ok(p: Path) -> Path:
p.mkdir(exist_ok=True, parents=True)
return p
class Env:
defaults = dict(
viv_bin_dir=Path.home() / ".local" / "bin",
@ -134,23 +140,22 @@ class Env:
def _viv_spec(self) -> List[str]:
return [i for i in os.getenv("VIV_SPEC", "").split(" ") if i]
@property
def _viv_log_path(self) -> Path:
return _path_ok(Path(self.xdg_data_home) / "viv") / "viv.log"
class Cache:
def __init__(self) -> None:
self.base = Env().viv_cache
@staticmethod
def _ensure(p: Path) -> Path:
p.mkdir(parents=True, exist_ok=True)
return p
@property
def src(self) -> Path:
return self._ensure(self.base / "src")
return _path_ok(self.base / "src")
@property
def venv(self) -> Path:
return self._ensure(self.base / "venvs")
return _path_ok(self.base / "venvs")
class Cfg:
@ -199,6 +204,9 @@ class Ansi:
re.VERBOSE,
)
self.sep = f"{self.magenta}|{self.end}"
self.prefix = f"{self.cyan}viv{self.end}"
def escape(self, txt: str) -> str:
return self._ansi_escape.sub("", txt)
@ -231,19 +239,94 @@ class Ansi:
if not output:
return
error("subprocess failed")
echo("see below for command output", style="red")
echo(f"cmd:\n {' '.join(command)}", style="red")
log.error("subprocess failed")
log.error("see below for command output")
log.error(f"cmd:\n {' '.join(command)}")
new_output = [f"{self.red}->{self.end} {line}" for line in output.splitlines()]
echo("subprocess output:" + "\n".join(("", *new_output, "")), style="red")
def viv_preamble(self, style: str = "magenta", sep: str = "::") -> str:
return f"{self.cyan}viv{self.end}{self.__dict__[style]}{sep}{self.end}"
log.error("subprocess output:" + "\n".join(("", *new_output, "")))
a = Ansi()
class MutlilineFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
outlines = []
lines = (save_msg := record.msg).splitlines()
for line in lines:
record.msg = line
outlines.append(super().format(record))
record.msg = save_msg
record.message = (output := "\n".join(outlines))
return output
class CustomFormatter(logging.Formatter):
def __init__(self) -> None:
super().__init__()
self.FORMATS = {
**{
level: " ".join(
(
a.prefix,
f"{a.sep}{color}%(levelname)s{a.end}{a.sep}",
"%(message)s",
)
)
for level, color in {
logging.DEBUG: a.dim,
logging.WARNING: a.yellow,
logging.ERROR: a.red,
logging.CRITICAL: a.red,
}.items()
},
logging.INFO: f"{a.prefix} {a.sep} %(message)s",
}
def format(self, record: logging.LogRecord) -> str:
log_fmt = self.FORMATS.get(record.levelno)
formatter = MutlilineFormatter(log_fmt)
return formatter.format(record)
class CustomFileHandler(RotatingFileHandler):
"""Custom logging handler to strip ansi before logging to file"""
def emit(self, record: logging.LogRecord) -> None:
record.msg = a.escape(record.msg)
super().emit(record)
def gen_logger() -> logging.Logger:
logger = logging.getLogger("viv")
if not logger.handlers:
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.INFO if not Env().viv_debug else logging.DEBUG)
ch.setFormatter(CustomFormatter())
fh = CustomFileHandler(
Env().viv_log_path, maxBytes=10 * 1024 * 1024, backupCount=5
)
fh.setLevel(logging.DEBUG)
fh.setFormatter(
MutlilineFormatter("%(asctime)s | %(levelname)8s | %(message)s")
)
logger.addHandler(ch)
logger.addHandler(fh)
return logger
log = gen_logger()
def err_quit(*msg: str, code: int = 1) -> None:
log.error("\n".join(msg))
sys.exit(code)
class Template:
_standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""):
import hashlib, json, os, site, shutil, sys, venv # noqa
@ -427,28 +510,24 @@ def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
) -> None:
"""output general message to stdout"""
output = f"{a.viv_preamble(style)} {msg}"
if newline:
output += "\n"
output = f"{a.prefix} {a.sep} {msg}\n"
fd.write(output)
def error(msg: str, code: int = 0) -> None:
def error(*msg: str, exit: bool = True, code: int = 1) -> None:
"""output error message and if code provided exit"""
echo(f"{a.red}error:{a.end} {msg}", style="red")
if code:
prefix = f"{a.prefix} {a.sep}{a.red}ERROR{a.end}{a.sep} "
sys.stderr.write("\n".join((f"{prefix}{line}" for line in msg)) + "\n")
if exit:
sys.exit(code)
def warn(msg: str) -> None:
"""output warning message to stdout"""
echo(f"{a.yellow}warn:{a.end} {msg}", style="yellow")
def confirm(question: str, context: str = "", yes: bool = False) -> bool:
sys.stderr.write(context)
# TODO: update this
sys.stderr.write(
a.viv_preamble(sep="?? ") + question + a.style(" (Y)es/(n)o ", "yellow")
f"{a.prefix} {a.sep}{a.magenta}?{a.end}{a.sep}"
f" {question} {a.yellow}(Y)es/(n)o{a.end} "
)
if yes:
sys.stderr.write(f"{a.green}[FORCED YES]{a.end}\n")
@ -460,8 +539,8 @@ def confirm(question: str, context: str = "", yes: bool = False) -> bool:
return True
elif ans in ("n", "no"):
return False
sys.stdout.write("Please select (Y)es or (n)o. ")
sys.stdout.write("\n")
sys.stderr.write("Please select (Y)es or (n)o. ")
sys.stderr.write("\n")
class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
@ -594,8 +673,7 @@ class ArgumentParser(StdArgParser):
)
def error(self, message: str) -> NoReturn:
error(message)
echo(f"see `{self.prog} --help` for more info", style="red")
error(message, f"see `{self.prog} --help` for more info")
sys.exit(2)
@ -678,7 +756,7 @@ class Meta:
@classmethod
def load(cls, name: str) -> "Meta":
if not (Cache().venv / name / "vivmeta.json").exists():
warn(f"possibly corrupted vivenv: {name}")
log.warning(f"possibly corrupted vivenv: {name}")
# add empty values for corrupted vivenvs so it will still load
return cls(name=name, spec=[""], files=[""], exe="", id="")
else:
@ -693,6 +771,7 @@ class Meta:
p.write_text(json.dumps(self.__dict__))
def addfile(self, f: Path) -> None:
log.debug(f"associating {f} with {self.name}")
self.accessed = str(datetime.today())
self.files = sorted({*self.files, str(f.absolute().resolve())})
@ -712,7 +791,7 @@ class ViVenv:
spec = self._validate_spec(spec)
id = id if id else get_hash(spec, track_exe)
self.name = name if name else id
self.name = name if name else id[:8]
self.set_path(path)
if not metadata:
@ -752,14 +831,16 @@ class ViVenv:
spec: sequence of package specifications
"""
if not set(map(type, spec)) == {str}:
error("unexepected input in package spec")
error(f"check your packages definitions: {spec}", code=1)
err_quit(
"unexepected input in package spec",
f"check your packages definitions: {spec}",
)
return sorted(spec)
def create(self, quiet: bool = False) -> None:
if not quiet:
echo(f"new unique vivenv -> {self.name}")
log.info(f"new unique vivenv: {a.bold}{self.name}{a.end}")
log.debug(f"creating new venv at {self.path}")
with Spinner("creating vivenv"):
venv.create(
self.path,
@ -788,6 +869,7 @@ class ViVenv:
def activate(self) -> None:
# also add sys.path here so that it comes first
log.debug(f"activating {self.name}")
path_to_add = str(*(self.path / "lib").glob("python*/site-packages"))
sys.path = [p for p in (path_to_add, *sys.path) if p != site.USER_SITE]
site.addsitedir(path_to_add)
@ -811,12 +893,11 @@ class ViVenv:
)
def tree(self) -> None:
_id = self.meta.id if self.meta.id == self.name else self.name
items = [
f"{a.magenta}{k}{a.end}: {v}"
for k, v in {
**{
"id": self.meta.id,
"spec": ", ".join(self.meta.spec),
"created": self.meta.created,
"accessed": self.meta.accessed,
@ -825,7 +906,7 @@ class ViVenv:
**({"files": ""} if self.meta.files else {}),
}.items()
]
rows = [f"\n{a.bold}{a.cyan}{_id}{a.end}", self._tree_leaves(items)]
rows = [f"\n{a.bold}{a.cyan}{self.name}{a.end}", self._tree_leaves(items)]
if self.meta.files:
rows += (self._tree_leaves(self.meta.files, indent=" "),)
@ -855,7 +936,6 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
if not vivenv.loaded or Env().viv_force:
vivenv.create()
vivenv.install_pkgs()
vivenv.meta.addfile(get_caller_path())
vivenv.meta.write()
@ -905,12 +985,11 @@ def fetch_script(url: str) -> str:
try:
r = urlopen(url)
except (HTTPError, ValueError) as e:
error(f"Failed to fetch from remote url:\n {a.bold}{url}{a.end}")
echo(
err_quit(
"Failed to fetch from remote url:",
f" {a.bold}{url}{a.end}",
"see below:" + a.style("-> ", "red").join(["\n"] + repr(e).splitlines()),
style="red",
)
sys.exit(1)
return r.read().decode("utf-8")
@ -1001,10 +1080,12 @@ class Viv:
if len(matches) == 1:
return matches[0]
elif len(matches) > 1:
echo(f"matches {','.join((match.name for match in matches))}", style="red")
error("too many matches maybe try a longer name?", code=1)
err_quit(
"matches: " + ",".join((match.name for match in matches)),
"too many matches maybe try a longer name?",
)
else:
error(f"no matches found for {name_id}", code=1)
err_quit(f"no matches found for {name_id}")
def remove(self, vivenvs: List[str]) -> None:
"""\
@ -1017,12 +1098,11 @@ class Viv:
for name in vivenvs:
vivenv = self._match_vivenv(name)
if vivenv.path.is_dir():
echo(f"removing {vivenv.name}")
log.info(f"removing {vivenv.name}")
shutil.rmtree(vivenv.path)
else:
error(
err_quit(
f"cowardly exiting because I didn't find vivenv: {name}",
code=1,
)
def freeze(
@ -1045,19 +1125,19 @@ class Viv:
vivenv.install_pkgs()
vivenv.meta.write()
else:
echo("re-using existing vivenv")
log.info("re-using existing vivenv")
vivenv.touch()
vivenv.meta.write()
echo("see below for import statements\n")
log.info("see below for import statements\n")
if standalone:
sys.stdout.write(self.t.standalone(spec))
return
if path and not self.local_source:
error("No local viv found to import from", code=1)
err_quit("No local viv found to import from")
sys.stdout.write(self.t.frozen_import(path, self.local_source, spec))
@ -1067,7 +1147,7 @@ class Viv:
if quiet:
sys.stdout.write("\n".join(self.vivenvs) + "\n")
elif len(self.vivenvs) == 0:
echo("no vivenvs setup")
log.info("no vivenvs setup")
elif full:
for _, vivenv in self.vivenvs.items():
vivenv.tree()
@ -1092,7 +1172,7 @@ class Viv:
bin = vivenv.path / "bin" / cmd
if not bin.exists():
error(f"{cmd} does not exist in {vivenv.name}", code=1)
err_quit(f"{cmd} does not exist in {vivenv.name}")
full_cmd = [str(bin), *rest]
@ -1104,7 +1184,7 @@ class Viv:
metadata_file = vivenv.path / "vivmeta.json"
if not metadata_file.is_file():
error(f"Unable to find metadata for vivenv: {vivenv_id}", code=1)
err_quit(f"Unable to find metadata for vivenv: {vivenv_id}")
if use_json:
sys.stdout.write(json.dumps(vivenv.meta.__dict__))
@ -1114,20 +1194,20 @@ class Viv:
vivenv.tree()
def _install_local_src(self, sha256: str, src: Path, cli: Path, yes: bool) -> None:
echo("updating local source copy of viv")
log.info("updating local source copy of viv")
shutil.copy(Cache().src / f"{sha256}.py", src)
make_executable(src)
echo("symlinking cli")
log.info("symlinking cli")
if cli.is_file():
echo(f"Existing file at {a.style(str(cli),'bold')}")
log.info(f"Existing file at {a.style(str(cli),'bold')}")
if confirm("Would you like to overwrite it?", yes=yes):
cli.unlink()
cli.symlink_to(src)
else:
cli.symlink_to(src)
echo("Remember to include the following line in your shell rc file:")
log.info("Remember to include the following line in your shell rc file:")
sys.stderr.write(
' export PYTHONPATH="$PYTHONPATH:$HOME/'
f'{src.relative_to(Path.home()).parent}"\n'
@ -1149,7 +1229,7 @@ class Viv:
if self.local and self.local_source:
sys.stdout.write(str(self.local_source.parent) + "\n")
else:
error("expected to find a local installation", code=1)
err_quit("expected to find a local installation")
else:
echo("Current:")
sys.stderr.write(
@ -1170,7 +1250,7 @@ class Viv:
sha256, next_version = self._get_new_version(ref)
if self.local_version == next_version:
echo(f"no change between {ref} and local version")
log.info(f"no change between {ref} and local version")
sys.exit(0)
if confirm(
@ -1196,7 +1276,7 @@ class Viv:
) -> None:
sha256, downloaded_version = self._get_new_version(ref)
echo(f"Downloaded version: {downloaded_version}")
log.info(f"Downloaded version: {downloaded_version}")
# TODO: see if file is actually where
# we are about to install and give more instructions
@ -1241,7 +1321,7 @@ class Viv:
else:
p.unlink()
echo(
log.info(
"to re-install use: "
"`python3 <(curl -fsSL viv.dayl.in/viv.py) manage install`"
)
@ -1272,7 +1352,7 @@ class Viv:
output = Env().viv_bin_dir / default_bin if not output else output.absolute()
if output.is_file():
error(f"{output} already exists...exiting", code=1)
err_quit(f"{output} already exists...exiting")
if freeze:
spec = resolve_deps(reqs, requirements)
@ -1320,7 +1400,7 @@ class Viv:
(tmppath / "viv.py").write_text(
# TODO: use latest tag once ready
fetch_script(
"https://raw.githubusercontent.com/daylinmorgan/viv/script-runner/src/viv/viv.py"
"https://raw.githubusercontent.com/daylinmorgan/viv/dev/src/viv/viv.py"
)
)
@ -1579,52 +1659,49 @@ class Cli:
def _validate_args(self, args: Namespace) -> None:
if args.func.__name__ in ("freeze", "shim"):
if not args.reqs:
error("must specify a requirement", code=1)
error("must specify a requirement")
if args.func.__name__ in ("freeze", "shim"):
if not self.viv.local_source and not (args.standalone or args.path):
warn(
log.warning(
"failed to find local copy of `viv` "
"make sure to add it to your PYTHONPATH "
"or consider using --path/--standalone"
)
# TODO: move this since this is code logic not flag logic
if args.path and not self.viv.local_source:
error("No local viv found to import from", code=1)
error("No local viv found to import from")
if args.path and args.standalone:
error("-p/--path and -s/--standalone are mutually exclusive", code=1)
error("-p/--path and -s/--standalone are mutually exclusive")
if args.func.__name__ == "manage_install" and self.viv.local_source:
error(f"found existing viv installation at {self.viv.local_source}")
echo(
error(
f"found existing viv installation at {self.viv.local_source}",
"use "
+ a.style("viv manage update", "bold")
+ " to modify current installation.",
style="red",
)
sys.exit(1)
if args.func.__name__ == "manage_update":
if not self.viv.local_source:
error(
a.style("viv manage update", "bold")
+ " should be used with an exisiting installation",
1,
)
if self.viv.git:
error(
a.style("viv manage update", "bold")
+ " shouldn't be used with a git-based installation",
1,
)
if args.func.__name__ == "run":
if not (args.reqs or args.script):
error("must specify a requirement or --script", code=1)
error("must specify a requirement or --script")
if args.func.__name__ == "info":
if args.use_json and args.path:
error("--json and -p/--path are mutually exclusive", code=1)
error("--json and -p/--path are mutually exclusive")
def _get_subcmd_parser(
self,