diff --git a/requirements-test.txt b/requirements-test.txt
index 6d644bb..7587ec0 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,4 @@
 pytest
-swh.model
+swh.model >= 0.0.32
 pytest-kafka
 hypothesis
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index eb894b6..ae446ce 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,74 +1,77 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from kafka import KafkaConsumer
 
+from swh.storage import HashCollision
+
 from .serializers import kafka_to_value
 
 logger = logging.getLogger(__name__)
 
 
 OBJECT_TYPES = frozenset([
     'origin', 'origin_visit', 'snapshot', 'release', 'revision',
     'directory', 'content',
 ])
 
 
 class StorageReplayer:
     def __init__(self, brokers, prefix, consumer_id,
                  object_types=OBJECT_TYPES):
         if not set(object_types).issubset(OBJECT_TYPES):
             raise ValueError('Unknown object types: %s' % ', '.join(
                 set(object_types) - OBJECT_TYPES))
 
         self._object_types = object_types
         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             value_deserializer=kafka_to_value,
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=consumer_id,
         )
         self.consumer.subscribe(
             topics=['%s.%s' % (prefix, object_type)
                     for object_type in object_types],
         )
 
     def poll(self):
         yield from self.consumer
 
     def fill(self, storage, max_messages=None):
         num = 0
         for message in self.poll():
             object_type = message.topic.split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type
 
             self.insert_object(storage, object_type, message.value)
 
             num += 1
             if max_messages and num >= max_messages:
                 break
         return num
 
     def insert_object(self, storage, object_type, object_):
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
             if object_type == 'content':
-                method = storage.content_add_metadata
+                try:
+                    storage.content_add_metadata([object_])
+                except HashCollision as e:
+                    logger.error('Hash collision: %s', e.args)
             else:
                 method = getattr(storage, object_type + '_add')
-            method([object_])
+                method([object_])
         elif object_type == 'origin_visit':
-            origin_id = storage.origin_add_one(object_.pop('origin'))
-            visit = storage.origin_visit_add(
-                origin=origin_id, date=object_.pop('date'))
-            storage.origin_visit_update(
-                origin_id, visit['visit'], **object_)
+            storage.origin_visit_upsert([{
+                **object_,
+                'origin': storage.origin_add_one(object_['origin'])}])
         else:
             assert False
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 3e2f133..cdf6cc7 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,92 +1,96 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import random
 from subprocess import Popen
 from typing import Tuple
 
 import dateutil
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
 
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import StorageReplayer
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
 def test_storage_play(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     storage = get_storage('memory', {})
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )
 
     now = datetime.datetime.now(tz=datetime.timezone.utc)
 
     # Fill Kafka
     nb_sent = 0
+    nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
+            elif object_type == 'origin_visit':
+                nb_visits += 1
+                object_['visit'] = nb_visits
             producer.send(topic, key=key, value=object_)
             nb_sent += 1
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'consumer_id': 'replayer',
         'prefix': kafka_prefix,
     }
     replayer = StorageReplayer(**config)
     nb_inserted = replayer.fill(storage, max_messages=nb_sent)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]
 
     for origin in origins:
         expected_visits = [
             {
                 **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index aa3f79c..03d2bc6 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,91 +1,75 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import namedtuple
 
 from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists, one_of, composite
+from hypothesis.strategies import lists
 
-from swh.model.hashutil import MultiHash
+from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
-from swh.storage.tests.algos.test_snapshot import snapshots, origins
-from swh.storage.tests.generate_data_test import gen_raw_content
+from swh.storage import HashCollision
 
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
 from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import StorageReplayer, OBJECT_TYPES
 
 FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'topic key value')
 
 
 class MockedDirectKafkaWriter(DirectKafkaWriter):
     def __init__(self):
         self._prefix = 'prefix'
 
 
 class MockedStorageReplayer(StorageReplayer):
     def __init__(self, object_types=OBJECT_TYPES):
         self._object_types = object_types
 
 
-@composite
-def contents(draw):
-    """Generate valid and consistent content.
-
-    Context: Test purposes
-
-    Args:
-        **draw**: Used by hypothesis to generate data
-
-    Returns:
-        dict representing a content.
-
-    """
-    raw_content = draw(gen_raw_content())
-    return {
-        'data': raw_content,
-        'length': len(raw_content),
-        'status': 'visible',
-        **MultiHash.from_data(raw_content).digest()
-    }
-
-
-objects = lists(one_of(
-    origins().map(lambda x: ('origin', x)),
-    snapshots().map(lambda x: ('snapshot', x)),
-    contents().map(lambda x: ('content', x)),
-))
-
-
-@given(objects)
+@given(lists(object_dicts()))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order(objects):
     queue = []
 
     def send(topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
         queue.append(FakeKafkaMessage(topic=topic, key=key, value=value))
 
     def poll():
         yield from queue
 
     storage1 = Storage()
     storage1.journal_writer = MockedDirectKafkaWriter()
     storage1.journal_writer.send = send
 
     for (obj_type, obj) in objects:
-        method = getattr(storage1, obj_type + '_add')
-        method([obj])
+        obj = obj.copy()
+        if obj_type == 'origin_visit':
+            origin_id = storage1.origin_add_one(obj.pop('origin'))
+            if 'visit' in obj:
+                del obj['visit']
+            storage1.origin_visit_add(origin_id, **obj)
+        else:
+            method = getattr(storage1, obj_type + '_add')
+            try:
+                method([obj])
+            except HashCollision:
+                pass
 
     storage2 = Storage()
     replayer = MockedStorageReplayer()
     replayer.poll = poll
     replayer.fill(storage2)
 
-    for attr in ('_contents', '_directories', '_revisions', '_releases',
-                 '_snapshots', '_origin_visits', '_origins'):
-        assert getattr(storage1, attr) == getattr(storage2, attr), attr
+    for attr_name in ('_contents', '_directories', '_revisions', '_releases',
+                      '_snapshots', '_origin_visits', '_origins'):
+        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
+            attr_name
+
+
+# TODO: add test for hash collision
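
A note on the trailing "# TODO: add test for hash collision": one possible shape for that test, sketched below, stubs the storage so that content_add_metadata raises HashCollision, then checks that StorageReplayer.insert_object logs the collision instead of propagating it. This is only a sketch, not part of the patch: the test name and the MagicMock-based stub storage are illustrative assumptions, and caplog is pytest's built-in log-capture fixture; only insert_object, content_add_metadata and HashCollision come from the diff above.

    from unittest.mock import MagicMock

    from swh.storage import HashCollision

    from swh.journal.replay import StorageReplayer, OBJECT_TYPES


    class MockedStorageReplayer(StorageReplayer):
        # Same trick as in test_write_replay.py: bypass __init__ so no
        # Kafka consumer is created.
        def __init__(self, object_types=OBJECT_TYPES):
            self._object_types = object_types


    def test_insert_object_logs_hash_collision(caplog):
        storage = MagicMock()
        # Simulate a storage backend that detects a colliding hash.
        storage.content_add_metadata.side_effect = HashCollision('sha1')

        replayer = MockedStorageReplayer()
        # Must not raise: insert_object catches HashCollision and logs it.
        replayer.insert_object(storage, 'content', {'sha1': b'\x00' * 20})

        storage.content_add_metadata.assert_called_once()
        assert 'Hash collision' in caplog.text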