Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/cache.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from abc import ABC | from abc import ABC | ||||
from collections import OrderedDict | |||||
from dataclasses import dataclass, field | |||||
import json | import json | ||||
import logging | import logging | ||||
from pathlib import Path | from pathlib import Path | ||||
import re | |||||
import sys | |||||
from typing import Any, AsyncGenerator, Dict, List, Optional | from typing import Any, AsyncGenerator, Dict, List, Optional | ||||
import aiosqlite | import aiosqlite | ||||
from psutil import virtual_memory | |||||
from pympler import asizeof | |||||
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry | |||||
from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir | |||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
from swh.model.identifiers import SWHID, parse_swhid | from swh.model.identifiers import SWHID, parse_swhid | ||||
from swh.web.client.client import typify_json | from swh.web.client.client import typify_json | ||||
class FuseCache: | class FuseCache: | ||||
"""SwhFS retrieves both metadata and file contents from the Software Heritage archive | """SwhFS retrieves both metadata and file contents from the Software Heritage archive | ||||
via the network. In order to obtain reasonable performances several caches are used | via the network. In order to obtain reasonable performances several caches are used | ||||
Show All 14 Lines | class FuseCache: | ||||
def __init__(self, cache_conf: Dict[str, Any]): | def __init__(self, cache_conf: Dict[str, Any]): | ||||
self.cache_conf = cache_conf | self.cache_conf = cache_conf | ||||
async def __aenter__(self):
    """Create every cache from its configuration section and open them.

    The metadata, blob and history caches are entered, in that order,
    through their own ``__aenter__``; the direntry cache is only
    constructed.

    Returns:
        This ``FuseCache`` instance.
    """
    conf = self.cache_conf
    self.metadata = MetadataCache(conf["metadata"])
    self.blob = BlobCache(conf["blob"])
    self.history = HistoryCache(conf["history"])
    self.direntry = DirEntryCache(conf["direntry"])

    # Only these three caches expose an async entry point in this method.
    for cache in (self.metadata, self.blob, self.history):
        await cache.__aenter__()
    return self
async def __aexit__(self, type=None, val=None, tb=None) -> None: | async def __aexit__(self, type=None, val=None, tb=None) -> None: | ||||
await self.metadata.__aexit__() | await self.metadata.__aexit__() | ||||
await self.blob.__aexit__() | await self.blob.__aexit__() | ||||
▲ Show 20 Lines • Show All 154 Lines • ▼ Show 20 Lines | class HistoryCache(AbstractCache): | ||||
async def set(self, history: str) -> None:
    """Insert the edges described by ``history`` into ``history_graph``.

    Args:
        history: newline-separated edges; each line is split on single
            spaces and must yield the two values of one table row.

    Blank lines are skipped. When ``history`` holds no edge at all (empty
    or whitespace-only text) nothing is executed or committed, instead of
    handing a one-column row to the two-placeholder insert statement.
    """
    edges = [
        line.split(" ")
        for line in history.strip().split("\n")
        if line.strip()
    ]
    if not edges:
        return

    await self.conn.executemany(
        "insert or ignore into history_graph values (?, ?)", edges
    )
    await self.conn.commit()
class DirEntryCache:
    """The direntry cache maps inodes representing directories to the entries
    they contain. Each entry comes with its name as well as file attributes
    (i.e., all that is needed to perform a detailed directory listing).

    Additional attributes of each directory entry should be looked up on an
    entry-by-entry basis, possibly hitting other caches.

    The direntry cache for a given dir is populated, at the latest, when the
    content of the directory is listed. More aggressive prefetching might
    happen. For instance, when first opening a dir a recursive listing of it can
    be retrieved from the remote backend and used to recursively populate the
    direntry cache for all (transitive) sub-directories."""

    @dataclass
    class LRU(OrderedDict):
        """Ordered mapping that evicts its oldest entries once the measured
        size of the stored values exceeds ``max_ram`` bytes."""

        # Budget, in bytes, for the values kept in the mapping.
        max_ram: int
        # Sum of the recorded sizes of the values currently stored.
        used_ram: int = field(init=False, default=0)
        # Size recorded for each key when its value was stored.
        object_size: Dict[Any, int] = field(init=False, default_factory=dict)

        def __getitem__(self, key: Any) -> Any:
            """Return the value for ``key`` and mark it most recently used."""
            value = super().__getitem__(key)
            self.move_to_end(key)
            return value

        def __setitem__(self, key: Any, value: Any) -> None:
            """Store ``value`` under ``key``, then evict oldest entries while
            the budget is exceeded (possibly including the new entry)."""
            nb_bytes = asizeof.asizeof(value)
            if key in self:
                # Replacing a value: forget the old size so that `used_ram`
                # reflects the new value, and refresh the entry's position.
                self.used_ram -= self.object_size[key]
                self.move_to_end(key)
            self.used_ram += nb_bytes
            self.object_size[key] = nb_bytes
            super().__setitem__(key, value)

            while self.used_ram > self.max_ram and self:
                oldest = next(iter(self))
                del self[oldest]

        def __delitem__(self, key: Any) -> None:
            """Remove ``key`` together with its size accounting."""
            super().__delitem__(key)
            self.used_ram -= self.object_size.pop(key, 0)

    def __init__(self, conf: Dict[str, Any]):
        """Build the LRU cache from ``conf["maxram"]``.

        Args:
            conf: direntry cache configuration. ``maxram`` is a number
                (decimals allowed) followed by ``%`` (share of the memory
                currently reported as available) or one of ``B``, ``KB``,
                ``MB``, ``GB`` (case-insensitive, powers of 1000).

        Exits the process with status 1, after logging an error, when the
        value cannot be parsed or uses an unknown unit.
        """
        units = {"B": 1, "KB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9}

        m = re.fullmatch(r"\s*(\d+(?:\.\d+)?)\s*(%|[A-Za-z]+)\s*", conf["maxram"])
        unit = m.group(2).upper() if m else None
        if m is None or (unit != "%" and unit not in units):
            logging.error(f"Cannot parse direntry maxram config: {conf['maxram']}")
            sys.exit(1)

        num = float(m.group(1))
        if unit == "%":
            max_ram = int(num * virtual_memory().available / 100)
        else:
            max_ram = int(num * units[unit])

        self.lru_cache = self.LRU(max_ram)

    async def get(self, direntry: FuseDirEntry) -> List[FuseEntry]:
        """Return the entries of ``direntry``, listing the directory and
        offering the result to the cache when it is not cached yet."""
        try:
            return self.lru_cache[direntry.inode]
        except KeyError:
            pass

        entries = [x async for x in direntry]
        self.set(direntry, entries)
        return entries

    def set(self, direntry: FuseDirEntry, entries: List[FuseEntry]) -> None:
        """Cache ``entries`` under the inode of ``direntry``.

        The `archive/` and `meta/` directories are populated on the fly, so
        they are never cached.
        """
        if isinstance(direntry, (ArchiveDir, MetaDir)):
            return

        # TODO: store entries as dict referenced by name (except for history/)?
        self.lru_cache[direntry.inode] = entries