diff --git a/swh/storage/replay.py b/swh/storage/replay.py index 8fa9fec0..f03bde2e 100644 --- a/swh/storage/replay.py +++ b/swh/storage/replay.py @@ -1,233 +1,256 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter from functools import partial import logging from typing import Any, Callable from typing import Counter as CounterT -from typing import Dict, List, Optional, TypeVar, Union, cast +from typing import Dict, List, Optional, Tuple, TypeVar, Union, cast +from uuid import uuid4 try: from systemd.daemon import notify except ImportError: notify = None from swh.core.statsd import statsd from swh.journal.serializers import kafka_to_value from swh.model.hashutil import hash_to_hex from swh.model.model import ( BaseContent, BaseModel, Content, Directory, ExtID, HashableObject, MetadataAuthority, MetadataFetcher, Origin, OriginVisit, OriginVisitStatus, RawExtrinsicMetadata, Release, Revision, SkippedContent, Snapshot, ) from swh.storage.exc import HashCollision, StorageArgumentException from swh.storage.interface import StorageInterface from swh.storage.utils import remove_keys logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" OBJECT_CONVERTERS: Dict[str, Callable[[Dict], BaseModel]] = { "origin": Origin.from_dict, "origin_visit": OriginVisit.from_dict, "origin_visit_status": OriginVisitStatus.from_dict, "snapshot": Snapshot.from_dict, "revision": Revision.from_dict, "release": Release.from_dict, "directory": Directory.from_dict, "content": Content.from_dict, "skipped_content": SkippedContent.from_dict, "metadata_authority": MetadataAuthority.from_dict, "metadata_fetcher": MetadataFetcher.from_dict, "raw_extrinsic_metadata": RawExtrinsicMetadata.from_dict, "extid": ExtID.from_dict, } # Deprecated, for BW compat only. object_converter_fn = OBJECT_CONVERTERS OBJECT_FIXERS = { # drop the metadata field from the revision (if any); this field is # about to be dropped from the data model (in favor of # raw_extrinsic_metadata) and there can be bogus values in the existing # journal (metadata with \0000 in it) "revision": partial(remove_keys, keys=("metadata",)), } class ModelObjectDeserializer: """A swh.journal object deserializer that checks object validity and reports invalid objects The deserializer will directly produce BaseModel objects from journal objects representations. If validation is activated and the object is hashable, it will check if the computed hash matches the identifier of the object. If the object is invalid and a 'reporter' function is given, it will be called with 2 arguments:: reporter(object_id, journal_msg) Where 'object_id' is a string representation of the object identifier (from the journal message), and 'journal_msg' is the row message (bytes) retrieved from the journal. If 'raise_on_error' is True, a 'StorageArgumentException' exception is raised. Typical usage:: deserializer = ModelObjectDeserializer(validate=True, reporter=reporter_cb) client = get_journal_client( cls="kafka", value_deserializer=deserializer, **cfg) """ def __init__( self, validate: bool = True, raise_on_error: bool = False, reporter: Optional[Callable[[str, bytes], None]] = None, ): self.validate = validate self.reporter = reporter self.raise_on_error = raise_on_error def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]: dict_repr = kafka_to_value(msg) if object_type in OBJECT_FIXERS: dict_repr = OBJECT_FIXERS[object_type](dict_repr) - obj = OBJECT_CONVERTERS[object_type](dict_repr) + try: + obj = OBJECT_CONVERTERS[object_type](dict_repr) + except ValueError as exc: + # we do not catch AttributeTypeError here since these are (most + # likely) a clue of something very wrong is occurring, so better crash + error_msg = f"Unable to create model object {object_type}: {repr(exc)}" + self.report_failure(msg, (object_type, dict_repr)) + if self.raise_on_error: + raise StorageArgumentException(error_msg) + return None + if self.validate: if isinstance(obj, HashableObject): cid = obj.compute_hash() if obj.id != cid: error_msg = ( f"Object has id {hash_to_hex(obj.id)}, " f"but it should be {hash_to_hex(cid)}: {obj}" ) logger.error(error_msg) self.report_failure(msg, obj) if self.raise_on_error: raise StorageArgumentException(error_msg) return None return obj - def report_failure(self, msg: bytes, obj: BaseModel): + def report_failure( + self, msg: bytes, obj: Union[BaseModel, Tuple[str, Dict[str, Any]]] + ): if self.reporter: - oid: str = "" - if hasattr(obj, "swhid"): + oid: str + if isinstance(obj, tuple): + object_type, dict_repr = obj + if "id" in dict_repr: + uid = dict_repr["id"] + assert isinstance(uid, bytes) + oid = f"{object_type}:{uid.hex()}" + else: + oid = f"{object_type}:uuid:{uuid4()}" + elif hasattr(obj, "swhid"): swhid = obj.swhid() # type: ignore[attr-defined] oid = str(swhid) elif isinstance(obj, HashableObject): uid = obj.compute_hash() oid = f"{obj.object_type}:{uid.hex()}" # type: ignore[attr-defined] - if oid: - self.reporter(oid, msg) + else: + oid = f"{obj.object_type}:uuid:{uuid4()}" # type: ignore[attr-defined] + + self.reporter(oid, msg) def process_replay_objects( all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface ) -> None: for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): _insert_objects(object_type, objects, storage) statsd.increment( GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type} ) if notify: notify("WATCHDOG=1") ContentType = TypeVar("ContentType", bound=BaseContent) def collision_aware_content_add( contents: List[ContentType], content_add_fn: Callable[[List[ContentType]], Dict[str, int]], ) -> Dict[str, int]: """Add contents to storage. If a hash collision is detected, an error is logged. Then this adds the other non colliding contents to the storage. Args: content_add_fn: Storage content callable contents: List of contents or skipped contents to add to storage """ if not contents: return {} colliding_content_hashes: List[Dict[str, Any]] = [] results: CounterT[str] = Counter() while True: try: results.update(content_add_fn(contents)) except HashCollision as e: colliding_content_hashes.append( { "algo": e.algo, "hash": e.hash_id, # hex hash id "objects": e.colliding_contents, # hex hashes } ) colliding_hashes = e.colliding_content_hashes() # Drop the colliding contents from the transaction contents = [c for c in contents if c.hashes() not in colliding_hashes] else: # Successfully added contents, we are done break if colliding_content_hashes: for collision in colliding_content_hashes: logger.error("Collision detected: %(collision)s", {"collision": collision}) return dict(results) def _insert_objects( object_type: str, objects: List[BaseModel], storage: StorageInterface ) -> None: """Insert objects of type object_type in the storage.""" if object_type not in OBJECT_CONVERTERS: logger.warning("Received a series of %s, this should not happen", object_type) return method = getattr(storage, f"{object_type}_add") if object_type == "skipped_content": method = partial(collision_aware_content_add, content_add_fn=method) elif object_type == "content": method = partial( collision_aware_content_add, content_add_fn=storage.content_add_metadata ) elif object_type in ("origin_visit", "origin_visit_status"): origins: List[Origin] = [] for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects): origins.append(Origin(url=obj.origin)) storage.origin_add(origins) elif object_type == "raw_extrinsic_metadata": emds = cast(List[RawExtrinsicMetadata], objects) authorities = {emd.authority for emd in emds} fetchers = {emd.fetcher for emd in emds} storage.metadata_authority_add(list(authorities)) storage.metadata_fetcher_add(list(fetchers)) method(objects) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py index 39674829..000220c6 100644 --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -1,539 +1,573 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools import logging import re from typing import Any, Container, Dict, Optional, cast import attr import pytest from swh.journal.client import JournalClient from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex from swh.model.model import Revision, RevisionType from swh.model.tests.swh_model_data import ( COMMITTERS, DATES, DUPLICATE_CONTENTS, REVISIONS, ) from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS from swh.storage import get_storage from swh.storage.cassandra.model import ContentRow, SkippedContentRow from swh.storage.exc import StorageArgumentException from swh.storage.in_memory import InMemoryStorage from swh.storage.replay import ModelObjectDeserializer, process_replay_objects UTC = datetime.timezone.utc TEST_OBJECTS = _TEST_OBJECTS.copy() # add a revision with metadata to check this later is dropped while being replayed TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [ Revision( id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"), message=b"hello again", date=DATES[1], committer=COMMITTERS[1], author=COMMITTERS[0], committer_date=DATES[0], type=RevisionType.GIT, directory=b"\x03" * 20, synthetic=False, metadata={"something": "interesting"}, parents=(REVISIONS[0].id,), ), ] WRONG_ID_REG = re.compile( "Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*" ) def nullify_ctime(obj): if isinstance(obj, (ContentRow, SkippedContentRow)): return dataclasses.replace(obj, ctime=None) else: return obj @pytest.fixture() def replayer_storage_and_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str ): journal_writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, } storage_config: Dict[str, Any] = { "cls": "memory", "journal_writer": journal_writer_config, } storage = get_storage(**storage_config) deserializer = ModelObjectDeserializer() replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, value_deserializer=deserializer.convert, ) yield storage, replayer def test_storage_replayer(replayer_storage_and_client, caplog): """Optimal replayer scenario. This: - writes objects to a source storage - replayer consumes objects from the topic and replays them - a destination storage is filled from this In the end, both storages should have the same content. """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) check_replayed(src, dst) collision = 0 for record in caplog.records: logtext = record.getMessage() if "Colliding contents:" in logtext: collision += 1 assert collision == 0, "No collision should be detected" def test_storage_replay_with_collision(replayer_storage_and_client, caplog): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ src, replayer = replayer_storage_and_client # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # Create collision in input data # These should not be written in the destination producer = src.journal_writer.journal.producer prefix = src.journal_writer.journal._prefix for content in DUPLICATE_CONTENTS: topic = f"{prefix}.content" key = content.sha1 now = datetime.datetime.now(tz=UTC) content = attr.evolve(content, ctime=now) producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content.to_dict()), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check the logs for the collision being properly detected nb_collisions = 0 actual_collision: Dict for record in caplog.records: logtext = record.getMessage() if "Collision detected:" in logtext: nb_collisions += 1 actual_collision = record.args["collision"] assert nb_collisions == 1, "1 collision should be detected" algo = "sha1" assert actual_collision["algo"] == algo expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0].get_hash(algo)) assert actual_collision["hash"] == expected_colliding_hash actual_colliding_hashes = actual_collision["objects"] assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) for content in DUPLICATE_CONTENTS: expected_content_hashes = { k: hash_to_hex(v) for k, v in content.hashes().items() } assert expected_content_hashes in actual_colliding_hashes # all objects from the src should exists in the dst storage assert isinstance(src, InMemoryStorage) # needed to help mypy assert isinstance(dst, InMemoryStorage) # needed to help mypy check_replayed(src, dst, exclude=["contents"]) # but the dst has one content more (one of the 2 colliding ones) assert ( len(list(src._cql_runner._contents.iter_all())) == len(list(dst._cql_runner._contents.iter_all())) - 1 ) def test_replay_skipped_content(replayer_storage_and_client): """Test the 'skipped_content' topic is properly replayed.""" src, replayer = replayer_storage_and_client _check_replay_skipped_content(src, replayer, "skipped_content") # utility functions def check_replayed( src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None, expected_anonymized=False, ): """Simple utility function to compare the content of 2 in_memory storages""" def fix_expected(attr, row): if expected_anonymized: if attr == "releases": row = dataclasses.replace( row, author=row.author and row.author.anonymize() ) elif attr == "revisions": row = dataclasses.replace( row, author=row.author.anonymize(), committer=row.committer.anonymize(), ) if attr == "revisions": # the replayer should now drop the metadata attribute; see # swh/storgae/replay.py:_insert_objects() row.metadata = "null" return row for attr_ in ( "contents", "skipped_contents", "directories", "extid", "revisions", "releases", "snapshots", "origins", "origin_visits", "origin_visit_statuses", "raw_extrinsic_metadata", ): if exclude and attr_ in exclude: continue expected_objects = [ (id, nullify_ctime(fix_expected(attr_, obj))) for id, obj in sorted(getattr(src._cql_runner, f"_{attr_}").iter_all()) ] got_objects = [ (id, nullify_ctime(obj)) for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()) ] assert got_objects == expected_objects, f"Mismatch object list for {attr_}" def _check_replay_skipped_content(storage, replayer, topic): skipped_contents = _gen_skipped_contents(100) nb_sent = len(skipped_contents) producer = storage.journal_writer.journal.producer prefix = storage.journal_writer.journal._prefix for i, obj in enumerate(skipped_contents): producer.produce( topic=f"{prefix}.{topic}", key=key_to_kafka({"sha1": obj["sha1"]}), value=value_to_kafka(obj), ) producer.flush() dst_storage = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted for content in skipped_contents: assert not storage.content_find({"sha1": content["sha1"]}) # no skipped_content_find API endpoint, so use this instead assert not list(dst_storage.skipped_content_missing(skipped_contents)) def _updated(d1, d2): d1.update(d2) d1.pop("data", None) return d1 def _gen_skipped_contents(n=10): # we do not use the hypothesis strategy here because this does not play well with # pytest fixtures, and it makes test execution very slow algos = DEFAULT_ALGORITHMS | {"length"} now = datetime.datetime.now(tz=UTC) return [ _updated( MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(), { "status": "absent", "reason": "why not", "origin": f"https://somewhere/{i}", "ctime": now, }, ) for i in range(n) ] @pytest.mark.parametrize("privileged", [True, False]) def test_storage_replay_anonymized( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool, ): """Optimal replayer scenario. This: - writes objects to the topic - replayer consumes objects from the topic and replay them This tests the behavior with both a privileged and non-privileged replayer """ writer_config = { "cls": "kafka", "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, "anonymize": True, } src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config} storage = get_storage(**src_config) # Fill the src storage nb_sent = 0 for obj_type, objs in TEST_OBJECTS.items(): if obj_type in ("origin_visit", "origin_visit_status"): # these are unrelated with what we want to test here continue method = getattr(storage, obj_type + "_add") method(objs) nb_sent += len(objs) # Fill a destination storage from Kafka, potentially using privileged topics dst_storage = get_storage(cls="memory") deserializer = ModelObjectDeserializer( validate=False ) # we cannot validate an anonymized replay replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, privileged=privileged, value_deserializer=deserializer.convert, ) worker_fn = functools.partial(process_replay_objects, storage=dst_storage) nb_inserted = replayer.process(worker_fn) replayer.consumer.commit() assert nb_sent == nb_inserted # Check the contents of the destination storage, and whether the anonymization was # properly used assert isinstance(storage, InMemoryStorage) # needed to help mypy assert isinstance(dst_storage, InMemoryStorage) check_replayed(storage, dst_storage, expected_anonymized=not privileged) def test_storage_replayer_with_validation_ok( replayer_storage_and_client, caplog, redisdb ): """Optimal replayer scenario with validation activated and reporter set to a redis db. - writes objects to a source storage - replayer consumes objects from the topic and replays them - a destination storage is filled from this - nothing has been reported in the redis db - both storages should have the same content """ src, replayer = replayer_storage_and_client replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set) # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check we do not have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 0, "Invalid objects should not be detected" assert not redisdb.keys() # so the dst should be the same as src storage check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst)) def test_storage_replayer_with_validation_nok( replayer_storage_and_client, caplog, redisdb ): """Replayer scenario with invalid objects with validation and reporter set to a redis db. - writes objects to a source storage - replayer consumes objects from the topic and replays them - the destination storage is filled with only valid objects - the redis db contains the invalid (raw kafka mesg) objects """ src, replayer = replayer_storage_and_client replayer.value_deserializer = ModelObjectDeserializer( validate=True, reporter=redisdb.set ).convert caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # insert invalid objects for object_type in ("revision", "directory", "release", "snapshot"): method = getattr(src, object_type + "_add") method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) nb_sent += 1 + # also add an object that won't even be possible to instantiate; this needs + # to be done at low kafka level (since we cannot instantiate the invalid model + # object...) + # we use directory[1] because it actually have some entries + dict_repr = { + # copy each dir entry twice + "entries": TEST_OBJECTS["directory"][1].to_dict()["entries"] * 2, + "id": b"\x01" * 20, + } + topic = f"{src.journal_writer.journal._prefix}.directory" + src.journal_writer.journal.send(topic, dict_repr["id"], dict_repr) + nb_sent += 1 # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) nb_inserted = replayer.process(worker_fn) assert nb_sent == nb_inserted # check we do have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 4, "Invalid objects should be detected" assert set(redisdb.keys()) == { f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir") - } + } | {b"directory:" + b"01" * 20} for key in redisdb.keys(): # check the stored value looks right rawvalue = redisdb.get(key) value = kafka_to_value(rawvalue) assert isinstance(value, dict) assert "id" in value - assert value["id"] == b"\x00" * 20 + assert value["id"] in (b"\x00" * 20, b"\x01" * 20) # check that invalid objects did not reach the dst storage for attr_ in ( "directories", "revisions", "releases", "snapshots", ): for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()): - assert id != b"\x00" * 20 + assert id not in (b"\x00" * 20, b"\x01" * 20) + + # check that valid objects did reach the dst storage + # revisions + expected = [attr.evolve(rev, metadata=None) for rev in TEST_OBJECTS["revision"]] + result = dst.revision_get([obj.id for obj in TEST_OBJECTS["revision"]]) + assert result == expected + # releases + expected = TEST_OBJECTS["release"] + result = dst.release_get([obj.id for obj in TEST_OBJECTS["release"]]) + assert result == expected + # snapshot + # result from snapshot_get is paginated, so adapt the expected to be comparable + expected = [ + {"next_branch": None, **obj.to_dict()} for obj in TEST_OBJECTS["snapshot"] + ] + result = [dst.snapshot_get(obj.id) for obj in TEST_OBJECTS["snapshot"]] + assert result == expected + # directories + for directory in TEST_OBJECTS["directory"]: + assert set(dst.directory_get_entries(directory.id).results) == set( + directory.entries + ) def test_storage_replayer_with_validation_nok_raises( replayer_storage_and_client, caplog, redisdb ): """Replayer scenario with invalid objects with raise_on_error set to True This: - writes both valid & invalid objects to a source storage - a StorageArgumentException should be raised while replayer consumes objects from the topic and replays them """ src, replayer = replayer_storage_and_client replayer.value_deserializer = ModelObjectDeserializer( validate=True, reporter=redisdb.set, raise_on_error=True ).convert caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill Kafka using a source storage nb_sent = 0 for object_type, objects in TEST_OBJECTS.items(): method = getattr(src, object_type + "_add") method(objects) nb_sent += len(objects) # insert invalid objects for object_type in ("revision", "directory", "release", "snapshot"): method = getattr(src, object_type + "_add") method([attr.evolve(TEST_OBJECTS[object_type][0], id=b"\x00" * 20)]) nb_sent += 1 # Fill the destination storage from Kafka dst = get_storage(cls="memory") worker_fn = functools.partial(process_replay_objects, storage=dst) with pytest.raises(StorageArgumentException): replayer.process(worker_fn) # check we do have invalid objects reported invalid = 0 for record in caplog.records: logtext = record.getMessage() if WRONG_ID_REG.match(logtext): invalid += 1 assert invalid == 1, "One invalid objects should be detected" assert len(redisdb.keys()) == 1