diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index af4158f..2078e09 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,75 +1,83 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path from stat import S_IFDIR, S_IFLNK, S_IFREG from typing import Any, AsyncIterator, Union # Avoid cycling import Fuse = "Fuse" class EntryMode(IntEnum): """ Default entry mode and permissions for the FUSE. The FUSE mount is always read-only, even if permissions contradict this statement (in a context of a directory, entries are listed with permissions taken from the archive). """ RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 SYMLINK = S_IFLNK | 0o444 @dataclass class FuseEntry: """ Main wrapper class to manipulate virtual FUSE entries Attributes: name: entry filename mode: entry permission mode fuse: internal reference to the main FUSE class inode: unique integer identifying the entry """ name: str mode: int depth: int fuse: Fuse inode: int = field(init=False) def __post_init__(self): self.inode = self.fuse._alloc_inode(self) async def get_content(self) -> bytes: """ Return the content of a file entry """ return None async def size(self) -> int: """ Return the size of a file entry """ return 0 async def __aiter__(self) -> AsyncIterator[FuseEntry]: """ Return the child entries of a directory entry """ yield None + async def lookup(self, name: str) -> FuseEntry: + """ Look up a FUSE entry by name """ + + async for entry in self: + if entry.name == name: + return entry + return None + def get_target(self) -> Union[str, bytes, Path]: """ Return the path target of a symlink entry """ return None def get_relative_root_path(self) -> str: return "../" * (self.depth - 1) def create_child(self, constructor: Any, **kwargs) -> FuseEntry: return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index ccaa0f1..7cfa628 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,86 +1,103 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass, field import json from typing import AsyncIterator from swh.fuse.fs.artifact import OBJTYPE_GETTERS from swh.fuse.fs.entry import EntryMode, FuseEntry -from swh.model.identifiers import CONTENT, SWHID +from swh.model.exceptions import ValidationError +from swh.model.identifiers import CONTENT, SWHID, parse_swhid @dataclass class Root(FuseEntry): """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ name: str = field(init=False, default=None) mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) depth: int = field(init=False, default=1) async def __aiter__(self) -> AsyncIterator[FuseEntry]: yield self.create_child(ArchiveDir) yield self.create_child(MetaDir) @dataclass class ArchiveDir(FuseEntry): """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) + def create_child(self, swhid: SWHID) -> FuseEntry: + if swhid.object_type == CONTENT: + mode = EntryMode.RDONLY_FILE + else: + mode = EntryMode.RDONLY_DIR + return super().create_child( + OBJTYPE_GETTERS[swhid.object_type], + name=str(swhid), + mode=int(mode), + swhid=swhid, + ) + async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): - if swhid.object_type == CONTENT: - mode = EntryMode.RDONLY_FILE - else: - mode = EntryMode.RDONLY_DIR - yield self.create_child( - OBJTYPE_GETTERS[swhid.object_type], - name=str(swhid), - mode=int(mode), - swhid=swhid, - ) + yield self.create_child(swhid) + + async def lookup(self, name: str) -> FuseEntry: + entry = await super().lookup(name) + if entry: + return entry + + # On the fly mounting of a new artifact + try: + swhid = parse_swhid(name) + await self.fuse.get_metadata(swhid) + return self.create_child(swhid) + except ValidationError: + return None @dataclass class MetaDir(FuseEntry): """ The meta/ directory contains one SWHID.json file for each SWHID entry under archive/. The JSON file contain all available meta information about the given SWHID, as returned by the Software Heritage Web API for that object. Note that, in case of pagination (e.g., snapshot objects with many branches) the JSON file will contain a complete version with all pages merged together. """ name: str = field(init=False, default="meta") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): yield self.create_child( MetaEntry, name=f"{swhid}.json", mode=int(EntryMode.RDONLY_FILE), swhid=swhid, ) @dataclass class MetaEntry(FuseEntry): """ An entry from the meta/ directory, containing for each accessed SWHID a corresponding SWHID.json file with all the metadata from the Software Heritage archive. """ swhid: SWHID async def get_content(self) -> bytes: # Get raw JSON metadata from API (un-typified) metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False) return json.dumps(metadata).encode() async def size(self) -> int: return len(await self.get_content()) diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 086ffb5..040bd65 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,223 +1,221 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: # TODO: swh-graph API typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) # Retrieve it from cache so it is correctly typed return await self.cache.metadata.get(swhid) except requests.HTTPError: logging.error(f"Unknown SWHID: '{swhid}'") async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) # Make sure the metadata cache is also populated with the given SWHID await self.get_metadata(swhid) cache = await self.cache.blob.get(swhid) if cache: return cache loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) blob = b"".join(list(resp)) await self.cache.blob.set(swhid, blob) return blob async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.size() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh # TODO: add cache on direntry list? direntry = self.inode2entry(inode) next_id = offset + 1 i = 0 async for entry in direntry: if i < offset: i += 1 continue name = os.fsencode(entry.name) attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle return pyfuse3.FileInfo(fh=inode, keep_cache=True) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) data = await entry.get_content() return data[offset : offset + length] async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) - - async for entry in parent_entry: - if name == entry.name: - attr = await self.get_attrs(entry) - return attr - - logging.error(f"Unknown name during lookup: '{name}'") - raise pyfuse3.FUSEError(errno.ENOENT) + lookup_entry = await parent_entry.lookup(name) + if lookup_entry: + return await self.get_attrs(lookup_entry) + else: + logging.error(f"Unknown name during lookup: '{name}'") + raise pyfuse3.FUSEError(errno.ENOENT) async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: entry = self.inode2entry(inode) return os.fsencode(entry.get_target()) async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: await fs.get_metadata(swhid) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) try: await pyfuse3.main() finally: fs.shutdown() pyfuse3.close(unmount=True)