diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,8 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
-# [mypy-add_your_lib_here.*]
-# ignore_missing_imports = True
+[mypy-pyfuse3.*]
+ignore_missing_imports = True
+
+[mypy-pyfuse3_asyncio.*]
+ignore_missing_imports = True
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/cache.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+from pathlib import Path
+import sqlite3
+from typing import Any, List, Optional
+
+from swh.model.identifiers import CONTENT, SWHID, parse_swhid
+from swh.web.client.client import typify
+
+
+class Cache:
+    """ Cache all information retrieved from the Software Heritage API to
+    minimize API calls. """
+
+    def __init__(self, cache_dir: Path):
+        if str(cache_dir) == ":memory:":
+            self.conn = sqlite3.connect(":memory:")
+        else:
+            cache_path = Path(cache_dir, "cache.sqlite")
+            cache_path.parent.mkdir(parents=True, exist_ok=True)
+            self.conn = sqlite3.connect(cache_path)
+
+        self.db = self.conn.cursor()
+        self.db.execute("create table if not exists metadata_cache (swhid, metadata)")
+        self.db.execute("create table if not exists blob_cache (swhid, blob)")
+        self.conn.commit()
+
+    def close(self) -> None:
+        """ Close the underlying SQLite connection """
+
+        self.conn.close()
+
+    def get_metadata(self, swhid: SWHID) -> Any:
+        """ Return previously cached JSON metadata associated with a SWHID, or
+        None if the SWHID has not been cached yet """
+
+        self.db.execute(
+            "select metadata from metadata_cache where swhid=?", (str(swhid),)
+        )
+        cache = self.db.fetchone()
+        if cache:
+            metadata = json.loads(cache[0])
+            return typify(metadata, swhid.object_type)
+        else:
+            return None
+
+    def put_metadata(self, swhid: SWHID, metadata: Any) -> None:
+        """ Cache JSON metadata associated with a SWHID """
+
+        self.db.execute(
+            "insert into metadata_cache values (?, ?)",
+            (
+                str(swhid),
+                json.dumps(
+                    metadata,
+                    # Converts the typified JSON to plain str version
+                    default=lambda x: x.object_id
+                    if isinstance(x, SWHID)
+                    else x.__dict__,
+                ),
+            ),
+        )
+        self.conn.commit()
+
+    def get_metadata_swhids(self) -> List[SWHID]:
+        """ Return a list of SWHID of all previously cached entries """
+
+        self.db.execute("select swhid from metadata_cache")
+        return [parse_swhid(row[0]) for row in self.db.fetchall()]
+
+    def get_blob(self, swhid: SWHID) -> Optional[bytes]:
+        """ Return previously cached blob bytes associated with a content SWHID,
+        or None if the SWHID has not been cached yet """
+
+        if swhid.object_type != CONTENT:
+            raise AttributeError("Cannot retrieve blob from non-content object type")
+
+        self.db.execute("select blob from blob_cache where swhid=?", (str(swhid),))
+        cache = self.db.fetchone()
+        if cache:
+            blob = cache[0]
+            return blob
+        else:
+            return None
+
+    def put_blob(self, swhid: SWHID, blob: bytes) -> None:
+        """ Cache blob bytes associated with a content SWHID """
+
+        self.db.execute("insert into blob_cache values (?, ?)", (str(swhid), blob))
+        self.conn.commit()
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -3,29 +3,57 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
-import os
+from pathlib import Path
 from typing import Any, Dict
 
 import click
 
 # from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
+from swh.model.identifiers import SWHID
 
 # All generic config code should reside in swh.core.config
 DEFAULT_CONFIG_PATH = os.environ.get(
     "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
 )
 
+CACHE_HOME_DIR: Path = (
+    Path(os.environ["XDG_CACHE_HOME"])
+    if "XDG_CACHE_HOME" in os.environ
+    else Path(Path.home(), ".cache")
+)
+
 DEFAULT_CONFIG: Dict[str, Any] = {
+    "cache-dir": Path(CACHE_HOME_DIR, "swh", "fuse"),
     "web-api": {
         "url": "https://archive.softwareheritage.org/api/1",
         "auth-token": None,
-    }
+    },
 }
 
 
