diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -8,7 +8,8 @@
 import logging
 from typing import Any, Callable
 from typing import Counter as CounterT
-from typing import Dict, List, Optional, TypeVar, Union, cast
+from typing import Dict, List, Optional, Tuple, TypeVar, Union, cast
+from uuid import uuid4

 try:
     from systemd.daemon import notify
@@ -66,6 +67,10 @@


 OBJECT_FIXERS = {
+    # drop the metadata field from the revision (if any); this field is
+    # about to be dropped from the data model (in favor of
+    # raw_extrinsic_metadata) and there can be bogus values in the existing
+    # journal (metadata with \0000 in it)
     "revision": partial(remove_keys, keys=("metadata",)),
 }

@@ -115,7 +120,17 @@
         dict_repr = kafka_to_value(msg)
         if object_type in OBJECT_FIXERS:
             dict_repr = OBJECT_FIXERS[object_type](dict_repr)
-        obj = OBJECT_CONVERTERS[object_type](dict_repr)
+        try:
+            obj = OBJECT_CONVERTERS[object_type](dict_repr)
+        except ValueError as exc:
+            # we do not catch AttributeTypeError here since it is (most
+            # likely) a clue that something very wrong is occurring; better to crash
+            error_msg = f"Unable to create model object {object_type}: {repr(exc)}"
+            self.report_failure(msg, (object_type, dict_repr))
+            if self.raise_on_error:
+                raise StorageArgumentException(error_msg)
+            return None
+
         if self.validate:
             if isinstance(obj, HashableObject):
                 cid = obj.compute_hash()
@@ -131,17 +146,29 @@
             return None
         return obj

-    def report_failure(self, msg: bytes, obj: BaseModel):
+    def report_failure(
+        self, msg: bytes, obj: Union[BaseModel, Tuple[str, Dict[str, Any]]]
+    ):
         if self.reporter:
-            oid: str = ""
-            if hasattr(obj, "swhid"):
+            oid: str
+            if isinstance(obj, tuple):
+                object_type, dict_repr = obj
+                if "id" in dict_repr:
+                    uid = dict_repr["id"]
+                    assert isinstance(uid, bytes)
+                    oid = f"{object_type}:{uid.hex()}"
+                else:
+                    oid = f"{object_type}:uuid:{uuid4()}"
+            elif hasattr(obj, "swhid"):
                 swhid = obj.swhid()  # type: ignore[attr-defined]
                 oid = str(swhid)
             elif isinstance(obj, HashableObject):
                 uid = obj.compute_hash()
                 oid = f"{obj.object_type}:{uid.hex()}"  # type: ignore[attr-defined]
-            if oid:
-                self.reporter(oid, msg)
+            else:
+                oid = f"{obj.object_type}:uuid:{uuid4()}"  # type: ignore[attr-defined]
+
+            self.reporter(oid, msg)


 def process_replay_objects(
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -454,6 +454,18 @@
         method = getattr(src, object_type + "_add")
         method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)])
         nb_sent += 1
+    # also add an object that won't even be possible to instantiate; this
+    # needs to be done via the low-level kafka API (since we cannot
+    # instantiate the invalid model object...)
+    # we use directory[1] because it actually has some entries
+    dict_repr = {
+        # copy each dir entry twice
+        "entries": TEST_OBJECTS["directory"][1].to_dict()["entries"] * 2,
+        "id": b"\x01" * 20,
+    }
+    topic = f"{src.journal_writer.journal._prefix}.directory"
+    src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr)
+    nb_sent += 1

     # Fill the destination storage from Kafka
     dst = get_storage(cls="memory")
@@ -470,7 +482,7 @@
-    assert invalid == 4, "Invalid objects should be detected"
+    assert invalid == 5, "Invalid objects should be detected"
     assert set(redisdb.keys()) == {
         f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir")
-    }
+    } | {b"directory:" + b"01" * 20}

     for key in redisdb.keys():
         # check the stored value looks right
@@ -478,7 +490,7 @@
         value = kafka_to_value(rawvalue)
         assert isinstance(value, dict)
         assert "id" in value
-        assert value["id"] == b"\x00" * 20
+        assert value["id"] in (b"\x00" * 20, b"\x01" * 20)

     # check that invalid objects did not reach the dst storage
     for attr_ in (
@@ -488,7 +500,29 @@
         "snapshots",
     ):
         for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()):
-            assert id != b"\x00" * 20
+            assert id not in (b"\x00" * 20, b"\x01" * 20)
+
+    # check that valid objects did reach the dst storage
+    # revisions
+    expected = [attr.evolve(rev, metadata=None) for rev in TEST_OBJECTS["revision"]]
+    result = dst.revision_get([obj.id for obj in TEST_OBJECTS["revision"]])
+    assert result == expected
+    # releases
+    expected = TEST_OBJECTS["release"]
+    result = dst.release_get([obj.id for obj in TEST_OBJECTS["release"]])
+    assert result == expected
+    # snapshot
+    # results from snapshot_get are paginated, so adapt the expected values
+    expected = [
+        {"next_branch": None, **obj.to_dict()} for obj in TEST_OBJECTS["snapshot"]
+    ]
+    result = [dst.snapshot_get(obj.id) for obj in TEST_OBJECTS["snapshot"]]
+    assert result == expected
+    # directories
+    for directory in TEST_OBJECTS["directory"]:
+        assert set(dst.directory_get_entries(directory.id).results) == set(
+            directory.entries
+        )


 def test_storage_replayer_with_validation_nok_raises(
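
Note on the reporting scheme above: the previous code silently skipped reporting when no `oid` could be derived (the `if oid:` guard), while the patch adds a `uuid4()` fallback so that every failure is recorded under a distinct key. As a minimal standalone sketch (not part of the patch; `failure_key` is a hypothetical name, and the object shapes are assumed from the diff), the key derivation now amounts to:

    from typing import Any, Dict, Tuple, Union
    from uuid import uuid4

    from swh.model.model import BaseModel, HashableObject

    def failure_key(obj: Union[BaseModel, Tuple[str, Dict[str, Any]]]) -> str:
        """Mirror the oid derivation done by report_failure above."""
        if isinstance(obj, tuple):
            # deserialization failed: all we have is the raw dict and its type
            object_type, dict_repr = obj
            if "id" in dict_repr:
                # e.g. b"directory:" + b"01" * 20, as checked in the test
                return f"{object_type}:{dict_repr['id'].hex()}"
            return f"{object_type}:uuid:{uuid4()}"
        if hasattr(obj, "swhid"):
            # e.g. the swh:1:<typ>:<hex> keys checked in the test
            return str(obj.swhid())
        if isinstance(obj, HashableObject):
            # validation failed on a hashable object: key it by content hash
            return f"{obj.object_type}:{obj.compute_hash().hex()}"
        # last resort: a random key, so the failure is still reported
        return f"{obj.object_type}:uuid:{uuid4()}"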