diff --git a/swh/journal/replay.py b/swh/journal/replay.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/replay.py
@@ -0,0 +1,81 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import itertools
+import logging
+
+from kafka import KafkaConsumer
+
+from .serializers import kafka_to_value
+
+logger = logging.getLogger(__name__)
+
+
+OBJECT_TYPES = frozenset([
+    'origin', 'origin_visit', 'snapshot', 'release', 'revision',
+    'directory', 'content',
+])
+
+
+def kafka_to_dict(data):
+    """Deserialize a kafka message and recursively decode dict keys to str."""
+    def decode_dict_keys(v):
+        if isinstance(v, dict):
+            return {key.decode(): decode_dict_keys(value)
+                    for (key, value) in v.items()}
+        elif isinstance(v, list):
+            return list(map(decode_dict_keys, v))
+        else:
+            return v
+    return decode_dict_keys(kafka_to_value(data))
+
+
+class StorageReplayer:
+    def __init__(self, brokers, prefix, consumer_id,
+                 object_types=OBJECT_TYPES):
+        if not set(object_types).issubset(OBJECT_TYPES):
+            raise ValueError('Unknown object types: %s' % ', '.join(
+                set(object_types) - OBJECT_TYPES))
+
+        self._object_types = object_types
+        self.consumer = KafkaConsumer(
+            bootstrap_servers=brokers,
+            value_deserializer=kafka_to_dict,
+            auto_offset_reset='earliest',
+            enable_auto_commit=False,
+            group_id=consumer_id,
+        )
+        self.consumer.subscribe(
+            topics=['%s.%s' % (prefix, object_type)
+                    for object_type in object_types],
+        )
+
+    def fill(self, storage, max_messages):
+        """Insert up to max_messages consumed objects into storage; return
+        the number of objects actually inserted."""
+        nb_messages = 0
+        for message in itertools.islice(self.consumer, max_messages):
+            object_type = message.topic.split('.')[-1]
+
+            # Got a message from a topic we did not subscribe to.
+            assert object_type in self._object_types, object_type
+
+            self.insert_object(storage, object_type, message.value)
+            nb_messages += 1
+        return nb_messages
+
+    def insert_object(self, storage, object_type, object_):
+        """Insert a single deserialized object via the storage method
+        matching its type (<object_type>_add)."""
+        method = getattr(storage, object_type + '_add')
+        if object_type in ('content', 'directory', 'revision', 'release',
+                           'origin'):
+            if object_type == 'content':
+                # we did not load the content's data; mark it absent
+                object_['status'] = 'absent'
+            method([object_])
+        elif object_type in ('origin_visit',):
+            method(**object_)
+        else:
+            assert False, 'Unexpected object type %s' % object_type
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -37,11 +37,11 @@
 COMMITTER = [
     {
         'id': 1,
-        'fullname': 'foo',
+        'fullname': b'foo',
     },
     {
         'id': 2,
-        'fullname': 'bar',
+        'fullname': b'bar',
     }
 ]
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_replay.py
@@ -0,0 +1,63 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import random
+from subprocess import Popen
+from typing import Tuple
+
+from kafka import KafkaProducer
+
+from swh.storage import get_storage
+
+from swh.journal.serializers import key_to_kafka, value_to_kafka
+from swh.journal.replay import StorageReplayer
+
+from .conftest import OBJECT_TYPE_KEYS
+
+
+def test_storage_play(
+        kafka_server: Tuple[Popen, int]):
+    (_, port) = kafka_server
+
+    config = {
+        'brokers': 'localhost:%d' % kafka_server[1],
+        'client_id': 'direct_writer',
+        'prefix': 'swh.journal.objects_from_new_storage',
+    }
+    storage = get_storage('memory', {'journal_writer': {
+        'cls': 'kafka', 'args': config}})
+
+    producer = KafkaProducer(
+        bootstrap_servers='localhost:{}'.format(port),
+        key_serializer=key_to_kafka,
+        value_serializer=value_to_kafka,
+        client_id='test producer',
+    )
+
+    # Fill Kafka
+    nb_sent = 0
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        topic = 'swh.journal.objects_source.' + object_type
+        for object_ in objects:
+            key = bytes(random.randint(0, 255) for _ in range(40))
+            producer.send(topic, key=key, value=object_)
+            nb_sent += 1
+
+    # Fill the storage from Kafka
+    config = {
+        'brokers': 'localhost:%d' % kafka_server[1],
+        'consumer_id': 'replayer',
+        'prefix': 'swh.journal.objects_source',
+    }
+    replayer = StorageReplayer(**config)
+    nb_inserted = replayer.fill(storage, max_messages=nb_sent)
+    assert nb_sent == nb_inserted
+
+    # Check the objects were actually inserted in the storage
+    assert OBJECT_TYPE_KEYS['revision'][1] == list(storage.revision_get(
+        [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
+    assert OBJECT_TYPE_KEYS['release'][1] == list(storage.release_get(
+        [rev['id'] for rev in OBJECT_TYPE_KEYS['release'][1]]))
+    # TODO: check for content