Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
import logging | import logging | ||||
import os | import os | ||||
from typing import Dict, Generator, Iterable, Optional, Set, Tuple | from typing import Dict, Generator, Iterable, Optional, Set, Tuple | ||||
from typing_extensions import Literal, TypedDict | from typing_extensions import Literal, TypedDict | ||||
from swh.core.statsd import statsd | |||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .interface import ( | from .interface import ( | ||||
ProvenanceResult, | ProvenanceResult, | ||||
ProvenanceStorageInterface, | ProvenanceStorageInterface, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def __init__(self, storage: ProvenanceStorageInterface) -> None: | ||||
self.cache = new_cache() | self.cache = new_cache() | ||||
def clear_caches(self) -> None: | def clear_caches(self) -> None: | ||||
self.cache = new_cache() | self.cache = new_cache() | ||||
def close(self) -> None: | def close(self) -> None: | ||||
self.storage.close() | self.storage.close() | ||||
@statsd.timed( | |||||
metric="swh_provenance_backend_accesstime_seconds", | |||||
tags={"method": "flush"}, | |||||
) | |||||
def flush(self) -> None: | def flush(self) -> None: | ||||
# Revision-content layer insertions ############################################ | self.flush_revision_content_layer() | ||||
self.flush_origin_revision_layer() | |||||
self.clear_caches() | |||||
# After relations, dates for the entities can be safely set, acknowledging that | @statsd.timed( | ||||
# these entities won't need to be reprocessed in case of failure. | metric="swh_provenance_backend_accesstime_seconds", | ||||
tags={"method": "flush_origin_revision"}, | |||||
) | |||||
def flush_origin_revision_layer(self) -> None: | |||||
# Origins and revisions should be inserted first so that internal ids' | |||||
# resolution works properly. | |||||
urls = { | |||||
sha1: url | |||||
for sha1, url in self.cache["origin"]["data"].items() | |||||
if sha1 in self.cache["origin"]["added"] | |||||
} | |||||
if urls: | |||||
while not self.storage.origin_add(urls): | |||||
LOGGER.warning( | |||||
"Unable to write origins urls to the storage. Retrying..." | |||||
) | |||||
rev_orgs = { | |||||
# Destinations in this relation should match origins in the next one | |||||
**{ | |||||
src: RevisionData(date=None, origin=None) | |||||
for src in self.cache["revision_before_revision"] | |||||
}, | |||||
**{ | |||||
src: RevisionData(date=None, origin=None) | |||||
for src, _ in self.cache["revision_in_origin"] | |||||
}, | |||||
# This dictionary comes last so that non-null origins take precedence | |||||
**{ | |||||
src: RevisionData(date=None, origin=org) | |||||
for src, org in self.cache["revision_origin"]["data"].items() | |||||
if src in self.cache["revision_origin"]["added"] | |||||
}, | |||||
douardda: It's a bit hard to tell since the fact the definition of `flush_origin_revision_layer` comes… | |||||
aevisoAuthorUnsubmitted Done Inline ActionsAdded a new diff for that. Please check it aeviso: Added a new diff for that. Please check it | |||||
} | |||||
if rev_orgs: | |||||
while not self.storage.revision_add(rev_orgs): | |||||
LOGGER.warning( | |||||
"Unable to write revision entities to the storage. Retrying..." | |||||
) | |||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | |||||
if self.cache["revision_before_revision"]: | |||||
rev_before_rev = { | |||||
src: {RelationData(dst=dst, path=None) for dst in dsts} | |||||
for src, dsts in self.cache["revision_before_revision"].items() | |||||
} | |||||
while not self.storage.relation_add( | |||||
RelationType.REV_BEFORE_REV, rev_before_rev | |||||
): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_BEFORE_REV, | |||||
) | |||||
# Heads (ie. revision-in-origin entries) should be inserted once flat models for | |||||
# their histories were already added. This is to guarantee consistent results if | |||||
# something needs to be reprocessed due to a failure: already inserted heads | |||||
# won't get reprocessed in such a case. | |||||
if self.cache["revision_in_origin"]: | |||||
rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} | |||||
for src, dst in self.cache["revision_in_origin"]: | |||||
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_IN_ORG, | |||||
) | |||||
@statsd.timed( | |||||
metric="swh_provenance_backend_accesstime_seconds", | |||||
tags={"method": "flush_revision_content"}, | |||||
) | |||||
def flush_revision_content_layer(self) -> None: | |||||
# Register in the storage all content, directories and revisions that are | |||||
# involved in this layer's associated relations. | |||||
cnts = { | cnts = { | ||||
src | src | ||||
for src, _, _ in self.cache["content_in_revision"] | for src, _, _ in self.cache["content_in_revision"] | ||||
| self.cache["content_in_directory"] | | self.cache["content_in_directory"] | ||||
} | } | ||||
if cnts: | if cnts: | ||||
while not self.storage.content_add(cnts): | while not self.storage.content_add(cnts): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
▲ Show 20 Lines • Show All 94 Lines • ▼ Show 20 Lines | def flush_revision_content_layer(self) -> None: | ||||
if sha1 in self.cache["revision"]["added"] and date is not None | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if rev_dates: | if rev_dates: | ||||
while not self.storage.revision_add(rev_dates): | while not self.storage.revision_add(rev_dates): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write revision dates to the storage. Retrying..." | "Unable to write revision dates to the storage. Retrying..." | ||||
) | ) | ||||
# Origin-revision layer insertions ############################################# | |||||
# Origins and revisions should be inserted first so that internal ids' | |||||
# resolution works properly. | |||||
urls = { | |||||
sha1: url | |||||
for sha1, url in self.cache["origin"]["data"].items() | |||||
if sha1 in self.cache["origin"]["added"] | |||||
} | |||||
if urls: | |||||
while not self.storage.origin_add(urls): | |||||
LOGGER.warning( | |||||
"Unable to write origins urls to the storage. Retrying..." | |||||
) | |||||
rev_orgs = { | |||||
# Destinations in this relation should match origins in the next one | |||||
**{ | |||||
src: RevisionData(date=None, origin=None) | |||||
for src in self.cache["revision_before_revision"] | |||||
}, | |||||
**{ | |||||
# This relation comes second so that non-None origins take precedence | |||||
src: RevisionData(date=None, origin=org) | |||||
for src, org in self.cache["revision_in_origin"] | |||||
}, | |||||
} | |||||
if rev_orgs: | |||||
while not self.storage.revision_add(rev_orgs): | |||||
LOGGER.warning( | |||||
"Unable to write revision entities to the storage. Retrying..." | |||||
) | |||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | |||||
if self.cache["revision_before_revision"]: | |||||
rev_before_rev = { | |||||
src: {RelationData(dst=dst, path=None) for dst in dsts} | |||||
for src, dsts in self.cache["revision_before_revision"].items() | |||||
} | |||||
while not self.storage.relation_add( | |||||
RelationType.REV_BEFORE_REV, rev_before_rev | |||||
): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_BEFORE_REV, | |||||
) | |||||
# Heads (ie. revision-in-origin entries) should be inserted once flat models for | |||||
# their histories were already added. This is to guarantee consistent results if | |||||
# something needs to be reprocessed due to a failure: already inserted heads | |||||
# won't get reprocessed in such a case. | |||||
if self.cache["revision_in_origin"]: | |||||
rev_in_org: Dict[Sha1Git, Set[RelationData]] = {} | |||||
for src, dst in self.cache["revision_in_origin"]: | |||||
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) | |||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): | |||||
LOGGER.warning( | |||||
"Unable to write %s rows to the storage. Retrying...", | |||||
RelationType.REV_IN_ORG, | |||||
) | |||||
# clear local cache ############################################################ | |||||
self.clear_caches() | |||||
def content_add_to_directory( | def content_add_to_directory( | ||||
self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | self, directory: DirectoryEntry, blob: FileEntry, prefix: bytes | ||||
) -> None: | ) -> None: | ||||
self.cache["content_in_directory"].add( | self.cache["content_in_directory"].add( | ||||
(blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | (blob.id, directory.id, normalize(os.path.join(prefix, blob.name))) | ||||
) | ) | ||||
def content_add_to_revision( | def content_add_to_revision( | ||||
▲ Show 20 Lines • Show All 126 Lines • Show Last 20 Lines |
It's a bit hard to tell since the fact the definition of flush_origin_revision_layer comes before flush_revision_content_layer (I'm guessing to lexicographic order of method names) while it was the other way around originally, so it's not easy to compare both parts of the diff but it looks to me this third part of the rev_orgs dict was not present before this diff.
If I'm not mistaken, this part of the really should not be "lost" in the middle of a "pure refactoring" diff (split a method).