Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021-2022 The Software Heritage developers | ||||||||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||||||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||||||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||||||||||||||||||||
from datetime import datetime | from datetime import datetime | ||||||||||||||||||||||
import logging | import logging | ||||||||||||||||||||||
import os | import os | ||||||||||||||||||||||
from types import TracebackType | from types import TracebackType | ||||||||||||||||||||||
▲ Show 20 Lines • Show All 65 Lines • ▼ Show 20 Lines | return ProvenanceCache( | ||||||||||||||||||||||
origin=OriginCache(data={}, added=set()), | origin=OriginCache(data={}, added=set()), | ||||||||||||||||||||||
revision_origin=RevisionCache(data={}, added=set()), | revision_origin=RevisionCache(data={}, added=set()), | ||||||||||||||||||||||
revision_before_revision={}, | revision_before_revision={}, | ||||||||||||||||||||||
revision_in_origin=set(), | revision_in_origin=set(), | ||||||||||||||||||||||
) | ) | ||||||||||||||||||||||
class Provenance: | class Provenance: | ||||||||||||||||||||||
MAX_CACHE_ELEMENTS = 100000 | |||||||||||||||||||||||
def __init__(self, storage: ProvenanceStorageInterface) -> None: | def __init__(self, storage: ProvenanceStorageInterface) -> None: | ||||||||||||||||||||||
self.storage = storage | self.storage = storage | ||||||||||||||||||||||
self.cache = new_cache() | self.cache = new_cache() | ||||||||||||||||||||||
def __enter__(self) -> ProvenanceInterface: | def __enter__(self) -> ProvenanceInterface: | ||||||||||||||||||||||
self.open() | self.open() | ||||||||||||||||||||||
return self | return self | ||||||||||||||||||||||
def __exit__( | def __exit__( | ||||||||||||||||||||||
self, | self, | ||||||||||||||||||||||
exc_type: Optional[Type[BaseException]], | exc_type: Optional[Type[BaseException]], | ||||||||||||||||||||||
exc_val: Optional[BaseException], | exc_val: Optional[BaseException], | ||||||||||||||||||||||
exc_tb: Optional[TracebackType], | exc_tb: Optional[TracebackType], | ||||||||||||||||||||||
) -> None: | ) -> None: | ||||||||||||||||||||||
self.close() | self.close() | ||||||||||||||||||||||
def _flush_limit_reached(self) -> bool: | |||||||||||||||||||||||
return max(self._get_cache_stats().values()) > self.MAX_CACHE_ELEMENTS | |||||||||||||||||||||||
def _get_cache_stats(self) -> Dict[str, int]: | |||||||||||||||||||||||
return { | |||||||||||||||||||||||
k: len(v["data"]) | |||||||||||||||||||||||
if (isinstance(v, dict) and v.get("data") is not None) | |||||||||||||||||||||||
else len(v) # type: ignore | |||||||||||||||||||||||
for (k, v) in self.cache.items() | |||||||||||||||||||||||
} | |||||||||||||||||||||||
def clear_caches(self) -> None: | def clear_caches(self) -> None: | ||||||||||||||||||||||
self.cache = new_cache() | self.cache = new_cache() | ||||||||||||||||||||||
vlorentzUnsubmitted Done Inline Actions
vlorentz: | |||||||||||||||||||||||
Done Inline Actionsnice thanks vsellier: nice thanks | |||||||||||||||||||||||
def close(self) -> None: | def close(self) -> None: | ||||||||||||||||||||||
self.storage.close() | self.storage.close() | ||||||||||||||||||||||
@statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) | @statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) | ||||||||||||||||||||||
def flush(self) -> None: | def flush(self) -> None: | ||||||||||||||||||||||
self.flush_revision_content_layer() | self.flush_revision_content_layer() | ||||||||||||||||||||||
self.flush_origin_revision_layer() | self.flush_origin_revision_layer() | ||||||||||||||||||||||
self.clear_caches() | self.clear_caches() | ||||||||||||||||||||||
def flush_if_necessary(self) -> bool: | |||||||||||||||||||||||
Done Inline Actions
ardumont: | |||||||||||||||||||||||
"""Flush if the number of cached information reached a limit.""" | |||||||||||||||||||||||
LOGGER.info("Cache stats: %s", self._get_cache_stats()) | |||||||||||||||||||||||
if self._flush_limit_reached(): | |||||||||||||||||||||||
self.flush() | |||||||||||||||||||||||
Done Inline Actions
would this work? vlorentz: would this work? | |||||||||||||||||||||||
Done Inline Actionsit works with a test on the data presence because of directory_flatten and revision_before_revision which are simple dict vsellier: it works with a test on the data presence because of `directory_flatten` and… | |||||||||||||||||||||||
return True | |||||||||||||||||||||||
else: | |||||||||||||||||||||||
return False | |||||||||||||||||||||||
@statsd.timed( | @statsd.timed( | ||||||||||||||||||||||
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} | metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} | ||||||||||||||||||||||
) | ) | ||||||||||||||||||||||
def flush_origin_revision_layer(self) -> None: | def flush_origin_revision_layer(self) -> None: | ||||||||||||||||||||||
# Origins and revisions should be inserted first so that internal ids' | # Origins and revisions should be inserted first so that internal ids' | ||||||||||||||||||||||
# resolution works properly. | # resolution works properly. | ||||||||||||||||||||||
urls = { | urls = { | ||||||||||||||||||||||
sha1: url | sha1: url | ||||||||||||||||||||||
▲ Show 20 Lines • Show All 379 Lines • Show Last 20 Lines |