diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -11,16 +11,17 @@
 from pathlib import Path
 import re
 import sys
-from typing import Any, AsyncGenerator, Dict, List, Optional
+from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
 
 import aiosqlite
+import dateutil.parser
 from psutil import virtual_memory
 
 from swh.fuse.fs.artifact import RevisionHistoryShardByDate
 from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
 from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.identifiers import REVISION, SWHID, parse_swhid
 from swh.web.client.client import ORIGIN_VISIT, typify_json
 
 
@@ -46,9 +47,10 @@
         self.cache_conf = cache_conf
 
     async def __aenter__(self):
+        # TODO: unified name for metadata/history?
         self.metadata = MetadataCache(self.cache_conf["metadata"])
         self.blob = BlobCache(self.cache_conf["blob"])
-        self.history = HistoryCache(self.cache_conf["history"])
+        self.history = HistoryCache(self.cache_conf["metadata"])
         self.direntry = DirEntryCache(self.cache_conf["direntry"])
         await self.metadata.__aenter__()
         await self.blob.__aenter__()
@@ -90,11 +92,13 @@
     async def __aenter__(self):
         # In-memory (thus temporary) caching is useful for testing purposes
         if self.conf.get("in-memory", False):
-            path = ":memory:"
+            path = "file::memory:?cache=shared"
+            uri = True
         else:
             path = Path(self.conf["path"])
             path.parent.mkdir(parents=True, exist_ok=True)
-        self.conn = await aiosqlite.connect(path)
+            uri = False
+        self.conn = await aiosqlite.connect(path, uri=uri)
         return self
 
     async def __aexit__(self, type=None, val=None, tb=None) -> None:
@@ -110,7 +114,7 @@
     async def __aenter__(self):
         await super().__aenter__()
         await self.conn.execute(
-            "create table if not exists metadata_cache (swhid, metadata)"
+            "create table if not exists metadata_cache (swhid, metadata, date)"
         )
         await self.conn.execute(
             "create table if not exists visits_cache (url, metadata)"
@@ -145,10 +149,18 @@
         return None
 
     async def set(self, swhid: SWHID, metadata: Any) -> None:
+        swhid_date = ""
+        if swhid.object_type == REVISION:
+            date = dateutil.parser.parse(metadata["date"])
+            swhid_date = RevisionHistoryShardByDate.DATE_FMT.format(
+                year=date.year, month=date.month, day=date.day
+            )
+
         await self.conn.execute(
-            "insert into metadata_cache values (?, ?)",
-            (str(swhid), json.dumps(metadata)),
+            "insert into metadata_cache values (?, ?, ?)",
+            (str(swhid), json.dumps(metadata), swhid_date),
         )
+        await self.conn.commit()
 
     async def set_visits(self, url_encoded: str, visits: List[Dict[str, Any]]) -> None:
         await self.conn.execute(
@@ -157,23 +169,6 @@
         )
         await self.conn.commit()
 
-    async def get_cached_subset(self, swhids: List[SWHID]) -> List[SWHID]:
-        swhids_str = ",".join(f'"{x}"' for x in swhids)
-        cursor = await self.conn.execute(
-            f"select swhid from metadata_cache where swhid in ({swhids_str})"
-        )
-        cache = await cursor.fetchall()
-
-        res = []
-        for row in cache:
-            swhid = row[0]
-            try:
-                res.append(parse_swhid(swhid))
-            except ValidationError:
-                logging.warning("Cannot parse object from metadata cache: %s", swhid)
-
-        return res
-
 
 class BlobCache(AbstractCache):
     """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived
@@ -233,31 +228,53 @@
         await self.conn.commit()
         return self
 
+    HISTORY_REC_QUERY = """
+        with recursive
+        dfs(node) AS (
+            values(?)
+            union
+            select history_graph.dst
+            from history_graph
+            join dfs on history_graph.src = dfs.node
+        )
+        -- Do not keep the root node since it is not an ancestor
+        select * from dfs limit -1 offset 1
+    """
+
     async def get(self, swhid: SWHID) -> Optional[List[SWHID]]:
+        cursor = await self.conn.execute(self.HISTORY_REC_QUERY, (str(swhid),),)
+        cache = await cursor.fetchall()
+        if not cache:
+            return None
+        history = []
+        for row in cache:
+            parent = row[0]
+            try:
+                history.append(parse_swhid(parent))
+            except ValidationError:
+                logging.warning("Cannot parse object from history cache: %s", parent)
+        return history
+
+    async def get_with_date_prefix(
+        self, swhid: SWHID, date_prefix
+    ) -> List[Tuple[SWHID, str]]:
         cursor = await self.conn.execute(
-            """
-            with recursive
-            dfs(node) AS (
-                values(?)
-                union
-                select history_graph.dst
-                from history_graph
-                join dfs on history_graph.src = dfs.node
-            )
-            -- Do not keep the root node since it is not an ancestor
-            select * from dfs limit -1 offset 1
+            f"""
+            select swhid, date from ( {self.HISTORY_REC_QUERY} ) as history
+            join metadata_cache on history.node = metadata_cache.swhid
+            where metadata_cache.date like '{date_prefix}%'
             """,
             (str(swhid),),
         )
         cache = await cursor.fetchall()
         if not cache:
-            return None
+            return []
         history = []
         for row in cache:
-            parent = row[0]
+            parent, date = row[0], row[1]
             try:
-                history.append(parse_swhid(parent))
+                history.append((parse_swhid(parent), date))
             except ValidationError:
                 logging.warning("Cannot parse object from history cache: %s", parent)
         return history
 
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -30,7 +30,6 @@
     "cache": {
         "metadata": {"path": str(CACHE_HOME_DIR / "swh/fuse/metadata.sqlite")},
         "blob": {"path": str(CACHE_HOME_DIR / "swh/fuse/blob.sqlite")},
-        "history": {"path": str(CACHE_HOME_DIR / "swh/fuse/history.sqlite")},
         "direntry": {"maxram": "10%"},
     },
     "web-api": {
@@ -200,5 +199,5 @@
         pass
 
     conf = ctx.obj["config"]
-    for cache_name in ["blob", "metadata", "history"]:
+    for cache_name in ["blob", "metadata"]:
         rm_cache(conf, cache_name)
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -265,20 +265,17 @@
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         history = await self.fuse.get_history(self.history_swhid)
 
-        # Only check for cached revisions since fetching all of them with the
-        # Web API would take too long
-        swhids = await self.fuse.cache.metadata.get_cached_subset(history)
+        # Only check for cached revisions with the appropriate prefix, since
+        # fetching all of them with the Web API would take too long
+        swhids = await self.fuse.cache.history.get_with_date_prefix(
+            self.history_swhid, date_prefix=self.prefix
+        )
 
         depth = self.prefix.count("/")
         root_path = self.get_relative_root_path()
         sharded_dirs = set()
 
-        for swhid in swhids:
-            meta = await self.fuse.cache.metadata.get(swhid)
-            date = meta["date"]
-            sharded_name = self.DATE_FMT.format(
-                year=date.year, month=date.month, day=date.day
-            )
+        for (swhid, sharded_name) in swhids:
             if not sharded_name.startswith(self.prefix):
                 continue
 
@@ -301,6 +298,7 @@
                 history_swhid=self.history_swhid,
             )
 
+        # TODO: store len(history) somewhere to avoid recompute?
         self.is_status_done = len(swhids) == len(history)
         if not self.is_status_done and depth == 0:
             yield self.create_child(
diff --git a/swh/fuse/tests/conftest.py b/swh/fuse/tests/conftest.py
--- a/swh/fuse/tests/conftest.py
+++ b/swh/fuse/tests/conftest.py
@@ -35,11 +35,7 @@
 
     tmpfile = NamedTemporaryFile(suffix=".swh-fuse-test.yml")
    config = {
-        "cache": {
-            "metadata": {"in-memory": True},
-            "blob": {"in-memory": True},
-            "history": {"in-memory": True},
-        },
+        "cache": {"metadata": {"in-memory": True}, "blob": {"in-memory": True},},
         "web-api": {"url": API_URL, "auth-token": None},
         "json-indent": None,
     }
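
Note on the in-memory switch: the history cache now opens a second connection
against the same "metadata" configuration entry. With a plain ":memory:" path
every connection gets its own private, empty database, so the two caches could
never see each other's tables; the "file::memory:?cache=shared" URI (with
uri=True) names one shared in-memory database instead. This is presumably also
why set() now commits eagerly: rows must be committed to become visible from
the other connection. A minimal sketch with the stdlib sqlite3 module, outside
swh-fuse (the table name is only reused for illustration):

    import sqlite3

    # Both connections name the same shared in-memory database; uri=True is
    # required for SQLite to interpret the "file:" URI syntax.
    a = sqlite3.connect("file::memory:?cache=shared", uri=True)
    b = sqlite3.connect("file::memory:?cache=shared", uri=True)

    a.execute("create table metadata_cache (swhid, metadata, date)")
    a.commit()

    # The table created on the first connection is visible on the second;
    # with two plain ":memory:" connections this would raise "no such table".
    print(b.execute("select count(*) from metadata_cache").fetchone())  # (0,)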
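The history walk itself is unchanged; it only moves into HISTORY_REC_QUERY so
that get_with_date_prefix() can reuse it as a subquery. A self-contained
illustration of the recursive CTE, with a toy history_graph and made-up
revision ids:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table history_graph (src, dst)")
    # Toy ancestry: A -> B -> C (edges point from revision to parent)
    conn.executemany(
        "insert into history_graph values (?, ?)",
        [("rev-A", "rev-B"), ("rev-B", "rev-C")],
    )

    # Same shape as HISTORY_REC_QUERY: seed the CTE with the starting node,
    # repeatedly follow src -> dst edges, then drop the seed row
    # ("limit -1 offset 1" skips the first row and keeps the rest).
    query = """
        with recursive
        dfs(node) AS (
            values(?)
            union
            select history_graph.dst
            from history_graph
            join dfs on history_graph.src = dfs.node
        )
        select * from dfs limit -1 offset 1
    """
    print(conn.execute(query, ("rev-A",)).fetchall())
    # -> [('rev-B',), ('rev-C',)]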
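get_with_date_prefix() then joins that ancestor set against the new date
column, which is what lets RevisionHistoryShardByDate list only the revisions
under one year/month/day shard instead of fetching metadata for every cached
ancestor individually. A sketch of the join, assuming date values shaped like
the DATE_FMT output (slash-separated, e.g. "2020/10/27"; ids again made up):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("create table history_graph (src, dst)")
    conn.execute("create table metadata_cache (swhid, metadata, date)")
    conn.executemany(
        "insert into history_graph values (?, ?)",
        [("rev-A", "rev-B"), ("rev-B", "rev-C")],
    )
    # Only already-fetched revisions appear in metadata_cache, each with its
    # sharded date string precomputed at insertion time by set().
    conn.executemany(
        "insert into metadata_cache values (?, ?, ?)",
        [("rev-B", "{}", "2020/10/27"), ("rev-C", "{}", "2019/03/02")],
    )

    HISTORY_REC_QUERY = """
        with recursive
        dfs(node) AS (
            values(?)
            union
            select history_graph.dst
            from history_graph
            join dfs on history_graph.src = dfs.node
        )
        select * from dfs limit -1 offset 1
    """
    date_prefix = "2020/"
    rows = conn.execute(
        f"""
        select swhid, date from ( {HISTORY_REC_QUERY} ) as history
        join metadata_cache on history.node = metadata_cache.swhid
        where metadata_cache.date like '{date_prefix}%'
        """,
        ("rev-A",),
    ).fetchall()
    print(rows)  # -> [('rev-B', '2020/10/27')]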