diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index 6e8c7bf..92baa8d 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,227 +1,246 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ cache = await self.cache.metadata.get(swhid) if cache: return cache try: # TODO: swh-graph API typify = False # Get the raw JSON from the API # TODO: async web API loop = asyncio.get_event_loop() metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) await self.cache.metadata.set(swhid, metadata) # Retrieve it from cache so it is correctly typed return await self.cache.metadata.get(swhid) - except requests.HTTPError: - logging.error(f"Unknown SWHID: '{swhid}'") + except requests.HTTPError as err: + logging.error(f"Cannot fetch metadata for object {swhid}: {err}") + raise async def get_blob(self, swhid: SWHID) -> bytes: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) # Make sure the metadata cache is also populated with the given SWHID await self.get_metadata(swhid) cache = await self.cache.blob.get(swhid) if cache: return cache - loop = asyncio.get_event_loop() - resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) - blob = b"".join(list(resp)) - await self.cache.blob.set(swhid, blob) - return blob + try: + loop = asyncio.get_event_loop() + resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) + blob = b"".join(list(resp)) + await self.cache.blob.set(swhid, blob) + return blob + except requests.HTTPError as err: + logging.error(f"Cannot fetch blob for object {swhid}: {err}") + raise async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode attrs.st_size = await entry.size() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: """ Read entries in an open directory """ # opendir() uses inode as directory handle inode = fh # TODO: add cache on direntry list? direntry = self.inode2entry(inode) assert isinstance(direntry, FuseDirEntry) next_id = offset + 1 i = 0 - async for entry in direntry: - if i < offset: - i += 1 - continue - - name = os.fsencode(entry.name) - attrs = await self.get_attrs(entry) - if not pyfuse3.readdir_reply(token, name, attrs, next_id): - break - - next_id += 1 - self._inode2entry[attrs.st_ino] = entry + try: + async for entry in direntry: + if i < offset: + i += 1 + continue + + name = os.fsencode(entry.name) + attrs = await self.get_attrs(entry) + if not pyfuse3.readdir_reply(token, name, attrs, next_id): + break + + next_id += 1 + self._inode2entry[attrs.st_ino] = entry + except Exception as err: + logging.error(f"Cannot readdir: {err}") + raise pyfuse3.FUSEError(errno.ENOENT) async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file handle """ # Re-use inode as file handle return pyfuse3.FileInfo(fh=inode, keep_cache=True) async def read(self, fh: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file handle `fh` at position `offset` """ # open() uses inode as file handle inode = fh entry = self.inode2entry(inode) assert isinstance(entry, FuseFileEntry) - data = await entry.get_content() - return data[offset : offset + length] + try: + data = await entry.get_content() + return data[offset : offset + length] + except Exception as err: + logging.error(f"Cannot read: {err}") + raise pyfuse3.FUSEError(errno.ENOENT) async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) parent_entry = self.inode2entry(parent_inode) assert isinstance(parent_entry, FuseDirEntry) - lookup_entry = await parent_entry.lookup(name) - if lookup_entry: - return await self.get_attrs(lookup_entry) - else: - logging.error(f"Unknown name during lookup: '{name}'") + try: + lookup_entry = await parent_entry.lookup(name) + if lookup_entry: + return await self.get_attrs(lookup_entry) + else: + raise ValueError(f"unknown name: {name}") + except Exception as err: + logging.error(f"Cannot lookup: {err}") raise pyfuse3.FUSEError(errno.ENOENT) async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes: entry = self.inode2entry(inode) assert isinstance(entry, FuseSymlinkEntry) return os.fsencode(entry.get_target()) async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: - await fs.get_metadata(swhid) + try: + await fs.get_metadata(swhid) + except Exception as err: + logging.error(f"Cannot prefetch object {swhid}: {err}") fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") try: pyfuse3.init(fs, root_path, fuse_options) await pyfuse3.main() - except Exception as e: - logging.error(f"Error running FUSE: {e}") + except Exception as err: + logging.error(f"Error running FUSE: {err}") finally: fs.shutdown() pyfuse3.close(unmount=True)