swh/journal/tests/test_replay.py
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple

 import dateutil
 from confluent_kafka import Producer
 from hypothesis import strategies, given, settings
 import pytest

 from swh.storage import get_storage

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray

-from .conftest import OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
 from .utils import MockedJournalClient, MockedKafkaWriter

 storage_config = {
     'cls': 'pipeline',
     'steps': [
         {'cls': 'validate'},
         {'cls': 'memory'},

… (13 lines not shown — the view resumes inside def test_storage_play)
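The collapsed region closes the storage_config dict and opens test_storage_play. For orientation, here is a minimal sketch of how such a pipeline config is typically instantiated — assuming, as the import above suggests, that get_storage accepts the config unpacked as keyword arguments; the actual call sits in the collapsed lines:

 # Sketch only, not part of the changeset: get_storage is assumed to
 # take the pipeline config as keyword arguments ('cls' plus 'steps').
 storage = get_storage(**storage_config)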
     producer = Producer({
         'bootstrap.servers': 'localhost:{}'.format(port),
         'client.id': 'test producer',
         'enable.idempotence': 'true',
     })

     now = datetime.datetime.now(tz=datetime.timezone.utc)
+    def make_topic(kafka_prefix: str, object_type: str) -> str:
+        return kafka_prefix + '.' + object_type
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = kafka_prefix + '.' + object_type
+        topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key),
                 value=value_to_kafka(object_),
             )
             nb_sent += 1
+    # Create collisions in the input data;
+    # they are not written to the destination
+    for content in DUPLICATE_CONTENTS:
+        topic = make_topic(kafka_prefix, 'content')
+        producer.produce(
+            topic=topic, key=key_to_kafka(key),
+            value=value_to_kafka(content),
+        )
+        nb_sent += 1
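DUPLICATE_CONTENTS comes from .conftest and is not shown in this changeset; the loop above only requires that its entries collide on at least one hash, so the replayer refuses to write them. A hypothetical shape for such a fixture, with invented values:

 # Hypothetical fixture shape, not the real .conftest definition: two
 # contents sharing a sha1 but differing on sha1_git, so inserting the
 # second one triggers a hash-collision error in the storage.
 DUPLICATE_CONTENTS = [
     {'sha1': b'\x00' * 20, 'sha1_git': b'\x01' * 20},  # other fields omitted
     {'sha1': b'\x00' * 20, 'sha1_git': b'\x02' * 20},
 ]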
     producer.flush()

     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers='localhost:%d' % kafka_server[1],
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,

… (33 lines not shown — the view resumes inside a "for origin in origins:" loop)
             del visit['visit']  # opaque identifier
     assert expected_visits == actual_visits

     input_contents = OBJECT_TYPE_KEYS['content'][1]
     contents = storage.content_get_metadata(
         [cont['sha1'] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont['sha1']: [cont] for cont in input_contents}
olasd: That should be done before the log messages are emitted, not that late. I guess this works because capturing error messages is the default.
ardumont: Right.
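The 33 collapsed lines above presumably wire the JournalClient to a storage instance and run the replay before the assertions. The usual pattern for that wiring, assuming the names built earlier in the test, looks like:

 # Sketch of the standard replay wiring; the exact code is in the
 # collapsed region. process_replay_objects writes each consumed batch
 # of objects into the given storage.
 storage = get_storage(**storage_config)
 worker_fn = functools.partial(process_replay_objects, storage=storage)
 replayer.process(worker_fn)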
 def _test_write_replay_origin_visit(visits):
     """Helper function to write tests for origin_visit.

     Each visit (a dict) given in the 'visits' argument will be sent to
     a (mocked) kafka queue, which an in-memory-storage backed replayer is
     listening to.

     Check that the corresponding origin visit entities are present in the
     storage and have correct values.
     """
     queue = []

olasd: Could you also check that the hashes are available in the log arguments?
ardumont: Right, currently doing it.
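The two mocks imported from .utils share the queue list as an in-memory stand-in for a Kafka broker. A sketch of how the helper presumably continues, with the exact calls living in the collapsed body below:

 # Sketch, assuming both mocks operate on the shared 'queue' list: the
 # writer appends serialized messages, the client replays them.
 writer = MockedKafkaWriter(queue)
 replayer = MockedJournalClient(queue)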
… (101 lines not shown)
 @settings(max_examples=500)
 @given(strategies.sets(hash_strategy, min_size=0, max_size=500),
        strategies.sets(hash_strategy, min_size=10))
 def test_is_hash_in_bytearray(haystack, needles):
     array = b''.join(sorted(haystack))
     needles |= haystack  # Exhaustively test for all objects in the array
     for needle in needles:
         assert is_hash_in_bytearray(needle, array, len(haystack)) == \
             (needle in haystack)
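The property test pins down the contract of is_hash_in_bytearray: membership of a fixed-size hash in the sorted concatenation of nb_hashes hashes. A minimal reference implementation consistent with that contract — the 20-byte hash size is an assumption here, and the real code lives in swh.journal.replay:

 # Reference sketch only; the actual implementation is in
 # swh.journal.replay. Binary search over nb_hashes fixed-size slots
 # of the sorted, concatenated byte array.
 def is_hash_in_bytearray_ref(hash_, array, nb_hashes, hash_size=20):
     assert len(hash_) == hash_size
     assert len(array) == nb_hashes * hash_size
     lo, hi = 0, nb_hashes
     while lo < hi:
         mid = (lo + hi) // 2
         pivot = array[mid * hash_size:(mid + 1) * hash_size]
         if pivot < hash_:
             lo = mid + 1
         else:
             hi = mid
     # lo is now the first slot >= hash_; check it for an exact match
     return (lo < nb_hashes
             and array[lo * hash_size:(lo + 1) * hash_size] == hash_)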
olasd: I guess changing the schema to fixed keys will make this test a little bit simpler as well.