Changeset View
Changeset View
Standalone View
Standalone View
swh/fuse/cache.py
# Copyright (C) 2020 The Software Heritage developers | # Copyright (C) 2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from abc import ABC | from abc import ABC | ||||
from collections import OrderedDict | from collections import OrderedDict | ||||
from dataclasses import dataclass, field | from dataclasses import dataclass, field | ||||
from datetime import datetime | from datetime import datetime | ||||
import json | import json | ||||
import logging | import logging | ||||
from pathlib import Path | from pathlib import Path | ||||
import re | import re | ||||
import sqlite3 | import sqlite3 | ||||
import sys | import sys | ||||
from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple | from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union | ||||
import aiosqlite | import aiosqlite | ||||
import dateutil.parser | import dateutil.parser | ||||
from psutil import virtual_memory | from psutil import virtual_memory | ||||
from swh.fuse.fs.artifact import RevisionHistoryShardByDate | from swh.fuse.fs.artifact import RevisionHistoryShardByDate | ||||
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry | from swh.fuse.fs.entry import FuseDirEntry, FuseEntry | ||||
from swh.fuse.fs.mountpoint import CacheDir, OriginDir | from swh.fuse.fs.mountpoint import CacheDir, OriginDir | ||||
from swh.model.exceptions import ValidationError | from swh.model.exceptions import ValidationError | ||||
from swh.model.identifiers import REVISION, SWHID, parse_swhid | from swh.model.identifiers import REVISION, SWHID, parse_swhid | ||||
from swh.web.client.client import ORIGIN_VISIT, typify_json | from swh.web.client.client import ORIGIN_VISIT, typify_json | ||||
async def db_connect(conf: Dict[str, Any]) -> aiosqlite.Connection:
    """ Open an aiosqlite connection described by a cache configuration dict.

    The database is either a shared in-memory one (useful for testing) or an
    on-disk file whose parent directory is created on demand. """

    if conf.get("in-memory", False):
        # Shared-cache URI so that concurrent connections in the same process
        # all see the same temporary database
        db_path = "file::memory:?cache=shared"
        use_uri = True
    else:
        db_path = conf["path"]
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        use_uri = False
    return await aiosqlite.connect(
        db_path, uri=use_uri, detect_types=sqlite3.PARSE_DECLTYPES
    )
class FuseCache: | class FuseCache: | ||||
"""SwhFS retrieves both metadata and file contents from the Software Heritage archive | """SwhFS retrieves both metadata and file contents from the Software Heritage archive | ||||
via the network. In order to obtain reasonable performances several caches are used | via the network. In order to obtain reasonable performances several caches are used | ||||
to minimize network transfer. | to minimize network transfer. | ||||
Caches are stored on disk in SQLite databases located at | Caches are stored on disk in SQLite databases located at | ||||
`$XDG_CACHE_HOME/swh/fuse/`. | `$XDG_CACHE_HOME/swh/fuse/`. | ||||
All caches are persistent (i.e., they survive the restart of the SwhFS process) and | All caches are persistent (i.e., they survive the restart of the SwhFS process) and | ||||
global (i.e., they are shared by concurrent SwhFS processes). | global (i.e., they are shared by concurrent SwhFS processes). | ||||
We assume that no cache *invalidation* is necessary, due to intrinsic | We assume that no cache *invalidation* is necessary, due to intrinsic | ||||
properties of the Software Heritage archive, such as integrity verification | properties of the Software Heritage archive, such as integrity verification | ||||
and append-only archive changes. To clean the caches one can just remove the | and append-only archive changes. To clean the caches one can just remove the | ||||
corresponding files from disk. | corresponding files from disk. | ||||
""" | """ | ||||
    def __init__(self, cache_conf: Dict[str, Any]):
        # `cache_conf` holds one sub-section per cache ("metadata", "blob",
        # "direntry"); the actual cache objects are created in __aenter__.
        self.cache_conf = cache_conf
async def __aenter__(self): | async def __aenter__(self): | ||||
# History and raw metadata share the same SQLite db | self.metadata = await MetadataCache(self.cache_conf["metadata"]).__aenter__() | ||||
self.metadata = MetadataCache(self.cache_conf["metadata"]) | self.blob = await BlobCache(self.cache_conf["blob"]).__aenter__() | ||||
self.history = HistoryCache(self.cache_conf["metadata"]) | # History and raw metadata share the same SQLite db (hence the same connection) | ||||
self.blob = BlobCache(self.cache_conf["blob"]) | self.history = await HistoryCache(self.metadata.conn).__aenter__() | ||||
self.direntry = DirEntryCache(self.cache_conf["direntry"]) | self.direntry = DirEntryCache(self.cache_conf["direntry"]) | ||||
await self.metadata.__aenter__() | |||||
await self.blob.__aenter__() | |||||
await self.history.__aenter__() | |||||
return self | return self | ||||
async def __aexit__(self, type=None, val=None, tb=None) -> None: | async def __aexit__(self, type=None, val=None, tb=None) -> None: | ||||
await self.metadata.__aexit__() | await self.metadata.__aexit__() | ||||
await self.blob.__aexit__() | await self.blob.__aexit__() | ||||
await self.history.__aexit__() | await self.history.__aexit__() | ||||
async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: | async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]: | ||||
Show All 12 Lines | async def get_cached_visits(self) -> AsyncGenerator[str, None]: | ||||
cursor = await self.metadata.conn.execute("select url from visits_cache") | cursor = await self.metadata.conn.execute("select url from visits_cache") | ||||
urls = await cursor.fetchall() | urls = await cursor.fetchall() | ||||
for raw_url in urls: | for raw_url in urls: | ||||
yield raw_url[0] | yield raw_url[0] | ||||
class AbstractCache(ABC):
    """ Abstract cache implementation to share common behavior between cache types """

    # Concrete subclasses override this with the SQL creating their tables;
    # an empty script is a valid no-op for executescript().
    DB_SCHEMA: str = ""

    def __init__(self, init_data: Union[aiosqlite.Connection, Dict[str, Any]]):
        # The cache can either be initialized from a config dict (it then
        # opens its own database) or from an existing SQLite connection that
        # it borrows from another cache.
        self.init_data = init_data

    async def __aenter__(self):
        borrowed = isinstance(self.init_data, aiosqlite.Connection)
        self.conn = self.init_data if borrowed else await db_connect(self.init_data)
        # Creating tables is idempotent ("create table if not exists"), so it
        # is safe to run the schema on a borrowed connection too.
        await self.conn.executescript(self.DB_SCHEMA)
        await self.conn.commit()
        return self

    async def __aexit__(self, type=None, val=None, tb=None) -> None:
        # Only close connections we opened ourselves; borrowed ones are the
        # responsibility of their owning cache.
        if not isinstance(self.init_data, aiosqlite.Connection):
            await self.conn.close()
