diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
index b167f60..b8cdb6b 100644
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -1,388 +1,400 @@
 # Copyright (C) 2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from abc import ABC
 from collections import OrderedDict
 from dataclasses import dataclass, field
 from datetime import datetime
 import json
 import logging
 from pathlib import Path
 import re
 import sqlite3
 import sys
 from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple

 import aiosqlite
 import dateutil.parser
 from psutil import virtual_memory

 from swh.fuse.fs.artifact import RevisionHistoryShardByDate
 from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
 from swh.fuse.fs.mountpoint import CacheDir, OriginDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import REVISION, SWHID, parse_swhid
 from swh.web.client.client import ORIGIN_VISIT, typify_json


 class FuseCache:
     """SwhFS retrieves both metadata and file contents from the Software
     Heritage archive via the network. In order to obtain reasonable
     performances several caches are used to minimize network transfer.

     Caches are stored on disk in SQLite databases located at
     `$XDG_CACHE_HOME/swh/fuse/`.

     All caches are persistent (i.e., they survive the restart of the SwhFS
     process) and global (i.e., they are shared by concurrent SwhFS processes).

     We assume that no cache *invalidation* is necessary, due to intrinsic
     properties of the Software Heritage archive, such as integrity verification
     and append-only archive changes. To clean the caches one can just remove
     the corresponding files from disk.
""" def __init__(self, cache_conf: Dict[str, Any]): self.cache_conf = cache_conf async def __aenter__(self): # History and raw metadata share the same SQLite db self.metadata = MetadataCache(self.cache_conf["metadata"]) self.history = HistoryCache(self.cache_conf["metadata"]) self.blob = BlobCache(self.cache_conf["blob"]) self.direntry = DirEntryCache(self.cache_conf["direntry"]) await self.metadata.__aenter__() await self.blob.__aenter__() await self.history.__aenter__() return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.metadata.__aexit__() await self.blob.__aexit__() await self.history.__aexit__() async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: """ Return a list of all previously cached SWHID """ # Use the metadata db since it should always contain all accessed SWHIDs metadata_cursor = await self.metadata.conn.execute( "select swhid from metadata_cache" ) swhids = await metadata_cursor.fetchall() for raw_swhid in swhids: yield parse_swhid(raw_swhid[0]) async def get_cached_visits(self) -> AsyncGenerator[str, None]: """ Return a list of all previously cached visit URL """ cursor = await self.metadata.conn.execute("select url from visits_cache") urls = await cursor.fetchall() for raw_url in urls: yield raw_url[0] class AbstractCache(ABC): """ Abstract cache implementation to share common behavior between cache types (such as: YAML config parsing, SQLite context manager) """ def __init__(self, conf: Dict[str, Any]): self.conf = conf async def __aenter__(self): # In-memory (thus temporary) caching is useful for testing purposes if self.conf.get("in-memory", False): path = "file::memory:?cache=shared" uri = True else: path = Path(self.conf["path"]) path.parent.mkdir(parents=True, exist_ok=True) uri = False self.conn = await aiosqlite.connect( path, uri=uri, detect_types=sqlite3.PARSE_DECLTYPES ) return self async def __aexit__(self, type=None, val=None, tb=None) -> None: await self.conn.close() class MetadataCache(AbstractCache): """ The metadata cache map each artifact to the complete metadata of the referenced object. This is analogous to what is available in `archive/.json` file (and generally used as data source for returning the content of those files). Artifacts are identified using their SWHIDs, or in the case of origin visits, using their URLs. 
""" DB_SCHEMA = """ create table if not exists metadata_cache ( swhid text not null primary key, metadata blob, date text ); create table if not exists visits_cache ( url text not null primary key, metadata blob, itime timestamp -- insertion time ); """ async def __aenter__(self): await super().__aenter__() await self.conn.executescript(self.DB_SCHEMA) await self.conn.commit() return self async def get(self, swhid: SWHID, typify: bool = True) -> Any: cursor = await self.conn.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: metadata = json.loads(cache[0]) return typify_json(metadata, swhid.object_type) if typify else metadata else: return None async def get_visits(self, url_encoded: str) -> Optional[List[Dict[str, Any]]]: cursor = await self.conn.execute( "select metadata, itime from visits_cache where url=?", (url_encoded,), ) cache = await cursor.fetchone() if cache: metadata, itime = cache[0], cache[1] # Force-update cache with (potentially) new origin visits diff = datetime.now() - itime if diff.days >= 1: return None visits = json.loads(metadata) visits_typed = [typify_json(v, ORIGIN_VISIT) for v in visits] return visits_typed else: return None async def set(self, swhid: SWHID, metadata: Any) -> None: # Fill in the date column for revisions (used as cache for history/by-date/) swhid_date = "" if swhid.object_type == REVISION: date = dateutil.parser.parse(metadata["date"]) swhid_date = RevisionHistoryShardByDate.DATE_FMT.format( year=date.year, month=date.month, day=date.day ) await self.conn.execute( "insert into metadata_cache values (?, ?, ?)", (str(swhid), json.dumps(metadata), swhid_date), ) await self.conn.commit() async def set_visits(self, url_encoded: str, visits: List[Dict[str, Any]]) -> None: await self.conn.execute( "insert or replace into visits_cache values (?, ?, ?)", (url_encoded, json.dumps(visits), datetime.now()), ) await self.conn.commit() + async def remove(self, swhid: SWHID) -> None: + await self.conn.execute( + "delete from metadata_cache where swhid=?", (str(swhid),), + ) + await self.conn.commit() + class BlobCache(AbstractCache): """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived content. The blob cache entry for a given content object is populated, at the latest, the first time the object is `read()`-d. It might be populated earlier on due to prefetching, e.g., when a directory pointing to the given content is listed for the first time. """ DB_SCHEMA = """ create table if not exists blob_cache ( swhid text not null primary key, blob blob ); """ async def __aenter__(self): await super().__aenter__() await self.conn.executescript(self.DB_SCHEMA) await self.conn.commit() return self async def get(self, swhid: SWHID) -> Optional[bytes]: cursor = await self.conn.execute( "select blob from blob_cache where swhid=?", (str(swhid),) ) cache = await cursor.fetchone() if cache: blob = cache[0] return blob else: return None async def set(self, swhid: SWHID, blob: bytes) -> None: await self.conn.execute( "insert into blob_cache values (?, ?)", (str(swhid), blob) ) await self.conn.commit() + async def remove(self, swhid: SWHID) -> None: + await self.conn.execute( + "delete from blob_cache where swhid=?", (str(swhid),), + ) + await self.conn.commit() + class HistoryCache(AbstractCache): """ The history cache map SWHIDs of type `rev` to a list of `rev` SWHIDs corresponding to all its revision ancestors, sorted in reverse topological order. 
     As the parents cache, the history cache is lazily populated and can be
     prefetched. To efficiently store the ancestor lists, the history cache
     represents ancestors as graph edges (a pair of two SWHID nodes), meaning
     the history cache is shared amongst all revisions parents.
     """

     DB_SCHEMA = """
         create table if not exists history_graph (
             src text not null,
             dst text not null,
             unique(src, dst)
         );

         create index if not exists idx_history on history_graph(src);
     """

     async def __aenter__(self):
         await super().__aenter__()
         await self.conn.executescript(self.DB_SCHEMA)
         await self.conn.commit()
         return self

     HISTORY_REC_QUERY = """
         with recursive dfs(node) AS (
             values(?)
             union
             select history_graph.dst
             from history_graph
             join dfs on history_graph.src = dfs.node
         )
         -- Do not keep the root node since it is not an ancestor
         select * from dfs limit -1 offset 1
     """

     async def get(self, swhid: SWHID) -> Optional[List[SWHID]]:
         cursor = await self.conn.execute(self.HISTORY_REC_QUERY, (str(swhid),),)
         cache = await cursor.fetchall()
         if not cache:
             return None
         history = []
         for row in cache:
             parent = row[0]
             try:
                 history.append(parse_swhid(parent))
             except ValidationError:
                 logging.warning("Cannot parse object from history cache: %s", parent)
         return history

     async def get_with_date_prefix(
         self, swhid: SWHID, date_prefix: str
     ) -> List[Tuple[SWHID, str]]:
         cursor = await self.conn.execute(
             f"""
             select swhid, date
             from (
                 {self.HISTORY_REC_QUERY}
             ) as history
             join metadata_cache on history.node = metadata_cache.swhid
             where metadata_cache.date like '{date_prefix}%'
             """,
             (str(swhid),),
         )
         cache = await cursor.fetchall()
         if not cache:
             return []
         history = []
         for row in cache:
             parent, date = row[0], row[1]
             try:
                 history.append((parse_swhid(parent), date))
             except ValidationError:
                 logging.warning("Cannot parse object from history cache: %s", parent)
         return history

     async def set(self, history: str) -> None:
         history = history.strip()
         if history:
             edges = [edge.split(" ") for edge in history.split("\n")]
             await self.conn.executemany(
                 "insert or ignore into history_graph values (?, ?)", edges
             )
             await self.conn.commit()


 class DirEntryCache:
     """ The direntry cache map inode representing directories to the entries
     they contain. Each entry comes with its name as well as file attributes
     (i.e., all its needed to perform a detailed directory listing).

     Additional attributes of each directory entry should be looked up on a
     entry by entry basis, possibly hitting other caches.

     The direntry cache for a given dir is populated, at the latest, when the
     content of the directory is listed. More aggressive prefetching might
     happen. For instance, when first opening a dir a recursive listing of it
     can be retrieved from the remote backend and used to recursively populate
     the direntry cache for all (transitive) sub-directories.
