Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/fuse.py
Show First 20 Lines • Show All 79 Lines • ▼ Show 20 Lines | async def get_metadata(self, swhid: SWHID) -> Any: | ||||
typify = False # Get the raw JSON from the API | typify = False # Get the raw JSON from the API | ||||
# TODO: async web API | # TODO: async web API | ||||
loop = asyncio.get_event_loop() | loop = asyncio.get_event_loop() | ||||
metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) | metadata = await loop.run_in_executor(None, self.web_api.get, swhid, typify) | ||||
await self.cache.metadata.set(swhid, metadata) | await self.cache.metadata.set(swhid, metadata) | ||||
# Retrieve it from cache so it is correctly typed | # Retrieve it from cache so it is correctly typed | ||||
return await self.cache.metadata.get(swhid) | return await self.cache.metadata.get(swhid) | ||||
except requests.HTTPError as err: | except requests.HTTPError as err: | ||||
logging.error(f"Cannot fetch metadata for object {swhid}: {err}") | logging.error("Cannot fetch metadata for object %s: %s", swhid, err) | ||||
raise | raise | ||||
async def get_blob(self, swhid: SWHID) -> bytes:
    """ Retrieve the blob bytes for a given content SWHID using Software
    Heritage API.

    The metadata cache is populated for `swhid` first, then the blob cache is
    consulted. When nothing usable is cached, the raw content is requested
    through `self.web_api.content_raw` (run in the event loop's default
    executor), stored in the blob cache and returned.

    Raises:
        pyfuse3.FUSEError: with EINVAL when `swhid` is not a content object.
        requests.HTTPError: logged, then re-raised, when the download fails.
    """

    if swhid.object_type != CONTENT:
        raise pyfuse3.FUSEError(errno.EINVAL)

    # Make sure the metadata cache is also populated with the given SWHID
    await self.get_metadata(swhid)

    # NOTE(review): this is a truthiness check, so an empty cached blob is
    # treated like a cache miss and fetched again -- confirm this is intended.
    cache = await self.cache.blob.get(swhid)
    if cache:
        return cache

    try:
        loop = asyncio.get_event_loop()
        resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid)
        # bytes.join() consumes any iterable directly; no intermediate list
        blob = b"".join(resp)
        await self.cache.blob.set(swhid, blob)
        return blob
    except requests.HTTPError as err:
        logging.error("Cannot fetch blob for object %s: %s", swhid, err)
        raise
async def get_history(self, swhid: SWHID) -> List[SWHID]:
    """ Return the history of a revision SWHID, from cache when available.

    On a cache miss the swh-graph endpoint is queried through
    `self.web_api._call`, the response text is stored in the history cache
    and the value is read back from that cache.

    Raises:
        pyfuse3.FUSEError: with EINVAL when `swhid` is not a revision object.
    """

    if swhid.object_type != REVISION:
        raise pyfuse3.FUSEError(errno.EINVAL)

    cached_history = await self.cache.history.get(swhid)
    if cached_history:
        return cached_history

    try:
        # Use the swh-graph API to retrieve the full history very fast
        endpoint = f"graph/visit/edges/{swhid}?edges=rev:rev"
        response = await asyncio.get_event_loop().run_in_executor(
            None, self.web_api._call, endpoint
        )
        await self.cache.history.set(response.text)
        # Read the value back from the cache so it is correctly typed
        return await self.cache.history.get(swhid)
    except requests.HTTPError as err:
        logging.error("Cannot fetch history for object %s: %s", swhid, err)
        # The error is deliberately swallowed: swh-graph does not necessarily
        # contain the most recent artifacts from the archive, and computing
        # the full history from the Web API is too computationally
        # intensive, so an empty list is returned instead.
        return []
