Page MenuHomeSoftware Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.storage.tests.test_replay::test_storage_play_with_collision
Failed

TEST RESULT

Run At
Jun 3 2020, 12:37 PM
Details
kafka_prefix = 'vagjsgkxxd.swh.journal.objects' kafka_consumer_group = 'test-consumer-vagjsgkxxd' kafka_server = '127.0.0.1:59361' caplog = <_pytest.logging.LogCaptureFixture object at 0x7f6a000f4b70> def test_storage_play_with_collision( kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, ): """Another replayer scenario with collisions. This: - writes objects to the topic, including colliding contents - replayer consumes objects from the topic and replay them - This drops the colliding contents from the replay when detected """ kafka_prefix += ".swh.journal.objects" storage = get_storage(**storage_config) producer = Producer( { "bootstrap.servers": kafka_server, "client.id": "test producer", "enable.idempotence": "true", } ) now = datetime.datetime.now(tz=UTC) # Fill Kafka nb_sent = 0 nb_visits = 0 for object_type, objects in TEST_OBJECT_DICTS.items(): topic = f"{kafka_prefix}.{object_type}" for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == "content": object_["ctime"] = now elif object_type == "origin_visit": nb_visits += 1 object_["visit"] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 # Create collision in input data # They are not written in the destination for content in DUPLICATE_CONTENTS: topic = f"{kafka_prefix}.content" producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(content), ) nb_sent += 1 producer.flush() caplog.set_level(logging.ERROR, "swh.journal.replay") # Fill the storage from Kafka replayer = JournalClient( brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage > assert TEST_OBJECT_DICTS["revision"] == list( storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) ) E AssertionError: assert [{'author': {...67892}}, ...}] == [{'author': {...67892}}, ...}] E At index 0 diff: {'id': b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8', 'message': b'hello', 'date': {'timestamp': {'seconds': 1234567891, 'microseconds': 0}, 'offset': 120, 'negative_utc': False}, 'committer': {'fullname': b'foo', 'name': b'foo', 'email': b''}, 'author': {'fullname': b'foo', 'name': b'foo', 'email': b''}, 'committer_date': {'timestamp': {'seconds': 1234567891, 'microseconds': 0}, 'offset': 120, 'negative_utc': False}, 'type': 'git', 'directory': b'\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'synthetic... E E ...Full output truncated (42 lines hidden), use '-vv' to show .tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:207: AssertionError