""" @dataclass class LRU(OrderedDict): max_ram: int used_ram: int = field(init=False, default=0) def sizeof(self, value: Any) -> int: # Rough size estimate in bytes for a list of entries return len(value) * 1000 def __getitem__(self, key: Any) -> Any: value = super().__getitem__(key) self.move_to_end(key) return value def __setitem__(self, key: Any, value: Any) -> None: if key in self: self.move_to_end(key) else: self.used_ram += self.sizeof(value) super().__setitem__(key, value) while self.used_ram > self.max_ram and self: oldest = next(iter(self)) self.used_ram -= self.sizeof(oldest) del self[oldest] def __init__(self, conf: Dict[str, Any]): m = re.match(r"(\d+)\s*(.+)\s*", conf["maxram"]) if not m: logging.error("Cannot parse direntry maxram config: %s", conf["maxram"]) sys.exit(1) num = float(m.group(1)) unit = m.group(2).upper() if unit == "%": max_ram = int(num * virtual_memory().available / 100) else: units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9} max_ram = int(float(num) * units[unit]) self.lru_cache = self.LRU(max_ram) def get(self, direntry: FuseDirEntry) -> Optional[List[FuseEntry]]: return self.lru_cache.get(direntry.inode, None) def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None: - if isinstance(direntry, (CacheDir, OriginDir)): + if isinstance(direntry, (CacheDir, CacheDir.ArtifactShardBySwhid, OriginDir)): # The `cache/` and `origin/` directories are populated on the fly pass elif ( isinstance(direntry, RevisionHistoryShardByDate) and not direntry.is_status_done ): # The `by-date/' directory is populated in parallel so only cache it # once it has finished fetching all data from the API pass else: self.lru_cache[direntry.inode] = entries diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index 8777d73..3a4f5d8 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,144 +1,151 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from __future__ import annotations from dataclasses import dataclass, field from enum import IntEnum from pathlib import Path import re from stat import S_IFDIR, S_IFLNK, S_IFREG from typing import Any, AsyncIterator, Dict, Optional, Pattern, Sequence, Union # Avoid cycling import Fuse = "Fuse" class EntryMode(IntEnum): """ Default entry mode and permissions for the FUSE. The FUSE mount is always read-only, even if permissions contradict this statement (in a context of a directory, entries are listed with permissions taken from the archive). 
""" RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 - SYMLINK = S_IFLNK | 0o444 + RDONLY_LNK = S_IFLNK | 0o444 + + # `cache/` sub-directories need the write permission in order to invalidate + # cached artifacts using `rm {SWHID}` + RDWR_DIR = S_IFDIR | 0o755 @dataclass class FuseEntry: """ Main wrapper class to manipulate virtual FUSE entries Attributes: name: entry filename mode: entry permission mode fuse: internal reference to the main FUSE class inode: unique integer identifying the entry """ name: str mode: int depth: int fuse: Fuse inode: int = field(init=False) file_info_attrs: Dict[str, Any] = field(init=False, default_factory=dict) def __post_init__(self): self.inode = self.fuse._alloc_inode(self) # By default, let the kernel cache previously accessed data self.file_info_attrs["keep_cache"] = True async def size(self) -> int: """ Return the size (in bytes) of an entry """ raise NotImplementedError + async def unlink(self, name: str) -> None: + raise NotImplementedError + def get_relative_root_path(self) -> str: return "../" * (self.depth - 1) def create_child(self, constructor: Any, **kwargs) -> FuseEntry: return constructor(depth=self.depth + 1, fuse=self.fuse, **kwargs) @dataclass class FuseFileEntry(FuseEntry): """ FUSE virtual file entry """ async def get_content(self) -> bytes: """ Return the content of a file entry """ raise NotImplementedError async def size(self) -> int: return len(await self.get_content()) @dataclass class FuseDirEntry(FuseEntry): """ FUSE virtual directory entry """ ENTRIES_REGEXP: Optional[Pattern] = field(init=False, default=None) async def size(self) -> int: return 0 def validate_entry(self, name: str) -> bool: """ Return true if the name matches the directory entries regular expression, and false otherwise """ if self.ENTRIES_REGEXP: return re.match(self.ENTRIES_REGEXP, name) else: return True async def compute_entries(self) -> Sequence[FuseEntry]: """ Return the child entries of a directory entry """ raise NotImplementedError async def get_entries(self, offset: int = 0) -> AsyncIterator[FuseEntry]: """ Return the child entries of a directory entry using direntry cache """ cache = self.fuse.cache.direntry.get(self) if cache: entries = cache else: entries = [x async for x in self.compute_entries()] self.fuse.cache.direntry.set(self, entries) # Avoid copy by manual iteration (instead of slicing) and use of a # generator (instead of returning the full list every time) for i in range(offset, len(entries)): yield entries[i] async def lookup(self, name: str) -> FuseEntry: """ Look up a FUSE entry by name """ async for entry in self.get_entries(): if entry.name == name: return entry return None @dataclass class FuseSymlinkEntry(FuseEntry): """ FUSE virtual symlink entry Attributes: target: path to symlink target """ - mode: int = field(init=False, default=int(EntryMode.SYMLINK)) + mode: int = field(init=False, default=int(EntryMode.RDONLY_LNK)) target: Union[str, bytes, Path] async def size(self) -> int: return len(str(self.target)) def get_target(self) -> Union[str, bytes, Path]: """ Return the path target of a symlink entry """ return self.target diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index ff563da..ca47768 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,231 +1,241 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file 
for more information from dataclasses import dataclass, field import json from pathlib import Path import re from typing import AsyncIterator from swh.fuse.fs.artifact import OBJTYPE_GETTERS, SWHID_REGEXP, Origin from swh.fuse.fs.entry import ( EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry, ) from swh.model.exceptions import ValidationError from swh.model.identifiers import CONTENT, SWHID, parse_swhid JSON_SUFFIX = ".json" @dataclass class Root(FuseDirEntry): """ The FUSE mountpoint, consisting of the archive/ and origin/ directories """ name: str = field(init=False, default=None) mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) depth: int = field(init=False, default=1) async def compute_entries(self) -> AsyncIterator[FuseEntry]: yield self.create_child(ArchiveDir) yield self.create_child(OriginDir) yield self.create_child(CacheDir) yield self.create_child(Readme) @dataclass class ArchiveDir(FuseDirEntry): """ The `archive/` virtual directory allows to mount any artifact on the fly using its SWHID as name. The associated metadata of the artifact from the Software Heritage Web API can also be accessed through the `SWHID.json` file (in case of pagination, the JSON file will contain a complete version with all pages merged together). Note: the archive directory cannot be listed with ls, but entries in it can be accessed (e.g., using cat or cd). """ name: str = field(init=False, default="archive") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^(" + SWHID_REGEXP + ")(.json)?$") async def compute_entries(self) -> AsyncIterator[FuseEntry]: return yield async def lookup(self, name: str) -> FuseEntry: # On the fly mounting of a new artifact try: if name.endswith(JSON_SUFFIX): swhid = parse_swhid(name[: -len(JSON_SUFFIX)]) return self.create_child( MetaEntry, name=f"{swhid}{JSON_SUFFIX}", mode=int(EntryMode.RDONLY_FILE), swhid=swhid, ) else: swhid = parse_swhid(name) await self.fuse.get_metadata(swhid) return self.create_child( OBJTYPE_GETTERS[swhid.object_type], name=str(swhid), mode=int( EntryMode.RDONLY_FILE if swhid.object_type == CONTENT else EntryMode.RDONLY_DIR ), swhid=swhid, ) except ValidationError: return None @dataclass class MetaEntry(FuseFileEntry): """ An entry for a `archive/.json` file, containing all the SWHID's metadata from the Software Heritage archive. """ swhid: SWHID async def get_content(self) -> bytes: # Make sure the metadata is in cache await self.fuse.get_metadata(self.swhid) # Retrieve raw JSON metadata from cache (un-typified) metadata = await self.fuse.cache.metadata.get(self.swhid, typify=False) json_str = json.dumps(metadata, indent=self.fuse.conf["json-indent"]) return (json_str + "\n").encode() async def size(self) -> int: return len(await self.get_content()) @dataclass class OriginDir(FuseDirEntry): """ The origin/ directory is lazily populated with one entry per accessed origin URL (mangled to create a valid UNIX filename). The URL encoding is done using the percent-encoding mechanism described in RFC 3986. 
""" name: str = field(init=False, default="origin") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^.*%3A.*$") # %3A is the encoded version of ':' def create_child(self, url_encoded: str) -> FuseEntry: return super().create_child( Origin, name=url_encoded, mode=int(EntryMode.RDONLY_DIR), ) async def compute_entries(self) -> AsyncIterator[FuseEntry]: async for url in self.fuse.cache.get_cached_visits(): yield self.create_child(url) async def lookup(self, name: str) -> FuseEntry: entry = await super().lookup(name) if entry: return entry # On the fly mounting of new origin url try: url_encoded = name await self.fuse.get_visits(url_encoded) return self.create_child(url_encoded) except ValueError: return None @dataclass class CacheDir(FuseDirEntry): """ The cache/ directory is an on-disk representation of locally cached objects and metadata. Via this directory you can browse cached data and selectively remove them from the cache, freeing disk space. (See `swh fs clean` in the {ref}`CLI ` to completely empty the cache). The directory is populated with symlinks to: all artifacts, identified by their SWHIDs and sharded by the first two character of their object id, the metadata identified by a `SWHID.json` entry, and the `origin/` directory. """ name: str = field(init=False, default="cache") mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR)) ENTRIES_REGEXP = re.compile(r"^([a-f0-9]{2})|(" + OriginDir.name + ")$") @dataclass class ArtifactShardBySwhid(FuseDirEntry): ENTRIES_REGEXP = re.compile(r"^(" + SWHID_REGEXP + ")$") prefix: str = field(default="") async def compute_entries(self) -> AsyncIterator[FuseEntry]: root_path = self.get_relative_root_path() async for swhid in self.fuse.cache.get_cached_swhids(): if not swhid.object_id.startswith(self.prefix): continue yield self.create_child( FuseSymlinkEntry, name=str(swhid), target=Path(root_path, f"archive/{swhid}"), ) yield self.create_child( FuseSymlinkEntry, name=f"{swhid}{JSON_SUFFIX}", target=Path(root_path, f"archive/{swhid}{JSON_SUFFIX}"), ) + async def unlink(self, name: str) -> None: + try: + if name.endswith(JSON_SUFFIX): + name = name[: -len(JSON_SUFFIX)] + swhid = parse_swhid(name) + await self.fuse.cache.metadata.remove(swhid) + await self.fuse.cache.blob.remove(swhid) + except ValidationError: + raise + async def compute_entries(self) -> AsyncIterator[FuseEntry]: prefixes = set() async for swhid in self.fuse.cache.get_cached_swhids(): prefixes.add(swhid.object_id[:2]) for prefix in prefixes: yield self.create_child( CacheDir.ArtifactShardBySwhid, name=prefix, - mode=int(EntryMode.RDONLY_DIR), + mode=int(EntryMode.RDWR_DIR), prefix=prefix, ) yield self.create_child( FuseSymlinkEntry, name=OriginDir.name, target=Path(self.get_relative_root_path(), OriginDir.name), ) @dataclass class Readme(FuseFileEntry): """ Top-level README to explain briefly what is SwhFS. """ name: str = field(init=False, default="README") mode: int = field(init=False, default=int(EntryMode.RDONLY_FILE)) CONTENT = """Welcome to the Software Heritage Filesystem (SwhFS)! This is a user-space POSIX filesystem to browse the Software Heritage archive, as if it were locally available. The SwhFS mount point contains 3 directories, all initially empty and lazily populated: - "archive": virtual directory to mount any Software Heritage artifact on the fly using its SWHID as name. Note: this directory cannot be listed with ls, but entries in it can be accessed (e.g., using cat or cd). 
- "origin": virtual directory to mount any origin using its encoded URL as name. - "cache": on-disk representation of locally cached objects and metadata. Try it yourself: $ cat archive/swh:1:cnt:c839dea9e8e6f0528b468214348fee8669b305b2 #include int main(void) { printf("Hello, World!\\n"); } You can find more details and examples in the SwhFS online documentation: https://docs.softwareheritage.org/devel/swh-fuse/ """ async def get_content(self) -> bytes: return self.CONTENT.encode() diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index fae8a1f..989e7b3 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,339 +1,359 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import functools import logging import os from pathlib import Path import time from typing import Any, Dict, List import urllib.parse import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse import LOGGER_NAME from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, REVISION, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self.conf = conf self.logger = logging.getLogger(LOGGER_NAME) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def _remove_inode(self, inode: int) -> None: try: del self._inode2entry[inode] except KeyError: pass def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) # Retrieve it from cache so it is correctly typed return await self.cache.metadata.get(swhid) except requests.HTTPError as err: self.logger.error("Cannot fetch metadata for object %s: %s", swhid, err) raise async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) # Make sure the metadata cache is also 
populated with the given SWHID await self.get_metadata(swhid) cache = await self.cache.blob.get(swhid) if cache: self.logger.debug("Found blob %s in cache", swhid) return cache try: self.logger.debug("Retrieving blob %s via web API...", swhid) loop = asyncio.get_event_loop() resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) blob = b"".join(list(resp)) await self.cache.blob.set(swhid, blob) return blob except requests.HTTPError as err: self.logger.error("Cannot fetch blob for object %s: %s", swhid, err) raise async def get_history(self, swhid: SWHID) -> List[SWHID]: """ Retrieve a revision's history using Software Heritage Graph API """ if swhid.object_type != REVISION: raise pyfuse3.FUSEError(errno.EINVAL) cache = await self.cache.history.get(swhid) if cache: self.logger.debug( "Found history of %s in cache (%d ancestors)", swhid, len(cache) ) return cache try: # Use the swh-graph API to retrieve the full history very fast self.logger.debug("Retrieving history of %s via graph API...", swhid) call = f"graph/visit/edges/{swhid}?edges=rev:rev" loop = asyncio.get_event_loop() history = await loop.run_in_executor(None, self.web_api._call, call) await self.cache.history.set(history.text) # Retrieve it from cache so it is correctly typed res = await self.cache.history.get(swhid) return res except requests.HTTPError as err: self.logger.error("Cannot fetch history for object %s: %s", swhid, err) # Ignore exception since swh-graph does not necessarily contain the # most recent artifacts from the archive. Computing the full history # from the Web API is too computationally intensive so simply return # an empty list. return [] async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]: """ Retrieve origin visits given an encoded-URL using Software Heritage API """ cache = await self.cache.metadata.get_visits(url_encoded) if cache: self.logger.debug( "Found %d visits for origin '%s' in cache", len(cache), url_encoded, ) return cache try: self.logger.debug( "Retrieving visits for origin '%s' via web API...", url_encoded ) typify = False # Get the raw JSON from the API loop = asyncio.get_event_loop() # Web API only takes non-encoded URL url = urllib.parse.unquote_plus(url_encoded) origin_exists = await loop.run_in_executor( None, self.web_api.origin_exists, url ) if not origin_exists: raise ValueError("origin does not exist") visits_it = await loop.run_in_executor( None, functools.partial(self.web_api.visits, url, typify=typify) ) visits = list(visits_it) await self.cache.metadata.set_visits(url_encoded, visits) # Retrieve it from cache so it is correctly typed res = await self.cache.metadata.get_visits(url_encoded) return res except (ValueError, requests.HTTPError) as err: self.logger.error( "Cannot fetch visits for origin '%s': %s", url_encoded, err ) raise async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.size() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given 
inode """ # Re-use inode as directory handle self.logger.debug("opendir(inode=%d)", inode) return inode async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh direntry = self.inode2entry(inode) self.logger.debug( "readdir(dirname=%s, fh=%d, offset=%d)", direntry.name, fh, offset ) assert isinstance(direntry, FuseDirEntry) next_id = offset + 1 try: async for entry in direntry.get_entries(offset): name = os.fsencode(entry.name) attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry except Exception as err: self.logger.exception("Cannot readdir: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle self.logger.debug("open(inode=%d)", inode) entry = self.inode2entry(inode) return pyfuse3.FileInfo(fh=inode, **entry.file_info_attrs) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) self.logger.debug( "read(name=%s, fh=%d, offset=%d, length=%d)", entry.name, fh, offset, length ) assert isinstance(entry, FuseFileEntry) try: data = await entry.get_content() return data[offset : offset + length] except Exception as err: self.logger.exception("Cannot read: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) self.logger.debug( "lookup(parent_name=%s, parent_inode=%d, name=%s)", parent_entry.name, parent_inode, name, ) assert isinstance(parent_entry, FuseDirEntry) try: if parent_entry.validate_entry(name): lookup_entry = await parent_entry.lookup(name) if lookup_entry: return await self.get_attrs(lookup_entry) except Exception as err: self.logger.exception("Cannot lookup: %s", err) raise pyfuse3.FUSEError(errno.ENOENT) async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: entry = self.inode2entry(inode) self.logger.debug("readlink(name=%s, inode=%d)", entry.name, inode) assert isinstance(entry, FuseSymlinkEntry) return os.fsencode(entry.get_target()) + async def unlink( + self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext + ) -> None: + """ Remove a file """ + + name = os.fsdecode(name) + parent_entry = self.inode2entry(parent_inode) + self.logger.debug( + "unlink(parent_name=%s, parent_inode=%d, name=%s)", + parent_entry.name, + parent_inode, + name, + ) + + try: + await parent_entry.unlink(name) + except Exception as err: + self.logger.exception("Cannot unlink: %s", err) + raise pyfuse3.FUSEError(errno.ENOENT) + async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: try: await fs.get_metadata(swhid) except Exception as err: fs.logger.exception("Cannot prefetch object %s: %s", 
swhid, err) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") try: pyfuse3.init(fs, root_path, fuse_options) await pyfuse3.main() except Exception as err: fs.logger.error("Error running FUSE: %s", err) finally: fs.shutdown() pyfuse3.close(unmount=True) diff --git a/swh/fuse/tests/test_cache.py b/swh/fuse/tests/test_cache.py new file mode 100644 index 0000000..b230709 --- /dev/null +++ b/swh/fuse/tests/test_cache.py @@ -0,0 +1,33 @@ +# Copyright (C) 2020 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import os + +from swh.fuse.tests.data.config import REGULAR_FILE +from swh.model.identifiers import parse_swhid + + +def test_cache_artifact(fuse_mntdir): + assert os.listdir(fuse_mntdir / "cache") == ["origin"] + + (fuse_mntdir / "archive" / REGULAR_FILE).is_file() + + swhid = parse_swhid(REGULAR_FILE) + assert os.listdir(fuse_mntdir / "cache") == [swhid.object_id[:2], "origin"] + + +def test_purge_artifact(fuse_mntdir): + DEFAULT_CACHE_CONTENT = ["origin"] + + assert os.listdir(fuse_mntdir / "cache") == DEFAULT_CACHE_CONTENT + + # Access a content artifact... + (fuse_mntdir / "archive" / REGULAR_FILE).is_file() + assert os.listdir(fuse_mntdir / "cache") != DEFAULT_CACHE_CONTENT + # ... and remove it from cache + swhid = parse_swhid(REGULAR_FILE) + os.unlink(fuse_mntdir / "cache" / swhid.object_id[:2] / str(swhid)) + + assert os.listdir(fuse_mntdir / "cache") == DEFAULT_CACHE_CONTENT