diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -249,6 +249,12 @@
         """Flush internal cache to the underlying `storage`."""
         ...
 
+    def flush_if_necessary(self) -> bool:
+        """Flush internal cache to the underlying `storage`, only if the
+        cache has grown too big. Return True if it was flushed, False otherwise.
+        """
+        ...
+
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -105,6 +105,12 @@
             LOGGER.debug("Adding revision to origin")
             provenance.revision_add_to_origin(origin, revision)
 
+            cache_flush_start = datetime.now()
+            if provenance.flush_if_necessary():
+                LOGGER.info(
+                    "Intermediate cache flush in %s", (datetime.now() - cache_flush_start)
+                )
+
         end = datetime.now()
         LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
 
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -80,6 +80,8 @@
 
 
 class Provenance:
+    MAX_CACHE_ELEMENTS = 100000
+
     def __init__(self, storage: ProvenanceStorageInterface) -> None:
         self.storage = storage
         self.cache = new_cache()
@@ -96,6 +98,36 @@
     ) -> None:
         self.close()
 
+    def _flush_limit_reached(self) -> bool:
+        return (
+            len(self.cache["content"]["data"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["directory"]["data"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["directory_flatten"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["revision"]["data"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["content_in_revision"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["content_in_directory"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["directory_in_revision"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["origin"]["data"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["revision_origin"]["data"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["revision_before_revision"]) > self.MAX_CACHE_ELEMENTS
+            or len(self.cache["revision_in_origin"]) > self.MAX_CACHE_ELEMENTS
+        )
+
+    def _get_cache_stats(self) -> Dict[str, int]:
+        return {
+            "content": len(self.cache["content"]["data"]),
+            "directory": len(self.cache["directory"]["data"]),
+            "directory_flatten": len(self.cache["directory_flatten"]),
+            "revision": len(self.cache["revision"]["data"]),
+            "content_in_revision": len(self.cache["content_in_revision"]),
+            "content_in_directory": len(self.cache["content_in_directory"]),
+            "directory_in_revision": len(self.cache["directory_in_revision"]),
+            "origin": len(self.cache["origin"]["data"]),
+            "revision_origin": len(self.cache["revision_origin"]["data"]),
+            "revision_before_revision": len(self.cache["revision_before_revision"]),
+            "revision_in_origin": len(self.cache["revision_in_origin"]),
+        }
+
     def clear_caches(self) -> None:
         self.cache = new_cache()
 
@@ -108,6 +140,15 @@
         self.flush_origin_revision_layer()
         self.clear_caches()
 
+    def flush_if_necessary(self) -> bool:
+        """Flush the cache if the number of cached elements exceeds the limit; return True if flushed."""
+        LOGGER.info("Cache stats: %s", self._get_cache_stats())
+        if self._flush_limit_reached():
+            self.flush()
+            return True
+        else:
+            return False
+
    @statsd.timed(
        metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"}
    )
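
For context, here is a minimal, self-contained sketch of the bounded-cache pattern this diff introduces: accumulate entries per table and flush everything to storage as soon as any single table crosses a size threshold. `BoundedCache`, `buckets`, and `flushed` are hypothetical stand-ins for illustration, not part of swh.provenance; the real `Provenance` cache tracks eleven tables and writes to a `ProvenanceStorageInterface`.

```python
from typing import Dict, List


class BoundedCache:
    """Accumulate per-table entries; flush once any table grows too big."""

    MAX_CACHE_ELEMENTS = 100000  # same threshold as Provenance.MAX_CACHE_ELEMENTS

    def __init__(self) -> None:
        # Two illustrative tables; the real cache has eleven (content,
        # directory, revision, origin, and the relation tables between them).
        self.buckets: Dict[str, List[bytes]] = {"content": [], "revision": []}
        self.flushed: List[Dict[str, int]] = []  # stand-in for storage writes

    def _get_cache_stats(self) -> Dict[str, int]:
        return {name: len(rows) for name, rows in self.buckets.items()}

    def _flush_limit_reached(self) -> bool:
        # As in the diff: one oversized table triggers a flush of everything.
        return any(
            n > self.MAX_CACHE_ELEMENTS for n in self._get_cache_stats().values()
        )

    def flush(self) -> None:
        self.flushed.append(self._get_cache_stats())  # pretend-write to storage
        for rows in self.buckets.values():
            rows.clear()

    def flush_if_necessary(self) -> bool:
        if self._flush_limit_reached():
            self.flush()
            return True
        return False


# Usage mirroring origin.py: flush opportunistically inside the work loop,
# so memory stays bounded even for origins with very many revisions.
cache = BoundedCache()
for i in range(250000):
    cache.buckets["content"].append(b"%d" % i)
    cache.flush_if_necessary()
print(cache.flushed)  # two intermediate flushes, 100001 content entries each
```

Note the design choice the diff makes, reproduced here: the limit applies per table rather than to the total, and a flush always writes the whole cache, which keeps the flush path identical to the end-of-origin flush.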