swh/journal/tests/test_replay.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import datetime
import functools
import logging
import random
from subprocess import Popen
from typing import Tuple

import dateutil
from confluent_kafka import Producer
from hypothesis import strategies, given, settings
import pytest

from swh.storage import get_storage

from swh.journal.client import JournalClient
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
from swh.model.hashutil import hash_to_hex
from swh.model.model import Content

from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
from .utils import MockedJournalClient, MockedKafkaWriter

storage_config = {
    'cls': 'pipeline',
    'steps': [
        {'cls': 'validate'},
        {'cls': 'memory'},
    ]
}


def make_topic(kafka_prefix: str, object_type: str) -> str:
    return kafka_prefix + '.' + object_type


def test_storage_play(
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog):
"""Optimal replayer scenario. | |||||
This: | |||||
- writes objects to the topic | |||||
- replayer consumes objects from the topic and replay them | |||||
""" | |||||
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    storage = get_storage(**storage_config)

    producer = Producer({
        'bootstrap.servers': 'localhost:{}'.format(port),
        'client.id': 'test producer',
        'enable.idempotence': 'true',
    })

    now = datetime.datetime.now(tz=datetime.timezone.utc)

    # Fill Kafka
    nb_sent = 0
    nb_visits = 0
    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
        topic = make_topic(kafka_prefix, object_type)
        for object_ in objects:
            key = bytes(random.randint(0, 255) for _ in range(40))
            object_ = object_.copy()
            if object_type == 'content':
                object_['ctime'] = now
            elif object_type == 'origin_visit':
                nb_visits += 1
                object_['visit'] = nb_visits
            producer.produce(
                topic=topic, key=key_to_kafka(key),
                value=value_to_kafka(object_),
            )
            nb_sent += 1

    producer.flush()

    caplog.set_level(logging.ERROR, 'swh.journal.replay')

    # Fill the storage from Kafka
    replayer = JournalClient(
        brokers='localhost:%d' % kafka_server[1],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
    )
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    nb_inserted = 0
    while nb_inserted < nb_sent:
        nb_inserted += replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # Check the objects were actually inserted in the storage
    assert OBJECT_TYPE_KEYS['revision'][1] == \
        list(storage.revision_get(
            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
    assert OBJECT_TYPE_KEYS['release'][1] == \
        list(storage.release_get(
            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

    origins = list(storage.origin_get(
        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
    assert OBJECT_TYPE_KEYS['origin'][1] == \
        [{'url': orig['url']} for orig in origins]
    for origin in origins:
        origin_url = origin['url']
        expected_visits = [
            {
                **visit,
                'origin': origin_url,
                'date': dateutil.parser.parse(visit['date']),
            }
            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
            if visit['origin'] == origin['url']
        ]
        actual_visits = list(storage.origin_visit_get(
            origin_url))
        for visit in actual_visits:
            del visit['visit']  # opaque identifier
        assert expected_visits == actual_visits

    input_contents = OBJECT_TYPE_KEYS['content'][1]
    contents = storage.content_get_metadata(
        [cont['sha1'] for cont in input_contents])
    assert len(contents) == len(input_contents)
    assert contents == {cont['sha1']: [cont] for cont in input_contents}
Inline comment (olasd): That should be done before the log messages are emitted, not that
late. I guess this works because capturing error messages is the default.
Inline reply (ardumont, done): Right.
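As an ordering sketch of the reviewer's point (a minimal sketch assuming pytest's standard
caplog fixture, not the exact shape of this test): the capture level should be configured
before the replayer runs, since that is when swh.journal.replay emits its messages.

    caplog.set_level(logging.ERROR, 'swh.journal.replay')   # configure capture first
    nb_inserted = replayer.process(worker_fn)                # collisions, if any, get logged here
    assert [r for r in caplog.records
            if 'Colliding contents' in r.getMessage()] == []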
    collision = 0
    for record in caplog.records:
        logtext = record.getMessage()
        if 'Colliding contents:' in logtext:
            collision += 1
Inline comment (olasd, not done): Could you also check that the hashes are available in the
log arguments?
Inline reply (ardumont, done): Right, currently doing it.
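A sketch of what that check can look like, reusing the 'hashes' log argument that
test_storage_play_with_collision below inspects (the dict shape of record.args is taken
from that test, not verified against the replayer itself):

    for record in caplog.records:
        if 'Colliding contents' in record.getMessage():
            # every collision record is expected to carry the colliding hashes
            assert 'hashes' in record.args and record.args['hashes']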
    assert collision == 0, "No collision should be detected"


def test_storage_play_with_collision(
        kafka_prefix: str,
        kafka_consumer_group: str,
        kafka_server: Tuple[Popen, int],
        caplog):
    """Another replayer scenario, this time with collisions.

    This:
    - writes objects to the topic, including colliding contents
    - the replayer consumes the objects from the topic and replays them
    - the colliding contents are dropped from the replay when detected

    """
    (_, port) = kafka_server
    kafka_prefix += '.swh.journal.objects'

    storage = get_storage(**storage_config)

    producer = Producer({
        'bootstrap.servers': 'localhost:{}'.format(port),
        'client.id': 'test producer',
        'enable.idempotence': 'true',
    })

    now = datetime.datetime.now(tz=datetime.timezone.utc)

    # Fill Kafka
    nb_sent = 0
    nb_visits = 0
    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
        topic = make_topic(kafka_prefix, object_type)
        for object_ in objects:
            key = bytes(random.randint(0, 255) for _ in range(40))
            object_ = object_.copy()
            if object_type == 'content':
                object_['ctime'] = now
            elif object_type == 'origin_visit':
                nb_visits += 1
                object_['visit'] = nb_visits
            producer.produce(
                topic=topic, key=key_to_kafka(key),
                value=value_to_kafka(object_),
            )
            nb_sent += 1
    # Create a collision in the input data: the duplicate contents are sent to
    # Kafka but must not be written to the destination storage.
    for content in DUPLICATE_CONTENTS:
        topic = make_topic(kafka_prefix, 'content')
        producer.produce(
            topic=topic, key=key_to_kafka(key),
            value=value_to_kafka(content),
        )
        nb_sent += 1

    producer.flush()

    caplog.set_level(logging.ERROR, 'swh.journal.replay')

    # Fill the storage from Kafka
    replayer = JournalClient(
        brokers='localhost:%d' % kafka_server[1],
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
    )
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    # … 32 unchanged lines folded in the changeset view (the replay loop and the
    # revision / release / origin checks, as in test_storage_play above) …
    for origin in origins:
        assert expected_visits == actual_visits

    input_contents = OBJECT_TYPE_KEYS['content'][1]
    contents = storage.content_get_metadata(
        [cont['sha1'] for cont in input_contents])
    assert len(contents) == len(input_contents)
    assert contents == {cont['sha1']: [cont] for cont in input_contents}
    collision = 0
    actual_hashes = {}
    for record in caplog.records:
        logtext = record.getMessage()
        if 'Colliding contents' in logtext:
            collision += 1
            actual_hashes = record.args['hashes']

    assert collision == 1, "1 collision should be detected"

    algo = 'sha1'
    expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
    expected_hash_key = f'{algo}-{expected_colliding_hash}'
    assert list(actual_hashes.keys()) == [expected_hash_key]

    actual_colliding_hashes = actual_hashes[expected_hash_key]
    assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
    for content in DUPLICATE_CONTENTS:
        expected_content_hashes = Content.from_dict(content).hashes()
        assert expected_content_hashes in actual_colliding_hashes
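These assertions only hold if DUPLICATE_CONTENTS (defined in .conftest, not shown in this
changeset) contains content dicts that share the same sha1 while differing on at least one
other hash. A purely illustrative sketch of such a fixture, with made-up values:

    # Hypothetical shape only; the real fixture lives in .conftest and also carries
    # the remaining content fields (sha256, blake2s256, length, status, ...).
    DUPLICATE_CONTENTS = [
        {'sha1': b'\x01' * 20, 'sha1_git': b'\x02' * 20},
        {'sha1': b'\x01' * 20, 'sha1_git': b'\x03' * 20},  # same sha1, different sha1_git
    ]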
Inline comment (olasd, unsubmitted, not done): I guess changing the schema to fixed keys
will make this test a little bit simpler as well.
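A hedged sketch of what that could look like (the fixed-key record layout below is an
assumption about a possible future log schema, not the current replayer behaviour): if the
collision record carried e.g. {'algo': 'sha1', 'hashes': [...]}, the checks above could
shrink to something like:

    assert actual_hashes['algo'] == 'sha1'
    for content in DUPLICATE_CONTENTS:
        assert Content.from_dict(content).hashes() in actual_hashes['hashes']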


def _test_write_replay_origin_visit(visits):
    """Helper function to write tests for origin_visit.

    Each visit (a dict) given in the 'visits' argument will be sent to
    a (mocked) kafka queue, which an in-memory-storage-backed replayer
    is listening to.
    … (the remaining 115 lines of the file are folded in the changeset view) …
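Since the helper body is folded above, here is only a usage sketch with assumed visit
fields (the field names follow the origin_visit dicts handled earlier in this file; the
exact set the helper requires is not visible in this view):

    _test_write_replay_origin_visit([{
        'visit': 1,                        # assumed field
        'origin': 'http://example.org/',   # assumed field
        'date': datetime.datetime.now(tz=datetime.timezone.utc),
        'type': 'git',                     # assumed field
        'status': 'ongoing',               # assumed field
        'snapshot': None,                  # assumed field
    }])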