diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py --- a/swh/fuse/cache.py +++ b/swh/fuse/cache.py @@ -13,7 +13,7 @@ import re import sqlite3 import sys -from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple +from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, Union import aiosqlite import dateutil.parser @@ -27,6 +27,19 @@ from swh.web.client.client import ORIGIN_VISIT, typify_json +async def db_connect(conf: Dict[str, Any]) -> aiosqlite.Connection: + # In-memory (thus temporary) caching is useful for testing purposes + if conf.get("in-memory", False): + path = "file::memory:?cache=shared" + uri = True + else: + path = conf["path"] + Path(path).parent.mkdir(parents=True, exist_ok=True) + uri = False + + return await aiosqlite.connect(path, uri=uri, detect_types=sqlite3.PARSE_DECLTYPES) + + class FuseCache: """SwhFS retrieves both metadata and file contents from the Software Heritage archive via the network. In order to obtain reasonable performances several caches are used @@ -49,14 +62,12 @@ self.cache_conf = cache_conf async def __aenter__(self): - # History and raw metadata share the same SQLite db - self.metadata = MetadataCache(self.cache_conf["metadata"]) - self.history = HistoryCache(self.cache_conf["metadata"]) - self.blob = BlobCache(self.cache_conf["blob"]) + self.metadata = await MetadataCache(self.cache_conf["metadata"]).__aenter__() + self.blob = await BlobCache(self.cache_conf["blob"]).__aenter__() + # History and raw metadata share the same SQLite db (hence the same connection) + self.history = await HistoryCache(self.metadata.conn).__aenter__() self.direntry = DirEntryCache(self.cache_conf["direntry"]) - await self.metadata.__aenter__() - await self.blob.__aenter__() - await self.history.__aenter__() + return self async def __aexit__(self, type=None, val=None, tb=None) -> None: @@ -85,28 +96,29 @@ class AbstractCache(ABC): - """ Abstract cache implementation to share common behavior between cache - types (such as: YAML config parsing, SQLite context manager) """ + """ Abstract cache implementation to share common behavior between cache types """ - def __init__(self, conf: Dict[str, Any]): - self.conf = conf + DB_SCHEMA: str = "" + + def __init__(self, init_data: Union[aiosqlite.Connection, Dict[str, Any]]): + # The cache can either be initialized from a config dict or from an + # existing SQLite connection + self.init_data = init_data async def __aenter__(self): - # In-memory (thus temporary) caching is useful for testing purposes - if self.conf.get("in-memory", False): - path = "file::memory:?cache=shared" - uri = True + if isinstance(self.init_data, aiosqlite.Connection): + self.conn = self.init_data else: - path = Path(self.conf["path"]) - path.parent.mkdir(parents=True, exist_ok=True) - uri = False - self.conn = await aiosqlite.connect( - path, uri=uri, detect_types=sqlite3.PARSE_DECLTYPES - ) + self.conn = await db_connect(self.init_data) + + await self.conn.executescript(self.DB_SCHEMA) + await self.conn.commit() return self async def __aexit__(self, type=None, val=None, tb=None) -> None: - await self.conn.close() + # In case we were given an existing connection, do not close it here + if not isinstance(self.init_data, aiosqlite.Connection): + await self.conn.close() class MetadataCache(AbstractCache): @@ -130,12 +142,6 @@ ); """ - async def __aenter__(self): - await super().__aenter__() - await self.conn.executescript(self.DB_SCHEMA) - await self.conn.commit() - return self - async def get(self, swhid: SWHID, typify: bool = True) -> Any: cursor = await self.conn.execute( "select metadata from metadata_cache where swhid=?", (str(swhid),) @@ -210,12 +216,6 @@ ); """ - async def __aenter__(self): - await super().__aenter__() - await self.conn.executescript(self.DB_SCHEMA) - await self.conn.commit() - return self - async def get(self, swhid: SWHID) -> Optional[bytes]: cursor = await self.conn.execute( "select blob from blob_cache where swhid=?", (str(swhid),) @@ -257,12 +257,6 @@ create index if not exists idx_history on history_graph(src); """ - async def __aenter__(self): - await super().__aenter__() - await self.conn.executescript(self.DB_SCHEMA) - await self.conn.commit() - return self - HISTORY_REC_QUERY = """ with recursive dfs(node) AS (