diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -11,5 +11,8 @@
 [mypy-pytest.*]
 ignore_missing_imports = True
 
-# [mypy-add_your_lib_here.*]
-# ignore_missing_imports = True
+[mypy-pyfuse3.*]
+ignore_missing_imports = True
+
+[mypy-pyfuse3_asyncio.*]
+ignore_missing_imports = True
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/cache.py
@@ -0,0 +1,85 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from datetime import datetime
+import json
+from pathlib import Path
+import sqlite3
+
+from swh.model.identifiers import CONTENT, SWHID, parse_swhid
+from swh.web.client.client import typify
+
+
+def _json_default(obj):
+    """Convert the non-JSON values of typified metadata to plain JSON values."""
+    if isinstance(obj, SWHID):
+        # Store only the object id; typify() presumably rebuilds the SWHID
+        # from it when the metadata is loaded back
+        return obj.object_id
+    if isinstance(obj, datetime):
+        # datetime objects have no __dict__: store them as ISO 8601 strings
+        return obj.isoformat()
+    return obj.__dict__
+
+
+class Cache:
+    """SQLite cache of Software Heritage metadata and content blobs.
+
+    A ``cache_dir`` of ":memory:" keeps the whole cache in memory.
+    """
+
+    def __init__(self, cache_dir):
+        if cache_dir == ":memory:":
+            self.conn = sqlite3.connect(":memory:")
+        else:
+            cache_path = Path(cache_dir, "cache.sqlite")
+            cache_path.parents[0].mkdir(parents=True, exist_ok=True)
+            self.conn = sqlite3.connect(cache_path)
+
+        self.db = self.conn.cursor()
+        self.db.execute("create table if not exists metadata_cache (swhid, metadata)")
+        self.db.execute("create table if not exists blob_cache (swhid, blob)")
+        self.conn.commit()
+
+    def get_metadata(self, swhid: SWHID):
+        """Return the typified cached metadata of ``swhid``, or None if absent."""
+        self.db.execute(
+            "select metadata from metadata_cache where swhid=?", (str(swhid),)
+        )
+        cache = self.db.fetchone()
+        if cache:
+            metadata = json.loads(cache[0])
+            return typify(metadata, swhid.object_type)
+
+    def put_metadata(self, swhid: SWHID, metadata):
+        """Store the typified ``metadata`` of ``swhid`` as a JSON string."""
+        self.db.execute(
+            "insert into metadata_cache values (?, ?)",
+            (str(swhid), json.dumps(metadata, default=_json_default)),
+        )
+        self.conn.commit()
+
+    def get_metadata_swhids(self):
+        """Return the SWHIDs of all objects whose metadata is cached."""
+        self.db.execute("select swhid from metadata_cache")
+        return [parse_swhid(row[0]) for row in self.db.fetchall()]
+
+    def get_blob(self, swhid: SWHID):
+        """Return the cached blob of content ``swhid``, or None if absent.
+
+        Raises AttributeError if ``swhid`` is not a content SWHID.
+        """
+        if swhid.object_type != CONTENT:
+            raise AttributeError("Cannot retrieve blob from non-content object type")
+
+        self.db.execute("select blob from blob_cache where swhid=?", (str(swhid),))
+        cache = self.db.fetchone()
+        if cache:
+            return cache[0]
+
+    def put_blob(self, swhid: SWHID, blob):
+        """Store the ``blob`` of content ``swhid``."""
+        self.db.execute("insert into blob_cache values (?, ?)", (str(swhid), blob))
+        self.conn.commit()
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -3,29 +3,57 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import os
+
 # WARNING: do not import unnecessary things here to keep cli startup time under
 # control
-import os
+from pathlib import Path
 from typing import Any, Dict
 
 import click
 
 # from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
+from swh.model.identifiers import SWHID
 
 # All generic config code should reside in swh.core.config
 DEFAULT_CONFIG_PATH = os.environ.get(
     "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml")
 )
 
+CACHE_HOME_DIR = (
+    Path(os.environ["XDG_CACHE_HOME"])
+    if "XDG_CACHE_HOME" in os.environ
+    else Path(Path.home(), ".cache")
+)
+
 DEFAULT_CONFIG: Dict[str, Any] = {
+    "cache-dir": Path(CACHE_HOME_DIR, "swh", "fuse"),
     "web-api": {
         "url": "https://archive.softwareheritage.org/api/1",
         "auth-token": None,
-    }
+    },
 }
 
 
