swh/storage/replay.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import Counter | |||||
import logging | import logging | ||||
from typing import Any, Callable, Container, Dict, List | from typing import Any, Callable, Container | ||||
from typing import Counter as CounterT | |||||
from typing import Dict, List, Optional, TypeVar, Union, cast | |||||
try: | try: | ||||
from systemd.daemon import notify | from systemd.daemon import notify | ||||
except ImportError: | except ImportError: | ||||
notify = None | notify = None | ||||
from swh.core.statsd import statsd | from swh.core.statsd import statsd | ||||
from swh.journal.serializers import kafka_to_value | |||||
from swh.model.hashutil import hash_to_hex | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseContent, | BaseContent, | ||||
BaseModel, | BaseModel, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
ExtID, | ExtID, | ||||
HashableObject, | |||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
OriginVisitStatus, | OriginVisitStatus, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
) | ) | ||||
from swh.storage.exc import HashCollision | from swh.storage.exc import HashCollision, StorageArgumentException | ||||
from swh.storage.fixer import fix_objects | from swh.storage.fixer import object_fixers | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
logger = logging.getLogger(__name__)

# statsd metric names reported by the journal replayer:
# total number of insert operations, and time spent inserting, per object type.
GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"
# Map each journal object-type name to the ``from_dict`` constructor of the
# corresponding swh.model class; used to turn deserialized journal messages
# (plain dicts) into model objects.
object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
    "origin": Origin.from_dict,
    "origin_visit": OriginVisit.from_dict,
    "origin_visit_status": OriginVisitStatus.from_dict,
    "snapshot": Snapshot.from_dict,
    "revision": Revision.from_dict,
    "release": Release.from_dict,
    "directory": Directory.from_dict,
    "content": Content.from_dict,
    "skipped_content": SkippedContent.from_dict,
    "metadata_authority": MetadataAuthority.from_dict,
    "metadata_fetcher": MetadataFetcher.from_dict,
    "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict,
    "extid": ExtID.from_dict,
}
class ModelObjectDeserializer:
    """Deserialize kafka journal messages into swh.model objects.

    Args:
        validate: if True (the default), check that the hash computed from a
            hashable object's content matches its declared ``id``; objects
            failing the check are reported (see ``reporter``) and dropped
            (``convert`` returns None), unless ``raise_on_error`` is set.
        raise_on_error: if True, raise a ``StorageArgumentException`` on a
            validation failure instead of dropping the object.
        reporter: optional callback invoked on validation failure with two
            arguments: a string identifier of the invalid object (its SWHID
            when available, otherwise ``"<object_type>:<hash>"``) and the raw
            kafka message bytes.
    """

    def __init__(
        self,
        validate: bool = True,
        raise_on_error: bool = False,
        reporter: Optional[Callable[[str, bytes], None]] = None,
    ):
        self.validate = validate
        self.reporter = reporter
        self.raise_on_error = raise_on_error

    def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]:
        """Deserialize the kafka message ``msg`` into a model object.

        Applies the legacy-format fixer for ``object_type`` if one exists,
        then builds the model object and (optionally) validates its hash.

        Returns:
            the model object, or None if validation failed and
            ``raise_on_error`` is False.

        Raises:
            StorageArgumentException: on hash mismatch, when
                ``raise_on_error`` is True.
        """
        dict_repr = kafka_to_value(msg)
        if object_type in object_fixers:
            dict_repr = object_fixers[object_type](dict_repr)
        obj = object_converter_fn[object_type](dict_repr)
        if self.validate:
            if isinstance(obj, HashableObject):
                cid = obj.compute_hash()
                if obj.id != cid:
                    error_msg = (
                        f"Object has id {hash_to_hex(obj.id)}, "
                        f"but it should be {hash_to_hex(cid)}: {obj}"
                    )
                    logger.error(error_msg)
                    self.report_failure(msg, obj)
                    if self.raise_on_error:
                        raise StorageArgumentException(error_msg)
                    return None
        return obj

    def report_failure(self, msg: bytes, obj: BaseModel):
        """Report an invalid object to ``self.reporter``, if any.

        The object identifier is its SWHID when the object has a ``swhid``
        method; otherwise (e.g. ExtID, which is hashable but has no SWHID)
        it is ``"<object_type>:<hex hash>"``. Objects with neither are
        silently skipped.
        """
        if self.reporter:
            oid: str = ""
            if hasattr(obj, "swhid"):
                swhid = obj.swhid()  # type: ignore[attr-defined]
                oid = str(swhid)
            elif isinstance(obj, HashableObject):
                uid = obj.compute_hash()
                oid = f"{obj.object_type}:{uid.hex()}"
            if oid:
                self.reporter(oid, msg)
