From dadf4dacbf30a26bdd8eba78c0e71a3713f307bf Mon Sep 17 00:00:00 2001 From: Daylin Morgan Date: Mon, 7 Aug 2023 19:00:47 -0500 Subject: [PATCH] feat: add filtering to `viv list` --- src/viv/viv.py | 237 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 202 insertions(+), 35 deletions(-) diff --git a/src/viv/viv.py b/src/viv/viv.py index c8f69a9..eb0a1f8 100755 --- a/src/viv/viv.py +++ b/src/viv/viv.py @@ -53,7 +53,7 @@ from typing import ( from urllib.error import HTTPError from urllib.request import urlopen -__version__ = "23.8a1-8-g156f358-dev" +__version__ = "23.8a1-10-g7cedb07-dev" class Spinner: @@ -146,19 +146,6 @@ class Env: return _path_ok(Path(self.xdg_data_home) / "viv") / "viv.log" -class Cache: - def __init__(self) -> None: - self.base = Env().viv_cache - - @property - def src(self) -> Path: - return _path_ok(self.base / "src") - - @property - def venv(self) -> Path: - return _path_ok(self.base / "venvs") - - class Cfg: @property def src(self) -> Path: @@ -166,6 +153,18 @@ class Cfg: p.parent.mkdir(exist_ok=True, parents=True) return p + @property + def cache_base(self) -> Path: + return Env().viv_cache + + @property + def cache_src(self) -> Path: + return _path_ok(self.cache_base / "src") + + @property + def cache_venv(self) -> Path: + return _path_ok(self.cache_base / "venvs") + class Ansi: """control ouptut of ansi(VT100) control codes""" @@ -503,7 +502,7 @@ if __name__ == "__main__": ("CLI", cli), ("Running Source", running), ("Local Source", local), - ("Cache", Cache().base), + ("Cache", Cfg().cache_base), ) ) + "\n" @@ -668,6 +667,39 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter): self._add_item(self._format_action, [action]) +class KVAppendAction(Action): + def __init__(self, *args, keys, **kwargs): + self._keys = keys + super(KVAppendAction, self).__init__(*args, **kwargs) + + # TODO: add choices option to class? + def __call__(self, parser, namespace, values, option_string=None): + try: + (k, v) = values.split(":") + if k not in self._keys: + err_quit( + "".join( + ( + f"unexpected key: {a.yellow}{k}{a.end} for {self.dest},", + " must be one of: ", + ", ".join((a.style(k, "bold") for k in self._keys)), + ) + ) + ) + d = {k: v} + except ValueError: + err_quit( + f"failed to parse key-value for {self.dest}" + f'"{a.bold}{values}{a.end}" as k:v' + ) + items = getattr(namespace, self.dest) + if not items: + items = {} + + items.update(d) + setattr(namespace, self.dest, items) + + class ArgumentParser(StdArgParser): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) @@ -768,18 +800,18 @@ class Meta: @classmethod def load(cls, name: str) -> "Meta": - if not (Cache().venv / name / "vivmeta.json").exists(): + if not (Cfg().cache_venv / name / "vivmeta.json").exists(): log.warning(f"possibly corrupted vivenv: {name}") # add empty values for corrupted vivenvs so it will still load return cls(name=name, spec=[""], files=[""], exe="", id="") else: - meta = json.loads((Cache().venv / name / "vivmeta.json").read_text()) + meta = json.loads((Cfg().cache_venv / name / "vivmeta.json").read_text()) return cls(**meta) def write(self, p: Path | None = None) -> None: if not p: - p = (Cache().venv) / self.name / "vivmeta.json" + p = (Cfg().cache_venv) / self.name / "vivmeta.json" p.write_text(json.dumps(self.__dict__)) @@ -810,7 +842,7 @@ class ViVenv: self.set_path(path) if not metadata: - if self.name in (d.name for d in Cache().venv.iterdir()): + if self.name in (d.name for d in Cfg().cache_venv.iterdir()): self.loaded = True self.meta = Meta.load(self.name) else: @@ -835,7 +867,7 @@ class ViVenv: return vivenv def set_path(self, path: Path | None = None) -> None: - self.path = path if path else Cache().venv / self.name + self.path = path if path else Cfg().cache_venv / self.name self.python = str((self.path / "bin" / "python").absolute()) self.pip = ("pip", "--python", self.python) @@ -906,6 +938,9 @@ class ViVenv: sys.path = [p for p in (path_to_add, *sys.path) if p != site.USER_SITE] site.addsitedir(path_to_add) + def files_exist(self) -> bool: + return len([f for f in self.meta.files if Path(f).is_file()]) == 0 + def show(self) -> None: _id = ( self.meta.id[:8] @@ -978,7 +1013,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path: def get_venvs() -> Dict[str, ViVenv]: vivenvs = {} - for p in Cache().venv.iterdir(): + for p in Cfg().cache_venv.iterdir(): vivenv = ViVenv.load(p.name) vivenvs[vivenv.name] = vivenv return vivenvs @@ -1036,7 +1071,7 @@ def fetch_source(reference: str) -> str: (hash := hashlib.sha256()).update(src.encode()) sha256 = hash.hexdigest() - cached_src_file = Cache().src / f"{sha256}.py" + cached_src_file = Cfg().cache_src / f"{sha256}.py" if not cached_src_file.is_file(): cached_src_file.write_text(src) @@ -1099,10 +1134,105 @@ def deps_block(txt: str) -> List[str]: ) +def _parse_date(txt: str) -> datetime: + """attempt to parse datetime string + + acceptable formats `%Y-%m-%d` & `%Y-%m-%dT%H:%M` + """ + + try: + date = datetime.strptime( + txt, + "%Y-%m-%d", + ) + return date + except ValueError: + pass + + try: + datetime.strptime(txt, "%Y-%m-%dT%H:%M") + return date + except ValueError: + pass + + err_quit( + f"failed to parse {a.yellow}{txt}{a.end} as datetime\n" + "acceptable formats `%Y-%m-%d` & `%Y-%m-%dT%H:%M`" + ) + + +class Cache: + def __init__(self) -> None: + self.vivenvs = self._get_venvs() + + def _get_venvs(self) -> Dict[str, ViVenv]: + vivenvs = {} + for p in Cfg().cache_venv.iterdir(): + vivenv = ViVenv.load(p.name) + vivenvs[vivenv.name] = vivenv + return vivenvs + + @staticmethod + def _compare_dates( + vivenv: ViVenv, date_name: str, when: str, date: datetime + ) -> bool: + vivenv_date = datetime.strptime( + getattr(vivenv.meta, date_name), "%Y-%m-%d %H:%M:%S.%f" + ) + if when == "before": + return vivenv_date < date + else: + return vivenv_date > date + + def _filter_date(self, date_name: str, when: str, date: datetime) -> List[ViVenv]: + return { + vivenv + for _, vivenv in self.vivenvs.items() + if self._compare_dates( + vivenv, + date_name, + when, + date, + ) + } + + def _filter_file(self, file: str) -> List[ViVenv]: + if file == "None": + return { + vivenv for _, vivenv in self.vivenvs.items() if vivenv.files_exist() + } + else: + p = Path(file).absolute().resolve() + if not p.is_file(): + err_quit(f"Unable to find local file: {file}") + return { + vivenv + for _, vivenv in self.vivenvs.items() + if str(p) in vivenv.meta.files + } + + def filter(self, filters=Dict[str, str]) -> Dict[str, ViVenv]: + vivenv_sets = [] + + for k, v in filters.items(): + if "-" in k: # date-based filters all have hyphen + (date_name, when) = k.split("-") + vivenv_sets.append(self._filter_date(date_name, when, _parse_date(v))) + else: + vivenv_sets.append(self._filter_file(v)) + + if vivenv_sets: + return {vivenv.name: vivenv for vivenv in set.union(*vivenv_sets)} + else: + return {} + + class Viv: def __init__(self) -> None: self.t = Template() - self.vivenvs = get_venvs() + self._cache = Cache() + # compat layer + self.vivenvs = self._cache.vivenvs self._get_sources() self.name = "viv" if self.local else "python3 <(curl -fsSL viv.dayl.in/viv.py)" @@ -1208,22 +1338,45 @@ class Viv: sys.stdout.write(self.t.frozen_import(path, self.local_source, spec)) - def list(self, quiet: bool, full: bool, use_json: bool) -> None: - """list all vivenvs""" + def list( + self, + quiet: bool, + verbose: bool, + use_json: bool, + filter: List[str], + ) -> None: + """\ + list vivenvs + + examples: + `viv list \\ + --filter "accessed-after:2023-08-01"` + `viv list -q --filter \\ + "created-before:$(date -d '2 weeks ago' +'%Y-%m-%d')"` + `viv list --filter "file:./script.py"` + `viv list --filter "file:None"` + """ + + if filter: + vivenvs = self._cache.filter(filter) + else: + vivenvs = self._cache.vivenvs if quiet: - sys.stdout.write("\n".join(self.vivenvs) + "\n") - elif len(self.vivenvs) == 0: + sys.stdout.write("\n".join(vivenvs) + "\n") + elif len(self._cache.vivenvs) == 0: log.info("no vivenvs setup") - elif full: - for _, vivenv in self.vivenvs.items(): + elif len(vivenvs) == 0 and filter: + log.info("no vivenvs match filter") + elif verbose: + for _, vivenv in vivenvs.items(): vivenv.tree() elif use_json: sys.stdout.write( - json.dumps({k: v.meta.__dict__ for k, v in self.vivenvs.items()}) + json.dumps({k: v.meta.__dict__ for k, v in vivenvs.items()}) ) else: - for _, vivenv in self.vivenvs.items(): + for _, vivenv in vivenvs.items(): vivenv.show() def exe(self, vivenv_id: str, cmd: str, rest: List[str]) -> None: @@ -1554,17 +1707,31 @@ class Cli: args = { ("list",): [ Arg( - "-f", - "--full", - help="show full metadata for vivenvs", + "-v", + "--verbose", + help="pretty print full metadata for vivenvs", action="store_true", ), Arg( "-q", "--quiet", - help="suppress non-essential output", + help="show only ids", action="store_true", ), + Arg( + "-f", + "--filter", + help="filter vivenvs based on key:val", + metavar="", + action=KVAppendAction, + keys=[ + "created-before", + "created-after", + "accessed-before", + "accessed-after", + "files", + ], + ), ], ("shim",): [ Arg(