Page MenuHomeSoftware Heritage

fuse.py
No OneTemporary

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import asyncio
import errno
import logging
import os
from pathlib import Path
import time
from typing import Any, Dict, List
import pyfuse3
import pyfuse3_asyncio
import requests
from swh.fuse.cache import FuseCache
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry, FuseFileEntry, FuseSymlinkEntry
from swh.fuse.fs.mountpoint import Root
from swh.model.identifiers import CONTENT, REVISION, SWHID
from swh.web.client.client import WebAPIClient
class Fuse(pyfuse3.Operations):
""" Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of
the archive and navigate it as a virtual file system. """
def __init__(
self, root_path: Path, cache: FuseCache, conf: Dict[str, Any],
):
super(Fuse, self).__init__()
self._next_inode: int = pyfuse3.ROOT_INODE
self._inode2entry: Dict[int, FuseEntry] = {}
self.root = Root(fuse=self)
self.time_ns: int = time.time_ns() # start time, used as timestamp
self.gid = os.getgid()
self.uid = os.getuid()
self.web_api = WebAPIClient(
conf["web-api"]["url"], conf["web-api"]["auth-token"]
)
self.cache = cache
def shutdown(self) -> None:
pass
def _alloc_inode(self, entry: FuseEntry) -> int:
""" Return a unique inode integer for a given entry """
inode = self._next_inode
self._next_inode += 1
self._inode2entry[inode] = entry
# TODO add inode recycling with invocation to invalidate_inode when
# the dicts get too big
return inode
def inode2entry(self, inode: int) -> FuseEntry:
""" Return the entry matching a given inode """
try:
return self._inode2entry[inode]
except KeyError:
raise pyfuse3.FUSEError(errno.ENOENT)
async def get_metadata(self, swhid: SWHID) -> Any:
""" Retrieve metadata for a given SWHID using Software Heritage API """
cache = await self.cache.metadata.get(swhid)
if cache:
return cache
try:
typify = False # Get the raw JSON from the API
# TODO: async web API
loop = asyncio.get_event_loop()
metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify)
await self.cache.metadata.set(swhid, metadata)
# Retrieve it from cache so it is correctly typed
return await self.cache.metadata.get(swhid)
except requests.HTTPError as err:
logging.error(f"Cannot fetch metadata for object {swhid}: {err}")
raise
async def get_blob(self, swhid: SWHID) -> bytes:
""" Retrieve the blob bytes for a given content SWHID using Software
Heritage API """
if swhid.object_type != CONTENT:
raise pyfuse3.FUSEError(errno.EINVAL)
# Make sure the metadata cache is also populated with the given SWHID
await self.get_metadata(swhid)
cache = await self.cache.blob.get(swhid)
if cache:
return cache
try:
loop = asyncio.get_event_loop()
resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
blob = b"".join(list(resp))
await self.cache.blob.set(swhid, blob)
return blob
except requests.HTTPError as err:
logging.error(f"Cannot fetch blob for object {swhid}: {err}")
raise
async def get_history(self, swhid: SWHID) -> List[SWHID]:
if swhid.object_type != REVISION:
raise pyfuse3.FUSEError(errno.EINVAL)
cache = await self.cache.history.get(swhid)
if cache:
return cache
try:
# Use the swh-graph API to retrieve the full history very fast
call = f"graph/visit/edges/{swhid}?edges=rev:rev"
loop = asyncio.get_event_loop()
history = await loop.run_in_executor(None, self.web_api._call, call)
await self.cache.history.set(history.text)
# Retrieve it from cache so it is correctly typed
return await self.cache.history.get(swhid)
except requests.HTTPError as err:
logging.error(f"Cannot fetch history for object {swhid}: {err}")
# Ignore exception since swh-graph does not necessarily contain the
# most recent artifacts from the archive. Computing the full history
# from the Web API is too computationally intensive so simply return
# an empty list.
return []
async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
""" Return entry attributes """
attrs = pyfuse3.EntryAttributes()
attrs.st_size = 0
attrs.st_atime_ns = self.time_ns
attrs.st_ctime_ns = self.time_ns
attrs.st_mtime_ns = self.time_ns
attrs.st_gid = self.gid
attrs.st_uid = self.uid
attrs.st_ino = entry.inode
attrs.st_mode = entry.mode
attrs.st_size = await entry.size()
return attrs
async def getattr(
self, inode: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Get attributes for a given inode """
entry = self.inode2entry(inode)
return await self.get_attrs(entry)
async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int:
""" Open a directory referred by a given inode """
# Re-use inode as directory handle
return inode
async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None:
""" Read entries in an open directory """
# opendir() uses inode as directory handle
inode = fh
# TODO: add cache on direntry list?
direntry = self.inode2entry(inode)
assert isinstance(direntry, FuseDirEntry)
next_id = offset + 1
i = 0
try:
async for entry in direntry:
if i < offset:
i += 1
continue
name = os.fsencode(entry.name)
attrs = await self.get_attrs(entry)
if not pyfuse3.readdir_reply(token, name, attrs, next_id):
break
next_id += 1
self._inode2entry[attrs.st_ino] = entry
except Exception as err:
logging.debug(f"Cannot readdir: {err}")
raise pyfuse3.FUSEError(errno.ENOENT)
async def open(
self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.FileInfo:
""" Open an inode and return a unique file handle """
# Re-use inode as file handle
return pyfuse3.FileInfo(fh=inode, keep_cache=True)
async def read(self, fh: int, offset: int, length: int) -> bytes:
""" Read `length` bytes from file handle `fh` at position `offset` """
# open() uses inode as file handle
inode = fh
entry = self.inode2entry(inode)
assert isinstance(entry, FuseFileEntry)
try:
data = await entry.get_content()
return data[offset : offset + length]
except Exception as err:
logging.debug(f"Cannot read: {err}")
raise pyfuse3.FUSEError(errno.ENOENT)
async def lookup(
self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
""" Look up a directory entry by name and get its attributes """
name = os.fsdecode(name)
parent_entry = self.inode2entry(parent_inode)
assert isinstance(parent_entry, FuseDirEntry)
try:
lookup_entry = await parent_entry.lookup(name)
if lookup_entry:
return await self.get_attrs(lookup_entry)
else:
raise ValueError(f"unknown name: {name}")
except Exception as err:
logging.debug(f"Cannot lookup: {err}")
raise pyfuse3.FUSEError(errno.ENOENT)
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes:
entry = self.inode2entry(inode)
assert isinstance(entry, FuseSymlinkEntry)
return os.fsencode(entry.get_target())
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None:
""" swh-fuse CLI entry-point """
# Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
pyfuse3_asyncio.enable()
async with FuseCache(conf["cache"]) as cache:
fs = Fuse(root_path, cache, conf)
# Initially populate the cache
for swhid in swhids:
try:
await fs.get_metadata(swhid)
except Exception as err:
logging.error(f"Cannot prefetch object {swhid}: {err}")
fuse_options = set(pyfuse3.default_options)
fuse_options.add("fsname=swhfs")
if logging.root.level <= logging.DEBUG:
fuse_options.add("debug")
try:
pyfuse3.init(fs, root_path, fuse_options)
await pyfuse3.main()
except Exception as err:
logging.error(f"Error running FUSE: {err}")
finally:
fs.shutdown()
pyfuse3.close(unmount=True)

File Metadata

Mime Type
text/x-python
Expires
Thu, Apr 17, 9:37 AM (5 d, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3284448

Event Timeline