diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
+[mypy-psutil.*]
+ignore_missing_imports = True
+
 [mypy-pytest.*]
 ignore_missing_imports = True
 
@@ -22,3 +25,6 @@
 
 [mypy-pyfuse3_asyncio.*]
 ignore_missing_imports = True
+
+[mypy-pympler.*]
+ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
 aiosqlite
+psutil
 pyfuse3
+pympler
 python-daemon
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -4,13 +4,21 @@
 # See top-level LICENSE file for more information
 
 from abc import ABC
+from collections import OrderedDict
+from dataclasses import dataclass, field
 import json
 import logging
 from pathlib import Path
+import re
+import sys
 from typing import Any, AsyncGenerator, Dict, List, Optional
 
 import aiosqlite
+from psutil import virtual_memory
+from pympler import asizeof
 
+from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
+from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import SWHID, parse_swhid
 from swh.web.client.client import typify_json
@@ -41,6 +49,7 @@
         self.metadata = MetadataCache(self.cache_conf["metadata"])
         self.blob = BlobCache(self.cache_conf["blob"])
         self.history = HistoryCache(self.cache_conf["history"])
+        self.direntry = DirEntryCache(self.cache_conf["direntry"])
         await self.metadata.__aenter__()
         await self.blob.__aenter__()
         await self.history.__aenter__()
@@ -211,3 +220,99 @@
             "insert or ignore into history_graph values (?, ?)", edges
         )
         await self.conn.commit()
+
+
+class DirEntryCache:
+    """ The direntry cache maps inodes representing directories to the entries
+    they contain. Each entry comes with its name as well as file attributes
+    (i.e., all that is needed to perform a detailed directory listing).
+
+    Additional attributes of each directory entry should be looked up on an
+    entry-by-entry basis, possibly hitting other caches.
+
+    The direntry cache for a given dir is populated, at the latest, when the
+    content of the directory is listed. More aggressive prefetching might
+    happen. For instance, when first opening a dir a recursive listing of it can
+    be retrieved from the remote backend and used to recursively populate the
+    direntry cache for all (transitive) sub-directories.
+
+    Entries are kept in an in-memory LRU whose budget is set by the ``maxram``
+    configuration: either an absolute size (``B``, ``KB``, ``MB``, ``GB``) or
+    a percentage (``%``) of the RAM available when the cache is created. """
+
+    @dataclass
+    class LRU(OrderedDict):
+        """ Least-recently-used mapping whose capacity is a RAM budget (in
+        bytes) rather than a number of items. """
+
+        max_ram: int
+        used_ram: int = field(init=False, default=0)
+        object_size: Dict[Any, int] = field(init=False, default_factory=dict)
+
+        def __getitem__(self, key: Any) -> Any:
+            value = super().__getitem__(key)
+            self.move_to_end(key)
+            return value
+
+        def get(self, key: Any, default: Any = None) -> Any:
+            # dict.get() does not go through the overridden __getitem__, so
+            # without this override cache hits would never refresh recency
+            try:
+                return self[key]
+            except KeyError:
+                return default
+
+        def __setitem__(self, key: Any, value: Any) -> None:
+            if key in self:
+                self.move_to_end(key)
+                # The new value may not have the same size as the previous one
+                self.used_ram -= self.object_size[key]
+
+            # NOTE(review): asizeof sizes objects recursively; if entries
+            # reference shared objects (e.g. their Fuse instance), sizes may be
+            # largely overestimated -- TODO confirm
+            nb_bytes = asizeof.asizeof(value)
+            self.used_ram += nb_bytes
+            self.object_size[key] = nb_bytes
+
+            super().__setitem__(key, value)
+
+            # Evict least recently used entries until the budget is respected
+            while self.used_ram > self.max_ram and self:
+                oldest = next(iter(self))
+                self.used_ram -= self.object_size.pop(oldest)
+                del self[oldest]
+
+    def __init__(self, conf: Dict[str, Any]):
+        m = re.fullmatch(
+            r"\s*(\d+(?:\.\d+)?)\s*(%|B|KB|MB|GB)\s*", conf["maxram"], re.IGNORECASE
+        )
+        if not m:
+            logging.error(f"Cannot parse direntry maxram config: {conf['maxram']}")
+            sys.exit(1)
+
+        num = float(m.group(1))
+        unit = m.group(2).upper()
+
+        if unit == "%":
+            max_ram = int(num * virtual_memory().available / 100)
+        else:
+            units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9}
+            max_ram = int(num * units[unit])
+
+        self.lru_cache = self.LRU(max_ram)
+
+    def get(self, direntry: FuseDirEntry) -> Optional[List[FuseEntry]]:
+        """ Return the cached entries of `direntry`, or None if not cached """
+
+        return self.lru_cache.get(direntry.inode, None)
+
+    def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None:
+        """ Cache `entries` as the child entries of `direntry` """
+
+        if isinstance(direntry, (ArchiveDir, MetaDir)):
+            # The `archive/` and `meta/` are populated on the fly so we should
+            # never cache them
+            return
+
+        self.lru_cache[direntry.inode] = entries
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -31,6 +31,7 @@
         "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"},
         "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"},
         "history": {"path": CACHE_HOME_DIR / "swh/fuse/history.sqlite"},
