diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py index 3bf680a..49a2d59 100644 --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -1,71 +1,89 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from pathlib import Path import sqlite3 +from typing import Any, List, Optional from swh.model.identifiers import CONTENT, SWHID, parse_swhid from swh.web.client.client import typify class Cache: - def __init__(self, cache_dir): - if cache_dir == ":memory:": + """ Cache all information retrieved from the Software Heritage API to + minimize API calls. """ + + def __init__(self, cache_dir: Path): + if str(cache_dir) == ":memory:": self.conn = sqlite3.connect(":memory:") else: cache_path = Path(cache_dir, "cache.sqlite") cache_path.parents[0].mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(cache_path) self.db = self.conn.cursor() self.db.execute("create table if not exists metadata_cache (swhid, metadata)") self.db.execute("create table if not exists blob_cache (swhid, blob)") self.conn.commit() - def get_metadata(self, swhid: SWHID): + def get_metadata(self, swhid: SWHID) -> Any: + """ Return previously cached JSON metadata associated with a SWHID """ + self.db.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) ) cache = self.db.fetchone() if cache: metadata = json.loads(cache[0]) return typify(metadata, swhid.object_type) + else: + return None + + def put_metadata(self, swhid: SWHID, metadata: Any) -> None: + """ Cache JSON metadata associated with a SWHID """ - def put_metadata(self, swhid: SWHID, metadata): self.db.execute( "insert into metadata_cache values (?, ?)", ( str(swhid), json.dumps( metadata, # Converts the typified JSON to plain str version default=lambda x: x.object_id if isinstance(x, SWHID) else x.__dict__, ), ), ) self.conn.commit() - def get_metadata_swhids(self): + def get_metadata_swhids(self) -> List[SWHID]: + """ Return a list of SWHID of all previously cached entry """ + self.db.execute("select swhid from metadata_cache") swhids = self.db.fetchall() swhids = [parse_swhid(x[0]) for x in swhids] return swhids - def get_blob(self, swhid: SWHID): + def get_blob(self, swhid: SWHID) -> Optional[str]: + """ Return previously cached blob bytes associated with a content SWHID """ + if swhid.object_type != CONTENT: raise AttributeError("Cannot retrieve blob from non-content object type") self.db.execute("select blob from blob_cache where swhid=?", (str(swhid),)) cache = self.db.fetchone() if cache: blob = cache[0] return blob + else: + return None + + def put_blob(self, swhid: SWHID, blob: str) -> None: + """ Cache blob bytes associated with a content SWHID """ - def put_blob(self, swhid: SWHID, blob): self.db.execute("insert into blob_cache values (?, ?)", (str(swhid), blob)) self.conn.commit() diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py index d85e6c3..f3b6196 100644 --- a/swh/fuse/cli.py +++ b/swh/fuse/cli.py @@ -1,113 +1,114 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path from typing import Any, Dict import click # from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.identifiers import SWHID # All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) -CACHE_HOME_DIR = ( +CACHE_HOME_DIR: Path = ( Path(os.environ["XDG_CACHE_HOME"]) if "XDG_CACHE_HOME" in os.environ else Path(Path.home(), ".cache") ) DEFAULT_CONFIG: Dict[str, Any] = { "cache-dir": Path(CACHE_HOME_DIR, "swh", "fuse"), "web-api": { "url": "https://archive.softwareheritage.org/api/1", "auth-token": None, }, } class SWHIDParamType(click.ParamType): """Click argument that accepts SWHID and return them as :class:`swh.model.identifiers.SWHID` instances """ name = "SWHID" def convert(self, value, param, ctx) -> SWHID: from swh.model.exceptions import ValidationError from swh.model.identifiers import parse_swhid try: return parse_swhid(value) except ValidationError: self.fail(f'"{value}" is not a valid SWHID', param, ctx) @click.group(name="fuse", context_settings=CONTEXT_SETTINGS) # XXX conffile logic temporarily commented out due to: # XXX https://forge.softwareheritage.org/T2632 # @click.option( # "-C", # "--config-file", # default=DEFAULT_CONFIG_PATH, # type=click.Path(exists=True, dir_okay=False, path_type=str), # help="YAML configuration file", # ) @click.pass_context def cli(ctx): """Software Heritage virtual file system""" # # recursive merge not done by config.read # conf = config.read_raw_config(config.config_basepath(config_file)) # conf = config.merge_configs(DEFAULT_CONFIG, conf) conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf @cli.command() @click.argument("swhid", required=True, metavar="SWHID", type=SWHIDParamType()) @click.argument( "path", required=True, metavar="PATH", type=click.Path(exists=True, dir_okay=True, file_okay=False), ) @click.option( "-c", "--cache-dir", default=DEFAULT_CONFIG["cache-dir"], metavar="CACHE_DIR", show_default=True, type=click.Path(dir_okay=True, file_okay=False), help=""" directory where to store cache from the Software Heritage API. To store all the cache in memory instead of disk, use a value of ':memory:'. """, ) @click.option( "-u", "--api-url", default=DEFAULT_CONFIG["web-api"]["url"], metavar="API_URL", show_default=True, help="base URL for Software Heritage Web API", ) @click.pass_context def mount(ctx, swhid, path, cache_dir, api_url): - """Mount the Software Heritage archive at the given mount point""" + """ Mount the Software Heritage archive at the given mount point """ + from swh.fuse import fuse fuse.main(swhid, path, cache_dir, api_url) diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py index 8074a17..db0d289 100644 --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -1,23 +1,49 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.fuse.fs.entry import ArtifactEntry +from typing import Any, Dict, Iterator, List + +from swh.fuse.fs.entry import ArtifactEntry, VirtualEntry class Content: - def __init__(self, json): + """ Software Heritage content artifact. + + Content leaves (AKA blobs) are represented on disks as regular files, + containing the corresponding bytes, as archived. + + Note that permissions are associated to blobs only in the context of + directories. Hence, when accessing blobs from the top-level `archive/` + directory, the permissions of the `archive/SWHID` file will be arbitrary and + not meaningful (e.g., `0x644`). """ + + def __init__(self, json: Dict[str, Any]): self.json = json class Directory: - def __init__(self, json): + """ Software Heritage directory artifact. + + Directory nodes are represented as directories on the file-system, + containing one entry for each entry of the archived directory. Entry names + and other metadata, including permissions, will correspond to the archived + entry metadata. + + Note that the FUSE mount is read-only, no matter what the permissions say. + So it is possible that, in the context of a directory, a file is presented + as writable, whereas actually writing to it will fail with `EPERM`. """ + + def __init__(self, json: List[Dict[str, Any]]): self.json = json - def __iter__(self): + def __iter__(self) -> Iterator[VirtualEntry]: entries = [] for entry in self.json: name, swhid = entry["name"], entry["target"] - entries.append(ArtifactEntry(name, swhid)) + # The directory API has extra info we can use to set attributes + # without additional Software Heritage API call + prefetch = entry + entries.append(ArtifactEntry(name, swhid, prefetch)) return iter(entries) diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index 2912c98..cb949a3 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,31 +1,55 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from enum import IntEnum from stat import S_IFDIR, S_IFREG +from typing import Any, Dict from swh.model.identifiers import SWHID class EntryMode(IntEnum): + """ Default entry mode and permissions for the FUSE. + + The FUSE mount is always read-only, even if permissions contradict this + statement (in a context of a directory, entries are listed with permissions + taken from the archive). + """ + RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 class VirtualEntry: + """ Main wrapper class to manipulate virtual FUSE entries + + Attributes: + name: entry filename + mode: entry permission mode + """ + def __init__(self, name: str, mode: EntryMode): self.name = name self.mode = mode class ArtifactEntry(VirtualEntry): - def __init__(self, name: str, swhid: SWHID): + """ FUSE virtual entry for a Software Heritage Artifact + + Attributes: + name: entry filename + swhid: Software Heritage persistent identifier + prefetch: optional prefetched metadata used to set entry attributes + """ + + def __init__(self, name: str, swhid: SWHID, prefetch: Dict[str, Any] = None): self.name = name self.swhid = swhid + self.prefetch = prefetch ROOT_ENTRY = VirtualEntry("root", EntryMode.RDONLY_DIR) ARCHIVE_ENTRY = VirtualEntry("archive", EntryMode.RDONLY_DIR) META_ENTRY = VirtualEntry("meta", EntryMode.RDONLY_DIR) diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index 1be3d69..c6fc84e 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,41 +1,56 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from typing import Iterator + +from swh.fuse.cache import Cache from swh.fuse.fs.entry import ( ARCHIVE_ENTRY, META_ENTRY, ArtifactEntry, EntryMode, VirtualEntry, ) class Root: - def __iter__(self): + """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ + + def __iter__(self) -> Iterator[VirtualEntry]: entries = [ARCHIVE_ENTRY, META_ENTRY] return iter(entries) class Archive: - def __init__(self, cache): + """ The archive/ directory is lazily populated with one entry per accessed + SWHID, having actual SWHIDs as names """ + + def __init__(self, cache: Cache): self.cache = cache - def __iter__(self): + def __iter__(self) -> Iterator[VirtualEntry]: entries = [] for swhid in self.cache.get_metadata_swhids(): entries.append(ArtifactEntry(str(swhid), swhid)) return iter(entries) class Meta: - def __init__(self, cache): + """ The meta/ directory contains one SWHID.json file for each SWHID entry + under archive/. The JSON file contain all available meta information about + the given SWHID, as returned by the Software Heritage Web API for that + object. Note that, in case of pagination (e.g., snapshot objects with many + branches) the JSON file will contain a complete version with all pages + merged together. """ + + def __init__(self, cache: Cache): self.cache = cache - def __iter__(self): + def __iter__(self) -> Iterator[VirtualEntry]: entries = [] for swhid in self.cache.get_metadata_swhids(): filename = str(swhid) + ".json" entries.append(VirtualEntry(filename, EntryMode.RDONLY_FILE)) return iter(entries) diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 6735297..a73e28b 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,268 +1,304 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import itertools import os from pathlib import Path import time -from typing import Dict +from typing import Any, Dict import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import Cache from swh.fuse.fs.artifact import Content, Directory from swh.fuse.fs.entry import ( ARCHIVE_ENTRY, META_ENTRY, ROOT_ENTRY, ArtifactEntry, EntryMode, VirtualEntry, ) from swh.fuse.fs.mountpoint import Archive, Meta, Root from swh.model.identifiers import CONTENT, DIRECTORY, SWHID, parse_swhid from swh.web.client.client import WebAPIClient # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() class Fuse(pyfuse3.Operations): + """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of + the archive and navigate it as a virtual file system. """ + def __init__( self, root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str ): super(Fuse, self).__init__() - # TODO check if root_swhid actually exists in the graph - root_inode = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, VirtualEntry] = {root_inode: ROOT_ENTRY} self._entry2inode: Dict[VirtualEntry, int] = {ROOT_ENTRY: root_inode} self._entry2fd: Dict[VirtualEntry, int] = {} self._fd2entry: Dict[int, VirtualEntry] = {} self._inode2path: Dict[int, Path] = {root_inode: root_path} self._next_inode: int = root_inode + 1 self._next_fd: int = 0 self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient(api_url) self.cache = Cache(cache_dir) + # TODO check if root_swhid actually exists in the graph # Initially populate the cache self.get_metadata(root_swhid) def _alloc_inode(self, entry: VirtualEntry) -> int: + """ Return a unique inode integer for a given entry """ + try: return self._entry2inode[entry] except KeyError: inode = self._next_inode self._next_inode += 1 self._entry2inode[entry] = inode self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def _alloc_fd(self, entry: VirtualEntry) -> int: + """ Return a unique file descriptor integer for a given entry """ + try: return self._entry2fd[entry] except KeyError: fd = self._next_fd self._next_fd += 1 self._entry2fd[entry] = fd self._fd2entry[fd] = entry return fd def inode2entry(self, inode: int) -> VirtualEntry: + """ Return the entry matching a given inode """ + try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def entry2inode(self, entry: VirtualEntry) -> int: + """ Return the inode matching a given entry """ + try: return self._entry2inode[entry] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def inode2path(self, inode: int) -> Path: + """ Return the path matching a given inode """ + try: return self._inode2path[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) - def get_metadata(self, swhid: SWHID): + def get_metadata(self, swhid: SWHID) -> Any: + """ Retrieve metadata for a given SWHID using Software Heritage API """ + # TODO: swh-graph API cache = self.cache.get_metadata(swhid) if cache: return cache metadata = self.web_api.get(swhid) self.cache.put_metadata(swhid, metadata) return metadata - def get_blob(self, swhid: SWHID): + def get_blob(self, swhid: SWHID) -> str: + """ Retrieve the blob bytes for a given content SWHID using Software + Heritage API """ + if swhid.object_type != CONTENT: - return None + raise pyfuse3.FUSEError(errno.EINVAL) cache = self.cache.get_blob(swhid) if cache: return cache metadata = self.get_metadata(swhid) blob = requests.get(metadata["data_url"]).text self.cache.put_blob(swhid, blob) return blob - def get_direntries(self, entry: VirtualEntry): + def get_direntries(self, entry: VirtualEntry) -> Any: + """ Return directory entries of a given entry """ + if isinstance(entry, ArtifactEntry): if entry.swhid.object_type == CONTENT: raise pyfuse3.FUSEError(errno.ENOTDIR) metadata = self.get_metadata(entry.swhid) if entry.swhid.object_type == CONTENT: return Content(metadata) if entry.swhid.object_type == DIRECTORY: return Directory(metadata) # TODO: add other objects else: if entry == ROOT_ENTRY: return Root() elif entry == ARCHIVE_ENTRY: return Archive(self.cache) elif entry == META_ENTRY: return Meta(self.cache) # TODO: error handling - def get_attrs(self, entry: VirtualEntry, prefetch=None) -> pyfuse3.EntryAttributes: + def get_attrs(self, entry: VirtualEntry) -> pyfuse3.EntryAttributes: + """ Return entry attributes """ + attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = self._alloc_inode(entry) if isinstance(entry, ArtifactEntry): - metadata = prefetch or self.get_metadata(entry.swhid) + metadata = entry.prefetch or self.get_metadata(entry.swhid) if entry.swhid.object_type == CONTENT: - attrs.st_mode = int(EntryMode.RDONLY_FILE) - # Only the directory API prefetch stores the permissions - if prefetch: - attrs.st_mode = metadata["perms"] + # Only in the context of a directory entry do we have archived + # permissions. Otherwise, fallback to default read-only. + attrs.st_mode = metadata.get("perms", int(EntryMode.RDONLY_FILE)) attrs.st_size = metadata["length"] else: attrs.st_mode = int(EntryMode.RDONLY_DIR) else: attrs.st_mode = int(entry.mode) if entry.name.endswith(".json"): filename = Path(entry.name).stem swhid = parse_swhid(filename) metadata = self.get_metadata(swhid) attrs.st_size = len(str(metadata)) return attrs - async def getattr(self, inode, ctx): + async def getattr( + self, inode: int, _ctx: pyfuse3.RequestContext + ) -> pyfuse3.EntryAttributes: + """ Get attributes for a given inode """ + entry = self.inode2entry(inode) return self.get_attrs(entry) - async def opendir(self, inode, ctx): + async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: + """ Open a directory referred by a given inode """ + # Re-use inode as directory handle return inode - async def readdir(self, inode, offset, token): + async def readdir( + self, inode: int, offset: int, token: pyfuse3.ReaddirToken + ) -> None: + """ Read entries in an open directory """ + direntry = self.inode2entry(inode) path = self.inode2path(inode) # TODO: add cache on direntry list? entries = self.get_direntries(direntry) - current_id = offset + next_id = offset + 1 for entry in itertools.islice(entries, offset, None): - # The directory API has extra info we can use to set attributes - # without additional SWH API call - prefetch = None - if isinstance(entries, Directory): - prefetch = entries.json[current_id] - - attrs = self.get_attrs(entry, prefetch) - next_id = current_id + 1 - if not pyfuse3.readdir_reply( - token, os.fsencode(entry.name), attrs, next_id - ): + name = os.fsencode(entry.name) + attrs = self.get_attrs(entry) + if not pyfuse3.readdir_reply(token, name, attrs, next_id): break - current_id = next_id + next_id += 1 self._inode2entry[attrs.st_ino] = entry self._inode2path[attrs.st_ino] = Path(path, entry.name) - async def open(self, inode, flags, ctx): + async def open( + self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext + ) -> pyfuse3.FileInfo: + """ Open an inode and return a unique file descriptor """ + entry = self.inode2entry(inode) fd = self._alloc_fd(entry) return pyfuse3.FileInfo(fh=fd, keep_cache=True) - async def read(self, fd, offset, length): + async def read(self, fd: int, _offset: int, _length: int) -> bytes: + """ Read blob content pointed by the given `fd` (file descriptor). Both + parameters `_offset` and `_length` are ignored. """ + # TODO: use offset/length + try: entry = self._fd2entry[fd] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) if isinstance(entry, ArtifactEntry): blob = self.get_blob(entry.swhid) return blob.encode() else: if entry.name.endswith(".json"): filename = Path(entry.name).stem swhid = parse_swhid(filename) metadata = self.get_metadata(swhid) return str(metadata).encode() else: - # TODO: error handling? - pass + # TODO: error handling + raise pyfuse3.FUSEError(errno.ENOENT) + + async def lookup( + self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext + ) -> pyfuse3.EntryAttributes: + """ Look up a directory entry by name and get its attributes """ - async def lookup(self, parent_inode, name, ctx): name = os.fsdecode(name) path = Path(self.inode2path(parent_inode), name) parent_entry = self.inode2entry(parent_inode) if isinstance(parent_entry, ArtifactEntry): metadata = self.get_metadata(parent_entry.swhid) for entry in metadata: if entry["name"] == name: swhid = entry["target"] attr = self.get_attrs(ArtifactEntry(name, swhid)) self._inode2path[attr.st_ino] = path return attr # TODO: error handling (name not found) return pyfuse3.EntryAttributes() def main(root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str) -> None: + """ swh-fuse CLI entry-point """ + fs = Fuse(root_swhid, root_path, cache_dir, api_url) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) loop = asyncio.get_event_loop() try: loop.run_until_complete(pyfuse3.main()) fs.shutdown() finally: pyfuse3.close(unmount=True) loop.close()