diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
+[mypy-psutil.*]
+ignore_missing_imports = True
+
 [mypy-pytest.*]
 ignore_missing_imports = True
 
@@ -22,3 +25,6 @@
 
 [mypy-pyfuse3_asyncio.*]
 ignore_missing_imports = True
+
+[mypy-pympler.*]
+ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
 aiosqlite
+psutil
 pyfuse3
+pympler
 python-daemon
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -4,13 +4,21 @@
 # See top-level LICENSE file for more information
 
 from abc import ABC
+from collections import OrderedDict
+from dataclasses import dataclass, field
 import json
 import logging
 from pathlib import Path
+import re
+import sys
 from typing import Any, AsyncGenerator, Dict, List, Optional
 
 import aiosqlite
+from psutil import virtual_memory
+from pympler import asizeof
 
+from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
+from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import SWHID, parse_swhid
 from swh.web.client.client import typify_json
@@ -41,6 +49,7 @@
         self.metadata = MetadataCache(self.cache_conf["metadata"])
         self.blob = BlobCache(self.cache_conf["blob"])
         self.history = HistoryCache(self.cache_conf["history"])
+        self.direntry = DirEntryCache(self.cache_conf["direntry"])
         await self.metadata.__aenter__()
         await self.blob.__aenter__()
         await self.history.__aenter__()
@@ -211,3 +220,101 @@
                 "insert or ignore into history_graph values (?, ?)", edges
             )
             await self.conn.commit()
+
+
+class DirEntryCache:
+    """ The direntry cache maps inodes representing directories to the entries
+    they contain. Each entry comes with its name as well as file attributes
+    (i.e., all that is needed to perform a detailed directory listing).
+
+    Additional attributes of each directory entry should be looked up on an
+    entry by entry basis, possibly hitting other caches.
+
+    The direntry cache for a given dir is populated, at the latest, when the
+    content of the directory is listed. More aggressive prefetching might
+    happen. For instance, when first opening a dir a recursive listing of it can
+    be retrieved from the remote backend and used to recursively populate the
+    direntry cache for all (transitive) sub-directories.
+
+    The cache is bounded in memory: the `maxram` configuration value (either a
+    percentage of the available RAM, e.g. "10%", or an absolute size, e.g.
+    "512MB") sets the budget, and least-recently used directories are evicted
+    first once it is exceeded. """
+
+    @dataclass
+    class LRU(OrderedDict):
+        """ Ordered dict evicting its least-recently used entries once the
+        total size of the stored values (measured deeply with pympler's
+        `asizeof`) exceeds `max_ram` bytes. """
+
+        max_ram: int
+        used_ram: int = field(init=False, default=0)
+        object_size: Dict[Any, int] = field(init=False, default_factory=dict)
+
+        def __getitem__(self, key: Any) -> Any:
+            value = super().__getitem__(key)
+            self.move_to_end(key)
+            return value
+
+        def get(self, key: Any, default: Any = None) -> Any:
+            # dict.get() does not go through __getitem__: without this override
+            # a cache hit would not refresh the recency of the entry
+            try:
+                return self[key]
+            except KeyError:
+                return default
+
+        def __setitem__(self, key: Any, value: Any) -> None:
+            if key in self:
+                # Overwrite: stop accounting for the value being replaced
+                self.used_ram -= self.object_size[key]
+                self.move_to_end(key)
+
+            nb_bytes = asizeof.asizeof(value)
+            self.used_ram += nb_bytes
+            self.object_size[key] = nb_bytes
+            super().__setitem__(key, value)
+
+            # Evict from the least-recently used end until back under budget
+            while self.used_ram > self.max_ram and self:
+                oldest = next(iter(self))
+                self.used_ram -= self.object_size.pop(oldest)
+                del self[oldest]
+
+    def __init__(self, conf: Dict[str, Any]):
+        # Accepted: "<number>%" (of the RAM available when the cache is built)
+        # or "<number><unit>" with unit among B/KB/MB/GB, case-insensitive and
+        # with optional whitespace (e.g., "10%", "512MB", "1.5 gb")
+        m = re.fullmatch(
+            r"\s*(\d+(?:\.\d+)?)\s*(%|[KMG]?B)\s*", str(conf["maxram"]), re.IGNORECASE
+        )
+        if not m:
+            logging.error(f"Cannot parse direntry maxram config: {conf['maxram']}")
+            sys.exit(1)
+
+        num = float(m.group(1))
+        unit = m.group(2).upper()
+
+        if unit == "%":
+            max_ram = int(num * virtual_memory().available / 100)
+        else:
+            units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9}
+            max_ram = int(num * units[unit])
+
+        self.lru_cache = self.LRU(max_ram)
+
+    async def get(self, direntry: FuseDirEntry) -> Optional[List[FuseEntry]]:
+        """ Return the cached entries of a directory, or None if not cached """
+
+        return self.lru_cache.get(direntry.inode, None)
+
+    def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None:
+        """ Cache the entries of a directory """
+
+        if isinstance(direntry, (ArchiveDir, MetaDir)):
+            # The `archive/` and `meta/` are populated on the fly so we should
+            # never cache them
+            return
+
+        # TODO: store entries as dict referenced by name (except for history/)?
+        self.lru_cache[direntry.inode] = entries
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -31,6 +31,7 @@
         "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"},
         "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"},
         "history": {"path": CACHE_HOME_DIR / "swh/fuse/history.sqlite"},
