# Object types that may be requested from a StorageReplayer; each one maps to
# the Kafka topic '<prefix>.<object_type>'.
OBJECT_TYPES = frozenset([
    'origin', 'origin_visit', 'snapshot', 'release', 'revision',
    'directory', 'content',
])


class StorageReplayer:
    """Read objects from Kafka journal topics and insert them in a storage.

    The consumer subscribes to one topic per requested object type, named
    ``<prefix>.<object_type>``, and starts from the earliest available
    offset when the consumer group has no committed offset.

    NOTE(review): auto-commit is disabled and nothing in this class commits
    offsets, so a restarted replayer presumably starts over from the
    beginning -- confirm this is intended.

    Args:
        brokers: Kafka bootstrap server(s), passed as-is to KafkaConsumer.
        prefix: topic prefix shared by all object-type topics.
        consumer_id: Kafka consumer group id.
        object_types: iterable of object types to replay; must be a subset
            of OBJECT_TYPES.

    Raises:
        ValueError: if ``object_types`` contains an unknown object type.
    """

    def __init__(self, brokers, prefix, consumer_id,
                 object_types=OBJECT_TYPES):
        # Materialize once: the value is needed for validation, for the
        # subscription and for later membership checks, and a one-shot
        # iterable would be exhausted after the first use.
        object_types = tuple(object_types)
        unknown = set(object_types) - OBJECT_TYPES
        if unknown:
            # Sorted so that the error message is deterministic.
            raise ValueError('Unknown object types: %s' % ', '.join(
                sorted(map(str, unknown))))

        self._object_types = frozenset(object_types)
        self.consumer = KafkaConsumer(
            bootstrap_servers=brokers,
            value_deserializer=kafka_to_value,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id=consumer_id,
        )
        self.consumer.subscribe(
            topics=['%s.%s' % (prefix, object_type)
                    for object_type in object_types],
        )

    def fill(self, storage, max_messages):
        """Consume messages and insert the objects they carry into `storage`.

        Args:
            storage: object passed through to :meth:`insert_object`.
            max_messages: stop after this many messages were processed.

        Returns:
            The number of messages processed (0 if ``max_messages <= 0``).

        Raises:
            AssertionError: if a message comes from a topic whose object
                type was not requested at construction time.
        """
        # Without this guard, one message would be consumed and inserted
        # before the limit is ever checked.
        if max_messages <= 0:
            return 0

        num = 0
        for message in self.consumer:
            # The object type is the last dot-separated topic component.
            object_type = message.topic.split('.')[-1]

            # Got a message from a topic we did not subscribe to.
            # Raised explicitly so the check survives `python -O`.
            if object_type not in self._object_types:
                raise AssertionError(
                    'Message from unexpected topic: %s' % message.topic)

            self.insert_object(storage, object_type, message.value)

            num += 1
            if num >= max_messages:
                break
        return num

    def insert_object(self, storage, object_type, object_):
        """Insert a single deserialized journal object into `storage`.

        Args:
            storage: object providing ``<object_type>_add`` methods taking a
                list of objects, plus ``origin_add_one`` and
                ``origin_visit_add``.
            object_type: one of 'content', 'directory', 'revision',
                'release', 'origin' or 'origin_visit'.
            object_: mapping describing the object; it is not modified.

        Raises:
            AssertionError: if ``object_type`` has no insertion path here
                (this currently includes 'snapshot').
        """
        # Work on a shallow copy so the caller's mapping is left untouched.
        object_ = dict(object_)

        if object_type in ('content', 'directory', 'revision', 'release',
                           'origin'):
            if object_type == 'content':
                # TODO: we don't write contents in Kafka, so we need to
                # find a way to insert them somehow.
                object_['status'] = 'absent'
            method = getattr(storage, object_type + '_add')
            method([object_])
        elif object_type == 'origin_visit':
            # The visit embeds its origin; register it first to get its id,
            # then pass the remaining keys as visit attributes.
            origin_id = storage.origin_add_one(object_.pop('origin'))
            storage.origin_visit_add(origin=origin_id, **object_)
        else:
            # Explicit raise instead of `assert False`, which is stripped
            # under `python -O` and would silently drop the object.
            raise AssertionError(
                'Unsupported object type: %s' % object_type)
def test_storage_play(
        kafka_server: Tuple[Popen, int]):
    """Send the fixture objects to Kafka, replay them, and check the storage.

    Every object of OBJECT_TYPE_KEYS is published on the topic
    'swh.journal.objects.<object_type>'; a StorageReplayer then fills an
    in-memory storage from these topics, and the revisions, releases,
    origins and origin visits read back from the storage are compared with
    the fixtures.

    Args:
        kafka_server: (process, port) pair of the Kafka server under test;
            only the port is used here.
    """
    # `import dateutil` alone does not load the `parser` submodule, so
    # `dateutil.parser` would only resolve if another module imported it.
    import dateutil.parser

    (_, port) = kafka_server
    broker = 'localhost:{}'.format(port)

    storage = get_storage('memory', {})

    producer = KafkaProducer(
        bootstrap_servers=broker,
        key_serializer=key_to_kafka,
        value_serializer=value_to_kafka,
        client_id='test producer',
    )

    # Fill Kafka
    nb_sent = 0
    for (object_type, (_key, objects)) in OBJECT_TYPE_KEYS.items():
        topic = 'swh.journal.objects.' + object_type
        for object_ in objects:
            # Random 40-byte message key
            key = bytes(random.randint(0, 255) for _ in range(40))
            producer.send(topic, key=key, value=object_)
            nb_sent += 1

    # send() only queues messages; make sure they are all delivered before
    # the replayer starts counting them.
    producer.flush()

    # Fill the storage from Kafka
    config = {
        'brokers': broker,
        'consumer_id': 'replayer',
        'prefix': 'swh.journal.objects',
    }
    replayer = StorageReplayer(**config)
    nb_inserted = replayer.fill(storage, max_messages=nb_sent)
    assert nb_sent == nb_inserted

    # Check the objects were actually inserted in the storage
    assert OBJECT_TYPE_KEYS['revision'][1] == \
        list(storage.revision_get(
            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
    assert OBJECT_TYPE_KEYS['release'][1] == \
        list(storage.release_get(
            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

    origins = list(storage.origin_get(
        list(OBJECT_TYPE_KEYS['origin'][1])))
    assert OBJECT_TYPE_KEYS['origin'][1] == \
        [{'url': orig['url'], 'type': orig['type']} for orig in origins]
    for origin in origins:
        expected_visits = [
            {
                'origin': origin['id'],
                'date': dateutil.parser.parse(visit['date']),
                'snapshot': None,  # TODO
                'status': 'ongoing',  # TODO
                'metadata': None,  # TODO
            }
            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
            if (visit['origin']['url'] == origin['url']
                and visit['origin']['type'] == origin['type'])
        ]
        actual_visits = list(storage.origin_visit_get(origin['id']))
        for visit in actual_visits:
            del visit['visit']  # opaque identifier
        assert expected_visits == actual_visits
    # TODO: check for content