swh/fuse/cache.py
[... 21 lines not shown ...]
 from swh.fuse.fs.artifact import RevisionHistoryShardByDate
 from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
 from swh.fuse.fs.mountpoint import CacheDir, OriginDir
 from swh.model.exceptions import ValidationError
 from swh.model.identifiers import REVISION, SWHID, parse_swhid
 from swh.web.client.client import ORIGIN_VISIT, typify_json
+async def db_connect(conf: Dict[str, Any]) -> aiosqlite.Connection:
+    # In-memory (thus temporary) caching is useful for testing purposes
+    if conf.get("in-memory", False):
+        path = "file::memory:?cache=shared"
+        uri = True
+    else:
+        path = conf["path"]
+        Path(path).parent.mkdir(parents=True, exist_ok=True)
+        uri = False
+    return await aiosqlite.connect(path, uri=uri, detect_types=sqlite3.PARSE_DECLTYPES)
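
Note: the new db_connect helper centralizes connection setup for every cache. A minimal usage sketch, assuming the module path swh.fuse.cache; the two config shapes are inferred from the function body above:

    import asyncio

    from swh.fuse.cache import db_connect  # module path assumed from this changeset

    async def main():
        # Shared in-memory database, handy for tests.
        conn = await db_connect({"in-memory": True})
        await conn.close()
        # On-disk database; parent directories are created on demand.
        conn = await db_connect({"path": "/tmp/swh-fuse/cache.sqlite"})
        await conn.close()

    asyncio.run(main())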
 class FuseCache:
     """SwhFS retrieves both metadata and file contents from the Software
     Heritage archive via the network. In order to obtain reasonable performance,
     several caches are used to minimize network transfers.

     Caches are stored on disk in SQLite databases located at
     `$XDG_CACHE_HOME/swh/fuse/`.

     All caches are persistent (i.e., they survive the restart of the SwhFS
     process) and global (i.e., they are shared by concurrent SwhFS processes).

     We assume that no cache *invalidation* is necessary, due to intrinsic
     properties of the Software Heritage archive, such as integrity verification
     and append-only archive changes. To clean the caches one can just remove
     the corresponding files from disk.
     """
     def __init__(self, cache_conf: Dict[str, Any]):
         self.cache_conf = cache_conf

     async def __aenter__(self):
-        # History and raw metadata share the same SQLite db
-        self.metadata = MetadataCache(self.cache_conf["metadata"])
-        self.history = HistoryCache(self.cache_conf["metadata"])
-        self.blob = BlobCache(self.cache_conf["blob"])
+        self.metadata = await MetadataCache(
+            conf=self.cache_conf["metadata"]
+        ).__aenter__()
+        self.blob = await BlobCache(conf=self.cache_conf["blob"]).__aenter__()
+        # History and raw metadata share the same SQLite db (hence the same connection)
+        self.history = await HistoryCache(
+            conf=self.cache_conf["metadata"], conn=self.metadata.conn
+        ).__aenter__()
         self.direntry = DirEntryCache(self.cache_conf["direntry"])
-        await self.metadata.__aenter__()
-        await self.blob.__aenter__()
-        await self.history.__aenter__()
         return self

     async def __aexit__(self, type=None, val=None, tb=None) -> None:
         await self.metadata.__aexit__()
         await self.blob.__aexit__()
         await self.history.__aexit__()
     async def get_cached_swhids(self) -> AsyncGenerator[SWHID, None]:

[... 12 lines not shown ...]

     async def get_cached_visits(self) -> AsyncGenerator[str, None]:
         cursor = await self.metadata.conn.execute("select url from visits_cache")
         urls = await cursor.fetchall()
         for raw_url in urls:
             yield raw_url[0]
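
Note: after this refactoring, FuseCache still behaves as an async context manager; each sub-cache is now entered eagerly in __aenter__. A sketch of a caller, with a hypothetical config dict (the direntry keys in particular are assumed, not part of this change):

    import asyncio

    from swh.fuse.cache import FuseCache

    async def main():
        conf = {
            "metadata": {"in-memory": True},  # shared by metadata and history caches
            "blob": {"in-memory": True},
            "direntry": {"maxram": "10%"},  # hypothetical DirEntryCache setting
        }
        async with FuseCache(conf) as cache:
            async for swhid in cache.get_cached_swhids():
                print(swhid)

    asyncio.run(main())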
 class AbstractCache(ABC):
-    """ Abstract cache implementation to share common behavior between cache
-    types (such as: YAML config parsing, SQLite context manager) """
+    """ Abstract cache implementation to share common behavior between cache types """
+
+    DB_SCHEMA: str = ""
+
+    conf: Dict[str, Any]
+    conn: aiosqlite.Connection

-    def __init__(self, conf: Dict[str, Any]):
+    def __init__(
+        self, conf: Dict[str, Any], conn: Optional[aiosqlite.Connection] = None
+    ):
         self.conf = conf
+        self.init_conn = conn

     async def __aenter__(self):
-        # In-memory (thus temporary) caching is useful for testing purposes
-        if self.conf.get("in-memory", False):
-            path = "file::memory:?cache=shared"
-            uri = True
+        if self.init_conn is None:
+            self.conn = await db_connect(self.conf)
         else:
-            path = Path(self.conf["path"])
-            path.parent.mkdir(parents=True, exist_ok=True)
-            uri = False
-        self.conn = await aiosqlite.connect(
-            path, uri=uri, detect_types=sqlite3.PARSE_DECLTYPES
-        )
+            self.conn = self.init_conn
+        await self.conn.executescript(self.DB_SCHEMA)
+        await self.conn.commit()
         return self

     async def __aexit__(self, type=None, val=None, tb=None) -> None:
-        await self.conn.close()
+        # In case we were given an existing connection, do not close it here
+        if self.init_conn is None:
+            await self.conn.close()
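
Note: the optional conn argument is what allows two caches to share one SQLite connection while only the owning cache closes it. A sketch of the pattern, mirroring FuseCache.__aenter__ above:

    from typing import Any, Dict

    from swh.fuse.cache import HistoryCache, MetadataCache

    async def open_shared(conf: Dict[str, Any]):
        # MetadataCache opens and owns the connection; HistoryCache reuses it,
        # so HistoryCache.__aexit__ leaves it open (its init_conn is not None).
        metadata = await MetadataCache(conf=conf["metadata"]).__aenter__()
        history = await HistoryCache(
            conf=conf["metadata"], conn=metadata.conn
        ).__aenter__()
        return metadata, history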
 class MetadataCache(AbstractCache):
     """ The metadata cache maps each artifact to the complete metadata of the
     referenced object. This is analogous to what is available in the
     `archive/<SWHID>.json` file (and is generally used as the data source for
     the content of those files). Artifacts are identified using their SWHIDs,
     or in the case of origin visits, using their URLs. """

     DB_SCHEMA = """
         create table if not exists metadata_cache (
             swhid text not null primary key,
             metadata blob,
             date text
         );

         create table if not exists visits_cache (
             url text not null primary key,
             metadata blob,
             itime timestamp -- insertion time
         );
     """
-    async def __aenter__(self):
-        await super().__aenter__()
-        await self.conn.executescript(self.DB_SCHEMA)
-        await self.conn.commit()
-        return self
     async def get(self, swhid: SWHID, typify: bool = True) -> Any:
         cursor = await self.conn.execute(
             "select metadata from metadata_cache where swhid=?", (str(swhid),)
         )
         cache = await cursor.fetchone()
         if cache:
             metadata = json.loads(cache[0])
             return typify_json(metadata, swhid.object_type) if typify else metadata
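
Note: since DB_SCHEMA is now applied by AbstractCache.__aenter__, each cache is ready to query as soon as it is entered. A round-trip sketch for get, using a placeholder SWHID:

    import asyncio

    from swh.fuse.cache import MetadataCache
    from swh.model.identifiers import parse_swhid

    async def main():
        async with MetadataCache(conf={"in-memory": True}) as cache:
            swhid = parse_swhid("swh:1:rev:" + "0" * 40)  # placeholder identifier
            # Cache miss: get() falls through the `if cache:` branch, returning None.
            assert await cache.get(swhid) is None

    asyncio.run(main())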
[... 58 lines not shown ...]

 class BlobCache(AbstractCache):
DB_SCHEMA = """ | DB_SCHEMA = """ | ||||
create table if not exists blob_cache ( | create table if not exists blob_cache ( | ||||
swhid text not null primary key, | swhid text not null primary key, | ||||
blob blob | blob blob | ||||
); | ); | ||||
""" | """ | ||||
-    async def __aenter__(self):
-        await super().__aenter__()
-        await self.conn.executescript(self.DB_SCHEMA)
-        await self.conn.commit()
-        return self
     async def get(self, swhid: SWHID) -> Optional[bytes]:
         cursor = await self.conn.execute(
             "select blob from blob_cache where swhid=?", (str(swhid),)
         )
         cache = await cursor.fetchone()
         if cache:
             blob = cache[0]
             return blob
[... 25 lines not shown ...]

     DB_SCHEMA = """
         create table if not exists history_graph (
             src text not null,
             dst text not null,
             unique(src, dst)
         );
         create index if not exists idx_history on history_graph(src);
     """
-    async def __aenter__(self):
-        await super().__aenter__()
-        await self.conn.executescript(self.DB_SCHEMA)
-        await self.conn.commit()
-        return self
HISTORY_REC_QUERY = """ | HISTORY_REC_QUERY = """ | ||||
with recursive | with recursive | ||||
dfs(node) AS ( | dfs(node) AS ( | ||||
values(?) | values(?) | ||||
union | union | ||||
select history_graph.dst | select history_graph.dst | ||||
from history_graph | from history_graph | ||||
join dfs on history_graph.src = dfs.node | join dfs on history_graph.src = dfs.node | ||||
[... 127 lines not shown ...]
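
Note: HISTORY_REC_QUERY (its tail is folded above) walks the history graph through a recursive CTE. A self-contained sketch of the same traversal on a toy table, completing the query in the obvious way by selecting every reached node (an assumption, since the remainder of the query is not shown):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        create table history_graph (
            src text not null,
            dst text not null,
            unique(src, dst)
        );
        insert into history_graph values ('r3', 'r2'), ('r2', 'r1'), ('r2', 'r0');
    """)
    rows = conn.execute(
        """
        with recursive dfs(node) as (
            values(?)
            union
            select history_graph.dst
            from history_graph
            join dfs on history_graph.src = dfs.node
        )
        select node from dfs
        """,
        ("r3",),
    ).fetchall()
    # 'r3' plus every revision reachable from it: ['r3', 'r2', 'r1', 'r0']
    print([node for (node,) in rows])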