+class SWHIDParamType(click.ParamType):
+    """Click argument that accepts SWHIDs and returns them as
+    :class:`swh.model.identifiers.SWHID` instances
+
+    """
+
+    name = "SWHID"
+
+    def convert(self, value, param, ctx) -> SWHID:
+        from swh.model.exceptions import ValidationError
+        from swh.model.identifiers import parse_swhid
+
+        try:
+            return parse_swhid(value)
+        except ValidationError:
+            self.fail(f'"{value}" is not a valid SWHID', param, ctx)
+
+
 @click.group(name="fuse", context_settings=CONTEXT_SETTINGS)
 # XXX conffile logic temporarily commented out due to:
 # XXX https://forge.softwareheritage.org/T2632
@@ -50,6 +78,25 @@
 
 
 @cli.command()
+@click.argument("swhid", required=True, metavar="SWHID", type=SWHIDParamType())
+@click.argument(
+    "path",
+    required=True,
+    metavar="PATH",
+    type=click.Path(exists=True, dir_okay=True, file_okay=False),
+)
+@click.option(
+    "-c",
+    "--cache-dir",
+    default=DEFAULT_CONFIG["cache-dir"],
+    metavar="CACHE_DIR",
+    show_default=True,
+    type=click.Path(dir_okay=True, file_okay=False),
+    help="""
+    directory where to store cache from the Software Heritage API. To
+    store all the cache in memory instead of disk, use a value of ':memory:'.
+    """,
+)
 @click.option(
     "-u",
     "--api-url",
@@ -59,8 +106,9 @@
     help="base URL for Software Heritage Web API",
 )
 @click.pass_context
-def mount(ctx, api_url):
-    """Mount the Software Heritage archive at the given mount point"""
-    from .fuse import fuse  # XXX
+def mount(ctx, swhid, path, cache_dir, api_url):
+    """ Mount the Software Heritage archive at the given mount point """
+
+    from swh.fuse import fuse
 
-    fuse()
+    fuse.main(swhid, path, cache_dir, api_url)
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/artifact.py
@@ -0,0 +1,49 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Any, Dict, Iterator, List
+
+from swh.fuse.fs.entry import ArtifactEntry, VirtualEntry
+
+
+class Content:
+    """ Software Heritage content artifact.
+
+    Content leaves (AKA blobs) are represented on disks as regular files,
+    containing the corresponding bytes, as archived.
+
+    Note that permissions are associated to blobs only in the context of
+    directories. Hence, when accessing blobs from the top-level `archive/`
+    directory, the permissions of the `archive/SWHID` file will be arbitrary and
+    not meaningful (e.g., `0o644`). """
+
+    def __init__(self, json: Dict[str, Any]):
+        self.json = json
+
+
+class Directory:
+    """ Software Heritage directory artifact.
+
+    Directory nodes are represented as directories on the file-system,
+    containing one entry for each entry of the archived directory. Entry names
+    and other metadata, including permissions, will correspond to the archived
+    entry metadata.
+
+    Note that the FUSE mount is read-only, no matter what the permissions say.
+    So it is possible that, in the context of a directory, a file is presented
+    as writable, whereas actually writing to it will fail with `EPERM`. """
+
+    def __init__(self, json: List[Dict[str, Any]]):
+        self.json = json
+
+    def __iter__(self) -> Iterator[VirtualEntry]:
+        entries = []
+        for entry in self.json:
+            name, swhid = entry["name"], entry["target"]
+            # The directory API has extra info we can use to set attributes
+            # without additional Software Heritage API call
+            prefetch = entry
+            entries.append(ArtifactEntry(name, swhid, prefetch))
+        return iter(entries)
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/entry.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from enum import IntEnum
+from stat import S_IFDIR, S_IFREG
+from typing import Any, Dict, Optional
+
+from swh.model.identifiers import CONTENT, SWHID
+
+
+class EntryMode(IntEnum):
+    """ Default entry mode and permissions for the FUSE.
+
+    The FUSE mount is always read-only, even if permissions contradict this
+    statement (in the context of a directory, entries are listed with
+    permissions taken from the archive).
+    """
+
+    RDONLY_FILE = S_IFREG | 0o444
+    RDONLY_DIR = S_IFDIR | 0o555
+
+
+class VirtualEntry:
+    """ Main wrapper class to manipulate virtual FUSE entries
+
+    Attributes:
+        name (str): entry filename
+        mode (EntryMode): entry permission mode
+    """
+
+    def __init__(self, name: str, mode: EntryMode):
+        self.name = name
+        self.mode = mode
+
+
+class ArtifactEntry(VirtualEntry):
+    """ FUSE virtual entry for a Software Heritage Artifact
+
+    Attributes:
+        name (str): entry filename
+        mode (EntryMode): default permission mode, read-only file for content
+            objects and read-only directory for every other object type
+        swhid (SWHID): Software Heritage persistent identifier
+        prefetch (dict): optional prefetched metadata used to set entry attributes
+    """
+
+    def __init__(
+        self, name: str, swhid: SWHID, prefetch: Optional[Dict[str, Any]] = None
+    ):
+        mode = (
+            EntryMode.RDONLY_FILE
+            if swhid.object_type == CONTENT
+            else EntryMode.RDONLY_DIR
+        )
+        super().__init__(name, mode)
+        self.swhid = swhid
+        self.prefetch = prefetch
+
+
+ROOT_ENTRY = VirtualEntry("root", EntryMode.RDONLY_DIR)
+ARCHIVE_ENTRY = VirtualEntry("archive", EntryMode.RDONLY_DIR)
+META_ENTRY = VirtualEntry("meta", EntryMode.RDONLY_DIR)
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/mountpoint.py
@@ -0,0 +1,56 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Iterator
+
+from swh.fuse.cache import Cache
+from swh.fuse.fs.entry import (
+    ARCHIVE_ENTRY,
+    META_ENTRY,
+    ArtifactEntry,
+    EntryMode,
+    VirtualEntry,
+)
+
+
+class Root:
+    """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """
+
+    def __iter__(self) -> Iterator[VirtualEntry]:
+        entries = [ARCHIVE_ENTRY, META_ENTRY]
+        return iter(entries)
+
+
+class Archive:
+    """ The archive/ directory is lazily populated with one entry per accessed
+    SWHID, having actual SWHIDs as names """
+
+    def __init__(self, cache: Cache):
+        self.cache = cache
+
+    def __iter__(self) -> Iterator[VirtualEntry]:
+        entries = []
+        for swhid in self.cache.get_metadata_swhids():
+            entries.append(ArtifactEntry(str(swhid), swhid))
+        return iter(entries)
+
+
+class Meta:
+    """ The meta/ directory contains one SWHID.json file for each SWHID entry
+    under archive/. The JSON file contains all available meta information about
+    the given SWHID, as returned by the Software Heritage Web API for that
+    object. Note that, in case of pagination (e.g., snapshot objects with many
+    branches) the JSON file will contain a complete version with all pages
+    merged together. """
+
+    def __init__(self, cache: Cache):
+        self.cache = cache
+
+    def __iter__(self) -> Iterator[VirtualEntry]:
+        entries = []
+        for swhid in self.cache.get_metadata_swhids():
+            filename = str(swhid) + ".json"
+            entries.append(VirtualEntry(filename, EntryMode.RDONLY_FILE))
+        return iter(entries)
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -3,11 +3,313 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import asyncio
+import errno
+import itertools
+import json
+import os
+from pathlib import Path
+import time
+from typing import Any, Dict
 
