Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
Show First 20 Lines • Show All 89 Lines • ▼ Show 20 Lines | class Provenance: | ||||
def clear_caches(self) -> None:
    """Replace the in-memory cache with a fresh one built by ``new_cache()``.

    The object previously bound to ``self.cache`` is no longer referenced
    from this instance afterwards.
    """
    fresh_cache = new_cache()
    self.cache = fresh_cache
def close(self) -> None:
    """Close the underlying storage by delegating to ``self.storage.close()``."""
    storage = self.storage
    storage.close()
def flush(self) -> None:
    """Write every cached layer to the storage, then reset the cache.

    The revision-content layer is flushed first, the origin-revision layer
    second. The cache is only cleared once both flushes have returned.
    """
    # Layer flushers, listed in the order in which they must run.
    layer_flushers = (
        self.flush_revision_content_layer,
        self.flush_origin_revision_layer,
    )
    for flush_layer in layer_flushers:
        flush_layer()

    # Everything has been handed to the storage: start from an empty cache.
    self.clear_caches()
def flush_origin_revision_layer(self) -> None:
    """Write the cached origin-revision layer to the storage.

    The steps run in a fixed order: origin urls, revision entities, the
    revision-before-revision relation and finally the revision-in-origin
    relation. Each storage call is repeated, logging a warning after every
    failed attempt, until it returns a truthy value; there is no retry limit
    and no delay between attempts. The cache is read but not cleared here.
    """
    # Origins and revisions should be inserted first so that internal ids'
    # resolution works properly.
    origin_cache = self.cache["origin"]
    urls = {
        sha1: url
        for sha1, url in origin_cache["data"].items()
        if sha1 in origin_cache["added"]
    }
    if urls:
        while not self.storage.origin_add(urls):
            LOGGER.warning(
                "Unable to write origins urls to the storage. Retrying..."
            )

    # Destinations in this relation should match origins in the next one.
    revisions = {
        rev: RevisionData(date=None, origin=None)
        for rev in self.cache["revision_before_revision"]
    }
    # Applied second so that non-None origins take precedence over the
    # origin-less entries created just above.
    revisions.update(
        (rev, RevisionData(date=None, origin=org))
        for rev, org in self.cache["revision_in_origin"]
    )
    if revisions:
        while not self.storage.revision_add(revisions):
            LOGGER.warning(
                "Unable to write revision entities to the storage. Retrying..."
            )

    # Second, flat models for revisions' histories (ie. revision-before-revision).
    if self.cache["revision_before_revision"]:
        histories = {}
        for rev, ancestors in self.cache["revision_before_revision"].items():
            histories[rev] = {
                RelationData(dst=ancestor, path=None) for ancestor in ancestors
            }
        while not self.storage.relation_add(
            RelationType.REV_BEFORE_REV, histories
        ):
            LOGGER.warning(
                "Unable to write %s rows to the storage. Retrying...",
                RelationType.REV_BEFORE_REV,
            )

    # Heads (ie. revision-in-origin entries) should be inserted once flat models
    # for their histories were already added. This is to guarantee consistent
    # results if something needs to be reprocessed due to a failure: already
    # inserted heads won't get reprocessed in such a case.
    if self.cache["revision_in_origin"]:
        heads: Dict[Sha1Git, Set[RelationData]] = {}
        for rev, org in self.cache["revision_in_origin"]:
            # Group the origins of each head revision into a single set.
            targets = heads.get(rev)
            if targets is None:
                targets = heads[rev] = set()
            targets.add(RelationData(dst=org, path=None))
        while not self.storage.relation_add(RelationType.REV_IN_ORG, heads):
            LOGGER.warning(
                "Unable to write %s rows to the storage. Retrying...",
                RelationType.REV_IN_ORG,
            )
def flush_revision_content_layer(self) -> None: | |||||
# Register in the storage all content, directories and revisions that are | |||||
# involved in this layer's associated relations. | |||||
cnts = { | cnts = { | ||||
src | src | ||||
for src, _, _ in self.cache["content_in_revision"] | for src, _, _ in self.cache["content_in_revision"] | ||||
| self.cache["content_in_directory"] | | self.cache["content_in_directory"] | ||||
} | } | ||||
if cnts: | if cnts: | ||||
while not self.storage.content_add(cnts): | while not self.storage.content_add(cnts): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def flush_revision_content_layer(self) -> None: | ||||
if sha1 in self.cache["revision"]["added"] and date is not None | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if rev_dates: | if rev_dates: | ||||
while not self.storage.revision_add(rev_dates): | while not self.storage.revision_add(rev_dates): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write revision dates to the storage. Retrying..." | "Unable to write revision dates to the storage. Retrying..." | ||||
) | ) | ||||
# Origin-revision layer insertions ############################################# | |||||
# Origins and revisions should be inserted first so that internal ids' | |||||
# resolution works properly. | |||||
urls = { | |||||
sha1: url | |||||
for sha1, url in self.cache["origin"]["data"].items() | |||||
if sha1 in self.cache["origin"]["added"] | |||||
} | |||||
if urls: | |||||
while not self.storage.origin_add(urls): | |||||
LOGGER.warning( | |||||
"Unable to write origins urls to the storage. Retrying..." | |||||
) | |||||
rev_orgs = { | |||||
# Destinations in this relation should match origins in the next one | |||||
**{ | |||||
src: RevisionData(date=None, origin=None) | |||||
for src in self.cache["revision_before_revision"] | |||||
}, | |||||
**{ | |||||
# This relation comes second so that non-None origins take precedence | |||||
src: RevisionData(date=None, origin=org) | |||||
for src, org in self.cache["revision_in_origin"] | |||||
}, | |||||
} | |||||
if rev_orgs: | |||||
while not self.storage.revision_add(rev_orgs): | |||||
LOGGER.warning( | |||||
"Unable to write revision entities to the storage. Retrying..." | |||||
) | |||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | |||||
if self.cache["revision_before_revision"]: | |||||
rev_before_rev = { | |||||
src: {RelationData(dst=dst, path=None) for dst in dsts} | |||||
for src, dsts in self.cache["revision_before_revision"].items() | |||||
} | |||||
while not self.storage.relation_add( | |||||
RelationType.REV_BEFORE_REV, rev_before_rev | |||||
): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_BEFORE_REV, | |||||
) | |||||
# Heads (ie. revision-in-origin entries) should be inserted once flat models for | |||||
# their histories were already added. This is to guarantee consistent results if | |||||
# something needs to be reprocessed due to a failure: already inserted heads | |||||
# won't get reprocessed in such a case. | |||||
if self.cache["revision_in_origin"]: | |||||
rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} | |||||
for src, dst in self.cache["revision_in_origin"]: | |||||
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_IN_ORG, | |||||
) | |||||
# clear local cache ############################################################ | |||||
self.clear_caches() | |||||
def content_add_to_directory(
    self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes
) -> None:
    """Record in the cache that ``blob`` appears inside ``directory``.

    A ``(blob.id, directory.id, path)`` tuple is added to the
    ``"content_in_directory"`` cache entry, where ``path`` is ``prefix``
    joined with ``blob.name`` and passed through ``normalize``. Nothing is
    written to the storage here.
    """
    location = normalize(os.path.join(prefix, blob.name))
    entry = (blob.id, directory.id, location)
    self.cache["content_in_directory"].add(entry)
def content_add_to_revision( | def content_add_to_revision( | ||||
▲ Show 20 Lines • Show All 129 Lines • Show Last 20 Lines |