Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
Show First 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | class Provenance: | ||||
def clear_caches(self) -> None: | def clear_caches(self) -> None: | ||||
self.cache = new_cache() | self.cache = new_cache() | ||||
def flush(self) -> None: | def flush(self) -> None: | ||||
# Revision-content layer insertions ############################################ | # Revision-content layer insertions ############################################ | ||||
# For this layer, relations need to be inserted first so that, in case of | # For this layer, relations need to be inserted first so that, in case of | ||||
# failure, reprocessing the input does not generate an inconsistent database. | # failure, reprocessing the input does not generate an inconsistent database. | ||||
if self.cache["content_in_revision"]: | |||||
while not self.storage.relation_add( | while not self.storage.relation_add( | ||||
RelationType.CNT_EARLY_IN_REV, | RelationType.CNT_EARLY_IN_REV, | ||||
( | ( | ||||
RelationData(src=src, dst=dst, path=path) | RelationData(src=src, dst=dst, path=path) | ||||
for src, dst, path in self.cache["content_in_revision"] | for src, dst, path in self.cache["content_in_revision"] | ||||
), | ), | ||||
): | ): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write {RelationType.CNT_EARLY_IN_REV} rows to the storage. " | "Unable to write %s rows to the storage. Data: %s. Retrying...", | ||||
f"Data: {self.cache['content_in_revision']}. Retrying..." | RelationType.CNT_EARLY_IN_REV, | ||||
self.cache["content_in_revision"], | |||||
) | ) | ||||
if self.cache["content_in_directory"]: | |||||
while not self.storage.relation_add( | while not self.storage.relation_add( | ||||
RelationType.CNT_IN_DIR, | RelationType.CNT_IN_DIR, | ||||
( | ( | ||||
RelationData(src=src, dst=dst, path=path) | RelationData(src=src, dst=dst, path=path) | ||||
for src, dst, path in self.cache["content_in_directory"] | for src, dst, path in self.cache["content_in_directory"] | ||||
), | ), | ||||
): | ): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write {RelationType.CNT_IN_DIR} rows to the storage. " | "Unable to write %s rows to the storage. Data: %s. Retrying...", | ||||
f"Data: {self.cache['content_in_directory']}. Retrying..." | RelationType.CNT_IN_DIR, | ||||
self.cache["content_in_directory"], | |||||
) | ) | ||||
if self.cache["directory_in_revision"]: | |||||
while not self.storage.relation_add( | while not self.storage.relation_add( | ||||
RelationType.DIR_IN_REV, | RelationType.DIR_IN_REV, | ||||
( | ( | ||||
RelationData(src=src, dst=dst, path=path) | RelationData(src=src, dst=dst, path=path) | ||||
for src, dst, path in self.cache["directory_in_revision"] | for src, dst, path in self.cache["directory_in_revision"] | ||||
), | ), | ||||
): | ): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write {RelationType.DIR_IN_REV} rows to the storage. " | "Unable to write %s rows to the storage. Data: %s. Retrying...", | ||||
f"Data: {self.cache['directory_in_revision']}. Retrying..." | RelationType.DIR_IN_REV, | ||||
self.cache["directory_in_revision"], | |||||
) | ) | ||||
# After relations, dates for the entities can be safely set, acknowledging that | # After relations, dates for the entities can be safely set, acknowledging that | ||||
# these entities won't need to be reprocessed in case of failure. | # these entities won't need to be reprocessed in case of failure. | ||||
dates = { | dates = { | ||||
sha1: date | sha1: date | ||||
for sha1, date in self.cache["content"]["data"].items() | for sha1, date in self.cache["content"]["data"].items() | ||||
if sha1 in self.cache["content"]["added"] and date is not None | if sha1 in self.cache["content"]["added"] and date is not None | ||||
} | } | ||||
if dates: | |||||
while not self.storage.content_set_date(dates): | while not self.storage.content_set_date(dates): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write content dates to the storage. " | "Unable to write content dates to the storage. " | ||||
f"Data: {dates}. Retrying..." | "Data: %s. Retrying...", | ||||
dates, | |||||
) | ) | ||||
dates = { | dates = { | ||||
sha1: date | sha1: date | ||||
for sha1, date in self.cache["directory"]["data"].items() | for sha1, date in self.cache["directory"]["data"].items() | ||||
if sha1 in self.cache["directory"]["added"] and date is not None | if sha1 in self.cache["directory"]["added"] and date is not None | ||||
} | } | ||||
if dates: | |||||
while not self.storage.directory_set_date(dates): | while not self.storage.directory_set_date(dates): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write directory dates to the storage. " | "Unable to write directory dates to the storage. " | ||||
f"Data: {dates}. Retrying..." | "Data: %s. Retrying...", | ||||
dates, | |||||
) | ) | ||||
dates = { | dates = { | ||||
sha1: date | sha1: date | ||||
for sha1, date in self.cache["revision"]["data"].items() | for sha1, date in self.cache["revision"]["data"].items() | ||||
if sha1 in self.cache["revision"]["added"] and date is not None | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if dates: | |||||
while not self.storage.revision_set_date(dates): | while not self.storage.revision_set_date(dates): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write revision dates to the storage. " | "Unable to write revision dates to the storage. " | ||||
f"Data: {dates}. Retrying..." | "Data: %s. Retrying...", | ||||
dates, | |||||
) | ) | ||||
# Origin-revision layer insertions ############################################# | # Origin-revision layer insertions ############################################# | ||||
# Origin URLs should be inserted first so that internal ids' resolution works | # Origin URLs should be inserted first so that internal ids' resolution works | ||||
# properly. | # properly. | ||||
urls = { | urls = { | ||||
sha1: url | sha1: url | ||||
for sha1, url in self.cache["origin"]["data"].items() | for sha1, url in self.cache["origin"]["data"].items() | ||||
if sha1 in self.cache["origin"]["added"] | if sha1 in self.cache["origin"]["added"] | ||||
} | } | ||||
if urls: | |||||
while not self.storage.origin_set_url(urls): | while not self.storage.origin_set_url(urls): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write origins urls to the storage. " | "Unable to write origins urls to the storage. " | ||||
f"Data: {urls}. Retrying..." | "Data: %s. Retrying...", | ||||
urls, | |||||
) | ) | ||||
# Second, flat models for revisions' histories (i.e. revision-before-revision). | # Second, flat models for revisions' histories (i.e. revision-before-revision). | ||||
data: Iterable[RelationData] = sum( | data: Iterable[RelationData] = sum( | ||||
[ | [ | ||||
[ | [ | ||||
RelationData(src=prev, dst=next, path=None) | RelationData(src=prev, dst=next, path=None) | ||||
for next in self.cache["revision_before_revision"][prev] | for next in self.cache["revision_before_revision"][prev] | ||||
] | ] | ||||
for prev in self.cache["revision_before_revision"] | for prev in self.cache["revision_before_revision"] | ||||
], | ], | ||||
[], | [], | ||||
) | ) | ||||
if data: | |||||
while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): | while not self.storage.relation_add(RelationType.REV_BEFORE_REV, data): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write {RelationType.REV_BEFORE_REV} rows to the storage. " | "Unable to write %s rows to the storage. Data: %s. Retrying...", | ||||
f"Data: {data}. Retrying..." | RelationType.REV_BEFORE_REV, | ||||
data, | |||||
) | ) | ||||
# Heads (i.e. revision-in-origin entries) should be inserted once flat models for | # Heads (i.e. revision-in-origin entries) should be inserted once flat models for | ||||
# their histories were already added. This is to guarantee consistent results if | # their histories were already added. This is to guarantee consistent results if | ||||
# something needs to be reprocessed due to a failure: already inserted heads | # something needs to be reprocessed due to a failure: already inserted heads | ||||
# won't get reprocessed in such a case. | # won't get reprocessed in such a case. | ||||
data = ( | data = ( | ||||
RelationData(src=rev, dst=org, path=None) | RelationData(src=rev, dst=org, path=None) | ||||
for rev, org in self.cache["revision_in_origin"] | for rev, org in self.cache["revision_in_origin"] | ||||
) | ) | ||||
if data: | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, data): | while not self.storage.relation_add(RelationType.REV_IN_ORG, data): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write {RelationType.REV_IN_ORG} rows to the storage. " | "Unable to write %s rows to the storage. Data: %s. Retrying...", | ||||
f"Data: {data}. Retrying..." | RelationType.REV_IN_ORG, | ||||
data, | |||||
) | ) | ||||
# Finally, preferred origins for the visited revisions are set (this step can be | # Finally, preferred origins for the visited revisions are set (this step can be | ||||
# reordered if required). | # reordered if required). | ||||
origins = { | origins = { | ||||
sha1: self.cache["revision_origin"]["data"][sha1] | sha1: self.cache["revision_origin"]["data"][sha1] | ||||
for sha1 in self.cache["revision_origin"]["added"] | for sha1 in self.cache["revision_origin"]["added"] | ||||
} | } | ||||
if origins: | |||||
while not self.storage.revision_set_origin(origins): | while not self.storage.revision_set_origin(origins): | ||||
logging.warning( | logging.warning( | ||||
f"Unable to write preferred origins to the storage. " | "Unable to write preferred origins to the storage. " | ||||
f"Data: {origins}. Retrying..." | "Data: %s. Retrying...", | ||||
origins, | |||||
) | ) | ||||
# clear local cache ############################################################ | # clear local cache ############################################################ | ||||
self.clear_caches() | self.clear_caches() | ||||
def content_add_to_directory( | def content_add_to_directory( | ||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
self.cache["content_in_directory"].add( | self.cache["content_in_directory"].add( | ||||
▲ Show 20 Lines • Show All 130 Lines • Show Last 20 Lines |