Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
Show First 20 Lines • Show All 184 Lines • ▼ Show 20 Lines | def flush_origin_revision_layer(self) -> None: | ||||
"Unable to write %s rows to the storage. Retrying...", | "Unable to write %s rows to the storage. Retrying...", | ||||
RelationType.REV_IN_ORG, | RelationType.REV_IN_ORG, | ||||
) | ) | ||||
@statsd.timed( | @statsd.timed( | ||||
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} | metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} | ||||
) | ) | ||||
def flush_revision_content_layer(self) -> None: | def flush_revision_content_layer(self) -> None: | ||||
# Register in the storage all content, directories and revisions that are | # Register in the storage all entities, to ensure the coming relations can | ||||
# involved in this layer's associated relations. | # properly resolve any internal reference if needed. Content and directory | ||||
cnts = { | # entries may safely be registered with their associated dates. In contrast, | ||||
src | # revision entries should be registered without date, as it is used to | ||||
for src, _, _ in self.cache["content_in_revision"] | # acknowledge that the flushing was successful. | ||||
| self.cache["content_in_directory"] | cnt_dates = { | ||||
sha1: date | |||||
for sha1, date in self.cache["content"]["data"].items() | |||||
if sha1 in self.cache["content"]["added"] and date is not None | |||||
} | } | ||||
if cnts: | if cnt_dates: | ||||
while not self.storage.content_add(cnts): | while not self.storage.content_add(cnt_dates): | ||||
statsd.increment( | statsd.increment( | ||||
metric=BACKEND_OPERATIONS_METRIC, | metric=BACKEND_OPERATIONS_METRIC, | ||||
tags={"method": "flush_revision_content_retry_content_none"}, | tags={"method": "flush_revision_content_retry_content_date"}, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write content entities to the storage. Retrying..." | "Unable to write content dates to the storage. Retrying..." | ||||
) | ) | ||||
dirs = {dst for _, dst, _ in self.cache["content_in_directory"]} | dir_dates = { | ||||
if dirs: | sha1: date | ||||
while not self.storage.directory_add(dirs): | for sha1, date in self.cache["directory"]["data"].items() | ||||
if sha1 in self.cache["directory"]["added"] and date is not None | |||||
} | |||||
if dir_dates: | |||||
while not self.storage.directory_add(dir_dates): | |||||
statsd.increment( | statsd.increment( | ||||
metric=BACKEND_OPERATIONS_METRIC, | metric=BACKEND_OPERATIONS_METRIC, | ||||
tags={"method": "flush_revision_content_retry_directory_none"}, | tags={"method": "flush_revision_content_retry_directory_date"}, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write directory entities to the storage. Retrying..." | "Unable to write directory dates to the storage. Retrying..." | ||||
) | ) | ||||
revs = { | revs = { | ||||
dst | sha1 | ||||
for _, dst, _ in self.cache["content_in_revision"] | for sha1, date in self.cache["revision"]["data"].items() | ||||
| self.cache["directory_in_revision"] | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if revs: | if revs: | ||||
while not self.storage.revision_add(revs): | while not self.storage.revision_add(revs): | ||||
statsd.increment( | statsd.increment( | ||||
metric=BACKEND_OPERATIONS_METRIC, | metric=BACKEND_OPERATIONS_METRIC, | ||||
tags={"method": "flush_revision_content_retry_revision_none"}, | tags={"method": "flush_revision_content_retry_revision_none"}, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def flush_revision_content_layer(self) -> None: | ||||
"method": "flush_revision_content_retry_directory_in_revision" | "method": "flush_revision_content_retry_directory_in_revision" | ||||
}, | }, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write %s rows to the storage. Retrying...", | "Unable to write %s rows to the storage. Retrying...", | ||||
RelationType.DIR_IN_REV, | RelationType.DIR_IN_REV, | ||||
) | ) | ||||
# After relations, dates for the entities can be safely set, acknowledging that | # After relations, dates for the revisions can be safely set, acknowledging that | ||||
# these entities won't need to be reprocessed in case of failure. | # these revisions won't need to be reprocessed in case of failure. | ||||
cnt_dates = { | |||||
sha1: date | |||||
for sha1, date in self.cache["content"]["data"].items() | |||||
if sha1 in self.cache["content"]["added"] and date is not None | |||||
} | |||||
if cnt_dates: | |||||
while not self.storage.content_add(cnt_dates): | |||||
statsd.increment( | |||||
metric=BACKEND_OPERATIONS_METRIC, | |||||
tags={"method": "flush_revision_content_retry_content_date"}, | |||||
) | |||||
LOGGER.warning( | |||||
"Unable to write content dates to the storage. Retrying..." | |||||
) | |||||
dir_dates = { | |||||
sha1: date | |||||
for sha1, date in self.cache["directory"]["data"].items() | |||||
if sha1 in self.cache["directory"]["added"] and date is not None | |||||
} | |||||
if dir_dates: | |||||
while not self.storage.directory_add(dir_dates): | |||||
statsd.increment( | |||||
metric=BACKEND_OPERATIONS_METRIC, | |||||
tags={"method": "flush_revision_content_retry_directory_date"}, | |||||
) | |||||
LOGGER.warning( | |||||
"Unable to write directory dates to the storage. Retrying..." | |||||
) | |||||
rev_dates = { | rev_dates = { | ||||
sha1: RevisionData(date=date, origin=None) | sha1: RevisionData(date=date, origin=None) | ||||
for sha1, date in self.cache["revision"]["data"].items() | for sha1, date in self.cache["revision"]["data"].items() | ||||
if sha1 in self.cache["revision"]["added"] and date is not None | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if rev_dates: | if rev_dates: | ||||
while not self.storage.revision_add(rev_dates): | while not self.storage.revision_add(rev_dates): | ||||
statsd.increment( | statsd.increment( | ||||
▲ Show 20 Lines • Show All 140 Lines • Show Last 20 Lines |