Instrument the FilteringProxyStorage *_add methods with timing logs.

Each of content_add, skipped_content_add, directory_add, revision_add and
release_add now logs (at ERROR level, via a module-level logger):
  * the number of objects received,
  * the number of objects left after filtering and the time the
    filtering step took,
  * the time the call to the underlying storage took,
  * one machine-readable line of the form
      CSV:<object type>;<received>;<filter seconds>;<missing>;<add seconds>
Nothing is logged past the entry count when a method returns early
(empty input or nothing missing).

NOTE: the post-image blob id on the "index" line below was not recomputed
after the log-message corrections; plain `git apply` / `patch` ignore it.

diff --git a/swh/storage/proxies/filter.py b/swh/storage/proxies/filter.py
index 14bff3cc..ff73e850 100644
--- a/swh/storage/proxies/filter.py
+++ b/swh/storage/proxies/filter.py
@@ -16,6 +16,10 @@ from swh.model.model import (
 )
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface
+import logging
+import time
+
+logger = logging.getLogger(__name__)
 
 
 class FilteringProxyStorage:
@@ -45,6 +49,9 @@ class FilteringProxyStorage:
         return getattr(self.storage, key)
 
     def content_add(self, content: List[Content]) -> Dict[str, int]:
+        logger.error(f"content add entry count: {len(content)}")
+        tic = time.perf_counter()
+
         empty_stat = {
             "content:add": 0,
             "content:add:bytes": 0,
@@ -54,49 +61,106 @@ class FilteringProxyStorage:
         contents_to_add = self._filter_missing_contents(content)
         if not contents_to_add:
             return empty_stat
-        return self.storage.content_add(
+
+        toc = time.perf_counter()
+        missing_duration = toc - tic
+        logger.error(f"content add missing: {len(contents_to_add)} ({missing_duration:0.4f})")
+        res = self.storage.content_add(
             [x for x in content if x.sha256 in contents_to_add]
         )
+        tac = time.perf_counter()
+        add_duration = tac - toc
+        logger.error(f"content added in {add_duration:0.4f}")
+        logger.error(f"CSV:content;{len(content)};{missing_duration:0.4f};{len(contents_to_add)};{add_duration:0.4f}")
+        return res
 
     def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
+        logger.error(f"skipped_content add entry count: {len(content)}")
+        tic = time.perf_counter()
         empty_stat = {"skipped_content:add": 0}
         if not content:
             return empty_stat
         contents_to_add = self._filter_missing_skipped_contents(content)
         if not contents_to_add and not any(c.sha1_git is None for c in content):
             return empty_stat
-        return self.storage.skipped_content_add(
+
+        toc = time.perf_counter()
+        missing_duration = toc - tic
+        logger.error(f"skipped_content add missing: {len(contents_to_add)} ({missing_duration:0.4f})")
+
+        res = self.storage.skipped_content_add(
             [x for x in content if x.sha1_git is None or x.sha1_git in contents_to_add]
         )
+        tac = time.perf_counter()
+        add_duration = tac - toc
+        logger.error(f"skipped_content added in {add_duration:0.4f}")
+        logger.error(f"CSV:skipped_content;{len(content)};{missing_duration:0.4f};{len(contents_to_add)};{add_duration:0.4f}")
+        return res
 
     def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
+        logger.error(f"directory add entry count: {len(directories)}")
+        tic = time.perf_counter()
         empty_stat = {"directory:add": 0}
         if not directories:
             return empty_stat
         missing_ids = self._filter_missing_ids("directory", (d.id for d in directories))
         if not missing_ids:
             return empty_stat
-        return self.storage.directory_add(
+
+        toc = time.perf_counter()
+        missing_duration = toc - tic
+        logger.error(f"directory add missing: {len(missing_ids)} ({missing_duration:0.4f})")
+
+        res = self.storage.directory_add(
             [d for d in directories if d.id in missing_ids]
         )
+        tac = time.perf_counter()
+        add_duration = tac - toc
+        logger.error(f"directory added in {add_duration:0.4f}")
+        logger.error(f"CSV:directory;{len(directories)};{missing_duration:0.4f};{len(missing_ids)};{add_duration:0.4f}")
+        return res
 
     def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
+        logger.error(f"revision add entry count: {len(revisions)}")
+        tic = time.perf_counter()
         empty_stat = {"revision:add": 0}
         if not revisions:
             return empty_stat
         missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions))
         if not missing_ids:
             return empty_stat
-        return self.storage.revision_add([r for r in revisions if r.id in missing_ids])
+
+        toc = time.perf_counter()
+        missing_duration = toc - tic
+        logger.error(f"revision add missing: {len(missing_ids)} ({missing_duration:0.4f})")
+
+        res = self.storage.revision_add([r for r in revisions if r.id in missing_ids])
+        tac = time.perf_counter()
+        add_duration = tac - toc
+        logger.error(f"revision added in {add_duration:0.4f}")
+        logger.error(f"CSV:revision;{len(revisions)};{missing_duration:0.4f};{len(missing_ids)};{add_duration:0.4f}")
+        return res
 
     def release_add(self, releases: List[Release]) -> Dict[str, int]:
+        logger.error(f"release add entry count: {len(releases)}")
+        tic = time.perf_counter()
         empty_stat = {"release:add": 0}
         if not releases:
             return empty_stat
         missing_ids = self._filter_missing_ids("release", (r.id for r in releases))
         if not missing_ids:
             return empty_stat
-        return self.storage.release_add([r for r in releases if r.id in missing_ids])
+
+        toc = time.perf_counter()
+        missing_duration = toc - tic
+        logger.error(f"release add missing: {len(missing_ids)} ({missing_duration:0.4f})")
+
+        res = self.storage.release_add([r for r in releases if r.id in missing_ids])
+        tac = time.perf_counter()
+        add_duration = tac - toc
+        logger.error(f"release added in {add_duration:0.4f}")
+        logger.error(f"CSV:release;{len(releases)};{missing_duration:0.4f};{len(missing_ids)};{add_duration:0.4f}")
+        return res
 
     def _filter_missing_contents(self, contents: List[Content]) -> Set[bytes]:
         """Return only the content keys missing from swh