diff --git a/swh/provenance/interface.py b/swh/provenance/interface.py
--- a/swh/provenance/interface.py
+++ b/swh/provenance/interface.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -249,6 +249,13 @@
         """Flush internal cache to the underlying `storage`."""
         ...
 
+    def flush_if_necessary(self) -> bool:
+        """Flush internal cache to the underlying `storage`, if the cache exceeds
+        a threshold (MAX_CACHE_ELEMENTS).
+        Return True if the cache was flushed, False otherwise.
+        """
+        ...
+
     def content_add_to_directory(
         self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
     ) -> None:
diff --git a/swh/provenance/origin.py b/swh/provenance/origin.py
--- a/swh/provenance/origin.py
+++ b/swh/provenance/origin.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -105,6 +105,12 @@
     LOGGER.debug("Adding revision to origin")
     provenance.revision_add_to_origin(origin, revision)
 
+    cache_flush_start = datetime.now()
+    if provenance.flush_if_necessary():
+        LOGGER.info(
+            "Intermediate cache flush in %s", (datetime.now() - cache_flush_start)
+        )
+
     end = datetime.now()
     LOGGER.info("Processed origin %s in %s", origin.url, (end - start))
 
diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py
--- a/swh/provenance/provenance.py
+++ b/swh/provenance/provenance.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 The Software Heritage developers
+# Copyright (C) 2021-2022 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -80,6 +80,8 @@
 
 
 class Provenance:
+    MAX_CACHE_ELEMENTS = 100000
+
     def __init__(self, storage: ProvenanceStorageInterface) -> None:
         self.storage = storage
         self.cache = new_cache()
@@ -96,6 +98,22 @@
     ) -> None:
         self.close()
 
+    def _flush_limit_reached(self) -> bool:
+        """Return True if any cache entry holds more than MAX_CACHE_ELEMENTS items."""
+        stats = self._get_cache_stats()
+        LOGGER.debug("Cache stats: %s", stats)
+        # default=0 keeps an empty cache from raising ValueError in max()
+        return max(stats.values(), default=0) > self.MAX_CACHE_ELEMENTS
+
+    def _get_cache_stats(self) -> Dict[str, int]:
+        """Return the number of elements held by each cache entry, keyed by name."""
+        return {
+            k: len(v["data"])
+            if (isinstance(v, dict) and v.get("data") is not None)
+            else len(v)  # type: ignore
+            for (k, v) in self.cache.items()
+        }
+
     def clear_caches(self) -> None:
         self.cache = new_cache()
 
@@ -108,6 +126,16 @@
         self.flush_origin_revision_layer()
         self.clear_caches()
 
+    def flush_if_necessary(self) -> bool:
+        """Flush if the number of cached elements exceeds MAX_CACHE_ELEMENTS.
+
+        Return True if the cache was flushed, False otherwise.
+        """
+        if not self._flush_limit_reached():
+            return False
+        self.flush()
+        return True
+
     @statsd.timed(
         metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"}
     )