async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: | async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: | ||||
""" Return entry attributes """ | """ Return entry attributes """ | ||||
Show All 38 Lines | async def readdir(self, fh: int, offset: int, token: pyfuse3.ReaddirToken) -> None: | ||||
name = os.fsencode(entry.name) | name = os.fsencode(entry.name) | ||||
attrs = await self.get_attrs(entry) | attrs = await self.get_attrs(entry) | ||||
if not pyfuse3.readdir_reply(token, name, attrs, next_id): | if not pyfuse3.readdir_reply(token, name, attrs, next_id): | ||||
break | break | ||||
next_id += 1 | next_id += 1 | ||||
self._inode2entry[attrs.st_ino] = entry | self._inode2entry[attrs.st_ino] = entry | ||||
except Exception as err: | except Exception as err: | ||||
logging.exception(f"Cannot readdir: {err}") | logging.exception("Cannot readdir: %s", err) | ||||
raise pyfuse3.FUSEError(errno.ENOENT) | raise pyfuse3.FUSEError(errno.ENOENT) | ||||
async def open(
    self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext
) -> pyfuse3.FileInfo:
    """ Open an inode and return a unique file handle """

    # The inode number itself serves as the file handle
    file_info = pyfuse3.FileInfo(fh=inode, keep_cache=True)
    return file_info
async def read(self, fh: int, offset: int, length: int) -> bytes:
    """ Read `length` bytes from file handle `fh` at position `offset`.

    Any exception raised while getting or slicing the content is logged and
    reported as a pyfuse3.FUSEError with ENOENT.
    """

    # File handles are inodes (see open()), so `fh` maps straight to an entry
    file_entry = self.inode2entry(fh)
    assert isinstance(file_entry, FuseFileEntry)

    try:
        content = await file_entry.get_content()
        end = offset + length
        return content[offset:end]
    except Exception as err:
        logging.exception("Cannot read: %s", err)
        raise pyfuse3.FUSEError(errno.ENOENT)
async def lookup(
    self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext
) -> pyfuse3.EntryAttributes:
    """ Look up a directory entry by name and get its attributes.

    A name the parent does not know, or any other failure during the lookup,
    is logged and reported as a pyfuse3.FUSEError with ENOENT.
    """

    decoded_name = os.fsdecode(name)
    directory = self.inode2entry(parent_inode)
    assert isinstance(directory, FuseDirEntry)

    try:
        found = await directory.lookup(decoded_name)
        if not found:
            # Raised inside the try so it goes through the handler below
            raise ValueError(f"unknown name: {decoded_name}")
        return await self.get_attrs(found)
    except Exception as err:
        logging.exception("Cannot lookup: %s", err)
        raise pyfuse3.FUSEError(errno.ENOENT)
async def readlink(self, inode: int, _ctx: pyfuse3.RequestContext) -> bytes:
    """ Return the filesystem-encoded target of the symlink entry at `inode` """

    symlink = self.inode2entry(inode)
    assert isinstance(symlink, FuseSymlinkEntry)
    target = symlink.get_target()
    return os.fsencode(target)
async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None:
    """ swh-fuse CLI entry-point.

    Opens the cache, prefetches metadata for every SWHID in `swhids`
    (failures are logged and skipped), then initializes pyfuse3 on
    `root_path` and runs its main loop. The filesystem is shut down and
    pyfuse3 is closed with unmount on the way out, whether or not the run
    failed.
    """

    # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase
    pyfuse3_asyncio.enable()

    async with FuseCache(conf["cache"]) as cache:
        fs = Fuse(root_path, cache, conf)

        # Warm the cache; a failure on one object must not stop the others
        for swhid in swhids:
            try:
                await fs.get_metadata(swhid)
            except Exception as err:
                logging.exception("Cannot prefetch object %s: %s", swhid, err)

        debug_enabled = logging.root.level <= logging.DEBUG
        fuse_options = {*pyfuse3.default_options, "fsname=swhfs"}
        if debug_enabled:
            fuse_options.add("debug")

        try:
            pyfuse3.init(fs, root_path, fuse_options)
            await pyfuse3.main()
        except Exception as err:
            logging.error("Error running FUSE: %s", err)
        finally:
            fs.shutdown()
            pyfuse3.close(unmount=True)