diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -37,7 +37,6 @@
         "url": "https://archive.softwareheritage.org/api/1",
         "auth-token": None,
     },
-    "sharding": {"depth": 1, "length": 2,},
 }
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -6,7 +6,7 @@
 import asyncio
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Any, AsyncIterator, Dict, List
+from typing import Any, AsyncIterator, List, Optional
 import urllib.parse
 
 from swh.fuse.fs.entry import (
@@ -207,37 +207,35 @@
     swhid: SWHID
 
-    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+    async def prefill_caches(self) -> None:
         history = await self.fuse.get_history(self.swhid)
+        for swhid in history:
+            await self.fuse.get_metadata(swhid)
 
-        by_date = self.create_child(
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        # Run it concurrently because it requires many API calls
+        asyncio.create_task(self.prefill_caches())
+
+        yield self.create_child(
             RevisionHistoryShardByDate,
             name="by-date",
             mode=int(EntryMode.RDONLY_DIR),
             history_swhid=self.swhid,
         )
-        # Populate the by-date/ directory in parallel because it needs to pull
-        # from the Web API all history SWHIDs date metadata
-        asyncio.create_task(by_date.fill_metadata_cache(history))
-        yield by_date
 
-        by_hash = self.create_child(
+        yield self.create_child(
             RevisionHistoryShardByHash,
             name="by-hash",
             mode=int(EntryMode.RDONLY_DIR),
             history_swhid=self.swhid,
         )
-        by_hash.fill_direntry_cache(history)
-        yield by_hash
 
-        by_page = self.create_child(
+        yield self.create_child(
             RevisionHistoryShardByPage,
             name="by-page",
             mode=int(EntryMode.RDONLY_DIR),
             history_swhid=self.swhid,
         )
-        by_page.fill_direntry_cache(history)
-        yield by_page
 
 
 @dataclass
@@ -248,6 +246,8 @@
     prefix: str = field(default="")
     is_status_done: bool = field(default=False)
 
+    DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/"
+
     @dataclass
     class StatusFile(FuseFileEntry):
         """ Temporary file used to indicate loading progress in by-date/ """
@@ -264,48 +264,53 @@
         async def size(self) -> int:
             return len(await self.get_content())
 
-    async def fill_metadata_cache(self, swhids: List[SWHID]) -> None:
-        for swhid in swhids:
-            await self.fuse.get_metadata(swhid)
-
-    def get_full_sharded_name(self, meta: Dict[str, Any]) -> str:
-        date = meta["date"]
-        return f"{date.year:04d}/{date.month:02d}/{date.day:02d}"
-
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         history = await self.fuse.get_history(self.history_swhid)
-        self.is_status_done = True
+        nb_fetched_entries = 0
         depth = self.prefix.count("/")
-        subdirs = set()
-        nb_entries = 0
+        root_path = self.get_relative_root_path()
+        sharded_dirs = set()
         for swhid in history:
             # Only check for cached revisions since fetching all of them with
             # the Web API would take too long
+            # TODO: checking metadata cache for *all* history takes ~2s on 12k
+            # commits; use another dedicated cache?
             meta = await self.fuse.cache.metadata.get(swhid)
             if not meta:
-                self.is_status_done = False
                 continue
 
-            nb_entries += 1
-            name = self.get_full_sharded_name(meta)
-            if not name.startswith(self.prefix):
+            nb_fetched_entries += 1
+            date = meta["date"]
+            sharded_name = self.DATE_FMT.format(
+                year=date.year, month=date.month, day=date.day
+            )
+            if not sharded_name.startswith(self.prefix):
                 continue
 
-            next_prefix = name.split("/")[depth]
-            if next_prefix not in subdirs:
-                subdirs.add(next_prefix)
+            if depth == 3:
                 yield self.create_child(
-                    RevisionHistoryShardByDate,
-                    name=next_prefix,
-                    mode=int(EntryMode.RDONLY_DIR),
-                    prefix=f"{self.prefix}{next_prefix}/",
-                    history_swhid=self.history_swhid,
+                    FuseSymlinkEntry,
+                    name=str(swhid),
+                    target=Path(root_path, f"archive/{swhid}"),
                 )
+            # Create sharded directories
+            else:
+                next_prefix = sharded_name.split("/")[depth]
+                if next_prefix not in sharded_dirs:
+                    sharded_dirs.add(next_prefix)
+                    yield self.create_child(
+                        RevisionHistoryShardByDate,
+                        name=next_prefix,
+                        mode=int(EntryMode.RDONLY_DIR),
+                        prefix=f"{self.prefix}{next_prefix}/",
+                        history_swhid=self.history_swhid,
+                    )
 
-        if not self.is_status_done:
+        self.is_status_done = nb_fetched_entries == len(history)
+        if not self.is_status_done and depth == 0:
             yield self.create_child(
                 RevisionHistoryShardByDate.StatusFile,
-                done=nb_entries,
+                done=nb_fetched_entries,
                 todo=len(history),
             )
@@ -317,68 +322,34 @@
     history_swhid: SWHID
     prefix: str = field(default="")
 
-    def get_full_sharded_name(self, swhid: SWHID) -> str:
-        sharding_depth = self.fuse.conf["sharding"]["depth"]
-        sharding_length = self.fuse.conf["sharding"]["length"]
-        if sharding_depth <= 0:
-            return str(swhid)
-        else:
-            basename = swhid.object_id
-            parts = [
-                basename[i * sharding_length : (i + 1) * sharding_length]
-                for i in range(sharding_depth)
-            ]
-            # Always keep the full SWHID as the path basename (otherwise we
-            # loose the SWHID object type information)
-            parts.append(str(swhid))
-            path = Path(*parts)
-            return str(path)
-
-    def fill_direntry_cache(self, swhids: List[SWHID]):
-        sharding_depth = self.fuse.conf["sharding"]["depth"]
-        sharding_length = self.fuse.conf["sharding"]["length"]
-        depth = self.prefix.count("/")
-        children = []
-        if depth == sharding_depth:
+    SHARDING_LENGTH = 2
+
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        history = await self.fuse.get_history(self.history_swhid)
+
+        if self.prefix:
             root_path = self.get_relative_root_path()
-            for swhid in swhids:
-                children.append(
-                    self.create_child(
+            for swhid in history:
+                if swhid.object_id.startswith(self.prefix):
+                    yield self.create_child(
                         FuseSymlinkEntry,
                         name=str(swhid),
                         target=Path(root_path, f"archive/{swhid}"),
                     )
-                )
+        # Create sharded directories
         else:
-            subdirs = {}
-            prefix_len = len(self.prefix)
-            for swhid in swhids:
-                name = self.get_full_sharded_name(swhid)
-                next_prefix = name[prefix_len : prefix_len + sharding_length]
-                subdirs.setdefault(next_prefix, []).append(swhid)
-
-            # Recursive intermediate sharded directories
-            for subdir, subentries in subdirs.items():
-                child_prefix = f"{self.prefix}{subdir}/"
-                child = self.create_child(
-                    RevisionHistoryShardByHash,
-                    name=subdir,
-                    mode=int(EntryMode.RDONLY_DIR),
-                    prefix=child_prefix,
-                    history_swhid=self.history_swhid,
-                )
-                children.append(child)
-                child.fill_direntry_cache(subentries)
-        self.fuse.cache.direntry.set(self, children)
-        return children
-
-    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
-
-        hash_prefix = self.prefix.replace("/", "")
-        swhids = [s for s in history if s.object_id.startswith(hash_prefix)]
-
-        for entry in self.fill_direntry_cache(swhids):
-            yield entry
+            sharded_dirs = set()
+            for swhid in history:
+                next_prefix = swhid.object_id[: self.SHARDING_LENGTH]
+                if next_prefix not in sharded_dirs:
+                    sharded_dirs.add(next_prefix)
+                    yield self.create_child(
+                        RevisionHistoryShardByHash,
+                        name=next_prefix,
+                        mode=int(EntryMode.RDONLY_DIR),
+                        prefix=next_prefix,
+                        history_swhid=self.history_swhid,
+                    )
 
 
 @dataclass
@@ -386,50 +357,36 @@
     """ Revision virtual `history/by-page` sharded directory """
 
     history_swhid: SWHID
+    prefix: Optional[int] = field(default=None)
 
     PAGE_SIZE = 10_000
     PAGE_FMT = "{page_number:03d}"
 
-    def fill_direntry_cache(self, swhids: List[SWHID]):
-        page_number = -1
-        page = None
-        page_root_path = None
-        page_children = []
-        pages = []
-        for idx, swhid in enumerate(swhids):
-            if idx % self.PAGE_SIZE == 0:
-                if page:
-                    self.fuse.cache.direntry.set(page, page_children)
-                    pages.append(page)
-
-                page_number += 1
-                page = self.create_child(
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
+        history = await self.fuse.get_history(self.history_swhid)
+
+        if self.prefix is not None:
+            current_page = self.prefix
+            root_path = self.get_relative_root_path()
+            max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE)
+            for i in range(current_page * self.PAGE_SIZE, max_idx):
+                swhid = history[i]
+                yield self.create_child(
+                    FuseSymlinkEntry,
+                    name=str(swhid),
+                    target=Path(root_path, f"archive/{swhid}"),
+                )
+        # Create sharded directories
+        else:
+            for i in range(0, len(history), self.PAGE_SIZE):
+                page_number = i // self.PAGE_SIZE
+                yield self.create_child(
                     RevisionHistoryShardByPage,
                     name=self.PAGE_FMT.format(page_number=page_number),
                     mode=int(EntryMode.RDONLY_DIR),
                     history_swhid=self.history_swhid,
+                    prefix=page_number,
                 )
-                page_root_path = page.get_relative_root_path()
-                page_children = []
-
-            page_children.append(
-                page.create_child(
-                    FuseSymlinkEntry,
-                    name=str(swhid),
-                    target=Path(page_root_path, f"archive/{swhid}"),
-                )
-            )
-
-        if page:
-            self.fuse.cache.direntry.set(page, page_children)
-            pages.append(page)
-        self.fuse.cache.direntry.set(self, pages)
-        return pages
-
-    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
-        for entry in self.fill_direntry_cache(history):
-            yield entry
 
 
 @dataclass
diff --git a/swh/fuse/tests/test_revision.py b/swh/fuse/tests/test_revision.py
--- a/swh/fuse/tests/test_revision.py
+++ b/swh/fuse/tests/test_revision.py
@@ -2,7 +2,9 @@
 import os
 import time
 
-from swh.fuse.fs.artifact import RevisionHistoryShardByPage
+import dateutil.parser
+
+from swh.fuse.fs.artifact import RevisionHistoryShardByDate, RevisionHistoryShardByPage
 from swh.fuse.tests.api_url import GRAPH_API_REQUEST
 from swh.fuse.tests.common import (
     check_dir_name_entries,
@@ -75,6 +77,12 @@
         if ".status" not in os.listdir(dir_by_date):
             break
         time.sleep(0.01)
-    assert os.listdir(dir_by_date) == ["2010"]
-    assert os.listdir(dir_by_date / "2010") == ["06"]
-    assert os.listdir(dir_by_date / "2010/06") == ["25", "24", "23", "16"]
+    for swhid in expected:
+        meta = get_data_from_web_archive(str(swhid))
+        date = dateutil.parser.parse(meta["date"])
+        depth1 = RevisionHistoryShardByDate.DATE_FMT.format(
+            year=date.year, month=date.month, day=date.day
+        )
+        depth2 = str(swhid)
+        assert (dir_by_date / depth1).exists()
+        assert depth2 in os.listdir(dir_by_date / depth1)
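
As a reading aid, here is a minimal, self-contained sketch of the two on-demand sharding schemes this patch switches to. It is an illustration under assumptions, not swh-fuse code: shard_by_hash and shard_by_page are hypothetical stand-ins for the compute_entries methods above, and plain hex strings stand in for SWHID object ids.

from typing import Iterator, List, Optional

SHARDING_LENGTH = 2   # same constant as RevisionHistoryShardByHash
PAGE_SIZE = 10_000    # same constant as RevisionHistoryShardByPage


def shard_by_hash(object_ids: List[str], prefix: str = "") -> Iterator[str]:
    # Mirrors RevisionHistoryShardByHash.compute_entries: with a prefix we are
    # inside a shard and yield the matching leaf entries; without one we yield
    # each distinct two-character shard directory, computed lazily on readdir.
    if prefix:
        for object_id in object_ids:
            if object_id.startswith(prefix):
                yield object_id
    else:
        sharded_dirs = set()
        for object_id in object_ids:
            shard = object_id[:SHARDING_LENGTH]
            if shard not in sharded_dirs:
                sharded_dirs.add(shard)
                yield shard + "/"


def shard_by_page(object_ids: List[str], page: Optional[int] = None) -> Iterator[str]:
    # Mirrors RevisionHistoryShardByPage.compute_entries: the root yields one
    # directory per PAGE_SIZE slice of the history, a page only its own slice.
    if page is not None:
        yield from object_ids[page * PAGE_SIZE : (page + 1) * PAGE_SIZE]
    else:
        for start in range(0, len(object_ids), PAGE_SIZE):
            yield "{page_number:03d}/".format(page_number=start // PAGE_SIZE)


if __name__ == "__main__":
    # Fake 40-hex-digit ids whose first byte cycles through 0x00..0xff
    history = ["{:02x}{:038x}".format(i % 256, i) for i in range(25_000)]
    print(sorted(shard_by_hash(history))[:3])         # ['00/', '01/', '02/']
    print(list(shard_by_page(history)))               # ['000/', '001/', '002/']
    print(len(list(shard_by_page(history, page=2))))  # 5000

The point the sketch isolates: unlike the removed fill_direntry_cache machinery, nothing is precomputed or stored in the direntry cache; each readdir recomputes only its own level from the history list, so the by-hash/ and by-page/ listings stay consistent even while prefill_caches is still populating the metadata cache in the background.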