Changeset View
Standalone View
swh/provenance/provenance.py
Show All 9 Lines | |||||
from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type | from typing import Dict, Generator, Iterable, Optional, Set, Tuple, Type | ||||
from typing_extensions import Literal, TypedDict | from typing_extensions import Literal, TypedDict | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.model.model import Sha1Git | from swh.model.model import Sha1Git | ||||
from .interface import ( | from .interface import ( | ||||
DirectoryData, | |||||
ProvenanceInterface, | ProvenanceInterface, | ||||
ProvenanceResult, | ProvenanceResult, | ||||
ProvenanceStorageInterface, | ProvenanceStorageInterface, | ||||
RelationData, | RelationData, | ||||
RelationType, | RelationType, | ||||
RevisionData, | RevisionData, | ||||
) | ) | ||||
from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | from .model import DirectoryEntry, FileEntry, OriginEntry, RevisionEntry | ||||
from .util import path_normalize | from .util import path_normalize | ||||
LOGGER = logging.getLogger(__name__) | LOGGER = logging.getLogger(__name__) | ||||
BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" | BACKEND_DURATION_METRIC = "swh_provenance_backend_duration_seconds" | ||||
BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total" | BACKEND_OPERATIONS_METRIC = "swh_provenance_backend_operations_total" | ||||
class DatetimeCache(TypedDict): | class DatetimeCache(TypedDict): | ||||
data: Dict[Sha1Git, Optional[datetime]] | data: Dict[Sha1Git, Optional[datetime]] # None means unknown | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class OriginCache(TypedDict): | class OriginCache(TypedDict): | ||||
data: Dict[Sha1Git, str] | data: Dict[Sha1Git, str] | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class RevisionCache(TypedDict): | class RevisionCache(TypedDict): | ||||
data: Dict[Sha1Git, Sha1Git] | data: Dict[Sha1Git, Sha1Git] | ||||
added: Set[Sha1Git] | added: Set[Sha1Git] | ||||
class ProvenanceCache(TypedDict): | class ProvenanceCache(TypedDict): | ||||
content: DatetimeCache | content: DatetimeCache | ||||
directory: DatetimeCache | directory: DatetimeCache | ||||
directory_flatten: Dict[Sha1Git, Optional[bool]] # None means unknown | |||||
revision: DatetimeCache | revision: DatetimeCache | ||||
# below are insertion caches only | # below are insertion caches only | ||||
content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | content_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] | content_in_directory: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | directory_in_revision: Set[Tuple[Sha1Git, Sha1Git, bytes]] | ||||
# these two are for the origin layer | # these two are for the origin layer | ||||
origin: OriginCache | origin: OriginCache | ||||
revision_origin: RevisionCache | revision_origin: RevisionCache | ||||
revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] | revision_before_revision: Dict[Sha1Git, Set[Sha1Git]] | ||||
revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] | revision_in_origin: Set[Tuple[Sha1Git, Sha1Git]] | ||||
def new_cache() -> ProvenanceCache: | def new_cache() -> ProvenanceCache: | ||||
return ProvenanceCache( | return ProvenanceCache( | ||||
content=DatetimeCache(data={}, added=set()), | content=DatetimeCache(data={}, added=set()), | ||||
directory=DatetimeCache(data={}, added=set()), | directory=DatetimeCache(data={}, added=set()), | ||||
directory_flatten={}, | |||||
revision=DatetimeCache(data={}, added=set()), | revision=DatetimeCache(data={}, added=set()), | ||||
content_in_revision=set(), | content_in_revision=set(), | ||||
content_in_directory=set(), | content_in_directory=set(), | ||||
directory_in_revision=set(), | directory_in_revision=set(), | ||||
origin=OriginCache(data={}, added=set()), | origin=OriginCache(data={}, added=set()), | ||||
revision_origin=RevisionCache(data={}, added=set()), | revision_origin=RevisionCache(data={}, added=set()), | ||||
revision_before_revision={}, | revision_before_revision={}, | ||||
revision_in_origin=set(), | revision_in_origin=set(), | ||||
▲ Show 20 Lines • Show All 113 Lines • ▼ Show 20 Lines | class Provenance: | ||||
@statsd.timed( | @statsd.timed( | ||||
metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} | metric=BACKEND_DURATION_METRIC, tags={"method": "flush_revision_content"} | ||||
) | ) | ||||
def flush_revision_content_layer(self) -> None: | def flush_revision_content_layer(self) -> None: | ||||
# Register in the storage all entities, to ensure the coming relations can | # Register in the storage all entities, to ensure the coming relations can | ||||
# properly resolve any internal reference if needed. Content and directory | # properly resolve any internal reference if needed. Content and directory | ||||
# entries may safely be registered with their associated dates. In contrast, | # entries may safely be registered with their associated dates. In contrast, | ||||
# revision entries should be registered without date, as it is used to | # revision entries should be registered without date, as it is used to | ||||
# acknowledge that the flushing was successful. | # acknowledge that the flushing was successful. Also, directories are | ||||
# registered with their flatten flag not set. | |||||
cnt_dates = { | cnt_dates = { | ||||
sha1: date | sha1: date | ||||
for sha1, date in self.cache["content"]["data"].items() | for sha1, date in self.cache["content"]["data"].items() | ||||
if sha1 in self.cache["content"]["added"] and date is not None | if sha1 in self.cache["content"]["added"] and date is not None | ||||
} | } | ||||
if cnt_dates: | if cnt_dates: | ||||
while not self.storage.content_add(cnt_dates): | while not self.storage.content_add(cnt_dates): | ||||
statsd.increment( | statsd.increment( | ||||
metric=BACKEND_OPERATIONS_METRIC, | metric=BACKEND_OPERATIONS_METRIC, | ||||
tags={"method": "flush_revision_content_retry_content_date"}, | tags={"method": "flush_revision_content_retry_content_date"}, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write content dates to the storage. Retrying..." | "Unable to write content dates to the storage. Retrying..." | ||||
) | ) | ||||
dir_dates = { | dir_dates = { | ||||
sha1: date | sha1: DirectoryData(date=date, flat=False) | ||||
for sha1, date in self.cache["directory"]["data"].items() | for sha1, date in self.cache["directory"]["data"].items() | ||||
if sha1 in self.cache["directory"]["added"] and date is not None | if sha1 in self.cache["directory"]["added"] and date is not None | ||||
} | } | ||||
if dir_dates: | if dir_dates: | ||||
while not self.storage.directory_add(dir_dates): | while not self.storage.directory_add(dir_dates): | ||||
statsd.increment( | statsd.increment( | ||||
metric=BACKEND_OPERATIONS_METRIC, | metric=BACKEND_OPERATIONS_METRIC, | ||||
tags={"method": "flush_revision_content_retry_directory_date"}, | tags={"method": "flush_revision_content_retry_directory_date"}, | ||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | def flush_revision_content_layer(self) -> None: | ||||
"method": "flush_revision_content_retry_directory_in_revision" | "method": "flush_revision_content_retry_directory_in_revision" | ||||
}, | }, | ||||
) | ) | ||||
LOGGER.warning( | LOGGER.warning( | ||||
"Unable to write %s rows to the storage. Retrying...", | "Unable to write %s rows to the storage. Retrying...", | ||||
RelationType.DIR_IN_REV, | RelationType.DIR_IN_REV, | ||||
) | ) | ||||
# After relations, dates for the revisions can be safely set, acknowledging that | # After relations, flatten flags for directories can be safely set (if | ||||
# these revisions won't need to be reprocessed in case of failure. | # applicable) acknowledging those directories that have already been flattened. | ||||
# Similarly, dates for the revisions are set to acknowledge that these revisions | |||||
# won't need to be reprocessed in case of failure. | |||||
dir_acks = { | |||||
sha1: DirectoryData( | |||||
douardda: maybe `.get(sha1, False)` is a bit better. | |||||
Done Inline ActionsIt is not the same. The cache might return None because the value associated with the key is None, and not only because the key is not defined. In either case we want to replace that None with False. aeviso: It is not the same. The cache might return `None` because the value associated with the key is… | |||||
Done Inline ActionsAh I see, indeed the cache is declared as Optional[bool], but I don't see how this can be the case (that we have a None value, that is). Actually I don't see where we fill this cache in this diff. Oh! It looks like this is only used in the following revision (which makes it confusing to me). It would have been easier to put the addition of this new cache entry in D6714 (if I understand all this correctly). douardda: Ah I see, indeed the cache is declared as `Optional[bool]`, but I don't see how this can be the… | |||||
Done Inline ActionsIf the requested directory is not in the storage, then the cache will hold a None value to avoid querying the storage multiple times. That's why we use Optional[bool] here aeviso: If the requested directory is not in the storage, then the cache will hold a `None` value to… | |||||
Done Inline ActionsI don't get why we would cache a negative result; I mean this directory might very well be known by the provenance index next time we ask for it. I find it very weird to cache an information like "we don't know about this yet". What do I miss here? douardda: I don't get why we would cache a negative result; I mean this directory might very well be… | |||||
Done Inline ActionsBecause we want the information to be consistent during the current batch of elements to process. Decisions are taken based on these queries, and getting different results might lead to wrong decisions. aeviso: Because we want the information to be consistent during the current batch of elements to… | |||||
Done Inline ActionsHumm I really would have preferred not to merge both the "is known" and "is flattened" information in a single (boolean) flag, but meh. At least please document (in the cache declaration in ProvenanceCache) the reason for this value to be an Optional and that None actually means unknown. douardda: Humm I really would have preferred not to merge both the "is known" and "is flattened"… | |||||
date=date, flat=self.cache["directory_flatten"].get(sha1) or False | |||||
) | |||||
for sha1, date in self.cache["directory"]["data"].items() | |||||
if sha1 in self.cache["directory"]["added"] and date is not None | |||||
} | |||||
if dir_acks: | |||||
while not self.storage.directory_add(dir_acks): | |||||
statsd.increment( | |||||
metric=BACKEND_OPERATIONS_METRIC, | |||||
tags={"method": "flush_revision_content_retry_directory_ack"}, | |||||
) | |||||
LOGGER.warning( | |||||
"Unable to write directory dates to the storage. Retrying..." | |||||
) | |||||
rev_dates = { | rev_dates = { | ||||
sha1: RevisionData(date=date, origin=None) | sha1: RevisionData(date=date, origin=None) | ||||
for sha1, date in self.cache["revision"]["data"].items() | for sha1, date in self.cache["revision"]["data"].items() | ||||
if sha1 in self.cache["revision"]["added"] and date is not None | if sha1 in self.cache["revision"]["added"] and date is not None | ||||
} | } | ||||
if rev_dates: | if rev_dates: | ||||
while not self.storage.revision_add(rev_dates): | while not self.storage.revision_add(rev_dates): | ||||
statsd.increment( | statsd.increment( | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | class Provenance: | ||||
def get_dates( | def get_dates( | ||||
self, | self, | ||||
entity: Literal["content", "directory", "revision"], | entity: Literal["content", "directory", "revision"], | ||||
ids: Iterable[Sha1Git], | ids: Iterable[Sha1Git], | ||||
) -> Dict[Sha1Git, datetime]: | ) -> Dict[Sha1Git, datetime]: | ||||
cache = self.cache[entity] | cache = self.cache[entity] | ||||
missing_ids = set(id for id in ids if id not in cache) | missing_ids = set(id for id in ids if id not in cache) | ||||
if missing_ids: | if missing_ids: | ||||
if entity == "revision": | if entity == "content": | ||||
updated = { | cache["data"].update(self.storage.content_get(missing_ids)) | ||||
elif entity == "directory": | |||||
cache["data"].update( | |||||
{ | |||||
id: dir.date | |||||
for id, dir in self.storage.directory_get(missing_ids).items() | |||||
} | |||||
) | |||||
elif entity == "revision": | |||||
cache["data"].update( | |||||
{ | |||||
id: rev.date | id: rev.date | ||||
for id, rev in self.storage.revision_get(missing_ids).items() | for id, rev in self.storage.revision_get(missing_ids).items() | ||||
} | } | ||||
else: | ) | ||||
updated = getattr(self.storage, f"{entity}_get")(missing_ids) | |||||
cache["data"].update(updated) | |||||
dates: Dict[Sha1Git, datetime] = {} | dates: Dict[Sha1Git, datetime] = {} | ||||
for sha1 in ids: | for sha1 in ids: | ||||
date = cache["data"].setdefault(sha1, None) | date = cache["data"].setdefault(sha1, None) | ||||
if date is not None: | if date is not None: | ||||
dates[sha1] = date | dates[sha1] = date | ||||
return dates | return dates | ||||
def open(self) -> None: | def open(self) -> None: | ||||
▲ Show 20 Lines • Show All 52 Lines • Show Last 20 Lines |
maybe .get(sha1, False) is a bit better.