diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py index 2ee0488..310c59f 100644 --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -1,253 +1,277 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass from pathlib import Path from typing import Any, AsyncIterator, List -from swh.fuse.fs.entry import EntryMode, FuseEntry -from swh.fuse.fs.symlink import SymlinkEntry +from swh.fuse.fs.entry import ( + EntryMode, + FuseDirEntry, + FuseEntry, + FuseFileEntry, + FuseSymlinkEntry, +) from swh.model.from_disk import DentryPerms from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SWHID @dataclass -class ArtifactEntry(FuseEntry): - """ FUSE virtual entry for a Software Heritage Artifact +class Content(FuseFileEntry): + """ Software Heritage content artifact. Attributes: swhid: Software Heritage persistent identifier prefetch: optional prefetched metadata used to set entry attributes - """ - - swhid: SWHID - prefetch: Any = None - - -class Content(ArtifactEntry): - """ Software Heritage content artifact. Content leaves (AKA blobs) are represented on disks as regular files, containing the corresponding bytes, as archived. Note that permissions are associated to blobs only in the context of directories. Hence, when accessing blobs from the top-level `archive/` directory, the permissions of the `archive/SWHID` file will be arbitrary and not meaningful (e.g., `0x644`). """ + swhid: SWHID + prefetch: Any = None + async def get_content(self) -> bytes: data = await self.fuse.get_blob(self.swhid) if not self.prefetch: self.prefetch = {"length": len(data)} return data async def size(self) -> int: if self.prefetch: return self.prefetch["length"] else: return len(await self.get_content()) - async def __aiter__(self): - raise ValueError("Cannot iterate over a content type artifact") - -class Directory(ArtifactEntry): +@dataclass +class Directory(FuseDirEntry): """ Software Heritage directory artifact. + Attributes: + swhid: Software Heritage persistent identifier + Directory nodes are represented as directories on the file-system, containing one entry for each entry of the archived directory. Entry names and other metadata, including permissions, will correspond to the archived entry metadata. Note that the FUSE mount is read-only, no matter what the permissions say. So it is possible that, in the context of a directory, a file is presented as writable, whereas actually writing to it will fail with `EPERM`. """ + swhid: SWHID + async def __aiter__(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) for entry in metadata: name = entry["name"] swhid = entry["target"] mode = ( # Archived permissions for directories are always set to # 0o040000 so use a read-only permission instead int(EntryMode.RDONLY_DIR) if swhid.object_type == DIRECTORY else entry["perms"] ) - # 1. Symlinks - if mode == DentryPerms.symlink: + # 1. Regular file + if swhid.object_type == CONTENT: yield self.create_child( - SymlinkEntry, + Content, + name=name, + mode=mode, + swhid=swhid, + # The directory API has extra info we can use to set + # attributes without additional Software Heritage API call + prefetch=entry, + ) + # 2. Regular directory + elif swhid.object_type == DIRECTORY: + yield self.create_child( + Directory, name=name, mode=mode, swhid=swhid, + ) + # 3. Symlink + elif mode == DentryPerms.symlink: + yield self.create_child( + FuseSymlinkEntry, name=name, # Symlink target is stored in the blob content target=await self.fuse.get_blob(swhid), ) - # 2. Submodules + # 4. Submodule elif swhid.object_type == REVISION: # Make sure the revision metadata is fetched and create a # symlink to distinguish it with regular directories await self.fuse.get_metadata(swhid) yield self.create_child( - SymlinkEntry, + FuseSymlinkEntry, name=name, target=Path(self.get_relative_root_path(), f"archive/{swhid}"), ) - # 3. Regular entries (directories, contents) else: - yield self.create_child( - OBJTYPE_GETTERS[swhid.object_type], - name=name, - mode=mode, - swhid=swhid, - # The directory API has extra info we can use to set - # attributes without additional Software Heritage API call - prefetch=entry, - ) + raise ValueError("Unknown directory entry type: {swhid.object_type}") -class Revision(ArtifactEntry): +@dataclass +class Revision(FuseDirEntry): """ Software Heritage revision artifact. + Attributes: + swhid: Software Heritage persistent identifier + Revision (AKA commit) nodes are represented on the file-system as directories with the following entries: - `root`: source tree at the time of the commit, as a symlink pointing into `archive/`, to a SWHID of type `dir` - `parents/` (note the plural): a virtual directory containing entries named `1`, `2`, `3`, etc., one for each parent commit. Each of these entry is a symlink pointing into `archive/`, to the SWHID file for the given parent commit - `parent` (note the singular): present if and only if the current commit has at least one parent commit (which is the most common case). When present it is a symlink pointing into `parents/1/` - `meta.json`: metadata for the current node, as a symlink pointing to the relevant `meta/.json` file """ + swhid: SWHID + async def __aiter__(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) directory = metadata["directory"] parents = metadata["parents"] # Make sure all necessary metadatas are fetched await self.fuse.get_metadata(directory) for parent in parents: await self.fuse.get_metadata(parent["id"]) root_path = self.get_relative_root_path() yield self.create_child( - SymlinkEntry, name="root", target=Path(root_path, f"archive/{directory}"), + FuseSymlinkEntry, + name="root", + target=Path(root_path, f"archive/{directory}"), ) yield self.create_child( - SymlinkEntry, + FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"meta/{self.swhid}.json"), ) yield self.create_child( RevisionParents, name="parents", mode=int(EntryMode.RDONLY_DIR), parents=[x["id"] for x in parents], ) if len(parents) >= 1: yield self.create_child( - SymlinkEntry, name="parent", target="parents/1/", + FuseSymlinkEntry, name="parent", target="parents/1/", ) @dataclass -class RevisionParents(FuseEntry): +class RevisionParents(FuseDirEntry): """ Revision virtual `parents/` directory """ parents: List[SWHID] async def __aiter__(self) -> AsyncIterator[FuseEntry]: root_path = self.get_relative_root_path() for i, parent in enumerate(self.parents): yield self.create_child( - SymlinkEntry, + FuseSymlinkEntry, name=str(i + 1), target=Path(root_path, f"archive/{parent}"), ) -class Release(ArtifactEntry): +@dataclass +class Release(FuseDirEntry): """ Software Heritage release artifact. + Attributes: + swhid: Software Heritage persistent identifier + Release nodes are represented on the file-system as directories with the following entries: - `target`: target node, as a symlink to `archive/` - `target_type`: regular file containing the type of the target SWHID - `root`: present if and only if the release points to something that (transitively) resolves to a directory. When present it is a symlink pointing into `archive/` to the SWHID of the given directory - `meta.json`: metadata for the current node, as a symlink pointing to the relevant `meta/.json` file """ + swhid: SWHID + async def find_root_directory(self, swhid: SWHID) -> SWHID: if swhid.object_type == RELEASE: metadata = await self.fuse.get_metadata(swhid) return await self.find_root_directory(metadata["target"]) elif swhid.object_type == REVISION: metadata = await self.fuse.get_metadata(swhid) return metadata["directory"] elif swhid.object_type == DIRECTORY: return swhid else: return None async def __aiter__(self) -> AsyncIterator[FuseEntry]: metadata = await self.fuse.get_metadata(self.swhid) root_path = self.get_relative_root_path() yield self.create_child( - SymlinkEntry, + FuseSymlinkEntry, name="meta.json", target=Path(root_path, f"meta/{self.swhid}.json"), ) target = metadata["target"] yield self.create_child( - SymlinkEntry, name="target", target=Path(root_path, f"archive/{target}") + FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}") ) yield self.create_child( ReleaseType, name="target_type", mode=int(EntryMode.RDONLY_FILE), target_type=target.object_type, ) target_dir = await self.find_root_directory(target) if target_dir is not None: yield self.create_child( - SymlinkEntry, + FuseSymlinkEntry, name="root", target=Path(root_path, f"archive/{target_dir}"), ) @dataclass -class ReleaseType(FuseEntry): +class ReleaseType(FuseFileEntry): """ Release type virtual file """ target_type: str async def get_content(self) -> bytes: return str.encode(self.target_type + "\n") async def size(self) -> int: return len(await self.get_content()) OBJTYPE_GETTERS = { CONTENT: Content, DIRECTORY: Directory, REVISION: Revision, RELEASE: Release, } diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index 2078e09..c57ee5f 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,83 +1,109 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path from stat import S_IFDIR, S_IFLNK, S_IFREG -from typing import Any, AsyncIterator, Union +from typing import Any, Union # Avoid cycling import Fuse = "Fuse" class EntryMode(IntEnum): """ Default entry mode and permissions for the FUSE. The FUSE mount is always read-only, even if permissions contradict this statement (in a context of a directory, entries are listed with permissions taken from the archive). """ RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 SYMLINK = S_IFLNK | 0o444 @dataclass class FuseEntry: """ Main wrapper class to manipulate virtual FUSE entries Attributes: name: entry filename mode: entry permission mode fuse: internal reference to the main FUSE class inode: unique integer identifying the entry """ name: str mode: int depth: int fuse: Fuse inode: int = field(init=False) def __post_init__(self): self.inode = self.fuse._alloc_inode(self) + async def size(self) -> int: + """ Return the size (in bytes) of an entry """ + + raise NotImplementedError + + def get_relative_root_path(self) -> str: + return "../" * (self.depth - 1) + + def create_child(self, constructor: Any, **kwargs) -> FuseEntry: + return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) + + +class FuseFileEntry(FuseEntry): + """ FUSE virtual file entry """ + async def get_content(self) -> bytes: """ Return the content of a file entry """ - return None + raise NotImplementedError - async def size(self) -> int: - """ Return the size of a file entry """ +class FuseDirEntry(FuseEntry): + """ FUSE virtual directory entry """ + + async def size(self) -> int: return 0 - async def __aiter__(self) -> AsyncIterator[FuseEntry]: + async def __aiter__(self): """ Return the child entries of a directory entry """ - yield None + raise NotImplementedError async def lookup(self, name: str) -> FuseEntry: """ Look up a FUSE entry by name """ async for entry in self: if entry.name == name: return entry return None - def get_target(self) -> Union[str, bytes, Path]: - """ Return the path target of a symlink entry """ - return None +@dataclass +class FuseSymlinkEntry(FuseEntry): + """ FUSE virtual symlink entry - def get_relative_root_path(self) -> str: - return "../" * (self.depth - 1) + Attributes: + target: path to symlink target + """ - def create_child(self, constructor: Any, **kwargs) -> FuseEntry: - return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) + mode: int = field(init=False, default=int(EntryMode.SYMLINK)) + target: Union[str, bytes, Path] + + async def size(self) -> int: + return len(str(self.target)) + + def get_target(self) -> Union[str, bytes, Path]: + """ Return the path target of a symlink entry """ + + return self.target diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index 7cfa628..b3cb02b 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,103 +1,103 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from dataclasses import dataclass, field import json from typing import AsyncIterator from swh.fuse.fs.artifact import OBJTYPE_GETTERS -from swh.fuse.fs.entry import EntryMode, FuseEntry +from swh.fuse.fs.entry import EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry from swh.model.exceptions import ValidationError from swh.model.identifiers import CONTENT, SWHID, parse_swhid @dataclass -class Root(FuseEntry): +class Root(FuseDirEntry): """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ name: str = field(init=False, default=None) mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) depth: int = field(init=False, default=1) async def __aiter__(self) -> AsyncIterator[FuseEntry]: yield self.create_child(ArchiveDir) yield self.create_child(MetaDir) @dataclass -class ArchiveDir(FuseEntry): +class ArchiveDir(FuseDirEntry): """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) def create_child(self, swhid: SWHID) -> FuseEntry: if swhid.object_type == CONTENT: mode = EntryMode.RDONLY_FILE else: mode = EntryMode.RDONLY_DIR return super().create_child( OBJTYPE_GETTERS[swhid.object_type], name=str(swhid), mode=int(mode), swhid=swhid, ) async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): yield self.create_child(swhid) async def lookup(self, name: str) -> FuseEntry: entry = await super().lookup(name) if entry: return entry # On the fly mounting of a new artifact try: swhid = parse_swhid(name) await self.fuse.get_metadata(swhid) return self.create_child(swhid) except ValidationError: return None @dataclass -class MetaDir(FuseEntry): +class MetaDir(FuseDirEntry): """ The meta/ directory contains one SWHID.json file for each SWHID entry under archive/. The JSON file contain all available meta information about the given SWHID, as returned by the Software Heritage Web API for that object. Note that, in case of pagination (e.g., snapshot objects with many branches) the JSON file will contain a complete version with all pages merged together. """ name: str = field(init=False, default="meta") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): yield self.create_child( MetaEntry, name=f"{swhid}.json", mode=int(EntryMode.RDONLY_FILE), swhid=swhid, ) @dataclass -class MetaEntry(FuseEntry): +class MetaEntry(FuseFileEntry): """ An entry from the meta/ directory, containing for each accessed SWHID a corresponding SWHID.json file with all the metadata from the Software Heritage archive. """ swhid: SWHID async def get_content(self) -> bytes: # Get raw JSON metadata from API (un-typified) metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False) return json.dumps(metadata).encode() async def size(self) -> int: return len(await self.get_content()) diff --git a/swh/fuse/fs/symlink.py b/swh/fuse/fs/symlink.py deleted file mode 100644 index 2151e9f..0000000 --- a/swh/fuse/fs/symlink.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from dataclasses import dataclass, field -from pathlib import Path -from typing import Union - -from swh.fuse.fs.entry import EntryMode, FuseEntry - - -@dataclass -class SymlinkEntry(FuseEntry): - """ FUSE virtual entry for symlinks - - Attributes: - target: path to symlink target - """ - - mode: int = field(init=False, default=int(EntryMode.SYMLINK)) - target: Union[str, bytes, Path] - - async def size(self) -> int: - return len(str(self.target)) - - def get_target(self) -> Union[str, bytes, Path]: - return self.target diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 040bd65..3221001 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,221 +1,225 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import FuseCache -from swh.fuse.fs.entry import FuseEntry +from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: # TODO: swh-graph API typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) # Retrieve it from cache so it is correctly typed return await self.cache.metadata.get(swhid) except requests.HTTPError: logging.error(f"Unknown SWHID: '{swhid}'") async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) # Make sure the metadata cache is also populated with the given SWHID await self.get_metadata(swhid) cache = await self.cache.blob.get(swhid) if cache: return cache loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) blob = b"".join(list(resp)) await self.cache.blob.set(swhid, blob) return blob async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.size() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh # TODO: add cache on direntry list? direntry = self.inode2entry(inode) + assert isinstance(direntry, FuseDirEntry) next_id = offset + 1 i = 0 async for entry in direntry: if i < offset: i += 1 continue name = os.fsencode(entry.name) attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle return pyfuse3.FileInfo(fh=inode, keep_cache=True) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) + assert isinstance(entry, FuseFileEntry) data = await entry.get_content() return data[offset : offset + length] async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) + assert isinstance(parent_entry, FuseDirEntry) lookup_entry = await parent_entry.lookup(name) if lookup_entry: return await self.get_attrs(lookup_entry) else: logging.error(f"Unknown name during lookup: '{name}'") raise pyfuse3.FUSEError(errno.ENOENT) async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: entry = self.inode2entry(inode) + assert isinstance(entry, FuseSymlinkEntry) return os.fsencode(entry.get_target()) async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: await fs.get_metadata(swhid) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) try: await pyfuse3.main() finally: fs.shutdown() pyfuse3.close(unmount=True)