diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py index 6865d31..e5917c0 100644 --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -1,300 +1,317 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from abc import ABC from collections import OrderedDict from dataclasses import dataclass, field import json import logging from pathlib import Path import re import sys from typing import Any, AsyncGenerator, Dict, List, Optional import aiosqlite from psutil import virtual_memory from swh.fuse.fs.artifact import RevisionHistoryShardByDate from swh.fuse.fs.entry import FuseDirEntry, FuseEntry from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir from swh.model.exceptions import ValidationError from swh.model.identifiers import SWHID, parse_swhid from swh.web.client.client import typify_json class FuseCache: """SwhFS retrieves both metadata and file contents from the Software Heritage archive via the network. In order to obtain reasonable performances several caches are used to minimize network transfer. Caches are stored on disk in SQLite databases located at `$XDG_CACHE_HOME/swh/fuse/`. All caches are persistent (i.e., they survive the restart of the SwhFS process) and global (i.e., they are shared by concurrent SwhFS processes). We assume that no cache *invalidation* is necessary, due to intrinsic properties of the Software Heritage archive, such as integrity verification and append-only archive changes. To clean the caches one can just remove the corresponding files from disk. """ def __init__(self, cache_conf: Dict[str, Any]): self.cache_conf = cache_conf async def __aenter__(self): self.metadata = MetadataCache(self.cache_conf["metadata"]) self.blob = BlobCache(self.cache_conf["blob"]) self.history = HistoryCache(self.cache_conf["history"]) self.direntry = DirEntryCache(self.cache_conf["direntry"]) await self.metadata.__aenter__() await self.blob.__aenter__() await self.history.__aenter__() return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.metadata.__aexit__() await self.blob.__aexit__() await self.history.__aexit__() async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: """ Return a list of all previously cached SWHID """ # Use the metadata db since it should always contain all accessed SWHIDs metadata_cursor = await self.metadata.conn.execute( "select swhid from metadata_cache" ) swhids = await metadata_cursor.fetchall() for raw_swhid in swhids: yield parse_swhid(raw_swhid[0]) class AbstractCache(ABC): """ Abstract cache implementation to share common behavior between cache types (such as: YAML config parsing, SQLite context manager) """ def __init__(self, conf: Dict[str, Any]): self.conf = conf async def __aenter__(self): # In-memory (thus temporary) caching is useful for testing purposes if self.conf.get("in-memory", False): path = ":memory:" else: path = Path(self.conf["path"]) path.parent.mkdir(parents=True, exist_ok=True) self.conn = await aiosqlite.connect(path) return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.conn.close() class MetadataCache(AbstractCache): """ The metadata cache map each SWHID to the complete metadata of the referenced object. This is analogous to what is available in `meta/.json` file (and generally used as data source for returning the content of those files). 
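# Illustrative sketch (not part of the patch): how the cache layer in this
# module is typically wired up from outside. The configuration shape mirrors
# the "cache" section of DEFAULT_CONFIG in swh/fuse/cli.py; "in-memory" is the
# testing shortcut honoured by AbstractCache.__aenter__, and "maxram" is what
# DirEntryCache expects.
import asyncio

from swh.fuse.cache import FuseCache

async def list_known_swhids() -> None:
    cache_conf = {
        "metadata": {"in-memory": True},  # or {"path": ".../metadata.sqlite"}
        "blob": {"in-memory": True},
        "history": {"in-memory": True},
        "direntry": {"maxram": "10%"},    # share of available RAM for the LRU
    }
    async with FuseCache(cache_conf) as cache:
        # Every SWHID accessed so far is recorded in the metadata cache, so
        # this enumerates the whole known set (empty for a fresh database).
        async for swhid in cache.get_cached_swhids():
            print(swhid)

# asyncio.run(list_known_swhids())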
""" async def __aenter__(self): await super().__aenter__() await self.conn.execute( "create table if not exists metadata_cache (swhid, metadata)" ) await self.conn.commit() return self async def get(self, swhid: SWHID, typify: bool = True) -> Any: cursor = await self.conn.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: metadata = json.loads(cache[0]) return typify_json(metadata, swhid.object_type) if typify else metadata else: return None async def set(self, swhid: SWHID, metadata: Any) -> None: await self.conn.execute( "insert into metadata_cache values (?, ?)", (str(swhid), json.dumps(metadata)), ) await self.conn.commit() + async def get_cached_subset(self, swhids: List[SWHID]) -> List[SWHID]: + swhids_str = ",".join(f'"{x}"' for x in swhids) + cursor = await self.conn.execute( + f"select swhid from metadata_cache where swhid in ({swhids_str})" + ) + cache = await cursor.fetchall() + + res = [] + for row in cache: + swhid = row[0] + try: + res.append(parse_swhid(swhid)) + except ValidationError: + logging.warning("Cannot parse object from metadata cache: %s", swhid) + + return res + class BlobCache(AbstractCache): """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived content. The blob cache entry for a given content object is populated, at the latest, the first time the object is `read()`-d. It might be populated earlier on due to prefetching, e.g., when a directory pointing to the given content is listed for the first time. """ async def __aenter__(self): await super().__aenter__() await self.conn.execute("create table if not exists blob_cache (swhid, blob)") await self.conn.commit() return self async def get(self, swhid: SWHID) -> Optional[bytes]: cursor = await self.conn.execute( "select blob from blob_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: blob = cache[0] return blob else: return None async def set(self, swhid: SWHID, blob: bytes) -> None: await self.conn.execute( "insert into blob_cache values (?, ?)", (str(swhid), blob) ) await self.conn.commit() class HistoryCache(AbstractCache): """ The history cache map SWHIDs of type `rev` to a list of `rev` SWHIDs corresponding to all its revision ancestors, sorted in reverse topological order. As the parents cache, the history cache is lazily populated and can be prefetched. To efficiently store the ancestor lists, the history cache represents ancestors as graph edges (a pair of two SWHID nodes), meaning the history cache is shared amongst all revisions parents. """ async def __aenter__(self): await super().__aenter__() await self.conn.execute( """ create table if not exists history_graph ( src text not null, dst text not null, unique(src, dst) ) """ ) await self.conn.execute( "create index if not exists index_history_graph on history_graph(src)" ) await self.conn.commit() return self async def get(self, swhid: SWHID) -> Optional[List[SWHID]]: cursor = await self.conn.execute( """ with recursive dfs(node) AS ( values(?) 
union select history_graph.dst from history_graph join dfs on history_graph.src = dfs.node ) -- Do not keep the root node since it is not an ancestor select * from dfs limit -1 offset 1 """, (str(swhid),), ) cache = await cursor.fetchall() if not cache: return None history = [] - for parent in cache: - parent = parent[0] + for row in cache: + parent = row[0] try: history.append(parse_swhid(parent)) except ValidationError: logging.warning("Cannot parse object from history cache: %s", parent) return history async def set(self, history: str) -> None: history = history.strip() if history: edges = [edge.split(" ") for edge in history.split("\n")] await self.conn.executemany( "insert or ignore into history_graph values (?, ?)", edges ) await self.conn.commit() class DirEntryCache: """ The direntry cache map inode representing directories to the entries they contain. Each entry comes with its name as well as file attributes (i.e., all its needed to perform a detailed directory listing). Additional attributes of each directory entry should be looked up on a entry by entry basis, possibly hitting other caches. The direntry cache for a given dir is populated, at the latest, when the content of the directory is listed. More aggressive prefetching might happen. For instance, when first opening a dir a recursive listing of it can be retrieved from the remote backend and used to recursively populate the direntry cache for all (transitive) sub-directories. """ @dataclass class LRU(OrderedDict): max_ram: int used_ram: int = field(init=False, default=0) def sizeof(self, value: Any) -> int: # Rough size estimate in bytes for a list of entries return len(value) * 1000 def __getitem__(self, key: Any) -> Any: value = super().__getitem__(key) self.move_to_end(key) return value def __setitem__(self, key: Any, value: Any) -> None: if key in self: self.move_to_end(key) else: self.used_ram += self.sizeof(value) super().__setitem__(key, value) while self.used_ram > self.max_ram and self: oldest = next(iter(self)) self.used_ram -= self.sizeof(oldest) del self[oldest] def __init__(self, conf: Dict[str, Any]): m = re.match(r"(\d+)\s*(.+)\s*", conf["maxram"]) if not m: logging.error("Cannot parse direntry maxram config: %s", conf["maxram"]) sys.exit(1) num = float(m.group(1)) unit = m.group(2).upper() if unit == "%": max_ram = int(num * virtual_memory().available / 100) else: units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9} max_ram = int(float(num) * units[unit]) self.lru_cache = self.LRU(max_ram) def get(self, direntry: FuseDirEntry) -> Optional[List[FuseEntry]]: return self.lru_cache.get(direntry.inode, None) def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None: if isinstance(direntry, ArchiveDir) or isinstance(direntry, MetaDir): # The `archive/` and `meta/` are populated on the fly so we should # never cache them pass elif ( isinstance(direntry, RevisionHistoryShardByDate) and not direntry.is_status_done ): # The `by-date/' directory is populated in parallel so only cache it # once it has finished fetching all data from the API pass else: self.lru_cache[direntry.inode] = entries diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py index d26807a..7f43a90 100644 --- a/swh/fuse/cli.py +++ b/swh/fuse/cli.py @@ -1,202 +1,201 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import 
unnecessary things here to keep cli startup time under # control import os from pathlib import Path from typing import Any, Dict import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group from swh.model.cli import SWHIDParamType # All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) CACHE_HOME_DIR: Path = ( Path(os.environ["XDG_CACHE_HOME"]) if "XDG_CACHE_HOME" in os.environ else Path.home() / ".cache" ) DEFAULT_CONFIG: Dict[str, Any] = { "cache": { "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"}, "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"}, "history": {"path": CACHE_HOME_DIR / "swh/fuse/history.sqlite"}, "direntry": {"maxram": "10%"}, }, "web-api": { "url": "https://archive.softwareheritage.org/api/1", "auth-token": None, }, - "sharding": {"depth": 1, "length": 2,}, } @swh_cli_group.group(name="fs", context_settings=CONTEXT_SETTINGS) @click.option( "-C", "--config-file", default=None, type=click.Path(exists=True, dir_okay=False, path_type=str), help=f"Configuration file (default: {DEFAULT_CONFIG_PATH})", ) @click.pass_context def fuse(ctx, config_file): """Software Heritage virtual file system""" import logging import pprint from swh.core import config if not config_file: config_file = DEFAULT_CONFIG_PATH try: logging.info("Loading configuration from: %s", config_file) conf = config.read_raw_config(config.config_basepath(config_file)) if not conf: raise ValueError(f"Cannot parse configuration file: {config_file}") if config_file == DEFAULT_CONFIG_PATH: try: conf = conf["swh"]["fuse"] except KeyError: pass # recursive merge not done by config.read conf = config.merge_configs(DEFAULT_CONFIG, conf) except Exception as err: logging.warning("Using default configuration (cannot load custom one: %s)", err) conf = DEFAULT_CONFIG logging.info("Read configuration: \n%s", pprint.pformat(conf)) ctx.ensure_object(dict) ctx.obj["config"] = conf @fuse.command(name="mount") @click.argument( "path", required=True, metavar="PATH", type=click.Path(exists=True, dir_okay=True, file_okay=False), ) @click.argument("swhids", nargs=-1, metavar="[SWHID]...", type=SWHIDParamType()) @click.option( "-f/-d", "--foreground/--daemon", default=False, help="whether to run FUSE attached to the console (foreground) " "or daemonized in the background (default: daemon)", ) @click.pass_context def mount(ctx, swhids, path, foreground): """Mount the Software Heritage virtual file system at PATH. If specified, objects referenced by the given SWHIDs will be prefetched and used to populate the virtual file system (VFS). Otherwise the VFS will be populated on-demand, when accessing its content. 
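# Illustrative sketch (assumed usage, not part of the patch): what the
# "recursive merge" done by the `fuse` CLI group amounts to. A custom config
# file only needs the keys it overrides; assuming merge_configs() performs the
# recursive merge referred to above, everything else keeps its value from
# DEFAULT_CONFIG. The override values below are hypothetical.
from swh.core import config

user_conf = {
    "cache": {"blob": {"in-memory": True}},   # e.g. do not keep blobs on disk
    "web-api": {"auth-token": "<my-token>"},  # hypothetical bearer token
}
conf = config.merge_configs(DEFAULT_CONFIG, user_conf)
# conf["web-api"]["url"] and the metadata/history cache paths under
# $XDG_CACHE_HOME/swh/fuse/ are still the defaults.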
\b Example: \b $ mkdir swhfs $ swh fs mount swhfs/ $ grep printf swhfs/archive/swh:1:cnt:c839dea9e8e6f0528b468214348fee8669b305b2 printf("Hello, World!"); $ """ import asyncio from contextlib import ExitStack import logging from daemon import DaemonContext from swh.fuse import fuse # TODO: set default logging settings when --log-config is not passed # DEFAULT_LOG_PATH = Path(".local/swh/fuse/mount.log") with ExitStack() as stack: if not foreground: # TODO: temporary fix until swh.core has the proper logging utilities # Disable logging config before daemonizing, and reset it once # daemonized to be sure to not close file handlers logging.shutdown() # Stay in the current working directory when spawning daemon cwd = os.getcwd() stack.enter_context(DaemonContext(working_directory=cwd)) logging.config.dictConfig( { "version": 1, "handlers": { "syslog": { "class": "logging.handlers.SysLogHandler", "address": "/dev/log", }, }, "root": {"level": ctx.obj["log_level"], "handlers": ["syslog"],}, } ) conf = ctx.obj["config"] asyncio.run(fuse.main(swhids, path, conf)) @fuse.command() @click.argument( "path", required=True, metavar="PATH", type=click.Path(exists=True, dir_okay=True, file_okay=False), ) @click.pass_context def umount(ctx, path): """Unmount a mounted virtual file system. Note: this is equivalent to ``fusermount -u PATH``, which can be used to unmount any FUSE-based virtual file system. See ``man fusermount3``. """ import logging import subprocess try: subprocess.run(["fusermount", "-u", path], check=True) except subprocess.CalledProcessError as err: logging.error( "cannot unmount virtual file system: '%s' returned exit status %d", " ".join(err.cmd), err.returncode, ) ctx.exit(1) @fuse.command() @click.pass_context def clean(ctx): """Clean on-disk cache(s). """ def rm_cache(conf, cache_name): try: conf["cache"][cache_name]["path"].unlink(missing_ok=True) except KeyError: pass conf = ctx.obj["config"] for cache_name in ["blob", "metadata", "history"]: rm_cache(conf, cache_name) diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py index 554448d..c11fbbe 100644 --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -1,544 +1,497 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio from dataclasses import dataclass, field from pathlib import Path -from typing import Any, AsyncIterator, Dict, List +from typing import Any, AsyncIterator, List import urllib.parse from swh.fuse.fs.entry import ( EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry, ) from swh.model.from_disk import DentryPerms from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID @dataclass class Content(FuseFileEntry): """ Software Heritage content artifact. Attributes: swhid: Software Heritage persistent identifier prefetch: optional prefetched metadata used to set entry attributes Content leaves (AKA blobs) are represented on disks as regular files, containing the corresponding bytes, as archived. Note that permissions are associated to blobs only in the context of directories. Hence, when accessing blobs from the top-level `archive/` directory, the permissions of the `archive/SWHID` file will be arbitrary and not meaningful (e.g., `0x644`). 
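# Illustrative sketch: the role of the `prefetch` attribute declared just
# below. When a Content node is created while listing its parent Directory,
# `prefetch` holds the directory-entry mapping already returned by the Web
# API, so a plain stat() can be answered without downloading the blob. Field
# names are taken from their usage in this file; the concrete values are made
# up.
example_prefetch = {
    "name": "hello.c",          # entry name inside the parent directory
    "perms": 0o100644,          # archived permission bits
    "length": 42,               # byte size, returned directly by size()
    "target": "swh:1:cnt:...",  # identifier of the blob itself (elided here)
}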
""" swhid: SWHID prefetch: Any = None async def get_content(self) -> bytes: data = await self.fuse.get_blob(self.swhid) if not self.prefetch: self.prefetch = {"length": len(data)} return data async def size(self) -> int: if self.prefetch: return self.prefetch["length"] else: return len(await self.get_content()) @dataclass class Directory(FuseDirEntry): """ Software Heritage directory artifact. Attributes: swhid: Software Heritage persistent identifier Directory nodes are represented as directories on the file-system, containing one entry for each entry of the archived directory. Entry names and other metadata, including permissions, will correspond to the archived entry metadata. Note that the FUSE mount is read-only, no matter what the permissions say. So it is possible that, in the context of a directory, a file is presented as writable, whereas actually writing to it will fail with `EPERM`. """ swhid: SWHID async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) for entry in metadata: name = entry["name"] swhid = entry["target"] mode = ( # Archived permissions for directories are always set to # 0o040000 so use a read-only permission instead int(EntryMode.RDONLY_DIR) if swhid.object_type == DIRECTORY else entry["perms"] ) # 1. Regular file if swhid.object_type == CONTENT: yield self.create_child( Content, name=name, mode=mode, swhid=swhid, # The directory API has extra info we can use to set # attributes without additional Software Heritage API call prefetch=entry, ) # 2. Regular directory elif swhid.object_type == DIRECTORY: yield self.create_child( Directory, name=name, mode=mode, swhid=swhid, ) # 3. Symlink elif mode == DentryPerms.symlink: yield self.create_child( FuseSymlinkEntry, name=name, # Symlink target is stored in the blob content target=await self.fuse.get_blob(swhid), ) # 4. Submodule elif swhid.object_type == REVISION: # Make sure the revision metadata is fetched and create a # symlink to distinguish it with regular directories await self.fuse.get_metadata(swhid) yield self.create_child( FuseSymlinkEntry, name=name, target=Path(self.get_relative_root_path(), f"archive/{swhid}"), ) else: raise ValueError("Unknown directory entry type: {swhid.object_type}") @dataclass class Revision(FuseDirEntry): """ Software Heritage revision artifact. Attributes: swhid: Software Heritage persistent identifier Revision (AKA commit) nodes are represented on the file-system as directories with the following entries: - `root`: source tree at the time of the commit, as a symlink pointing into `archive/`, to a SWHID of type `dir` - `parents/` (note the plural): a virtual directory containing entries named `1`, `2`, `3`, etc., one for each parent commit. Each of these entry is a symlink pointing into `archive/`, to the SWHID file for the given parent commit - `parent` (note the singular): present if and only if the current commit has at least one parent commit (which is the most common case). When present it is a symlink pointing into `parents/1/` - `history`: a virtual directory listing all its revision ancestors, sorted in reverse topological order. Each entry is a symlink pointing into `archive/SWHID`. 
- `meta.json`: metadata for the current node, as a symlink pointing to the relevant `meta/.json` file """ swhid: SWHID async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) directory = metadata["directory"] parents = metadata["parents"] root_path = self.get_relative_root_path() yield self.create_child( FuseSymlinkEntry, name="root", target=Path(root_path, f"archive/{directory}"), ) yield self.create_child( FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"meta/{self.swhid}.json"), ) yield self.create_child( RevisionParents, name="parents", mode=int(EntryMode.RDONLY_DIR), parents=[x["id"] for x in parents], ) if len(parents) >= 1: yield self.create_child( FuseSymlinkEntry, name="parent", target="parents/1/", ) yield self.create_child( RevisionHistory, name="history", mode=int(EntryMode.RDONLY_DIR), swhid=self.swhid, ) @dataclass class RevisionParents(FuseDirEntry): """ Revision virtual `parents/` directory """ parents: List[SWHID] async def compute_entries(self) -> AsyncIterator[FuseEntry]: root_path = self.get_relative_root_path() for i, parent in enumerate(self.parents): yield self.create_child( FuseSymlinkEntry, name=str(i + 1), target=Path(root_path, f"archive/{parent}"), ) @dataclass class RevisionHistory(FuseDirEntry): """ Revision virtual `history/` directory """ swhid: SWHID - async def compute_entries(self) -> AsyncIterator[FuseEntry]: + async def prefill_caches(self) -> None: history = await self.fuse.get_history(self.swhid) + for swhid in history: + await self.fuse.get_metadata(swhid) + + async def compute_entries(self) -> AsyncIterator[FuseEntry]: + # Run it concurrently because of the many API calls necessary + asyncio.create_task(self.prefill_caches()) - by_date = self.create_child( + yield self.create_child( RevisionHistoryShardByDate, name="by-date", mode=int(EntryMode.RDONLY_DIR), history_swhid=self.swhid, ) - # Populate the by-date/ directory in parallel because it needs to pull - # from the Web API all history SWHIDs date metadata - asyncio.create_task(by_date.fill_metadata_cache(history)) - yield by_date - by_hash = self.create_child( + yield self.create_child( RevisionHistoryShardByHash, name="by-hash", mode=int(EntryMode.RDONLY_DIR), history_swhid=self.swhid, ) - by_hash.fill_direntry_cache(history) - yield by_hash - by_page = self.create_child( + yield self.create_child( RevisionHistoryShardByPage, name="by-page", mode=int(EntryMode.RDONLY_DIR), history_swhid=self.swhid, ) - by_page.fill_direntry_cache(history) - yield by_page @dataclass class RevisionHistoryShardByDate(FuseDirEntry): """ Revision virtual `history/by-date` sharded directory """ history_swhid: SWHID prefix: str = field(default="") is_status_done: bool = field(default=False) + DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/" + @dataclass class StatusFile(FuseFileEntry): """ Temporary file used to indicate loading progress in by-date/ """ name: str = field(init=False, default=".status") mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) done: int todo: int async def get_content(self) -> bytes: fmt = f"Done: {self.done}/{self.todo}\n" return fmt.encode() async def size(self) -> int: return len(await self.get_content()) - async def fill_metadata_cache(self, swhids: List[SWHID]) -> None: - for swhid in swhids: - await self.fuse.get_metadata(swhid) - - def get_full_sharded_name(self, meta: Dict[str, Any]) -> str: - date = meta["date"] - return f"{date.year:04d}/{date.month:02d}/{date.day:02d}" - async def compute_entries(self) 
-> AsyncIterator[FuseEntry]: history = await self.fuse.get_history(self.history_swhid) - self.is_status_done = True + # Only check for cached revisions since fetching all of them with the + # Web API would take too long + swhids = await self.fuse.cache.metadata.get_cached_subset(history) + depth = self.prefix.count("/") - subdirs = set() - nb_entries = 0 - for swhid in history: - # Only check for cached revisions since fetching all of them with - # the Web API would take too long - meta = await self.fuse.cache.metadata.get(swhid) - if not meta: - self.is_status_done = False - continue + root_path = self.get_relative_root_path() + sharded_dirs = set() - nb_entries += 1 - name = self.get_full_sharded_name(meta) - if not name.startswith(self.prefix): + for swhid in swhids: + meta = await self.fuse.cache.metadata.get(swhid) + date = meta["date"] + sharded_name = self.DATE_FMT.format( + year=date.year, month=date.month, day=date.day + ) + if not sharded_name.startswith(self.prefix): continue - next_prefix = name.split("/")[depth] - if next_prefix not in subdirs: - subdirs.add(next_prefix) + if depth == 3: yield self.create_child( - RevisionHistoryShardByDate, - name=next_prefix, - mode=int(EntryMode.RDONLY_DIR), - prefix=f"{self.prefix}{next_prefix}/", - history_swhid=self.history_swhid, + FuseSymlinkEntry, + name=str(swhid), + target=Path(root_path, f"archive/{swhid}"), ) + # Create sharded directories + else: + next_prefix = sharded_name.split("/")[depth] + if next_prefix not in sharded_dirs: + sharded_dirs.add(next_prefix) + yield self.create_child( + RevisionHistoryShardByDate, + name=next_prefix, + mode=int(EntryMode.RDONLY_DIR), + prefix=f"{self.prefix}{next_prefix}/", + history_swhid=self.history_swhid, + ) - if not self.is_status_done: + self.is_status_done = len(swhids) == len(history) + if not self.is_status_done and depth == 0: yield self.create_child( RevisionHistoryShardByDate.StatusFile, - done=nb_entries, + done=len(swhids), todo=len(history), ) @dataclass class RevisionHistoryShardByHash(FuseDirEntry): """ Revision virtual `history/by-hash` sharded directory """ history_swhid: SWHID prefix: str = field(default="") - def get_full_sharded_name(self, swhid: SWHID) -> str: - sharding_depth = self.fuse.conf["sharding"]["depth"] - sharding_length = self.fuse.conf["sharding"]["length"] - if sharding_depth <= 0: - return str(swhid) - else: - basename = swhid.object_id - parts = [ - basename[i * sharding_length : (i + 1) * sharding_length] - for i in range(sharding_depth) - ] - # Always keep the full SWHID as the path basename (otherwise we - # loose the SWHID object type information) - parts.append(str(swhid)) - path = Path(*parts) - return str(path) - - def fill_direntry_cache(self, swhids: List[SWHID]): - sharding_depth = self.fuse.conf["sharding"]["depth"] - sharding_length = self.fuse.conf["sharding"]["length"] - depth = self.prefix.count("/") - children = [] - if depth == sharding_depth: + SHARDING_LENGTH = 2 + + async def compute_entries(self) -> AsyncIterator[FuseEntry]: + history = await self.fuse.get_history(self.history_swhid) + + if self.prefix: root_path = self.get_relative_root_path() - for swhid in swhids: - children.append( - self.create_child( + for swhid in history: + if swhid.object_id.startswith(self.prefix): + yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) - ) + # Create sharded directories else: - subdirs = {} - prefix_len = len(self.prefix) - for swhid in swhids: - name = self.get_full_sharded_name(swhid) - 
next_prefix = name[prefix_len : prefix_len + sharding_length] - subdirs.setdefault(next_prefix, []).append(swhid) - - # Recursive intermediate sharded directories - for subdir, subentries in subdirs.items(): - child_prefix = f"{self.prefix}{subdir}/" - child = self.create_child( - RevisionHistoryShardByHash, - name=subdir, - mode=int(EntryMode.RDONLY_DIR), - prefix=child_prefix, - history_swhid=self.history_swhid, - ) - children.append(child) - child.fill_direntry_cache(subentries) - self.fuse.cache.direntry.set(self, children) - return children - - async def compute_entries(self) -> AsyncIterator[FuseEntry]: - history = await self.fuse.get_history(self.history_swhid) - hash_prefix = self.prefix.replace("/", "") - swhids = [s for s in history if s.object_id.startswith(hash_prefix)] - - for entry in self.fill_direntry_cache(swhids): - yield entry + sharded_dirs = set() + for swhid in history: + next_prefix = swhid.object_id[: self.SHARDING_LENGTH] + if next_prefix not in sharded_dirs: + sharded_dirs.add(next_prefix) + yield self.create_child( + RevisionHistoryShardByHash, + name=next_prefix, + mode=int(EntryMode.RDONLY_DIR), + prefix=next_prefix, + history_swhid=self.history_swhid, + ) @dataclass class RevisionHistoryShardByPage(FuseDirEntry): """ Revision virtual `history/by-page` sharded directory """ history_swhid: SWHID + prefix: int = field(default=None) PAGE_SIZE = 10_000 PAGE_FMT = "{page_number:03d}" - def fill_direntry_cache(self, swhids: List[SWHID]): - page_number = -1 - page = None - page_root_path = None - page_children = [] - pages = [] - for idx, swhid in enumerate(swhids): - if idx % self.PAGE_SIZE == 0: - if page: - self.fuse.cache.direntry.set(page, page_children) - pages.append(page) - - page_number += 1 - page = self.create_child( + async def compute_entries(self) -> AsyncIterator[FuseEntry]: + history = await self.fuse.get_history(self.history_swhid) + + if self.prefix is not None: + current_page = self.prefix + root_path = self.get_relative_root_path() + max_idx = min(len(history), (current_page + 1) * self.PAGE_SIZE) + for i in range(current_page * self.PAGE_SIZE, max_idx): + swhid = history[i] + yield self.create_child( + FuseSymlinkEntry, + name=str(swhid), + target=Path(root_path, f"archive/{swhid}"), + ) + # Create sharded directories + else: + for i in range(0, len(history), self.PAGE_SIZE): + page_number = i // self.PAGE_SIZE + yield self.create_child( RevisionHistoryShardByPage, name=self.PAGE_FMT.format(page_number=page_number), mode=int(EntryMode.RDONLY_DIR), history_swhid=self.history_swhid, + prefix=page_number, ) - page_root_path = page.get_relative_root_path() - page_children = [] - - page_children.append( - page.create_child( - FuseSymlinkEntry, - name=str(swhid), - target=Path(page_root_path, f"archive/{swhid}"), - ) - ) - - if page: - self.fuse.cache.direntry.set(page, page_children) - pages.append(page) - self.fuse.cache.direntry.set(self, pages) - return pages - - async def compute_entries(self) -> AsyncIterator[FuseEntry]: - history = await self.fuse.get_history(self.history_swhid) - for entry in self.fill_direntry_cache(history): - yield entry @dataclass class Release(FuseDirEntry): """ Software Heritage release artifact. 
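# Illustrative sketch: where a given ancestor ends up in the two sharded
# listings above. by-hash/ groups revisions by the first SHARDING_LENGTH (2)
# hex digits of their object id; by-page/ groups them by position in the
# history, PAGE_SIZE (10 000) entries per page, named with PAGE_FMT. This is a
# standalone restatement for illustration only.
def by_hash_dir(object_id: str, sharding_length: int = 2) -> str:
    return object_id[:sharding_length]

def by_page_dir(ancestor_index: int, page_size: int = 10_000) -> str:
    return "{page_number:03d}".format(page_number=ancestor_index // page_size)

# Hex string borrowed from the mount example earlier in this patch:
assert by_hash_dir("c839dea9e8e6f0528b468214348fee8669b305b2") == "c8"
assert by_page_dir(9_999) == "000"
assert by_page_dir(10_000) == "001"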
Attributes: swhid: Software Heritage persistent identifier Release nodes are represented on the file-system as directories with the following entries: - `target`: target node, as a symlink to `archive/` - `target_type`: regular file containing the type of the target SWHID - `root`: present if and only if the release points to something that (transitively) resolves to a directory. When present it is a symlink pointing into `archive/` to the SWHID of the given directory - `meta.json`: metadata for the current node, as a symlink pointing to the relevant `meta/.json` file """ swhid: SWHID async def find_root_directory(self, swhid: SWHID) -> SWHID: if swhid.object_type == RELEASE: metadata = await self.fuse.get_metadata(swhid) return await self.find_root_directory(metadata["target"]) elif swhid.object_type == REVISION: metadata = await self.fuse.get_metadata(swhid) return metadata["directory"] elif swhid.object_type == DIRECTORY: return swhid else: return None async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) root_path = self.get_relative_root_path() yield self.create_child( FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"meta/{self.swhid}.json"), ) target = metadata["target"] yield self.create_child( FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}") ) yield self.create_child( ReleaseType, name="target_type", mode=int(EntryMode.RDONLY_FILE), target_type=target.object_type, ) target_dir = await self.find_root_directory(target) if target_dir is not None: yield self.create_child( FuseSymlinkEntry, name="root", target=Path(root_path, f"archive/{target_dir}"), ) @dataclass class ReleaseType(FuseFileEntry): """ Release type virtual file """ target_type: str async def get_content(self) -> bytes: return str.encode(self.target_type + "\n") async def size(self) -> int: return len(await self.get_content()) @dataclass class Snapshot(FuseDirEntry): """ Software Heritage snapshot artifact. Attributes: swhid: Software Heritage persistent identifier Snapshot nodes are represented on the file-system as directories with one entry for each branch in the snapshot. Each entry is a symlink pointing into `archive/` to the branch target SWHID. Branch names are URL encoded (hence '/' are replaced with '%2F'). 
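# Illustrative sketch: the branch-name mangling described above and applied
# just below with urllib.parse.quote_plus(). Any '/' in a Git-style branch
# name becomes '%2F', yielding a single valid UNIX filename for the symlink.
import urllib.parse

assert urllib.parse.quote_plus("refs/heads/master") == "refs%2Fheads%2Fmaster"
assert urllib.parse.quote_plus("refs/tags/v1.0") == "refs%2Ftags%2Fv1.0"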
""" swhid: SWHID async def compute_entries(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) root_path = self.get_relative_root_path() for branch_name, branch_meta in metadata.items(): # Mangle branch name to create a valid UNIX filename name = urllib.parse.quote_plus(branch_name) yield self.create_child( FuseSymlinkEntry, name=name, target=Path(root_path, f"archive/{branch_meta['target']}"), ) OBJTYPE_GETTERS = { CONTENT: Content, DIRECTORY: Directory, REVISION: Revision, RELEASE: Release, SNAPSHOT: Snapshot, } diff --git a/swh/fuse/tests/test_revision.py b/swh/fuse/tests/test_revision.py index 5c9c271..8011f14 100644 --- a/swh/fuse/tests/test_revision.py +++ b/swh/fuse/tests/test_revision.py @@ -1,80 +1,88 @@ import json import os import time -from swh.fuse.fs.artifact import RevisionHistoryShardByPage +import dateutil.parser + +from swh.fuse.fs.artifact import RevisionHistoryShardByDate, RevisionHistoryShardByPage from swh.fuse.tests.api_url import GRAPH_API_REQUEST from swh.fuse.tests.common import ( check_dir_name_entries, get_data_from_graph_archive, get_data_from_web_archive, ) from swh.fuse.tests.data.config import REV_SMALL_HISTORY, ROOT_DIR, ROOT_REV from swh.model.identifiers import parse_swhid def test_access_meta(fuse_mntdir): file_path = fuse_mntdir / "archive" / ROOT_REV / "meta.json" expected = json.dumps(get_data_from_web_archive(ROOT_REV)) assert file_path.read_text() == expected def test_list_root(fuse_mntdir): dir_path = fuse_mntdir / "archive" / ROOT_REV / "root" check_dir_name_entries(dir_path, ROOT_DIR) def test_list_parents(fuse_mntdir): rev_meta = get_data_from_web_archive(ROOT_REV) dir_path = fuse_mntdir / "archive" / ROOT_REV / "parents" for i, parent in enumerate(rev_meta["parents"]): parent_path = dir_path / str(i + 1) parent_swhid = f"swh:1:rev:{parent['id']}" assert parent_path.is_symlink() assert os.readlink(parent_path) == f"../../../archive/{parent_swhid}" def test_list_parent(fuse_mntdir): file_path = fuse_mntdir / "archive" / ROOT_REV / "parent" assert file_path.is_symlink() assert os.readlink(file_path) == "parents/1/" def test_list_history(fuse_mntdir): dir_path = fuse_mntdir / "archive" / REV_SMALL_HISTORY / "history" assert os.listdir(dir_path) == ["by-date", "by-hash", "by-page"] history_meta = get_data_from_graph_archive( REV_SMALL_HISTORY, GRAPH_API_REQUEST.HISTORY ) history = history_meta.strip() # Only keep second node in the edge because first node is redundant # information or the root node (hence not an ancestor) expected = set( map(parse_swhid, [edge.split(" ")[1] for edge in history.split("\n")]) ) dir_by_hash = dir_path / "by-hash" for swhid in expected: depth1 = swhid.object_id[:2] depth2 = str(swhid) assert (dir_by_hash / depth1).exists() assert depth2 in (os.listdir(dir_by_hash / depth1)) dir_by_page = dir_path / "by-page" for idx, swhid in enumerate(expected): page_number = idx // RevisionHistoryShardByPage.PAGE_SIZE depth1 = RevisionHistoryShardByPage.PAGE_FMT.format(page_number=page_number) depth2 = str(swhid) assert (dir_by_page / depth1).exists() assert depth2 in (os.listdir(dir_by_page / depth1)) dir_by_date = dir_path / "by-date" # Wait max 1 second to populate by-date/ dir for i in range(100): if ".status" not in os.listdir(dir_by_date): break time.sleep(0.01) - assert os.listdir(dir_by_date) == ["2010"] - assert os.listdir(dir_by_date / "2010") == ["06"] - assert os.listdir(dir_by_date / "2010/06") == ["25", "24", "23", "16"] + for swhid in expected: + meta = 
get_data_from_web_archive(str(swhid)) + date = dateutil.parser.parse(meta["date"]) + depth1 = RevisionHistoryShardByDate.DATE_FMT.format( + year=date.year, month=date.month, day=date.day + ) + depth2 = str(swhid) + assert (dir_by_date / depth1).exists() + assert depth2 in (os.listdir(dir_by_date / depth1))
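# Illustrative sketch: how a cached revision's commit date maps to its place
# under history/by-date/, matching the assertions above. DATE_FMT yields a
# "YYYY/MM/DD/" prefix, so a revision authored on 2010-06-25 (one of the days
# listed by the previous version of this test) shows up as
# by-date/2010/06/25/<SWHID>. The timestamp below is made up; only the date
# part matters. The ".status" file remains until every ancestor's metadata has
# been cached, which is why the test polls for its disappearance.
import dateutil.parser

from swh.fuse.fs.artifact import RevisionHistoryShardByDate

date = dateutil.parser.parse("2010-06-25T17:14:24+02:00")
sharded = RevisionHistoryShardByDate.DATE_FMT.format(
    year=date.year, month=date.month, day=date.day
)
assert sharded == "2010/06/25/"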