diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py index 7290e03..134c448 100644 --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -1,154 +1,150 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from abc import ABC from contextlib import closing import json -from pathlib import Path import sqlite3 -from typing import Any, Optional, Set +from typing import Any, Dict, Optional, Set from swh.model.identifiers import SWHID, parse_swhid from swh.web.client.client import typify -class Cache: +class FuseCache: """ SWH FUSE retrieves both metadata and file contents from the Software Heritage archive via the network. In order to obtain reasonable performances several caches are used to minimize network transfer. Caches are stored on disk in SQLite databases located at - `$XDG_CACHE_HOME/swh/fuse/cache.sqlite`. + `$XDG_CACHE_HOME/swh/fuse/`. All caches are persistent (i.e., they survive the restart of the SWH FUSE process) and global (i.e., they are shared by concurrent SWH FUSE processes). We assume that no cache *invalidation* is necessary, due to intrinsic properties of the Software Heritage archive, such as integrity verification and append-only archive changes. To clean the caches one can just remove the corresponding files from disk. """ - def __init__(self, cache_dir: Path): - self.cache_dir = cache_dir + def __init__(self, cache_conf: Dict[str, Any]): + self.cache_conf = cache_conf def __enter__(self): - # In-memory (thus temporary) caching is useful for testing purposes - in_memory = ":memory:" - if str(self.cache_dir) == in_memory: - metadata_cache_path = in_memory - blob_cache_path = in_memory - else: - metadata_cache_path = self.cache_dir / "metadata.sqlite" - blob_cache_path = self.cache_dir / "blob.sqlite" - - self.metadata = MetadataCache(metadata_cache_path) + self.metadata = MetadataCache(self.cache_conf["metadata"]) self.metadata.__enter__() - self.blob = BlobCache(blob_cache_path) + self.blob = BlobCache(self.cache_conf["blob"]) self.blob.__enter__() - return self def __exit__(self, type=None, val=None, tb=None) -> None: self.metadata.__exit__() self.blob.__exit__() def get_cached_swhids(self) -> Set[SWHID]: """ Return a list of all previously cached SWHID """ with closing(self.metadata.conn.cursor()) as metadata_cursor, ( closing(self.blob.conn.cursor()) ) as blob_cursor: # Some entries can be in one cache but not in the other so create a # set from all caches metadata_cursor.execute("select swhid from metadata_cache") blob_cursor.execute("select swhid from blob_cache") swhids = metadata_cursor.fetchall() + blob_cursor.fetchall() swhids = [parse_swhid(x[0]) for x in swhids] return set(swhids) -class MetadataCache: - """ The metadata cache map each SWHID to the complete metadata of the - referenced object. This is analogous to what is available in - `meta/.json` file (and generally used as data source for returning - the content of those files). """ +class AbstractCache(ABC): + """ Abstract cache implementation to share common behavior between cache + types (such as: YAML config parsing, SQLite context manager) """ - def __init__(self, path: str): - self.path = path + def __init__(self, conf: Dict[str, Any]): + self.conf = conf def __enter__(self): - self.conn = sqlite3.connect(self.path) - with self.conn: - self.conn.execute( - "create table if not exists metadata_cache (swhid, metadata)" - ) + # In-memory (thus temporary) caching is useful for testing purposes + if self.conf.get("in-memory", False): + path = ":memory:" + else: + path = self.conf["path"] + self.conn = sqlite3.connect(path) return self def __exit__(self, type=None, val=None, tb=None) -> None: self.conn.close() + +class MetadataCache(AbstractCache): + """ The metadata cache map each SWHID to the complete metadata of the + referenced object. This is analogous to what is available in + `meta/.json` file (and generally used as data source for returning + the content of those files). """ + + def __enter__(self): + super().__enter__() + with self.conn as conn: + conn.execute("create table if not exists metadata_cache (swhid, metadata)") + return self + def __getitem__(self, swhid: SWHID) -> Any: - with closing(self.conn.cursor()) as cursor: + with self.conn as conn, closing(conn.cursor()) as cursor: cursor.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) ) cache = cursor.fetchone() if cache: metadata = json.loads(cache[0]) return typify(metadata, swhid.object_type) else: return None def __setitem__(self, swhid: SWHID, metadata: Any) -> None: - with closing(self.conn.cursor()) as cursor: - cursor.execute( + with self.conn as conn: + conn.execute( "insert into metadata_cache values (?, ?)", ( str(swhid), json.dumps( metadata, # Converts the typified JSON to plain str version default=lambda x: ( x.object_id if isinstance(x, SWHID) else x.__dict__ ), ), ), ) -class BlobCache: +class BlobCache(AbstractCache): """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived content. The blob cache entry for a given content object is populated, at the latest, the first time the object is `read()`-d. It might be populated earlier on due to prefetching, e.g., when a directory pointing to the given content is listed for the first time. """ - def __init__(self, path: str): - self.path = path - def __enter__(self): - self.conn = sqlite3.connect(self.path) - with self.conn: - self.conn.execute("create table if not exists blob_cache (swhid, blob)") + super().__enter__() + with self.conn as conn: + conn.execute("create table if not exists blob_cache (swhid, blob)") return self - def __exit__(self, type=None, val=None, tb=None) -> None: - self.conn.close() - def __getitem__(self, swhid: SWHID) -> Optional[str]: - with closing(self.conn.cursor()) as cursor: + with self.conn as conn, closing(conn.cursor()) as cursor: cursor.execute("select blob from blob_cache where swhid=?", (str(swhid),)) cache = cursor.fetchone() if cache: blob = cache[0] return blob else: return None def __setitem__(self, swhid: SWHID, blob: str) -> None: - with closing(self.conn.cursor()) as cursor: - cursor.execute("insert into blob_cache values (?, ?)", (str(swhid), blob)) + with self.conn as conn: + conn.execute("insert into blob_cache values (?, ?)", (str(swhid), blob)) diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py index fdbcf7a..d47d033 100644 --- a/swh/fuse/cli.py +++ b/swh/fuse/cli.py @@ -1,127 +1,121 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from contextlib import ExitStack import os # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, Tuple import click +from daemon import DaemonContext # from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.identifiers import SWHID # All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) CACHE_HOME_DIR: Path = ( Path(os.environ["XDG_CACHE_HOME"]) if "XDG_CACHE_HOME" in os.environ - else Path(Path.home(), ".cache") + else Path.home() / ".cache" ) -DEFAULT_CONFIG: Dict[str, Any] = { - "cache-dir": Path(CACHE_HOME_DIR, "swh", "fuse"), - "web-api": { - "url": "https://archive.softwareheritage.org/api/1", - "auth-token": None, - }, +DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = { + "cache": ( + "dict", + { + "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"}, + "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"}, + }, + ), + "web-api": ( + "dict", + {"url": "https://archive.softwareheritage.org/api/1", "auth-token": None,}, + ), } class SWHIDParamType(click.ParamType): """Click argument that accepts SWHID and return them as :class:`swh.model.identifiers.SWHID` instances """ name = "SWHID" def convert(self, value, param, ctx) -> SWHID: from swh.model.exceptions import ValidationError from swh.model.identifiers import parse_swhid try: return parse_swhid(value) except ValidationError: self.fail(f'"{value}" is not a valid SWHID', param, ctx) @click.group(name="fuse", context_settings=CONTEXT_SETTINGS) # XXX conffile logic temporarily commented out due to: # XXX https://forge.softwareheritage.org/T2632 # @click.option( # "-C", # "--config-file", # default=DEFAULT_CONFIG_PATH, # type=click.Path(exists=True, dir_okay=False, path_type=str), # help="YAML configuration file", # ) @click.pass_context def cli(ctx): """Software Heritage virtual file system""" # # recursive merge not done by config.read # conf = config.read_raw_config(config.config_basepath(config_file)) # conf = config.merge_configs(DEFAULT_CONFIG, conf) conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf @cli.command() @click.argument( "path", required=True, metavar="PATH", type=click.Path(exists=True, dir_okay=True, file_okay=False), ) @click.argument("swhids", nargs=-1, metavar="[SWHID]...", type=SWHIDParamType()) @click.option( - "-c", - "--cache-dir", - default=DEFAULT_CONFIG["cache-dir"], - metavar="CACHE_DIR", - show_default=True, - type=click.Path(dir_okay=True, file_okay=False), - help=""" - directory where to store cache from the Software Heritage API. To - store all the cache in memory instead of disk, use a value of ':memory:'. - """, -) -@click.option( - "-u", - "--api-url", - default=DEFAULT_CONFIG["web-api"]["url"], - metavar="API_URL", - show_default=True, - help="base URL for Software Heritage Web API", + "--config-file", + "-C", + default=None, + type=click.Path(exists=True, dir_okay=False,), + help="YAML configuration file", ) @click.option( "-f", "--foreground", is_flag=True, show_default=True, help="Run FUSE system in foreground instead of daemon", ) @click.pass_context -def mount(ctx, swhids, path, cache_dir, api_url, foreground): +def mount(ctx, swhids, path, config_file, foreground): """ Mount the Software Heritage archive at the given mount point """ + from swh.core import config from swh.fuse import fuse - if foreground: - fuse.main(swhids, path, cache_dir, api_url) - else: - import daemon - - with daemon.DaemonContext(): - fuse.main(swhids, path, cache_dir, api_url) + conf = config.read(config_file, DEFAULT_CONFIG) + with ExitStack() as stack: + if not foreground: + stack.enter_context(DaemonContext()) + fuse.main(swhids, path, conf) diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index 3d068e0..1b66c3d 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,56 +1,56 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator -from swh.fuse.cache import Cache +from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import ( ARCHIVE_DIRENTRY, META_DIRENTRY, ArtifactEntry, EntryMode, VirtualEntry, ) class Root: """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ def __iter__(self) -> Iterator[VirtualEntry]: entries = [ARCHIVE_DIRENTRY, META_DIRENTRY] return iter(entries) class Archive: """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ - def __init__(self, cache: Cache): + def __init__(self, cache: FuseCache): self.cache = cache def __iter__(self) -> Iterator[VirtualEntry]: entries = [] for swhid in self.cache.get_cached_swhids(): entries.append(ArtifactEntry(str(swhid), swhid)) return iter(entries) class Meta: """ The meta/ directory contains one SWHID.json file for each SWHID entry under archive/. The JSON file contain all available meta information about the given SWHID, as returned by the Software Heritage Web API for that object. Note that, in case of pagination (e.g., snapshot objects with many branches) the JSON file will contain a complete version with all pages merged together. """ - def __init__(self, cache: Cache): + def __init__(self, cache: FuseCache): self.cache = cache def __iter__(self) -> Iterator[VirtualEntry]: entries = [] for swhid in self.cache.get_cached_swhids(): filename = str(swhid) + ".json" entries.append(VirtualEntry(filename, EntryMode.RDONLY_FILE)) return iter(entries) diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 2d22327..b9ec4e6 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,327 +1,333 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import itertools import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests -from swh.fuse.cache import Cache +from swh.fuse.cache import FuseCache from swh.fuse.fs.artifact import Content, Directory from swh.fuse.fs.entry import ( ARCHIVE_DIRENTRY, META_DIRENTRY, ROOT_DIRENTRY, ArtifactEntry, EntryMode, VirtualEntry, ) from swh.fuse.fs.mountpoint import Archive, Meta, Root from swh.model.identifiers import CONTENT, DIRECTORY, SWHID, parse_swhid from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( - self, swhids: List[SWHID], root_path: Path, api_url: str, cache: Cache + self, + swhids: List[SWHID], + root_path: Path, + cache: FuseCache, + conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._next_fd: int = 0 root_inode = self._next_inode self._next_inode += 1 self._inode2entry: Dict[int, VirtualEntry] = {root_inode: ROOT_DIRENTRY} self._entry2inode: Dict[VirtualEntry, int] = {ROOT_DIRENTRY: root_inode} self._entry2fd: Dict[VirtualEntry, int] = {} self._fd2entry: Dict[int, VirtualEntry] = {} self._inode2path: Dict[int, Path] = {root_inode: root_path} self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() - self.web_api = WebAPIClient(api_url) + self.web_api = WebAPIClient( + conf["web-api"]["url"], conf["web-api"]["auth-token"] + ) self.cache = cache # Initially populate the cache for swhid in swhids: self.get_metadata(swhid) def shutdown(self) -> None: pass def _alloc_inode(self, entry: VirtualEntry) -> int: """ Return a unique inode integer for a given entry """ try: return self._entry2inode[entry] except KeyError: inode = self._next_inode self._next_inode += 1 self._entry2inode[entry] = inode self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def _alloc_fd(self, entry: VirtualEntry) -> int: """ Return a unique file descriptor integer for a given entry """ try: return self._entry2fd[entry] except KeyError: fd = self._next_fd self._next_fd += 1 self._entry2fd[entry] = fd self._fd2entry[fd] = entry return fd def inode2entry(self, inode: int) -> VirtualEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def entry2inode(self, entry: VirtualEntry) -> int: """ Return the inode matching a given entry """ try: return self._entry2inode[entry] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def inode2path(self, inode: int) -> Path: """ Return the path matching a given inode """ try: return self._inode2path[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ # TODO: swh-graph API cache = self.cache.metadata[swhid] if cache: return cache try: metadata = self.web_api.get(swhid) self.cache.metadata[swhid] = metadata return metadata except requests.HTTPError: logging.error(f"Unknown SWHID: '{swhid}'") def get_blob(self, swhid: SWHID) -> str: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) cache = self.cache.blob[swhid] if cache: return cache resp = list(self.web_api.content_raw(swhid)) blob = "".join(map(bytes.decode, resp)) self.cache.blob[swhid] = blob return blob def get_direntries(self, entry: VirtualEntry) -> Any: """ Return directory entries of a given entry """ if isinstance(entry, ArtifactEntry): if entry.swhid.object_type == CONTENT: raise pyfuse3.FUSEError(errno.ENOTDIR) metadata = self.get_metadata(entry.swhid) if entry.swhid.object_type == CONTENT: return Content(metadata) if entry.swhid.object_type == DIRECTORY: return Directory(metadata) # TODO: add other objects else: if entry == ROOT_DIRENTRY: return Root() elif entry == ARCHIVE_DIRENTRY: return Archive(self.cache) elif entry == META_DIRENTRY: return Meta(self.cache) # TODO: error handling def get_attrs(self, entry: VirtualEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = self._alloc_inode(entry) if isinstance(entry, ArtifactEntry): metadata = entry.prefetch or self.get_metadata(entry.swhid) if entry.swhid.object_type == CONTENT: # Only in the context of a directory entry do we have archived # permissions. Otherwise, fallback to default read-only. attrs.st_mode = metadata.get("perms", int(EntryMode.RDONLY_FILE)) attrs.st_size = metadata["length"] else: attrs.st_mode = int(EntryMode.RDONLY_DIR) else: attrs.st_mode = int(entry.mode) # Meta JSON entries (under the root meta/ directory) if entry.name.endswith(".json"): swhid = parse_swhid(entry.name.replace(".json", "")) metadata = self.get_metadata(swhid) attrs.st_size = len(str(metadata)) return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode async def readdir( self, inode: int, offset: int, token: pyfuse3.ReaddirToken ) -> None: """ Read entries in an open directory """ direntry = self.inode2entry(inode) path = self.inode2path(inode) # TODO: add cache on direntry list? entries = self.get_direntries(direntry) next_id = offset + 1 for entry in itertools.islice(entries, offset, None): name = os.fsencode(entry.name) attrs = self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry self._inode2path[attrs.st_ino] = Path(path, entry.name) async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file descriptor """ entry = self.inode2entry(inode) fd = self._alloc_fd(entry) return pyfuse3.FileInfo(fh=fd, keep_cache=True) async def read(self, fd: int, _offset: int, _length: int) -> bytes: """ Read blob content pointed by the given `fd` (file descriptor). Both parameters `_offset` and `_length` are ignored. """ # TODO: use offset/length try: entry = self._fd2entry[fd] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) if isinstance(entry, ArtifactEntry): blob = self.get_blob(entry.swhid) return blob.encode() else: # Meta JSON entries (under the root meta/ directory) if entry.name.endswith(".json"): swhid = parse_swhid(entry.name.replace(".json", "")) metadata = self.get_metadata(swhid) return str(metadata).encode() else: # TODO: error handling raise pyfuse3.FUSEError(errno.ENOENT) async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) path = Path(self.inode2path(parent_inode), name) parent_entry = self.inode2entry(parent_inode) attr = None if isinstance(parent_entry, ArtifactEntry): metadata = self.get_metadata(parent_entry.swhid) for entry in metadata: if entry["name"] == name: swhid = entry["target"] attr = self.get_attrs(ArtifactEntry(name, swhid)) # TODO: this is fragile, maybe cache attrs? else: if parent_entry == ROOT_DIRENTRY: if name == ARCHIVE_DIRENTRY.name: attr = self.get_attrs(ARCHIVE_DIRENTRY) elif name == META_DIRENTRY.name: attr = self.get_attrs(META_DIRENTRY) else: swhid = parse_swhid(name) attr = self.get_attrs(ArtifactEntry(name, swhid)) if attr: self._inode2path[attr.st_ino] = path return attr else: # TODO: error handling (name not found) return pyfuse3.EntryAttributes() -def main(swhids: List[SWHID], root_path: Path, cache_dir: Path, api_url: str) -> None: +def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() - with Cache(cache_dir) as cache: - fs = Fuse(swhids, root_path, api_url, cache) + with FuseCache(conf["cache"]) as cache: + fs = Fuse(swhids, root_path, cache, conf) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) loop = asyncio.get_event_loop() try: loop.run_until_complete(pyfuse3.main()) fs.shutdown() finally: pyfuse3.close(unmount=True) loop.close()