diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py --- a/swh/fuse/cli.py +++ b/swh/fuse/cli.py @@ -36,6 +36,7 @@ "url": "https://archive.softwareheritage.org/api/1", "auth-token": None, }, + "sharding": {"depth": 1, "length": 2,}, } diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -114,7 +114,10 @@ yield self.create_child( FuseSymlinkEntry, name=name, - target=Path(self.get_relative_root_path(), f"archive/{swhid}"), + target=Path( + self.get_relative_root_path(), + f"archive/{self.get_sharded_name(swhid)}", + ), ) else: raise ValueError("Unknown directory entry type: {swhid.object_type}") @@ -157,7 +160,7 @@ yield self.create_child( FuseSymlinkEntry, name="root", - target=Path(root_path, f"archive/{directory}"), + target=Path(root_path, f"archive/{self.get_sharded_name(directory)}"), ) yield self.create_child( FuseSymlinkEntry, @@ -196,7 +199,7 @@ yield self.create_child( FuseSymlinkEntry, name=str(i + 1), - target=Path(root_path, f"archive/{parent}"), + target=Path(root_path, f"archive/{self.get_sharded_name(parent)}"), ) @@ -213,7 +216,7 @@ yield self.create_child( FuseSymlinkEntry, name=str(swhid), - target=Path(root_path, f"archive/{swhid}"), + target=Path(root_path, f"archive/{self.get_sharded_name(swhid)}"), ) @@ -261,7 +264,9 @@ target = metadata["target"] yield self.create_child( - FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}") + FuseSymlinkEntry, + name="target", + target=Path(root_path, f"archive/{self.get_sharded_name(target)}"), ) yield self.create_child( ReleaseType, @@ -275,7 +280,7 @@ yield self.create_child( FuseSymlinkEntry, name="root", - target=Path(root_path, f"archive/{target_dir}"), + target=Path(root_path, f"archive/{self.get_sharded_name(target_dir)}"), ) @@ -316,7 +321,9 @@ yield self.create_child( FuseSymlinkEntry, name=name, - target=Path(root_path, f"archive/{branch_meta['target']}"), + target=Path( + root_path, 
f"archive/{self.get_sharded_name(branch_meta['target'])}" + ), ) diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -9,7 +9,9 @@ from enum import IntEnum from pathlib import Path from stat import S_IFDIR, S_IFLNK, S_IFREG -from typing import Any, Union +from typing import Any, List, Union + +from swh.model.identifiers import SWHID # Avoid cycling import Fuse = "Fuse" @@ -56,6 +58,35 @@ def get_relative_root_path(self) -> str: return "../" * (self.depth - 1) + def get_sharded_name(self, swhid: SWHID) -> str: + sharding = self.fuse.conf["sharding"] + if sharding["depth"] <= 0: + return str(swhid) + else: + basename = swhid.object_id + name, i = "", 0 + for _ in range(sharding["depth"]): + name += basename[i : i + sharding["length"]] + name += "/" + i += sharding["length"] + # Always keep the full SWHID as the path basename (otherwise we + # lose the SWHID object type information) + name += str(swhid) + return name + + def get_sharded_next_prefixes( + self, current_prefix: str, entries: List[SWHID] + ) -> List[SWHID]: + prefix_len = len(current_prefix) + next_prefixes = set() + for swhid in entries: + next_prefix = swhid.object_id[ + prefix_len : prefix_len + self.fuse.conf["sharding"]["length"] + ] + + next_prefixes.add(next_prefix) + return list(next_prefixes) + def create_child(self, constructor: Any, **kwargs) -> FuseEntry: return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) @@ -89,6 +120,41 @@ return None +# @dataclass +# class FuseDirEntrySharded(FuseDirEntry): +# """ TODO """ +# +# entries: List[FuseEntry] +# sharding_depth: int = field(default=0) +# prefix: str = field(default="") +# mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) +# +# async def __aiter__(self): +# if self.sharding_depth == self.fuse.conf["sharding"]["depth"]: +# for entry in self.entries: +# entry.depth = self.depth +# yield entry +# else: +# subdirs = {} +# for entry in self.entries: +# name =
self.get_sharded_name(entry.swhid) +# prefix_len = len(self.prefix) +# +# next_prefix = name[ +# prefix_len : prefix_len + self.fuse.conf["sharding"]["length"] +# ] +# subdirs.setdefault(next_prefix, []).append(entry) +# +# for subdir, subentries in subdirs.items(): +# yield self.create_child( +# FuseDirEntrySharded, +# name=subdir, +# prefix=f"{self.prefix}{subdir}/", +# entries=subentries, +# sharding_depth=self.sharding_depth + 1, +# ) + + @dataclass class FuseSymlinkEntry(FuseEntry): """ FUSE virtual symlink entry diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -31,9 +31,15 @@ """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ + prefix: str = field(default="") name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) + def __post_init__(self): + super().__post_init__() + if self.prefix: + self.name = self.prefix[-self.fuse.conf["sharding"]["length"] :] + def create_child(self, swhid: SWHID) -> FuseEntry: if swhid.object_type == CONTENT: mode = EntryMode.RDONLY_FILE @@ -47,21 +53,34 @@ ) async def __aiter__(self) -> AsyncIterator[FuseEntry]: + entries = [] async for swhid in self.fuse.cache.get_cached_swhids(): - yield self.create_child(swhid) + if swhid.object_id.startswith(self.prefix): + entries.append(swhid) + + sharding_depth = len(self.prefix) // self.fuse.conf["sharding"]["length"] + if sharding_depth < self.fuse.conf["sharding"]["depth"]: + # Sharding intermediate directories + for prefix in self.get_sharded_next_prefixes(self.prefix, entries): + yield super().create_child(ArchiveDir, prefix=self.prefix + prefix) + else: + # Real archive entries + for swhid in entries: + yield self.create_child(swhid) async def lookup(self, name: str) -> FuseEntry: entry = await super().lookup(name) if entry: return entry - # On the fly mounting of a new artifact try: 
+ # On the fly mounting of a new artifact swhid = parse_swhid(name) await self.fuse.get_metadata(swhid) return self.create_child(swhid) except ValidationError: - return None + # Sharded intermediate directory + return super().create_child(ArchiveDir, prefix=self.prefix + name) @dataclass @@ -73,17 +92,43 @@ branches) the JSON file will contain a complete version with all pages merged together. """ + prefix: str = field(default="") name: str = field(init=False, default="meta") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) + def __post_init__(self): + super().__post_init__() + if self.prefix: + self.name = self.prefix[-self.fuse.conf["sharding"]["length"] :] + async def __aiter__(self) -> AsyncIterator[FuseEntry]: + entries = [] async for swhid in self.fuse.cache.get_cached_swhids(): - yield self.create_child( - MetaEntry, - name=f"{swhid}.json", - mode=int(EntryMode.RDONLY_FILE), - swhid=swhid, - ) + if swhid.object_id.startswith(self.prefix): + entries.append(swhid) + + sharding_depth = len(self.prefix) // self.fuse.conf["sharding"]["length"] + if sharding_depth < self.fuse.conf["sharding"]["depth"]: + # Sharding intermediate directories + for prefix in self.get_sharded_next_prefixes(self.prefix, entries): + yield self.create_child(MetaDir, prefix=self.prefix + prefix) + else: + # Real meta entries + for swhid in entries: + yield self.create_child( + MetaEntry, + name=f"{swhid}.json", + mode=int(EntryMode.RDONLY_FILE), + swhid=swhid, + ) + + async def lookup(self, name: str) -> FuseEntry: + entry = await super().lookup(name) + if entry: + return entry + + # Sharded intermediate directory + return super().create_child(MetaDir, prefix=self.prefix + name) @dataclass diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -35,6 +35,7 @@ self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) + self.conf = conf self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = 
os.getgid() diff --git a/swh/fuse/tests/conftest.py b/swh/fuse/tests/conftest.py --- a/swh/fuse/tests/conftest.py +++ b/swh/fuse/tests/conftest.py @@ -41,6 +41,7 @@ "history": {"in-memory": True}, }, "web-api": {"url": API_URL, "auth-token": None}, + "sharding": {"depth": 0}, } # Run FUSE in foreground mode but in a separate process, so it does not