diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -13,7 +13,7 @@
 
 from swh.core.cli import CONTEXT_SETTINGS
 from swh.core.cli import swh as swh_cli_group
-from swh.storage.replay import object_converter_fn
+from swh.storage.replay import ModelObjectDeserializer, object_converter_fn
 
 try:
     from systemd.daemon import notify
@@ -187,11 +187,21 @@
     conf = ctx.obj["config"]
     storage = get_storage(**conf.pop("storage"))
 
+    if "error_reporter" in conf:
+        from redis import Redis
+
+        reporter = Redis(**conf["error_reporter"]).set
+    else:
+        reporter = None
+    deserializer = ModelObjectDeserializer(reporter=reporter)
+
     client_cfg = conf.pop("journal_client")
+    client_cfg["value_deserializer"] = deserializer.convert
     if object_types:
         client_cfg["object_types"] = object_types
     if stop_after_objects:
         client_cfg["stop_after_objects"] = stop_after_objects
+
     try:
         client = get_journal_client(**client_cfg)
     except ValueError as exc:
diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py
--- a/swh/storage/fixer.py
+++ b/swh/storage/fixer.py
@@ -6,7 +6,7 @@
 import copy
 import datetime
 import logging
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List
 
 from swh.model.model import Origin
 
@@ -249,21 +249,21 @@
     return o
 
 
+object_fixers: Dict[str, Callable[[Dict], Dict]] = {
+    "content": _fix_content,
+    "revision": _fix_revision,
+    "origin": _fix_origin,
+    "origin_visit": _fix_origin_visit,
+    "raw_extrinsic_metadata": _fix_raw_extrinsic_metadata,
+}
+
+
 def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
     """
     Fix legacy objects from the journal to bring them up to date with the
     latest storage schema.
     """
-    if object_type == "content":
-        return [_fix_content(v) for v in objects]
-    elif object_type == "revision":
-        revisions = [_fix_revision(v) for v in objects]
-        return [rev for rev in revisions if rev is not None]
-    elif object_type == "origin":
-        return [_fix_origin(v) for v in objects]
-    elif object_type == "origin_visit":
-        return [_fix_origin_visit(v) for v in objects]
-    elif object_type == "raw_extrinsic_metadata":
-        return [_fix_raw_extrinsic_metadata(v) for v in objects]
-    else:
-        return objects
+    if object_type in object_fixers:
+        fixer = object_fixers[object_type]
+        objects = [fixer(v) for v in objects]
+    return objects
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -3,8 +3,11 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+from collections import Counter
 import logging
-from typing import Any, Callable, Container, Dict, List
+from typing import Any, Callable, Container
+from typing import Counter as CounterT
+from typing import Dict, List, Optional, TypeVar, Union, cast
 
 try:
     from systemd.daemon import notify
@@ -12,12 +15,15 @@
     notify = None
 
 from swh.core.statsd import statsd
+from swh.journal.serializers import kafka_to_value
+from swh.model.hashutil import hash_to_hex
 from swh.model.model import (
     BaseContent,
     BaseModel,
     Content,
     Directory,
     ExtID,
+    HashableObject,
     MetadataAuthority,
     MetadataFetcher,
     Origin,
@@ -29,8 +35,8 @@
     SkippedContent,
     Snapshot,
 )
-from swh.storage.exc import HashCollision
-from swh.storage.fixer import fix_objects
+from swh.storage.exc import HashCollision, StorageArgumentException
+from swh.storage.fixer import object_fixers
 from swh.storage.interface import StorageInterface
 
 logger = logging.getLogger(__name__)
@@ -56,8 +62,51 @@
 }
 
 
+class ModelObjectDeserializer:
+    def __init__(
+        self,
+        validate: bool = True,
+        raise_on_error: bool = False,
+        reporter: Optional[Callable[[str, bytes], None]] = None,
+    ):
+        self.validate = validate
+        self.reporter = reporter
+        self.raise_on_error = raise_on_error
+
+    def convert(self, object_type: str, msg: bytes) -> BaseModel:
+        dict_repr = kafka_to_value(msg)
+        if object_type in object_fixers:
+            dict_repr = object_fixers[object_type](dict_repr)
+        obj = object_converter_fn[object_type](dict_repr)
+        if self.validate:
+            if isinstance(obj, HashableObject):
+                cid = obj.compute_hash()
+                if obj.id != cid:
+                    error_msg = (
+                        f"Object has id {hash_to_hex(obj.id)}, "
+                        f"but it should be {hash_to_hex(cid)}: {obj}"
+                    )
+                    logger.error(error_msg)
+                    self.report_failure(msg, obj)
+                    if self.raise_on_error:
+                        raise StorageArgumentException(error_msg)
+        return obj
+
+    def report_failure(self, msg: bytes, obj: BaseModel):
+        if self.reporter:
+            oid: str = ""
+            if hasattr(obj, "swhid"):
+                swhid = obj.swhid()  # type: ignore[attr-defined]
+                oid = str(swhid)
+            elif isinstance(obj, HashableObject):
+                uid = obj.compute_hash()
+                oid = f"{obj.object_type}:{uid.hex()}"
+            if oid:
+                self.reporter(oid, msg)
+
+
 def process_replay_objects(
-    all_objects: Dict[str, List[Dict[str, Any]]], *, storage: StorageInterface
+    all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface
 ) -> None:
     for (object_type, objects) in all_objects.items():
         logger.debug("Inserting %s %s objects", len(objects), object_type)
@@ -70,9 +119,12 @@
         notify("WATCHDOG=1")
 
 
+ContentType = TypeVar("ContentType", bound=BaseContent)
+
+
 def collision_aware_content_add(
-    content_add_fn: Callable[[List[Any]], Dict[str, int]], contents: List[BaseContent],
-) -> None:
+    content_add_fn: Callable[[List[ContentType]], Dict[str, int]]
+) -> Callable[[List[ContentType]], Dict[str, int]]:
     """Add contents to storage. If a hash collision is detected, an error is
     logged. Then this adds the other non colliding contents to the storage.
 
@@ -81,29 +133,37 @@
         contents: List of contents or skipped contents to add to storage
 
     """
-    if not contents:
-        return
-    colliding_content_hashes: List[Dict[str, Any]] = []
-    while True:
-        try:
-            content_add_fn(contents)
-        except HashCollision as e:
-            colliding_content_hashes.append(
-                {
-                    "algo": e.algo,
-                    "hash": e.hash_id,  # hex hash id
-                    "objects": e.colliding_contents,  # hex hashes
-                }
-            )
-            colliding_hashes = e.colliding_content_hashes()
-            # Drop the colliding contents from the transaction
-            contents = [c for c in contents if c.hashes() not in colliding_hashes]
-        else:
-            # Successfully added contents, we are done
-            break
-    if colliding_content_hashes:
-        for collision in colliding_content_hashes:
-            logger.error("Collision detected: %(collision)s", {"collision": collision})
+
+    def wrapper(contents: List[ContentType]) -> Dict[str, int]:
+        if not contents:
+            return {}
+        colliding_content_hashes: List[Dict[str, Any]] = []
+        results: CounterT[str] = Counter()
+        while True:
+            try:
+                results.update(content_add_fn(contents))
+            except HashCollision as e:
+                colliding_content_hashes.append(
+                    {
+                        "algo": e.algo,
+                        "hash": e.hash_id,  # hex hash id
+                        "objects": e.colliding_contents,  # hex hashes
+                    }
+                )
+                colliding_hashes = e.colliding_content_hashes()
+                # Drop the colliding contents from the transaction
+                contents = [c for c in contents if c.hashes() not in colliding_hashes]
+            else:
+                # Successfully added contents, we are done
+                break
+        if colliding_content_hashes:
+            for collision in colliding_content_hashes:
+                logger.error(
+                    "Collision detected: %(collision)s", {"collision": collision}
+                )
+        return dict(results)
+
+    return wrapper
 
 
 def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict:
@@ -112,73 +172,29 @@
 
 
 def _insert_objects(
-    object_type: str, objects: List[Dict], storage: StorageInterface
+    object_type: str, objects: List[BaseModel], storage: StorageInterface
 ) -> None:
     """Insert objects of type object_type in the storage.
 
""" - objects = fix_objects(object_type, objects) - - if object_type == "content": - # for bw compat, skipped content should now be delivered in the skipped_content - # topic - contents: List[BaseContent] = [] - skipped_contents: List[BaseContent] = [] - for content in objects: - c = BaseContent.from_dict(content) - if isinstance(c, SkippedContent): - logger.warning( - "Received a series of skipped_content in the " - "content topic, this should not happen anymore" - ) - skipped_contents.append(c) - else: - contents.append(c) - collision_aware_content_add(storage.skipped_content_add, skipped_contents) - collision_aware_content_add(storage.content_add_metadata, contents) - elif object_type == "skipped_content": - skipped_contents = [SkippedContent.from_dict(obj) for obj in objects] - collision_aware_content_add(storage.skipped_content_add, skipped_contents) + if object_type not in object_converter_fn: + logger.warning("Received a series of %s, this should not happen", object_type) + return + + method = getattr(storage, f"{object_type}_add") + if object_type == "skipped_content": + method = collision_aware_content_add(method) + elif object_type == "content": + method = collision_aware_content_add(storage.content_add_metadata) elif object_type in ("origin_visit", "origin_visit_status"): origins: List[Origin] = [] - converter_fn = object_converter_fn[object_type] - model_objs = [] - for obj in objects: - origins.append(Origin(url=obj["origin"])) - model_objs.append(converter_fn(obj)) + for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects): + origins.append(Origin(url=obj.origin)) storage.origin_add(origins) - method = getattr(storage, f"{object_type}_add") - method(model_objs) elif object_type == "raw_extrinsic_metadata": - converted = [RawExtrinsicMetadata.from_dict(o) for o in objects] - authorities = {emd.authority for emd in converted} - fetchers = {emd.fetcher for emd in converted} + emds = cast(List[RawExtrinsicMetadata], objects) + authorities = {emd.authority for emd in emds} + fetchers = {emd.fetcher for emd in emds} storage.metadata_authority_add(list(authorities)) storage.metadata_fetcher_add(list(fetchers)) - storage.raw_extrinsic_metadata_add(converted) - elif object_type == "revision": - # drop the metadata field from the revision (is any); this field is - # about to be dropped from the data model (in favor of - # raw_extrinsic_metadata) and there can be bogus values in the existing - # journal (metadata with \0000 in it) - method = getattr(storage, object_type + "_add") - method( - [ - object_converter_fn[object_type](dict_key_dropper(o, ("metadata",))) - for o in objects - ] - ) - elif object_type in ( - "directory", - "extid", - "revision", - "release", - "snapshot", - "origin", - "metadata_fetcher", - "metadata_authority", - ): - method = getattr(storage, object_type + "_add") - method([object_converter_fn[object_type](o) for o in objects]) - else: - logger.warning("Received a series of %s, this should not happen", object_type) + method(objects) diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py --- a/swh/storage/tests/test_backfill.py +++ b/swh/storage/tests/test_backfill.py @@ -20,7 +20,7 @@ raw_extrinsic_metadata_target_ranges, ) from swh.storage.in_memory import InMemoryStorage -from swh.storage.replay import process_replay_objects +from swh.storage.replay import ModelObjectDeserializer, process_replay_objects from swh.storage.tests.test_replay import check_replayed TEST_CONFIG = { @@ -239,7 +239,6 @@ } 
     swh_storage_backend_config["journal_writer"] = journal1
     storage = get_storage(**swh_storage_backend_config)
-
     # fill the storage and the journal (under prefix1)
     for object_type, objects in TEST_OBJECTS.items():
         method = getattr(storage, object_type + "_add")
@@ -266,13 +265,16 @@
     # now check journal content are the same under both topics
     # use the replayer scaffolding to fill storages to make is a bit easier
     # Replaying #1
+    deserializer = ModelObjectDeserializer()
     sto1 = get_storage(cls="memory")
     replayer1 = JournalClient(
         brokers=kafka_server,
         group_id=f"{kafka_consumer_group}-1",
         prefix=prefix1,
         stop_on_eof=True,
+        value_deserializer=deserializer.convert,
     )
+
     worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
     replayer1.process(worker_fn1)
 
@@ -283,6 +285,7 @@
         group_id=f"{kafka_consumer_group}-2",
         prefix=prefix2,
         stop_on_eof=True,
+        value_deserializer=deserializer.convert,
     )
     worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
     replayer2.process(worker_fn2)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -26,7 +26,7 @@
 from swh.storage import get_storage
 from swh.storage.cassandra.model import ContentRow, SkippedContentRow
 from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
 
 UTC = datetime.timezone.utc
 
@@ -70,11 +70,13 @@
         "journal_writer": journal_writer_config,
     }
     storage = get_storage(**storage_config)
+    deserializer = ModelObjectDeserializer()
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_on_eof=True,
+        value_deserializer=deserializer.convert,
     )
 
     yield storage, replayer
 
@@ -207,12 +209,6 @@
     _check_replay_skipped_content(src, replayer, "skipped_content")
 
 
-def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
-    """Test the 'content' topic can be used to replay SkippedContent objects."""
-    src, replayer = replayer_storage_and_client
-    _check_replay_skipped_content(src, replayer, "content")
-
-
 # utility functions
 
 
@@ -355,12 +351,14 @@
     # Fill a destination storage from Kafka, potentially using privileged topics
     dst_storage = get_storage(cls="memory")
+    deserializer = ModelObjectDeserializer()
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
         privileged=privileged,
+        value_deserializer=deserializer.convert,
     )
     worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
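
For reference, the snippet below is a minimal sketch of how the new ModelObjectDeserializer is meant to be wired into a replayer outside the CLI, mirroring the cli.py and test changes above; the Kafka broker, Redis address, consumer group and topic prefix are illustrative assumptions, not values taken from this diff.

# Sketch only: wire a ModelObjectDeserializer into a journal client,
# as `swh storage replay` does above. Broker/Redis/group/prefix are assumed.
import functools

from redis import Redis

from swh.journal.client import get_journal_client
from swh.storage import get_storage
from swh.storage.replay import ModelObjectDeserializer, process_replay_objects

# Invalid objects get reported through Redis, keyed by SWHID (or object_type:hash).
reporter = Redis(host="localhost", port=6379).set
deserializer = ModelObjectDeserializer(validate=True, reporter=reporter)

storage = get_storage(cls="memory")
client = get_journal_client(
    cls="kafka",
    brokers=["localhost:9092"],
    group_id="example-replayer",
    prefix="swh.journal.objects",
    stop_on_eof=True,
    # deserialize, fix legacy fields and check hashes before replaying
    value_deserializer=deserializer.convert,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
client.process(worker_fn)

Leaving reporter unset disables reporting, and raise_on_error=True makes the deserializer raise StorageArgumentException on the first invalid object instead of logging, reporting and passing it on.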