-import logging
-import sys
+import pyfuse3
+import pyfuse3_asyncio
+import requests
 
+from swh.fuse.cache import Cache
+from swh.fuse.fs.artifact import Directory
+from swh.fuse.fs.entry import (
+    ARCHIVE_ENTRY,
+    META_ENTRY,
+    ROOT_ENTRY,
+    ArtifactEntry,
+    VirtualEntry,
+)
+from swh.fuse.fs.mountpoint import Archive, Meta, Root
+from swh.model.identifiers import CONTENT, DIRECTORY, SWHID, parse_swhid
+from swh.web.client.client import WebAPIClient
 
-def fuse():
-    logging.error("not implemented: FUSE mounting")
-    sys.exit(1)
+# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
+pyfuse3_asyncio.enable()
+
+
+class Fuse(pyfuse3.Operations):
+    """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of
+    the archive and navigate it as a virtual file system. """
+
+    def __init__(
+        self, root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str
+    ):
+        super(Fuse, self).__init__()
+
+        root_inode = pyfuse3.ROOT_INODE
+        self._inode2entry: Dict[int, VirtualEntry] = {root_inode: ROOT_ENTRY}
+        self._entry2inode: Dict[VirtualEntry, int] = {ROOT_ENTRY: root_inode}
+        self._entry2fd: Dict[VirtualEntry, int] = {}
+        self._fd2entry: Dict[int, VirtualEntry] = {}
+        self._inode2path: Dict[int, Path] = {root_inode: root_path}
+
+        self._next_inode: int = root_inode + 1
+        self._next_fd: int = 0
+
+        self.time_ns: int = time.time_ns()  # start time, used as timestamp
+        self.gid = os.getgid()
+        self.uid = os.getuid()
+
+        self.web_api = WebAPIClient(api_url)
+        self.cache = Cache(cache_dir)
+
+        # TODO check if root_swhid actually exists in the graph
+        # Initially populate the cache
+        self.get_metadata(root_swhid)
+
+    def shutdown(self) -> None:
+        """ Release the resources held by the file system (cache database) """
+
+        self.cache.close()
+
+    def _alloc_inode(self, entry: VirtualEntry) -> int:
+        """ Return a unique inode integer for a given entry """
+
+        try:
+            return self._entry2inode[entry]
+        except KeyError:
+            inode = self._next_inode
+            self._next_inode += 1
+            self._entry2inode[entry] = inode
+            self._inode2entry[inode] = entry
+
+            # TODO add inode recycling with invocation to invalidate_inode when
+            # the dicts get too big
+
+            return inode
+
+    def _alloc_fd(self, entry: VirtualEntry) -> int:
+        """ Return a unique file descriptor integer for a given entry """
+
+        try:
+            return self._entry2fd[entry]
+        except KeyError:
+            fd = self._next_fd
+            self._next_fd += 1
+            self._entry2fd[entry] = fd
+            self._fd2entry[fd] = entry
+            return fd
+
+    def inode2entry(self, inode: int) -> VirtualEntry:
+        """ Return the entry matching a given inode """
+
+        try:
+            return self._inode2entry[inode]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def entry2inode(self, entry: VirtualEntry) -> int:
+        """ Return the inode matching a given entry """
+
+        try:
+            return self._entry2inode[entry]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def inode2path(self, inode: int) -> Path:
+        """ Return the path matching a given inode """
+
+        try:
+            return self._inode2path[inode]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def get_metadata(self, swhid: SWHID) -> Any:
+        """ Retrieve metadata for a given SWHID using Software Heritage API """
+
+        # TODO: swh-graph API
+        cache = self.cache.get_metadata(swhid)
+        # Compare against None: empty metadata (e.g., an empty directory) is a
+        # valid cache hit and must not trigger a new API call
+        if cache is not None:
+            return cache
+
+        metadata = self.web_api.get(swhid)
+        self.cache.put_metadata(swhid, metadata)
+        return metadata
+
+    def get_blob(self, swhid: SWHID) -> bytes:
+        """ Retrieve the blob bytes for a given content SWHID using Software
+        Heritage API """
+
+        if swhid.object_type != CONTENT:
+            raise pyfuse3.FUSEError(errno.EINVAL)
+
+        cache = self.cache.get_blob(swhid)
+        # Compare against None: an empty file is a valid cache hit
+        if cache is not None:
+            return cache
+
+        metadata = self.get_metadata(swhid)
+        # Keep the raw bytes: decoding them as text would corrupt binary files
+        # and make the data length differ from the archived one (st_size)
+        blob = requests.get(metadata["data_url"]).content
+        self.cache.put_blob(swhid, blob)
+        return blob
+
+    def get_metadata_json(self, entry: VirtualEntry) -> bytes:
+        """ Return the content of a meta/SWHID.json file, i.e., the metadata of
+        the SWHID named by the entry, serialized as JSON """
+
+        swhid = parse_swhid(Path(entry.name).stem)
+        metadata = self.get_metadata(swhid)
+        # Typified metadata contains values that are not JSON-native (e.g.,
+        # SWHID objects): serialize them using their string representation
+        return json.dumps(metadata, default=str).encode()
+
+    def get_direntries(self, entry: VirtualEntry) -> Any:
+        """ Return directory entries of a given entry """
+
+        if isinstance(entry, ArtifactEntry):
+            if entry.swhid.object_type == CONTENT:
+                raise pyfuse3.FUSEError(errno.ENOTDIR)
+            if entry.swhid.object_type == DIRECTORY:
+                return Directory(self.get_metadata(entry.swhid))
+            # TODO: add other objects
+            raise pyfuse3.FUSEError(errno.ENOSYS)
+
+        if entry == ROOT_ENTRY:
+            return Root()
+        if entry == ARCHIVE_ENTRY:
+            return Archive(self.cache)
+        if entry == META_ENTRY:
+            return Meta(self.cache)
+        # Remaining virtual entries (i.e., meta/SWHID.json) are regular files
+        raise pyfuse3.FUSEError(errno.ENOTDIR)
+
+    def get_attrs(self, entry: VirtualEntry) -> pyfuse3.EntryAttributes:
+        """ Return entry attributes """
+
+        attrs = pyfuse3.EntryAttributes()
+        attrs.st_size = 0
+        attrs.st_atime_ns = self.time_ns
+        attrs.st_ctime_ns = self.time_ns
+        attrs.st_mtime_ns = self.time_ns
+        attrs.st_gid = self.gid
+        attrs.st_uid = self.uid
+        attrs.st_ino = self._alloc_inode(entry)
+        attrs.st_mode = int(entry.mode)
+
+        if isinstance(entry, ArtifactEntry):
+            if entry.swhid.object_type == CONTENT:
+                metadata = entry.prefetch or self.get_metadata(entry.swhid)
+                # Only in the context of a directory entry do we have archived
+                # permissions. Otherwise, fallback to default read-only.
+                attrs.st_mode = metadata.get("perms", attrs.st_mode)
+                attrs.st_size = metadata["length"]
+        elif entry.name.endswith(".json"):
+            # The size must match exactly the bytes returned by read()
+            attrs.st_size = len(self.get_metadata_json(entry))
+
+        return attrs
+
+    async def getattr(
+        self, inode: int, _ctx: pyfuse3.RequestContext
+    ) -> pyfuse3.EntryAttributes:
+        """ Get attributes for a given inode """
+
+        entry = self.inode2entry(inode)
+        return self.get_attrs(entry)
+
+    async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int:
+        """ Open a directory referred by a given inode """
+
+        # Re-use inode as directory handle
+        return inode
+
+    async def readdir(
+        self, inode: int, offset: int, token: pyfuse3.ReaddirToken
+    ) -> None:
+        """ Read entries in an open directory """
+
+        direntry = self.inode2entry(inode)
+        path = self.inode2path(inode)
+
+        # TODO: add cache on direntry list?
+        entries = self.get_direntries(direntry)
+        next_id = offset + 1
+        for entry in itertools.islice(entries, offset, None):
+            name = os.fsencode(entry.name)
+            attrs = self.get_attrs(entry)
+            if not pyfuse3.readdir_reply(token, name, attrs, next_id):
+                break
+
+            next_id += 1
+            self._inode2entry[attrs.st_ino] = entry
+            self._inode2path[attrs.st_ino] = Path(path, entry.name)
+
+    async def open(
+        self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
+    ) -> pyfuse3.FileInfo:
+        """ Open an inode and return a unique file descriptor """
+
+        entry = self.inode2entry(inode)
+        fd = self._alloc_fd(entry)
+        return pyfuse3.FileInfo(fh=fd, keep_cache=True)
+
+    async def read(self, fd: int, offset: int, length: int) -> bytes:
+        """ Read at most `length` bytes, starting at `offset`, from the file
+        pointed by the given `fd` (file descriptor) """
+
+        try:
+            entry = self._fd2entry[fd]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+        if isinstance(entry, ArtifactEntry):
+            data = self.get_blob(entry.swhid)
+        elif entry.name.endswith(".json"):
+            data = self.get_metadata_json(entry)
+        else:
+            # TODO: error handling
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+        # Large files are read in several chunks: only return the requested
+        # window instead of the whole file on every call
+        return data[offset : offset + length]
+
+    async def lookup(
+        self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
+    ) -> pyfuse3.EntryAttributes:
+        """ Look up a directory entry by name and get its attributes """
+
+        name = os.fsdecode(name)
+        path = Path(self.inode2path(parent_inode), name)
+
+        parent_entry = self.inode2entry(parent_inode)
+        # Search the same entries readdir() lists, so that lookup works for
+        # every directory (root, archive/, meta/ and artifacts) and reuses the
+        # attributes prefetched from the directory listing
+        for entry in self.get_direntries(parent_entry):
+            if entry.name == name:
+                attr = self.get_attrs(entry)
+                self._inode2path[attr.st_ino] = path
+                return attr
+
+        raise pyfuse3.FUSEError(errno.ENOENT)
+
+
+def main(root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str) -> None:
+    """ swh-fuse CLI entry-point """
+
+    fs = Fuse(root_swhid, root_path, cache_dir, api_url)
+
+    fuse_options = set(pyfuse3.default_options)
+    fuse_options.add("fsname=swhfs")
+    fuse_options.add("debug")
+    pyfuse3.init(fs, root_path, fuse_options)
+
+    loop = asyncio.get_event_loop()
+    try:
+        loop.run_until_complete(pyfuse3.main())
+    finally:
+        # Always release the cache and unmount, even when the main loop fails
+        fs.shutdown()
+        pyfuse3.close(unmount=True)
+        loop.close()