diff --git a/MANIFEST.in b/MANIFEST.in index 3148ae2..6c8d3a7 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include Makefile include requirements*.txt include version.txt recursive-include swh py.typed +include swh/journal/tests/log4j.properties diff --git a/PKG-INFO b/PKG-INFO index b9b3a06..a7b272c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.26 +Version: 0.0.27 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 85f1a2c..932ebfd 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 -swh.model >= 0.0.40 -swh.storage >= 0.0.172 +swh.model >= 0.0.60 +swh.storage >= 0.0.178 diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index b9b3a06..a7b272c 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.26 +Version: 0.0.27 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt index c2fd5ae..ac59be6 100644 --- a/swh.journal.egg-info/SOURCES.txt +++ b/swh.journal.egg-info/SOURCES.txt @@ -1,36 +1,37 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements-test.txt requirements.txt setup.py version.txt swh/__init__.py swh.journal.egg-info/PKG-INFO swh.journal.egg-info/SOURCES.txt swh.journal.egg-info/dependency_links.txt swh.journal.egg-info/entry_points.txt swh.journal.egg-info/requires.txt swh.journal.egg-info/top_level.txt swh/journal/__init__.py swh/journal/backfill.py swh/journal/cli.py swh/journal/client.py swh/journal/direct_writer.py swh/journal/py.typed swh/journal/replay.py swh/journal/serializers.py swh/journal/tests/__init__.py swh/journal/tests/conftest.py +swh/journal/tests/log4j.properties swh/journal/tests/test_backfill.py swh/journal/tests/test_cli.py swh/journal/tests/test_client.py swh/journal/tests/test_kafka_writer.py swh/journal/tests/test_replay.py swh/journal/tests/test_serializers.py swh/journal/tests/test_write_replay.py swh/journal/tests/utils.py swh/journal/writer/__init__.py swh/journal/writer/inmemory.py swh/journal/writer/kafka.py \ No newline at end of file diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index c7288f6..d70491f 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,13 +1,13 @@ confluent-kafka msgpack tenacity vcversioner swh.core[db,http]>=0.0.60 -swh.model>=0.0.40 -swh.storage>=0.0.172 +swh.model>=0.0.60 +swh.storage>=0.0.178 [testing] pytest swh.model>=0.0.34 pytest-kafka hypothesis diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 3be79e4..606a0df 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,271 +1,229 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os -import time import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name='journal', context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if config_file: if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj['config'] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj['config'].get('journal', {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') if not isinstance(conf['brokers'], (list, tuple)): conf['brokers'] = [conf['brokers']] return JournalClient(**conf) @cli.command() -@click.option('--max-messages', '-m', default=None, type=int, - help='Maximum number of objects to replay. Default is to ' +@click.option('--stop-after-objects', '-n', default=None, type=int, + help='Stop after processing this many objects. Default is to ' 'run forever.') -@click.option('--broker', 'brokers', type=str, multiple=True, - help='Kafka broker to connect to. ' - '(deprecated, use the config file instead)') -@click.option('--prefix', type=str, default=None, - help='Prefix of Kafka topic names to read from. ' - '(deprecated, use the config file instead)') -@click.option('--group-id', type=str, - help='Name of the group id for reading from Kafka. ' - '(deprecated, use the config file instead)') @click.pass_context -def replay(ctx, brokers, prefix, group_id, max_messages): +def replay(ctx, stop_after_objects): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ - logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client( - ctx, brokers=brokers, prefix=prefix, group_id=group_id, - max_messages=max_messages) + ctx, stop_after_objects=stop_after_objects) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify('READY=1') try: - nb_messages = 0 - last_log_time = 0 - while not max_messages or nb_messages < max_messages: - nb_messages += client.process(worker_fn) - if notify: - notify('WATCHDOG=1') - if time.time() - last_log_time >= 60: - # Log at most once per minute. - logger.info('Processed %d messages.' % nb_messages) - last_log_time = time.time() + client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) if notify: notify('READY=1') try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: if notify: notify('STOPPING=1') ctx.exit(0) @cli.command('content-replay') -@click.option('--max-messages', '-m', default=None, type=int, - help='Maximum number of objects to replay. Default is to ' +@click.option('--stop-after-objects', '-n', default=None, type=int, + help='Stop after processing this many objects. Default is to ' 'run forever.') -@click.option('--broker', 'brokers', type=str, multiple=True, - help='Kafka broker to connect to.' - '(deprecated, use the config file instead)') -@click.option('--prefix', type=str, default=None, - help='Prefix of Kafka topic names to read from.' - '(deprecated, use the config file instead)') -@click.option('--group-id', type=str, - help='Name of the group id for reading from Kafka.' - '(deprecated, use the config file instead)') @click.option('--exclude-sha1-file', default=None, type=click.File('rb'), help='File containing a sorted array of hashes to be excluded.') @click.option('--check-dst/--no-check-dst', default=True, help='Check whether the destination contains the object before ' 'copying.') @click.pass_context -def content_replay(ctx, max_messages, - brokers, prefix, group_id, exclude_sha1_file, check_dst): +def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. `--check-dst` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ - logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail('--exclude-sha1 must link to a file whose size is an ' 'exact multiple of %d bytes.' % SHA1_SIZE) nb_excluded_hashes = int(map_.size()/SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( - ctx, brokers=brokers, prefix=prefix, group_id=group_id, - max_messages=max_messages, object_types=('content',)) + ctx, stop_after_objects=stop_after_objects, object_types=('content',)) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst) if notify: notify('READY=1') try: - nb_messages = 0 - last_log_time = 0 - while not max_messages or nb_messages < max_messages: - nb_messages += client.process(worker_fn) - if notify: - notify('WATCHDOG=1') - if time.time() - last_log_time >= 60: - # Log at most once per minute. - logger.info('Processed %d messages.' % nb_messages) - last_log_time = time.time() + client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index a7a7b68..9c8a97e 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,254 +1,282 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging import os import time +from typing import Any, Dict, List, Optional, Set, Tuple, Union from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'directory', 'revision', 'release', 'snapshot', 'origin', 'origin_visit' ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug('Received non-fatal kafka error: %s', error) else: logger.info('Received non-fatal kafka error: %s', error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each - object type using a specific topic under that prefix. If the 'prefix' + object type using a specific topic under that prefix. If the `prefix` argument is None (default value), it will take the default value - 'swh.journal.objects'. + `'swh.journal.objects'`. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted object types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. - Messages are processed by the `worker_fn` callback passed to the - `process` method, in batches of maximum `max_messages`. + Messages are processed by the `worker_fn` callback passed to the `process` + method, in batches of maximum `batch_size` messages (defaults to 200). + + If set, the processing stops after processing `stop_after_objects` messages + in total. + + `stop_on_eof` stops the processing when the client has reached the end of + each partition in turn. + + `auto_offset_reset` sets the behavior of the client when the consumer group + initializes: `'earliest'` (the default) processes all objects since the + inception of the topics; `''` Any other named argument is passed directly to KafkaConsumer(). """ def __init__( - self, brokers, group_id, prefix=None, object_types=None, - max_messages=0, process_timeout=0, auto_offset_reset='earliest', - stop_on_eof=False, **kwargs): + self, + brokers: Union[str, List[str]], + group_id: str, + prefix: Optional[str] = None, + object_types: Optional[List[str]] = None, + stop_after_objects: Optional[int] = None, + batch_size: int = 200, + process_timeout: Optional[float] = None, + auto_offset_reset: str = 'earliest', + stop_on_eof: bool = False, + **kwargs + ): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s, not %s' % (ACCEPTED_OFFSET_RESET, auto_offset_reset)) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s, not %s.' % (ACCEPTED_OBJECT_TYPES, object_type)) + if batch_size <= 0: + raise ValueError("Option 'batch_size' needs to be positive") + self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and 'debug' not in kwargs: kwargs['debug'] = 'consumer' # Static group instance id management group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') if group_instance_id: kwargs['group.instance.id'] = group_instance_id if 'group.instance.id' in kwargs: # When doing static consumer group membership, set a higher default # session timeout. The session timeout is the duration after which # the broker considers that a consumer has left the consumer group # for good, and triggers a rebalance. Considering our current # processing pattern, 10 minutes gives the consumer ample time to # restart before that happens. if 'session.timeout.ms' not in kwargs: kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes if 'session.timeout.ms' in kwargs: # When the session timeout is set, rdkafka requires the max poll # interval to be set to a higher value; the max poll interval is # rdkafka's way of figuring out whether the client's message # processing thread has stalled: when the max poll interval lapses # between two calls to consumer.poll(), rdkafka leaves the consumer # group and terminates the connection to the brokers. # # We default to 1.5 times the session timeout if 'max.poll.interval.ms' not in kwargs: kwargs['max.poll.interval.ms'] = ( kwargs['session.timeout.ms'] // 2 * 3 ) consumer_settings = { **kwargs, 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, 'on_commit': _on_commit, 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': rdkafka_logger, } self.stop_on_eof = stop_on_eof if self.stop_on_eof: consumer_settings['enable.partition.eof'] = True logger.debug('Consumer settings: %s', consumer_settings) self.consumer = Consumer(consumer_settings) topics = ['%s.%s' % (prefix, object_type) for object_type in object_types] logger.debug('Upstream topics: %s', self.consumer.list_topics(timeout=10)) logger.debug('Subscribing to: %s', topics) self.consumer.subscribe(topics=topics) - self.max_messages = max_messages + self.stop_after_objects = stop_after_objects self.process_timeout = process_timeout - self.eof_reached = set() + self.eof_reached: Set[Tuple[str, str]] = set() + self.batch_size = batch_size self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() - nb_messages = 0 + total_objects_processed = 0 while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: # +0.01 to prevent busy-waiting on / spamming consumer.poll. # consumer.consume() returns shortly before X expired # (a matter of milliseconds), so after it returns a first # time, it would then be called with a timeout in the order # of milliseconds, therefore returning immediately, then be # called again, etc. if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed - num_messages = 20 - - if self.max_messages: - if nb_messages >= self.max_messages: + batch_size = self.batch_size + if self.stop_after_objects: + if total_objects_processed >= self.stop_after_objects: break - num_messages = min(num_messages, self.max_messages-nb_messages) + + # clamp batch size to avoid overrunning stop_after_objects + batch_size = min( + self.stop_after_objects-total_objects_processed, + batch_size, + ) messages = self.consumer.consume( - timeout=timeout, num_messages=num_messages) + timeout=timeout, num_messages=batch_size) if not messages: continue - nb_processed, at_eof = self.handle_messages(messages, worker_fn) - nb_messages += nb_processed + batch_processed, at_eof = self.handle_messages(messages, worker_fn) + total_objects_processed += batch_processed if at_eof: break - return nb_messages + return total_objects_processed def handle_messages(self, messages, worker_fn): - objects = defaultdict(list) + objects: Dict[str, List[Any]] = defaultdict(list) nb_processed = 0 for message in messages: error = message.error() if error is not None: if error.code() == KafkaError._PARTITION_EOF: self.eof_reached.add( (message.topic(), message.partition()) ) else: _error_cb(error) continue nb_processed += 1 object_type = message.topic().split('.')[-1] # Got a message from a topic we did not subscribe to. assert object_type in self._object_types, object_type objects[object_type].append(self.deserialize_message(message)) if objects: worker_fn(dict(objects)) self.consumer.commit() at_eof = (self.stop_on_eof and all( (tp.topic, tp.partition) in self.eof_reached for tp in self.consumer.assignment() )) return nb_processed, at_eof def deserialize_message(self, message): return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/replay.py b/swh/journal/replay.py index a654d02..83a3227 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,449 +1,586 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import logging from time import time -from typing import Callable, Dict, List, Optional +from typing import ( + Any, Callable, Dict, Iterable, List, Optional +) +from sentry_sdk import capture_exception, push_scope try: from systemd.daemon import notify except ImportError: notify = None -from tenacity import retry, stop_after_attempt, wait_random_exponential +from tenacity import ( + retry, retry_if_exception_type, stop_after_attempt, + wait_random_exponential, +) from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex -from swh.model.model import SHA1_SIZE + +from swh.model.model import ( + BaseContent, BaseModel, Content, Directory, Origin, OriginVisit, Revision, + SHA1_SIZE, SkippedContent, Snapshot, Release +) from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) from swh.storage import HashCollision logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" +CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" +object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = { + 'origin': Origin.from_dict, + 'origin_visit': OriginVisit.from_dict, + 'snapshot': Snapshot.from_dict, + 'revision': Revision.from_dict, + 'release': Release.from_dict, + 'directory': Directory.from_dict, + 'content': Content.from_dict, + 'skipped_content': SkippedContent.from_dict, +} + + def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with statsd.timed(GRAPH_DURATION_METRIC, tags={'object_type': object_type}): _insert_objects(object_type, objects, storage) statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), tags={'object_type': object_type}) if notify: notify('WATCHDOG=1') def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, 'author': rev['author'].copy(), 'committer': rev['committer'].copy(), } if rev['author'].get('email') == '': rev['author']['email'] = b'' if rev['author'].get('name') == '': rev['author']['name'] = b'' if rev['committer'].get('email') == '': rev['committer']['email'] = b'' if rev['committer'].get('name') == '': rev['committer']['name'] = b'' return rev def _fix_revision_transplant_source(rev): if rev.get('metadata') and rev['metadata'].get('extra_headers'): rev = copy.deepcopy(rev) rev['metadata']['extra_headers'] = [ [key, value.encode('ascii')] if key == 'transplant_source' and isinstance(value, str) else [key, value] for (key, value) in rev['metadata']['extra_headers']] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ and (0 <= date['timestamp']['microseconds'] < 10**6) \ and (-2**15 <= date['offset'] < 2**15) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev['date']) and _check_date(rev['committer_date']) -def _fix_revisions(revisions): - good_revisions = [] - for rev in revisions: - rev = _fix_revision_pypi_empty_string(rev) - rev = _fix_revision_transplant_source(rev) - if not _check_revision_date(rev): - logging.warning('Excluding revision (invalid date): %r', rev) - continue - if rev not in good_revisions: - good_revisions.append(rev) - return good_revisions - - -def _fix_origin_visits(visits): - good_visits = [] - for visit in visits: - visit = visit.copy() - if 'type' not in visit: - if isinstance(visit['origin'], dict) and 'type' in visit['origin']: - # Very old version of the schema: visits did not have a type, - # but their 'origin' field was a dict with a 'type' key. - visit['type'] = visit['origin']['type'] - else: - # Very very old version of the schema: 'type' is missing, - # so there is nothing we can do to fix it. - raise ValueError('Got an origin_visit too old to be replayed.') - if isinstance(visit['origin'], dict): - # Old version of the schema: visit['origin'] was a dict. - visit['origin'] = visit['origin']['url'] - good_visits.append(visit) - return good_visits - - -def fix_objects(object_type, objects): - """Converts a possibly old object from the journal to its current - expected format. - - List of conversions: - - Empty author name/email in PyPI releases: +def _fix_revisions(revisions: List[Dict]) -> List[Dict]: + """Adapt revisions into a list of (current) storage compatible dicts. >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } - >>> pprint(fix_objects('revision', [{ + >>> pprint(_fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... }])) [{'author': {'email': b'', 'fullname': b'', 'name': b''}, 'committer': {'email': b'', 'fullname': b'', 'name': b''}, 'committer_date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, 'date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] Fix type of 'transplant_source' extra headers: - >>> revs = fix_objects('revision', [{ + >>> revs = _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], - ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] + ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] # noqa ... ]} ... }]) >>> pprint(revs[0]['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Filter out revisions with invalid dates: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 - >>> fix_objects('revision', [{ + >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }]) [] >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 - >>> fix_objects('revision', [{ + >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }]) [] >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 - >>> fix_objects('revision', [{ + >>> _fix_revisions([{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }]) [] + """ + good_revisions: List = [] + for rev in revisions: + rev = _fix_revision_pypi_empty_string(rev) + rev = _fix_revision_transplant_source(rev) + if not _check_revision_date(rev): + logging.warning('Excluding revision (invalid date): %r', rev) + continue + if rev not in good_revisions: + good_revisions.append(rev) + return good_revisions + + +def _fix_origin_visit(visit: Dict) -> OriginVisit: + """Adapt origin visits into a list of current storage compatible + OriginVisits. `visit['origin']` is a dict instead of an URL: - >>> pprint(fix_objects('origin_visit', [{ + >>> from datetime import datetime, timezone + >>> from pprint import pprint + >>> date = datetime(2020, 2, 27, 14, 39, 19, tzinfo=timezone.utc) + >>> pprint(_fix_origin_visit({ ... 'origin': {'url': 'http://foo'}, + ... 'date': date, ... 'type': 'git', - ... }])) - [{'origin': 'http://foo', 'type': 'git'}] + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'git'} `visit['type']` is missing , but `origin['visit']['type']` exists: - >>> pprint(fix_objects('origin_visit', [ - ... {'origin': {'type': 'hg', 'url': 'http://foo'} - ... }])) - [{'origin': 'http://foo', 'type': 'hg'}] - """ # noqa + >>> pprint(_fix_origin_visit( + ... {'origin': {'type': 'hg', 'url': 'http://foo'}, + ... 'date': date, + ... 'status': 'ongoing', + ... 'snapshot': None, + ... }).to_dict()) + {'date': datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=datetime.timezone.utc), + 'metadata': None, + 'origin': 'http://foo', + 'snapshot': None, + 'status': 'ongoing', + 'type': 'hg'} - if object_type == 'revision': - objects = _fix_revisions(objects) - elif object_type == 'origin_visit': - objects = _fix_origin_visits(objects) - return objects + """ # noqa + visit = visit.copy() + if 'type' not in visit: + if isinstance(visit['origin'], dict) and 'type' in visit['origin']: + # Very old version of the schema: visits did not have a type, + # but their 'origin' field was a dict with a 'type' key. + visit['type'] = visit['origin']['type'] + else: + # Very very old version of the schema: 'type' is missing, + # so there is nothing we can do to fix it. + raise ValueError('Got an origin_visit too old to be replayed.') + if isinstance(visit['origin'], dict): + # Old version of the schema: visit['origin'] was a dict. + visit['origin'] = visit['origin']['url'] + if 'metadata' not in visit: + visit['metadata'] = None + return OriginVisit.from_dict(visit) + + +def collision_aware_content_add( + content_add_fn: Callable[[Iterable[Any]], None], + contents: List[BaseContent]) -> None: + """Add contents to storage. If a hash collision is detected, an error is + logged. Then this adds the other non colliding contents to the storage. + Args: + content_add_fn: Storage content callable + contents: List of contents or skipped contents to add to storage -def _insert_objects(object_type, objects, storage): - objects = fix_objects(object_type, objects) - if object_type == 'content': + """ + if not contents: + return + colliding_content_hashes: List[Dict[str, Any]] = [] + while True: try: - storage.skipped_content_add( - (obj for obj in objects if obj.get('status') == 'absent')) + content_add_fn(contents) except HashCollision as e: - logger.error('(SkippedContent) Hash collision: %s', e.args) + algo, hash_id, colliding_hashes = e.args + hash_id = hash_to_hex(hash_id) + colliding_content_hashes.append({ + 'algo': algo, + 'hash': hash_to_hex(hash_id), + 'objects': [{k: hash_to_hex(v) for k, v in collision.items()} + for collision in colliding_hashes] + }) + # Drop the colliding contents from the transaction + contents = [c for c in contents + if c.hashes() not in colliding_hashes] + else: + # Successfully added contents, we are done + break + if colliding_content_hashes: + for collision in colliding_content_hashes: + logger.error('Collision detected: %(collision)s', { + 'collision': collision + }) - try: - storage.content_add_metadata( - (obj for obj in objects if obj.get('status') != 'absent')) - except HashCollision as e: - logger.error('(Content) Hash collision: %s', e.args) - elif object_type in ('directory', 'revision', 'release', - 'snapshot', 'origin'): - # TODO: split batches that are too large for the storage - # to handle? - method = getattr(storage, object_type + '_add') - method(objects) +def _insert_objects(object_type: str, objects: List[Dict], storage) -> None: + """Insert objects of type object_type in the storage. + + """ + if object_type == 'content': + contents: List[BaseContent] = [] + skipped_contents: List[BaseContent] = [] + for content in objects: + c = BaseContent.from_dict(content) + if isinstance(c, SkippedContent): + skipped_contents.append(c) + else: + contents.append(c) + + collision_aware_content_add( + storage.skipped_content_add, skipped_contents) + collision_aware_content_add( + storage.content_add_metadata, contents) + elif object_type == 'revision': + storage.revision_add( + Revision.from_dict(r) for r in _fix_revisions(objects) + ) elif object_type == 'origin_visit': - for visit in objects: - storage.origin_add_one({'url': visit['origin']}) - if 'metadata' not in visit: - visit['metadata'] = None - storage.origin_visit_upsert(objects) + visits = [_fix_origin_visit(v) for v in objects] + storage.origin_add(Origin(url=v.origin) for v in visits) + storage.origin_visit_upsert(visits) + elif object_type in ('directory', 'release', 'snapshot', 'origin'): + method = getattr(storage, object_type + '_add') + method(object_converter_fn[object_type](o) for o in objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ +class ReplayError(Exception): + """An error occurred during the replay of an object""" + def __init__(self, operation, *, obj_id, exc): + self.operation = operation + self.obj_id = hash_to_hex(obj_id) + self.exc = exc + + def __str__(self): + return "ReplayError(doing %s, %s, %s)" % ( + self.operation, self.obj_id, self.exc + ) + + +def log_replay_retry(retry_obj, sleep, last_result): + """Log a retry of the content replayer""" + exc = last_result.exception() + logger.debug('Retry operation %(operation)s on %(obj_id)s: %(exc)s', + {'operation': exc.operation, 'obj_id': exc.obj_id, + 'exc': str(exc.exc)}) + + statsd.increment(CONTENT_RETRY_METRIC, tags={ + 'operation': exc.operation, + 'attempt': str(retry_obj.statistics['attempt_number']), + }) + + +def log_replay_error(last_attempt): + """Log a replay error to sentry""" + exc = last_attempt.exception() + with push_scope() as scope: + scope.set_tag('operation', exc.operation) + scope.set_extra('obj_id', exc.obj_id) + capture_exception(exc.exc) + + logger.error( + 'Failed operation %(operation)s on %(obj_id)s after %(retries)s' + ' retries: %(exc)s', { + 'obj_id': exc.obj_id, 'operation': exc.operation, + 'exc': str(exc.exc), 'retries': last_attempt.attempt_number, + }) + + return None + + +CONTENT_REPLAY_RETRIES = 3 + +content_replay_retry = retry( + retry=retry_if_exception_type(ReplayError), + stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), + wait=wait_random_exponential(multiplier=1, max=60), + before_sleep=log_replay_retry, + retry_error_callback=log_replay_error, +) + + +@content_replay_retry def copy_object(obj_id, src, dst): hex_obj_id = hash_to_hex(obj_id) obj = '' try: with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): obj = src.get(obj_id) logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): dst.add(obj, obj_id=obj_id, check_presence=False) logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) - except Exception as exc: - logger.error('Failed to copy %(obj_id)s: %(exc)s', - {'obj_id': hex_obj_id, 'exc': str(exc)}) + except ObjNotFoundError: + logger.error('Failed to copy %(obj_id)s: object not found', + {'obj_id': hex_obj_id}) raise + except Exception as exc: + raise ReplayError('copy', obj_id=obj_id, exc=exc) from None return len(obj) -@retry(stop=stop_after_attempt(3), - reraise=True, - wait=wait_random_exponential(multiplier=1, max=60)) +@content_replay_retry def obj_in_objstorage(obj_id, dst): """Check if an object is already in an objstorage, tenaciously""" - return obj_id in dst + try: + return obj_id in dst + except Exception as exc: + raise ReplayError('in_dst', obj_id=obj_id, exc=exc) from None def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, src: ObjStorage, dst: ObjStorage, exclude_fn: Optional[Callable[[dict], bool]] = None, check_dst: bool = True, ): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) * `obj['sha1'] not in dst` (if `check_dst` is True) Args: all_objects: Objects passed by the Kafka client. Most importantly, `all_objects['content'][*]['sha1']` is the sha1 hash of each content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) exclude_fn: Determines whether an object should be copied. check_dst: Determines whether we should check the destination objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 + nb_failures = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] != 'visible': nb_skipped += 1 logger.debug('skipped %s (status=%s)', hash_to_hex(obj_id), obj['status']) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "skipped", "status": obj["status"]}) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug('skipped %s (manually excluded)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) else: try: copied = copy_object(obj_id, src, dst) except ObjNotFoundError: nb_skipped += 1 statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"}) else: - vol.append(copied) - statsd.increment(CONTENT_OPERATIONS_METRIC, - tags={"decision": "copied"}) + if copied is None: + nb_failures += 1 + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "failed"}) + else: + vol.append(copied) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "copied"}) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' - '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', + '(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, - len([x for x in vol if not x]), + nb_failures, nb_skipped) if notify: notify('WATCHDOG=1') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 782e371..d75792a 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,245 +1,264 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from subprocess import Popen from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( make_zookeeper_process, make_kafka_server, ZOOKEEPER_CONFIG_TEMPLATE, ) from swh.model.hashutil import hash_to_bytes logger = logging.getLogger(__name__) - CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] +duplicate_content1 = { + 'length': 4, + 'sha1': hash_to_bytes( + '44973274ccef6ab4dfaaf86599792fa9c3fe4689'), + 'sha1_git': b'another-foo', + 'blake2s256': b'another-bar', + 'sha256': b'another-baz', + 'status': 'visible', +} + +# Craft a sha1 collision +duplicate_content2 = duplicate_content1.copy() +sha1_array = bytearray(duplicate_content1['sha1_git']) +sha1_array[0] += 1 +duplicate_content2['sha1_git'] = bytes(sha1_array) + + +DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] + + COMMITTERS = [ { 'fullname': b'foo', 'name': b'foo', 'email': b'', }, { 'fullname': b'bar', 'name': b'bar', 'email': b'', } ] DATES = [ { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': DATES[0], 'committer': COMMITTERS[0], 'author': COMMITTERS[0], 'committer_date': DATES[0], 'type': 'git', 'directory': '\x01'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': DATES[1], 'committer': COMMITTERS[1], 'author': COMMITTERS[1], 'committer_date': DATES[1], 'type': 'hg', 'directory': '\x02'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTERS[0], 'target_type': 'revision', 'target': b'\x04'*20, 'message': b'foo', 'synthetic': False, }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', }, { 'url': 'https://overtherainbow.org/fox/den', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0]['url'], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, 'type': 'git', }, { 'origin': ORIGINS[0]['url'], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, 'type': 'git', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n' # Those defines fixtures zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, zk_config_template=ZK_CONFIG_TEMPLATE, scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): """Pick a random prefix for kafka topics on each call""" return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) @pytest.fixture(scope='function') def kafka_consumer_group(kafka_prefix: str): """Pick a random consumer group for kafka consumers on each call""" return "test-consumer-%s" % kafka_prefix TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), - 'max_messages': 1, # will read 1 message and stops + 'stop_after_objects': 1, # will read 1 object and stop 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['127.0.0.1:{}'.format(port)], 'prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server consumer = Consumer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'group.id': kafka_consumer_group, }) kafka_topics = [ '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types'] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/log4j.properties b/swh/journal/tests/log4j.properties new file mode 100644 index 0000000..cce181c --- /dev/null +++ b/swh/journal/tests/log4j.properties @@ -0,0 +1,14 @@ +# Set root logger level to DEBUG and its only appender to A1. +log4j.rootLogger=DEBUG, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.logger.org.apache.log4j=WARN +log4j.logger.org.apache.kafka=WARN +log4j.logger.kafka=WARN +log4j.logger.org.apache.zookeeper=WARN diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index 809f7e0..078e074 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,516 +1,639 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import Counter +import copy import functools import logging import re import tempfile from subprocess import Popen from typing import Any, Dict, Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest +import yaml from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.storage import get_storage from swh.journal.cli import cli +from swh.journal.replay import CONTENT_REPLAY_RETRIES from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) -CLI_CONFIG = ''' -storage: - cls: memory - args: {} -objstorage_src: - cls: mocked - args: - name: src -objstorage_dst: - cls: mocked - args: - name: dst -''' +CLI_CONFIG = { + 'storage': { + 'cls': 'memory', + }, + 'objstorage_src': { + 'cls': 'mocked', + 'name': 'src', + }, + 'objstorage_dst': { + 'cls': 'mocked', + 'name': 'dst', + }, +} @pytest.fixture def storage(): """An swh-storage object that gets injected into the CLI functions.""" storage_config = { 'cls': 'pipeline', 'steps': [ - {'cls': 'validate'}, {'cls': 'memory'}, ] } storage = get_storage(**storage_config) with patch('swh.journal.cli.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage -def invoke(catch_exceptions, args, env=None): +@pytest.fixture +def monkeypatch_retry_sleep(monkeypatch): + from swh.journal.replay import copy_object, obj_in_objstorage + monkeypatch.setattr(copy_object.retry, 'sleep', lambda x: None) + monkeypatch.setattr(obj_in_objstorage.retry, 'sleep', lambda x: None) + + +def invoke(*args, env=None, journal_config=None): + config = copy.deepcopy(CLI_CONFIG) + if journal_config: + config['journal'] = journal_config + runner = CliRunner() with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: - config_fd.write(CLI_CONFIG) + yaml.dump(config, config_fd) config_fd.seek(0) - args = ['-C' + config_fd.name] + args - result = runner.invoke( + args = ['-C' + config_fd.name] + list(args) + return runner.invoke( cli, args, obj={'log_level': logging.DEBUG}, env=env, ) - if not catch_exceptions and result.exception: - print(result.output) - raise result.exception - return result def test_replay( storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): - (_, port) = kafka_server + (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ - 'bootstrap.servers': 'localhost:{}'.format(port), + 'bootstrap.servers': 'localhost:{}'.format(kafka_port), 'client.id': 'test-producer', - 'enable.idempotence': 'true', + 'acks': 'all', }) snapshot = {'id': b'foo', 'branches': { b'HEAD': { 'target_type': 'revision', 'target': b'\x01'*20, } }} # type: Dict[str, Any] producer.produce( topic=kafka_prefix+'.snapshot', key=key_to_kafka(snapshot['id']), value=value_to_kafka(snapshot), ) producer.flush() logger.debug('Flushed producer') - result = invoke(False, [ + result = invoke( 'replay', - '--broker', '127.0.0.1:%d' % port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', '1', - ]) + '--stop-after-objects', '1', + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) + expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot['id']) == { **snapshot, 'next_branch': None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} - def get_mock_objstorage(cls, args): + def get_mock_objstorage(cls, **args): assert cls == 'mocked', cls return objstorages[args['name']] def decorator(f): @functools.wraps(f) @patch('swh.journal.cli.get_objstorage') def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator NUM_CONTENTS = 10 def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): producer = Producer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'client.id': 'test-producer', - 'enable.idempotence': 'true', + 'acks': 'all', }) contents = {} for i in range(NUM_CONTENTS): content = b'\x00'*19 + bytes([i]) sha1 = objstorages['src'].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix+'.content', key=key_to_kafka(sha1), value=key_to_kafka({ 'sha1': sha1, 'status': 'visible', }), ) producer.flush() return contents @_patch_objstorages(['src', 'dst']) def test_replay_content( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), - ]) + '--stop-after-objects', str(NUM_CONTENTS), + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) + expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_structured_log( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) caplog.set_level(logging.DEBUG, 'swh.journal.replay') expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), - ]) + '--stop-after-objects', str(NUM_CONTENTS), + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = set() for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied.add(record.args['obj_id']) assert copied == expected_obj_ids, ( "Mismatched logging; see captured log output for details." ) @_patch_objstorages(['src', 'dst']) def test_replay_content_static_group_id( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) # Setup log capture to fish the consumer settings out of the log messages caplog.set_level(logging.DEBUG, 'swh.journal.client') - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), - ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}) + '--stop-after-objects', str(NUM_CONTENTS), + env={'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}, + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output consumer_settings = None for record in caplog.records: if 'Consumer settings' in record.message: consumer_settings = record.args break assert consumer_settings is not None, ( 'Failed to get consumer settings out of the consumer log. ' 'See log capture for details.' ) assert consumer_settings['group.instance.id'] == 'static-group-instance-id' assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000 assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000 for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content @_patch_objstorages(['src', 'dst']) def test_replay_content_exclude( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode='w+b') as fd: fd.write(b''.join(sorted(excluded_contents))) fd.seek(0) - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--exclude-sha1-file', fd.name, - ]) + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages['dst'], sha1 else: assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content NUM_CONTENTS_DST = 5 @_patch_objstorages(['src', 'dst']) @pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [ (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), (False, NUM_CONTENTS, 0), ]) def test_replay_content_check_dst( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], check_dst: bool, expected_copied: int, expected_in_dst: int, caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages['dst'].add(content, obj_id=sha1) caplog.set_level(logging.DEBUG, 'swh.journal.replay') - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--check-dst' if check_dst else '--no-check-dst', - ]) + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 in_dst = 0 for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied += 1 elif 'in dst' in logtext: in_dst += 1 assert (copied == expected_copied and in_dst == expected_in_dst), ( "Unexpected amount of objects copied, see the captured log for details" ) for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content class FlakyObjStorage(InMemoryObjStorage): def __init__(self, *args, **kwargs): state = kwargs.pop('state') self.failures_left = Counter(kwargs.pop('failures')) super().__init__(*args, **kwargs) if state: self.state = state def flaky_operation(self, op, obj_id): if self.failures_left[op, obj_id] > 0: self.failures_left[op, obj_id] -= 1 raise RuntimeError( 'Failed %s on %s' % (op, hash_to_hex(obj_id)) ) def get(self, obj_id): self.flaky_operation('get', obj_id) return super().get(obj_id) def add(self, data, obj_id=None, check_presence=True): self.flaky_operation('add', obj_id) return super().add(data, obj_id=obj_id, check_presence=check_presence) def __contains__(self, obj_id): self.flaky_operation('in', obj_id) return super().__contains__(obj_id) @_patch_objstorages(['src', 'dst']) def test_replay_content_check_dst_retry( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + kafka_server: Tuple[Popen, int], + monkeypatch_retry_sleep): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) failures = {} for i, (sha1, content) in enumerate(contents.items()): if i >= NUM_CONTENTS_DST: break objstorages['dst'].add(content, obj_id=sha1) failures['in', sha1] = 1 orig_dst = objstorages['dst'] objstorages['dst'] = FlakyObjStorage(state=orig_dst.state, failures=failures) - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), + '--stop-after-objects', str(NUM_CONTENTS), '--check-dst', - ]) + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content +@_patch_objstorages(['src', 'dst']) +def test_replay_content_failed_copy_retry( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog, + monkeypatch_retry_sleep): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + add_failures = {} + get_failures = {} + definitely_failed = set() + + # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. + # We generate failures for 2 different operations, get and add. + num_retry_contents = 2 * CONTENT_REPLAY_RETRIES + + assert num_retry_contents < NUM_CONTENTS, ( + "Need to generate more test contents to properly test retry behavior" + ) + + for i, sha1 in enumerate(contents): + if i >= num_retry_contents: + break + + # This generates a number of failures, up to CONTENT_REPLAY_RETRIES + num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 + + # This generates failures of add for the first CONTENT_REPLAY_RETRIES + # objects, then failures of get. + if i < CONTENT_REPLAY_RETRIES: + add_failures['add', sha1] = num_failures + else: + get_failures['get', sha1] = num_failures + + # Only contents that have CONTENT_REPLAY_RETRIES or more are + # definitely failing + if num_failures >= CONTENT_REPLAY_RETRIES: + definitely_failed.add(hash_to_hex(sha1)) + + objstorages['dst'] = FlakyObjStorage( + state=objstorages['dst'].state, + failures=add_failures, + ) + objstorages['src'] = FlakyObjStorage( + state=objstorages['src'].state, + failures=get_failures, + ) + + caplog.set_level(logging.DEBUG, 'swh.journal.replay') + + result = invoke( + 'content-replay', + '--stop-after-objects', str(NUM_CONTENTS), + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + copied = 0 + actually_failed = set() + for record in caplog.records: + logtext = record.getMessage() + if 'copied' in logtext: + copied += 1 + elif 'Failed operation' in logtext: + assert record.levelno == logging.ERROR + assert record.args['retries'] == CONTENT_REPLAY_RETRIES + actually_failed.add(record.args['obj_id']) + + assert actually_failed == definitely_failed, ( + 'Unexpected object copy failures; see captured log for details' + ) + + for (sha1, content) in contents.items(): + if hash_to_hex(sha1) in definitely_failed: + assert sha1 not in objstorages['dst'] + continue + + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content + + @_patch_objstorages(['src', 'dst']) def test_replay_content_objnotfound( objstorages, storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int], caplog): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) num_contents_deleted = 5 contents_deleted = set() for i, sha1 in enumerate(contents): if i >= num_contents_deleted: break del objstorages['src'].state[sha1] contents_deleted.add(hash_to_hex(sha1)) caplog.set_level(logging.DEBUG, 'swh.journal.replay') - result = invoke(False, [ + result = invoke( 'content-replay', - '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', kafka_consumer_group, - '--prefix', kafka_prefix, - '--max-messages', str(NUM_CONTENTS), - ]) + '--stop-after-objects', str(NUM_CONTENTS), + journal_config={ + 'brokers': ['127.0.0.1:%d' % kafka_port], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + }, + ) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output copied = 0 not_in_src = set() for record in caplog.records: logtext = record.getMessage() if 'copied' in logtext: copied += 1 elif 'object not found' in logtext: # Check that the object id can be recovered from logs assert record.levelno == logging.ERROR not_in_src.add(record.args['obj_id']) assert copied == NUM_CONTENTS - num_contents_deleted, ( "Unexpected number of contents copied" ) assert not_in_src == contents_deleted, ( "Mismatch between deleted contents and not_in_src logs" ) for (sha1, content) in contents.items(): if sha1 not in objstorages['src']: continue assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 23c78ab..122f304 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,87 +1,138 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from subprocess import Popen -from typing import Tuple +from typing import Dict, List, Tuple from unittest.mock import MagicMock from confluent_kafka import Producer +import pytest from swh.model.hypothesis_strategies import revisions +from swh.model.model import Content from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', - 'enable.idempotence': 'true', + 'acks': 'all', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() - config = { - 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, - 'max_messages': 1, - } - client = JournalClient(**config) - + client = JournalClient( + brokers='localhost:%d' % kafka_server[1], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=1, + ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) def test_client_eof( kafka_prefix: str, kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', - 'enable.idempotence': 'true', + 'acks': 'all', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() - config = { - 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, - 'stop_on_eof': True, - } - client = JournalClient(**config) + client = JournalClient( + brokers='localhost:%d' % kafka_server[1], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=None, + stop_on_eof=True, + ) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) + + +@pytest.mark.parametrize("batch_size", [1, 5, 100]) +def test_client_batch_size( + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + batch_size: int, +): + (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + num_objects = 2 * batch_size + 1 + assert num_objects < 256, "Too many objects, generation will fail" + + producer = Producer({ + 'bootstrap.servers': 'localhost:{}'.format(port), + 'client.id': 'test producer', + 'acks': 'all', + }) + + contents = [Content.from_data(bytes([i])) for i in range(num_objects)] + + # Fill Kafka + for content in contents: + producer.produce( + topic=kafka_prefix + '.content', + key=key_to_kafka(content.sha1), + value=value_to_kafka(content.to_dict()), + ) + + producer.flush() + + client = JournalClient( + brokers=['localhost:%d' % kafka_server[1]], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=num_objects, + batch_size=batch_size, + ) + + collected_output: List[Dict] = [] + + def worker_fn(objects): + received = objects['content'] + assert len(received) <= batch_size + collected_output.extend(received) + + client.process(worker_fn) + + assert collected_output == [content.to_dict() for content in contents] diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py index 847dfe9..90861a2 100644 --- a/swh/journal/tests/test_kafka_writer.py +++ b/swh/journal/tests/test_kafka_writer.py @@ -1,150 +1,161 @@ -# Copyright (C) 2018-2019 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import datetime from confluent_kafka import Consumer, KafkaException from subprocess import Popen -from typing import Tuple +from typing import List, Tuple from swh.storage import get_storage -from swh.journal.writer.kafka import KafkaJournalWriter +from swh.journal.replay import object_converter_fn from swh.journal.serializers import ( kafka_to_key, kafka_to_value ) +from swh.journal.writer.kafka import KafkaJournalWriter + +from swh.model.model import Content, Origin, BaseModel from .conftest import OBJECT_TYPE_KEYS def assert_written(consumer, kafka_prefix, expected_messages): consumed_objects = defaultdict(list) fetched_messages = 0 retries_left = 1000 while fetched_messages < expected_messages: if retries_left == 0: raise ValueError('Timed out fetching messages from kafka') msg = consumer.poll(timeout=0.01) if not msg: retries_left -= 1 continue error = msg.error() if error is not None: if error.fatal(): raise KafkaException(error) retries_left -= 1 continue fetched_messages += 1 consumed_objects[msg.topic()].append( (kafka_to_key(msg.key()), kafka_to_value(msg.value())) ) for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items(): topic = kafka_prefix + '.' + object_type (keys, values) = zip(*consumed_objects[topic]) if key_name: assert list(keys) == [object_[key_name] for object_ in objects] else: pass # TODO if object_type == 'origin_visit': for value in values: del value['visit'] elif object_type == 'content': for value in values: del value['ctime'] for object_ in objects: assert object_ in values def test_kafka_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' config = { 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, 'producer_config': { 'message.max.bytes': 100000000, } } writer = KafkaJournalWriter(**config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): for (num, object_) in enumerate(objects): if object_type == 'origin_visit': object_ = {**object_, 'visit': num} if object_type == 'content': object_ = {**object_, 'ctime': datetime.datetime.now()} writer.write_addition(object_type, object_) expected_messages += 1 assert_written(consumer, kafka_prefix, expected_messages) def test_storage_direct_writer( kafka_prefix: str, kafka_server: Tuple[Popen, int], consumer: Consumer): kafka_prefix += '.swh.journal.objects' writer_config = { 'cls': 'kafka', 'brokers': ['localhost:%d' % kafka_server[1]], 'client_id': 'kafka_writer', 'prefix': kafka_prefix, 'producer_config': { 'message.max.bytes': 100000000, } } storage_config = { 'cls': 'pipeline', 'steps': [ - {'cls': 'validate'}, {'cls': 'memory', 'journal_writer': writer_config}, ] } storage = get_storage(**storage_config) expected_messages = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): method = getattr(storage, object_type + '_add') if object_type in ('content', 'directory', 'revision', 'release', 'snapshot', 'origin'): + objects_: List[BaseModel] if object_type == 'content': - objects = [{**obj, 'data': b''} for obj in objects] - method(objects) + objects_ = [ + Content.from_dict({ + **obj, 'data': b''}) + for obj in objects + ] + else: + objects_ = [ + object_converter_fn[object_type](obj) + for obj in objects + ] + method(objects_) expected_messages += len(objects) elif object_type in ('origin_visit',): for object_ in objects: object_ = object_.copy() origin_url = object_.pop('origin') - storage.origin_add_one({'url': origin_url}) - visit = method(origin=origin_url, date=object_.pop('date'), + storage.origin_add_one(Origin(url=origin_url)) + visit = method(origin_url, date=object_.pop('date'), type=object_.pop('type')) expected_messages += 1 - visit_id = visit['visit'] - storage.origin_visit_update(origin_url, visit_id, **object_) + storage.origin_visit_update(origin_url, visit.visit, **object_) expected_messages += 1 else: assert False, object_type assert_written(consumer, kafka_prefix, expected_messages) diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py index acb614e..3865da0 100644 --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,246 +1,400 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import functools +import logging import random from subprocess import Popen -from typing import Tuple +from typing import Dict, Tuple import dateutil from confluent_kafka import Producer from hypothesis import strategies, given, settings import pytest from swh.storage import get_storage from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray +from swh.model.hashutil import hash_to_hex +from swh.model.model import Content -from .conftest import OBJECT_TYPE_KEYS +from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { 'cls': 'pipeline', 'steps': [ - {'cls': 'validate'}, {'cls': 'memory'}, ] } +def make_topic(kafka_prefix: str, object_type: str) -> str: + return kafka_prefix + '.' + object_type + + def test_storage_play( kafka_prefix: str, kafka_consumer_group: str, - kafka_server: Tuple[Popen, int]): + kafka_server: Tuple[Popen, int], + caplog): + """Optimal replayer scenario. + + This: + - writes objects to the topic + - replayer consumes objects from the topic and replay them + + """ + (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + storage = get_storage(**storage_config) + + producer = Producer({ + 'bootstrap.servers': 'localhost:{}'.format(port), + 'client.id': 'test producer', + 'acks': 'all', + }) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + + # Fill Kafka + nb_sent = 0 + nb_visits = 0 + for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): + topic = make_topic(kafka_prefix, object_type) + for object_ in objects: + key = bytes(random.randint(0, 255) for _ in range(40)) + object_ = object_.copy() + if object_type == 'content': + object_['ctime'] = now + elif object_type == 'origin_visit': + nb_visits += 1 + object_['visit'] = nb_visits + producer.produce( + topic=topic, key=key_to_kafka(key), + value=value_to_kafka(object_), + ) + nb_sent += 1 + + producer.flush() + + caplog.set_level(logging.ERROR, 'swh.journal.replay') + # Fill the storage from Kafka + replayer = JournalClient( + brokers='localhost:%d' % kafka_server[1], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=nb_sent, + ) + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_inserted = 0 + while nb_inserted < nb_sent: + nb_inserted += replayer.process(worker_fn) + assert nb_sent == nb_inserted + + # Check the objects were actually inserted in the storage + assert OBJECT_TYPE_KEYS['revision'][1] == \ + list(storage.revision_get( + [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) + assert OBJECT_TYPE_KEYS['release'][1] == \ + list(storage.release_get( + [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) + + origins = list(storage.origin_get( + [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) + assert OBJECT_TYPE_KEYS['origin'][1] == \ + [{'url': orig['url']} for orig in origins] + for origin in origins: + origin_url = origin['url'] + expected_visits = [ + { + **visit, + 'origin': origin_url, + 'date': dateutil.parser.parse(visit['date']), + } + for visit in OBJECT_TYPE_KEYS['origin_visit'][1] + if visit['origin'] == origin['url'] + ] + actual_visits = list(storage.origin_visit_get( + origin_url)) + for visit in actual_visits: + del visit['visit'] # opaque identifier + assert expected_visits == actual_visits + + input_contents = OBJECT_TYPE_KEYS['content'][1] + contents = storage.content_get_metadata( + [cont['sha1'] for cont in input_contents]) + assert len(contents) == len(input_contents) + assert contents == {cont['sha1']: [cont] for cont in input_contents} + + collision = 0 + for record in caplog.records: + logtext = record.getMessage() + if 'Colliding contents:' in logtext: + collision += 1 + + assert collision == 0, "No collision should be detected" + + +def test_storage_play_with_collision( + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog): + """Another replayer scenario with collisions. + + This: + - writes objects to the topic, including colliding contents + - replayer consumes objects from the topic and replay them + - This drops the colliding contents from the replay when detected + + """ (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' storage = get_storage(**storage_config) producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) now = datetime.datetime.now(tz=datetime.timezone.utc) # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): - topic = kafka_prefix + '.' + object_type + topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() if object_type == 'content': object_['ctime'] = now elif object_type == 'origin_visit': nb_visits += 1 object_['visit'] = nb_visits producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), ) nb_sent += 1 + # Create collision in input data + # They are not written in the destination + for content in DUPLICATE_CONTENTS: + topic = make_topic(kafka_prefix, 'content') + producer.produce( + topic=topic, key=key_to_kafka(key), + value=value_to_kafka(content), + ) + + nb_sent += 1 + producer.flush() + caplog.set_level(logging.ERROR, 'swh.journal.replay') # Fill the storage from Kafka - config = { - 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': kafka_consumer_group, - 'prefix': kafka_prefix, - 'max_messages': nb_sent, - } - replayer = JournalClient(**config) + replayer = JournalClient( + brokers='localhost:%d' % kafka_server[1], + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=nb_sent, + ) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_inserted = 0 while nb_inserted < nb_sent: nb_inserted += replayer.process(worker_fn) assert nb_sent == nb_inserted # Check the objects were actually inserted in the storage assert OBJECT_TYPE_KEYS['revision'][1] == \ list(storage.revision_get( [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]])) assert OBJECT_TYPE_KEYS['release'][1] == \ list(storage.release_get( [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]])) origins = list(storage.origin_get( [orig for orig in OBJECT_TYPE_KEYS['origin'][1]])) assert OBJECT_TYPE_KEYS['origin'][1] == \ [{'url': orig['url']} for orig in origins] for origin in origins: origin_url = origin['url'] expected_visits = [ { **visit, 'origin': origin_url, 'date': dateutil.parser.parse(visit['date']), } for visit in OBJECT_TYPE_KEYS['origin_visit'][1] if visit['origin'] == origin['url'] ] actual_visits = list(storage.origin_visit_get( origin_url)) for visit in actual_visits: del visit['visit'] # opaque identifier assert expected_visits == actual_visits input_contents = OBJECT_TYPE_KEYS['content'][1] contents = storage.content_get_metadata( [cont['sha1'] for cont in input_contents]) assert len(contents) == len(input_contents) assert contents == {cont['sha1']: [cont] for cont in input_contents} + nb_collisions = 0 + + actual_collision: Dict + for record in caplog.records: + logtext = record.getMessage() + if 'Collision detected:' in logtext: + nb_collisions += 1 + actual_collision = record.args['collision'] + + assert nb_collisions == 1, "1 collision should be detected" + + algo = 'sha1' + assert actual_collision['algo'] == algo + expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo]) + assert actual_collision['hash'] == expected_colliding_hash + + actual_colliding_hashes = actual_collision['objects'] + assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS) + for content in DUPLICATE_CONTENTS: + expected_content_hashes = { + k: hash_to_hex(v) + for k, v in Content.from_dict(content).hashes().items() + } + assert expected_content_hashes in actual_colliding_hashes + def _test_write_replay_origin_visit(visits): """Helper function to write tests for origin_visit. Each visit (a dict) given in the 'visits' argument will be sent to a (mocked) kafka queue, which a in-memory-storage backed replayer is listening to. Check that corresponding origin visits entities are present in the storage and have correct values. """ queue = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) queue_size = len(queue) - assert replayer.max_messages == 0 - replayer.max_messages = queue_size + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Test origin_visit when there is no type.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'status': 'partial', 'snapshot': None, }] with pytest.raises(ValueError, match='too old'): _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index e417627..d096da6 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,158 +1,161 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists -from swh.model.hypothesis_strategies import object_dicts +from swh.model.hypothesis_strategies import ( + object_dicts, present_contents +) +from swh.model.model import Origin from swh.storage import get_storage, HashCollision -from swh.journal.replay import process_replay_objects -from swh.journal.replay import process_replay_objects_content +from swh.journal.replay import ( + process_replay_objects, process_replay_objects_content, object_converter_fn +) from .utils import MockedJournalClient, MockedKafkaWriter storage_config = { - 'cls': 'pipeline', - 'steps': [ - {'cls': 'validate'}, - {'cls': 'memory', 'journal_writer': {'cls': 'memory'}}, - ] + 'cls': 'memory', + 'journal_writer': {'cls': 'memory'}, } def empty_person_name_email(rev_or_rel): """Empties the 'name' and 'email' fields of the author/committer fields of a revision or release; leaving only the fullname.""" if getattr(rev_or_rel, 'author', None): rev_or_rel = attr.evolve( rev_or_rel, author=attr.evolve( rev_or_rel.author, name=b'', email=b'', ) ) if getattr(rev_or_rel, 'committer', None): rev_or_rel = attr.evolve( rev_or_rel, committer=attr.evolve( rev_or_rel.committer, name=b'', email=b'', ) ) return rev_or_rel @given(lists(object_dicts(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_same_order_batches(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) + # Write objects to storage1 for (obj_type, obj) in objects: - obj = obj.copy() + if obj_type == 'content' and obj.get('status') == 'absent': + obj_type = 'skipped_content' + + obj = object_converter_fn[obj_type](obj) + if obj_type == 'origin_visit': - storage1.origin_add_one({'url': obj['origin']}) + storage1.origin_add_one(Origin(url=obj.origin)) storage1.origin_visit_upsert([obj]) else: - if obj_type == 'content' and obj.get('status') == 'absent': - obj_type = 'skipped_content' method = getattr(storage1, obj_type + '_add') try: method([obj]) except HashCollision: pass + # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) - assert replayer.max_messages == 0 - replayer.max_messages = queue_size + assert queue_size != 0, "No test objects found; hypothesis strategy bug?" + + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage2) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) assert replayer.consumer.committed for attr_name in ('_contents', '_directories', '_snapshots', '_origin_visits', '_origins'): assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \ attr_name # When hypothesis generates a revision and a release with same # author (or committer) fullname but different name or email, then # the storage will use the first name/email it sees. # This first one will be either the one from the revision or the release, # and since there is no order guarantees, storage2 has 1/2 chance of # not seeing the same order as storage1, therefore we need to strip # them out before comparing. for attr_name in ('_revisions', '_releases'): items1 = {k: empty_person_name_email(v) for (k, v) in getattr(storage1, attr_name).items()} items2 = {k: empty_person_name_email(v) for (k, v) in getattr(storage2, attr_name).items()} assert items1 == items2, attr_name # TODO: add test for hash collision -@given(lists(object_dicts(), min_size=1)) +@given(lists(present_contents(), min_size=1)) @settings(suppress_health_check=[HealthCheck.too_slow]) def test_write_replay_content(objects): queue = [] replayer = MockedJournalClient(queue) with patch('swh.journal.writer.inmemory.InMemoryJournalWriter', return_value=MockedKafkaWriter(queue)): storage1 = get_storage(**storage_config) contents = [] - for (obj_type, obj) in objects: - obj = obj.copy() - if obj_type == 'content': - # avoid hash collision - if not storage1.content_find(obj): - if obj.get('status') != 'absent': - storage1.content_add([obj]) - contents.append(obj) + for obj in objects: + storage1.content_add([obj]) + contents.append(obj) + # Bail out early if we didn't insert any relevant objects... queue_size = len(queue) - assert replayer.max_messages == 0 - replayer.max_messages = queue_size + assert queue_size != 0, "No test objects found; hypothesis strategy bug?" + + assert replayer.stop_after_objects is None + replayer.stop_after_objects = queue_size storage2 = get_storage(**storage_config) objstorage1 = storage1.objstorage.objstorage objstorage2 = storage2.objstorage.objstorage worker_fn = functools.partial(process_replay_objects_content, src=objstorage1, dst=objstorage2) - nb_messages = 0 - while nb_messages < queue_size: - nb_messages += replayer.process(worker_fn) + + replayer.process(worker_fn) # only content with status visible will be copied in storage2 expected_objstorage_state = { - c['sha1']: c['data'] for c in contents if c['status'] == 'visible' + c.sha1: c.data for c in contents if c.status == 'visible' } assert expected_objstorage_state == objstorage2.state diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py index 49f7b1d..0b8880e 100644 --- a/swh/journal/tests/utils.py +++ b/swh/journal/tests/utils.py @@ -1,75 +1,76 @@ from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES from swh.journal.writer.kafka import KafkaJournalWriter from swh.journal.serializers import (kafka_to_value, key_to_kafka, value_to_kafka) class FakeKafkaMessage: def __init__(self, topic, key, value): self._topic = topic self._key = key_to_kafka(key) self._value = value_to_kafka(value) def topic(self): return self._topic def value(self): return self._value def key(self): return self._key def error(self): return None class MockedKafkaWriter(KafkaJournalWriter): def __init__(self, queue): self._prefix = 'prefix' self.queue = queue def send(self, topic, key, value): msg = FakeKafkaMessage(topic=topic, key=key, value=value) self.queue.append(msg) def flush(self): pass class MockedKafkaConsumer: """Mimic the confluent_kafka.Consumer API, producing the messages stored in `queue`. You're only allowed to subscribe to topics in which the queue has messages. """ def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) - self.process_timeout = 0 - self.max_messages = 0 + self.process_timeout = None + self.stop_after_objects = None self.value_deserializer = kafka_to_value self.stop_on_eof = False + self.batch_size = 200 diff --git a/swh/journal/writer/kafka.py b/swh/journal/writer/kafka.py index d4728fb..648d20a 100644 --- a/swh/journal/writer/kafka.py +++ b/swh/journal/writer/kafka.py @@ -1,114 +1,115 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from confluent_kafka import Producer, KafkaException from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.model import BaseModel from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) class KafkaJournalWriter: """This class is instantiated and used by swh-storage to write incoming new objects to Kafka before adding them to the storage backend (eg. postgresql) itself.""" def __init__(self, brokers, prefix, client_id, producer_config=None): self._prefix = prefix if isinstance(brokers, str): brokers = [brokers] if not producer_config: producer_config = {} self.producer = Producer({ 'bootstrap.servers': ','.join(brokers), 'client.id': client_id, 'on_delivery': self._on_delivery, 'error_cb': self._error_cb, 'logger': logger, - 'enable.idempotence': 'true', + 'acks': 'all', **producer_config, }) def _error_cb(self, error): if error.fatal(): raise KafkaException(error) logger.info('Received non-fatal kafka error: %s', error) def _on_delivery(self, error, message): if error is not None: self._error_cb(error) def send(self, topic, key, value): self.producer.produce( topic=topic, key=key_to_kafka(key), value=value_to_kafka(value), ) # Need to service the callbacks regularly by calling poll self.producer.poll(0) def flush(self): self.producer.flush() def _get_key(self, object_type, object_): if object_type in ('revision', 'release', 'directory', 'snapshot'): return object_['id'] elif object_type == 'content': return object_['sha1'] # TODO: use a dict of hashes elif object_type == 'skipped_content': return { hash: object_[hash] for hash in DEFAULT_ALGORITHMS } elif object_type == 'origin': return {'url': object_['url']} elif object_type == 'origin_visit': return { 'origin': object_['origin'], 'date': str(object_['date']), } else: raise ValueError('Unknown object type: %s.' % object_type) def _sanitize_object(self, object_type, object_): if object_type == 'origin_visit': return { **object_, 'date': str(object_['date']), } elif object_type == 'origin': assert 'id' not in object_ return object_ - def write_addition(self, object_type, object_, flush=True): + def _write_addition(self, object_type, object_): """Write a single object to the journal""" if isinstance(object_, BaseModel): object_ = object_.to_dict() topic = '%s.%s' % (self._prefix, object_type) key = self._get_key(object_type, object_) - object_ = self._sanitize_object(object_type, object_) - logger.debug('topic: %s, key: %s, value: %s', topic, key, object_) - self.send(topic, key=key, value=object_) + dict_ = self._sanitize_object(object_type, object_) + logger.debug('topic: %s, key: %s, value: %s', topic, key, dict_) + self.send(topic, key=key, value=dict_) - if flush: - self.flush() + def write_addition(self, object_type, object_): + """Write a single object to the journal""" + self._write_addition(object_type, object_) + self.flush() write_update = write_addition - def write_additions(self, object_type, objects, flush=True): + def write_additions(self, object_type, objects): """Write a set of objects to the journal""" for object_ in objects: - self.write_addition(object_type, object_, flush=False) + self._write_addition(object_type, object_) - if flush: - self.flush() + self.flush() diff --git a/version.txt b/version.txt index 3ae0241..c514853 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.26-0-g06688ba \ No newline at end of file +v0.0.27-0-gd3d5b79 \ No newline at end of file