diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -11,16 +11,17 @@
 from pathlib import Path
 import re
 import sys
-from typing import Any, AsyncGenerator, Dict, List, Optional
+from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

 import aiosqlite
+import dateutil.parser
 from psutil import virtual_memory

 from swh.fuse.fs.artifact import RevisionHistoryShardByDate
 from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
 from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.identifiers import REVISION, SWHID, parse_swhid
 from swh.web.client.client import typify_json
@@ -46,9 +47,10 @@
         self.cache_conf = cache_conf

     async def __aenter__(self):
+        # TODO: unified name for metadata/history?
         self.metadata = MetadataCache(self.cache_conf["metadata"])
         self.blob = BlobCache(self.cache_conf["blob"])
-        self.history = HistoryCache(self.cache_conf["history"])
+        self.history = HistoryCache(self.cache_conf["metadata"])
         self.direntry = DirEntryCache(self.cache_conf["direntry"])
         await self.metadata.__aenter__()
         await self.blob.__aenter__()
@@ -82,11 +84,13 @@
     async def __aenter__(self):
         # In-memory (thus temporary) caching is useful for testing purposes
         if self.conf.get("in-memory", False):
-            path = ":memory:"
+            path = "file::memory:?cache=shared"
+            uri = True
         else:
             path = Path(self.conf["path"])
             path.parent.mkdir(parents=True, exist_ok=True)
-        self.conn = await aiosqlite.connect(path)
+            uri = False
+        self.conn = await aiosqlite.connect(path, uri=uri)
         return self

     async def __aexit__(self, type=None, val=None, tb=None) -> None:
@@ -104,6 +108,13 @@
         await self.conn.execute(
             "create table if not exists metadata_cache (swhid, metadata)"
         )
+        await self.conn.execute(
+            "create index if not exists idx_metadata on metadata_cache(swhid)"
+        )
+        await self.conn.execute("create table if not exists date_cache (swhid, date)")
+        await self.conn.execute(
+            "create index if not exists idx_date on date_cache(swhid)"
+        )
         await self.conn.commit()
         return self
@@ -123,24 +134,17 @@
             "insert into metadata_cache values (?, ?)",
             (str(swhid), json.dumps(metadata)),
         )
-        await self.conn.commit()
-
-    async def get_cached_subset(self, swhids: List[SWHID]) -> List[SWHID]:
-        swhids_str = ",".join(f'"{x}"' for x in swhids)
-        cursor = await self.conn.execute(
-            f"select swhid from metadata_cache where swhid in ({swhids_str})"
-        )
-        cache = await cursor.fetchall()
-        res = []
-        for row in cache:
-            swhid = row[0]
-            try:
-                res.append(parse_swhid(swhid))
-            except ValidationError:
-                logging.warning("Cannot parse object from metadata cache: %s", swhid)
+        if swhid.object_type == REVISION:
+            date = dateutil.parser.parse(metadata["date"])
+            date_fmt = RevisionHistoryShardByDate.DATE_FMT.format(
+                year=date.year, month=date.month, day=date.day
+            )
+            await self.conn.execute(
+                "insert into date_cache values (?, ?)", (str(swhid), date_fmt,)
+            )

-        return res
+        await self.conn.commit()


 class BlobCache(AbstractCache):
@@ -155,6 +159,9 @@
     """

     async def __aenter__(self):
         await super().__aenter__()
         await self.conn.execute("create table if not exists blob_cache (swhid, blob)")
+        await self.conn.execute(
+            "create index if not exists idx_blob on blob_cache(swhid)"
+        )
         await self.conn.commit()
         return self
@@ -196,36 +203,58 @@
             """
         )
         await self.conn.execute(
-            "create index if not exists index_history_graph on history_graph(src)"
+            "create index if not exists idx_history on history_graph(src)"
         )
         await self.conn.commit()
         return self

+    HISTORY_REC_QUERY = """
+        with recursive
+        dfs(node) AS (
+            values(?)
+            union
+            select history_graph.dst
+            from history_graph
+            join dfs on history_graph.src = dfs.node
+        )
+        -- Do not keep the root node since it is not an ancestor
+        select * from dfs limit -1 offset 1
+    """
+
     async def get(self, swhid: SWHID) -> Optional[List[SWHID]]:
+        cursor = await self.conn.execute(self.HISTORY_REC_QUERY, (str(swhid),),)
+        cache = await cursor.fetchall()
+        if not cache:
+            return None
+        history = []
+        for row in cache:
+            parent = row[0]
+            try:
+                history.append(parse_swhid(parent))
+            except ValidationError:
+                logging.warning("Cannot parse object from history cache: %s", parent)
+        return history
+
+    async def get_with_date_prefix(
+        self, swhid: SWHID, date_prefix
+    ) -> List[Tuple[SWHID, str]]:
         cursor = await self.conn.execute(
-            """
-            with recursive
-            dfs(node) AS (
-                values(?)
-                union
-                select history_graph.dst
-                from history_graph
-                join dfs on history_graph.src = dfs.node
-            )
-            -- Do not keep the root node since it is not an ancestor
-            select * from dfs limit -1 offset 1
+            f"""
+            select swhid, date from ( {self.HISTORY_REC_QUERY} ) as history
+            join date_cache on history.node = date_cache.swhid
+            where date_cache.date like '{date_prefix}%'
             """,
             (str(swhid),),
        )
         cache = await cursor.fetchall()
         if not cache:
-            return None
+            return []
         history = []
         for row in cache:
-            parent = row[0]
+            parent, date = row[0], row[1]
             try:
-                history.append(parse_swhid(parent))
+                history.append((parse_swhid(parent), date))
             except ValidationError:
                 logging.warning("Cannot parse object from history cache: %s", parent)
         return history
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -30,7 +30,6 @@
     "cache": {
         "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"},
         "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"},
-        "history": {"path": CACHE_HOME_DIR / "swh/fuse/history.sqlite"},
         "direntry": {"maxram": "10%"},
     },
     "web-api": {
@@ -197,5 +196,5 @@
         pass

     conf = ctx.obj["config"]
-    for cache_name in ["blob", "metadata", "history"]:
+    for cache_name in ["blob", "metadata"]:
         rm_cache(conf, cache_name)
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -266,20 +266,17 @@
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         history = await self.fuse.get_history(self.history_swhid)

-        # Only check for cached revisions since fetching all of them with the
-        # Web API would take too long
-        swhids = await self.fuse.cache.metadata.get_cached_subset(history)
+        # Only check for cached revisions with the appropriate prefix, since
+        # fetching all of them with the Web API would take too long
+        swhids = await self.fuse.cache.history.get_with_date_prefix(
+            self.history_swhid, date_prefix=self.prefix
+        )

         depth = self.prefix.count("/")
         root_path = self.get_relative_root_path()
         sharded_dirs = set()

-        for swhid in swhids:
-            meta = await self.fuse.cache.metadata.get(swhid)
-            date = meta["date"]
-            sharded_name = self.DATE_FMT.format(
-                year=date.year, month=date.month, day=date.day
-            )
+        for (swhid, sharded_name) in swhids:
             if not sharded_name.startswith(self.prefix):
                 continue
@@ -302,6 +299,7 @@
                     history_swhid=self.history_swhid,
                 )

+        # TODO: store len(history) somewhere to avoid recompute?
         self.is_status_done = len(swhids) == len(history)
         if not self.is_status_done and depth == 0:
             yield self.create_child(
diff --git a/swh/fuse/tests/conftest.py b/swh/fuse/tests/conftest.py
--- a/swh/fuse/tests/conftest.py
+++ b/swh/fuse/tests/conftest.py
@@ -35,11 +35,7 @@
     tmpfile = NamedTemporaryFile(suffix=".swh-fuse-test.yml")

     config = {
-        "cache": {
-            "metadata": {"in-memory": True},
-            "blob": {"in-memory": True},
-            "history": {"in-memory": True},
-        },
+        "cache": {"metadata": {"in-memory": True}, "blob": {"in-memory": True},},
         "web-api": {"url": API_URL, "auth-token": None},
     }
diff --git a/swh/fuse/tests/test_revision.py b/swh/fuse/tests/test_revision.py
--- a/swh/fuse/tests/test_revision.py
+++ b/swh/fuse/tests/test_revision.py
@@ -72,11 +72,9 @@
     assert depth2 in (os.listdir(dir_by_page / depth1))

     dir_by_date = dir_path / "by-date"
-    # Wait max 1 second to populate by-date/ dir
-    for i in range(100):
-        if ".status" not in os.listdir(dir_by_date):
-            break
-        time.sleep(0.01)
+    # Wait for the by-date/ dir to be fully populated
+    while ".status" in os.listdir(dir_by_date):
+        time.sleep(0.1)
     for swhid in expected:
         meta = get_data_from_web_archive(str(swhid))
         date = dateutil.parser.parse(meta["date"])
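Note (not part of the patch): the following is a minimal, standalone sketch of how the new date_cache table and the recursive history query combine in HistoryCache.get_with_date_prefix(). It runs against plain sqlite3; the revision identifiers and the "YYYY/MM/DD/" shard strings are made up for illustration (the real layout comes from RevisionHistoryShardByDate.DATE_FMT).

import sqlite3

# Same recursive walk as HistoryCache.HISTORY_REC_QUERY in the patch above.
HISTORY_REC_QUERY = """
    with recursive
    dfs(node) AS (
        values(?)
        union
        select history_graph.dst
        from history_graph
        join dfs on history_graph.src = dfs.node
    )
    -- Do not keep the root node since it is not an ancestor
    select * from dfs limit -1 offset 1
"""

conn = sqlite3.connect(":memory:")
conn.execute("create table history_graph (src, dst)")
conn.execute("create table date_cache (swhid, date)")

# Toy history: rev0 -> rev1 -> rev2, each ancestor sharded under its commit day.
conn.executemany(
    "insert into history_graph values (?, ?)", [("rev0", "rev1"), ("rev1", "rev2")]
)
conn.executemany(
    "insert into date_cache values (?, ?)",
    [("rev1", "2020/10/01/"), ("rev2", "2020/09/30/")],
)

# Shape of get_with_date_prefix(): ancestors of rev0 whose shard starts with 2020/10/
rows = conn.execute(
    f"""
    select swhid, date from ( {HISTORY_REC_QUERY} ) as history
    join date_cache on history.node = date_cache.swhid
    where date_cache.date like '2020/10/%'
    """,
    ("rev0",),
).fetchall()
print(rows)  # [('rev1', '2020/10/01/')]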