diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/direct_writer.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from kafka import KafkaProducer
+
+from .serializers import key_to_kafka, value_to_kafka
+
+logger = logging.getLogger(__name__)
+
+
+class DirectKafkaWriter:
+    """This class is instantiated and used by swh-storage to write incoming
+    new objects to Kafka before adding them to the storage backend
+    (eg. postgresql) itself."""
+    def __init__(self, brokers, prefix, client_id):
+        """
+        Args:
+            brokers: Kafka bootstrap server(s), passed as-is to
+                ``KafkaProducer(bootstrap_servers=...)``
+            prefix: topic prefix; objects of type ``T`` are written to
+                the ``<prefix>.T`` topic
+            client_id: Kafka client identifier
+        """
+        self._prefix = prefix
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=brokers,
+            key_serializer=key_to_kafka,
+            value_serializer=value_to_kafka,
+            client_id=client_id,
+        )
+
+    def _get_key(self, object_type, object_):
+        """Return the Kafka message key identifying ``object_``.
+
+        Raises:
+            ValueError: if ``object_type`` is not a supported type.
+        """
+        if object_type in ('revision', 'release', 'directory', 'snapshot'):
+            return object_['id']
+        elif object_type == 'content':
+            return object_['sha1']  # TODO: use a dict of hashes
+        elif object_type == 'origin':
+            return {'url': object_['url'], 'type': object_['type']}
+        elif object_type == 'origin_visit':
+            return {
+                'origin': object_['origin'],
+                'date': str(object_['date']),
+            }
+        else:
+            raise ValueError('Unknown object type: %s.' % object_type)
+
+    def _sanitize_object(self, object_type, object_):
+        """Return ``object_`` in the format used by the journal publisher.
+
+        Origin visits get a new dict with their ``date`` stringified; other
+        object types are returned as-is."""
+        if object_type == 'origin_visit':
+            # Compatibility with the publisher's format
+            return {
+                **object_,
+                'date': str(object_['date']),
+            }
+        return object_
+
+    def write_addition(self, object_type, object_):
+        """Send ``object_`` to the ``<prefix>.<object_type>`` topic.
+
+        ``KafkaProducer.send`` is asynchronous, so the message may still
+        be buffered when this returns; call :meth:`flush` to wait for
+        delivery."""
+        topic = '%s.%s' % (self._prefix, object_type)
+        key = self._get_key(object_type, object_)
+        object_ = self._sanitize_object(object_type, object_)
+        # Lazy %-args: the (possibly large) object is only formatted when
+        # debug logging is enabled.
+        logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
+        self.producer.send(topic, key=key, value=object_)
+
+    write_update = write_addition
+
+    def write_additions(self, object_type, objects):
+        """Send every object of ``objects``, then block until all pending
+        messages have been delivered to Kafka."""
+        for object_ in objects:
+            self.write_addition(object_type, object_)
+        self.flush()
+
+    def flush(self):
+        """Block until all buffered messages have been sent to Kafka."""
+        self.producer.flush()
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -64,7 +64,9 @@
             method([object_])
         elif object_type == 'origin_visit':
             origin_id = storage.origin_add_one(object_.pop('origin'))
-            storage.origin_visit_add(origin=origin_id, **object_)
-            method = getattr(storage, object_type + '_add')
+            visit = storage.origin_visit_add(
+                origin=origin_id, date=object_.pop('date'))
+            storage.origin_visit_update(
+                origin_id, visit['visit'], **object_)
         else:
             assert False
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -108,11 +108,17 @@
 ORIGIN_VISITS = [
     {
         'origin': ORIGINS[0],
-        'date': '2013-05-07T04:20:39.369271+00:00',
+        'date': '2013-05-07 04:20:39.369271+00:00',
+        'snapshot': None,  # TODO
+        'status': 'ongoing',  # TODO
+        'metadata': {'foo': 'bar'},
     },
     {
         'origin': ORIGINS[0],
-        'date': '2018-11-27T17:20:39.000000+00:00',
+        'date': '2018-11-27 17:20:39+00:00',
+        'snapshot': None,  # TODO
+        'status': 'ongoing',  # TODO
+        'metadata': {'baz': 'qux'},
     }
 ]
 
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/tests/test_direct_writer.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2018-2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import defaultdict
+import time
+
+from kafka import KafkaConsumer
+from subprocess import Popen
+from typing import Tuple
+
+from swh.storage import get_storage
+
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.serializers import value_to_kafka, kafka_to_value
+
+from .conftest import OBJECT_TYPE_KEYS
+
+
+def assert_written(consumer):
+    time.sleep(0.1)  # Without this, some messages are missing
+
+    consumed_objects = defaultdict(list)
+    for msg in consumer:
+        consumed_objects[msg.topic].append((msg.key, msg.value))
+
+    for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
+        topic = 'swh.journal.objects.%s' % object_type
+        (keys, values) = zip(*consumed_objects[topic])
+        if key_name:
+            assert list(keys) == [object_[key_name] for object_ in objects]
+        else:
+            pass  # TODO
+
+        if object_type == 'origin_visit':
+            for value in values:
+                del value['visit']
+
+        for object_ in objects:
+            assert kafka_to_value(value_to_kafka(object_)) in values
+
+
+def test_direct_writer(
+        kafka_server: Tuple[Popen, int],
+        consumer_from_publisher: KafkaConsumer):
+
+    config = {
+        'brokers': 'localhost:%d' % kafka_server[1],
+        'client_id': 'direct_writer',
+        'prefix': 'swh.journal.objects',
+    }
+
+    writer = DirectKafkaWriter(**config)
+
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        for (num, object_) in enumerate(objects):
+            if object_type == 'origin_visit':
+                object_ = {**object_, 'visit': num}
+            writer.write_addition(object_type, object_)
+
+    assert_written(consumer_from_publisher)
+
+
+def test_storage_direct_writer(
+        kafka_server: Tuple[Popen, int],
+        consumer_from_publisher: KafkaConsumer):
+
+    config = {
+        'brokers': 'localhost:%d' % kafka_server[1],
+        'client_id': 'direct_writer',
+        'prefix': 'swh.journal.objects',
+    }
+
+    storage = get_storage('memory', {'journal_writer': {
+        'cls': 'kafka', 'args': config}})
+
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        method = getattr(storage, object_type + '_add')
+        if object_type in ('content', 'directory', 'revision', 'release',
+                           'snapshot', 'origin'):
+            if object_type == 'content':
+                objects = [{**obj, 'data': b''} for obj in objects]
+            method(objects)
+        elif object_type in ('origin_visit',):
+            for object_ in objects:
+                object_ = object_.copy()
+                origin_id = storage.origin_add_one(object_.pop('origin'))
+                visit = method(origin=origin_id, date=object_.pop('date'))
+                visit_id = visit['visit']
+                storage.origin_visit_update(origin_id, visit_id, **object_)
+        else:
+            assert False, object_type
+
+    assert_written(consumer_from_publisher)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -65,14 +65,12 @@
     for origin in origins:
         expected_visits = [
             {
+                **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
-                'snapshot': None,  # TODO
-                'status': 'ongoing',  # TODO
-                'metadata': None,  # TODO
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
-            if visit['origin']['url'] == origin['url'] \
+            if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))