replayer_storage_and_client = (<swh.storage.in_memory.InMemoryStorage object at 0x7f9ed01a5e48>, <swh.journal.client.JournalClient object at 0x7f9ed019e898>)
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f9ed02100f0>
def test_storage_play_with_collision(replayer_storage_and_client, caplog):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
if object_type == "origin_visit":
for visit in objects:
src.origin_visit_add(
origin_url=visit.origin, date=visit.date, type=visit.type
)
else:
method = getattr(src, object_type + "_add")
method(objects)
nb_sent += len(objects)
# Create collision in input data
# These should not be written in the destination
producer = src.journal_writer.journal.producer
prefix = src.journal_writer.journal._prefix
for content in DUPLICATE_CONTENTS:
topic = f"{prefix}.content"
key = content["sha1"]
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
> assert nb_sent == nb_inserted
E assert 33 == 38
E +33
E -38
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:147: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Jun 9 2020, 11:19 AM