diff --git a/mypy.ini b/mypy.ini index 33bc4f9..d92604e 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,21 +1,24 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) +[mypy-aiosqlite.*] +ignore_missing_imports = True + [mypy-daemon.*] ignore_missing_imports = True [mypy-pkg_resources.*] ignore_missing_imports = True [mypy-pytest.*] ignore_missing_imports = True [mypy-pyfuse3.*] ignore_missing_imports = True [mypy-pyfuse3_asyncio.*] ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt index 377e603..f08cef8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ +aiosqlite pyfuse3 python-daemon diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py index 134c448..f649b51 100644 --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -1,150 +1,150 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from abc import ABC -from contextlib import closing import json -import sqlite3 -from typing import Any, Dict, Optional, Set +from typing import Any, AsyncGenerator, Dict, Optional + +import aiosqlite from swh.model.identifiers import SWHID, parse_swhid from swh.web.client.client import typify class FuseCache: """ SWH FUSE retrieves both metadata and file contents from the Software Heritage archive via the network. In order to obtain reasonable performances several caches are used to minimize network transfer. Caches are stored on disk in SQLite databases located at `$XDG_CACHE_HOME/swh/fuse/`. All caches are persistent (i.e., they survive the restart of the SWH FUSE process) and global (i.e., they are shared by concurrent SWH FUSE processes). We assume that no cache *invalidation* is necessary, due to intrinsic properties of the Software Heritage archive, such as integrity verification and append-only archive changes. To clean the caches one can just remove the corresponding files from disk. """ def __init__(self, cache_conf: Dict[str, Any]): self.cache_conf = cache_conf - def __enter__(self): + async def __aenter__(self): self.metadata = MetadataCache(self.cache_conf["metadata"]) - self.metadata.__enter__() self.blob = BlobCache(self.cache_conf["blob"]) - self.blob.__enter__() + await self.metadata.__aenter__() + await self.blob.__aenter__() return self - def __exit__(self, type=None, val=None, tb=None) -> None: - self.metadata.__exit__() - self.blob.__exit__() + async def __aexit__(self, type=None, val=None, tb=None) -> None: + await self.metadata.__aexit__() + await self.blob.__aexit__() - def get_cached_swhids(self) -> Set[SWHID]: + async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: """ Return a list of all previously cached SWHID """ - with closing(self.metadata.conn.cursor()) as metadata_cursor, ( - closing(self.blob.conn.cursor()) - ) as blob_cursor: - # Some entries can be in one cache but not in the other so create a - # set from all caches - metadata_cursor.execute("select swhid from metadata_cache") - blob_cursor.execute("select swhid from blob_cache") - swhids = metadata_cursor.fetchall() + blob_cursor.fetchall() - swhids = [parse_swhid(x[0]) for x in swhids] - return set(swhids) + meta_cursor = await self.metadata.conn.execute( + "select swhid from metadata_cache" + ) + blob_cursor = await self.blob.conn.execute("select swhid from blob_cache") + # Some entries can be in one cache but not in the other so create a set + # from all caches + swhids = await meta_cursor.fetchall() + await blob_cursor.fetchall() + swhids = [parse_swhid(x[0]) for x in swhids] + for swhid in set(swhids): + yield swhid class AbstractCache(ABC): """ Abstract cache implementation to share common behavior between cache types (such as: YAML config parsing, SQLite context manager) """ def __init__(self, conf: Dict[str, Any]): self.conf = conf - def __enter__(self): + async def __aenter__(self): # In-memory (thus temporary) caching is useful for testing purposes if self.conf.get("in-memory", False): path = ":memory:" else: path = self.conf["path"] - self.conn = sqlite3.connect(path) + self.conn = await aiosqlite.connect(path) return self - def __exit__(self, type=None, val=None, tb=None) -> None: - self.conn.close() + async def __aexit__(self, type=None, val=None, tb=None) -> None: + await self.conn.close() class MetadataCache(AbstractCache): """ The metadata cache map each SWHID to the complete metadata of the referenced object. This is analogous to what is available in `meta/.json` file (and generally used as data source for returning the content of those files). """ - def __enter__(self): - super().__enter__() - with self.conn as conn: - conn.execute("create table if not exists metadata_cache (swhid, metadata)") + async def __aenter__(self): + await super().__aenter__() + await self.conn.execute( + "create table if not exists metadata_cache (swhid, metadata)" + ) return self - def __getitem__(self, swhid: SWHID) -> Any: - with self.conn as conn, closing(conn.cursor()) as cursor: - cursor.execute( - "select metadata from metadata_cache where swhid=?", (str(swhid),) - ) - cache = cursor.fetchone() - if cache: - metadata = json.loads(cache[0]) - return typify(metadata, swhid.object_type) - else: - return None - - def __setitem__(self, swhid: SWHID, metadata: Any) -> None: - with self.conn as conn: - conn.execute( - "insert into metadata_cache values (?, ?)", - ( - str(swhid), - json.dumps( - metadata, - # Converts the typified JSON to plain str version - default=lambda x: ( - x.object_id if isinstance(x, SWHID) else x.__dict__ - ), + async def get(self, swhid: SWHID) -> Any: + cursor = await self.conn.execute( + "select metadata from metadata_cache where swhid=?", (str(swhid),) + ) + cache = await cursor.fetchone() + if cache: + metadata = json.loads(cache[0]) + return typify(metadata, swhid.object_type) + else: + return None + + async def set(self, swhid: SWHID, metadata: Any) -> None: + await self.conn.execute( + "insert into metadata_cache values (?, ?)", + ( + str(swhid), + json.dumps( + metadata, + # Converts the typified JSON to plain str version + default=lambda x: ( + x.object_id if isinstance(x, SWHID) else x.__dict__ ), ), - ) + ), + ) class BlobCache(AbstractCache): """ The blob cache map SWHIDs of type `cnt` to the bytes of their archived content. The blob cache entry for a given content object is populated, at the latest, the first time the object is `read()`-d. It might be populated earlier on due to prefetching, e.g., when a directory pointing to the given content is listed for the first time. """ - def __enter__(self): - super().__enter__() - with self.conn as conn: - conn.execute("create table if not exists blob_cache (swhid, blob)") + async def __aenter__(self): + await super().__aenter__() + await self.conn.execute("create table if not exists blob_cache (swhid, blob)") return self - def __getitem__(self, swhid: SWHID) -> Optional[str]: - with self.conn as conn, closing(conn.cursor()) as cursor: - cursor.execute("select blob from blob_cache where swhid=?", (str(swhid),)) - cache = cursor.fetchone() - if cache: - blob = cache[0] - return blob - else: - return None - - def __setitem__(self, swhid: SWHID, blob: str) -> None: - with self.conn as conn: - conn.execute("insert into blob_cache values (?, ?)", (str(swhid), blob)) + async def get(self, swhid: SWHID) -> Optional[str]: + cursor = await self.conn.execute( + "select blob from blob_cache where swhid=?", (str(swhid),) + ) + cache = await cursor.fetchone() + if cache: + blob = cache[0] + return blob + else: + return None + + async def set(self, swhid: SWHID, blob: str) -> None: + await self.conn.execute( + "insert into blob_cache values (?, ?)", (str(swhid), blob) + ) diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py index d3a1aa0..60d9124 100644 --- a/swh/fuse/cli.py +++ b/swh/fuse/cli.py @@ -1,103 +1,104 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import asyncio from contextlib import ExitStack import os # WARNING: do not import unnecessary things here to keep cli startup time under # control from pathlib import Path from typing import Any, Dict, Tuple import click from daemon import DaemonContext # from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.cli import SWHIDParamType # All generic config code should reside in swh.core.config DEFAULT_CONFIG_PATH = os.environ.get( "SWH_CONFIG_FILE", os.path.join(click.get_app_dir("swh"), "global.yml") ) CACHE_HOME_DIR: Path = ( Path(os.environ["XDG_CACHE_HOME"]) if "XDG_CACHE_HOME" in os.environ else Path.home() / ".cache" ) DEFAULT_CONFIG: Dict[str, Tuple[str, Any]] = { "cache": ( "dict", { "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"}, "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"}, }, ), "web-api": ( "dict", {"url": "https://archive.softwareheritage.org/api/1", "auth-token": None,}, ), } @click.group(name="fuse", context_settings=CONTEXT_SETTINGS) # XXX conffile logic temporarily commented out due to: # XXX https://forge.softwareheritage.org/T2632 # @click.option( # "-C", # "--config-file", # default=DEFAULT_CONFIG_PATH, # type=click.Path(exists=True, dir_okay=False, path_type=str), # help="YAML configuration file", # ) @click.pass_context def cli(ctx): """Software Heritage virtual file system""" # # recursive merge not done by config.read # conf = config.read_raw_config(config.config_basepath(config_file)) # conf = config.merge_configs(DEFAULT_CONFIG, conf) conf = {} ctx.ensure_object(dict) ctx.obj["config"] = conf @cli.command() @click.argument( "path", required=True, metavar="PATH", type=click.Path(exists=True, dir_okay=True, file_okay=False), ) @click.argument("swhids", nargs=-1, metavar="[SWHID]...", type=SWHIDParamType()) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="YAML configuration file", ) @click.option( "-f", "--foreground", is_flag=True, show_default=True, help="Run FUSE system in foreground instead of daemon", ) @click.pass_context def mount(ctx, swhids, path, config_file, foreground): """ Mount the Software Heritage archive at the given mount point """ from swh.core import config from swh.fuse import fuse conf = config.read(config_file, DEFAULT_CONFIG) with ExitStack() as stack: if not foreground: stack.enter_context(DaemonContext()) - fuse.main(swhids, path, conf) + asyncio.run(fuse.main(swhids, path, conf)) diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py index 590a224..b65ab71 100644 --- a/swh/fuse/fs/artifact.py +++ b/swh/fuse/fs/artifact.py @@ -1,78 +1,75 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Any, Iterator +from typing import Any, AsyncIterator from swh.fuse.fs.entry import ArtifactEntry, EntryMode from swh.model.identifiers import CONTENT, DIRECTORY, SWHID # Avoid cycling import Fuse = "Fuse" def typify(name: str, mode: int, fuse: Fuse, swhid: SWHID, prefetch: Any = None) -> Any: """ Create an artifact entry corresponding to the given artifact type """ constructor = {CONTENT: Content, DIRECTORY: Directory} return constructor[swhid.object_type](name, mode, fuse, swhid, prefetch) class Content(ArtifactEntry): """ Software Heritage content artifact. Content leaves (AKA blobs) are represented on disks as regular files, containing the corresponding bytes, as archived. Note that permissions are associated to blobs only in the context of directories. Hence, when accessing blobs from the top-level `archive/` directory, the permissions of the `archive/SWHID` file will be arbitrary and not meaningful (e.g., `0x644`). """ - def __str__(self) -> str: - return self.fuse.get_blob(self.swhid) + async def content(self) -> str: + return await self.fuse.get_blob(self.swhid) - def __len__(self) -> int: + async def length(self) -> int: # When listing entries from a directory, the API already gave us information if self.prefetch: return self.prefetch["length"] - return len(str(self)) + return len(await self.content()) - def __iter__(self): + async def __aiter__(self): raise ValueError("Cannot iterate over a content type artifact") class Directory(ArtifactEntry): """ Software Heritage directory artifact. Directory nodes are represented as directories on the file-system, containing one entry for each entry of the archived directory. Entry names and other metadata, including permissions, will correspond to the archived entry metadata. Note that the FUSE mount is read-only, no matter what the permissions say. So it is possible that, in the context of a directory, a file is presented as writable, whereas actually writing to it will fail with `EPERM`. """ - def __iter__(self) -> Iterator[ArtifactEntry]: - entries = [] - for entry in self.fuse.get_metadata(self.swhid): - entries.append( - typify( - name=entry["name"], - # Use default read-only permissions for directories, and - # archived permissions for contents - mode=( - entry["perms"] - if entry["target"].object_type == CONTENT - else int(EntryMode.RDONLY_DIR) - ), - fuse=self.fuse, - swhid=entry["target"], - # The directory API has extra info we can use to set attributes - # without additional Software Heritage API call - prefetch=entry, - ) + async def __aiter__(self) -> AsyncIterator[ArtifactEntry]: + metadata = await self.fuse.get_metadata(self.swhid) + for entry in metadata: + yield typify( + name=entry["name"], + # Use default read-only permissions for directories, and + # archived permissions for contents + mode=( + entry["perms"] + if entry["target"].object_type == CONTENT + else int(EntryMode.RDONLY_DIR) + ), + fuse=self.fuse, + swhid=entry["target"], + # The directory API has extra info we can use to set attributes + # without additional Software Heritage API call + prefetch=entry, ) - return iter(entries) diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py index 29777bc..e16a7d5 100644 --- a/swh/fuse/fs/entry.py +++ b/swh/fuse/fs/entry.py @@ -1,64 +1,67 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from enum import IntEnum from stat import S_IFDIR, S_IFREG from typing import Any from swh.model.identifiers import SWHID # Avoid cycling import Fuse = "Fuse" class EntryMode(IntEnum): """ Default entry mode and permissions for the FUSE. The FUSE mount is always read-only, even if permissions contradict this statement (in a context of a directory, entries are listed with permissions taken from the archive). """ RDONLY_FILE = S_IFREG | 0o444 RDONLY_DIR = S_IFDIR | 0o555 class FuseEntry: """ Main wrapper class to manipulate virtual FUSE entries Attributes: name: entry filename mode: entry permission mode fuse: internal reference to the main FUSE class inode: unique integer identifying the entry """ def __init__(self, name: str, mode: int, fuse: Fuse): self.name = name self.mode = mode self.fuse = fuse self.inode = fuse._alloc_inode(self) - def __len__(self) -> int: + async def length(self) -> int: return 0 - def __iter__(self): + async def content(self): + return None + + async def __aiter__(self): return None class ArtifactEntry(FuseEntry): """ FUSE virtual entry for a Software Heritage Artifact Attributes: swhid: Software Heritage persistent identifier prefetch: optional prefetched metadata used to set entry attributes """ def __init__( self, name: str, mode: int, fuse: Fuse, swhid: SWHID, prefetch: Any = None ): super().__init__(name, mode, fuse) self.swhid = swhid self.prefetch = prefetch diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py index 276f452..ab13bf9 100644 --- a/swh/fuse/fs/mountpoint.py +++ b/swh/fuse/fs/mountpoint.py @@ -1,79 +1,75 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Iterator +from typing import AsyncIterator from swh.fuse.fs.artifact import typify from swh.fuse.fs.entry import EntryMode, FuseEntry from swh.model.identifiers import CONTENT, SWHID # Avoid cycling import Fuse = "Fuse" class Root(FuseEntry): """ The FUSE mountpoint, consisting of the archive/ and meta/ directories """ def __init__(self, fuse: Fuse): super().__init__(name="root", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) - def __iter__(self) -> Iterator[FuseEntry]: - entries = [ArchiveDir(self.fuse), MetaDir(self.fuse)] - return iter(entries) + async def __aiter__(self) -> AsyncIterator[FuseEntry]: + for entry in [ArchiveDir(self.fuse), MetaDir(self.fuse)]: + yield entry class ArchiveDir(FuseEntry): """ The archive/ directory is lazily populated with one entry per accessed SWHID, having actual SWHIDs as names """ def __init__(self, fuse: Fuse): super().__init__(name="archive", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) - def __iter__(self) -> Iterator[FuseEntry]: - entries = [] - for swhid in self.fuse.cache.get_cached_swhids(): + async def __aiter__(self) -> AsyncIterator[FuseEntry]: + async for swhid in self.fuse.cache.get_cached_swhids(): if swhid.object_type == CONTENT: mode = EntryMode.RDONLY_FILE else: mode = EntryMode.RDONLY_DIR - entries.append(typify(str(swhid), int(mode), self.fuse, swhid)) - return iter(entries) + yield typify(str(swhid), int(mode), self.fuse, swhid) class MetaDir(FuseEntry): """ The meta/ directory contains one SWHID.json file for each SWHID entry under archive/. The JSON file contain all available meta information about the given SWHID, as returned by the Software Heritage Web API for that object. Note that, in case of pagination (e.g., snapshot objects with many branches) the JSON file will contain a complete version with all pages merged together. """ def __init__(self, fuse: Fuse): super().__init__(name="meta", mode=int(EntryMode.RDONLY_DIR), fuse=fuse) - def __iter__(self) -> Iterator[FuseEntry]: - entries = [] - for swhid in self.fuse.cache.get_cached_swhids(): - entries.append(MetaEntry(swhid, self.fuse)) - return iter(entries) + async def __aiter__(self) -> AsyncIterator[FuseEntry]: + async for swhid in self.fuse.cache.get_cached_swhids(): + yield MetaEntry(swhid, self.fuse) class MetaEntry(FuseEntry): """ An entry from the meta/ directory, containing for each accessed SWHID a corresponding SWHID.json file with all the metadata from the Software Heritage archive. """ def __init__(self, swhid: SWHID, fuse: Fuse): super().__init__( name=str(swhid) + ".json", mode=int(EntryMode.RDONLY_FILE), fuse=fuse ) self.swhid = swhid - def __str__(self) -> str: - metadata = self.fuse.get_metadata(self.swhid) + async def content(self) -> str: + metadata = await self.fuse.get_metadata(self.swhid) return str(metadata) - def __len__(self) -> int: - return len(str(self)) + async def length(self) -> int: + return len(await self.content()) diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py index e2f63e7..8c9b6cd 100644 --- a/swh/fuse/fuse.py +++ b/swh/fuse/fuse.py @@ -1,240 +1,246 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import asyncio import errno -import itertools import logging import os from pathlib import Path import time from typing import Any, Dict, List import pyfuse3 import pyfuse3_asyncio import requests from swh.fuse.cache import FuseCache from swh.fuse.fs.entry import FuseEntry from swh.fuse.fs.mountpoint import Root from swh.model.identifiers import CONTENT, SWHID from swh.web.client.client import WebAPIClient class Fuse(pyfuse3.Operations): """ Software Heritage Filesystem in Userspace (FUSE). Locally mount parts of the archive and navigate it as a virtual file system. """ def __init__( self, root_path: Path, cache: FuseCache, conf: Dict[str, Any], ): super(Fuse, self).__init__() self._next_inode: int = pyfuse3.ROOT_INODE self._next_fd: int = 0 self._inode2entry: Dict[int, FuseEntry] = {} self.root = Root(fuse=self) self._inode2fd: Dict[int, int] = {} self._fd2inode: Dict[int, int] = {} self._inode2path: Dict[int, Path] = {self.root.inode: root_path} self.time_ns: int = time.time_ns() # start time, used as timestamp self.gid = os.getgid() self.uid = os.getuid() self.web_api = WebAPIClient( conf["web-api"]["url"], conf["web-api"]["auth-token"] ) self.cache = cache def shutdown(self) -> None: pass def _alloc_inode(self, entry: FuseEntry) -> int: """ Return a unique inode integer for a given entry """ inode = self._next_inode self._next_inode += 1 self._inode2entry[inode] = entry # TODO add inode recycling with invocation to invalidate_inode when # the dicts get too big return inode def _alloc_fd(self, inode: int) -> int: """ Return a unique file descriptor integer for a given inode """ try: return self._inode2fd[inode] except KeyError: fd = self._next_fd self._next_fd += 1 self._inode2fd[inode] = fd self._fd2inode[fd] = inode return fd def inode2entry(self, inode: int) -> FuseEntry: """ Return the entry matching a given inode """ try: return self._inode2entry[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) def inode2path(self, inode: int) -> Path: """ Return the path matching a given inode """ try: return self._inode2path[inode] except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) - def get_metadata(self, swhid: SWHID) -> Any: + async def get_metadata(self, swhid: SWHID) -> Any: """ Retrieve metadata for a given SWHID using Software Heritage API """ - # TODO: swh-graph API - cache = self.cache.metadata[swhid] + cache = await self.cache.metadata.get(swhid) if cache: return cache try: - metadata = self.web_api.get(swhid) - self.cache.metadata[swhid] = metadata + # TODO: swh-graph API + # TODO: async web API + loop = asyncio.get_event_loop() + metadata = await loop.run_in_executor(None, self.web_api.get, swhid) + await self.cache.metadata.set(swhid, metadata) return metadata except requests.HTTPError: logging.error(f"Unknown SWHID: '{swhid}'") - def get_blob(self, swhid: SWHID) -> str: + # TODO: should return bytes + async def get_blob(self, swhid: SWHID) -> str: """ Retrieve the blob bytes for a given content SWHID using Software Heritage API """ if swhid.object_type != CONTENT: raise pyfuse3.FUSEError(errno.EINVAL) - cache = self.cache.blob[swhid] + cache = await self.cache.blob.get(swhid) if cache: return cache - resp = list(self.web_api.content_raw(swhid)) - blob = "".join(map(bytes.decode, resp)) - self.cache.blob[swhid] = blob + loop = asyncio.get_event_loop() + resp = await loop.run_in_executor(None, self.web_api.content_raw, swhid) + blob = "".join(map(bytes.decode, list(resp))) + await self.cache.blob.set(swhid, blob) return blob - def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: + async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes: """ Return entry attributes """ attrs = pyfuse3.EntryAttributes() attrs.st_size = 0 attrs.st_atime_ns = self.time_ns attrs.st_ctime_ns = self.time_ns attrs.st_mtime_ns = self.time_ns attrs.st_gid = self.gid attrs.st_uid = self.uid attrs.st_ino = entry.inode attrs.st_mode = entry.mode - attrs.st_size = len(entry) + attrs.st_size = await entry.length() return attrs async def getattr( self, inode: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Get attributes for a given inode """ entry = self.inode2entry(inode) - return self.get_attrs(entry) + return await self.get_attrs(entry) async def opendir(self, inode: int, _ctx: pyfuse3.RequestContext) -> int: """ Open a directory referred by a given inode """ # Re-use inode as directory handle return inode async def readdir( self, inode: int, offset: int, token: pyfuse3.ReaddirToken ) -> None: """ Read entries in an open directory """ direntry = self.inode2entry(inode) path = self.inode2path(inode) # TODO: add cache on direntry list? next_id = offset + 1 - for entry in itertools.islice(direntry, offset, None): + i = 0 + async for entry in direntry: + if i < offset: + i += 1 + continue + name = os.fsencode(entry.name) - attrs = self.get_attrs(entry) + attrs = await self.get_attrs(entry) if not pyfuse3.readdir_reply(token, name, attrs, next_id): break next_id += 1 self._inode2entry[attrs.st_ino] = entry self._inode2path[attrs.st_ino] = Path(path, entry.name) async def open( self, inode: int, _flags: int, _ctx: pyfuse3.RequestContext ) -> pyfuse3.FileInfo: """ Open an inode and return a unique file descriptor """ fd = self._alloc_fd(inode) return pyfuse3.FileInfo(fh=fd, keep_cache=True) async def read(self, fd: int, offset: int, length: int) -> bytes: """ Read `length` bytes from file descriptor `fd` at position `offset` """ try: inode = self._fd2inode[fd] entry = self.inode2entry(inode) except KeyError: raise pyfuse3.FUSEError(errno.ENOENT) - data = str(entry) + data = await entry.content() return data.encode()[offset : offset + length] async def lookup( self, parent_inode: int, name: str, _ctx: pyfuse3.RequestContext ) -> pyfuse3.EntryAttributes: """ Look up a directory entry by name and get its attributes """ name = os.fsdecode(name) path = Path(self.inode2path(parent_inode), name) parent_entry = self.inode2entry(parent_inode) - for entry in parent_entry: + async for entry in parent_entry: if name == entry.name: - attr = self.get_attrs(entry) + attr = await self.get_attrs(entry) self._inode2path[attr.st_ino] = path return attr logging.error(f"Unknown name during lookup: '{name}'") raise pyfuse3.FUSEError(errno.ENOENT) -def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: +async def main(swhids: List[SWHID], root_path: Path, conf: Dict[str, Any]) -> None: """ swh-fuse CLI entry-point """ # Use pyfuse3 asyncio layer to match the rest of Software Heritage codebase pyfuse3_asyncio.enable() - with FuseCache(conf["cache"]) as cache: + async with FuseCache(conf["cache"]) as cache: fs = Fuse(root_path, cache, conf) # Initially populate the cache for swhid in swhids: - fs.get_metadata(swhid) + await fs.get_metadata(swhid) fuse_options = set(pyfuse3.default_options) fuse_options.add("fsname=swhfs") fuse_options.add("debug") pyfuse3.init(fs, root_path, fuse_options) - loop = asyncio.get_event_loop() try: - loop.run_until_complete(pyfuse3.main()) - fs.shutdown() + await pyfuse3.main() finally: + fs.shutdown() pyfuse3.close(unmount=True) - loop.close()