kafka_prefix = 'vagjsgkxxd.swh.journal.objects'
kafka_consumer_group = 'test-consumer-vagjsgkxxd'
kafka_server = '127.0.0.1:59361'
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f6a000f4b70>
def test_storage_play_with_collision(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
kafka_prefix += ".swh.journal.objects"
storage = get_storage(**storage_config)
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test producer",
"enable.idempotence": "true",
}
)
now = datetime.datetime.now(tz=UTC)
# Fill Kafka
nb_sent = 0
nb_visits = 0
for object_type, objects in TEST_OBJECT_DICTS.items():
topic = f"{kafka_prefix}.{object_type}"
for object_ in objects:
key = bytes(random.randint(0, 255) for _ in range(40))
object_ = object_.copy()
if object_type == "content":
object_["ctime"] = now
elif object_type == "origin_visit":
nb_visits += 1
object_["visit"] = nb_visits
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
)
nb_sent += 1
# Create collision in input data
# They are not written in the destination
for content in DUPLICATE_CONTENTS:
topic = f"{kafka_prefix}.content"
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the storage from Kafka
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
)
worker_fn = functools.partial(process_replay_objects, storage=storage)
nb_inserted = 0
while nb_inserted < nb_sent:
nb_inserted += replayer.process(worker_fn)
assert nb_sent == nb_inserted
# Check the objects were actually inserted in the storage
> assert TEST_OBJECT_DICTS["revision"] == list(
storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
)
E AssertionError: assert [{'author': {...67892}}, ...}] == [{'author': {...67892}}, ...}]
E At index 0 diff: {'id': b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8', 'message': b'hello', 'date': {'timestamp': {'seconds': 1234567891, 'microseconds': 0}, 'offset': 120, 'negative_utc': False}, 'committer': {'fullname': b'foo', 'name': b'foo', 'email': b''}, 'author': {'fullname': b'foo', 'name': b'foo', 'email': b''}, 'committer_date': {'timestamp': {'seconds': 1234567891, 'microseconds': 0}, 'offset': 120, 'negative_utc': False}, 'type': 'git', 'directory': b'\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'synthetic...
E
E ...Full output truncated (42 lines hidden), use '-vv' to show
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:207: AssertionError
TEST RESULT
TEST RESULT
- Run At: Jun 3 2020, 12:37 PM