+        "direntry": {"maxram": "10%"},
     },
     "web-api": {
         "url": "https://archive.softwareheritage.org/api/1",
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -69,7 +69,7 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         metadata = await self.fuse.get_metadata(self.swhid)
         for entry in metadata:
             name = entry["name"]
@@ -147,7 +147,7 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         metadata = await self.fuse.get_metadata(self.swhid)
         directory = metadata["directory"]
         parents = metadata["parents"]
@@ -190,7 +190,7 @@
 
     parents: List[SWHID]
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         root_path = self.get_relative_root_path()
         for i, parent in enumerate(self.parents):
             yield self.create_child(
@@ -206,7 +206,7 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         history = await self.fuse.get_history(self.swhid)
         root_path = self.get_relative_root_path()
         for swhid in history:
@@ -249,7 +249,7 @@
         else:
             return None
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         metadata = await self.fuse.get_metadata(self.swhid)
         root_path = self.get_relative_root_path()
 
@@ -306,7 +306,7 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         metadata = await self.fuse.get_metadata(self.swhid)
         root_path = self.get_relative_root_path()
 
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
--- a/swh/fuse/fs/entry.py
+++ b/swh/fuse/fs/entry.py
@@ -9,7 +9,7 @@
 from enum import IntEnum
 from pathlib import Path
 from stat import S_IFDIR, S_IFLNK, S_IFREG
-from typing import Any, Union
+from typing import Any, AsyncIterator, Union
 
 # Avoid cycling import
 Fuse = "Fuse"
@@ -75,15 +75,31 @@
     async def size(self) -> int:
         return 0
 
-    async def __aiter__(self):
+    def compute_entries(self) -> AsyncIterator[FuseEntry]:
         """ Return the child entries of a directory entry """
 
         raise NotImplementedError
 
+    async def get_entries(self, offset: int = 0) -> AsyncIterator[FuseEntry]:
+        """ Return the child entries of a directory entry using direntry cache,
+        starting at position `offset` """
+
+        entries = self.fuse.cache.direntry.get(self)
+        # Compare against None: an empty directory is a valid cached value and
+        # must not trigger a new computation on every listing
+        if entries is None:
+            entries = [x async for x in self.compute_entries()]
+            self.fuse.cache.direntry.set(self, entries)
+
+        # Avoid copy by manual iteration (instead of slicing) and use of a
+        # generator (instead of returning the full list every time)
+        for i in range(offset, len(entries)):
+            yield entries[i]
+
     async def lookup(self, name: str) -> FuseEntry:
         """ Look up a FUSE entry by name """
 
-        async for entry in self:
+        async for entry in self.get_entries():
             if entry.name == name:
                 return entry
         return None
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
--- a/swh/fuse/fs/mountpoint.py
+++ b/swh/fuse/fs/mountpoint.py
@@ -21,7 +21,7 @@
     mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
     depth: int = field(init=False, default=1)
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         yield self.create_child(ArchiveDir)
         yield self.create_child(MetaDir)
 
@@ -46,7 +46,7 @@
             swhid=swhid,
         )
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         async for swhid in self.fuse.cache.get_cached_swhids():
             yield self.create_child(swhid)
 
@@ -76,7 +76,7 @@
     name: str = field(init=False, default="meta")
     mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         async for swhid in self.fuse.cache.get_cached_swhids():
             yield self.create_child(
                 MetaEntry,
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -169,18 +169,12 @@
 
         # opendir() uses inode as directory handle
         inode = fh
-
-        # TODO: add cache on direntry list?
         direntry = self.inode2entry(inode)
         assert isinstance(direntry, FuseDirEntry)
+
         next_id = offset + 1
-        i = 0
         try:
-            async for entry in direntry:
-                if i < offset:
-                    i += 1
-                    continue
-
+            async for entry in direntry.get_entries(offset):
                 name = os.fsencode(entry.name)
                 attrs = await self.get_attrs(entry)
                 if not pyfuse3.readdir_reply(token, name, attrs, next_id):