diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 3be79e4..4b5aea3 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,271 +1,250 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os -import time import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name='journal', context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if config_file: if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj['config'] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj['config'].get('journal', {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') if not isinstance(conf['brokers'], (list, tuple)): conf['brokers'] = [conf['brokers']] return JournalClient(**conf) @cli.command() -@click.option('--max-messages', '-m', default=None, type=int, - help='Maximum number of objects to replay. Default is to ' +@click.option('--stop-after-objects', '-n', default=None, type=int, + help='Stop after processing this many objects. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to. ' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from. ' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka. ' '(deprecated, use the config file instead)') @click.pass_context -def replay(ctx, brokers, prefix, group_id, max_messages): +def replay(ctx, brokers, prefix, group_id, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ - logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, - max_messages=max_messages) + stop_after_objects=stop_after_objects) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify('READY=1') try: - nb_messages = 0 - last_log_time = 0 - while not max_messages or nb_messages < max_messages: - nb_messages += client.process(worker_fn) - if notify: - notify('WATCHDOG=1') - if time.time() - last_log_time >= 60: - # Log at most once per minute. - logger.info('Processed %d messages.' % nb_messages) - last_log_time = time.time() + client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) if notify: notify('READY=1') try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: if notify: notify('STOPPING=1') ctx.exit(0) @cli.command('content-replay') -@click.option('--max-messages', '-m', default=None, type=int, - help='Maximum number of objects to replay. Default is to ' +@click.option('--stop-after-objects', '-n', default=None, type=int, + help='Stop after processing this many objects. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from.' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka.' '(deprecated, use the config file instead)') @click.option('--exclude-sha1-file', default=None, type=click.File('rb'), help='File containing a sorted array of hashes to be excluded.') @click.option('--check-dst/--no-check-dst', default=True, help='Check whether the destination contains the object before ' 'copying.') @click.pass_context -def content_replay(ctx, max_messages, +def content_replay(ctx, stop_after_objects, brokers, prefix, group_id, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. `--check-dst` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ - logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail('--exclude-sha1 must link to a file whose size is an ' 'exact multiple of %d bytes.' % SHA1_SIZE) nb_excluded_hashes = int(map_.size()/SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, - max_messages=max_messages, object_types=('content',)) + stop_after_objects=stop_after_objects, object_types=('content',)) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst) if notify: notify('READY=1') try: - nb_messages = 0 - last_log_time = 0 - while not max_messages or nb_messages < max_messages: - nb_messages += client.process(worker_fn) - if notify: - notify('WATCHDOG=1') - if time.time() - last_log_time >= 60: - # Log at most once per minute. - logger.info('Processed %d messages.' % nb_messages) - last_log_time = time.time() + client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index a9a1f3e..b0b363d 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,263 +1,277 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time from typing import Any, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'directory', 'revision', 'release', 'snapshot', 'origin', 'origin_visit' ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug('Received non-fatal kafka error: %s', error) else: logger.info('Received non-fatal kafka error: %s', error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each - object type using a specific topic under that prefix. If the 'prefix' + object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value - 'swh.journal.objects'. + `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted object types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. - Messages are processed by the `worker_fn` callback passed to the - `process` method, in batches of maximum `max_messages`. + Messages are processed by the `worker_fn` callback passed to the `process` + method, in batches of maximum 20 messages (currently hardcoded). If set, + the processing stops after processing `stop_after_objects` messages in + total. + + `stop_on_eof` stops the processing when the client has reached the end of + each partition in turn. + + `auto_offset_reset` sets the behavior of the client when the consumer group + initializes: `'earliest'` (the default) processes all objects since the + inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers: Union[str, List[str]], group_id: str, prefix: Optional[str] = None, object_types: Optional[List[str]] = None, - max_messages: Optional[int] = None, + stop_after_objects: Optional[int] = None, process_timeout: Optional[float] = None, auto_offset_reset: str = 'earliest', stop_on_eof: bool = False, **kwargs ): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s, not %s' % (ACCEPTED_OFFSET_RESET, auto_offset_reset)) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s, not %s.' % (ACCEPTED_OBJECT_TYPES, object_type)) self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and 'debug' not in kwargs: kwargs['debug'] = 'consumer' # Static group instance id management group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') if group_instance_id: kwargs['group.instance.id'] = group_instance_id if 'group.instance.id' in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if 'session.timeout.ms' not in kwargs: kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes if 'session.timeout.ms' in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. # # We default to 1.5 times the session timeout if 'max.poll.interval.ms' not in kwargs: kwargs['max.poll.interval.ms'] = ( kwargs['session.timeout.ms'] // 2 * 3 ) consumer_settings = { **kwargs, 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, 'on_commit': _on_commit, 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings['enable.partition.eof'] = True logger.debug('Consumer settings: %s', consumer_settings) self.consumer = Consumer(consumer_settings) topics = ['%s.%s' % (prefix, object_type) for object_type in object_types] logger.debug('Upstream topics: %s', self.consumer.list_topics(timeout=10)) logger.debug('Subscribing to: %s', topics) self.consumer.subscribe(topics=topics) - self.max_messages = max_messages + self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout self.eof_reached: Set[Tuple[str, str]] = set() self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() - nb_messages = 0 + total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed - num_messages = 20 + batch_size = 20 - if self.max_messages: - if nb_messages >= self.max_messages: + if self.stop_after_objects: + if total_objects_processed >= self.stop_after_objects: break - num_messages = min(num_messages, self.max_messages-nb_messages) + + # clamp batch size to avoid overrunning stop_after_objects + batch_size = min( + self.stop_after_objects-total_objects_processed, + batch_size, + ) messages = self.consumer.consume( - timeout=timeout, num_messages=num_messages) + timeout=timeout, num_messages=batch_size) if not messages: continue - nb_processed, at_eof = self.handle_messages(messages, worker_fn) - nb_messages += nb_processed + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + total_objects_processed += batch_processed if at_eof: break - return nb_messages + return total_objects_processed def handle_messages(self, messages, worker_fn): objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add( (message.topic(), message.partition()) ) else: _error_cb(error) continue nb_processed += 1 object_type = message.topic().split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = (self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() )) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 782e371..2689ca5 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,245 +1,245 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from subprocess import Popen from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server, ZOOKEEPER_CONFIG_TEMPLATE, ) from swh.model.hashutil import hash_to_bytes logger = logging.getLogger(__name__) CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTERS = [ { 'fullname': b'foo', 'name': b'foo', 'email': b'', }, { 'fullname': b'bar', 'name': b'bar', 'email': b'', } ] DATES = [ { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': DATES[0], 'committer': COMMITTERS[0], 'author': COMMITTERS[0], 'committer_date': DATES[0], 'type': 'git', 'directory': '\x01'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': DATES[1], 'committer': COMMITTERS[1], 'author': COMMITTERS[1], 'committer_date': DATES[1], 'type': 'hg', 'directory': '\x02'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTERS[0], 'target_type': 'revision', 'target': b'\x04'*20, 'message': b'foo', 'synthetic': False, }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', }, { 'url': 'https://overtherainbow.org/fox/den', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0]['url'], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, 'type': 'git', }, { 'origin': ORIGINS[0]['url'], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, 'type': 'git', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n' # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope='function') def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), - 'max_messages': 1, # will read 1 message and stops + 'stop_after_objects': 1, # will read 1 object and stop 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['127.0.0.1:{}'.format(port)], 'prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server consumer = Consumer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'group.id': kafka_consumer_group, }) kafka_topics = [ '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types'] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index 6ff7650..b85ed8c 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,617 +1,617 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter import functools import logging import re import tempfile from subprocess import Popen from typing import Any, Dict, Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage from swh.journal.cli import cli from swh.journal.replay import CONTENT_REPLAY_RETRIES from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = ''' storage: cls: memory args: {} objstorage_src: cls: mocked args: name: src objstorage_dst: cls: mocked args: name: dst ''' @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'validate'}, {'cls': 'memory'}, ] } storage = get_storage(**storage_config) with patch('swh.journal.cli.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage @pytest.fixture def monkeypatch_retry_sleep(monkeypatch): from swh.journal.replay import copy_object, obj_in_objstorage monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None) monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None) def invoke(catch_exceptions, args, env=None): runner = CliRunner() with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) args = ['-C' + config_fd.name] + args result = runner.invoke( cli, args, obj={'log_level': logging.DEBUG}, env=env, ) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_replay( storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test-producer', 'enable.idempotence': 'true', }) snapshot = {'id': b'foo', 'branches': { b'HEAD': { 'target_type': 'revision', 'target': b'\x01'*20, } }} # type: Dict[str, Any] producer.produce( topic=kafka_prefix+'.snapshot', key=key_to_kafka(snapshot['id']), value=value_to_kafka(snapshot), ) producer.flush() logger.debug('Flushed producer') result = invoke(False, [ 'replay', '--broker', '127.0.0.1:%d' % port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', '1', + '--stop-after-objects', '1', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot['id']) == { **snapshot, 'next_branch': None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, args): assert cls == 'mocked', cls return objstorages[args['name']] def decorator(f): @functools.wraps(f) @patch('swh.journal.cli.get_objstorage') def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): producer = Producer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'client.id': 'test-producer', 'enable.idempotence': 'true', }) contents = {} for i in range(NUM_CONTENTS): content = b'\x00'*19 + bytes([i]) sha1 = objstorages['src'].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix+'.content', key=key_to_kafka(sha1), value=key_to_kafka({ 'sha1': sha1, 'status': 'visible', }), ) producer.flush() return contents @_patch_objstorages(['src', 'dst']) def test_replay_content( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_structured_log( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) caplog.set_level(logging.DEBUG, 'swh.journal.replay') expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied.add(record.args['obj_id']) assert copied == expected_obj_ids, ( "Mismatched logging; see captured log output for details." ) @_patch_objstorages(['src', 'dst']) def test_replay_content_static_group_id( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, 'swh.journal.client') result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if 'Consumer settings' in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( 'Failed to get consumer settings out of the consumer log. ' 'See log capture for details.' ) assert consumer_settings['group.instance.id'] == 'static-group-instance-id' assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000 assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_exclude( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode='w+b') as fd: fd.write(b''.join(sorted(excluded_contents))) fd.seek(0) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--exclude-sha1-file', fd.name, ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages['dst'], sha1 else: assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(['src', 'dst']) @pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ]) def test_replay_content_check_dst( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], check_dst: bool, expected_copied: int, expected_in_dst: int, caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages['dst'].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, 'swh.journal.replay') result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--check-dst' if check_dst else '--no-check-dst', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied += 1 elif 'in dst' in logtext: in_dst += 1 assert (copied == expected_copied and in_dst == expected_in_dst), ( "Unexpected amount of objects copied, see the captured log for details" ) for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): state = kwargs.pop('state') self.failures_left = Counter(kwargs.pop('failures')) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError( 'Failed %s on %s' % (op, hash_to_hex(obj_id)) ) def get(self, obj_id): self.flaky_operation('get', obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation('add', obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation('in', obj_id) return super().__contains__(obj_id) @_patch_objstorages(['src', 'dst']) def test_replay_content_check_dst_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], monkeypatch_retry_sleep): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages['dst'].add(content, obj_id=sha1) failures['in', sha1] = 1 orig_dst = objstorages['dst'] objstorages['dst'] = FlakyObjStorage(state=orig_dst.state, failures=failures) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--check-dst', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_failed_copy_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog, monkeypatch_retry_sleep): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) add_failures = {} get_failures = {} definitely_failed = set() # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. # We generate failures for 2 different operations, get and add. num_retry_contents = 2 * CONTENT_REPLAY_RETRIES assert num_retry_contents < NUM_CONTENTS, ( "Need to generate more test contents to properly test retry behavior" ) for i, sha1 in enumerate(contents): if i >= num_retry_contents: break # This generates a number of failures, up to CONTENT_REPLAY_RETRIES num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 # This generates failures of add for the first CONTENT_REPLAY_RETRIES # objects, then failures of get. if i < CONTENT_REPLAY_RETRIES: add_failures['add', sha1] = num_failures else: get_failures['get', sha1] = num_failures # Only contents that have CONTENT_REPLAY_RETRIES or more are # definitely failing if num_failures >= CONTENT_REPLAY_RETRIES: definitely_failed.add(hash_to_hex(sha1)) objstorages['dst'] = FlakyObjStorage( state=objstorages['dst'].state, failures=add_failures, ) objstorages['src'] = FlakyObjStorage( state=objstorages['src'].state, failures=get_failures, ) caplog.set_level(logging.DEBUG, 'swh.journal.replay') result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 actually_failed = set() for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied += 1 elif 'Failed operation' in logtext: assert record.levelno == logging.ERROR assert record.args['retries'] == CONTENT_REPLAY_RETRIES actually_failed.add(record.args['obj_id']) assert actually_failed == definitely_failed, ( 'Unexpected object copy failures; see captured log for details' ) for (sha1, content) in contents.items(): if hash_to_hex(sha1) in definitely_failed: assert sha1 not in objstorages['dst'] continue assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_objnotfound( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages['src'].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, 'swh.journal.replay') result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied += 1 elif 'object not found' in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args['obj_id']) assert copied == NUM_CONTENTS - num_contents_deleted, ( "Unexpected number of contents copied" ) assert not_in_src == contents_deleted, ( "Mismatch between deleted contents and not_in_src logs" ) for (sha1, content) in contents.items(): if sha1 not in objstorages['src']: continue assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 8b0e42b..d47a941 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,85 +1,85 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from subprocess import Popen from typing import Tuple from unittest.mock import MagicMock from confluent_kafka import Producer from swh.model.hypothesis_strategies import revisions from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, - max_messages=1, + stop_after_objects=1, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) def test_client_eof( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() client = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, - max_messages=1, + stop_after_objects=None, stop_on_eof=True, ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index fb3aad7..cb74503 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,245 +1,244 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools import random from subprocess import Popen from typing import Tuple import dateutil from confluent_kafka import Producer from hypothesis import strategies, given, settings import pytest from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray from .conftest import OBJECT_TYPE_KEYS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'validate'}, {'cls': 'memory'}, ] } def test_storage_play( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage(**storage_config) producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 producer.flush() # Fill the storage from Kafka replayer = JournalClient( brokers='localhost:%d' % kafka_server[1], group_id=kafka_consumer_group, prefix=kafka_prefix, - max_messages=nb_sent, + stop_after_objects=nb_sent, ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url']} for orig in origins] for origin in origins: origin_url = origin['url'] expected_visits = [ { **visit, 'origin': origin_url, 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin'] == origin['url'] ] actual_visits = list(storage.origin_visit_get( origin_url)) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits input_contents = OBJECT_TYPE_KEYS['content'][1] contents = storage.content_get_metadata( [cont['sha1'] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont['sha1']: [cont] for cont in input_contents} def _test_write_replay_origin_visit(visits): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values. """ queue = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) queue_size = len(queue) - assert replayer.max_messages is None - replayer.max_messages = queue_size + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Test origin_visit when there is no type.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'status': 'partial', 'snapshot': None, }] with pytest.raises(ValueError, match='too old'): _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index b76f9fa..da215d1 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,160 +1,158 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import object_dicts, present_contents from swh.storage import get_storage, HashCollision from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'pipeline', 'steps': [ {'cls': 'validate'}, {'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, ] } def empty_person_name_email(rev_or_rel): """Empties the 'name' and 'email' fields of the author/committer fields of a revision or release; leaving only the fullname.""" if getattr(rev_or_rel, 'author', None): rev_or_rel = attr.evolve( rev_or_rel, author=attr.evolve( rev_or_rel.author, name=b'', email=b'', ) ) if getattr(rev_or_rel, 'committer', None): rev_or_rel = attr.evolve( rev_or_rel, committer=attr.evolve( rev_or_rel.committer, name=b'', email=b'', ) ) return rev_or_rel @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) for (obj_type, obj) in objects: obj = obj.copy() if obj_type == 'origin_visit': storage1.origin_add_one({'url': obj['origin']}) storage1.origin_visit_upsert([obj]) else: if obj_type == 'content' and obj.get('status') == 'absent': obj_type = 'skipped_content' method = getattr(storage1, obj_type + '_add') try: method([obj]) except HashCollision: pass # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - assert replayer.max_messages is None - replayer.max_messages = queue_size + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage2) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_snapshots', '_origin_visits', '_origins'): assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ attr_name # When hypothesis generates a revision and a release with same # author (or committer) fullname but different name or email, then # the storage will use the first name/email it sees. # This first one will be either the one from the revision or the release, # and since there is no order guarantees, storage2 has 1/2 chance of # not seeing the same order as storage1, therefore we need to strip # them out before comparing. for attr_name in ('_revisions', '_releases'): items1 = {k: empty_person_name_email(v) for (k, v) in getattr(storage1, attr_name).items()} items2 = {k: empty_person_name_email(v) for (k, v) in getattr(storage2, attr_name).items()} assert items1 == items2, attr_name # TODO: add test for hash collision @given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) contents = [] for obj in objects: obj = obj.to_dict() storage1.content_add([obj]) contents.append(obj) # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - assert replayer.max_messages is None - replayer.max_messages = queue_size + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) objstorage1 = storage1.objstorage.objstorage objstorage2 = storage2.objstorage.objstorage worker_fn = functools.partial(process_replay_objects_content, src=objstorage1, dst=objstorage2) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { c['sha1']: c['data'] for c in contents if c['status'] == 'visible' } assert expected_objstorage_state == objstorage2.state diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 89338a4..9e34487 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,75 +1,75 @@ from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import (kafka_to_value, key_to_kafka, value_to_kafka) class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): def __init__(self, queue): self._prefix = 'prefix' self.queue = queue def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = None - self.max_messages = None + self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False