diff --git a/swh/provenance/provenance.py b/swh/provenance/provenance.py --- a/swh/provenance/provenance.py +++ b/swh/provenance/provenance.py @@ -190,38 +190,45 @@ metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} ) def flush_revision_content_layer(self) -> None: - # Register in the storage all content, directories and revisions that are - # involved in this layer's associated relations. - cnts = { - src - for src, _, _ in self.cache["content_in_revision"] - | self.cache["content_in_directory"] + # Register in the storage all entities, to ensure the coming relations can + # properly resolve any internal reference if needed. Content and directory + # entries may safely be registered with their associated dates. In contrast, + # revision entries should be registered without date, as it is used to + # acknowledge that the flushing was successful. + cnt_dates = { + sha1: date + for sha1, date in self.cache["content"]["data"].items() + if sha1 in self.cache["content"]["added"] and date is not None } - if cnts: - while not self.storage.content_add(cnts): + if cnt_dates: + while not self.storage.content_add(cnt_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, - tags={"method": "flush_revision_content_retry_content_none"}, + tags={"method": "flush_revision_content_retry_content_date"}, ) LOGGER.warning( - "Unable to write content entities to the storage. Retrying..." + "Unable to write content dates to the storage. Retrying..." ) - dirs = {dst for _, dst, _ in self.cache["content_in_directory"]} - if dirs: - while not self.storage.directory_add(dirs): + dir_dates = { + sha1: date + for sha1, date in self.cache["directory"]["data"].items() + if sha1 in self.cache["directory"]["added"] and date is not None + } + if dir_dates: + while not self.storage.directory_add(dir_dates): statsd.increment( metric=BACKEND_OPERATIONS_METRIC, - tags={"method": "flush_revision_content_retry_directory_none"}, + tags={"method": "flush_revision_content_retry_directory_date"}, ) LOGGER.warning( - "Unable to write directory entities to the storage. Retrying..." + "Unable to write directory dates to the storage. Retrying..." ) revs = { - dst - for _, dst, _ in self.cache["content_in_revision"] - | self.cache["directory_in_revision"] + sha1 + for sha1, date in self.cache["revision"]["data"].items() + if sha1 in self.cache["revision"]["added"] and date is not None } if revs: while not self.storage.revision_add(revs): @@ -299,38 +306,8 @@ RelationType.DIR_IN_REV, ) - # After relations, dates for the entities can be safely set, acknowledging that - # these entities won't need to be reprocessed in case of failure. - cnt_dates = { - sha1: date - for sha1, date in self.cache["content"]["data"].items() - if sha1 in self.cache["content"]["added"] and date is not None - } - if cnt_dates: - while not self.storage.content_add(cnt_dates): - statsd.increment( - metric=BACKEND_OPERATIONS_METRIC, - tags={"method": "flush_revision_content_retry_content_date"}, - ) - LOGGER.warning( - "Unable to write content dates to the storage. Retrying..." - ) - - dir_dates = { - sha1: date - for sha1, date in self.cache["directory"]["data"].items() - if sha1 in self.cache["directory"]["added"] and date is not None - } - if dir_dates: - while not self.storage.directory_add(dir_dates): - statsd.increment( - metric=BACKEND_OPERATIONS_METRIC, - tags={"method": "flush_revision_content_retry_directory_date"}, - ) - LOGGER.warning( - "Unable to write directory dates to the storage. Retrying..." - ) - + # After relations, dates for the revisions can be safely set, acknowledging that + # these revisions won't need to be reprocessed in case of failure. rev_dates = { sha1: RevisionData(date=date, origin=None) for sha1, date in self.cache["revision"]["data"].items()