diff --git a/requirements.txt b/requirements.txt
index 499f989..184c591 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,6 @@
 # Add here external Python modules dependencies, one per line. Module names
 # should match https://pypi.python.org/pypi names. For the full spec or
 # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html
-kafka-python >= 1.3
+confluent-kafka
 msgpack
 vcversioner
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 473fb0e..91dc230 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,109 +1,157 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

+from collections import defaultdict
 import logging
+import time

-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException

-from .serializers import kafka_to_key, kafka_to_value
+from .serializers import kafka_to_value
 from swh.journal import DEFAULT_PREFIX

 logger = logging.getLogger(__name__)

 # Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']

 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content', 'directory', 'revision', 'release', 'snapshot',
     'origin', 'origin_visit'
 ]


 class JournalClient:
     """A base client for the Software Heritage journal.

     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.

     If the 'prefix' argument is None (default value), it will take the
     default value 'swh.journal.objects'.

     Clients subscribe to events specific to each object type as listed
     in the `object_types` argument (if unset, defaults to all accepted
     object types).

     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.

     Messages are processed by the `process` method in batches
     of maximum `max_messages`.

     Any other named argument is passed directly to the Kafka consumer.

     """
     def __init__(
             self, brokers, group_id, prefix=None, object_types=None,
-            max_messages=0, auto_offset_reset='earliest', **kwargs):
+            max_messages=0, process_timeout=0, auto_offset_reset='earliest',
+            **kwargs):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
             object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)

         for object_type in object_types:
             if object_type not in ACCEPTED_OBJECT_TYPES:
                 raise ValueError(
                     'Option \'object_types\' only accepts %s.' %
                     ACCEPTED_OBJECT_TYPES)

-        self.consumer = KafkaConsumer(
-            bootstrap_servers=brokers,
-            key_deserializer=kafka_to_key,
-            value_deserializer=kafka_to_value,
-            auto_offset_reset=auto_offset_reset,
-            enable_auto_commit=False,
-            group_id=group_id,
-            **kwargs)
+        self.value_deserializer = kafka_to_value

-        self.consumer.subscribe(
-            topics=['%s.%s' % (prefix, object_type)
-                    for object_type in object_types],
-        )
+        if isinstance(brokers, str):
+            brokers = [brokers]
+
+        consumer_settings = {
+            **kwargs,
+            'bootstrap.servers': ','.join(brokers),
+            'auto.offset.reset': auto_offset_reset,
+            'group.id': group_id,
+            'enable.auto.commit': False,
+        }
+
+        logger.debug('Consumer settings: %s', consumer_settings)
+
+        self.consumer = Consumer(consumer_settings, logger=logger)
+
+        topics = ['%s.%s' % (prefix, object_type)
+                  for object_type in object_types]
+
+        logger.debug('Upstream topics: %s',
+                     self.consumer.list_topics(timeout=1))
+        logger.debug('Subscribing to: %s', topics)
+
+        self.consumer.subscribe(topics=topics)

         self.max_messages = max_messages
+        self.process_timeout = process_timeout
+
         self._object_types = object_types

     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.

         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                                                        the messages as
                                                        argument.
         """
+        start_time = time.monotonic()
         nb_messages = 0

-        polled = self.consumer.poll()
-        for (partition, messages) in polled.items():
-            object_type = partition.topic.split('.')[-1]
+        objects = defaultdict(list)
+
+        while True:
+            # timeout for message poll
+            timeout = 1.0
+
+            elapsed = time.monotonic() - start_time
+            if self.process_timeout:
+                if elapsed >= self.process_timeout:
+                    break
+
+                timeout = self.process_timeout - elapsed
+
+            message = self.consumer.poll(timeout=timeout)
+            if not message:
+                continue
+
+            error = message.error()
+            if error is not None:
+                if error.fatal():
+                    raise KafkaException(error)
+                logger.info('Received non-fatal kafka error: %s', error)
+                continue
+
+            nb_messages += 1
+
+            object_type = message.topic().split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type

-            worker_fn({object_type: [msg.value for msg in messages]})
+            objects[object_type].append(
+                self.value_deserializer(message.value())
+            )
+
+            if nb_messages >= self.max_messages:
+                break

-            nb_messages += len(messages)
+        worker_fn(dict(objects))

         self.consumer.commit()
         return nb_messages
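The constructor keeps the same high-level arguments as before; `brokers` may now be a single string or a list, any extra keyword argument is forwarded as a confluent-kafka configuration key, and the new `process_timeout` bounds how long `process()` keeps polling. A minimal usage sketch of the migrated client (broker address and group id are illustrative, not part of this patch):

```python
from swh.journal.client import JournalClient


def worker_fn(all_objects):
    # all_objects maps an object type to a list of already-deserialized dicts
    for object_type, objects in all_objects.items():
        print(object_type, len(objects))


client = JournalClient(
    brokers='localhost:9092',         # a single string or a list of strings
    group_id='swh.journal.example',   # consumers sharing this id share the load
    object_types=['revision', 'release'],
    max_messages=100,                 # stop the batch after 100 messages...
    process_timeout=60,               # ...or after 60 seconds, whichever comes first
)
nb_processed = client.process(worker_fn)
```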
diff --git a/swh/journal/direct_writer.py b/swh/journal/direct_writer.py
index 17a5dc1..37ed4dc 100644
--- a/swh/journal/direct_writer.py
+++ b/swh/journal/direct_writer.py
@@ -1,78 +1,94 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

-from kafka import KafkaProducer
+from confluent_kafka import Producer

 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.model import BaseModel

 from .serializers import key_to_kafka, value_to_kafka

 logger = logging.getLogger(__name__)


 class DirectKafkaWriter:
     """This class is instantiated and used by swh-storage to write incoming
     new objects to Kafka before adding them to the storage backend
     (eg. postgresql) itself."""

     def __init__(self, brokers, prefix, client_id):
         self._prefix = prefix

-        self.producer = KafkaProducer(
-            bootstrap_servers=brokers,
-            key_serializer=key_to_kafka,
-            value_serializer=value_to_kafka,
-            client_id=client_id,
-        )
+        self.producer = Producer({
+            'bootstrap.servers': brokers,
+            'client.id': client_id,
+            'enable.idempotence': 'true',
+        })

     def send(self, topic, key, value):
-        self.producer.send(topic=topic, key=key, value=value)
+        self.producer.produce(
+            topic=topic,
+            key=key_to_kafka(key),
+            value=value_to_kafka(value),
+        )
+
+    def flush(self):
+        self.producer.flush()
+        # Need to service the callbacks regularly by calling poll
+        self.producer.poll(0)

     def _get_key(self, object_type, object_):
         if object_type in ('revision', 'release', 'directory', 'snapshot'):
             return object_['id']
         elif object_type == 'content':
             return object_['sha1']  # TODO: use a dict of hashes
         elif object_type == 'skipped_content':
             return {
                 hash: object_[hash]
                 for hash in DEFAULT_ALGORITHMS
             }
         elif object_type == 'origin':
             return {'url': object_['url'], 'type': object_['type']}
         elif object_type == 'origin_visit':
             return {
                 'origin': object_['origin'],
                 'date': str(object_['date']),
             }
         else:
             raise ValueError('Unknown object type: %s.' % object_type)

     def _sanitize_object(self, object_type, object_):
         if object_type == 'origin_visit':
             return {
                 **object_,
                 'date': str(object_['date']),
             }
         elif object_type == 'origin':
             assert 'id' not in object_
         return object_

-    def write_addition(self, object_type, object_):
+    def write_addition(self, object_type, object_, flush=True):
+        """Write a single object to the journal"""
         if isinstance(object_, BaseModel):
             object_ = object_.to_dict()
         topic = '%s.%s' % (self._prefix, object_type)
         key = self._get_key(object_type, object_)
         object_ = self._sanitize_object(object_type, object_)
         logger.debug('topic: %s, key: %s, value: %s', topic, key, object_)
         self.send(topic, key=key, value=object_)
+        if flush:
+            self.flush()

     write_update = write_addition

-    def write_additions(self, object_type, objects):
+    def write_additions(self, object_type, objects, flush=True):
+        """Write a set of objects to the journal"""
         for object_ in objects:
-            self.write_addition(object_type, object_)
+            self.write_addition(object_type, object_, flush=False)
+
+        if flush:
+            self.flush()
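Unlike kafka-python's `KafkaProducer.send()`, `confluent_kafka.Producer.produce()` only enqueues the message; delivery happens in a background thread and delivery reports are dispatched from `poll()`/`flush()`. That is why the writer grows an explicit `flush()` method and why `write_additions()` defers flushing until the whole batch is queued. A sketch of the same pattern with a delivery callback (topic name, payloads and the callback are illustrative, not part of this patch):

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': 'true',   # avoid duplicates if the producer retries
})


def on_delivery(error, message):
    # invoked from poll()/flush() once the broker acks (or rejects) the message
    if error is not None:
        print('delivery failed: %s' % error)


producer.produce(topic='swh.journal.objects.origin',
                 key=b'example-key', value=b'example-value',
                 on_delivery=on_delivery)
producer.flush()   # blocks until every queued message has a delivery report
```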
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
index f46fbe8..59d09ed 100644
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -1,237 +1,235 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os
 import pytest
 import logging
 import random
 import string

-from kafka import KafkaConsumer
+from confluent_kafka import Consumer

 from subprocess import Popen
 from typing import Tuple, Dict

 from pathlib import Path
 from pytest_kafka import (
-    make_zookeeper_process, make_kafka_server, constants
+    make_zookeeper_process, make_kafka_server
 )

 from swh.model.hashutil import hash_to_bytes

-from swh.journal.serializers import kafka_to_key, kafka_to_value
+
+logger = logging.getLogger(__name__)

 CONTENTS = [
     {
         'length': 3,
         'sha1': hash_to_bytes(
             '34973274ccef6ab4dfaaf86599792fa9c3fe4689'),
         'sha1_git': b'foo',
         'blake2s256': b'bar',
         'sha256': b'baz',
         'status': 'visible',
     },
 ]

 COMMITTERS = [
     {
         'fullname': b'foo',
         'name': b'foo',
         'email': b'',
     },
     {
         'fullname': b'bar',
         'name': b'bar',
         'email': b'',
     }
 ]

 DATES = [
     {
         'timestamp': {
             'seconds': 1234567891,
             'microseconds': 0,
         },
         'offset': 120,
         'negative_utc': None,
     },
     {
         'timestamp': {
             'seconds': 1234567892,
             'microseconds': 0,
         },
         'offset': 120,
         'negative_utc': None,
     }
 ]

 REVISIONS = [
     {
         'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'),
         'message': b'hello',
         'date': DATES[0],
         'committer': COMMITTERS[0],
         'author': COMMITTERS[0],
         'committer_date': DATES[0],
         'type': 'git',
         'directory': '\x01'*20,
         'synthetic': False,
         'metadata': None,
         'parents': [],
     },
     {
         'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'),
         'message': b'hello again',
         'date': DATES[1],
         'committer': COMMITTERS[1],
         'author': COMMITTERS[1],
         'committer_date': DATES[1],
         'type': 'hg',
         'directory': '\x02'*20,
         'synthetic': False,
         'metadata': None,
         'parents': [],
     },
 ]

 RELEASES = [
     {
         'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'),
         'name': b'v0.0.1',
         'date': {
             'timestamp': {
                 'seconds': 1234567890,
                 'microseconds': 0,
             },
             'offset': 120,
             'negative_utc': None,
         },
         'author': COMMITTERS[0],
         'target_type': 'revision',
         'target': b'\x04'*20,
         'message': b'foo',
         'synthetic': False,
     },
 ]

 ORIGINS = [
     {
         'url': 'https://somewhere.org/den/fox',
         'type': 'git',
     },
     {
         'url': 'https://overtherainbow.org/fox/den',
         'type': 'svn',
     }
 ]

 ORIGIN_VISITS = [
     {
         'origin': ORIGINS[0],
         'date': '2013-05-07 04:20:39.369271+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'foo': 'bar'},
         'type': 'git',
     },
     {
         'origin': ORIGINS[0],
         'date': '2018-11-27 17:20:39+00:00',
         'snapshot': None,  # TODO
         'status': 'ongoing',  # TODO
         'metadata': {'baz': 'qux'},
         'type': 'git',
     }
 ]

 # From type to tuple (id, )
 OBJECT_TYPE_KEYS = {
     'content': ('sha1', CONTENTS),
     'revision': ('id', REVISIONS),
     'release': ('id', RELEASES),
     'origin': (None, ORIGINS),
     'origin_visit': (None, ORIGIN_VISITS),
 }

 KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT')
 KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka'
 if not os.path.exists(KAFKA_ROOT):
     msg = ('Development error: %s must exist and target an '
            'existing kafka installation' % KAFKA_ROOT)
     raise ValueError(msg)

 KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin'

 KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh')
 ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh')


 # These define session-scoped fixtures
 zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session')
 os.environ['KAFKA_LOG4J_OPTS'] = \
     '-Dlog4j.configuration=file:%s/log4j.properties' % \
     os.path.dirname(__file__)
 kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session')

-logger = logging.getLogger('kafka')
-logger.setLevel(logging.WARN)
+kafka_logger = logging.getLogger('kafka')
+kafka_logger.setLevel(logging.WARN)
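The pytest_kafka helpers above register `zookeeper_proc` and `kafka_server` as session-scoped fixtures, so all tests share one broker process. A hypothetical smoke-test sketch (not part of this patch) showing how a test points a confluent-kafka client at that broker, assuming `kafka_server` yields a `(Popen, port)` tuple as used throughout these tests:

```python
from confluent_kafka import Consumer


def test_broker_is_reachable(kafka_server):
    _, port = kafka_server
    consumer = Consumer({
        'bootstrap.servers': '127.0.0.1:%d' % port,
        'group.id': 'smoke-test',
    })
    # list_topics() performs a metadata request, so it fails if the broker is down
    assert consumer.list_topics(timeout=5) is not None
```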


 @pytest.fixture(scope='function')
 def kafka_prefix():
     return ''.join(random.choice(string.ascii_lowercase) for _ in range(10))


 TEST_CONFIG = {
     'consumer_id': 'swh.journal.consumer',
     'object_types': OBJECT_TYPE_KEYS.keys(),
     'max_messages': 1,  # will read 1 message and stop
     'storage': {'cls': 'memory', 'args': {}},
 }


 @pytest.fixture
 def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str):
     """Test configuration needed for producer/consumer

     """
     _, port = kafka_server
     return {
         **TEST_CONFIG,
-        'brokers': ['localhost:{}'.format(port)],
+        'brokers': ['127.0.0.1:{}'.format(port)],
         'prefix': kafka_prefix + '.swh.journal.objects',
     }


 @pytest.fixture
 def consumer(
-        kafka_server: Tuple[Popen, int], test_config: Dict) -> KafkaConsumer:
+        kafka_server: Tuple[Popen, int],
+        test_config: Dict,
+        kafka_prefix: str,
+) -> Consumer:
     """Get a connected Kafka consumer.

     """
+    _, kafka_port = kafka_server
+    consumer = Consumer({
+        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+        'auto.offset.reset': 'earliest',
+        'enable.auto.commit': True,
+        'group.id': "test-consumer-%s" % kafka_prefix,
+    })
+
     kafka_topics = [
         '%s.%s' % (test_config['prefix'], object_type)
-        for object_type in test_config['object_types']]
-    _, kafka_port = kafka_server
-    consumer = KafkaConsumer(
-        *kafka_topics,
-        bootstrap_servers='localhost:{}'.format(kafka_port),
-        consumer_timeout_ms=constants.DEFAULT_CONSUMER_TIMEOUT_MS,
-        key_deserializer=kafka_to_key,
-        value_deserializer=kafka_to_value,
-        auto_offset_reset='earliest',
-        enable_auto_commit=True,
-        group_id="test-consumer"
-    )
-
-    # Enforce auto_offset_reset=earliest even if the consumer was created
-    # too soon wrt the server.
-    while len(consumer.assignment()) == 0:
-        consumer.poll(timeout_ms=20)
-    consumer.seek_to_beginning()
+        for object_type in test_config['object_types']
+    ]
+
+    consumer.subscribe(kafka_topics)

     return consumer
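The new `consumer` fixture returns a plain `confluent_kafka.Consumer`: it is not iterable and does not deserialize anything, so tests now have to poll it and decode the raw bytes themselves with the journal serializers. A minimal sketch of that pattern (a hypothetical helper under those assumptions, not part of this patch):

```python
from swh.journal.serializers import kafka_to_key, kafka_to_value


def drain(consumer, expected, timeout=0.5, max_polls=1000):
    """Poll `consumer` until `expected` messages have been decoded."""
    results = []
    polls = 0
    while len(results) < expected and polls < max_polls:
        polls += 1
        msg = consumer.poll(timeout=timeout)
        if msg is None or msg.error():
            # no message yet, or a transient broker error: just retry
            continue
        results.append(
            (msg.topic(), kafka_to_key(msg.key()), kafka_to_value(msg.value()))
        )
    return results
```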
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
index 630bd87..24f90b8 100644
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -1,205 +1,217 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import functools
+import logging
 import re
 import tempfile
 from subprocess import Popen
 from typing import Tuple
 from unittest.mock import patch

 from click.testing import CliRunner
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 import pytest

 from swh.objstorage.backends.in_memory import InMemoryObjStorage
 from swh.storage.in_memory import Storage

 from swh.journal.cli import cli
 from swh.journal.serializers import key_to_kafka, value_to_kafka

+logger = logging.getLogger(__name__)
+
+
 CLI_CONFIG = '''
 storage:
     cls: memory
     args: {}
 objstorage_src:
     cls: mocked
     args:
         name: src
 objstorage_dst:
     cls: mocked
     args:
         name: dst
 '''


 @pytest.fixture
 def storage():
     """An instance of swh.storage.in_memory.Storage that gets injected
     into the CLI functions."""
     storage = Storage()
     with patch('swh.journal.cli.get_storage') as get_storage_mock:
         get_storage_mock.return_value = storage
         yield storage


 def invoke(catch_exceptions, args):
     runner = CliRunner()
     with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         args = ['-C' + config_fd.name] + args
-        result = runner.invoke(cli, args)
+        result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG})
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result


 def test_replay(
         storage: Storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test-producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test-producer',
+        'enable.idempotence': 'true',
+    })

     snapshot = {'id': b'foo', 'branches': {
         b'HEAD': {
             'target_type': 'revision',
             'target': b'\x01'*20,
         }
     }}
-    producer.send(
-        topic=kafka_prefix+'.snapshot', key=snapshot['id'], value=snapshot)
+    producer.produce(
+        topic=kafka_prefix+'.snapshot',
+        key=key_to_kafka(snapshot['id']),
+        value=value_to_kafka(snapshot),
+    )
+    producer.flush()
+
+    logger.debug('Flushed producer')

     result = invoke(False, [
         'replay',
-        '--broker', 'localhost:%d' % port,
+        '--broker', '127.0.0.1:%d' % port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '1',
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     assert storage.snapshot_get(snapshot['id']) == {
         **snapshot, 'next_branch': None}
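Because `confluent_kafka.Producer` takes no serializer callables, the tests now apply `key_to_kafka`/`value_to_kafka` explicitly at every `produce()` call. A hypothetical helper (not part of this patch) that factors out that repeated pattern might look like this:

```python
def produce_object(producer, topic, key, value):
    # explicit msgpack serialization, mirroring what the tests above do inline
    producer.produce(
        topic=topic,
        key=key_to_kafka(key),
        value=value_to_kafka(value),
    )
    producer.flush()
```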


 def _patch_objstorages(names):
     objstorages = {name: InMemoryObjStorage() for name in names}

     def get_mock_objstorage(cls, args):
         assert cls == 'mocked', cls
         return objstorages[args['name']]

     def decorator(f):
         @functools.wraps(f)
         @patch('swh.journal.cli.get_objstorage')
         def newf(get_objstorage_mock, *args, **kwargs):
             get_objstorage_mock.side_effect = get_mock_objstorage
             f(*args, objstorages=objstorages, **kwargs)

         return newf

     return decorator


 def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages):
-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(kafka_port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test-producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port),
+        'client.id': 'test-producer',
+        'enable.idempotence': 'true',
+    })

     contents = {}
     for i in range(10):
         content = b'\x00'*19 + bytes([i])
         sha1 = objstorages['src'].add(content)
         contents[sha1] = content
-        producer.send(topic=kafka_prefix+'.content', key=sha1, value={
-            'sha1': sha1,
-            'status': 'visible',
-        })
+        producer.produce(
+            topic=kafka_prefix+'.content',
+            key=key_to_kafka(sha1),
+            value=key_to_kafka({
+                'sha1': sha1,
+                'status': 'visible',
+            }),
+        )

     producer.flush()

     return contents


 @_patch_objstorages(['src', 'dst'])
 def test_replay_content(
         objstorages,
         storage: Storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, kafka_port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

     contents = _fill_objstorage_and_kafka(
         kafka_port, kafka_prefix, objstorages)

     result = invoke(False, [
         'content-replay',
-        '--broker', 'localhost:%d' % kafka_port,
+        '--broker', '127.0.0.1:%d' % kafka_port,
         '--group-id', 'test-cli-consumer',
         '--prefix', kafka_prefix,
         '--max-messages', '10',
     ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         assert sha1 in objstorages['dst'], sha1
         assert objstorages['dst'].get(sha1) == content


 @_patch_objstorages(['src', 'dst'])
 def test_replay_content_exclude(
         objstorages,
         storage: Storage,
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, kafka_port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

     contents = _fill_objstorage_and_kafka(
         kafka_port, kafka_prefix, objstorages)

     excluded_contents = list(contents)[0::2]  # picking half of them
     with tempfile.NamedTemporaryFile(mode='w+b') as fd:
         fd.write(b''.join(sorted(excluded_contents)))
         fd.seek(0)

         result = invoke(False, [
             'content-replay',
-            '--broker', 'localhost:%d' % kafka_port,
+            '--broker', '127.0.0.1:%d' % kafka_port,
             '--group-id', 'test-cli-consumer',
             '--prefix', kafka_prefix,
             '--max-messages', '10',
             '--exclude-sha1-file', fd.name,
         ])
     expected = r'Done.\n'
     assert result.exit_code == 0, result.output
     assert re.fullmatch(expected, result.output, re.MULTILINE), result.output

     for (sha1, content) in contents.items():
         if sha1 in excluded_contents:
             assert sha1 not in objstorages['dst'], sha1
         else:
             assert sha1 in objstorages['dst'], sha1
             assert objstorages['dst'].get(sha1) == content
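As the exclusion test shows, the file handed to `content-replay --exclude-sha1-file` is a flat binary file of raw 20-byte sha1 digests, concatenated in sorted order with no separators: exactly what `b''.join(sorted(excluded_contents))` builds. A small sketch of producing such a file outside the test suite (the function name and path are illustrative):

```python
def write_exclude_file(path, sha1s):
    """Write raw, sorted, concatenated 20-byte sha1 digests to `path`."""
    with open(path, 'wb') as f:
        f.write(b''.join(sorted(sha1s)))
```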
diff --git a/swh/journal/tests/test_direct_writer.py b/swh/journal/tests/test_direct_writer.py
index 564c62f..c06a1a6 100644
--- a/swh/journal/tests/test_direct_writer.py
+++ b/swh/journal/tests/test_direct_writer.py
@@ -1,106 +1,136 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import defaultdict
 import datetime
-import time

-from kafka import KafkaConsumer
+from confluent_kafka import Consumer, KafkaException

 from subprocess import Popen
 from typing import Tuple

 from swh.storage import get_storage

 from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.serializers import value_to_kafka, kafka_to_value
+from swh.journal.serializers import (
+    kafka_to_key, kafka_to_value
+)

 from .conftest import OBJECT_TYPE_KEYS


-def assert_written(consumer, kafka_prefix):
-    time.sleep(0.1)  # Without this, some messages are missing
-
+def assert_written(consumer, kafka_prefix, expected_messages):
     consumed_objects = defaultdict(list)
-    for msg in consumer:
-        consumed_objects[msg.topic].append((msg.key, msg.value))
+
+    fetched_messages = 0
+    retries_left = 1000
+
+    while fetched_messages < expected_messages:
+        if retries_left == 0:
+            raise ValueError('Timed out fetching messages from kafka')
+
+        msg = consumer.poll(timeout=0.01)
+
+        if not msg:
+            retries_left -= 1
+            continue
+
+        error = msg.error()
+        if error is not None:
+            if error.fatal():
+                raise KafkaException(error)
+            retries_left -= 1
+            continue
+
+        fetched_messages += 1
+        consumed_objects[msg.topic()].append(
+            (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
+        )

     for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         (keys, values) = zip(*consumed_objects[topic])
         if key_name:
             assert list(keys) == [object_[key_name] for object_ in objects]
         else:
             pass  # TODO

         if object_type == 'origin_visit':
             for value in values:
                 del value['visit']
         elif object_type == 'content':
             for value in values:
                 del value['ctime']

         for object_ in objects:
-            assert kafka_to_value(value_to_kafka(object_)) in values
+            assert object_ in values


 def test_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
-        consumer: KafkaConsumer):
+        consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'

     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'client_id': 'direct_writer',
         'prefix': kafka_prefix,
     }

     writer = DirectKafkaWriter(**config)

+    expected_messages = 0
+
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         for (num, object_) in enumerate(objects):
             if object_type == 'origin_visit':
                 object_ = {**object_, 'visit': num}
             if object_type == 'content':
                 object_ = {**object_, 'ctime': datetime.datetime.now()}
             writer.write_addition(object_type, object_)
+            expected_messages += 1

-    assert_written(consumer, kafka_prefix)
+    assert_written(consumer, kafka_prefix, expected_messages)


 def test_storage_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
-        consumer: KafkaConsumer):
+        consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'

     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'client_id': 'direct_writer',
         'prefix': kafka_prefix,
     }

     storage = get_storage('memory', {'journal_writer': {
         'cls': 'kafka', 'args': config}})

+    expected_messages = 0
+
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
             if object_type == 'content':
                 objects = [{**obj, 'data': b''} for obj in objects]
             method(objects)
+            expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_id = storage.origin_add_one(object_.pop('origin'))
                 del object_['type']

                 visit = method(origin=origin_id, date=object_.pop('date'))
+                expected_messages += 1

                 visit_id = visit['visit']
                 storage.origin_visit_update(origin_id, visit_id, **object_)
+                expected_messages += 1
         else:
             assert False, object_type

-    assert_written(consumer, kafka_prefix)
+    assert_written(consumer, kafka_prefix, expected_messages)
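`test_storage_direct_writer` does not instantiate `DirectKafkaWriter` directly; it lets swh-storage build the journal writer from its `journal_writer` configuration, so every `*_add` call is mirrored to Kafka before reaching the storage backend. A sketch of that wiring outside the test (broker address is illustrative, not part of this patch):

```python
from swh.storage import get_storage

# swh-storage constructs the kafka journal writer itself from this config;
# 'brokers'/'prefix'/'client_id' are the DirectKafkaWriter arguments
storage = get_storage('memory', {'journal_writer': {
    'cls': 'kafka',
    'args': {
        'brokers': 'localhost:9092',
        'client_id': 'direct_writer',
        'prefix': 'swh.journal.objects',
    },
}})
```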
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index b62634d..eea01e6 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,205 +1,207 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple

 import dateutil
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 from hypothesis import strategies, given, settings

 from swh.storage import get_storage
 from swh.storage.in_memory import ENABLE_ORIGIN_IDS

 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray

 from .conftest import OBJECT_TYPE_KEYS
 from .utils import MockedJournalClient, MockedKafkaWriter


 def test_storage_play(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

     storage = get_storage('memory', {})

-    producer = KafkaProducer(
-        bootstrap_servers='localhost:{}'.format(port),
-        key_serializer=key_to_kafka,
-        value_serializer=value_to_kafka,
-        client_id='test producer',
-    )
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })

     now = datetime.datetime.now(tz=datetime.timezone.utc)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
-            producer.send(topic, key=key, value=object_)
+            producer.produce(
+                topic=topic, key=key_to_kafka(key),
+                value=value_to_kafka(object_),
+            )
             nb_sent += 1

+    producer.flush()
+
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'group_id': 'replayer',
         'prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
     replayer = JournalClient(**config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]

     for origin in origins:
         origin_id_or_url = \
             origin['id'] if ENABLE_ORIGIN_IDS else origin['url']
         expected_visits = [
             {
                 **visit,
                 'origin': origin_id_or_url,
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(
             origin_id_or_url))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits

     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
""" queue = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) - queue_size = sum(len(partition) - for batch in queue - for partition in batch.values()) + queue_size = len(queue) storage = get_storage('memory', {}) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() if ENABLE_ORIGIN_IDS: assert vout.pop('origin') == 1 else: assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_legacy_origin_visit1(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index 5202fdb..41c2182 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,89 +1,85 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import object_dicts from swh.storage.in_memory import Storage from swh.storage import HashCollision from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from .utils import MockedJournalClient, MockedKafkaWriter @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) storage1 = Storage() storage1.journal_writer = MockedKafkaWriter(queue) for (obj_type, obj) in objects: obj = obj.copy() if obj_type == 'origin_visit': storage1.origin_add_one(obj['origin']) storage1.origin_visit_upsert([obj]) else: method = 
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index 5202fdb..41c2182 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,89 +1,85 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import functools

 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists

 from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
 from swh.storage import HashCollision

 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content

 from .utils import MockedJournalClient, MockedKafkaWriter


 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
     queue = []
     replayer = MockedJournalClient(queue)

     storage1 = Storage()
     storage1.journal_writer = MockedKafkaWriter(queue)

     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             storage1.origin_add_one(obj['origin'])
             storage1.origin_visit_upsert([obj])
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass

-    queue_size = sum(len(partition)
-                     for batch in queue
-                     for partition in batch.values())
+    queue_size = len(queue)

     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
     assert replayer.consumer.committed

     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name


 # TODO: add test for hash collision


 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_content(objects):
     queue = []
     replayer = MockedJournalClient(queue)

     storage1 = Storage()
     storage1.journal_writer = MockedKafkaWriter(queue)

     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'content':
             storage1.content_add([obj])

-    queue_size = sum(len(partition)
-                     for batch in queue
-                     for partition in batch.values())
+    queue_size = len(queue)

     storage2 = Storage()

     worker_fn = functools.partial(process_replay_objects_content,
                                   src=storage1.objstorage,
                                   dst=storage2.objstorage)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)

     assert storage1.objstorage.state == storage2.objstorage.state
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
index 4812607..5e54876 100644
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,45 +1,58 @@
-from collections import namedtuple
-
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.direct_writer import DirectKafkaWriter

-from swh.journal.serializers import (
-    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+from swh.journal.serializers import (kafka_to_value, key_to_kafka,
+                                     value_to_kafka)
+
+
+class FakeKafkaMessage:
+    def __init__(self, topic, key, value):
+        self._topic = topic
+        self._key = key_to_kafka(key)
+        self._value = value_to_kafka(value)

+    def topic(self):
+        return self._topic

-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+    def value(self):
+        return self._value
+
+    def key(self):
+        return self._key
+
+    def error(self):
+        return None


 class MockedKafkaWriter(DirectKafkaWriter):
     def __init__(self, queue):
         self._prefix = 'prefix'
         self.queue = queue

     def send(self, topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if self.queue and {partition} == set(self.queue[-1]):
-            # The last message is of the same object type, groupping them
-            self.queue[-1][partition].append(msg)
-        else:
-            self.queue.append({partition: [msg]})
+        msg = FakeKafkaMessage(topic=topic, key=key, value=value)
+        self.queue.append(msg)
+
+    def flush(self):
+        pass


 class MockedKafkaConsumer:
     def __init__(self, queue):
         self.queue = queue
         self.committed = False

-    def poll(self):
+    def poll(self, timeout=None):
         return self.queue.pop(0)

     def commit(self):
         if self.queue == []:
             self.committed = True


 class MockedJournalClient(JournalClient):
     def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types
         self.consumer = MockedKafkaConsumer(queue)
+        self.process_timeout = 0
+        self.max_messages = 0
+        self.value_deserializer = kafka_to_value
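These mocks mirror just enough of the confluent-kafka interfaces for the write/replay round-trip tests: `MockedKafkaWriter.send()` appends a `FakeKafkaMessage` to a plain list, `MockedKafkaConsumer.poll()` pops from the same list, and `MockedJournalClient` (with `max_messages = 0`) processes one message per `process()` call. A minimal sketch of that round trip, under the same assumptions as the tests above:

```python
from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter

queue = []
writer = MockedKafkaWriter(queue)
client = MockedJournalClient(queue)

# the topic name doubles as the object type, as in the tests above
writer.send('origin', key='foo', value={'url': 'http://example.com/',
                                        'type': 'git'})


def worker_fn(objects):
    # one batch, keyed by object type, values already deserialized
    assert list(objects) == ['origin']
    assert objects['origin'][0]['url'] == 'http://example.com/'


nb_messages = client.process(worker_fn)
assert nb_messages == 1
assert client.consumer.committed   # the queue was fully drained
```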