mirror of
https://github.com/daylinmorgan/viv.git
synced 2024-12-22 10:40:44 -06:00
feat: add filtering to viv list
This commit is contained in:
parent
6dfc9c566a
commit
dadf4dacbf
1 changed files with 202 additions and 35 deletions
237
src/viv/viv.py
237
src/viv/viv.py
|
@ -53,7 +53,7 @@ from typing import (
|
||||||
from urllib.error import HTTPError
|
from urllib.error import HTTPError
|
||||||
from urllib.request import urlopen
|
from urllib.request import urlopen
|
||||||
|
|
||||||
__version__ = "23.8a1-8-g156f358-dev"
|
__version__ = "23.8a1-10-g7cedb07-dev"
|
||||||
|
|
||||||
|
|
||||||
class Spinner:
|
class Spinner:
|
||||||
|
@ -146,19 +146,6 @@ class Env:
|
||||||
return _path_ok(Path(self.xdg_data_home) / "viv") / "viv.log"
|
return _path_ok(Path(self.xdg_data_home) / "viv") / "viv.log"
|
||||||
|
|
||||||
|
|
||||||
class Cache:
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.base = Env().viv_cache
|
|
||||||
|
|
||||||
@property
|
|
||||||
def src(self) -> Path:
|
|
||||||
return _path_ok(self.base / "src")
|
|
||||||
|
|
||||||
@property
|
|
||||||
def venv(self) -> Path:
|
|
||||||
return _path_ok(self.base / "venvs")
|
|
||||||
|
|
||||||
|
|
||||||
class Cfg:
|
class Cfg:
|
||||||
@property
|
@property
|
||||||
def src(self) -> Path:
|
def src(self) -> Path:
|
||||||
|
@ -166,6 +153,18 @@ class Cfg:
|
||||||
p.parent.mkdir(exist_ok=True, parents=True)
|
p.parent.mkdir(exist_ok=True, parents=True)
|
||||||
return p
|
return p
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cache_base(self) -> Path:
|
||||||
|
return Env().viv_cache
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cache_src(self) -> Path:
|
||||||
|
return _path_ok(self.cache_base / "src")
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cache_venv(self) -> Path:
|
||||||
|
return _path_ok(self.cache_base / "venvs")
|
||||||
|
|
||||||
|
|
||||||
class Ansi:
|
class Ansi:
|
||||||
"""control ouptut of ansi(VT100) control codes"""
|
"""control ouptut of ansi(VT100) control codes"""
|
||||||
|
@ -503,7 +502,7 @@ if __name__ == "__main__":
|
||||||
("CLI", cli),
|
("CLI", cli),
|
||||||
("Running Source", running),
|
("Running Source", running),
|
||||||
("Local Source", local),
|
("Local Source", local),
|
||||||
("Cache", Cache().base),
|
("Cache", Cfg().cache_base),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
+ "\n"
|
+ "\n"
|
||||||
|
@ -668,6 +667,39 @@ class CustomHelpFormatter(RawDescriptionHelpFormatter, HelpFormatter):
|
||||||
self._add_item(self._format_action, [action])
|
self._add_item(self._format_action, [action])
|
||||||
|
|
||||||
|
|
||||||
|
class KVAppendAction(Action):
|
||||||
|
def __init__(self, *args, keys, **kwargs):
|
||||||
|
self._keys = keys
|
||||||
|
super(KVAppendAction, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
# TODO: add choices option to class?
|
||||||
|
def __call__(self, parser, namespace, values, option_string=None):
|
||||||
|
try:
|
||||||
|
(k, v) = values.split(":")
|
||||||
|
if k not in self._keys:
|
||||||
|
err_quit(
|
||||||
|
"".join(
|
||||||
|
(
|
||||||
|
f"unexpected key: {a.yellow}{k}{a.end} for {self.dest},",
|
||||||
|
" must be one of: ",
|
||||||
|
", ".join((a.style(k, "bold") for k in self._keys)),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
d = {k: v}
|
||||||
|
except ValueError:
|
||||||
|
err_quit(
|
||||||
|
f"failed to parse key-value for {self.dest}"
|
||||||
|
f'"{a.bold}{values}{a.end}" as k:v'
|
||||||
|
)
|
||||||
|
items = getattr(namespace, self.dest)
|
||||||
|
if not items:
|
||||||
|
items = {}
|
||||||
|
|
||||||
|
items.update(d)
|
||||||
|
setattr(namespace, self.dest, items)
|
||||||
|
|
||||||
|
|
||||||
class ArgumentParser(StdArgParser):
|
class ArgumentParser(StdArgParser):
|
||||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
@ -768,18 +800,18 @@ class Meta:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def load(cls, name: str) -> "Meta":
|
def load(cls, name: str) -> "Meta":
|
||||||
if not (Cache().venv / name / "vivmeta.json").exists():
|
if not (Cfg().cache_venv / name / "vivmeta.json").exists():
|
||||||
log.warning(f"possibly corrupted vivenv: {name}")
|
log.warning(f"possibly corrupted vivenv: {name}")
|
||||||
# add empty values for corrupted vivenvs so it will still load
|
# add empty values for corrupted vivenvs so it will still load
|
||||||
return cls(name=name, spec=[""], files=[""], exe="", id="")
|
return cls(name=name, spec=[""], files=[""], exe="", id="")
|
||||||
else:
|
else:
|
||||||
meta = json.loads((Cache().venv / name / "vivmeta.json").read_text())
|
meta = json.loads((Cfg().cache_venv / name / "vivmeta.json").read_text())
|
||||||
|
|
||||||
return cls(**meta)
|
return cls(**meta)
|
||||||
|
|
||||||
def write(self, p: Path | None = None) -> None:
|
def write(self, p: Path | None = None) -> None:
|
||||||
if not p:
|
if not p:
|
||||||
p = (Cache().venv) / self.name / "vivmeta.json"
|
p = (Cfg().cache_venv) / self.name / "vivmeta.json"
|
||||||
|
|
||||||
p.write_text(json.dumps(self.__dict__))
|
p.write_text(json.dumps(self.__dict__))
|
||||||
|
|
||||||
|
@ -810,7 +842,7 @@ class ViVenv:
|
||||||
self.set_path(path)
|
self.set_path(path)
|
||||||
|
|
||||||
if not metadata:
|
if not metadata:
|
||||||
if self.name in (d.name for d in Cache().venv.iterdir()):
|
if self.name in (d.name for d in Cfg().cache_venv.iterdir()):
|
||||||
self.loaded = True
|
self.loaded = True
|
||||||
self.meta = Meta.load(self.name)
|
self.meta = Meta.load(self.name)
|
||||||
else:
|
else:
|
||||||
|
@ -835,7 +867,7 @@ class ViVenv:
|
||||||
return vivenv
|
return vivenv
|
||||||
|
|
||||||
def set_path(self, path: Path | None = None) -> None:
|
def set_path(self, path: Path | None = None) -> None:
|
||||||
self.path = path if path else Cache().venv / self.name
|
self.path = path if path else Cfg().cache_venv / self.name
|
||||||
self.python = str((self.path / "bin" / "python").absolute())
|
self.python = str((self.path / "bin" / "python").absolute())
|
||||||
self.pip = ("pip", "--python", self.python)
|
self.pip = ("pip", "--python", self.python)
|
||||||
|
|
||||||
|
@ -906,6 +938,9 @@ class ViVenv:
|
||||||
sys.path = [p for p in (path_to_add, *sys.path) if p != site.USER_SITE]
|
sys.path = [p for p in (path_to_add, *sys.path) if p != site.USER_SITE]
|
||||||
site.addsitedir(path_to_add)
|
site.addsitedir(path_to_add)
|
||||||
|
|
||||||
|
def files_exist(self) -> bool:
|
||||||
|
return len([f for f in self.meta.files if Path(f).is_file()]) == 0
|
||||||
|
|
||||||
def show(self) -> None:
|
def show(self) -> None:
|
||||||
_id = (
|
_id = (
|
||||||
self.meta.id[:8]
|
self.meta.id[:8]
|
||||||
|
@ -978,7 +1013,7 @@ def use(*packages: str, track_exe: bool = False, name: str = "") -> Path:
|
||||||
|
|
||||||
def get_venvs() -> Dict[str, ViVenv]:
|
def get_venvs() -> Dict[str, ViVenv]:
|
||||||
vivenvs = {}
|
vivenvs = {}
|
||||||
for p in Cache().venv.iterdir():
|
for p in Cfg().cache_venv.iterdir():
|
||||||
vivenv = ViVenv.load(p.name)
|
vivenv = ViVenv.load(p.name)
|
||||||
vivenvs[vivenv.name] = vivenv
|
vivenvs[vivenv.name] = vivenv
|
||||||
return vivenvs
|
return vivenvs
|
||||||
|
@ -1036,7 +1071,7 @@ def fetch_source(reference: str) -> str:
|
||||||
(hash := hashlib.sha256()).update(src.encode())
|
(hash := hashlib.sha256()).update(src.encode())
|
||||||
sha256 = hash.hexdigest()
|
sha256 = hash.hexdigest()
|
||||||
|
|
||||||
cached_src_file = Cache().src / f"{sha256}.py"
|
cached_src_file = Cfg().cache_src / f"{sha256}.py"
|
||||||
|
|
||||||
if not cached_src_file.is_file():
|
if not cached_src_file.is_file():
|
||||||
cached_src_file.write_text(src)
|
cached_src_file.write_text(src)
|
||||||
|
@ -1099,10 +1134,105 @@ def deps_block(txt: str) -> List[str]:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_date(txt: str) -> datetime:
|
||||||
|
"""attempt to parse datetime string
|
||||||
|
|
||||||
|
acceptable formats `%Y-%m-%d` & `%Y-%m-%dT%H:%M`
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
date = datetime.strptime(
|
||||||
|
txt,
|
||||||
|
"%Y-%m-%d",
|
||||||
|
)
|
||||||
|
return date
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
datetime.strptime(txt, "%Y-%m-%dT%H:%M")
|
||||||
|
return date
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
err_quit(
|
||||||
|
f"failed to parse {a.yellow}{txt}{a.end} as datetime\n"
|
||||||
|
"acceptable formats `%Y-%m-%d` & `%Y-%m-%dT%H:%M`"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Cache:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.vivenvs = self._get_venvs()
|
||||||
|
|
||||||
|
def _get_venvs(self) -> Dict[str, ViVenv]:
|
||||||
|
vivenvs = {}
|
||||||
|
for p in Cfg().cache_venv.iterdir():
|
||||||
|
vivenv = ViVenv.load(p.name)
|
||||||
|
vivenvs[vivenv.name] = vivenv
|
||||||
|
return vivenvs
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _compare_dates(
|
||||||
|
vivenv: ViVenv, date_name: str, when: str, date: datetime
|
||||||
|
) -> bool:
|
||||||
|
vivenv_date = datetime.strptime(
|
||||||
|
getattr(vivenv.meta, date_name), "%Y-%m-%d %H:%M:%S.%f"
|
||||||
|
)
|
||||||
|
if when == "before":
|
||||||
|
return vivenv_date < date
|
||||||
|
else:
|
||||||
|
return vivenv_date > date
|
||||||
|
|
||||||
|
def _filter_date(self, date_name: str, when: str, date: datetime) -> List[ViVenv]:
|
||||||
|
return {
|
||||||
|
vivenv
|
||||||
|
for _, vivenv in self.vivenvs.items()
|
||||||
|
if self._compare_dates(
|
||||||
|
vivenv,
|
||||||
|
date_name,
|
||||||
|
when,
|
||||||
|
date,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
def _filter_file(self, file: str) -> List[ViVenv]:
|
||||||
|
if file == "None":
|
||||||
|
return {
|
||||||
|
vivenv for _, vivenv in self.vivenvs.items() if vivenv.files_exist()
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
p = Path(file).absolute().resolve()
|
||||||
|
if not p.is_file():
|
||||||
|
err_quit(f"Unable to find local file: {file}")
|
||||||
|
return {
|
||||||
|
vivenv
|
||||||
|
for _, vivenv in self.vivenvs.items()
|
||||||
|
if str(p) in vivenv.meta.files
|
||||||
|
}
|
||||||
|
|
||||||
|
def filter(self, filters=Dict[str, str]) -> Dict[str, ViVenv]:
|
||||||
|
vivenv_sets = []
|
||||||
|
|
||||||
|
for k, v in filters.items():
|
||||||
|
if "-" in k: # date-based filters all have hyphen
|
||||||
|
(date_name, when) = k.split("-")
|
||||||
|
vivenv_sets.append(self._filter_date(date_name, when, _parse_date(v)))
|
||||||
|
else:
|
||||||
|
vivenv_sets.append(self._filter_file(v))
|
||||||
|
|
||||||
|
if vivenv_sets:
|
||||||
|
return {vivenv.name: vivenv for vivenv in set.union(*vivenv_sets)}
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
class Viv:
|
class Viv:
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.t = Template()
|
self.t = Template()
|
||||||
self.vivenvs = get_venvs()
|
self._cache = Cache()
|
||||||
|
# compat layer
|
||||||
|
self.vivenvs = self._cache.vivenvs
|
||||||
self._get_sources()
|
self._get_sources()
|
||||||
self.name = "viv" if self.local else "python3 <(curl -fsSL viv.dayl.in/viv.py)"
|
self.name = "viv" if self.local else "python3 <(curl -fsSL viv.dayl.in/viv.py)"
|
||||||
|
|
||||||
|
@ -1208,22 +1338,45 @@ class Viv:
|
||||||
|
|
||||||
sys.stdout.write(self.t.frozen_import(path, self.local_source, spec))
|
sys.stdout.write(self.t.frozen_import(path, self.local_source, spec))
|
||||||
|
|
||||||
def list(self, quiet: bool, full: bool, use_json: bool) -> None:
|
def list(
|
||||||
"""list all vivenvs"""
|
self,
|
||||||
|
quiet: bool,
|
||||||
|
verbose: bool,
|
||||||
|
use_json: bool,
|
||||||
|
filter: List[str],
|
||||||
|
) -> None:
|
||||||
|
"""\
|
||||||
|
list vivenvs
|
||||||
|
|
||||||
|
examples:
|
||||||
|
`viv list \\
|
||||||
|
--filter "accessed-after:2023-08-01"`
|
||||||
|
`viv list -q --filter \\
|
||||||
|
"created-before:$(date -d '2 weeks ago' +'%Y-%m-%d')"`
|
||||||
|
`viv list --filter "file:./script.py"`
|
||||||
|
`viv list --filter "file:None"`
|
||||||
|
"""
|
||||||
|
|
||||||
|
if filter:
|
||||||
|
vivenvs = self._cache.filter(filter)
|
||||||
|
else:
|
||||||
|
vivenvs = self._cache.vivenvs
|
||||||
|
|
||||||
if quiet:
|
if quiet:
|
||||||
sys.stdout.write("\n".join(self.vivenvs) + "\n")
|
sys.stdout.write("\n".join(vivenvs) + "\n")
|
||||||
elif len(self.vivenvs) == 0:
|
elif len(self._cache.vivenvs) == 0:
|
||||||
log.info("no vivenvs setup")
|
log.info("no vivenvs setup")
|
||||||
elif full:
|
elif len(vivenvs) == 0 and filter:
|
||||||
for _, vivenv in self.vivenvs.items():
|
log.info("no vivenvs match filter")
|
||||||
|
elif verbose:
|
||||||
|
for _, vivenv in vivenvs.items():
|
||||||
vivenv.tree()
|
vivenv.tree()
|
||||||
elif use_json:
|
elif use_json:
|
||||||
sys.stdout.write(
|
sys.stdout.write(
|
||||||
json.dumps({k: v.meta.__dict__ for k, v in self.vivenvs.items()})
|
json.dumps({k: v.meta.__dict__ for k, v in vivenvs.items()})
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
for _, vivenv in self.vivenvs.items():
|
for _, vivenv in vivenvs.items():
|
||||||
vivenv.show()
|
vivenv.show()
|
||||||
|
|
||||||
def exe(self, vivenv_id: str, cmd: str, rest: List[str]) -> None:
|
def exe(self, vivenv_id: str, cmd: str, rest: List[str]) -> None:
|
||||||
|
@ -1554,17 +1707,31 @@ class Cli:
|
||||||
args = {
|
args = {
|
||||||
("list",): [
|
("list",): [
|
||||||
Arg(
|
Arg(
|
||||||
"-f",
|
"-v",
|
||||||
"--full",
|
"--verbose",
|
||||||
help="show full metadata for vivenvs",
|
help="pretty print full metadata for vivenvs",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
),
|
),
|
||||||
Arg(
|
Arg(
|
||||||
"-q",
|
"-q",
|
||||||
"--quiet",
|
"--quiet",
|
||||||
help="suppress non-essential output",
|
help="show only ids",
|
||||||
action="store_true",
|
action="store_true",
|
||||||
),
|
),
|
||||||
|
Arg(
|
||||||
|
"-f",
|
||||||
|
"--filter",
|
||||||
|
help="filter vivenvs based on key:val",
|
||||||
|
metavar="<key:value>",
|
||||||
|
action=KVAppendAction,
|
||||||
|
keys=[
|
||||||
|
"created-before",
|
||||||
|
"created-after",
|
||||||
|
"accessed-before",
|
||||||
|
"accessed-after",
|
||||||
|
"files",
|
||||||
|
],
|
||||||
|
),
|
||||||
],
|
],
|
||||||
("shim",): [
|
("shim",): [
|
||||||
Arg(
|
Arg(
|
||||||
|
|
Loading…
Reference in a new issue