diff --git a/PKG-INFO b/PKG-INFO index cd3a11f..b9b3a06 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.25 +Version: 0.0.26 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index 4701c33..85f1a2c 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.40 -swh.storage >= 0.0.156 +swh.storage >= 0.0.172 diff --git a/requirements.txt b/requirements.txt index 184c591..a83549b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ # Add here external Python modules dependencies, one per line. Module names # should match https://pypi.python.org/pypi names. For the full spec or # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html confluent-kafka msgpack +tenacity vcversioner diff --git a/swh.journal.egg-info/PKG-INFO b/swh.journal.egg-info/PKG-INFO index cd3a11f..b9b3a06 100644 --- a/swh.journal.egg-info/PKG-INFO +++ b/swh.journal.egg-info/PKG-INFO @@ -1,70 +1,70 @@ Metadata-Version: 2.1 Name: swh.journal -Version: 0.0.25 +Version: 0.0.26 Summary: Software Heritage Journal utilities Home-page: https://forge.softwareheritage.org/diffusion/DJNL/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-journal Description: swh-journal =========== Persistent logger of changes to the archive, with publish-subscribe support. See the [documentation](https://docs.softwareheritage.org/devel/swh-journal/index.html#software-heritage-journal) for more details. # Local test As a pre-requisite, you need a kakfa installation path. 
The following target will take care of this: ``` make install ``` Then, provided you are in the right virtual environment as described in the [swh getting-started](https://docs.softwareheritage.org/devel/developer-setup.html#developer-setup): ``` pytest ``` or: ``` tox ``` # Running ## publisher Command: ``` $ swh-journal --config-file ~/.config/swh/journal/publisher.yml \ publisher ``` # Auto-completion To have the completion, add the following in your ~/.virtualenvs/swh/bin/postactivate: ``` eval "$(_SWH_JOURNAL_COMPLETE=$autocomplete_cmd swh-journal)" ``` Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt index 00b03b6..c7288f6 100644 --- a/swh.journal.egg-info/requires.txt +++ b/swh.journal.egg-info/requires.txt @@ -1,12 +1,13 @@ confluent-kafka msgpack +tenacity vcversioner swh.core[db,http]>=0.0.60 swh.model>=0.0.40 -swh.storage>=0.0.156 +swh.storage>=0.0.172 [testing] pytest swh.model>=0.0.34 pytest-kafka hypothesis diff --git a/swh/journal/cli.py b/swh/journal/cli.py index 13455d6..3be79e4 100644 --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -1,263 +1,271 @@ # Copyright (C) 2016-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import logging import mmap import os import time import click try: from systemd.daemon import notify except ImportError: notify = None from swh.core import config from swh.core.cli import CONTEXT_SETTINGS from swh.model.model import SHA1_SIZE from swh.storage import get_storage from swh.objstorage import get_objstorage from swh.journal.client import JournalClient from swh.journal.replay import is_hash_in_bytearray from swh.journal.replay import process_replay_objects from swh.journal.replay import process_replay_objects_content from swh.journal.backfill import JournalBackfiller @click.group(name='journal', context_settings=CONTEXT_SETTINGS) @click.option('--config-file', '-C', default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.") @click.pass_context def cli(ctx, config_file): """Software Heritage Journal tools. The journal is a persistent logger of changes to the archive, with publish-subscribe support. """ if not config_file: config_file = os.environ.get('SWH_CONFIG_FILENAME') if config_file: if not os.path.exists(config_file): raise ValueError('%s does not exist' % config_file) conf = config.read(config_file) else: conf = {} ctx.ensure_object(dict) ctx.obj['config'] = conf def get_journal_client(ctx, **kwargs): conf = ctx.obj['config'].get('journal', {}) conf.update({k: v for (k, v) in kwargs.items() if v not in (None, ())}) if not conf.get('brokers'): ctx.fail('You must specify at least one kafka broker.') if not isinstance(conf['brokers'], (list, tuple)): conf['brokers'] = [conf['brokers']] return JournalClient(**conf) @cli.command() @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. 
Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to. ' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from. ' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka. ' '(deprecated, use the config file instead)') @click.pass_context def replay(ctx, brokers, prefix, group_id, max_messages): """Fill a Storage by reading a Journal. There can be several 'replayers' filling a Storage as long as they use the same `group-id`. """ logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: storage = get_storage(**conf.pop('storage')) except KeyError: ctx.fail('You must have a storage configured in your config file.') client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, max_messages=max_messages) worker_fn = functools.partial(process_replay_objects, storage=storage) if notify: notify('READY=1') try: nb_messages = 0 last_log_time = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) if notify: notify('WATCHDOG=1') if time.time() - last_log_time >= 60: # Log at most once per minute. logger.info('Processed %d messages.' % nb_messages) last_log_time = time.time() except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() @cli.command() @click.argument('object_type') @click.option('--start-object', default=None) @click.option('--end-object', default=None) @click.option('--dry-run', is_flag=True, default=False) @click.pass_context def backfiller(ctx, object_type, start_object, end_object, dry_run): """Run the backfiller The backfiller list objects from a Storage and produce journal entries from there. Typically used to rebuild a journal or compensate for missing objects in a journal (eg. due to a downtime of this later). The configuration file requires the following entries: - brokers: a list of kafka endpoints (the journal) in which entries will be added. - storage_dbconn: URL to connect to the storage DB. - prefix: the prefix of the topics (topics will be .). - client_id: the kafka client ID. """ conf = ctx.obj['config'] backfiller = JournalBackfiller(conf) if notify: notify('READY=1') try: backfiller.run( object_type=object_type, start_object=start_object, end_object=end_object, dry_run=dry_run) except KeyboardInterrupt: if notify: notify('STOPPING=1') ctx.exit(0) @cli.command('content-replay') @click.option('--max-messages', '-m', default=None, type=int, help='Maximum number of objects to replay. Default is to ' 'run forever.') @click.option('--broker', 'brokers', type=str, multiple=True, help='Kafka broker to connect to.' '(deprecated, use the config file instead)') @click.option('--prefix', type=str, default=None, help='Prefix of Kafka topic names to read from.' '(deprecated, use the config file instead)') @click.option('--group-id', type=str, help='Name of the group id for reading from Kafka.' 
'(deprecated, use the config file instead)') @click.option('--exclude-sha1-file', default=None, type=click.File('rb'), help='File containing a sorted array of hashes to be excluded.') +@click.option('--check-dst/--no-check-dst', default=True, + help='Check whether the destination contains the object before ' + 'copying.') @click.pass_context def content_replay(ctx, max_messages, - brokers, prefix, group_id, exclude_sha1_file): + brokers, prefix, group_id, exclude_sha1_file, check_dst): """Fill a destination Object Storage (typically a mirror) by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they - use the same `group-id`. + use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` + environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. `--exclude-sha1-file` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. + + `--check-dst` sets whether the replayer should check in the destination + ObjStorage before copying an object. You can turn that off if you know + you're copying to an empty ObjStorage. """ logger = logging.getLogger(__name__) conf = ctx.obj['config'] try: objstorage_src = get_objstorage(**conf.pop('objstorage_src')) except KeyError: ctx.fail('You must have a source objstorage configured in ' 'your config file.') try: objstorage_dst = get_objstorage(**conf.pop('objstorage_dst')) except KeyError: ctx.fail('You must have a destination objstorage configured ' 'in your config file.') if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail('--exclude-sha1 must link to a file whose size is an ' 'exact multiple of %d bytes.' % SHA1_SIZE) nb_excluded_hashes = int(map_.size()/SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( ctx, brokers=brokers, prefix=prefix, group_id=group_id, max_messages=max_messages, object_types=('content',)) - worker_fn = functools.partial(process_replay_objects_content, - src=objstorage_src, - dst=objstorage_dst, - exclude_fn=exclude_fn) + worker_fn = functools.partial( + process_replay_objects_content, + src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, + check_dst=check_dst) if notify: notify('READY=1') try: nb_messages = 0 last_log_time = 0 while not max_messages or nb_messages < max_messages: nb_messages += client.process(worker_fn) if notify: notify('WATCHDOG=1') if time.time() - last_log_time >= 60: # Log at most once per minute. logger.info('Processed %d messages.' 
% nb_messages) last_log_time = time.time() except KeyboardInterrupt: ctx.exit(0) else: print('Done.') finally: if notify: notify('STOPPING=1') client.close() def main(): logging.basicConfig() return cli(auto_envvar_prefix='SWH_JOURNAL') if __name__ == '__main__': main() diff --git a/swh/journal/client.py b/swh/journal/client.py index 0d481b0..a7a7b68 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,194 +1,254 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict import logging +import os import time from confluent_kafka import Consumer, KafkaException, KafkaError from .serializers import kafka_to_value from swh.journal import DEFAULT_PREFIX logger = logging.getLogger(__name__) rdkafka_logger = logging.getLogger(__name__ + '.rdkafka') # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'directory', 'revision', 'release', 'snapshot', 'origin', 'origin_visit' ] # Errors that Kafka raises too often and are not useful; therefore they # we lower their log level to DEBUG instead of INFO. _SPAMMY_ERRORS = [ KafkaError._NO_OFFSET, ] def _error_cb(error): if error.fatal(): raise KafkaException(error) if error.code() in _SPAMMY_ERRORS: logger.debug('Received non-fatal kafka error: %s', error) else: logger.info('Received non-fatal kafka error: %s', error) def _on_commit(error, partitions): if error is not None: _error_cb(error) class JournalClient: """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. If the 'prefix' argument is None (default value), it will take the default value 'swh.journal.objects'. Clients subscribe to events specific to each object type as listed in the `object_types` argument (if unset, defaults to all accepted object types). Clients can be sharded by setting the `group_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same group_id. Messages are processed by the `worker_fn` callback passed to the `process` method, in batches of maximum `max_messages`. Any other named argument is passed directly to KafkaConsumer(). """ def __init__( self, brokers, group_id, prefix=None, object_types=None, max_messages=0, process_timeout=0, auto_offset_reset='earliest', - **kwargs): + stop_on_eof=False, **kwargs): if prefix is None: prefix = DEFAULT_PREFIX if object_types is None: object_types = ACCEPTED_OBJECT_TYPES if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s, not %s' % (ACCEPTED_OFFSET_RESET, auto_offset_reset)) for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s, not %s.' 
% (ACCEPTED_OBJECT_TYPES, object_type)) self.value_deserializer = kafka_to_value if isinstance(brokers, str): brokers = [brokers] debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG) if debug_logging and 'debug' not in kwargs: kwargs['debug'] = 'consumer' + # Static group instance id management + group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID') + if group_instance_id: + kwargs['group.instance.id'] = group_instance_id + + if 'group.instance.id' in kwargs: + # When doing static consumer group membership, set a higher default + # session timeout. The session timeout is the duration after which + # the broker considers that a consumer has left the consumer group + # for good, and triggers a rebalance. Considering our current + # processing pattern, 10 minutes gives the consumer ample time to + # restart before that happens. + if 'session.timeout.ms' not in kwargs: + kwargs['session.timeout.ms'] = 10 * 60 * 1000 # 10 minutes + + if 'session.timeout.ms' in kwargs: + # When the session timeout is set, rdkafka requires the max poll + # interval to be set to a higher value; the max poll interval is + # rdkafka's way of figuring out whether the client's message + # processing thread has stalled: when the max poll interval lapses + # between two calls to consumer.poll(), rdkafka leaves the consumer + # group and terminates the connection to the brokers. + # + # We default to 1.5 times the session timeout + if 'max.poll.interval.ms' not in kwargs: + kwargs['max.poll.interval.ms'] = ( + kwargs['session.timeout.ms'] // 2 * 3 + ) + consumer_settings = { **kwargs, 'bootstrap.servers': ','.join(brokers), 'auto.offset.reset': auto_offset_reset, 'group.id': group_id, 'on_commit': _on_commit, 'error_cb': _error_cb, 'enable.auto.commit': False, 'logger': rdkafka_logger, } + self.stop_on_eof = stop_on_eof + if self.stop_on_eof: + consumer_settings['enable.partition.eof'] = True + logger.debug('Consumer settings: %s', consumer_settings) self.consumer = Consumer(consumer_settings) topics = ['%s.%s' % (prefix, object_type) for object_type in object_types] logger.debug('Upstream topics: %s', self.consumer.list_topics(timeout=10)) logger.debug('Subscribing to: %s', topics) self.consumer.subscribe(topics=topics) self.max_messages = max_messages self.process_timeout = process_timeout + self.eof_reached = set() self._object_types = object_types def process(self, worker_fn): """Polls Kafka for a batch of messages, and calls the worker_fn with these messages. Args: worker_fn Callable[Dict[str, List[dict]]]: Function called with the messages as argument. """ start_time = time.monotonic() nb_messages = 0 - objects = defaultdict(list) - while True: # timeout for message poll timeout = 1.0 elapsed = time.monotonic() - start_time if self.process_timeout: + # +0.01 to prevent busy-waiting on / spamming consumer.poll. + # consumer.consume() returns shortly before X expired + # (a matter of milliseconds), so after it returns a first + # time, it would then be called with a timeout in the order + # of milliseconds, therefore returning immediately, then be + # called again, etc. 
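(Editorial aside, not part of the patch: the static-group-membership defaults introduced above in `JournalClient.__init__` can be summarized with a small self-contained sketch. The instance id value is invented for illustration; the numbers mirror the defaults described in the comments.)
```
# Sketch of the defaulting logic described above, assuming only
# KAFKA_GROUP_INSTANCE_ID is set and no explicit consumer overrides are given.
import os

os.environ['KAFKA_GROUP_INSTANCE_ID'] = 'content-replayer-0'  # example value

kwargs = {}
group_instance_id = os.environ.get('KAFKA_GROUP_INSTANCE_ID')
if group_instance_id:
    kwargs['group.instance.id'] = group_instance_id
if 'group.instance.id' in kwargs and 'session.timeout.ms' not in kwargs:
    kwargs['session.timeout.ms'] = 10 * 60 * 1000            # 10 minutes
if 'session.timeout.ms' in kwargs and 'max.poll.interval.ms' not in kwargs:
    kwargs['max.poll.interval.ms'] = kwargs['session.timeout.ms'] // 2 * 3

assert kwargs['max.poll.interval.ms'] == 900000              # 1.5x the session timeout
```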
if elapsed + 0.01 >= self.process_timeout: break timeout = self.process_timeout - elapsed num_messages = 20 if self.max_messages: if nb_messages >= self.max_messages: break num_messages = min(num_messages, self.max_messages-nb_messages) messages = self.consumer.consume( timeout=timeout, num_messages=num_messages) if not messages: continue - for message in messages: - error = message.error() - if error is not None: + nb_processed, at_eof = self.handle_messages(messages, worker_fn) + nb_messages += nb_processed + if at_eof: + break + + return nb_messages + + def handle_messages(self, messages, worker_fn): + objects = defaultdict(list) + nb_processed = 0 + + for message in messages: + error = message.error() + if error is not None: + if error.code() == KafkaError._PARTITION_EOF: + self.eof_reached.add( + (message.topic(), message.partition()) + ) + else: _error_cb(error) - continue + continue - nb_messages += 1 + nb_processed += 1 - object_type = message.topic().split('.')[-1] - # Got a message from a topic we did not subscribe to. - assert object_type in self._object_types, object_type + object_type = message.topic().split('.')[-1] + # Got a message from a topic we did not subscribe to. + assert object_type in self._object_types, object_type - objects[object_type].append( - self.value_deserializer(message.value()) - ) + objects[object_type].append(self.deserialize_message(message)) - if objects: - worker_fn(dict(objects)) - objects.clear() + if objects: + worker_fn(dict(objects)) + self.consumer.commit() - self.consumer.commit() - return nb_messages + at_eof = (self.stop_on_eof and all( + (tp.topic, tp.partition) in self.eof_reached + for tp in self.consumer.assignment() + )) + + return nb_processed, at_eof + + def deserialize_message(self, message): + return self.value_deserializer(message.value()) def close(self): self.consumer.close() diff --git a/swh/journal/replay.py b/swh/journal/replay.py index c04be02..a654d02 100644 --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -1,418 +1,449 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy -from time import time import logging -from contextlib import contextmanager +from time import time +from typing import Callable, Dict, List, Optional try: from systemd.daemon import notify except ImportError: notify = None +from tenacity import retry, stop_after_attempt, wait_random_exponential + from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex from swh.model.model import SHA1_SIZE -from swh.objstorage.objstorage import ID_HASH_ALGO +from swh.objstorage.objstorage import ( + ID_HASH_ALGO, ObjNotFoundError, ObjStorage, +) from swh.storage import HashCollision logger = logging.getLogger(__name__) GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total" GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds" CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) with 
statsd.timed(GRAPH_DURATION_METRIC, tags={'object_type': object_type}): _insert_objects(object_type, objects, storage) statsd.increment(GRAPH_OPERATIONS_METRIC, len(objects), tags={'object_type': object_type}) if notify: notify('WATCHDOG=1') def _fix_revision_pypi_empty_string(rev): """PyPI loader failed to encode empty strings as bytes, see: swh:1:rev:8f0095ee0664867055d03de9bcc8f95b91d8a2b9 or https://forge.softwareheritage.org/D1772 """ rev = { **rev, 'author': rev['author'].copy(), 'committer': rev['committer'].copy(), } if rev['author'].get('email') == '': rev['author']['email'] = b'' if rev['author'].get('name') == '': rev['author']['name'] = b'' if rev['committer'].get('email') == '': rev['committer']['email'] = b'' if rev['committer'].get('name') == '': rev['committer']['name'] = b'' return rev def _fix_revision_transplant_source(rev): if rev.get('metadata') and rev['metadata'].get('extra_headers'): rev = copy.deepcopy(rev) rev['metadata']['extra_headers'] = [ [key, value.encode('ascii')] if key == 'transplant_source' and isinstance(value, str) else [key, value] for (key, value) in rev['metadata']['extra_headers']] return rev def _check_date(date): """Returns whether the date can be represented in backends with sane limits on timestamps and timezones (resp. signed 64-bits and signed 16 bits), and that microseconds is valid (ie. between 0 and 10^6). """ if date is None: return True date = normalize_timestamp(date) return (-2**63 <= date['timestamp']['seconds'] < 2**63) \ and (0 <= date['timestamp']['microseconds'] < 10**6) \ and (-2**15 <= date['offset'] < 2**15) def _check_revision_date(rev): """Exclude revisions with invalid dates. See https://forge.softwareheritage.org/T1339""" return _check_date(rev['date']) and _check_date(rev['committer_date']) def _fix_revisions(revisions): good_revisions = [] for rev in revisions: rev = _fix_revision_pypi_empty_string(rev) rev = _fix_revision_transplant_source(rev) if not _check_revision_date(rev): logging.warning('Excluding revision (invalid date): %r', rev) continue if rev not in good_revisions: good_revisions.append(rev) return good_revisions def _fix_origin_visits(visits): good_visits = [] for visit in visits: visit = visit.copy() if 'type' not in visit: if isinstance(visit['origin'], dict) and 'type' in visit['origin']: # Very old version of the schema: visits did not have a type, # but their 'origin' field was a dict with a 'type' key. visit['type'] = visit['origin']['type'] else: # Very very old version of the schema: 'type' is missing, # so there is nothing we can do to fix it. raise ValueError('Got an origin_visit too old to be replayed.') if isinstance(visit['origin'], dict): # Old version of the schema: visit['origin'] was a dict. visit['origin'] = visit['origin']['url'] good_visits.append(visit) return good_visits def fix_objects(object_type, objects): """Converts a possibly old object from the journal to its current expected format. List of conversions: Empty author name/email in PyPI releases: >>> from pprint import pprint >>> date = { ... 'timestamp': { ... 'seconds': 1565096932, ... 'microseconds': 0, ... }, ... 'offset': 0, ... } >>> pprint(fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... 
}])) [{'author': {'email': b'', 'fullname': b'', 'name': b''}, 'committer': {'email': b'', 'fullname': b'', 'name': b''}, 'committer_date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}, 'date': {'offset': 0, 'timestamp': {'microseconds': 0, 'seconds': 1565096932}}}] Fix type of 'transplant_source' extra headers: >>> revs = fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': ''}, ... 'committer': {'email': '', 'fullname': b'', 'name': ''}, ... 'date': date, ... 'committer_date': date, ... 'metadata': { ... 'extra_headers': [ ... ['time_offset_seconds', b'-3600'], ... ['transplant_source', '29c154a012a70f49df983625090434587622b39e'] ... ]} ... }]) >>> pprint(revs[0]['metadata']['extra_headers']) [['time_offset_seconds', b'-3600'], ['transplant_source', b'29c154a012a70f49df983625090434587622b39e']] Filter out revisions with invalid dates: >>> from copy import deepcopy >>> invalid_date1 = deepcopy(date) >>> invalid_date1['timestamp']['microseconds'] = 1000000000 # > 10^6 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date1, ... 'committer_date': date, ... }]) [] >>> invalid_date2 = deepcopy(date) >>> invalid_date2['timestamp']['seconds'] = 2**70 # > 10^63 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': invalid_date2, ... 'committer_date': date, ... }]) [] >>> invalid_date3 = deepcopy(date) >>> invalid_date3['offset'] = 2**20 # > 10^15 >>> fix_objects('revision', [{ ... 'author': {'email': '', 'fullname': b'', 'name': b''}, ... 'committer': {'email': '', 'fullname': b'', 'name': b''}, ... 'date': date, ... 'committer_date': invalid_date3, ... }]) [] `visit['origin']` is a dict instead of an URL: >>> pprint(fix_objects('origin_visit', [{ ... 'origin': {'url': 'http://foo'}, ... 'type': 'git', ... }])) [{'origin': 'http://foo', 'type': 'git'}] `visit['type']` is missing , but `origin['visit']['type']` exists: >>> pprint(fix_objects('origin_visit', [ ... {'origin': {'type': 'hg', 'url': 'http://foo'} ... }])) [{'origin': 'http://foo', 'type': 'hg'}] """ # noqa if object_type == 'revision': objects = _fix_revisions(objects) elif object_type == 'origin_visit': objects = _fix_origin_visits(objects) return objects def _insert_objects(object_type, objects, storage): objects = fix_objects(object_type, objects) if object_type == 'content': - # TODO: insert 'content' in batches - for object_ in objects: - try: - storage.content_add_metadata([object_]) - except HashCollision as e: - logger.error('Hash collision: %s', e.args) + try: + storage.skipped_content_add( + (obj for obj in objects if obj.get('status') == 'absent')) + except HashCollision as e: + logger.error('(SkippedContent) Hash collision: %s', e.args) + + try: + storage.content_add_metadata( + (obj for obj in objects if obj.get('status') != 'absent')) + except HashCollision as e: + logger.error('(Content) Hash collision: %s', e.args) + elif object_type in ('directory', 'revision', 'release', 'snapshot', 'origin'): # TODO: split batches that are too large for the storage # to handle? 
method = getattr(storage, object_type + '_add') method(objects) elif object_type == 'origin_visit': for visit in objects: storage.origin_add_one({'url': visit['origin']}) if 'metadata' not in visit: visit['metadata'] = None storage.origin_visit_upsert(objects) else: logger.warning('Received a series of %s, this should not happen', object_type) def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): """ Checks if the given hash is in the provided `array`. The array must be a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes (so its size must by `nb_hashes*hash_size` bytes). Args: hash_ (bytes): the hash to look for array (bytes): a sorted concatenated array of hashes (may be of any type supporting slice indexing, eg. :class:`mmap.mmap`) nb_hashes (int): number of hashes in the array hash_size (int): size of a hash (defaults to 20, for SHA1) Example: >>> import os >>> hash1 = os.urandom(20) >>> hash2 = os.urandom(20) >>> hash3 = os.urandom(20) >>> array = b''.join(sorted([hash1, hash2])) >>> is_hash_in_bytearray(hash1, array, 2) True >>> is_hash_in_bytearray(hash2, array, 2) True >>> is_hash_in_bytearray(hash3, array, 2) False """ if len(hash_) != hash_size: raise ValueError('hash_ does not match the provided hash_size.') def get_hash(position): return array[position*hash_size:(position+1)*hash_size] # Regular dichotomy: left = 0 right = nb_hashes while left < right-1: middle = int((right+left)/2) pivot = get_hash(middle) if pivot == hash_: return True elif pivot < hash_: left = middle else: right = middle return get_hash(left) == hash_ -@contextmanager -def retry(max_retries): - lasterror = None - for i in range(max_retries): - try: - yield - break - except Exception as exc: - lasterror = exc - else: - raise lasterror - - -def copy_object(obj_id, src, dst, max_retries=3): +def copy_object(obj_id, src, dst): + hex_obj_id = hash_to_hex(obj_id) + obj = '' try: with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'get'}): - with retry(max_retries): - obj = src.get(obj_id) - logger.debug('retrieved %s', hash_to_hex(obj_id)) + obj = src.get(obj_id) + logger.debug('retrieved %(obj_id)s', {'obj_id': hex_obj_id}) with statsd.timed(CONTENT_DURATION_METRIC, tags={'request': 'put'}): - with retry(max_retries): - dst.add(obj, obj_id=obj_id, check_presence=False) - logger.debug('copied %s', hash_to_hex(obj_id)) - statsd.increment(CONTENT_OPERATIONS_METRIC) + dst.add(obj, obj_id=obj_id, check_presence=False) + logger.debug('copied %(obj_id)s', {'obj_id': hex_obj_id}) statsd.increment(CONTENT_BYTES_METRIC, len(obj)) - except Exception: - obj = '' - logger.error('Failed to copy %s', hash_to_hex(obj_id)) + except Exception as exc: + logger.error('Failed to copy %(obj_id)s: %(exc)s', + {'obj_id': hex_obj_id, 'exc': str(exc)}) raise return len(obj) -def process_replay_objects_content(all_objects, *, src, dst, - exclude_fn=None): +@retry(stop=stop_after_attempt(3), + reraise=True, + wait=wait_random_exponential(multiplier=1, max=60)) +def obj_in_objstorage(obj_id, dst): + """Check if an object is already in an objstorage, tenaciously""" + return obj_id in dst + + +def process_replay_objects_content( + all_objects: Dict[str, List[dict]], + *, + src: ObjStorage, + dst: ObjStorage, + exclude_fn: Optional[Callable[[dict], bool]] = None, + check_dst: bool = True, +): """ Takes a list of records from Kafka (see :py:func:`swh.journal.client.JournalClient.process`) and copies them from the `src` objstorage to the `dst` objstorage, if: * `obj['status']` is `'visible'` * 
`exclude_fn(obj)` is `False` (if `exclude_fn` is provided) + * `obj['sha1'] not in dst` (if `check_dst` is True) Args: - all_objects Dict[str, List[dict]]: Objects passed by the Kafka client. - Most importantly, `all_objects['content'][*]['sha1']` is the - sha1 hash of each content + all_objects: Objects passed by the Kafka client. Most importantly, + `all_objects['content'][*]['sha1']` is the sha1 hash of each + content. src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - exclude_fn Optional[Callable[dict, bool]]: Determines whether - an object should be copied. + exclude_fn: Determines whether an object should be copied. + check_dst: Determines whether we should check the destination + objstorage before copying. Example: >>> from swh.objstorage import get_objstorage >>> src = get_objstorage('memory', {}) >>> dst = get_objstorage('memory', {}) >>> id1 = src.add(b'foo bar') >>> id2 = src.add(b'baz qux') >>> kafka_partitions = { ... 'content': [ ... { ... 'sha1': id1, ... 'status': 'visible', ... }, ... { ... 'sha1': id2, ... 'status': 'visible', ... }, ... ] ... } >>> process_replay_objects_content( ... kafka_partitions, src=src, dst=dst, ... exclude_fn=lambda obj: obj['sha1'] == id1) >>> id1 in dst False >>> id2 in dst True """ vol = [] nb_skipped = 0 t0 = time() for (object_type, objects) in all_objects.items(): if object_type != 'content': logger.warning( 'Received a series of %s, this should not happen', object_type) continue for obj in objects: obj_id = obj[ID_HASH_ALGO] if obj['status'] != 'visible': nb_skipped += 1 logger.debug('skipped %s (status=%s)', hash_to_hex(obj_id), obj['status']) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "skipped", + "status": obj["status"]}) elif exclude_fn and exclude_fn(obj): nb_skipped += 1 logger.debug('skipped %s (manually excluded)', hash_to_hex(obj_id)) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "excluded"}) + elif check_dst and obj_in_objstorage(obj_id, dst): + nb_skipped += 1 + logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "in_dst"}) else: - vol.append(copy_object(obj_id, src, dst)) + try: + copied = copy_object(obj_id, src, dst) + except ObjNotFoundError: + nb_skipped += 1 + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "not_in_src"}) + else: + vol.append(copied) + statsd.increment(CONTENT_OPERATIONS_METRIC, + tags={"decision": "copied"}) dt = time() - t0 logger.info( 'processed %s content objects in %.1fsec ' '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped', len(vol), dt, len(vol)/dt, sum(vol)/1024/1024/dt, len([x for x in vol if not x]), nb_skipped) if notify: notify('WATCHDOG=1') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py index 5a67b62..782e371 100644 --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -1,235 +1,245 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import logging import random import string from confluent_kafka import Consumer from subprocess import Popen from typing import Any, Dict, List, Optional, Tuple from pathlib import Path from pytest_kafka import ( - make_zookeeper_process, make_kafka_server + 
make_zookeeper_process, make_kafka_server, ZOOKEEPER_CONFIG_TEMPLATE, ) from swh.model.hashutil import hash_to_bytes logger = logging.getLogger(__name__) CONTENTS = [ { 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': b'foo', 'blake2s256': b'bar', 'sha256': b'baz', 'status': 'visible', }, ] COMMITTERS = [ { 'fullname': b'foo', 'name': b'foo', 'email': b'', }, { 'fullname': b'bar', 'name': b'bar', 'email': b'', } ] DATES = [ { 'timestamp': { 'seconds': 1234567891, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, { 'timestamp': { 'seconds': 1234567892, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, } ] REVISIONS = [ { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'hello', 'date': DATES[0], 'committer': COMMITTERS[0], 'author': COMMITTERS[0], 'committer_date': DATES[0], 'type': 'git', 'directory': '\x01'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'hello again', 'date': DATES[1], 'committer': COMMITTERS[1], 'author': COMMITTERS[1], 'committer_date': DATES[1], 'type': 'hg', 'directory': '\x02'*20, 'synthetic': False, 'metadata': None, 'parents': [], }, ] RELEASES = [ { 'id': hash_to_bytes('d81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'name': b'v0.0.1', 'date': { 'timestamp': { 'seconds': 1234567890, 'microseconds': 0, }, 'offset': 120, 'negative_utc': None, }, 'author': COMMITTERS[0], 'target_type': 'revision', 'target': b'\x04'*20, 'message': b'foo', 'synthetic': False, }, ] ORIGINS = [ { 'url': 'https://somewhere.org/den/fox', }, { 'url': 'https://overtherainbow.org/fox/den', } ] ORIGIN_VISITS = [ { 'origin': ORIGINS[0]['url'], 'date': '2013-05-07 04:20:39.369271+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'foo': 'bar'}, 'type': 'git', }, { 'origin': ORIGINS[0]['url'], 'date': '2018-11-27 17:20:39+00:00', 'snapshot': None, # TODO 'status': 'ongoing', # TODO 'metadata': {'baz': 'qux'}, 'type': 'git', } ] # From type to tuple (id, ) OBJECT_TYPE_KEYS = { 'content': ('sha1', CONTENTS), 'revision': ('id', REVISIONS), 'release': ('id', RELEASES), 'origin': (None, ORIGINS), 'origin_visit': (None, ORIGIN_VISITS), } # type: Dict[str, Tuple[Optional[str], List[Dict[str, Any]]]] KAFKA_ROOT = os.environ.get('SWH_KAFKA_ROOT') KAFKA_ROOT = KAFKA_ROOT if KAFKA_ROOT else os.path.dirname(__file__) + '/kafka' if not os.path.exists(KAFKA_ROOT): msg = ('Development error: %s must exist and target an ' 'existing kafka installation' % KAFKA_ROOT) raise ValueError(msg) KAFKA_SCRIPTS = Path(KAFKA_ROOT) / 'bin' KAFKA_BIN = str(KAFKA_SCRIPTS / 'kafka-server-start.sh') ZOOKEEPER_BIN = str(KAFKA_SCRIPTS / 'zookeeper-server-start.sh') +ZK_CONFIG_TEMPLATE = ZOOKEEPER_CONFIG_TEMPLATE + '\nadmin.enableServer=false\n' # Those defines fixtures -zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, scope='session') +zookeeper_proc = make_zookeeper_process(ZOOKEEPER_BIN, + zk_config_template=ZK_CONFIG_TEMPLATE, + scope='session') os.environ['KAFKA_LOG4J_OPTS'] = \ '-Dlog4j.configuration=file:%s/log4j.properties' % \ os.path.dirname(__file__) kafka_server = make_kafka_server(KAFKA_BIN, 'zookeeper_proc', scope='session') kafka_logger = logging.getLogger('kafka') kafka_logger.setLevel(logging.WARN) @pytest.fixture(scope='function') def kafka_prefix(): + """Pick a random prefix for kafka topics on each call""" return ''.join(random.choice(string.ascii_lowercase) for _ in range(10)) 
+@pytest.fixture(scope='function') +def kafka_consumer_group(kafka_prefix: str): + """Pick a random consumer group for kafka consumers on each call""" + return "test-consumer-%s" % kafka_prefix + + TEST_CONFIG = { 'consumer_id': 'swh.journal.consumer', 'object_types': OBJECT_TYPE_KEYS.keys(), 'max_messages': 1, # will read 1 message and stops 'storage': {'cls': 'memory', 'args': {}}, } @pytest.fixture def test_config(kafka_server: Tuple[Popen, int], kafka_prefix: str): """Test configuration needed for producer/consumer """ _, port = kafka_server return { **TEST_CONFIG, 'brokers': ['127.0.0.1:{}'.format(port)], 'prefix': kafka_prefix + '.swh.journal.objects', } @pytest.fixture def consumer( kafka_server: Tuple[Popen, int], test_config: Dict, - kafka_prefix: str, + kafka_consumer_group: str, ) -> Consumer: """Get a connected Kafka consumer. """ _, kafka_port = kafka_server consumer = Consumer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, - 'group.id': "test-consumer-%s" % kafka_prefix, + 'group.id': kafka_consumer_group, }) kafka_topics = [ '%s.%s' % (test_config['prefix'], object_type) for object_type in test_config['object_types'] ] consumer.subscribe(kafka_topics) yield consumer consumer.close() diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py index 4613dcf..809f7e0 100644 --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -1,217 +1,516 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter import functools import logging import re import tempfile from subprocess import Popen from typing import Any, Dict, Tuple from unittest.mock import patch from click.testing import CliRunner from confluent_kafka import Producer import pytest +from swh.model.hashutil import hash_to_hex from swh.objstorage.backends.in_memory import InMemoryObjStorage -from swh.storage.in_memory import InMemoryStorage +from swh.storage import get_storage from swh.journal.cli import cli from swh.journal.serializers import key_to_kafka, value_to_kafka logger = logging.getLogger(__name__) CLI_CONFIG = ''' storage: cls: memory args: {} objstorage_src: cls: mocked args: name: src objstorage_dst: cls: mocked args: name: dst ''' @pytest.fixture def storage(): - """An instance of swh.storage.in_memory.InMemoryStorage that gets injected - into the CLI functions.""" - storage = InMemoryStorage() + """An swh-storage object that gets injected into the CLI functions.""" + storage_config = { + 'cls': 'pipeline', + 'steps': [ + {'cls': 'validate'}, + {'cls': 'memory'}, + ] + } + storage = get_storage(**storage_config) with patch('swh.journal.cli.get_storage') as get_storage_mock: get_storage_mock.return_value = storage yield storage -def invoke(catch_exceptions, args): +def invoke(catch_exceptions, args, env=None): runner = CliRunner() with tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) args = ['-C' + config_fd.name] + args - result = runner.invoke(cli, args, obj={'log_level': logging.DEBUG}) + result = runner.invoke( + cli, args, obj={'log_level': logging.DEBUG}, env=env, + ) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_replay( - storage: InMemoryStorage, + storage, 
kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test-producer', 'enable.idempotence': 'true', }) snapshot = {'id': b'foo', 'branches': { b'HEAD': { 'target_type': 'revision', 'target': b'\x01'*20, } }} # type: Dict[str, Any] producer.produce( topic=kafka_prefix+'.snapshot', key=key_to_kafka(snapshot['id']), value=value_to_kafka(snapshot), ) producer.flush() logger.debug('Flushed producer') result = invoke(False, [ 'replay', '--broker', '127.0.0.1:%d' % port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, '--max-messages', '1', ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output assert storage.snapshot_get(snapshot['id']) == { **snapshot, 'next_branch': None} def _patch_objstorages(names): objstorages = {name: InMemoryObjStorage() for name in names} def get_mock_objstorage(cls, args): assert cls == 'mocked', cls return objstorages[args['name']] def decorator(f): @functools.wraps(f) @patch('swh.journal.cli.get_objstorage') def newf(get_objstorage_mock, *args, **kwargs): get_objstorage_mock.side_effect = get_mock_objstorage f(*args, objstorages=objstorages, **kwargs) return newf return decorator +NUM_CONTENTS = 10 + + def _fill_objstorage_and_kafka(kafka_port, kafka_prefix, objstorages): producer = Producer({ 'bootstrap.servers': '127.0.0.1:{}'.format(kafka_port), 'client.id': 'test-producer', 'enable.idempotence': 'true', }) contents = {} - for i in range(10): + for i in range(NUM_CONTENTS): content = b'\x00'*19 + bytes([i]) sha1 = objstorages['src'].add(content) contents[sha1] = content producer.produce( topic=kafka_prefix+'.content', key=key_to_kafka(sha1), value=key_to_kafka({ 'sha1': sha1, 'status': 'visible', }), ) producer.flush() return contents @_patch_objstorages(['src', 'dst']) def test_replay_content( objstorages, - storage: InMemoryStorage, + storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', '10', + '--max-messages', str(NUM_CONTENTS), ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content +@_patch_objstorages(['src', 'dst']) +def test_replay_content_structured_log( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + caplog.set_level(logging.DEBUG, 'swh.journal.replay') + + expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) + + result = invoke(False, [ + 'content-replay', + '--broker', '127.0.0.1:%d' % kafka_port, + '--group-id', kafka_consumer_group, + '--prefix', kafka_prefix, + '--max-messages', str(NUM_CONTENTS), + 
]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + copied = set() + for record in caplog.records: + logtext = record.getMessage() + if 'copied' in logtext: + copied.add(record.args['obj_id']) + + assert copied == expected_obj_ids, ( + "Mismatched logging; see captured log output for details." + ) + + +@_patch_objstorages(['src', 'dst']) +def test_replay_content_static_group_id( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + # Setup log capture to fish the consumer settings out of the log messages + caplog.set_level(logging.DEBUG, 'swh.journal.client') + + result = invoke(False, [ + 'content-replay', + '--broker', '127.0.0.1:%d' % kafka_port, + '--group-id', kafka_consumer_group, + '--prefix', kafka_prefix, + '--max-messages', str(NUM_CONTENTS), + ], {'KAFKA_GROUP_INSTANCE_ID': 'static-group-instance-id'}) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + consumer_settings = None + for record in caplog.records: + if 'Consumer settings' in record.message: + consumer_settings = record.args + break + + assert consumer_settings is not None, ( + 'Failed to get consumer settings out of the consumer log. ' + 'See log capture for details.' + ) + assert consumer_settings['group.instance.id'] == 'static-group-instance-id' + assert consumer_settings['session.timeout.ms'] == 60 * 10 * 1000 + assert consumer_settings['max.poll.interval.ms'] == 90 * 10 * 1000 + + for (sha1, content) in contents.items(): + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content + + @_patch_objstorages(['src', 'dst']) def test_replay_content_exclude( objstorages, - storage: InMemoryStorage, + storage, kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, kafka_port) = kafka_server kafka_prefix += '.swh.journal.objects' contents = _fill_objstorage_and_kafka( kafka_port, kafka_prefix, objstorages) excluded_contents = list(contents)[0::2] # picking half of them with tempfile.NamedTemporaryFile(mode='w+b') as fd: fd.write(b''.join(sorted(excluded_contents))) fd.seek(0) result = invoke(False, [ 'content-replay', '--broker', '127.0.0.1:%d' % kafka_port, - '--group-id', 'test-cli-consumer', + '--group-id', kafka_consumer_group, '--prefix', kafka_prefix, - '--max-messages', '10', + '--max-messages', str(NUM_CONTENTS), '--exclude-sha1-file', fd.name, ]) expected = r'Done.\n' assert result.exit_code == 0, result.output assert re.fullmatch(expected, result.output, re.MULTILINE), result.output for (sha1, content) in contents.items(): if sha1 in excluded_contents: assert sha1 not in objstorages['dst'], sha1 else: assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content + + +NUM_CONTENTS_DST = 5 + + +@_patch_objstorages(['src', 'dst']) +@pytest.mark.parametrize("check_dst,expected_copied,expected_in_dst", [ + (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), + (False, NUM_CONTENTS, 0), +]) +def test_replay_content_check_dst( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + check_dst: bool, + expected_copied: int, + expected_in_dst: int, + 
caplog): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + for i, (sha1, content) in enumerate(contents.items()): + if i >= NUM_CONTENTS_DST: + break + + objstorages['dst'].add(content, obj_id=sha1) + + caplog.set_level(logging.DEBUG, 'swh.journal.replay') + + result = invoke(False, [ + 'content-replay', + '--broker', '127.0.0.1:%d' % kafka_port, + '--group-id', kafka_consumer_group, + '--prefix', kafka_prefix, + '--max-messages', str(NUM_CONTENTS), + '--check-dst' if check_dst else '--no-check-dst', + ]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + copied = 0 + in_dst = 0 + for record in caplog.records: + logtext = record.getMessage() + if 'copied' in logtext: + copied += 1 + elif 'in dst' in logtext: + in_dst += 1 + + assert (copied == expected_copied and in_dst == expected_in_dst), ( + "Unexpected amount of objects copied, see the captured log for details" + ) + + for (sha1, content) in contents.items(): + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content + + +class FlakyObjStorage(InMemoryObjStorage): + def __init__(self, *args, **kwargs): + state = kwargs.pop('state') + self.failures_left = Counter(kwargs.pop('failures')) + super().__init__(*args, **kwargs) + if state: + self.state = state + + def flaky_operation(self, op, obj_id): + if self.failures_left[op, obj_id] > 0: + self.failures_left[op, obj_id] -= 1 + raise RuntimeError( + 'Failed %s on %s' % (op, hash_to_hex(obj_id)) + ) + + def get(self, obj_id): + self.flaky_operation('get', obj_id) + return super().get(obj_id) + + def add(self, data, obj_id=None, check_presence=True): + self.flaky_operation('add', obj_id) + return super().add(data, obj_id=obj_id, + check_presence=check_presence) + + def __contains__(self, obj_id): + self.flaky_operation('in', obj_id) + return super().__contains__(obj_id) + + +@_patch_objstorages(['src', 'dst']) +def test_replay_content_check_dst_retry( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int]): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, objstorages) + + failures = {} + for i, (sha1, content) in enumerate(contents.items()): + if i >= NUM_CONTENTS_DST: + break + + objstorages['dst'].add(content, obj_id=sha1) + failures['in', sha1] = 1 + + orig_dst = objstorages['dst'] + objstorages['dst'] = FlakyObjStorage(state=orig_dst.state, + failures=failures) + + result = invoke(False, [ + 'content-replay', + '--broker', '127.0.0.1:%d' % kafka_port, + '--group-id', kafka_consumer_group, + '--prefix', kafka_prefix, + '--max-messages', str(NUM_CONTENTS), + '--check-dst', + ]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + for (sha1, content) in contents.items(): + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content + + +@_patch_objstorages(['src', 'dst']) +def test_replay_content_objnotfound( + objstorages, + storage, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], + caplog): + (_, kafka_port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + contents = _fill_objstorage_and_kafka( + kafka_port, kafka_prefix, 
objstorages) + + num_contents_deleted = 5 + contents_deleted = set() + + for i, sha1 in enumerate(contents): + if i >= num_contents_deleted: + break + + del objstorages['src'].state[sha1] + contents_deleted.add(hash_to_hex(sha1)) + + caplog.set_level(logging.DEBUG, 'swh.journal.replay') + + result = invoke(False, [ + 'content-replay', + '--broker', '127.0.0.1:%d' % kafka_port, + '--group-id', kafka_consumer_group, + '--prefix', kafka_prefix, + '--max-messages', str(NUM_CONTENTS), + ]) + expected = r'Done.\n' + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + copied = 0 + not_in_src = set() + for record in caplog.records: + logtext = record.getMessage() + if 'copied' in logtext: + copied += 1 + elif 'object not found' in logtext: + # Check that the object id can be recovered from logs + assert record.levelno == logging.ERROR + not_in_src.add(record.args['obj_id']) + + assert copied == NUM_CONTENTS - num_contents_deleted, ( + "Unexpected number of contents copied" + ) + + assert not_in_src == contents_deleted, ( + "Mismatch between deleted contents and not_in_src logs" + ) + + for (sha1, content) in contents.items(): + if sha1 not in objstorages['src']: + continue + assert sha1 in objstorages['dst'], sha1 + assert objstorages['dst'].get(sha1) == content diff --git a/swh/journal/tests/test_client.py b/swh/journal/tests/test_client.py index 05b23c6..23c78ab 100644 --- a/swh/journal/tests/test_client.py +++ b/swh/journal/tests/test_client.py @@ -1,50 +1,87 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from subprocess import Popen from typing import Tuple from unittest.mock import MagicMock from confluent_kafka import Producer from swh.model.hypothesis_strategies import revisions from swh.journal.client import JournalClient from swh.journal.serializers import key_to_kafka, value_to_kafka def test_client( kafka_prefix: str, + kafka_consumer_group: str, kafka_server: Tuple[Popen, int]): (_, port) = kafka_server kafka_prefix += '.swh.journal.objects' producer = Producer({ 'bootstrap.servers': 'localhost:{}'.format(port), 'client.id': 'test producer', 'enable.idempotence': 'true', }) rev = revisions().example() # Fill Kafka producer.produce( topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), value=value_to_kafka(rev.to_dict()), ) producer.flush() config = { 'brokers': 'localhost:%d' % kafka_server[1], - 'group_id': 'replayer', + 'group_id': kafka_consumer_group, 'prefix': kafka_prefix, 'max_messages': 1, } client = JournalClient(**config) worker_fn = MagicMock() client.process(worker_fn) worker_fn.assert_called_once_with({'revision': [rev.to_dict()]}) + + +def test_client_eof( + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int]): + (_, port) = kafka_server + kafka_prefix += '.swh.journal.objects' + + producer = Producer({ + 'bootstrap.servers': 'localhost:{}'.format(port), + 'client.id': 'test producer', + 'enable.idempotence': 'true', + }) + + rev = revisions().example() + + # Fill Kafka + producer.produce( + topic=kafka_prefix + '.revision', key=key_to_kafka(rev.id), + value=value_to_kafka(rev.to_dict()), + ) + producer.flush() + + config = { + 'brokers': 'localhost:%d' % kafka_server[1], + 'group_id': kafka_consumer_group, + 'prefix': kafka_prefix, + 'stop_on_eof': True, + } + client 
+
+    worker_fn = MagicMock()
+    client.process(worker_fn)
+
+    worker_fn.assert_called_once_with({'revision': [rev.to_dict()]})
diff --git a/swh/journal/tests/test_kafka_writer.py b/swh/journal/tests/test_kafka_writer.py
index bab6639..847dfe9 100644
--- a/swh/journal/tests/test_kafka_writer.py
+++ b/swh/journal/tests/test_kafka_writer.py
@@ -1,144 +1,150 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import datetime
 
 from confluent_kafka import Consumer, KafkaException
 
 from subprocess import Popen
 from typing import Tuple
 
 from swh.storage import get_storage
 
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (
     kafka_to_key, kafka_to_value
 )
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
 def assert_written(consumer, kafka_prefix, expected_messages):
     consumed_objects = defaultdict(list)
 
     fetched_messages = 0
     retries_left = 1000
 
     while fetched_messages < expected_messages:
         if retries_left == 0:
             raise ValueError('Timed out fetching messages from kafka')
 
         msg = consumer.poll(timeout=0.01)
         if not msg:
             retries_left -= 1
             continue
 
         error = msg.error()
         if error is not None:
             if error.fatal():
                 raise KafkaException(error)
             retries_left -= 1
             continue
 
         fetched_messages += 1
         consumed_objects[msg.topic()].append(
             (kafka_to_key(msg.key()), kafka_to_value(msg.value()))
         )
 
     for (object_type, (key_name, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         (keys, values) = zip(*consumed_objects[topic])
         if key_name:
             assert list(keys) == [object_[key_name] for object_ in objects]
         else:
             pass  # TODO
 
         if object_type == 'origin_visit':
             for value in values:
                 del value['visit']
         elif object_type == 'content':
             for value in values:
                 del value['ctime']
 
         for object_ in objects:
             assert object_ in values
 
 
 def test_kafka_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'
 
     config = {
         'brokers': ['localhost:%d' % kafka_server[1]],
         'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
         'producer_config': {
             'message.max.bytes': 100000000,
         }
     }
 
     writer = KafkaJournalWriter(**config)
 
     expected_messages = 0
 
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         for (num, object_) in enumerate(objects):
             if object_type == 'origin_visit':
                 object_ = {**object_, 'visit': num}
             if object_type == 'content':
                 object_ = {**object_, 'ctime': datetime.datetime.now()}
             writer.write_addition(object_type, object_)
             expected_messages += 1
 
     assert_written(consumer, kafka_prefix, expected_messages)
 
 
 def test_storage_direct_writer(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int],
         consumer: Consumer):
     kafka_prefix += '.swh.journal.objects'
 
-    config = {
+    writer_config = {
+        'cls': 'kafka',
         'brokers': ['localhost:%d' % kafka_server[1]],
         'client_id': 'kafka_writer',
         'prefix': kafka_prefix,
         'producer_config': {
             'message.max.bytes': 100000000,
         }
     }
+    storage_config = {
+        'cls': 'pipeline',
+        'steps': [
+            {'cls': 'validate'},
+            {'cls': 'memory', 'journal_writer': writer_config},
+        ]
+    }
 
-    storage = get_storage('memory', journal_writer={
-        'cls': 'kafka', **config,
-    })
+    storage = get_storage(**storage_config)
 
     expected_messages = 0
 
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         method = getattr(storage, object_type + '_add')
         if object_type in ('content', 'directory', 'revision', 'release',
                            'snapshot', 'origin'):
             if object_type == 'content':
                 objects = [{**obj, 'data': b''} for obj in objects]
             method(objects)
             expected_messages += len(objects)
         elif object_type in ('origin_visit',):
             for object_ in objects:
                 object_ = object_.copy()
                 origin_url = object_.pop('origin')
                 storage.origin_add_one({'url': origin_url})
                 visit = method(origin=origin_url, date=object_.pop('date'),
                                type=object_.pop('type'))
                 expected_messages += 1
 
                 visit_id = visit['visit']
                 storage.origin_visit_update(origin_url, visit_id, **object_)
                 expected_messages += 1
         else:
             assert False, object_type
 
     assert_written(consumer, kafka_prefix, expected_messages)
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index b9c3e82..acb614e 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,236 +1,246 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple
 
 import dateutil
 from confluent_kafka import Producer
 from hypothesis import strategies, given, settings
 import pytest
 
 from swh.storage import get_storage
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
 
 from .conftest import OBJECT_TYPE_KEYS
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
+storage_config = {
+    'cls': 'pipeline',
+    'steps': [
+        {'cls': 'validate'},
+        {'cls': 'memory'},
+    ]
+}
+
+
 def test_storage_play(
         kafka_prefix: str,
+        kafka_consumer_group: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
-    storage = get_storage('memory')
+    storage = get_storage(**storage_config)
 
     producer = Producer({
         'bootstrap.servers': 'localhost:{}'.format(port),
         'client.id': 'test producer',
         'enable.idempotence': 'true',
     })
 
     now = datetime.datetime.now(tz=datetime.timezone.utc)
 
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key),
                 value=value_to_kafka(object_),
             )
             nb_sent += 1
 
     producer.flush()
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
-        'group_id': 'replayer',
+        'group_id': kafka_consumer_group,
         'prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
     replayer = JournalClient(**config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url']} for orig in origins]
     for origin in origins:
         origin_url = origin['url']
         expected_visits = [
             {
                 **visit,
                 'origin': origin_url,
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin'] == origin['url']
         ]
         actual_visits = list(storage.origin_visit_get(
             origin_url))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     input_contents = OBJECT_TYPE_KEYS['content'][1]
     contents = storage.content_get_metadata(
         [cont['sha1'] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont['sha1']: [cont] for cont in input_contents}
 
 
 def _test_write_replay_origin_visit(visits):
     """Helper function to write tests for origin_visit.
 
     Each visit (a dict) given in the 'visits' argument will be sent to
     a (mocked) kafka queue, which a in-memory-storage backed replayer is
     listening to.
 
     Check that corresponding origin visits entities are present in the
     storage and have correct values.
""" queue = [] replayer = MockedJournalClient(queue) writer = MockedKafkaWriter(queue) # Note that flipping the order of these two insertions will crash # the test, because the legacy origin_format does not allow to create # the origin when needed (type is missing) writer.send('origin', 'foo', { 'url': 'http://example.com/', 'type': 'git', }) for visit in visits: writer.send('origin_visit', 'foo', visit) queue_size = len(queue) assert replayer.max_messages == 0 replayer.max_messages = queue_size - storage = get_storage('memory') + storage = get_storage(**storage_config) worker_fn = functools.partial(process_replay_objects, storage=storage) nb_messages = 0 while nb_messages < queue_size: nb_messages += replayer.process(worker_fn) actual_visits = list(storage.origin_visit_get('http://example.com/')) assert len(actual_visits) == len(visits), actual_visits for vin, vout in zip(visits, actual_visits): vin = vin.copy() vout = vout.copy() assert vout.pop('origin') == 'http://example.com/' vin.pop('origin') vin.setdefault('type', 'git') vin.setdefault('metadata', None) assert vin == vout def test_write_replay_origin_visit(): """Test origin_visit when the 'origin' is just a string.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit1(): """Test origin_visit when there is no type.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': 'http://example.com/', 'date': now, 'status': 'partial', 'snapshot': None, }] with pytest.raises(ValueError, match='too old'): _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', 'type': 'git', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now() visits = [{ 'visit': 1, 'origin': { 'url': 'http://example.com/', }, 'date': now, 'type': 'git', 'status': 'partial', 'snapshot': None, }] _test_write_replay_origin_visit(visits) hash_strategy = strategies.binary(min_size=20, max_size=20) @settings(max_examples=500) @given(strategies.sets(hash_strategy, min_size=0, max_size=500), strategies.sets(hash_strategy, min_size=10)) def test_is_hash_in_bytearray(haystack, needles): array = b''.join(sorted(haystack)) needles |= haystack # Exhaustively test for all objects in the array for needle in needles: assert is_hash_in_bytearray(needle, array, len(haystack)) == \ (needle in haystack) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py index 6075c00..e417627 100644 --- a/swh/journal/tests/test_write_replay.py +++ b/swh/journal/tests/test_write_replay.py @@ -1,140 +1,158 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools +from unittest.mock import patch import attr from hypothesis import given, settings, HealthCheck from hypothesis.strategies import lists from swh.model.hypothesis_strategies import object_dicts -from swh.storage.in_memory 
-from swh.storage import HashCollision
+from swh.storage import get_storage, HashCollision
 
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
 
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
+storage_config = {
+    'cls': 'pipeline',
+    'steps': [
+        {'cls': 'validate'},
+        {'cls': 'memory', 'journal_writer': {'cls': 'memory'}},
+    ]
+}
+
+
 def empty_person_name_email(rev_or_rel):
     """Empties the 'name' and 'email' fields of the author/committer fields
     of a revision or release; leaving only the fullname."""
     if getattr(rev_or_rel, 'author', None):
         rev_or_rel = attr.evolve(
             rev_or_rel,
             author=attr.evolve(
                 rev_or_rel.author,
                 name=b'',
                 email=b'',
             )
         )
 
     if getattr(rev_or_rel, 'committer', None):
         rev_or_rel = attr.evolve(
             rev_or_rel,
             committer=attr.evolve(
                 rev_or_rel.committer,
                 name=b'',
                 email=b'',
             )
         )
 
     return rev_or_rel
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
     queue = []
     replayer = MockedJournalClient(queue)
 
-    storage1 = InMemoryStorage()
-    storage1.journal_writer = MockedKafkaWriter(queue)
+    with patch('swh.journal.writer.inmemory.InMemoryJournalWriter',
+               return_value=MockedKafkaWriter(queue)):
+        storage1 = get_storage(**storage_config)
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             storage1.origin_add_one({'url': obj['origin']})
             storage1.origin_visit_upsert([obj])
         else:
+            if obj_type == 'content' and obj.get('status') == 'absent':
+                obj_type = 'skipped_content'
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass
 
     queue_size = len(queue)
 
     assert replayer.max_messages == 0
     replayer.max_messages = queue_size
 
-    storage2 = InMemoryStorage()
+    storage2 = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
     assert replayer.consumer.committed
 
     for attr_name in ('_contents', '_directories', '_snapshots',
                       '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name
 
     # When hypothesis generates a revision and a release with same
     # author (or committer) fullname but different name or email, then
     # the storage will use the first name/email it sees.
     # This first one will be either the one from the revision or the release,
     # and since there is no order guarantees, storage2 has 1/2 chance of
     # not seeing the same order as storage1, therefore we need to strip
     # them out before comparing.
     for attr_name in ('_revisions', '_releases'):
         items1 = {k: empty_person_name_email(v)
                   for (k, v) in getattr(storage1, attr_name).items()}
         items2 = {k: empty_person_name_email(v)
                   for (k, v) in getattr(storage2, attr_name).items()}
         assert items1 == items2, attr_name
 
 
 # TODO: add test for hash collision
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_content(objects):
     queue = []
     replayer = MockedJournalClient(queue)
 
-    storage1 = InMemoryStorage()
-    storage1.journal_writer = MockedKafkaWriter(queue)
+    with patch('swh.journal.writer.inmemory.InMemoryJournalWriter',
+               return_value=MockedKafkaWriter(queue)):
+        storage1 = get_storage(**storage_config)
 
     contents = []
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'content':
             # avoid hash collision
             if not storage1.content_find(obj):
-                storage1.content_add([obj])
+                if obj.get('status') != 'absent':
+                    storage1.content_add([obj])
                 contents.append(obj)
 
     queue_size = len(queue)
 
     assert replayer.max_messages == 0
     replayer.max_messages = queue_size
 
-    storage2 = InMemoryStorage()
+    storage2 = get_storage(**storage_config)
+
+    objstorage1 = storage1.objstorage.objstorage
+    objstorage2 = storage2.objstorage.objstorage
+
     worker_fn = functools.partial(process_replay_objects_content,
-                                  src=storage1.objstorage,
-                                  dst=storage2.objstorage)
+                                  src=objstorage1,
+                                  dst=objstorage2)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
     # only content with status visible will be copied in storage2
     expected_objstorage_state = {
         c['sha1']: c['data'] for c in contents if c['status'] == 'visible'
     }
 
-    assert expected_objstorage_state == storage2.objstorage.state
+    assert expected_objstorage_state == objstorage2.state
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
index ba31b0a..49f7b1d 100644
--- a/swh/journal/tests/utils.py
+++ b/swh/journal/tests/utils.py
@@ -1,74 +1,75 @@
 from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
 from swh.journal.writer.kafka import KafkaJournalWriter
 from swh.journal.serializers import (kafka_to_value, key_to_kafka,
                                      value_to_kafka)
 
 
 class FakeKafkaMessage:
     def __init__(self, topic, key, value):
         self._topic = topic
         self._key = key_to_kafka(key)
         self._value = value_to_kafka(value)
 
     def topic(self):
         return self._topic
 
     def value(self):
         return self._value
 
     def key(self):
         return self._key
 
     def error(self):
         return None
 
 
 class MockedKafkaWriter(KafkaJournalWriter):
     def __init__(self, queue):
         self._prefix = 'prefix'
         self.queue = queue
 
     def send(self, topic, key, value):
         msg = FakeKafkaMessage(topic=topic, key=key, value=value)
         self.queue.append(msg)
 
     def flush(self):
         pass
 
 
 class MockedKafkaConsumer:
     """Mimic the confluent_kafka.Consumer API, producing the messages stored
     in `queue`.
 
     You're only allowed to subscribe to topics in which the queue has
     messages.
""" def __init__(self, queue): self.queue = queue self.committed = False def consume(self, num_messages, timeout=None): L = self.queue[0:num_messages] self.queue[0:num_messages] = [] return L def commit(self): if self.queue == []: self.committed = True def list_topics(self, timeout=None): return set(message.topic() for message in self.queue) def subscribe(self, topics): unknown_topics = set(topics) - self.list_topics() if unknown_topics: raise ValueError('Unknown topics %s' % ', '.join(unknown_topics)) class MockedJournalClient(JournalClient): def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES): self._object_types = object_types self.consumer = MockedKafkaConsumer(queue) self.process_timeout = 0 self.max_messages = 0 self.value_deserializer = kafka_to_value + self.stop_on_eof = False diff --git a/version.txt b/version.txt index a413830..3ae0241 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.25-0-g4334aa0 \ No newline at end of file +v0.0.26-0-g06688ba \ No newline at end of file