+class SWHIDParamType(click.ParamType):
+    """Click argument that accepts SWHID and return them as
+    :class:`swh.model.identifiers.SWHID` instances
+
+    """
+
+    name = "SWHID"
+
+    def convert(self, value, param, ctx) -> SWHID:
+        from swh.model.exceptions import ValidationError
+        from swh.model.identifiers import parse_swhid
+
+        try:
+            return parse_swhid(value)
+        except ValidationError:
+            self.fail(f'"{value}" is not a valid SWHID', param, ctx)
+
+
 @click.group(name="fuse", context_settings=CONTEXT_SETTINGS)
 # XXX conffile logic temporarily commented out due to:
 # XXX https://forge.softwareheritage.org/T2632
@@ -50,6 +78,25 @@
 
 
 @cli.command()
+@click.argument("swhid", required=True, metavar="SWHID", type=SWHIDParamType())
+@click.argument(
+    "path",
+    required=True,
+    metavar="PATH",
+    type=click.Path(exists=True, dir_okay=True, file_okay=False),
+)
+@click.option(
+    "-c",
+    "--cache-dir",
+    default=DEFAULT_CONFIG["cache-dir"],
+    metavar="CACHE_DIR",
+    show_default=True,
+    type=click.Path(dir_okay=True, file_okay=False),
+    help="""
+    directory where to store cache from the Software Heritage API. To
+    store all the cache in memory instead of disk, use a value of ':memory:'.
+    """,
+)
 @click.option(
     "-u",
     "--api-url",
@@ -59,8 +106,8 @@
     help="base URL for Software Heritage Web API",
 )
 @click.pass_context
-def mount(ctx, api_url):
+def mount(ctx, swhid, path, cache_dir, api_url):
     """Mount the Software Heritage archive at the given mount point"""
-    from .fuse import fuse  # XXX
+    from swh.fuse import fuse
 
-    fuse()
+    fuse.main(swhid, path, cache_dir, api_url)
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/artifact.py
@@ -0,0 +1,23 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.fuse.fs.entry import ArtifactEntry
+
+
+class Content:
+    def __init__(self, json):
+        self.json = json
+
+
+class Directory:
+    def __init__(self, json):
+        self.json = json
+
+    def __iter__(self):
+        entries = []
+        for entry in self.json:
+            name, swhid = entry["name"], entry["target"]
+            entries.append(ArtifactEntry(name, swhid))
+        return iter(entries)
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/entry.py
@@ -0,0 +1,42 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from enum import IntEnum
+from stat import S_IFDIR, S_IFREG
+
+from swh.model.identifiers import CONTENT, SWHID
+
+
+class EntryMode(IntEnum):
+    """File type and read-only permission bits of file system entries."""
+
+    RDONLY_FILE = S_IFREG | 0o444
+    RDONLY_DIR = S_IFDIR | 0o555
+
+
+class VirtualEntry:
+    """Named entry of the virtual file system, with its file mode."""
+
+    def __init__(self, name: str, mode: EntryMode):
+        self.name = name
+        self.mode = mode
+
+
+class ArtifactEntry(VirtualEntry):
+    """Entry backed by the Software Heritage artifact ``swhid``."""
+
+    def __init__(self, name: str, swhid: SWHID):
+        # Contents are files; any other artifact type is shown as a directory
+        if swhid.object_type == CONTENT:
+            mode = EntryMode.RDONLY_FILE
+        else:
+            mode = EntryMode.RDONLY_DIR
+        super().__init__(name, mode)
+        self.swhid = swhid
+
+
+ROOT_ENTRY = VirtualEntry("root", EntryMode.RDONLY_DIR)
+ARCHIVE_ENTRY = VirtualEntry("archive", EntryMode.RDONLY_DIR)
+META_ENTRY = VirtualEntry("meta", EntryMode.RDONLY_DIR)
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
new file mode 100644
--- /dev/null
+++ b/swh/fuse/fs/mountpoint.py
@@ -0,0 +1,41 @@
+# Copyright (C) 2020 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from swh.fuse.fs.entry import (
+    ARCHIVE_ENTRY,
+    META_ENTRY,
+    ArtifactEntry,
+    EntryMode,
+    VirtualEntry,
+)
+
+
+class Root:
+    def __iter__(self):
+        entries = [ARCHIVE_ENTRY, META_ENTRY]
+        return iter(entries)
+
+
+class Archive:
+    def __init__(self, cache):
+        self.cache = cache
+
+    def __iter__(self):
+        entries = []
+        for swhid in self.cache.get_metadata_swhids():
+            entries.append(ArtifactEntry(str(swhid), swhid))
+        return iter(entries)
+
+
+class Meta:
+    def __init__(self, cache):
+        self.cache = cache
+
+    def __iter__(self):
+        entries = []
+        for swhid in self.cache.get_metadata_swhids():
+            filename = str(swhid) + ".json"
+            entries.append(VirtualEntry(filename, EntryMode.RDONLY_FILE))
+        return iter(entries)
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -3,11 +3,312 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import asyncio
+import errno
+import itertools
+import json
+import os
+from pathlib import Path
+import time
+from typing import Dict
 
