D4569.id.diff
D4569: FUSE: cache: add 'date' column in metadata_cache for history/by-date
diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -11,16 +11,17 @@
from pathlib import Path
import re
import sys
-from typing import Any, AsyncGenerator, Dict, List, Optional
+from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple
import aiosqlite
+import dateutil.parser
from psutil import virtual_memory
from swh.fuse.fs.artifact import RevisionHistoryShardByDate
from swh.fuse.fs.entry import FuseDirEntry, FuseEntry
from swh.fuse.fs.mountpoint import ArchiveDir, MetaDir
from swh.model.exceptions import ValidationError
-from swh.model.identifiers import SWHID, parse_swhid
+from swh.model.identifiers import REVISION, SWHID, parse_swhid
from swh.web.client.client import ORIGIN_VISIT, typify_json
@@ -46,9 +47,10 @@
self.cache_conf = cache_conf
async def __aenter__(self):
+ # TODO: unified name for metadata/history?
self.metadata = MetadataCache(self.cache_conf["metadata"])
self.blob = BlobCache(self.cache_conf["blob"])
- self.history = HistoryCache(self.cache_conf["history"])
+ self.history = HistoryCache(self.cache_conf["metadata"])
self.direntry = DirEntryCache(self.cache_conf["direntry"])
await self.metadata.__aenter__()
await self.blob.__aenter__()
@@ -90,11 +92,13 @@
async def __aenter__(self):
# In-memory (thus temporary) caching is useful for testing purposes
if self.conf.get("in-memory", False):
- path = ":memory:"
+ path = "file::memory:?cache=shared"
+ uri = True
else:
path = Path(self.conf["path"])
path.parent.mkdir(parents=True, exist_ok=True)
- self.conn = await aiosqlite.connect(path)
+ uri = False
+ self.conn = await aiosqlite.connect(path, uri=uri)
return self
async def __aexit__(self, type=None, val=None, tb=None) -> None:
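The switch from ":memory:" to a "file::memory:?cache=shared" URI matters because HistoryCache now opens the same "metadata" configuration as MetadataCache (see the __aenter__ hunk above): a plain ":memory:" path gives every connection its own private database, while the shared-cache URI lets separate connections in one process see the same tables. A minimal sketch of that behavior, using the synchronous sqlite3 module as a stand-in for aiosqlite:

# Sketch: two connections to a shared in-memory database. With a plain
# ":memory:" path, conn_b would NOT see the table created by conn_a.
import sqlite3

shared = "file::memory:?cache=shared"
conn_a = sqlite3.connect(shared, uri=True)  # e.g. the MetadataCache connection
conn_b = sqlite3.connect(shared, uri=True)  # e.g. the HistoryCache connection

conn_a.execute("create table if not exists metadata_cache (swhid, metadata, date)")
conn_a.execute("insert into metadata_cache values ('some-rev', '{}', '2021/01/15/')")
conn_a.commit()

# Visible from the second connection because the in-memory cache is shared:
print(conn_b.execute("select count(*) from metadata_cache").fetchone())  # (1,)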
@@ -110,7 +114,7 @@
async def __aenter__(self):
await super().__aenter__()
await self.conn.execute(
- "create table if not exists metadata_cache (swhid, metadata)"
+ "create table if not exists metadata_cache (swhid, metadata, date)"
)
await self.conn.execute(
"create table if not exists visits_cache (url, metadata)"
@@ -145,10 +149,18 @@
return None
async def set(self, swhid: SWHID, metadata: Any) -> None:
+ swhid_date = ""
+ if swhid.object_type == REVISION:
+ date = dateutil.parser.parse(metadata["date"])
+ swhid_date = RevisionHistoryShardByDate.DATE_FMT.format(
+ year=date.year, month=date.month, day=date.day
+ )
+
await self.conn.execute(
- "insert into metadata_cache values (?, ?)",
- (str(swhid), json.dumps(metadata)),
+ "insert into metadata_cache values (?, ?, ?)",
+ (str(swhid), json.dumps(metadata), swhid_date),
)
+
await self.conn.commit()
async def set_visits(self, url_encoded: str, visits: List[Dict[str, Any]]) -> None:
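For revision objects, set() now computes a date shard key at insertion time so that later history lookups can filter by date prefix directly in SQL. A small sketch of the derivation; the DATE_FMT template used here ("{year:04d}/{month:02d}/{day:02d}/") is an assumption for illustration, not copied from RevisionHistoryShardByDate:

# Sketch of the shard-key derivation performed in set() for REVISION objects.
import dateutil.parser

DATE_FMT = "{year:04d}/{month:02d}/{day:02d}/"  # assumed format

date = dateutil.parser.parse("2021-01-15T10:32:00+01:00")
swhid_date = DATE_FMT.format(year=date.year, month=date.month, day=date.day)
print(swhid_date)  # 2021/01/15/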
@@ -157,23 +169,6 @@
)
await self.conn.commit()
- async def get_cached_subset(self, swhids: List[SWHID]) -> List[SWHID]:
- swhids_str = ",".join(f'"{x}"' for x in swhids)
- cursor = await self.conn.execute(
- f"select swhid from metadata_cache where swhid in ({swhids_str})"
- )
- cache = await cursor.fetchall()
-
- res = []
- for row in cache:
- swhid = row[0]
- try:
- res.append(parse_swhid(swhid))
- except ValidationError:
- logging.warning("Cannot parse object from metadata cache: %s", swhid)
-
- return res
-
class BlobCache(AbstractCache):
""" The blob cache map SWHIDs of type `cnt` to the bytes of their archived
@@ -233,31 +228,53 @@
await self.conn.commit()
return self
+ HISTORY_REC_QUERY = """
+ with recursive
+ dfs(node) AS (
+ values(?)
+ union
+ select history_graph.dst
+ from history_graph
+ join dfs on history_graph.src = dfs.node
+ )
+ -- Do not keep the root node since it is not an ancestor
+ select * from dfs limit -1 offset 1
+ """
+
async def get(self, swhid: SWHID) -> Optional[List[SWHID]]:
+ cursor = await self.conn.execute(self.HISTORY_REC_QUERY, (str(swhid),),)
+ cache = await cursor.fetchall()
+ if not cache:
+ return None
+ history = []
+ for row in cache:
+ parent = row[0]
+ try:
+ history.append(parse_swhid(parent))
+ except ValidationError:
+ logging.warning("Cannot parse object from history cache: %s", parent)
+ return history
+
+ async def get_with_date_prefix(
+ self, swhid: SWHID, date_prefix
+ ) -> List[Tuple[SWHID, str]]:
cursor = await self.conn.execute(
- """
- with recursive
- dfs(node) AS (
- values(?)
- union
- select history_graph.dst
- from history_graph
- join dfs on history_graph.src = dfs.node
- )
- -- Do not keep the root node since it is not an ancestor
- select * from dfs limit -1 offset 1
+ f"""
+ select swhid, date from ( {self.HISTORY_REC_QUERY} ) as history
+ join metadata_cache on history.node = metadata_cache.swhid
+ where metadata_cache.date like '{date_prefix}%'
""",
(str(swhid),),
)
cache = await cursor.fetchall()
if not cache:
- return None
+ return []
history = []
for row in cache:
- parent = row[0]
+ parent, date = row[0], row[1]
try:
- history.append(parse_swhid(parent))
+ history.append((parse_swhid(parent), date))
except ValidationError:
logging.warning("Cannot parse object from history cache: %s", parent)
return history
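The factored-out HISTORY_REC_QUERY walks history_graph from a given root revision with a recursive CTE, and get_with_date_prefix() joins the resulting ancestors against metadata_cache on the new date column. A self-contained toy session (graph rows and dates invented) showing both queries:

# Toy demo of HISTORY_REC_QUERY and the date-prefix join; all rows invented.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table history_graph (src, dst)")
conn.executemany(
    "insert into history_graph values (?, ?)",
    [("A", "B"), ("B", "C")],  # linear history A -> B -> C
)

QUERY = """
with recursive
dfs(node) AS (
    values(?)
    union
    select history_graph.dst
    from history_graph
    join dfs on history_graph.src = dfs.node
)
-- Do not keep the root node since it is not an ancestor
select * from dfs limit -1 offset 1
"""
print(conn.execute(QUERY, ("A",)).fetchall())  # [('B',), ('C',)]

conn.execute("create table metadata_cache (swhid, metadata, date)")
conn.executemany(
    "insert into metadata_cache values (?, ?, ?)",
    [("B", "{}", "2021/01/15/"), ("C", "{}", "2020/12/31/")],
)

# Same shape as get_with_date_prefix(): ancestors restricted to a date prefix.
PREFIXED = f"""
select swhid, date from ( {QUERY} ) as history
join metadata_cache on history.node = metadata_cache.swhid
where metadata_cache.date like '2021/01/%'
"""
print(conn.execute(PREFIXED, ("A",)).fetchall())  # [('B', '2021/01/15/')]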
diff --git a/swh/fuse/cli.py b/swh/fuse/cli.py
--- a/swh/fuse/cli.py
+++ b/swh/fuse/cli.py
@@ -30,7 +30,6 @@
"cache": {
"metadata": {"path": str(CACHE_HOME_DIR / "swh/fuse/metadata.sqlite")},
"blob": {"path": str(CACHE_HOME_DIR / "swh/fuse/blob.sqlite")},
- "history": {"path": str(CACHE_HOME_DIR / "swh/fuse/history.sqlite")},
"direntry": {"maxram": "10%"},
},
"web-api": {
@@ -200,5 +199,5 @@
pass
conf = ctx.obj["config"]
- for cache_name in ["blob", "metadata", "history"]:
+ for cache_name in ["blob", "metadata"]:
rm_cache(conf, cache_name)
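With HistoryCache stored in the metadata database, neither the default configuration nor the clean-up loop mentions a separate history cache any more. A sketch of the resulting cache section; CACHE_HOME_DIR below is a stand-in for the constant defined elsewhere in swh/fuse/cli.py:

# Sketch of the default cache configuration after this change.
from pathlib import Path

CACHE_HOME_DIR = Path.home() / ".cache"  # stand-in for the real constant

cache_config = {
    "metadata": {"path": str(CACHE_HOME_DIR / "swh/fuse/metadata.sqlite")},
    "blob": {"path": str(CACHE_HOME_DIR / "swh/fuse/blob.sqlite")},
    # no "history" entry: HistoryCache now lives in the metadata database
    "direntry": {"maxram": "10%"},
}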
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -265,20 +265,17 @@
async def compute_entries(self) -> AsyncIterator[FuseEntry]:
history = await self.fuse.get_history(self.history_swhid)
- # Only check for cached revisions since fetching all of them with the
- # Web API would take too long
- swhids = await self.fuse.cache.metadata.get_cached_subset(history)
+ # Only check for cached revisions with the appropriate prefix, since
+ # fetching all of them with the Web API would take too long
+ swhids = await self.fuse.cache.history.get_with_date_prefix(
+ self.history_swhid, date_prefix=self.prefix
+ )
depth = self.prefix.count("/")
root_path = self.get_relative_root_path()
sharded_dirs = set()
- for swhid in swhids:
- meta = await self.fuse.cache.metadata.get(swhid)
- date = meta["date"]
- sharded_name = self.DATE_FMT.format(
- year=date.year, month=date.month, day=date.day
- )
+ for (swhid, sharded_name) in swhids:
if not sharded_name.startswith(self.prefix):
continue
@@ -301,6 +298,7 @@
history_swhid=self.history_swhid,
)
+ # TODO: store len(history) somewhere to avoid recompute?
self.is_status_done = len(swhids) == len(history)
if not self.is_status_done and depth == 0:
yield self.create_child(
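Since get_with_date_prefix() already returns (swhid, sharded_name) pairs, compute_entries() no longer needs one metadata_cache lookup per revision to rebuild the shard name. A hedged sketch of the prefix/depth selection this loop performs (identifiers and data invented; the real code builds FuseEntry children via create_child):

# Sketch: pick the next date component below the current prefix.
swhids = [("rev1", "2021/01/15/"), ("rev2", "2021/03/02/"), ("rev3", "2020/12/31/")]
prefix = "2021/"
depth = prefix.count("/")  # 1 -> the next path component is the month

sharded_dirs = set()
for swhid, sharded_name in swhids:
    if not sharded_name.startswith(prefix):
        continue
    sharded_dirs.add(sharded_name.split("/")[depth])

print(sorted(sharded_dirs))  # ['01', '03']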
diff --git a/swh/fuse/tests/conftest.py b/swh/fuse/tests/conftest.py
--- a/swh/fuse/tests/conftest.py
+++ b/swh/fuse/tests/conftest.py
@@ -35,11 +35,7 @@
tmpfile = NamedTemporaryFile(suffix=".swh-fuse-test.yml")
config = {
- "cache": {
- "metadata": {"in-memory": True},
- "blob": {"in-memory": True},
- "history": {"in-memory": True},
- },
+ "cache": {"metadata": {"in-memory": True}, "blob": {"in-memory": True},},
"web-api": {"url": API_URL, "auth-token": None},
"json-indent": None,
}