diff --git a/requirements-swh.txt b/requirements-swh.txt index 97ca355..1edfac0 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. swh.core swh.model>=0.7.0 -swh.web.client +swh.web.client>=0.2.1 diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py index 70ec4bc..4a9d688 100644 --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -1,150 +1,138 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from abc import ABC import json from typing import Any, AsyncGenerator, Dict, Optional import aiosqlite from swh.model.identifiers import SWHID, parse_swhid -from swh.web.client.client import typify +from swh.web.client.client import typify_json class FuseCache: """ SWH FUSE retrieves both metadata and file contents from the Software Heritage archive via the network. In order to obtain reasonable performances several caches are used to minimize network transfer. Caches are stored on disk in SQLite databases located at `$XDG_CACHE_HOME/swh/fuse/`. All caches are persistent (i.e., they survive the restart of the SWH FUSE process) and global (i.e., they are shared by concurrent SWH FUSE processes). We assume that no cache *invalidation* is necessary, due to intrinsic properties of the Software Heritage archive, such as integrity verification and append-only archive changes. To clean the caches one can just remove the corresponding files from disk. """ def __init__(self, cache_conf: Dict[str, Any]): self.cache_conf = cache_conf async def __aenter__(self): self.metadata = MetadataCache(self.cache_conf["metadata"]) self.blob = BlobCache(self.cache_conf["blob"]) await self.metadata.__aenter__() await self.blob.__aenter__() return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.metadata.__aexit__() await self.blob.__aexit__() async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: """ Return a list of all previously cached SWHID """ - meta_cursor = await self.metadata.conn.execute( + # Use the metadata db since it should always contain all accessed SWHIDs + metadata_cursor = await self.metadata.conn.execute( "select swhid from metadata_cache" ) - blob_cursor = await self.blob.conn.execute("select swhid from blob_cache") - # Some entries can be in one cache but not in the other so create a set - # from all caches - swhids = await meta_cursor.fetchall() + await blob_cursor.fetchall() - swhids = [parse_swhid(x[0]) for x in swhids] - for swhid in set(swhids): - yield swhid + swhids = await metadata_cursor.fetchall() + for raw_swhid in swhids: + yield parse_swhid(raw_swhid[0]) class AbstractCache(ABC): """ Abstract cache implementation to share common behavior between cache types (such as: YAML config parsing, SQLite context manager) """ def __init__(self, conf: Dict[str, Any]): self.conf = conf async def __aenter__(self): # In-memory (thus temporary) caching is useful for testing purposes if self.conf.get("in-memory", False): path = ":memory:" else: path = self.conf["path"] self.conn = await aiosqlite.connect(path) return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.conn.close() class MetadataCache(AbstractCache): """ The metadata cache map each SWHID to the complete metadata of the referenced object. This is analogous to what is available in `meta/.json` file (and generally used as data source for returning the content of those files). """ async def __aenter__(self): await super().__aenter__() await self.conn.execute( "create table if not exists metadata_cache (swhid, metadata)" ) return self - async def get(self, swhid: SWHID) -> Any: + async def get(self, swhid: SWHID, typify: bool = True) -> Any: cursor = await self.conn.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: metadata = json.loads(cache[0]) - return typify(metadata, swhid.object_type) + return typify_json(metadata, swhid.object_type) if typify else metadata else: return None async def set(self, swhid: SWHID, metadata: Any) -> None: await self.conn.execute( "insert into metadata_cache values (?, ?)", - ( - str(swhid), - json.dumps( - metadata, - # Converts the typified JSON to plain str version - default=lambda x: ( - x.object_id if isinstance(x, SWHID) else x.__dict__ - ), - ), - ), + (str(swhid), json.dumps(metadata)), ) class BlobCache(AbstractCache): """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived content. The blob cache entry for a given content object is populated, at the latest, the first time the object is `read()`-d. It might be populated earlier on due to prefetching, e.g., when a directory pointing to the given content is listed for the first time. """ async def __aenter__(self): await super().__aenter__() await self.conn.execute("create table if not exists blob_cache (swhid, blob)") return self async def get(self, swhid: SWHID) -> Optional[bytes]: cursor = await self.conn.execute( "select blob from blob_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: blob = cache[0] return blob else: return None async def set(self, swhid: SWHID, blob: bytes) -> None: await self.conn.execute( "insert into blob_cache values (?, ?)", (str(swhid), blob) ) diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index abf42d4..bc1d7cd 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,75 +1,77 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import json from typing import AsyncIterator from swh.fuse.fs.artifact import typify from swh.fuse.fs.entry import EntryMode, FuseEntry from swh.model.identifiers import CONTENT, SWHID # Avoid cycling import Fuse = "Fuse" class Root(FuseEntry): """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ def __init__(self, fuse: Fuse): super().__init__(name="root", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) async def __aiter__(self) -> AsyncIterator[FuseEntry]: for entry in [ArchiveDir(self.fuse), MetaDir(self.fuse)]: yield entry class ArchiveDir(FuseEntry): """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ def __init__(self, fuse: Fuse): super().__init__(name="archive", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): if swhid.object_type == CONTENT: mode = EntryMode.RDONLY_FILE else: mode = EntryMode.RDONLY_DIR yield typify(str(swhid), int(mode), self.fuse, swhid) class MetaDir(FuseEntry): """ The meta/ directory contains one SWHID.json file for each SWHID entry under archive/. The JSON file contain all available meta information about the given SWHID, as returned by the Software Heritage Web API for that object. Note that, in case of pagination (e.g., snapshot objects with many branches) the JSON file will contain a complete version with all pages merged together. """ def __init__(self, fuse: Fuse): super().__init__(name="meta", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) async def __aiter__(self) -> AsyncIterator[FuseEntry]: async for swhid in self.fuse.cache.get_cached_swhids(): yield MetaEntry(swhid, self.fuse) class MetaEntry(FuseEntry): """ An entry from the meta/ directory, containing for each accessed SWHID a corresponding SWHID.json file with all the metadata from the Software Heritage archive. """ def __init__(self, swhid: SWHID, fuse: Fuse): super().__init__( name=str(swhid) + ".json", mode=int(EntryMode.RDONLY_FILE), fuse=fuse ) self.swhid = swhid async def content(self) -> bytes: - metadata = await self.fuse.get_metadata(self.swhid) - return str(metadata).encode() + # Get raw JSON metadata from API (un-typified) + metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False) + return json.dumps(metadata).encode() async def length(self) -> int: return len(await self.content()) diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index d90ddac..26b827a 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,216 +1,219 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: # TODO: swh-graph API + typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() - metadata = await loop.run_in_executor(None, self.web_api.get, swhid) + metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) - return metadata + # Retrieve it from cache so it is correctly typed + return await self.cache.metadata.get(swhid) except requests.HTTPError: logging.error(f"Unknown SWHID: '{swhid}'") async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) + # Make sure the metadata cache is also populated with the given SWHID + await self.get_metadata(swhid) + cache = await self.cache.blob.get(swhid) if cache: return cache loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) blob = b"".join(list(resp)) await self.cache.blob.set(swhid, blob) return blob async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.length() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode - async def readdir( - self, fh: int, offset: int, token: pyfuse3.ReaddirToken - ) -> None: + async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh # TODO: add cache on direntry list? direntry = self.inode2entry(inode) next_id = offset + 1 i = 0 async for entry in direntry: if i < offset: i += 1 continue name = os.fsencode(entry.name) attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle return pyfuse3.FileInfo(fh=inode, keep_cache=True) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) data = await entry.content() return data[offset : offset + length] async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) async for entry in parent_entry: if name == entry.name: attr = await self.get_attrs(entry) return attr logging.error(f"Unknown name during lookup: '{name}'") raise pyfuse3.FUSEError(errno.ENOENT) async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: await fs.get_metadata(swhid) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) try: await pyfuse3.main() finally: fs.shutdown() pyfuse3.close(unmount=True)