diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -14,6 +14,9 @@
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
+[mypy-psutil.*]
+ignore_missing_imports = True
+
 [mypy-pytest.*]
 ignore_missing_imports = True
 
@@ -22,3 +25,6 @@
 
 [mypy-pyfuse3_asyncio.*]
 ignore_missing_imports = True
+
+[mypy-pympler.*]
+ignore_missing_imports = True
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,5 @@
 aiosqlite
+psutil
 pyfuse3
+pympler
 python-daemon
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -4,13 +4,21 @@
 # See top-level LICENSE file for more information
 
 from abc import ABC
+from collections import OrderedDict
+from dataclasses import dataclass, field
 import json
 import logging
 from pathlib import Path
+import re
+import sys
 from typing import Any, AsyncGenerator, Dict, List, Optional
 
 import aiosqlite
+from psutil import virtual_memory
+from pympler import asizeof
 
+from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
+from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import SWHID, parse_swhid
 from swh.web.client.client import typify_json
@@ -41,6 +49,7 @@
         self.metadata = MetadataCache(self.cache_conf["metadata"])
         self.blob = BlobCache(self.cache_conf["blob"])
         self.history = HistoryCache(self.cache_conf["history"])
+        self.direntry = DirEntryCache(self.cache_conf["direntry"])
         await self.metadata.__aenter__()
         await self.blob.__aenter__()
         await self.history.__aenter__()
@@ -211,3 +220,73 @@
             "insert or ignore into history_graph values (?, ?)", edges
         )
         await self.conn.commit()
+
+
+class DirEntryCache:
+    """ The direntry cache maps the inodes of directories to the entries
+    they contain. Each entry comes with its name as well as file attributes
+    (i.e., all that is needed to perform a detailed directory listing).
+
+    Additional attributes of each directory entry should be looked up on an
+    entry-by-entry basis, possibly hitting other caches.
+
+    The direntry cache for a given dir is populated, at the latest, when the
+    content of the directory is listed. More aggressive prefetching might
+    happen. For instance, when a dir is first opened, a recursive listing of it
+    can be retrieved from the remote backend and used to recursively populate
+    the direntry cache for all (transitive) sub-directories.
+    """
+
+    @dataclass
+    class LRU(OrderedDict):
+        max_ram: int
+        used_ram: int = field(init=False, default=0)
+        object_size: Dict[Any, int] = field(init=False, default_factory=dict)
+
+        def __getitem__(self, key: Any) -> Any:
+            value = super().__getitem__(key)
+            self.move_to_end(key)
+            return value
+
+        def __setitem__(self, key: Any, value: Any) -> None:
+            if key in self:
+                self.move_to_end(key)
+            else:
+                nb_bytes = asizeof.asizeof(value)
+                self.used_ram += nb_bytes
+                self.object_size[key] = nb_bytes
+
+            super().__setitem__(key, value)
+
+            while self.used_ram > self.max_ram and self:
+                oldest = next(iter(self))
+                self.used_ram -= self.object_size[oldest]
+                del self[oldest]
+
+    def __init__(self, conf: Dict[str, Any]):
+        m = re.match(r"(\d+)\s*(.+)\s*", conf["maxram"])
+        if not m:
+            logging.error(f"Cannot parse direntry maxram config: {conf['maxram']}")
+            sys.exit(1)
+
+        num = float(m.group(1))
+        unit = m.group(2).upper()
+
+        if unit == "%":
+            max_ram = int(num * virtual_memory().available / 100)
+        else:
+            units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9}
+            max_ram = int(float(num) * units[unit])
+
+        self.lru_cache = self.LRU(max_ram)
+
+    def get(self, direntry: FuseDirEntry) -> Optional[List[FuseEntry]]:
+        return self.lru_cache.get(direntry.inode, None)
+
+    def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None:
+        if isinstance(direntry, ArchiveDir) or isinstance(direntry, MetaDir):
+            # The `archive/` and `meta/` directories are populated on the fly,
+            # so we should never cache them
+            pass
+        else:
+            # TODO: store entries as dict referenced by name (except for history/)?
+            self.lru_cache[direntry.inode] = entries
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -31,6 +31,7 @@
         "metadata": {"path": CACHE_HOME_DIR / "swh/fuse/metadata.sqlite"},
         "blob": {"path": CACHE_HOME_DIR / "swh/fuse/blob.sqlite"},
         "history": {"path": CACHE_HOME_DIR / "swh/fuse/history.sqlite"},
+        "direntry": {"maxram": "10%"},
     },
     "web-api": {
         "url": "https://archive.softwareheritage.org/api/1",
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -5,7 +5,7 @@
 
 from dataclasses import dataclass
 from pathlib import Path
-from typing import Any, AsyncIterator, List
+from typing import Any, List
 import urllib.parse
 
 from swh.fuse.fs.entry import (
@@ -69,7 +69,8 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         metadata = await self.fuse.get_metadata(self.swhid)
         for entry in metadata:
             name = entry["name"]
@@ -84,40 +85,47 @@
 
             # 1. Regular file
             if swhid.object_type == CONTENT:
-                yield self.create_child(
-                    Content,
-                    name=name,
-                    mode=mode,
-                    swhid=swhid,
-                    # The directory API has extra info we can use to set
-                    # attributes without additional Software Heritage API call
-                    prefetch=entry,
+                entries.append(
+                    self.create_child(
+                        Content,
+                        name=name,
+                        mode=mode,
+                        swhid=swhid,
+                        # The directory API has extra info we can use to set
+                        # attributes without additional Software Heritage API call
+                        prefetch=entry,
+                    )
                 )
             # 2. Regular directory
             elif swhid.object_type == DIRECTORY:
-                yield self.create_child(
-                    Directory, name=name, mode=mode, swhid=swhid,
+                entries.append(
+                    self.create_child(Directory, name=name, mode=mode, swhid=swhid,)
                 )
             # 3. Symlink
             elif mode == DentryPerms.symlink:
-                yield self.create_child(
-                    FuseSymlinkEntry,
-                    name=name,
-                    # Symlink target is stored in the blob content
-                    target=await self.fuse.get_blob(swhid),
+                entries.append(
+                    self.create_child(
+                        FuseSymlinkEntry,
+                        name=name,
+                        # Symlink target is stored in the blob content
+                        target=await self.fuse.get_blob(swhid),
+                    )
                 )
             # 4. Submodule
             elif swhid.object_type == REVISION:
                 # Make sure the revision metadata is fetched and create a
                 # symlink to distinguish it with regular directories
                 await self.fuse.get_metadata(swhid)
-                yield self.create_child(
-                    FuseSymlinkEntry,
-                    name=name,
-                    target=Path(self.get_relative_root_path(), f"archive/{swhid}"),
+                entries.append(
+                    self.create_child(
+                        FuseSymlinkEntry,
+                        name=name,
+                        target=Path(self.get_relative_root_path(), f"archive/{swhid}"),
+                    )
                 )
             else:
                 raise ValueError("Unknown directory entry type: {swhid.object_type}")
+        return entries
 
 
 @dataclass
@@ -147,42 +155,53 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         metadata = await self.fuse.get_metadata(self.swhid)
         directory = metadata["directory"]
         parents = metadata["parents"]
         root_path = self.get_relative_root_path()
-        yield self.create_child(
-            FuseSymlinkEntry,
-            name="root",
-            target=Path(root_path, f"archive/{directory}"),
+        entries.append(
+            self.create_child(
+                FuseSymlinkEntry,
+                name="root",
+                target=Path(root_path, f"archive/{directory}"),
+            )
         )
-        yield self.create_child(
-            FuseSymlinkEntry,
-            name="meta.json",
-            target=Path(root_path, f"meta/{self.swhid}.json"),
+        entries.append(
+            self.create_child(
+                FuseSymlinkEntry,
+                name="meta.json",
+                target=Path(root_path, f"meta/{self.swhid}.json"),
+            )
         )
-        yield self.create_child(
-            RevisionParents,
-            name="parents",
-            mode=int(EntryMode.RDONLY_DIR),
-            parents=[x["id"] for x in parents],
+        entries.append(
+            self.create_child(
+                RevisionParents,
+                name="parents",
+                mode=int(EntryMode.RDONLY_DIR),
+                parents=[x["id"] for x in parents],
+            )
         )
         if len(parents) >= 1:
-            yield self.create_child(
-                FuseSymlinkEntry, name="parent", target="parents/1/",
+            entries.append(
+                self.create_child(FuseSymlinkEntry, name="parent", target="parents/1/",)
             )
-        yield self.create_child(
-            RevisionHistory,
-            name="history",
-            mode=int(EntryMode.RDONLY_DIR),
-            swhid=self.swhid,
+        entries.append(
+            self.create_child(
+                RevisionHistory,
+                name="history",
+                mode=int(EntryMode.RDONLY_DIR),
+                swhid=self.swhid,
+            )
         )
+        return entries
+
 
 @dataclass
@@ -190,14 +209,18 @@
 
     parents: List[SWHID]
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         root_path = self.get_relative_root_path()
         for i, parent in enumerate(self.parents):
-            yield self.create_child(
-                FuseSymlinkEntry,
-                name=str(i + 1),
-                target=Path(root_path, f"archive/{parent}"),
+            entries.append(
+                self.create_child(
+                    FuseSymlinkEntry,
+                    name=str(i + 1),
+                    target=Path(root_path, f"archive/{parent}"),
+                )
             )
+        return entries
 
 
 @dataclass
@@ -206,15 +229,19 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         history = await self.fuse.get_history(self.swhid)
         root_path = self.get_relative_root_path()
         for swhid in history:
-            yield self.create_child(
-                FuseSymlinkEntry,
-                name=str(swhid),
-                target=Path(root_path, f"archive/{swhid}"),
+            entries.append(
+                self.create_child(
+                    FuseSymlinkEntry,
+                    name=str(swhid),
+                    target=Path(root_path, f"archive/{swhid}"),
+                )
             )
+        return entries
 
 
 @dataclass
@@ -249,34 +276,46 @@
         else:
             return None
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         metadata = await self.fuse.get_metadata(self.swhid)
         root_path = self.get_relative_root_path()
-        yield self.create_child(
-            FuseSymlinkEntry,
-            name="meta.json",
-            target=Path(root_path, f"meta/{self.swhid}.json"),
+        entries.append(
+            self.create_child(
+                FuseSymlinkEntry,
+                name="meta.json",
+                target=Path(root_path, f"meta/{self.swhid}.json"),
+            )
         )
 
         target = metadata["target"]
-        yield self.create_child(
-            FuseSymlinkEntry, name="target", target=Path(root_path, f"archive/{target}")
+        entries.append(
+            self.create_child(
+                FuseSymlinkEntry,
+                name="target",
+                target=Path(root_path, f"archive/{target}"),
+            )
         )
-        yield self.create_child(
-            ReleaseType,
-            name="target_type",
-            mode=int(EntryMode.RDONLY_FILE),
-            target_type=target.object_type,
+        entries.append(
+            self.create_child(
+                ReleaseType,
+                name="target_type",
+                mode=int(EntryMode.RDONLY_FILE),
+                target_type=target.object_type,
+            )
         )
 
         target_dir = await self.find_root_directory(target)
         if target_dir is not None:
-            yield self.create_child(
-                FuseSymlinkEntry,
-                name="root",
-                target=Path(root_path, f"archive/{target_dir}"),
+            entries.append(
+                self.create_child(
+                    FuseSymlinkEntry,
+                    name="root",
+                    target=Path(root_path, f"archive/{target_dir}"),
+                )
            )
+        return entries
 
 
 @dataclass
@@ -306,19 +345,24 @@
 
     swhid: SWHID
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         metadata = await self.fuse.get_metadata(self.swhid)
         root_path = self.get_relative_root_path()
         for branch_name, branch_meta in metadata.items():
             # Mangle branch name to create a valid UNIX filename
             name = urllib.parse.quote_plus(branch_name)
-            yield self.create_child(
-                FuseSymlinkEntry,
-                name=name,
-                target=Path(root_path, f"archive/{branch_meta['target']}"),
+            entries.append(
+                self.create_child(
+                    FuseSymlinkEntry,
+                    name=name,
+                    target=Path(root_path, f"archive/{branch_meta['target']}"),
+                )
             )
+        return entries
+
 
 OBJTYPE_GETTERS = {
     CONTENT: Content,
diff --git a/swh/fuse/fs/entry.py b/swh/fuse/fs/entry.py
--- a/swh/fuse/fs/entry.py
+++ b/swh/fuse/fs/entry.py
@@ -9,7 +9,7 @@
 from enum import IntEnum
 from pathlib import Path
 from stat import S_IFDIR, S_IFLNK, S_IFREG
-from typing import Any, Union
+from typing import Any, AsyncIterator, List, Union
 
 # Avoid cycling import
 Fuse = "Fuse"
@@ -75,15 +75,29 @@
     async def size(self) -> int:
         return 0
 
-    async def __aiter__(self):
+    async def compute_entries(self) -> List[FuseEntry]:
        """ Return the child entries of a directory entry """
 
        raise NotImplementedError
 
+    async def get_entries(self, offset: int = 0) -> AsyncIterator[FuseEntry]:
+        """ Return the child entries of a directory entry using the direntry cache """
+
+        cache = self.fuse.cache.direntry.get(self)
+        if cache:
+            entries = cache
+        else:
+            entries = await self.compute_entries()
+            self.fuse.cache.direntry.set(self, entries)
+
+        # Use a generator to avoid returning the entire list (e.g., on cache hit)
+        for entry in entries[offset:]:
+            yield entry
+
     async def lookup(self, name: str) -> FuseEntry:
        """ Look up a FUSE entry by name """
 
-        async for entry in self:
+        async for entry in self.get_entries():
             if entry.name == name:
                 return entry
         return None
diff --git a/swh/fuse/fs/mountpoint.py b/swh/fuse/fs/mountpoint.py
--- a/swh/fuse/fs/mountpoint.py
+++ b/swh/fuse/fs/mountpoint.py
@@ -5,7 +5,7 @@
 
 from dataclasses import dataclass, field
 import json
-from typing import AsyncIterator
+from typing import List
 
 from swh.fuse.fs.artifact import OBJTYPE_GETTERS
 from swh.fuse.fs.entry import EntryMode, FuseDirEntry, FuseEntry, FuseFileEntry
@@ -21,9 +21,9 @@
     mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
     depth: int = field(init=False, default=1)
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
-        yield self.create_child(ArchiveDir)
-        yield self.create_child(MetaDir)
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = [self.create_child(ArchiveDir), self.create_child(MetaDir)]
+        return entries
 
 
 @dataclass
@@ -46,9 +46,11 @@
             swhid=swhid,
         )
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         async for swhid in self.fuse.cache.get_cached_swhids():
-            yield self.create_child(swhid)
+            entries.append(self.create_child(swhid))
+        return entries
 
     async def lookup(self, name: str) -> FuseEntry:
         entry = await super().lookup(name)
@@ -76,14 +78,18 @@
     name: str = field(init=False, default="meta")
     mode: int = field(init=False, default=int(EntryMode.RDONLY_DIR))
 
-    async def __aiter__(self) -> AsyncIterator[FuseEntry]:
+    async def compute_entries(self) -> List[FuseEntry]:
+        entries = []
         async for swhid in self.fuse.cache.get_cached_swhids():
-            yield self.create_child(
-                MetaEntry,
-                name=f"{swhid}.json",
-                mode=int(EntryMode.RDONLY_FILE),
-                swhid=swhid,
+            entries.append(
+                self.create_child(
+                    MetaEntry,
+                    name=f"{swhid}.json",
+                    mode=int(EntryMode.RDONLY_FILE),
+                    swhid=swhid,
+                )
             )
+        return entries
 
 
 @dataclass
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -169,18 +169,12 @@
 
         # opendir() uses inode as directory handle
         inode = fh
-
-        # TODO: add cache on direntry list?
         direntry = self.inode2entry(inode)
         assert isinstance(direntry, FuseDirEntry)
+
         next_id = offset + 1
-        i = 0
         try:
-            async for entry in direntry:
-                if i < offset:
-                    i += 1
-                    continue
-
+            async for entry in direntry.get_entries(offset):
                 name = os.fsencode(entry.name)
                 attrs = await self.get_attrs(entry)
                 if not pyfuse3.readdir_reply(token, name, attrs, next_id):
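
For illustration, here is a minimal sketch (not part of the patch) of how the RAM-bounded LRU added to `swh/fuse/cache.py` accounts for memory, assuming `swh.fuse` with the new `psutil` and `pympler` dependencies is importable after applying this diff. The "100 MB" budget, integer keys, and string lists are hypothetical stand-ins for a real `maxram` setting, `FuseDirEntry.inode` values, and `compute_entries()` results.

```python
# Illustrative sketch only: exercise the RAM-bounded LRU from swh/fuse/cache.py.
# Keys and values below are hypothetical stand-ins for inodes and entry lists.
from swh.fuse.cache import DirEntryCache

# "maxram" accepts a percentage of available RAM ("10%", the default added in
# cli.py) or an absolute size with a unit parsed by DirEntryCache.__init__.
cache = DirEntryCache({"maxram": "100 MB"})
lru = cache.lru_cache

lru[1] = ["entry-a", "entry-b"]  # charged asizeof.asizeof(value) bytes
lru[2] = ["entry-c"]
_ = lru[1]                       # __getitem__ moves inode 1 to the MRU end

print(f"{lru.used_ram} bytes used out of a {lru.max_ram} byte budget")

# Whenever used_ram exceeds max_ram, __setitem__ pops keys from the front of
# the OrderedDict (the least recently used inodes) until the budget is met.
```

Since eviction is driven by measured object sizes rather than an entry count, a few very large directory listings can evict many small ones, which matches the goal of bounding overall memory use rather than cache cardinality.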
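The `compute_entries()`/`get_entries()` split in `swh/fuse/fs/entry.py`, together with the simplified `readdir()` in `swh/fuse/fuse.py`, follows the pattern sketched below with hypothetical stand-in classes (`TinyDir` is not part of swh-fuse): compute the child list once, cache it per inode, and serve later `readdir` calls by slicing the cached list at `offset`.

```python
# Hypothetical, self-contained sketch of the caching pattern used by
# FuseDirEntry.get_entries(); TinyDir and its dict cache are stand-ins for
# FuseDirEntry and DirEntryCache.
import asyncio
from typing import AsyncIterator, Dict, List, Optional


class TinyDir:
    # class-level stand-in for the shared DirEntryCache
    cache: Dict[int, List[str]] = {}

    def __init__(self, inode: int, children: List[str]) -> None:
        self.inode = inode
        self._children = children

    async def compute_entries(self) -> List[str]:
        # In swh-fuse this would hit the metadata cache or the Web API.
        return list(self._children)

    async def get_entries(self, offset: int = 0) -> AsyncIterator[str]:
        entries: Optional[List[str]] = self.cache.get(self.inode)
        if entries is None:
            entries = await self.compute_entries()
            self.cache[self.inode] = entries
        # On a cache hit only the slice starting at `offset` is iterated,
        # mirroring how readdir() resumes a paginated directory listing.
        for entry in entries[offset:]:
            yield entry


async def main() -> None:
    d = TinyDir(1, ["a", "b", "c"])
    print([e async for e in d.get_entries()])          # ['a', 'b', 'c'] (cache miss)
    print([e async for e in d.get_entries(offset=2)])  # ['c'] (served from cache)


asyncio.run(main())
```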