class MetadataCache(AbstractCache): | class MetadataCache(AbstractCache): | ||||
""" The metadata cache map each artifact to the complete metadata of the | """ The metadata cache map each artifact to the complete metadata of the | ||||
referenced object. This is analogous to what is available in | referenced object. This is analogous to what is available in | ||||
`archive/<SWHID>.json` file (and generally used as data source for returning | `archive/<SWHID>.json` file (and generally used as data source for returning | ||||
the content of those files). Artifacts are identified using their SWHIDs, or | the content of those files). Artifacts are identified using their SWHIDs, or | ||||
in the case of origin visits, using their URLs. """ | in the case of origin visits, using their URLs. """ | ||||
DB_SCHEMA = """ | DB_SCHEMA = """ | ||||
create table if not exists metadata_cache ( | create table if not exists metadata_cache ( | ||||
swhid text not null primary key, | swhid text not null primary key, | ||||
metadata blob, | metadata blob, | ||||
date text | date text | ||||
); | ); | ||||
create table if not exists visits_cache ( | create table if not exists visits_cache ( | ||||
url text not null primary key, | url text not null primary key, | ||||
metadata blob, | metadata blob, | ||||
itime timestamp -- insertion time | itime timestamp -- insertion time | ||||
); | ); | ||||
""" | """ | ||||
async def __aenter__(self): | |||||
await super().__aenter__() | |||||
await self.conn.executescript(self.DB_SCHEMA) | |||||
await self.conn.commit() | |||||
return self | |||||
async def get(self, swhid: SWHID, typify: bool = True) -> Any: | async def get(self, swhid: SWHID, typify: bool = True) -> Any: | ||||
cursor = await self.conn.execute( | cursor = await self.conn.execute( | ||||
"select metadata from metadata_cache where swhid=?", (str(swhid),) | "select metadata from metadata_cache where swhid=?", (str(swhid),) | ||||
) | ) | ||||
cache = await cursor.fetchone() | cache = await cursor.fetchone() | ||||
if cache: | if cache: | ||||
metadata = json.loads(cache[0]) | metadata = json.loads(cache[0]) | ||||
return typify_json(metadata, swhid.object_type) if typify else metadata | return typify_json(metadata, swhid.object_type) if typify else metadata | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | class BlobCache(AbstractCache): | ||||
DB_SCHEMA = """ | DB_SCHEMA = """ | ||||
create table if not exists blob_cache ( | create table if not exists blob_cache ( | ||||
swhid text not null primary key, | swhid text not null primary key, | ||||
blob blob | blob blob | ||||
); | ); | ||||
""" | """ | ||||
async def __aenter__(self): | |||||
await super().__aenter__() | |||||
await self.conn.executescript(self.DB_SCHEMA) | |||||
await self.conn.commit() | |||||
return self | |||||
async def get(self, swhid: SWHID) -> Optional[bytes]: | async def get(self, swhid: SWHID) -> Optional[bytes]: | ||||
cursor = await self.conn.execute( | cursor = await self.conn.execute( | ||||
"select blob from blob_cache where swhid=?", (str(swhid),) | "select blob from blob_cache where swhid=?", (str(swhid),) | ||||
) | ) | ||||
cache = await cursor.fetchone() | cache = await cursor.fetchone() | ||||
if cache: | if cache: | ||||
blob = cache[0] | blob = cache[0] | ||||
return blob | return blob | ||||
Show All 25 Lines | DB_SCHEMA = """ | ||||
create table if not exists history_graph ( | create table if not exists history_graph ( | ||||
src text not null, | src text not null, | ||||
dst text not null, | dst text not null, | ||||
unique(src, dst) | unique(src, dst) | ||||
); | ); | ||||
create index if not exists idx_history on history_graph(src); | create index if not exists idx_history on history_graph(src); | ||||
""" | """ | ||||
async def __aenter__(self): | |||||
await super().__aenter__() | |||||
await self.conn.executescript(self.DB_SCHEMA) | |||||
await self.conn.commit() | |||||
return self | |||||
HISTORY_REC_QUERY = """ | HISTORY_REC_QUERY = """ | ||||
with recursive | with recursive | ||||
dfs(node) AS ( | dfs(node) AS ( | ||||
values(?) | values(?) | ||||
union | union | ||||
select history_graph.dst | select history_graph.dst | ||||
from history_graph | from history_graph | ||||
join dfs on history_graph.src = dfs.node | join dfs on history_graph.src = dfs.node | ||||
▲ Show 20 Lines • Show All 127 Lines • Show Last 20 Lines |