def process_replay_objects(
    all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface
) -> None:
    """Insert every batch of replayed objects into ``storage``.

    Each object-type batch is inserted under a statsd timer, and the
    operation count is reported per type. When running under systemd,
    the service watchdog is pinged once all batches are done.
    """
    for object_type, batch in all_objects.items():
        logger.debug("Inserting %s %s objects", len(batch), object_type)
        with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
            _insert_objects(object_type, batch, storage)
        statsd.increment(
            GRAPH_OPERATIONS_METRIC, len(batch), tags={"object_type": object_type}
        )
    if notify:
        notify("WATCHDOG=1")
ContentType = TypeVar("ContentType", bound=BaseContent) | |||||
def collision_aware_content_add(
    content_add_fn: Callable[[List[ContentType]], Dict[str, int]]
) -> Callable[[List[ContentType]], Dict[str, int]]:
    """Wrap a storage content-insertion callable to tolerate hash collisions.

    The returned callable adds contents through ``content_add_fn``; whenever a
    ``HashCollision`` is raised, the colliding contents are logged, dropped
    from the batch, and the insertion is retried with the remaining contents
    until it succeeds.

    Args:
        content_add_fn: storage callable inserting a list of (skipped)
            contents and returning a summary counter dict
            (e.g. ``storage.content_add_metadata``).

    Returns:
        A callable with the same signature as ``content_add_fn``, returning
        the accumulated summary counters of all (retried) insertions.
    """

    def wrapper(contents: List[ContentType]) -> Dict[str, int]:
        if not contents:
            return {}
        colliding_content_hashes: List[Dict[str, Any]] = []
        results: CounterT[str] = Counter()
        while True:
            try:
                results.update(content_add_fn(contents))
            except HashCollision as e:
                colliding_content_hashes.append(
                    {
                        "algo": e.algo,
                        "hash": e.hash_id,  # hex hash id
                        "objects": e.colliding_contents,  # hex hashes
                    }
                )
                colliding_hashes = e.colliding_content_hashes()
                # Drop the colliding contents from the transaction and retry
                contents = [c for c in contents if c.hashes() not in colliding_hashes]
            else:
                # Successfully added contents, we are done
                break
        if colliding_content_hashes:
            for collision in colliding_content_hashes:
                logger.error(
                    "Collision detected: %(collision)s", {"collision": collision}
                )
        return dict(results)

    return wrapper
def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict:
    """Return a shallow copy of ``d`` without any key listed in ``keys_to_drop``."""
    filtered: Dict = {}
    for key, value in d.items():
        if key in keys_to_drop:
            continue
        filtered[key] = value
    return filtered
def _insert_objects(
    object_type: str, objects: List[BaseModel], storage: StorageInterface
) -> None:
    """Insert objects of type object_type in the storage.

    Dispatches to the matching ``storage.<object_type>_add`` method, with
    per-type special handling (see inline comments). Unknown object types
    are logged and skipped.
    """
    if object_type not in object_converter_fn:
        logger.warning("Received a series of %s, this should not happen", object_type)
        return

    # Default insertion method; may be overridden/wrapped below.
    method = getattr(storage, f"{object_type}_add")
    if object_type == "skipped_content":
        # Tolerate hash collisions: colliding entries are dropped and logged.
        method = collision_aware_content_add(method)
    elif object_type == "content":
        # Journal messages carry content metadata only (no data bytes),
        # hence content_add_metadata rather than content_add.
        method = collision_aware_content_add(storage.content_add_metadata)
    elif object_type in ("origin_visit", "origin_visit_status"):
        # Ensure the referenced origins exist before inserting the visits.
        origins: List[Origin] = []
        for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects):
            origins.append(Origin(url=obj.origin))
        storage.origin_add(origins)
    elif object_type == "raw_extrinsic_metadata":
        # Ensure the referenced authorities and fetchers exist before
        # inserting the metadata objects themselves.
        emds = cast(List[RawExtrinsicMetadata], objects)
        authorities = {emd.authority for emd in emds}
        fetchers = {emd.fetcher for emd in emds}
        storage.metadata_authority_add(list(authorities))
        storage.metadata_fetcher_add(list(fetchers))
    method(objects)
# TODO(review): replace dict_key_dropper with swh.storage.utils.remove_keys