-import logging
-import sys
+import pyfuse3
+import pyfuse3_asyncio
+import requests
 
+from swh.fuse.cache import Cache
+from swh.fuse.fs.artifact import Directory
+from swh.fuse.fs.entry import (
+    ARCHIVE_ENTRY,
+    META_ENTRY,
+    ROOT_ENTRY,
+    ArtifactEntry,
+    EntryMode,
+    VirtualEntry,
+)
+from swh.fuse.fs.mountpoint import Archive, Meta, Root
+from swh.model.identifiers import CONTENT, DIRECTORY, SWHID, parse_swhid
+from swh.web.client.client import WebAPIClient
 
-def fuse():
-    logging.error("not implemented: FUSE mounting")
-    sys.exit(1)
+# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
+pyfuse3_asyncio.enable()
+
+
+class Fuse(pyfuse3.Operations):
+    """Read-only FUSE file system exposing Software Heritage artifacts.
+
+    Metadata and content blobs are fetched from the Web API on first access
+    and stored in a local :class:`Cache`.
+    """
+
+    def __init__(
+        self, root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str
+    ):
+        super(Fuse, self).__init__()
+
+        # TODO check if root_swhid actually exists in the graph
+
+        root_inode = pyfuse3.ROOT_INODE
+        self._inode2entry: Dict[int, VirtualEntry] = {root_inode: ROOT_ENTRY}
+        self._entry2inode: Dict[VirtualEntry, int] = {ROOT_ENTRY: root_inode}
+        self._entry2fd: Dict[VirtualEntry, int] = {}
+        self._fd2entry: Dict[int, VirtualEntry] = {}
+        self._inode2path: Dict[int, Path] = {root_inode: root_path}
+
+        self._next_inode: int = root_inode + 1
+        self._next_fd: int = 0
+
+        self.time_ns: int = time.time_ns()  # start time, used as timestamp
+        self.gid = os.getgid()
+        self.uid = os.getuid()
+
+        self.web_api = WebAPIClient(api_url)
+        self.cache = Cache(cache_dir)
+
+        # Initially populate the cache
+        self.get_metadata(root_swhid)
+
+    def shutdown(self) -> None:
+        """Release resources; called by main() once pyfuse3.main() returns."""
+        self.cache.conn.close()
+
+    def _alloc_inode(self, entry: VirtualEntry) -> int:
+        try:
+            return self._entry2inode[entry]
+        except KeyError:
+            inode = self._next_inode
+            self._next_inode += 1
+            self._entry2inode[entry] = inode
+            self._inode2entry[inode] = entry
+
+            # TODO add inode recycling with invocation to invalidate_inode when
+            # the dicts get too big
+
+            return inode
+
+    def _alloc_fd(self, entry: VirtualEntry) -> int:
+        try:
+            return self._entry2fd[entry]
+        except KeyError:
+            fd = self._next_fd
+            self._next_fd += 1
+            self._entry2fd[entry] = fd
+            self._fd2entry[fd] = entry
+            return fd
+
+    def inode2entry(self, inode: int) -> VirtualEntry:
+        try:
+            return self._inode2entry[inode]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def entry2inode(self, entry: VirtualEntry) -> int:
+        try:
+            return self._entry2inode[entry]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def inode2path(self, inode: int) -> Path:
+        try:
+            return self._inode2path[inode]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+    def get_metadata(self, swhid: SWHID):
+        """Return the metadata of ``swhid``, from the cache or the Web API."""
+        # TODO: swh-graph API
+        cache = self.cache.get_metadata(swhid)
+        # Empty metadata (e.g. an empty directory listing) is falsy but is
+        # still a cache hit: only a missing entry (None) triggers a download
+        if cache is not None:
+            return cache
+
+        metadata = self.web_api.get(swhid)
+        self.cache.put_metadata(swhid, metadata)
+        return metadata
+
+    def get_blob(self, swhid: SWHID):
+        """Return the raw bytes of content ``swhid``, or None for other types.
+
+        Raises ``FUSEError(EIO)`` if the content cannot be downloaded.
+        """
+        if swhid.object_type != CONTENT:
+            return None
+
+        blob = self.cache.get_blob(swhid)
+        if blob is not None:
+            # Blobs cached as text (str) rather than raw bytes are re-encoded
+            return blob.encode() if isinstance(blob, str) else blob
+
+        metadata = self.get_metadata(swhid)
+        try:
+            response = requests.get(metadata["data_url"])
+            response.raise_for_status()
+        except requests.RequestException as err:
+            # Never cache an error page as the file content
+            raise pyfuse3.FUSEError(errno.EIO) from err
+        # Keep the raw bytes: decoding them as text corrupts binary files
+        blob = response.content
+        self.cache.put_blob(swhid, blob)
+        return blob
+
+    def get_direntries(self, entry: VirtualEntry):
+        """Return an iterable over the children of the directory ``entry``.
+
+        Raises ``FUSEError(ENOTDIR)`` if ``entry`` is not a directory, and
+        ``FUSEError(ENOTSUP)`` for artifact types that cannot be listed yet.
+        """
+        if isinstance(entry, ArtifactEntry):
+            if entry.swhid.object_type == CONTENT:
+                raise pyfuse3.FUSEError(errno.ENOTDIR)
+            if entry.swhid.object_type == DIRECTORY:
+                return Directory(self.get_metadata(entry.swhid))
+            # TODO: add other objects
+            raise pyfuse3.FUSEError(errno.ENOTSUP)
+
+        if entry == ROOT_ENTRY:
+            return Root()
+        if entry == ARCHIVE_ENTRY:
+            return Archive(self.cache)
+        if entry == META_ENTRY:
+            return Meta(self.cache)
+        # Any other virtual entry (e.g. a meta/*.json file) is a regular file
+        raise pyfuse3.FUSEError(errno.ENOTDIR)
+
+    def get_attrs(self, entry: VirtualEntry, prefetch=None) -> pyfuse3.EntryAttributes:
+        """Return the attributes of ``entry``, allocating its inode if needed.
+
+        ``prefetch`` is the entry's item from the directory API listing, when
+        available: it saves an API call and carries the file permissions.
+        """
+        attrs = pyfuse3.EntryAttributes()
+        attrs.st_size = 0
+        attrs.st_atime_ns = self.time_ns
+        attrs.st_ctime_ns = self.time_ns
+        attrs.st_mtime_ns = self.time_ns
+        attrs.st_gid = self.gid
+        attrs.st_uid = self.uid
+        attrs.st_ino = self._alloc_inode(entry)
+
+        if isinstance(entry, ArtifactEntry):
+            metadata = prefetch or self.get_metadata(entry.swhid)
+            if entry.swhid.object_type == CONTENT:
+                attrs.st_mode = int(EntryMode.RDONLY_FILE)
+                # Only the directory API prefetch stores the permissions
+                if prefetch:
+                    attrs.st_mode = metadata["perms"]
+                attrs.st_size = metadata["length"]
+            else:
+                attrs.st_mode = int(EntryMode.RDONLY_DIR)
+        else:
+            attrs.st_mode = int(entry.mode)
+            if entry.name.endswith(".json"):
+                # Must be the exact number of bytes that read() returns
+                attrs.st_size = len(self._get_meta_json(entry))
+
+        return attrs
+
+    def _get_meta_json(self, entry: VirtualEntry) -> bytes:
+        """Return the contents of the ``meta/<SWHID>.json`` file ``entry``."""
+        swhid = parse_swhid(Path(entry.name).stem)
+        metadata = self.get_metadata(swhid)
+        # default=str renders typified values (e.g. SWHID objects) as strings
+        return json.dumps(metadata, default=str, indent=2).encode()
+
+    async def getattr(self, inode, ctx):
+        entry = self.inode2entry(inode)
+        return self.get_attrs(entry)
+
+    async def opendir(self, inode, ctx):
+        # Re-use inode as directory handle
+        return inode
+
+    async def readdir(self, inode, offset, token):
+        direntry = self.inode2entry(inode)
+        path = self.inode2path(inode)
+
+        # TODO: add cache on direntry list?
+        entries = self.get_direntries(direntry)
+        current_id = offset
+        for entry in itertools.islice(entries, offset, None):
+            # The directory API has extra info we can use to set attributes
+            # without additional SWH API call
+            prefetch = None
+            if isinstance(entries, Directory):
+                prefetch = entries.json[current_id]
+
+            attrs = self.get_attrs(entry, prefetch)
+            next_id = current_id + 1
+            if not pyfuse3.readdir_reply(
+                token, os.fsencode(entry.name), attrs, next_id
+            ):
+                break
+
+            current_id = next_id
+            # get_attrs() already registered the inode -> entry mapping
+            self._inode2path[attrs.st_ino] = Path(path, entry.name)
+
+    async def open(self, inode, flags, ctx):
+        entry = self.inode2entry(inode)
+        fd = self._alloc_fd(entry)
+        return pyfuse3.FileInfo(fh=fd, keep_cache=True)
+
+    async def read(self, fd, offset, length):
+        """Return up to ``length`` bytes of the open file ``fd``, from ``offset``."""
+        try:
+            entry = self._fd2entry[fd]
+        except KeyError:
+            raise pyfuse3.FUSEError(errno.ENOENT)
+
+        if isinstance(entry, ArtifactEntry):
+            data = self.get_blob(entry.swhid)
+        elif entry.name.endswith(".json"):
+            data = self._get_meta_json(entry)
+        else:
+            data = None
+
+        if data is None:
+            # Not a readable file (e.g. a non-content artifact)
+            raise pyfuse3.FUSEError(errno.EIO)
+
+        # Large files are read in several chunks: only return the one requested
+        return data[offset : offset + length]
+
+    async def lookup(self, parent_inode, name, ctx):
+        """Return the attributes of the child ``name`` of ``parent_inode``."""
+        name = os.fsdecode(name)
+        path = Path(self.inode2path(parent_inode), name)
+
+        parent_entry = self.inode2entry(parent_inode)
+        # Search the same listing as readdir(), so that every listed name
+        # (including archive/ and meta/ under the root) can be looked up
+        entries = self.get_direntries(parent_entry)
+        for idx, entry in enumerate(entries):
+            if entry.name != name:
+                continue
+            prefetch = None
+            if isinstance(entries, Directory):
+                prefetch = entries.json[idx]
+            attrs = self.get_attrs(entry, prefetch)
+            self._inode2path[attrs.st_ino] = path
+            return attrs
+
+        raise pyfuse3.FUSEError(errno.ENOENT)
+
+
+def main(root_swhid: SWHID, root_path: Path, cache_dir: Path, api_url: str) -> None:
+    """Mount ``root_swhid`` on ``root_path`` and run the FUSE main loop."""
+    fs = Fuse(root_swhid, root_path, cache_dir, api_url)
+
+    fuse_options = set(pyfuse3.default_options)
+    fuse_options.add("fsname=swhfs")
+    fuse_options.add("debug")
+    pyfuse3.init(fs, root_path, fuse_options)
+
+    loop = asyncio.get_event_loop()
+    try:
+        loop.run_until_complete(pyfuse3.main())
+        fs.shutdown()
+    finally:
+        pyfuse3.close(unmount=True)
+        loop.close()