refactor: f-strings and templates

This commit is contained in:
Daylin Morgan 2023-05-30 12:00:20 -05:00
parent 8a5f63d5e7
commit b36e4d0d95
Signed by: daylin
GPG key ID: C1E52E7DD81DF79F

View file

@ -51,7 +51,7 @@ from typing import (
from urllib.error import HTTPError from urllib.error import HTTPError
from urllib.request import urlopen from urllib.request import urlopen
__version__ = "23.5a4-6-g27d7952-dev" __version__ = "23.5a4-7-g8a5f63d-dev"
class Config: class Config:
@ -146,7 +146,10 @@ class Spinner:
sys.stderr.write("\r") sys.stderr.write("\r")
BOX: Dict[str, str] = { class Ansi:
"""control ouptut of ansi(VT100) control codes"""
BOX: Dict[str, str] = {
"v": "", "v": "",
"h": "", "h": "",
"tl": "", "tl": "",
@ -154,11 +157,7 @@ BOX: Dict[str, str] = {
"bl": "", "bl": "",
"br": "", "br": "",
"sep": "", "sep": "",
} }
class Ansi:
"""control ouptut of ansi(VT100) control codes"""
def __init__(self) -> None: def __init__(self) -> None:
self.bold: str = "\033[1m" self.bold: str = "\033[1m"
@ -241,7 +240,11 @@ class Ansi:
return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore return list(max(map(len, lst)) for lst in map(list, zip(*rows))) # type: ignore
def _make_row(self, row: Generator[Any, None, None]) -> str: def _make_row(self, row: Generator[Any, None, None]) -> str:
return f" {BOX['v']} " + f" {BOX['sep']} ".join(row) + f" {BOX['v']}" return (
f" {self.BOX['v']} "
+ f" {self.BOX['sep']} ".join(row)
+ f" {self.BOX['v']}"
)
def _sanitize_row( def _sanitize_row(
self, sizes: List[int], row: Tuple[str, Sequence[str]] self, sizes: List[int], row: Tuple[str, Sequence[str]]
@ -300,14 +303,169 @@ class Ansi:
) )
) )
sys.stderr.write(f" {BOX['tl']}{BOX['h']*(sum(sizes)+5)}{BOX['tr']}\n") sys.stderr.write(
"".join(
(
" ",
self.BOX["tl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["tr"],
"\n",
)
)
)
sys.stderr.write("\n".join(table_rows) + "\n") sys.stderr.write("\n".join(table_rows) + "\n")
sys.stderr.write(f" {BOX['bl']}{BOX['h']*(sum(sizes)+5)}{BOX['br']}\n") sys.stderr.write(
"".join(
(
" ",
self.BOX["bl"],
self.BOX["h"] * (sum(sizes) + 5),
self.BOX["br"],
"\n",
)
)
)
a = Ansi() a = Ansi()
class Template:
description = f"""
{a.tagline()}
to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
"""
_standalone_func = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs]
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid")
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode()))
if ((env:=cache/(name if name else (_id:=sha256.hexdigest())))
not in cache.glob("*/")) or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n")
i("venv").EnvBuilder(with_pip=T,clear=T).create(env)
with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true")
if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True,
stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0:
if env.is_dir():i("shutil").rmtree(env)
ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode)
with (env/"viv-info.json").open("w") as f:
i("json").dump({"created":s(i("datetime").datetime.today()),
"id":_id,"spec":spec,"exe":exe},f)
sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE]
return env""" # noqa
def noqa(self, txt: str) -> str:
max_length = max(map(len, txt.splitlines()))
return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines()))
def _use_str(self, spec: List[str], standalone: bool = False) -> str:
spec = ", ".join(f'"{req}"' for req in spec)
if standalone:
return f"""_viv_use({fill(spec,width=90,subsequent_indent=" ",)})"""
else:
return f"""__import("viv").use({spec})"""
def standalone(self, spec: List[str]) -> str:
func_use = self.noqa(
"\n".join((self._standalone_func, self._use_str(spec, standalone=True)))
)
return f"""
# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help`
# <<<<< auto-generated by viv (v{__version__})
# fmt: off
{func_use}
# fmt: on
# >>>>> code golfed with <3
"""
def _rel_import(self, local_source: Path):
path_to_viv = path_to_viv = str(
local_source.resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
return (
"""__import__("sys").path.append(__import__("os")"""
f""".path.expanduser("{path_to_viv}")) # noqa"""
)
def _absolute_import(self, local_source: Path):
path_to_viv = local_source.resolve().absolute().parent.parent
return f"""__import__("sys").path.append("{path_to_viv}") # noqa"""
def frozen_import(self, path: str, local_source: Path, spec: List[str]) -> str:
if path == "abs":
imports = self._absolute_import(local_source)
elif path == "rel":
imports = self._rel_import(local_source)
else:
imports = ""
return f"""\
{imports}
{self.noqa(self._use_str(spec))}
"""
def shim(
self, path: str, local_source: Path, standalone: bool, spec: List[str], bin: str
) -> str:
if standalone:
imports = self._standalone_func
elif path == "abs":
imports = self._absolute_import(local_source)
elif path == "rel":
imports = self._rel_import(local_source)
else:
imports = ""
return f"""\
#!/usr/bin/env python3
{imports}
import subprocess
import sys
if __name__ == "__main__":
vivenv = {self._use_str(spec, standalone)}
sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode)
"""
def update(self, src, cli, local_version, next_version):
return f"""
Update source at {a.green}{src}{a.end}
Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end}
Version {a.bold}{local_version}{a.end} -> {a.bold}{next_version}{a.end}
"""
def install(self, src, cli):
return f"""
Install viv.py to {a.green}{src}{a.end}
Symlink {a.bold}{src}{a.end} to {a.bold}{cli}{a.end}
"""
def show(self, cli, running, local):
return (
"\n".join(
f" {a.bold}{k}{a.end}: {v}"
for k, v in (
("Version", __version__),
("CLI", cli),
("Running Source", running),
("Local Source", local),
)
)
+ "\n"
)
t = Template()
# TODO: convet the below functions into a proper file/stream logging interface # TODO: convet the below functions into a proper file/stream logging interface
def echo( def echo(
msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr msg: str, style: str = "magenta", newline: bool = True, fd: TextIO = sys.stderr
@ -678,103 +836,10 @@ def get_venvs() -> Dict[str, ViVenv]:
return vivenvs return vivenvs
# TODO: make a template class?
SYS_PATH_TEMPLATE = """__import__("sys").path.append("{path_to_viv}") # noqa"""
REL_SYS_PATH_TEMPLATE = (
"""__import__("sys").path.append(__import__("os")"""
""".path.expanduser("{path_to_viv}")) # noqa"""
)
IMPORT_TEMPLATE = """__import__("viv").use({spec}) # noqa"""
STANDALONE_TEMPLATE = r"""
# <<<<< auto-generated by viv (v{version})
# see `python3 <(curl -fsSL gh.dayl.in/viv/viv.py) --help`
# fmt: off
{func}
# fmt: on
# >>>>> code golfed with <3
""" # noqa
STANDALONE_TEMPLATE_FUNC = r"""def _viv_use(*pkgs, track_exe=False, name=""):
T,F=True,False;i,s,m,e,spec=__import__,str,map,lambda x: T if x else F,[*pkgs]
if not {*m(type,pkgs)}=={s}: raise ValueError(f"spec: {pkgs} is invalid")
ge,sys,P,ew=i("os").getenv,i("sys"),i("pathlib").Path,i("sys").stderr.write
(cache:=(P(ge("XDG_CACHE_HOME",P.home()/".cache"))/"viv"/"venvs")).mkdir(parents=T,exist_ok=T)
((sha256:=i("hashlib").sha256()).update((s(spec)+
(((exe:=("N/A",s(P(i("sys").executable).resolve()))[e(track_exe)])))).encode()))
if ((env:=cache/(name if name else (_id:=sha256.hexdigest())))
not in cache.glob("*/")) or ge("VIV_FORCE"):
v=e(ge("VIV_VERBOSE"));ew(f"generating new vivenv -> {env.name}\n")
i("venv").EnvBuilder(with_pip=T,clear=T).create(env)
with (env/"pip.conf").open("w") as f:f.write("[global]\ndisable-pip-version-check=true")
if (p:=i("subprocess").run([env/"bin"/"pip","install","--force-reinstall",*spec],text=True,
stdout=(-1,None)[v],stderr=(-2,None)[v])).returncode!=0:
if env.is_dir():i("shutil").rmtree(env)
ew(f"pip had non zero exit ({p.returncode})\n{p.stdout}\n");sys.exit(p.returncode)
with (env/"viv-info.json").open("w") as f:
i("json").dump({"created":s(i("datetime").datetime.today()),
"id":_id,"spec":spec,"exe":exe},f)
sys.path = [p for p in (*sys.path,s(*(env/"lib").glob("py*/si*"))) if p!=i("site").USER_SITE]
return env
""" # noqa
SHOW_TEMPLATE = f"""
{a.style('Version', 'bold')}: {{version}}
{a.style('CLI', 'bold')}: {{cli}}
{a.style('Running Source', 'bold')}: {{running_src}}
{a.style('Local Source', 'bold')}: {{local_src}}
"""
INSTALL_TEMPLATE = f"""
Install viv.py to {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
"""
UPDATE_TEMPLATE = f"""
Update source at {a.green}{{src_location}}{a.end}
Symlink {a.bold}{{src_location}}{a.end} to {a.bold}{{cli_location}}{a.end}
Version {a.bold}{{local_version}}{a.end} -> {a.bold}{{next_version}}{a.end}
"""
SHIM_TEMPLATE = """\
#!/usr/bin/env python3
{imports}
import subprocess
import sys
if __name__ == "__main__":
vivenv = {use}
sys.exit(subprocess.run([vivenv / "bin" / "{bin}", *sys.argv[1:]]).returncode)
"""
DESCRIPTION = f"""
{a.tagline()}
to create/activate a vivenv:
- from command line: `{a.style("viv -h","bold")}`
- within python script: {a.style('__import__("viv").use("typer", "rich-click")','bold')}
"""
def noqa(txt: str) -> str:
max_length = max(map(len, txt.splitlines()))
return "\n".join((f"{line:{max_length}} # noqa" for line in txt.splitlines()))
def spec_to_import(spec: List[str]) -> None:
spec_str = ", ".join(f'"{pkg}"' for pkg in spec)
sys.stdout.write(IMPORT_TEMPLATE.format(spec=spec_str) + "\n")
def combined_spec(reqs: List[str], requirements: Path) -> List[str]: def combined_spec(reqs: List[str], requirements: Path) -> List[str]:
if requirements: if requirements:
with requirements.open("r") as f: with requirements.open("r") as f:
reqs += f.readlines() reqs += f.readlines()
return reqs return reqs
@ -801,66 +866,6 @@ def resolve_deps(args: Namespace) -> List[str]:
return resolved_spec.splitlines() return resolved_spec.splitlines()
def generate_import(
args: Namespace,
) -> None:
spec = resolve_deps(args)
if args.keep:
# re-create env again since path's are hard-coded
vivenv = ViVenv(spec)
if vivenv.name not in [d.name for d in c.venvcache.iterdir()] or os.getenv(
"VIV_FORCE"
):
vivenv.create()
vivenv.install_pkgs()
vivenv.dump_info(write=True)
else:
echo("re-using existing vivenv")
echo("see below for import statements\n")
if args.standalone:
sys.stdout.write(
STANDALONE_TEMPLATE.format(
version=__version__,
func=noqa(
STANDALONE_TEMPLATE_FUNC
+ "_viv_use("
+ fill(
", ".join(f'"{pkg}"' for pkg in spec),
width=100,
subsequent_indent=" ",
)
+ ")"
),
)
+ "\n"
)
return
if args.path:
if args.path == "abs":
sys.stdout.write(
SYS_PATH_TEMPLATE.format(
path_to_viv=Path(__file__).resolve().absolute().parent.parent
)
+ "\n"
)
elif args.path == "rel":
sys.stdout.write(
REL_SYS_PATH_TEMPLATE.format(
path_to_viv=str(
Path(__file__).resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
)
+ "\n"
)
spec_to_import(spec)
def fetch_source(reference: str) -> str: def fetch_source(reference: str) -> str:
try: try:
r = urlopen( r = urlopen(
@ -982,9 +987,29 @@ class Viv:
if args.path and args.standalone: if args.path and args.standalone:
error("-p/--path and -s/--standalone are mutually exclusive", code=1) error("-p/--path and -s/--standalone are mutually exclusive", code=1)
generate_import( spec = resolve_deps(args)
args, if args.keep:
) # re-create env again since path's are hard-coded
vivenv = ViVenv(spec)
if not vivenv.exists or os.getenv("VIV_FORCE"):
vivenv.create()
vivenv.install_pkgs()
vivenv.dump_info(write=True)
else:
echo("re-using existing vivenv")
echo("see below for import statements\n")
if args.standalone:
sys.stdout.write(t.standalone(spec))
return
if args.path and not self.local_source:
error("No local viv found to import from", code=1)
sys.stdout.write(t.frozen_import(args.path, self.local_source, spec))
def list(self, args: Namespace) -> None: def list(self, args: Namespace) -> None:
"""list all vivenvs""" """list all vivenvs"""
@ -1071,11 +1096,10 @@ class Viv:
else: else:
echo("Current:") echo("Current:")
sys.stderr.write( sys.stderr.write(
SHOW_TEMPLATE.format( t.show(
version=__version__,
cli=shutil.which("viv"), cli=shutil.which("viv"),
running_src=self.running_source, running=self.running_source,
local_src=self.local_source, local=self.local_source,
) )
) )
@ -1103,12 +1127,7 @@ class Viv:
if confirm( if confirm(
"Would you like to perform the above installation steps?", "Would you like to perform the above installation steps?",
UPDATE_TEMPLATE.format( t.update(self.local_source, args.cli, self.local_version, next_version),
src_location=self.local_source,
local_version=self.local_version,
cli_location=args.cli,
next_version=next_version,
),
): ):
self._install_local_src( self._install_local_src(
sha256, sha256,
@ -1139,10 +1158,7 @@ class Viv:
if confirm( if confirm(
"Would you like to perform the above installation steps?", "Would you like to perform the above installation steps?",
INSTALL_TEMPLATE.format( t.install(args.src, args.cli),
src_location=args.src,
cli_location=args.cli,
),
): ):
self._install_local_src(sha256, args.src, args.cli) self._install_local_src(sha256, args.src, args.cli)
elif args.cmd == "purge": elif args.cmd == "purge":
@ -1208,37 +1224,16 @@ class Viv:
else: else:
spec = combined_spec(args.reqs, args.requirements) spec = combined_spec(args.reqs, args.requirements)
spec_str = ", ".join(f'"{req}"' for req in spec) if args.path and not self.local_source:
if args.standalone:
imports = STANDALONE_TEMPLATE.format(
version=__version__, func=noqa(STANDALONE_TEMPLATE_FUNC)
)
use = f"_viv_use({spec_str})"
elif args.path:
if not self.local_source:
error("No local viv found to import from", code=1) error("No local viv found to import from", code=1)
else:
imports = (
REL_SYS_PATH_TEMPLATE.format(
path_to_viv=str(
self.local_source.resolve().absolute().parent.parent
).replace(str(Path.home()), "~")
)
if args.path == "abs"
else SYS_PATH_TEMPLATE.format(
path_to_viv=self.local_source.resolve().absolute().parent.parent
)
)
use = IMPORT_TEMPLATE.format(spec=spec_str)
else:
imports = ""
use = IMPORT_TEMPLATE.format(spec=spec_str)
if confirm( if confirm(
f"Write shim for {a.style(bin,'bold')} to {a.style(output,'green')}?" f"Write shim for {a.style(bin,'bold')} to {a.style(output,'green')}?"
): ):
with output.open("w") as f: with output.open("w") as f:
f.write(SHIM_TEMPLATE.format(imports=imports, use=use, bin=bin)) f.write(
t.shim(args.path, self.local_source, args.standalone, spec, bin)
)
make_executable(output) make_executable(output)
@ -1302,7 +1297,7 @@ class Viv:
def cli(self) -> None: def cli(self) -> None:
"""cli entrypoint""" """cli entrypoint"""
parser = ArgumentParser(prog=self.name, description=DESCRIPTION) parser = ArgumentParser(prog=self.name, description=t.description)
parser.add_argument( parser.add_argument(
"-V", "-V",
"--version", "--version",