diff --git a/swh/journal/cli.py b/swh/journal/cli.py
index deb8bcb..984acdb 100644
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -1,111 +1,118 @@
 # Copyright (C) 2016-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click
+import functools
 import logging
 import os

 from swh.core import config
 from swh.storage import get_storage

-from swh.journal.replay import StorageReplayer
+from swh.journal.client import JournalClient
+from swh.journal.replay import process_replay_objects
 from swh.journal.backfill import JournalBackfiller

 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])


 @click.group(context_settings=CONTEXT_SETTINGS)
 @click.option('--config-file', '-C', default=None,
               type=click.Path(exists=True, dir_okay=False,),
               help="Configuration file.")
 @click.option('--log-level', '-l', default='INFO',
               type=click.Choice(logging._nameToLevel.keys()),
               help="Log level (default to INFO)")
 @click.pass_context
 def cli(ctx, config_file, log_level):
     """Software Heritage Scheduler CLI interface

     Default to use the the local scheduler instance (plugged to the
     main scheduler db).

     """
     if not config_file:
         config_file = os.environ.get('SWH_CONFIG_FILENAME')
     if not config_file:
         raise ValueError('You must either pass a config-file parameter '
                          'or set SWH_CONFIG_FILENAME to target '
                          'the config-file')

     if not os.path.exists(config_file):
         raise ValueError('%s does not exist' % config_file)

     conf = config.read(config_file)
     ctx.ensure_object(dict)

     logging.basicConfig(
         level=log_level,
         format='%(asctime)s %(levelname)s %(name)s %(message)s',
     )
     logging.getLogger('kafka').setLevel(logging.INFO)

     ctx.obj['config'] = conf
     ctx.obj['loglevel'] = log_level


 @cli.command()
 @click.option('--max-messages', '-m', default=None, type=int,
               help='Maximum number of objects to replay. Default is to '
                    'run forever.')
 @click.option('--broker', 'brokers', type=str, multiple=True,
               help='Kafka broker to connect to.')
 @click.option('--prefix', type=str, default='swh.journal.objects',
               help='Prefix of Kafka topic names to read from.')
 @click.option('--consumer-id', type=str,
               help='Name of the consumer/group id for reading from Kafka.')
 @click.pass_context
 def replay(ctx, brokers, prefix, consumer_id, max_messages):
     """Fill a new storage by reading a journal.

     """
     conf = ctx.obj['config']
+    logger = logging.getLogger(__name__)
+    logger.setLevel(ctx.obj['loglevel'])
     storage = get_storage(**conf.pop('storage'))
-    replayer = StorageReplayer(brokers, prefix, consumer_id,
-                               storage=storage, max_messages=max_messages)
+    client = JournalClient(brokers, prefix, consumer_id)
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
     try:
-        replayer.process()
+        nb_messages = 0
+        while not max_messages or nb_messages < max_messages:
+            nb_messages += client.process(worker_fn)
+            logger.info('Processed %d messages.' % nb_messages)
     except KeyboardInterrupt:
         ctx.exit(0)
     else:
         print('Done.')


 @cli.command()
 @click.argument('object_type')
 @click.option('--start-object', default=None)
 @click.option('--end-object', default=None)
 @click.option('--dry-run', is_flag=True, default=False)
 @click.pass_context
 def backfiller(ctx, object_type, start_object, end_object, dry_run):
     """Manipulate backfiller

     """
     conf = ctx.obj['config']
     backfiller = JournalBackfiller(conf)
     try:
         backfiller.run(
             object_type=object_type,
             start_object=start_object, end_object=end_object,
             dry_run=dry_run)
     except KeyboardInterrupt:
         ctx.exit(0)


 def main():
     return cli(auto_envvar_prefix='SWH_JOURNAL')


 if __name__ == '__main__':
     main()
diff --git a/swh/journal/client.py b/swh/journal/client.py
index 2d21f0d..e044b0d 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,118 +1,108 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

-from abc import ABCMeta, abstractmethod
 from kafka import KafkaConsumer
 import logging

 from .serializers import kafka_to_key, kafka_to_value

 logger = logging.getLogger(__name__)

 # Only accepted offset reset policy accepted
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']

 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content', 'directory', 'revision', 'release', 'snapshot', 'origin',
     'origin_visit'
 ]


-class JournalClient(metaclass=ABCMeta):
+class JournalClient:
     """A base client for the Software Heritage journal.

     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.

     Clients subscribe to events specific to each object type by using
     the `object_types` configuration variable.

     Clients can be sharded by setting the `client_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same client_id.

     Messages are processed by the `process_objects` method in batches
     of maximum `max_messages`.

     """
     def __init__(
             self, brokers, topic_prefix, consumer_id,
             object_types=ACCEPTED_OBJECT_TYPES,
             max_messages=0, auto_offset_reset='earliest'):

         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accept %s.' %
                 ACCEPTED_OFFSET_RESET)

         for object_type in object_types:
             if object_type not in ACCEPTED_OBJECT_TYPES:
                 raise ValueError(
                     'Option \'object_types\' only accepts %s.' %
                     ACCEPTED_OFFSET_RESET)

         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             key_deserializer=kafka_to_key,
             value_deserializer=kafka_to_value,
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
             group_id=consumer_id,
         )

         self.consumer.subscribe(
             topics=['%s.%s' % (topic_prefix, object_type)
                     for object_type in object_types],
         )

         self.max_messages = max_messages
         self._object_types = object_types

     def poll(self):
         return self.consumer.poll()

     def commit(self):
         self.consumer.commit()

-    def process(self, max_messages=None):
-        nb_messages = 0
+    def process(self, worker_fn):
+        """Polls Kafka for a batch of messages, and calls the worker_fn
+        with these messages.

-        while not self.max_messages or nb_messages < self.max_messages:
-            polled = self.poll()
-            for (partition, messages) in polled.items():
-                object_type = partition.topic.split('.')[-1]
-                # Got a message from a topic we did not subscribe to.
-                assert object_type in self._object_types, object_type
+        Args:
+            worker_fn Callable[Dict[str, List[dict]]]: Function called with
+                                                       the messages as
+                                                       argument.
+        """
+        nb_messages = 0
+        polled = self.poll()
+        for (partition, messages) in polled.items():
+            object_type = partition.topic.split('.')[-1]
+            # Got a message from a topic we did not subscribe to.
+            assert object_type in self._object_types, object_type

-                self.process_objects(
-                    {object_type: [msg.value for msg in messages]})
+            worker_fn({object_type: [msg.value for msg in messages]})

-                nb_messages += len(messages)
+            nb_messages += len(messages)

-            self.commit()
-            logger.info('Processed %d messages.' % nb_messages)
+        self.commit()
         return nb_messages
-
-    # Override the following method in the sub-classes
-
-    @abstractmethod
-    def process_objects(self, messages):
-        """Process the objects (store, compute, etc...)
-
-        Args:
-            messages (dict): Dict of key object_type (as per
-                             configuration) and their associated values.
-
-        """
-        pass
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index d804679..7d8c5a2 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,54 +1,41 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging

 from swh.storage import HashCollision

-from .client import JournalClient
-
 logger = logging.getLogger(__name__)


-OBJECT_TYPES = frozenset([
-    'origin', 'origin_visit', 'snapshot', 'release', 'revision',
-    'directory', 'content',
-])
-
-
-class StorageReplayer(JournalClient):
-    def __init__(self, *args, storage, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.storage = storage
-
-    def process_objects(self, all_objects):
-        for (object_type, objects) in all_objects.items():
-            self.insert_objects(object_type, objects)
-
-    def insert_objects(self, object_type, objects):
-        if object_type in ('content', 'directory', 'revision', 'release',
-                           'snapshot', 'origin'):
-            if object_type == 'content':
-                # TODO: insert 'content' in batches
-                for object_ in objects:
-                    try:
-                        self.storage.content_add_metadata([object_])
-                    except HashCollision as e:
-                        logger.error('Hash collision: %s', e.args)
-            else:
-                # TODO: split batches that are too large for the storage
-                # to handle?
-                method = getattr(self.storage, object_type + '_add')
-                method(objects)
-        elif object_type == 'origin_visit':
-            self.storage.origin_visit_upsert([
-                {
-                    **obj,
-                    'origin': self.storage.origin_add_one(obj['origin'])
-                }
-                for obj in objects])
-        else:
-            assert False
+def process_replay_objects(all_objects, *, storage):
+    for (object_type, objects) in all_objects.items():
+        _insert_objects(object_type, objects, storage)
+
+
+def _insert_objects(object_type, objects, storage):
+    if object_type == 'content':
+        # TODO: insert 'content' in batches
+        for object_ in objects:
+            try:
+                storage.content_add_metadata([object_])
+            except HashCollision as e:
+                logger.error('Hash collision: %s', e.args)
+    elif object_type in ('directory', 'revision', 'release',
+                         'snapshot', 'origin'):
+        # TODO: split batches that are too large for the storage
+        # to handle?
+        method = getattr(storage, object_type + '_add')
+        method(objects)
+    elif object_type == 'origin_visit':
+        storage.origin_visit_upsert([
+            {
+                **obj,
+                'origin': storage.origin_add_one(obj['origin'])
+            }
+            for obj in objects])
+    else:
+        assert False
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index e5f6405..e5ab97f 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,97 +1,102 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
+import functools
 import random
 from subprocess import Popen
 from typing import Tuple

 import dateutil
 from kafka import KafkaProducer

 from swh.storage import get_storage

+from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
-from swh.journal.replay import StorageReplayer
+from swh.journal.replay import process_replay_objects

 from .conftest import OBJECT_TYPE_KEYS


 def test_storage_play(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'

     storage = get_storage('memory', {})

     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )

     now = datetime.datetime.now(tz=datetime.timezone.utc)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.send(topic, key=key, value=object_)
             nb_sent += 1

     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'consumer_id': 'replayer',
         'topic_prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
-    replayer = StorageReplayer(**config, storage=storage)
-    nb_inserted = replayer.process()
+    replayer = JournalClient(**config)
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = 0
+    while nb_inserted < nb_sent:
+        nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))

     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]
     for origin in origins:
         expected_visits = [
             {
                 **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits

     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index cbb7bcd..6cf0249 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,153 +1,161 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from collections import namedtuple
+import functools

 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists

 from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
 from swh.storage import HashCollision

+from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.replay import process_replay_objects
 from swh.journal.serializers import (
     key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
-from swh.journal.direct_writer import DirectKafkaWriter
-from swh.journal.replay import StorageReplayer, OBJECT_TYPES

 FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
 FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')


 class MockedDirectKafkaWriter(DirectKafkaWriter):
     def __init__(self):
         self._prefix = 'prefix'


-class MockedStorageReplayer(StorageReplayer):
-    def __init__(self, storage, max_messages, object_types=OBJECT_TYPES):
-        self.storage = storage
-        self.max_messages = max_messages
+class MockedJournalClient(JournalClient):
+    def __init__(self, object_types=ACCEPTED_OBJECT_TYPES):
         self._object_types = object_types


 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order(objects):
     committed = False
     queue = []

     def send(topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
         queue.append({
             FakeKafkaPartition(topic):
             [FakeKafkaMessage(key=key, value=value)]
         })

     def poll():
         return queue.pop(0)

     def commit():
         nonlocal committed
         if queue == []:
             committed = True

     storage1 = Storage()
     storage1.journal_writer = MockedDirectKafkaWriter()
     storage1.journal_writer.send = send

     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             origin_id = storage1.origin_add_one(obj.pop('origin'))
             if 'visit' in obj:
                 del obj['visit']
             storage1.origin_visit_add(origin_id, **obj)
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass

     storage2 = Storage()
-    replayer = MockedStorageReplayer(storage2, max_messages=len(queue))
+    worker_fn = functools.partial(process_replay_objects, storage=storage2)
+    replayer = MockedJournalClient()
     replayer.poll = poll
     replayer.commit = commit
-    replayer.process()
+    queue_size = len(queue)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+    assert nb_messages == queue_size
     assert committed

     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name


 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
     committed = False
     queue = []

     def send(topic, key, value):
         key = kafka_to_key(key_to_kafka(key))
         value = kafka_to_value(value_to_kafka(value))
         partition = FakeKafkaPartition(topic)
         msg = FakeKafkaMessage(key=key, value=value)
         if queue and {partition} == set(queue[-1]):
             # The last message is of the same object type, groupping them
             queue[-1][partition].append(msg)
         else:
             queue.append({
                 FakeKafkaPartition(topic): [msg]
             })

     def poll():
         return queue.pop(0)

     def commit():
         nonlocal committed
         if queue == []:
             committed = True

     storage1 = Storage()
     storage1.journal_writer = MockedDirectKafkaWriter()
     storage1.journal_writer.send = send

     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             origin_id = storage1.origin_add_one(obj.pop('origin'))
             if 'visit' in obj:
                 del obj['visit']
             storage1.origin_visit_add(origin_id, **obj)
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass

     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())

     storage2 = Storage()
-    replayer = MockedStorageReplayer(storage2, max_messages=queue_size)
+    worker_fn = functools.partial(process_replay_objects, storage=storage2)
+    replayer = MockedJournalClient()
     replayer.poll = poll
     replayer.commit = commit
-    replayer.process()
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
     assert committed

     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name


 # TODO: add test for hash collision
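For reference, here is a minimal sketch of how the pieces introduced by this diff fit together: JournalClient only polls and commits Kafka batches, while the storage-filling logic lives in process_replay_objects and is bound to a concrete storage via functools.partial, exactly as the replay command in cli.py does. The broker address, topic prefix and consumer group id below are placeholder values, not part of the diff.

# Sketch of the client/worker split introduced by this change.
# Assumes the swh.journal and swh.storage code as modified here is importable;
# broker, prefix and consumer group id are placeholder values.
import functools
import logging

from swh.journal.client import JournalClient
from swh.journal.replay import process_replay_objects
from swh.storage import get_storage

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Any storage backend works; the in-memory one keeps the sketch self-contained.
storage = get_storage('memory', {})

# The client only knows how to poll/commit Kafka batches...
client = JournalClient(
    brokers=['localhost:9092'],          # placeholder broker
    topic_prefix='swh.journal.objects',  # default prefix used by the CLI
    consumer_id='my-replayer',           # placeholder consumer group id
)

# ...while the replay logic is a plain function bound to a storage instance.
worker_fn = functools.partial(process_replay_objects, storage=storage)

# Each process() call handles one polled batch and returns how many messages
# it saw, so the caller decides when to stop (here: after roughly 1000).
max_messages = 1000
nb_messages = 0
while nb_messages < max_messages:
    nb_messages += client.process(worker_fn)
    logger.info('Processed %d messages.', nb_messages)

This mirrors the loop now in the replay CLI command and in the tests: the caller owns the termination policy, instead of the client reading self.max_messages as the removed StorageReplayer did.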