diff --git a/swh/journal/__init__.py b/swh/journal/__init__.py
index e69de29..3dd2c0c 100644
--- a/swh/journal/__init__.py
+++ b/swh/journal/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+# The default prefix for Kafka topics
+DEFAULT_PREFIX = 'swh.journal.objects'
diff --git a/swh/journal/client.py b/swh/journal/client.py
index b0e0b0c..b7f63ff 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,102 +1,102 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from kafka import KafkaConsumer
 import logging
 
 from .serializers import kafka_to_key, kafka_to_value
-
+from swh.journal import DEFAULT_PREFIX
 
 logger = logging.getLogger(__name__)
 
-# Only accepted offset reset policy accepted
+# Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']
 
 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content',
     'directory',
     'revision',
     'release',
     'snapshot',
     'origin',
     'origin_visit'
 ]
 
 
 class JournalClient:
     """A base client for the Software Heritage journal.
 
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.
 
     Clients subscribe to events specific to each object type by using
     the `object_types` configuration variable.
 
-    Clients can be sharded by setting the `client_id` to a common
-    value across instances. The journal will share the message
-    throughput across the nodes sharing the same client_id.
+    Clients can be sharded by setting the `group_id` to a common
+    value across instances. The journal will share the message
+    throughput across the nodes sharing the same group_id.
 
-    Messages are processed by the `process_objects` method in batches
+    Messages are processed by the `process` method in batches
     of maximum `max_messages`.
 
     """
     def __init__(
-            self, brokers, topic_prefix, consumer_id,
+            self, brokers, group_id, prefix=DEFAULT_PREFIX,
             object_types=ACCEPTED_OBJECT_TYPES,
             max_messages=0, auto_offset_reset='earliest'):
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
-                'Option \'auto_offset_reset\' only accept %s.' %
+                'Option \'auto_offset_reset\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)
 
         for object_type in object_types:
             if object_type not in ACCEPTED_OBJECT_TYPES:
                 raise ValueError(
                     'Option \'object_types\' only accepts %s.' %
-                    ACCEPTED_OFFSET_RESET)
+                    ACCEPTED_OBJECT_TYPES)
 
         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             key_deserializer=kafka_to_key,
             value_deserializer=kafka_to_value,
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
-            group_id=consumer_id,
+            group_id=group_id,
         )
 
         self.consumer.subscribe(
-            topics=['%s.%s' % (topic_prefix, object_type)
+            topics=['%s.%s' % (prefix, object_type)
                     for object_type in object_types],
         )
 
         self.max_messages = max_messages
         self._object_types = object_types
 
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
 
         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                 the messages as argument.
         """
         nb_messages = 0
 
         polled = self.consumer.poll()
         for (partition, messages) in polled.items():
             object_type = partition.topic.split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type
 
             worker_fn({object_type: [msg.value for msg in messages]})
 
             nb_messages += len(messages)
 
         self.consumer.commit()
         return nb_messages
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index e5ab97f..9499c99 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,102 +1,102 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple
 
-import dateutil
+import dateutil.parser
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects
 
 from .conftest import OBJECT_TYPE_KEYS
 
 
 def test_storage_play(
         kafka_prefix: str, kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     storage = get_storage('memory', {})
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )
 
     now = datetime.datetime.now(tz=datetime.timezone.utc)
 
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.send(topic, key=key, value=object_)
             nb_sent += 1
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
-        'consumer_id': 'replayer',
-        'topic_prefix': kafka_prefix,
+        'group_id': 'replayer',
+        'prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
     replayer = JournalClient(**config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]
 
     for origin in origins:
         expected_visits = [
             {
                 **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
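
A minimal usage sketch of the renamed client API, assuming a reachable local broker; the broker address, group id, object type selection, and worker body are illustrative placeholders, not part of this change:

    import logging

    from swh.journal.client import JournalClient

    logging.basicConfig(level=logging.INFO)

    def worker_fn(objects):
        # `objects` maps an object type to the list of message values
        # polled for that type, e.g. {'origin': [{...}, ...]}.
        for object_type, messages in objects.items():
            logging.info('received %d %s objects',
                         len(messages), object_type)

    # 'localhost:9092' and 'replayer-test' are placeholder values; `prefix`
    # now defaults to swh.journal.DEFAULT_PREFIX ('swh.journal.objects'),
    # so it can be omitted.
    client = JournalClient(
        brokers='localhost:9092',
        group_id='replayer-test',
        object_types=['origin', 'origin_visit'],
    )

    # Each call to process() polls one batch, passes it to the worker,
    # commits the offsets, and returns the number of messages handled.
    client.process(worker_fn)

Running several such processes with the same group_id shards the topics' throughput across them, as described in the JournalClient docstring.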