diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -10,6 +10,7 @@ from typing_extensions import Literal, TypedDict +from swh.core.statsd import statsd from swh.model.model import Sha1Git from .interface import ( @@ -80,11 +81,91 @@ def close(self) -> None: self.storage.close() + @statsd.timed( + metric="swh_provenance_backend_accesstime_seconds", + tags={"method": "flush"}, + ) def flush(self) -> None: - # Revision-content layer insertions ############################################ + self.flush_revision_content_layer() + self.flush_origin_revision_layer() + self.clear_caches() - # After relations, dates for the entities can be safely set, acknowledging that - # these entities won't need to be reprocessed in case of failure. + @statsd.timed( + metric="swh_provenance_backend_accesstime_seconds", + tags={"method": "flush_origin_revision"}, + ) + def flush_origin_revision_layer(self) -> None: + # Origins and revisions should be inserted first so that internal ids' + # resolution works properly. + urls = { + sha1: url + for sha1, url in self.cache["origin"]["data"].items() + if sha1 in self.cache["origin"]["added"] + } + if urls: + while not self.storage.origin_add(urls): + LOGGER.warning( + "Unable to write origins urls to the storage. Retrying..." + ) + + rev_orgs = { + # Destinations in this relation should match origins in the next one + **{ + src: RevisionData(date=None, origin=None) + for src in self.cache["revision_before_revision"] + }, + **{ + src: RevisionData(date=None, origin=None) + for src, _ in self.cache["revision_in_origin"] + }, + # This dictionary comes last so that non-null origins take precedence + **{ + src: RevisionData(date=None, origin=org) + for src, org in self.cache["revision_origin"]["data"].items() + if src in self.cache["revision_origin"]["added"] + }, + } + if rev_orgs: + while not self.storage.revision_add(rev_orgs): + LOGGER.warning( + "Unable to write revision entities to the storage. Retrying..." + ) + + # Second, flat models for revisions' histories (ie. revision-before-revision). + if self.cache["revision_before_revision"]: + rev_before_rev = { + src: {RelationData(dst=dst, path=None) for dst in dsts} + for src, dsts in self.cache["revision_before_revision"].items() + } + while not self.storage.relation_add( + RelationType.REV_BEFORE_REV, rev_before_rev + ): + LOGGER.warning( + "Unable to write %s rows to the storage. Retrying...", + RelationType.REV_BEFORE_REV, + ) + + # Heads (ie. revision-in-origin entries) should be inserted once flat models for + # their histories were already added. This is to guarantee consistent results if + # something needs to be reprocessed due to a failure: already inserted heads + # won't get reprocessed in such a case. + if self.cache["revision_in_origin"]: + rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} + for src, dst in self.cache["revision_in_origin"]: + rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) + while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): + LOGGER.warning( + "Unable to write %s rows to the storage. Retrying...", + RelationType.REV_IN_ORG, + ) + + @statsd.timed( + metric="swh_provenance_backend_accesstime_seconds", + tags={"method": "flush_revision_content"}, + ) + def flush_revision_content_layer(self) -> None: + # Register in the storage all content, directories and revisions that are + # involved in this layer's associated relations. cnts = { src for src, _, _ in self.cache["content_in_revision"] @@ -195,70 +276,6 @@ "Unable to write revision dates to the storage. Retrying..." ) - # Origin-revision layer insertions ############################################# - - # Origins and revisions should be inserted first so that internal ids' - # resolution works properly. - urls = { - sha1: url - for sha1, url in self.cache["origin"]["data"].items() - if sha1 in self.cache["origin"]["added"] - } - if urls: - while not self.storage.origin_add(urls): - LOGGER.warning( - "Unable to write origins urls to the storage. Retrying..." - ) - - rev_orgs = { - # Destinations in this relation should match origins in the next one - **{ - src: RevisionData(date=None, origin=None) - for src in self.cache["revision_before_revision"] - }, - **{ - # This relation comes second so that non-None origins take precedence - src: RevisionData(date=None, origin=org) - for src, org in self.cache["revision_in_origin"] - }, - } - if rev_orgs: - while not self.storage.revision_add(rev_orgs): - LOGGER.warning( - "Unable to write revision entities to the storage. Retrying..." - ) - - # Second, flat models for revisions' histories (ie. revision-before-revision). - if self.cache["revision_before_revision"]: - rev_before_rev = { - src: {RelationData(dst=dst, path=None) for dst in dsts} - for src, dsts in self.cache["revision_before_revision"].items() - } - while not self.storage.relation_add( - RelationType.REV_BEFORE_REV, rev_before_rev - ): - LOGGER.warning( - "Unable to write %s rows to the storage. Retrying...", - RelationType.REV_BEFORE_REV, - ) - - # Heads (ie. revision-in-origin entries) should be inserted once flat models for - # their histories were already added. This is to guarantee consistent results if - # something needs to be reprocessed due to a failure: already inserted heads - # won't get reprocessed in such a case. - if self.cache["revision_in_origin"]: - rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} - for src, dst in self.cache["revision_in_origin"]: - rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) - while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): - LOGGER.warning( - "Unable to write %s rows to the storage. Retrying...", - RelationType.REV_IN_ORG, - ) - - # clear local cache ############################################################ - self.clear_caches() - def content_add_to_directory( self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes ) -> None: