diff --git a/swh/fuse/cache.py b/swh/fuse/cache.py
--- a/swh/fuse/cache.py
+++ b/swh/fuse/cache.py
@@ -188,7 +188,7 @@
         )
         await self.conn.execute(
-            "insert into metadata_cache values (?, ?, ?)",
+            "insert or ignore into metadata_cache values (?, ?, ?)",
             (str(swhid), json.dumps(metadata), swhid_date),
         )
         await self.conn.commit()
@@ -281,7 +281,7 @@
         cursor = await self.conn.execute(self.HISTORY_REC_QUERY, (str(swhid),),)
         cache = await cursor.fetchall()
         if not cache:
-            return None
+            return []
         history = []
         for row in cache:
             parent = row[0]
@@ -315,10 +315,8 @@
                 logging.warning("Cannot parse object from history cache: %s", parent)
         return history
 
-    async def set(self, history: str) -> None:
-        history = history.strip()
-        if history:
-            edges = [edge.split(" ") for edge in history.split("\n")]
+    async def set(self, edges: List[List[str]]) -> None:
+        if edges:
             await self.conn.executemany(
                 "insert or ignore into history_graph values (?, ?)", edges
             )
diff --git a/swh/fuse/fs/artifact.py b/swh/fuse/fs/artifact.py
--- a/swh/fuse/fs/artifact.py
+++ b/swh/fuse/fs/artifact.py
@@ -20,7 +20,7 @@
     FuseSymlinkEntry,
 )
 from swh.model.from_disk import DentryPerms
-from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID
+from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, SWHID, parse_swhid
 
 SWHID_REGEXP = r"swh:1:(cnt|dir|rel|rev|snp):[0-9a-f]{40}"
 
@@ -220,10 +220,9 @@
     swhid: SWHID
 
-    async def prefill_by_date_cache(self, by_date_dir: FuseDirEntry) -> None:
-        history = await self.fuse.get_history(self.swhid)
+    async def prefill_metadata_cache(self, by_date_dir: FuseDirEntry, swhids) -> None:
         nb_api_calls = 0
-        for swhid in history:
+        for swhid in swhids:
             cache = await self.fuse.cache.metadata.get(swhid)
             if cache:
                 continue
@@ -234,8 +233,54 @@
             nb_api_calls += 1
             if nb_api_calls % 100 == 0:
                 self.fuse.cache.direntry.invalidate(by_date_dir)
         # Make sure to have the latest entries once the prefilling is done
         self.fuse.cache.direntry.invalidate(by_date_dir)
+
+    async def prefill_caches(self, by_date_dir: FuseDirEntry) -> None:
+        import functools
+
+        import requests
+
+        try:
+            # Use the swh-graph API to retrieve the full history very fast
+            self.fuse.logger.debug(
+                "Retrieving history of %s via graph API...", self.swhid
+            )
+            call = f"graph/visit/edges/{self.swhid}?edges=rev:rev"
+            loop = asyncio.get_event_loop()
+            resp = await loop.run_in_executor(
+                None, functools.partial(self.fuse.web_api._call, call, stream=True)
+            )
+            # Each edge line is two 50-byte SWHIDs, a space and a newline, so
+            # this reads ~500 edges at a time. TODO: iter_content() does not
+            # guarantee that chunks end on a line boundary, so an edge can be
+            # split across two chunks and is silently dropped below.
+            for history in resp.iter_content(chunk_size=(50 * 2 + 2) * 500):
+                history = history.decode().strip()
+                edges = [edge.split(" ") for edge in history.split("\n") if edge]
+                # Keep only well-formed (parent, child) pairs
+                edges = [edge for edge in edges if len(edge) == 2]
+                if not edges:
+                    continue
+                await self.fuse.cache.history.set(edges)
+                swhids = set()
+                for edge in edges:
+                    for swhid in edge:
+                        swhids.add(parse_swhid(swhid))
+                self.fuse.logger.debug(
+                    "Prefilling metadata cache for %d revisions", len(swhids)
+                )
+                asyncio.create_task(self.prefill_metadata_cache(by_date_dir, swhids))
+        except requests.HTTPError as err:
+            # Ignore the error since swh-graph does not necessarily contain
+            # the most recent artifacts from the archive, and computing the
+            # full history from the Web API is too computationally intensive,
+            # so simply leave the caches as they are.
+            self.fuse.logger.error(
+                "Cannot fetch history for object %s: %s", self.swhid, err
+            )
+        except Exception:
+            self.fuse.logger.exception("Unexpected error while prefilling caches")
 
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
         by_date_dir = self.create_child(
@@ -246,7 +294,7 @@
         )
 
         # Run it concurrently because of the many API calls necessary
-        asyncio.create_task(self.prefill_by_date_cache(by_date_dir))
+        asyncio.create_task(self.prefill_caches(by_date_dir))
 
         yield by_date_dir
 
@@ -291,7 +339,7 @@
         self.file_info_attrs["direct_io"] = True
 
     async def get_content(self) -> bytes:
-        history_full = await self.fuse.get_history(self.history_swhid)
+        history_full = await self.fuse.cache.history.get(self.history_swhid)
         history_cached = await self.fuse.cache.history.get_with_date_prefix(
             self.history_swhid, date_prefix=""
         )
@@ -307,7 +356,7 @@
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history_full = await self.fuse.get_history(self.history_swhid)
+        history_full = await self.fuse.cache.history.get(self.history_swhid)
         # Only check for cached revisions with the appropriate prefix, since
         # fetching all of them with the Web API would take too long
         history_cached = await self.fuse.cache.history.get_with_date_prefix(
@@ -359,7 +409,7 @@
     ENTRIES_REGEXP = re.compile(r"^([a-f0-9]+)|(" + SWHID_REGEXP + ")$")
 
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
+        history = await self.fuse.cache.history.get(self.history_swhid)
 
         if self.prefix:
             root_path = self.get_relative_root_path()
@@ -398,7 +449,7 @@
     ENTRIES_REGEXP = re.compile(r"^([0-9]+)|(" + SWHID_REGEXP + ")$")
 
     async def compute_entries(self) -> AsyncIterator[FuseEntry]:
-        history = await self.fuse.get_history(self.history_swhid)
+        history = await self.fuse.cache.history.get(self.history_swhid)
 
         if self.prefix is not None:
             current_page = self.prefix
diff --git a/swh/fuse/fuse.py b/swh/fuse/fuse.py
--- a/swh/fuse/fuse.py
+++ b/swh/fuse/fuse.py
@@ -126,36 +126,5 @@
             self.logger.error("Cannot fetch blob for object %s: %s", swhid, err)
             raise
 
-    async def get_history(self, swhid: SWHID) -> List[SWHID]:
-        """ Retrieve a revision's history using Software Heritage Graph API """
-
-        if swhid.object_type != REVISION:
-            raise pyfuse3.FUSEError(errno.EINVAL)
-
-        cache = await self.cache.history.get(swhid)
-        if cache:
-            self.logger.debug(
-                "Found history of %s in cache (%d ancestors)", swhid, len(cache)
-            )
-            return cache
-
-        try:
-            # Use the swh-graph API to retrieve the full history very fast
-            self.logger.debug("Retrieving history of %s via graph API...", swhid)
-            call = f"graph/visit/edges/{swhid}?edges=rev:rev"
-            loop = asyncio.get_event_loop()
-            history = await loop.run_in_executor(None, self.web_api._call, call)
-            await self.cache.history.set(history.text)
-            # Retrieve it from cache so it is correctly typed
-            res = await self.cache.history.get(swhid)
-            return res
-        except requests.HTTPError as err:
-            self.logger.error("Cannot fetch history for object %s: %s", swhid, err)
-            # Ignore exception since swh-graph does not necessarily contain the
-            # most recent artifacts from the archive. Computing the full history
-            # from the Web API is too computationally intensive so simply return
-            # an empty list.
-            return []
-
     async def get_visits(self, url_encoded: str) -> List[Dict[str, Any]]:
         """ Retrieve origin visits given an encoded-URL using Software Heritage API """
diff --git a/swh/fuse/tests/conftest.py b/swh/fuse/tests/conftest.py
--- a/swh/fuse/tests/conftest.py
+++ b/swh/fuse/tests/conftest.py
@@ -25,6 +25,8 @@
     if not api_call.endswith("raw/") and not api_call.startswith("graph/"):
         data = json.dumps(data)
 
+    # TODO: register graph/ API calls with a raw bytes payload so that the
+    # streaming iter_content() code path is exercised in tests
+
     http_method = requests_mock.get
     if api_call.startswith("origin/") and api_call.endswith("get/"):
         http_method = requests_mock.head
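
Note on the `TODO` about `chunk_size` in `prefill_caches`: `iter_content()` yields fixed-size chunks that are not guaranteed to end on a newline, so an edge line can be split across two chunks (and is currently dropped by the `len(edge) == 2` filter). A minimal sketch of a line-buffered wrapper that could replace the raw `iter_content()` loop; the helper name `iter_history_edges` is illustrative, not part of swh-fuse:

```python
from typing import Iterator, Tuple

import requests


def iter_history_edges(resp: requests.Response) -> Iterator[Tuple[str, str]]:
    """Yield complete (parent, child) SWHID pairs from a streamed
    graph/visit/edges response, buffering partial lines across chunks."""
    # Each edge line is two 50-byte SWHIDs, a space and a newline (102 bytes),
    # so this reads ~500 edges at a time without assuming chunk alignment.
    buffer = ""
    for chunk in resp.iter_content(chunk_size=(50 * 2 + 2) * 500):
        buffer += chunk.decode()
        lines = buffer.split("\n")
        # The last element is either empty or an incomplete line: keep it
        # buffered until the next chunk arrives.
        buffer = lines.pop()
        for line in lines:
            parent, _, child = line.partition(" ")
            if parent and child:
                yield parent, child
    # Flush whatever remains once the stream is exhausted
    parent, _, child = buffer.strip().partition(" ")
    if parent and child:
        yield parent, child
```

With such a helper, `prefill_caches` could accumulate the yielded pairs into batches (e.g. 500 edges) before each `self.fuse.cache.history.set()` call, instead of discarding edges cut at chunk boundaries.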
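
On the conftest `TODO`: `requests_mock` accepts a raw bytes payload via its `content` kwarg, and the mocked response supports `stream=True` and `iter_content()`, which may be enough to exercise the streaming code path. A rough, self-contained sketch; the URL and edge payload below are made up for illustration:

```python
import requests
import requests_mock

# Two fake rev:rev edges, one per line, mimicking graph/visit/edges output
REV0 = "swh:1:rev:" + "0" * 40
REV1 = "swh:1:rev:" + "1" * 40
REV2 = "swh:1:rev:" + "2" * 40
EDGES = f"{REV0} {REV1}\n{REV1} {REV2}\n".encode()

URL = f"https://archive.softwareheritage.org/api/1/graph/visit/edges/{REV0}"

with requests_mock.Mocker() as m:
    # content= registers a bytes body, which iter_content() streams as-is
    m.get(URL, content=EDGES)
    resp = requests.get(URL, stream=True)
    body = b"".join(resp.iter_content(chunk_size=16))
    assert body == EDGES
```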