Changeset View
Changeset View
Standalone View
Standalone View
swh/provenance/provenance.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from datetime import datetime | from datetime import datetime | ||||
import logging | import logging | ||||
import os | import os | ||||
from types import TracebackType | from types import TracebackType | ||||
from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type | from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type | ||||
from typing_extensions import Literal, TypedDict | from typing_extensions import Literal, TypedDict | ||||
from swh.core.statsd import statsd | |||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .interface import ( | from .interface import ( | ||||
ProvenanceInterface, | ProvenanceInterface, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
ProvenanceStorageInterface, | ProvenanceStorageInterface, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
LOGGER = logging.getLogger(__name__) | LOGGER = logging.getLogger(__name__) | ||||
BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" | |||||
class DatetimeCache(TypedDict): | class DatetimeCache(TypedDict): | ||||
data: Dict[Sha1Git, Optional[datetime]] | data: Dict[Sha1Git, Optional[datetime]] | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class OriginCache(TypedDict): | class OriginCache(TypedDict): | ||||
data: Dict[Sha1Git, str] | data: Dict[Sha1Git, str] | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | ) -> None: | ||||
self.close() | self.close() | ||||
def clear_caches(self) -> None: | def clear_caches(self) -> None: | ||||
self.cache = new_cache() | self.cache = new_cache() | ||||
def close(self) -> None: | def close(self) -> None: | ||||
self.storage.close() | self.storage.close() | ||||
@statsd.timed(metric=BACKEND_DURATION_METRIC, tags={"method": "flush"}) | |||||
def flush(self) -> None: | def flush(self) -> None: | ||||
self.flush_revision_content_layer() | self.flush_revision_content_layer() | ||||
self.flush_origin_revision_layer() | self.flush_origin_revision_layer() | ||||
self.clear_caches() | self.clear_caches() | ||||
@statsd.timed( | |||||
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_origin_revision"} | |||||
) | |||||
def flush_origin_revision_layer(self) -> None: | def flush_origin_revision_layer(self) -> None: | ||||
# Origins and revisions should be inserted first so that internal ids' | # Origins and revisions should be inserted first so that internal ids' | ||||
# resolution works properly. | # resolution works properly. | ||||
urls = { | urls = { | ||||
sha1: url | sha1: url | ||||
for sha1, url in self.cache["origin"]["data"].items() | for sha1, url in self.cache["origin"]["data"].items() | ||||
if sha1 in self.cache["origin"]["added"] | if sha1 in self.cache["origin"]["added"] | ||||
} | } | ||||
Show All 12 Lines | def flush_origin_revision_layer(self) -> None: | ||||
**{ | **{ | ||||
# This relation comes second so that non-None origins take precedence | # This relation comes second so that non-None origins take precedence | ||||
src: RevisionData(date=None, origin=org) | src: RevisionData(date=None, origin=org) | ||||
for src, org in self.cache["revision_in_origin"] | for src, org in self.cache["revision_in_origin"] | ||||
}, | }, | ||||
} | } | ||||
if rev_orgs: | if rev_orgs: | ||||
while not self.storage.revision_add(rev_orgs): | while not self.storage.revision_add(rev_orgs): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write revision entities to the storage. Retrying..." | "Unable to write revision entities to the storage. Retrying..." | ||||
) | ) | ||||
# Second, flat models for revisions' histories (ie. revision-before-revision). | # Second, flat models for revisions' histories (ie. revision-before-revision). | ||||
douardda: It's a bit hard to tell since the fact the definition of `flush_origin_revision_layer` comes… | |||||
Done Inline ActionsAdded a new diff for that. Please check it aeviso: Added a new diff for that. Please check it | |||||
if self.cache["revision_before_revision"]: | if self.cache["revision_before_revision"]: | ||||
rev_before_rev = { | rev_before_rev = { | ||||
src: {RelationData(dst=dst, path=None) for dst in dsts} | src: {RelationData(dst=dst, path=None) for dst in dsts} | ||||
for src, dsts in self.cache["revision_before_revision"].items() | for src, dsts in self.cache["revision_before_revision"].items() | ||||
} | } | ||||
while not self.storage.relation_add( | while not self.storage.relation_add( | ||||
RelationType.REV_BEFORE_REV, rev_before_rev | RelationType.REV_BEFORE_REV, rev_before_rev | ||||
): | ): | ||||
Show All 11 Lines | def flush_origin_revision_layer(self) -> None: | ||||
for src, dst in self.cache["revision_in_origin"]: | for src, dst in self.cache["revision_in_origin"]: | ||||
rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) | rev_in_org.setdefault(src, set()).add(RelationData(dst=dst, path=None)) | ||||
while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): | while not self.storage.relation_add(RelationType.REV_IN_ORG, rev_in_org): | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write %s rows to the storage. Retrying...", | "Unable to write %s rows to the storage. Retrying...", | ||||
RelationType.REV_IN_ORG, | RelationType.REV_IN_ORG, | ||||
) | ) | ||||
@statsd.timed( | |||||
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} | |||||
) | |||||
def flush_revision_content_layer(self) -> None: | def flush_revision_content_layer(self) -> None: | ||||
# Register in the storage all content, directories and revisions that are | # Register in the storage all content, directories and revisions that are | ||||
# involved in this layer's associated relations. | # involved in this layer's associated relations. | ||||
cnts = { | cnts = { | ||||
src | src | ||||
for src, _, _ in self.cache["content_in_revision"] | for src, _, _ in self.cache["content_in_revision"] | ||||
| self.cache["content_in_directory"] | | self.cache["content_in_directory"] | ||||
} | } | ||||
▲ Show 20 Lines • Show All 242 Lines • Show Last 20 Lines |
It's a bit hard to tell since the fact the definition of flush_origin_revision_layer comes before flush_revision_content_layer (I'm guessing to lexicographic order of method names) while it was the other way around originally, so it's not easy to compare both parts of the diff but it looks to me this third part of the rev_orgs dict was not present before this diff.
If I'm not mistaken, this part of the really should not be "lost" in the middle of a "pure refactoring" diff (split a method).