+        "direntry": {"maxram": "10%"},
     },
     "web-api": {
         "url": "https://archive.softwareheritage.org/api/1",
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
--- a/swh/fuse/fs/entry.py
+++ b/swh/fuse/fs/entry.py
@@ -83,7 +83,7 @@
     async def lookup(self, name: str) -> FuseEntry:
         """ Look up a FUSE entry by name """
 
-        async for entry in self:
+        async for entry in self.fuse.get_direntries(self):
             if entry.name == name:
                 return entry
         return None
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -9,7 +9,7 @@
 import os
 from pathlib import Path
 import time
-from typing import Any, Dict, List
+from typing import Any, AsyncIterator, Dict, List
 
 import pyfuse3
 import pyfuse3_asyncio
@@ -135,6 +135,24 @@
             # an empty list.
             return []
 
+    async def get_direntries(
+        self, direntry: FuseDirEntry, offset: int = 0
+    ) -> AsyncIterator[FuseEntry]:
+        """ Retrieve the entries of a directory, starting at index `offset`,
+        going through the direntry cache """
+
+        entries = await self.cache.direntry.get(direntry)
+        if entries is None:
+            # Cache miss: fetch the full listing once and store it. An empty
+            # directory is a valid (cacheable) listing, hence the `is None`.
+            entries = [x async for x in direntry]
+            self.cache.direntry.set(direntry, entries)
+
+        # Only yield from `offset` on, so that paginated readdir() calls do not
+        # go through the entries already returned
+        for entry in entries[offset:]:
+            yield entry
+
     async def get_attrs(self, entry: FuseEntry) -> pyfuse3.EntryAttributes:
         """ Return entry attributes """
 
@@ -169,18 +187,12 @@
 
         # opendir() uses inode as directory handle
         inode = fh
-
-        # TODO: add cache on direntry list?
         direntry = self.inode2entry(inode)
         assert isinstance(direntry, FuseDirEntry)
+
         next_id = offset + 1
-        i = 0
         try:
-            async for entry in direntry:
-                if i < offset:
-                    i += 1
-                    continue
-
+            async for entry in self.get_direntries(direntry, offset):
                 name = os.fsencode(entry.name)
                 attrs = await self.get_attrs(entry)
                 if not pyfuse3.readdir_reply(token, name, attrs, next_id):