diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
new file mode 100644
index 0000000..6900a32
--- /dev/null
+++ b/swh/journal/direct_writer.py
@@ -0,0 +1,64 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+
+from kafka import KafkaProducer
+
+from .serializers import key_to_kafka, value_to_kafka
+
+logger = logging.getLogger(__name__)
+
+
+class DirectKafkaWriter:
+    """This class is instantiated and used by swh-storage to write incoming
+    new objects to Kafka before adding them to the storage backend
+    (e.g. postgresql) itself."""
+    def __init__(self, brokers, prefix, client_id):
+        self._prefix = prefix
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=brokers,
+            key_serializer=key_to_kafka,
+            value_serializer=value_to_kafka,
+            client_id=client_id,
+        )
+
+    def _get_key(self, object_type, object_):
+        if object_type in ('revision', 'release', 'directory', 'snapshot'):
+            return object_['id']
+        elif object_type == 'content':
+            return object_['sha1']  # TODO: use a dict of hashes
+        elif object_type == 'origin':
+            return {'url': object_['url'], 'type': object_['type']}
+        elif object_type == 'origin_visit':
+            return {
+                'origin': object_['origin'],
+                'date': str(object_['date']),
+            }
+        else:
+            raise ValueError('Unknown object type: %s.' % object_type)
+
+    def _sanitize_object(self, object_type, object_):
+        if object_type == 'origin_visit':
+            # Compatibility with the publisher's format
+            return {
+                **object_,
+                'date': str(object_['date']),
+            }
+        return object_
+
+    def write_addition(self, object_type, object_):
+        topic = '%s.%s' % (self._prefix, object_type)
+        key = self._get_key(object_type, object_)
+        object_ = self._sanitize_object(object_type, object_)
+        logger.debug('topic: %s, key: %s, value: %s' % (topic, key, object_))
+        self.producer.send(topic, key=key, value=object_)
+
+    write_update = write_addition
+
+    def write_additions(self, object_type, objects):
+        for object_ in objects:
+            self.write_addition(object_type, object_)
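
For orientation, here is a minimal usage sketch of the writer added above. The broker address, client id, and the sample revision dict are illustrative assumptions, not values taken from this patch:

```python
# Hypothetical driver code for DirectKafkaWriter (sketch only).
from swh.journal.direct_writer import DirectKafkaWriter

writer = DirectKafkaWriter(
    brokers=['localhost:9092'],    # assumed local broker
    prefix='swh.journal.objects',
    client_id='example-writer',    # hypothetical client id
)

# _get_key() keys revisions by their 'id'; the message goes to the
# 'swh.journal.objects.revision' topic, encoded by value_to_kafka.
writer.write_addition('revision', {
    'id': b'\x00' * 20,            # made-up 20-byte revision id
    'message': b'example',
})
```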
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index 96d54a6..2f064c6 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,70 +1,72 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from kafka import KafkaConsumer
 
 from .serializers import kafka_to_value
 
 logger = logging.getLogger(__name__)
 
 
 OBJECT_TYPES = frozenset([
     'origin', 'origin_visit', 'snapshot', 'release', 'revision',
     'directory', 'content',
 ])
 
 
 class StorageReplayer:
     def __init__(self, brokers, prefix, consumer_id,
                  object_types=OBJECT_TYPES):
         if not set(object_types).issubset(OBJECT_TYPES):
             raise ValueError('Unknown object types: %s' % ', '.join(
                 set(object_types) - OBJECT_TYPES))
 
         self._object_types = object_types
         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             value_deserializer=kafka_to_value,
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=consumer_id,
         )
         self.consumer.subscribe(
             topics=['%s.%s' % (prefix, object_type)
                     for object_type in object_types],
         )
 
     def fill(self, storage, max_messages):
         num = 0
         for message in self.consumer:
             object_type = message.topic.split('.')[-1]
             # Fail if we got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type
             self.insert_object(storage, object_type, message.value)
             num += 1
             if num >= max_messages:
                 break
         return num
 
     def insert_object(self, storage, object_type, object_):
         if object_type in ('content', 'directory', 'revision', 'release',
                            'origin'):
             if object_type == 'content':
                 # TODO: we don't write contents in Kafka, so we need to
                 # find a way to insert them somehow.
                 object_['status'] = 'absent'
             method = getattr(storage, object_type + '_add')
             method([object_])
         elif object_type == 'origin_visit':
             origin_id = storage.origin_add_one(object_.pop('origin'))
-            storage.origin_visit_add(origin=origin_id, **object_)
-            method = getattr(storage, object_type + '_add')
+            visit = storage.origin_visit_add(
+                origin=origin_id, date=object_.pop('date'))
+            storage.origin_visit_update(
+                origin_id, visit['visit'], **object_)
         else:
             assert False
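
And the consuming side, mirroring how test_replay.py drives it further below; again the broker address and consumer id are assumptions for illustration:

```python
# Hypothetical replay of the journal into an in-memory storage (sketch).
from swh.storage import get_storage

from swh.journal.replay import StorageReplayer

storage = get_storage('memory', {})
replayer = StorageReplayer(
    brokers=['localhost:9092'],    # assumed local broker
    prefix='swh.journal.objects',
    consumer_id='example-replayer',
)
# fill() consumes messages and inserts them via insert_object() until
# max_messages have been processed, then returns how many it handled.
replayer.fill(storage, max_messages=10)
```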
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index 8cb3938..6f80210 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,251 +1,257 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import pytest
 import logging
 
 from kafka import KafkaProducer, KafkaConsumer
 
 from subprocess import Popen
 from typing import Tuple, Dict
 from pathlib import Path
 from pytest_kafka import (
     make_zookeeper_process, make_kafka_server, constants
 )
 
 from swh.journal.publisher import JournalPublisher
 from swh.model.hashutil import hash_to_bytes
 
 from swh.journal.serializers import kafka_to_key, key_to_kafka, kafka_to_value
 
 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]
 
 COMMITTER = [
     {
         'id': 1,
         'fullname': b'foo',
     },
     {
         'id': 2,
         'fullname': b'bar',
     }
 ]
 
 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': {
             'timestamp': {
                 'seconds': 1234567891,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[0],
         'author': COMMITTER[0],
         'committer_date': None,
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': {
             'timestamp': {
                 'seconds': 1234567892,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'committer': COMMITTER[1],
         'author': COMMITTER[1],
         'committer_date': None,
     },
 ]
 
 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTER[0],
     },
 ]
 
 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]
 
 ORIGIN_VISITS = [
     {
         'origin': ORIGINS[0],
-        'date': '2013-05-07T04:20:39.369271+00:00',
+        'date': '2013-05-07 04:20:39.369271+00:00',
+        'snapshot': None,  # TODO
+        'status': 'ongoing',  # TODO
+        'metadata': {'foo': 'bar'},
     },
     {
         'origin': ORIGINS[0],
-        'date': '2018-11-27T17:20:39.000000+00:00',
+        'date': '2018-11-27 17:20:39+00:00',
+        'snapshot': None,  # TODO
+        'status': 'ongoing',  # TODO
+        'metadata': {'baz': 'qux'},
     }
 ]
 
 # From type to tuple (id, <objects>)
 OBJECT_TYPE_KEYS = {
     'content': ('sha1', CONTENTS),
     'revision': ('id', REVISIONS),
     'release': ('id', RELEASES),
     'origin': (None, ORIGINS),
     'origin_visit': (None, ORIGIN_VISITS),
 }
 
 TEST_CONFIG = {
     'temporary_prefix': 'swh.tmp_journal.new',
     'final_prefix': 'swh.journal.objects',
     'consumer_id': 'swh.journal.publisher',
     'publisher_id': 'swh.journal.publisher',
     'object_types': OBJECT_TYPE_KEYS.keys(),
     'max_messages': 1,  # will read 1 message, then stop
     'storage': {'cls': 'memory', 'args': {}}
 }
 
 
 class JournalPublisherTest(JournalPublisher):
     """A journal publisher which overrides the default configuration
        parsing setup.
 
     """
     def _prepare_storage(self, config):
         super()._prepare_storage(config)
         self.storage.content_add({'data': b'42', **c} for c in CONTENTS)
         self.storage.revision_add(REVISIONS)
         self.storage.release_add(RELEASES)
         origins = self.storage.origin_add(ORIGINS)
         origin_visits = []
         for i, ov in enumerate(ORIGIN_VISITS):
             origin_id = origins[i]['id']
             ov = self.storage.origin_visit_add(origin_id, ov['date'])
             origin_visits.append(ov)
         self.origins = origins
         self.origin_visits = origin_visits
 
 
 KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT')
 KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka'
 if not os.path.exists(KAFKA_ROOT):
     msg = ('Development error: %s must exist and target an '
            'existing kafka installation' % KAFKA_ROOT)
     raise ValueError(msg)
 
 KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin'
 
 KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')
 
 
 # These define the zookeeper_proc and kafka_server fixtures
 zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN)
 os.environ['KAFKA_LOG4J_OPTS'] = \
     '-Dlog4j.configuration=file:%s/log4j.properties' % \
     os.path.dirname(__file__)
 kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc')
 
 logger = logging.getLogger('kafka')
 logger.setLevel(logging.WARN)
 
 
 @pytest.fixture
 def test_config():
     """Test configuration needed for publisher/producer/consumer
 
     """
     return TEST_CONFIG
 
 
 @pytest.fixture
 def producer_to_publisher(
         kafka_server: Tuple[Popen, int],
         test_config: Dict,
 ) -> KafkaProducer:  # noqa
     """Producer to send messages to the publisher's consumer.
 
     """
     _, port = kafka_server
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=key_to_kafka,
         client_id=test_config['consumer_id'],
     )
     return producer
 
 
 @pytest.fixture
 def consumer_from_publisher(kafka_server: Tuple[Popen, int],
                             test_config: Dict) -> KafkaConsumer:
     """Get a connected Kafka consumer.
 
     """
     kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type)
                     for object_type in test_config['object_types']]
     _, kafka_port = kafka_server
     consumer = KafkaConsumer(
         *kafka_topics,
         bootstrap_servers='localhost:{}'.format(kafka_port),
         consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS,
         key_deserializer=kafka_to_key,
         value_deserializer=kafka_to_value,
         auto_offset_reset='earliest',
         enable_auto_commit=True,
         group_id="test-consumer"
     )
 
     # Enforce auto_offset_reset=earliest even if the consumer was created
     # too soon wrt the server.
     while len(consumer.assignment()) == 0:
         consumer.poll(timeout_ms=20)
     consumer.seek_to_beginning()
 
     return consumer
 
 
 @pytest.fixture
 def publisher(kafka_server: Tuple[Popen, int],
               test_config: Dict) -> JournalPublisher:
     """Test Publisher factory. We cannot use a fixture here, as we need
        to modify the configuration sample.
 
     """
     # The publisher's consumer and producer need to talk to the right
     # Kafka instance
     _, port = kafka_server
     test_config['brokers'] = ['localhost:{}'.format(port)]
     publisher = JournalPublisherTest(test_config)
     return publisher
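
To make the OBJECT_TYPE_KEYS shape concrete, a short sketch of how the test modules below consume it; the loop body here is purely illustrative:

```python
# Each entry maps an object type to (key field name, fixture objects);
# the key field is None for types without a single natural key.
from swh.journal.tests.conftest import OBJECT_TYPE_KEYS

for object_type, (key_name, objects) in OBJECT_TYPE_KEYS.items():
    topic = 'swh.journal.objects.%s' % object_type
    for object_ in objects:
        key = object_[key_name] if key_name else None
        print(topic, key)
```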
""" # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server test_config['brokers'] = ['localhost:{}'.format(port)] publisher = JournalPublisherTest(test_config) return publisher diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py new file mode 100644 index 0000000..284e72e --- /dev/null +++ b/swh/journal/tests/test_direct_writer.py @@ -0,0 +1,95 @@ +# Copyright (C) 2018-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from collections import defaultdict +import time + +from kafka import KafkaConsumer +from subprocess import Popen +from typing import Tuple + +from swh.storage import get_storage + +from swh.journal.direct_writer import DirectKafkaWriter +from swh.journal.serializers import value_to_kafka, kafka_to_value + +from .conftest import OBJECT_TYPE_KEYS + + +def assert_written(consumer): + time.sleep(0.1) # Without this, some messages are missing + + consumed_objects = defaultdict(list) + for msg in consumer: + consumed_objects[msg.topic].append((msg.key, msg.value)) + + for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): + topic = 'swh.journal.objects.%s' % object_type + (keys, values) = zip(*consumed_objects[topic]) + if key_name: + assert list(keys) == [object_[key_name] for object_ in objects] + else: + pass # TODO + + if object_type == 'origin_visit': + for value in values: + del value['visit'] + + for object_ in objects: + assert kafka_to_value(value_to_kafka(object_)) in values + + +def test_direct_writer( + kafka_server: Tuple[Popen, int], + consumer_from_publisher: KafkaConsumer): + + config = { + 'brokers': 'localhost:%d' % kafka_server[1], + 'client_id': 'direct_writer', + 'prefix': 'swh.journal.objects', + } + + writer = DirectKafkaWriter(**config) + + for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): + for (num, object_) in enumerate(objects): + if object_type == 'origin_visit': + object_ = {**object_, 'visit': num} + writer.write_addition(object_type, object_) + + assert_written(consumer_from_publisher) + + +def test_storage_direct_writer( + kafka_server: Tuple[Popen, int], + consumer_from_publisher: KafkaConsumer): + + config = { + 'brokers': 'localhost:%d' % kafka_server[1], + 'client_id': 'direct_writer', + 'prefix': 'swh.journal.objects', + } + + storage = get_storage('memory', {'journal_writer': { + 'cls': 'kafka', 'args': config}}) + + for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): + method = getattr(storage, object_type + '_add') + if object_type in ('content', 'directory', 'revision', 'release', + 'snapshot', 'origin'): + if object_type == 'content': + objects = [{**obj, 'data': b''} for obj in objects] + method(objects) + elif object_type in ('origin_visit',): + for object_ in objects: + object_ = object_.copy() + origin_id = storage.origin_add_one(object_.pop('origin')) + visit = method(origin=origin_id, date=object_.pop('date')) + visit_id = visit['visit'] + storage.origin_visit_update(origin_id, visit_id, **object_) + else: + assert False, object_type + + assert_written(consumer_from_publisher) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index 0c805e9..34f965e 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,82 +1,80 @@ # Copyright (C) 2019 The Software Heritage 
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 0c805e9..34f965e 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,82 +1,80 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import random
 from subprocess import Popen
 from typing import Tuple
 
 import dateutil
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
 
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import StorageReplayer
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
 def test_storage_play(
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
 
     storage = get_storage('memory', {})
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )
 
     # Fill Kafka
     nb_sent = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = 'swh.journal.objects.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             producer.send(topic, key=key, value=object_)
             nb_sent += 1
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'consumer_id': 'replayer',
         'prefix': 'swh.journal.objects',
     }
     replayer = StorageReplayer(**config)
     nb_inserted = replayer.fill(storage, max_messages=nb_sent)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]
 
     for origin in origins:
         expected_visits = [
             {
+                **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
-                'snapshot': None,  # TODO
-                'status': 'ongoing',  # TODO
-                'metadata': None,  # TODO
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
-            if visit['origin']['url'] == origin['url'] \
+            if visit['origin']['url'] == origin['url']
                 and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(
             origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     # TODO: check for content
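
The visit comparison works because origin_visit dates travel through Kafka as strings (see _sanitize_object in the writer) and are parsed back here with dateutil. The conftest date strings match what str() of an aware datetime produces, which this sketch verifies with one of the fixture dates:

```python
# str() of an aware datetime uses a space separator and omits zero
# microseconds, so the fixture strings round-trip through parse()/str().
import dateutil.parser

date_str = '2013-05-07 04:20:39.369271+00:00'  # from ORIGIN_VISITS
assert str(dateutil.parser.parse(date_str)) == date_str
```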