new eget config compliant wrapper

This commit is contained in:
Daylin Morgan 2022-12-12 22:52:54 -06:00
parent 198d2afd52
commit ff1bdf9c18
1 changed files with 256 additions and 0 deletions

View File

@ -0,0 +1,256 @@
#!/usr/bin/env python3
import argparse
import json
import os
import shlex
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
try:
import tomllib
except ImportError:
pass
EGET_CONFIG = Path.home() / ".config" / "eget" / "eget.toml"
class Color:
def __init__(self):
self.red = "\033[1;31m"
self.green = "\033[1;32m"
self.yellow = "\033[1;33m"
self.magenta = "\033[1;35m"
self.cyan = "\033[1;36m"
self.end = "\033[0m"
if os.getenv("NO_COLOR"):
for attr in self.__dict__:
setattr(self, attr, "")
@dataclass
class Repo:
owner: str
name: str
bin_name: str
def get_repo_name(self):
return f"{self.owner}/{self.name}"
class Config:
def __init__(self):
if tomllib:
settings, config = self.parse_toml_tomllib()
else:
settings, config = self.parse_toml()
self.settings = settings
self.repos = {}
for repo, info in config.items():
owner, name = repo.split("/")
self.repos[name] = Repo(
owner=owner, name=name, bin_name=get_bin_name(info, name)
)
@staticmethod
def parse_toml():
"""use dasel to read config and return as json"""
# todo: ensure dasel has been installed...
config_json_str = subprocess.check_output(
shlex.split(f"dasel -f {EGET_CONFIG} -w json")
)
config = json.loads(config_json_str)
if "global" in config:
settings = config["global"]
del config["global"]
return settings, config
return None, config
@staticmethod
def parse_toml_tomllib():
"""use dasel to read config and return as json"""
with EGET_CONFIG.open("rb") as f:
config = tomllib.load(f)
if "global" in config:
settings = config["global"]
del config["global"]
return settings, config
return None, config
class CustomHelpFormatter(argparse.HelpFormatter):
"""remove redundant metavars"""
def __init__(self, prog):
super().__init__(prog, max_help_position=40, width=80)
def _format_action_invocation(self, action):
if not action.option_strings or action.nargs == 0:
return super()._format_action_invocation(action)
default = self._get_default_metavar_for_optional(action)
args_string = self._format_args(action, default)
return ", ".join(action.option_strings) + " " + args_string
def make_parser():
parser = argparse.ArgumentParser(
formatter_class=lambda prog: CustomHelpFormatter(prog)
)
parser.add_argument(
"-l", "--list", help="list all configured tools", action="store_true"
)
parser.add_argument(
"-i",
"--install",
metavar="<repo or bin name>",
help="install specified tool (can be used multiple time)",
action="append",
)
parser.add_argument(
"--verbose", help="show eget output", action="store_true")
parser.add_argument("--info", help="return full repo name")
return parser
def run_cmd(command: str, verbose: bool = False, ignore_error: bool = False) -> None:
"""run a subcommand
Args:
command: Subcommand to be run in subprocess.
verbose: If true, print subcommand output.
"""
p = subprocess.run(
shlex.split(command),
stdout=None if verbose else subprocess.PIPE,
stderr=None if verbose else subprocess.STDOUT,
universal_newlines=True,
)
if p.returncode != 0 and not ignore_error:
print()
print(p.stdout)
err_msg = (
f"{color.red}error{color.end}: failed to download"
" see above for eget output"
)
echo(err_msg, hue="red")
sys.exit(1)
def echo(msg: str, header=False, hue="cyan") -> None:
if header:
print(f"==>{color.magenta} {msg} {color.end}<==")
else:
print(f"{color.__dict__[hue]}::{color.end} {msg}")
def get_mod_info(name):
# TODO: get global target info from config file
binary = Path.home() / "bin" / name
if binary.is_file():
mod_date = datetime.fromtimestamp(os.path.getmtime(binary))
date_str = (
f"{color.yellow}{mod_date.strftime('%y.%m.%d')}{color.end}"
if datetime.today() - mod_date > timedelta(days=60)
else mod_date.strftime("%y.%m.%d")
)
return f"{color.green}✓{color.end} {date_str}"
else:
return f"{color.red}✗{color.end}"
def make_row(name, owner, mod_info, lens):
sep = "|"
return (
f"{color.cyan}{name:{lens['name']}}{color.end}"
f" {sep} "
f"{color.yellow}{owner:{lens['owner']}}{color.end}"
f" {sep} "
f"{mod_info}"
)
def get_bin_name(info, name):
if "target" in info:
return info["target"]
elif "file" in info:
return info["file"]
else:
return name
def list_bins(config):
echo("listing configured tools")
lens = {
"owner": max([len(repo.owner) for repo in config.repos.values()]),
"name": max([len(name) for name in config.repos]),
}
print(make_row("name", "owner", "installed", lens))
print(lens["owner"] * "-" + lens["name"] * "-" + "-----------------")
for name in sorted(config.repos):
repo = config.repos[name]
mod_info = get_mod_info(repo.bin_name)
print(make_row(name, repo.owner, mod_info, lens))
def install_bin(config: Config, tool: str, verbose: bool = False):
if tool in config.repos:
repo_str = config.repos[tool].get_repo_name()
else:
for repo in config.repos.values():
if tool == repo.bin_name:
repo_str = repo.get_repo_name()
break
else:
echo(f"{color.red}error{color.end} {tool} not found in config", hue="red")
sys.exit(1)
echo(f"installing {tool}")
cmd = f"eget {repo_str}"
run_cmd(cmd, verbose=verbose)
def main():
parser = make_parser()
args = parser.parse_args()
config = Config()
if args.info:
# todo implement config method to retrieve value or error out
repo = config.repos[args.info]
print(f"{repo.owner}/{repo.name}")
sys.exit(0)
echo("Aweget the wrapper for eget", header=True)
if args.list:
list_bins(config)
elif args.install:
echo(f"attempting to install: {', '.join(args.install)}")
for tool in args.install:
install_bin(config, tool, args.verbose)
else:
echo("no arguments specified", hue="red")
echo("see below for help")
parser.print_help()
if __name__ == "__main__":
color = Color()
main()