Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
index 6e8c7bf..92baa8d 100644
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -1,227 +1,246 @@
# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import errno
import logging
import os
from pathlib import Path
import time
from typing import Any, Dict, List
import pyfuse3
import pyfuse3_asyncio
import requests
from swh.fuse.cache import FuseCache
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry
from swh.fuse.fs.mountpoint import Root
from swh.model.identifiers import CONTENT, SWHID
from swh.web.client.client import WebAPIClient
class Fuse(pyfuse3.Operations):
""" Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of
the archive and navigate it as a virtual file system. """
def __init__(
self, root_path: Path, cache: FuseCache, conf: Dict[str, Any],
):
super(Fuse, self).__init__()
self._next_inode: int = pyfuse3.ROOT_INODE
self._inode2entry: Dict[int, FuseEntry] = {}
self.root = Root(fuse=self)
self.time_ns: int = time.time_ns() # start time, used as timestamp
self.gid = os.getgid()
self.uid = os.getuid()
self.web_api = WebAPIClient(
conf["web-api"]["url"], conf["web-api"]["auth-token"]
)
self.cache = cache
def shutdown(self) -> None:
pass
def _alloc_inode(self, entry: FuseEntry) -> int:
""" Return a unique inode integer for a given entry """
inode = self._next_inode
self._next_inode += 1
self._inode2entry[inode] = entry
# TODO add inode recycling with invocation to invalidate_inode when
# the dicts get too big
return inode
def inode2entry(self, inode: int) -> FuseEntry:
""" Return the entry matching a given inode """
try:
return self._inode2entry[inode]
except KeyError:
raise pyfuse3.FUSEError(errno.ENOENT)
async def get_metadata(self, swhid: SWHID) -> Any:
""" Retrieve metadata for a given SWHID using Software Heritage API """
cache = await self.cache.metadata.get(swhid)
if cache:
return cache
try:
# TODO: swh-graph API
typify = False # Get the raw JSON from the API
# TODO: async web API
loop = asyncio.get_event_loop()
metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify)
await self.cache.metadata.set(swhid, metadata)
# Retrieve it from cache so it is correctly typed
return await self.cache.metadata.get(swhid)
- except requests.HTTPError:
- logging.error(f"Unknown SWHID: '{swhid}'")
+ except requests.HTTPError as err:
+ logging.error(f"Cannot fetch metadata for object {swhid}: {err}")
+ raise
async def get_blob(self, swhid: SWHID) -> bytes:
""" Retrieve the blob bytes for a given content SWHID using Software
Heritage API """
if swhid.object_type != CONTENT:
raise pyfuse3.FUSEError(errno.EINVAL)
# Make sure the metadata cache is also populated with the given SWHID
await self.get_metadata(swhid)
cache = await self.cache.blob.get(swhid)
if cache:
return cache
- loop = asyncio.get_event_loop()
- resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
- blob = b"".join(list(resp))
- await self.cache.blob.set(swhid, blob)
- return blob
+ try:
+ loop = asyncio.get_event_loop()
+ resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
+ blob = b"".join(list(resp))
+ await self.cache.blob.set(swhid, blob)
+ return blob
+ except requests.HTTPError as err:
+ logging.error(f"Cannot fetch blob for object {swhid}: {err}")
+ raise
async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
""" Return entry attributes """
attrs = pyfuse3.EntryAttributes()
attrs.st_size = 0
attrs.st_atime_ns = self.time_ns
attrs.st_ctime_ns = self.time_ns
attrs.st_mtime_ns = self.time_ns
attrs.st_gid = self.gid
attrs.st_uid = self.uid
attrs.st_ino = entry.inode
attrs.st_mode = entry.mode
attrs.st_size = await entry.size()
return attrs
async def getattr(
self, inode: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Get attributes for a given inode """
entry = self.inode2entry(inode)
return await self.get_attrs(entry)
async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int:
""" Open a directory referred by a given inode """
# Re-use inode as directory handle
return inode
async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None:
""" Read entries in an open directory """
# opendir() uses inode as directory handle
inode = fh
# TODO: add cache on direntry list?
direntry = self.inode2entry(inode)
assert isinstance(direntry, FuseDirEntry)
next_id = offset + 1
i = 0
- async for entry in direntry:
- if i < offset:
- i += 1
- continue
-
- name = os.fsencode(entry.name)
- attrs = await self.get_attrs(entry)
- if not pyfuse3.readdir_reply(token, name, attrs, next_id):
- break
-
- next_id += 1
- self._inode2entry[attrs.st_ino] = entry
+ try:
+ async for entry in direntry:
+ if i < offset:
+ i += 1
+ continue
+
+ name = os.fsencode(entry.name)
+ attrs = await self.get_attrs(entry)
+ if not pyfuse3.readdir_reply(token, name, attrs, next_id):
+ break
+
+ next_id += 1
+ self._inode2entry[attrs.st_ino] = entry
+ except Exception as err:
+ logging.error(f"Cannot readdir: {err}")
+ raise pyfuse3.FUSEError(errno.ENOENT)
async def open(
self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.FileInfo:
""" Open an inode and return a unique file handle """
# Re-use inode as file handle
return pyfuse3.FileInfo(fh=inode, keep_cache=True)
async def read(self, fh: int, offset: int, length: int) -> bytes:
""" Read `length` bytes from file handle `fh` at position `offset` """
# open() uses inode as file handle
inode = fh
entry = self.inode2entry(inode)
assert isinstance(entry, FuseFileEntry)
- data = await entry.get_content()
- return data[offset : offset + length]
+ try:
+ data = await entry.get_content()
+ return data[offset : offset + length]
+ except Exception as err:
+ logging.error(f"Cannot read: {err}")
+ raise pyfuse3.FUSEError(errno.ENOENT)
async def lookup(
self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Look up a directory entry by name and get its attributes """
name = os.fsdecode(name)
parent_entry = self.inode2entry(parent_inode)
assert isinstance(parent_entry, FuseDirEntry)
- lookup_entry = await parent_entry.lookup(name)
- if lookup_entry:
- return await self.get_attrs(lookup_entry)
- else:
- logging.error(f"Unknown name during lookup: '{name}'")
+ try:
+ lookup_entry = await parent_entry.lookup(name)
+ if lookup_entry:
+ return await self.get_attrs(lookup_entry)
+ else:
+ raise ValueError(f"unknown name: {name}")
+ except Exception as err:
+ logging.error(f"Cannot lookup: {err}")
raise pyfuse3.FUSEError(errno.ENOENT)
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes:
entry = self.inode2entry(inode)
assert isinstance(entry, FuseSymlinkEntry)
return os.fsencode(entry.get_target())
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None:
""" swh-fuse CLI entry-point """
# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
pyfuse3_asyncio.enable()
async with FuseCache(conf["cache"]) as cache:
fs = Fuse(root_path, cache, conf)
# Initially populate the cache
for swhid in swhids:
- await fs.get_metadata(swhid)
+ try:
+ await fs.get_metadata(swhid)
+ except Exception as err:
+ logging.error(f"Cannot prefetch object {swhid}: {err}")
fuse_options = set(pyfuse3.default_options)
fuse_options.add("fsname=swhfs")
fuse_options.add("debug")
try:
pyfuse3.init(fs, root_path, fuse_options)
await pyfuse3.main()
- except Exception as e:
- logging.error(f"Error running FUSE: {e}")
+ except Exception as err:
+ logging.error(f"Error running FUSE: {err}")
finally:
fs.shutdown()
pyfuse3.close(unmount=True)

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 3, 11:44 AM (4 d, 13 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3247459

Event Timeline