diff --git a/swh/journal/client.py b/swh/journal/client.py
index 56c6519..473fb0e 100644
--- a/swh/journal/client.py
+++ b/swh/journal/client.py
@@ -1,108 +1,109 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from kafka import KafkaConsumer
 import logging
 
+from kafka import KafkaConsumer
+
 from .serializers import kafka_to_key, kafka_to_value
 from swh.journal import DEFAULT_PREFIX
 
 logger = logging.getLogger(__name__)
 
 
 # Only accepted offset reset policies
 ACCEPTED_OFFSET_RESET = ['earliest', 'latest']
 
 # Only accepted object types
 ACCEPTED_OBJECT_TYPES = [
     'content', 'directory', 'revision', 'release', 'snapshot',
     'origin', 'origin_visit'
 ]
 
 
 class JournalClient:
     """A base client for the Software Heritage journal.
 
     The current implementation of the journal uses Apache Kafka
     brokers to publish messages under a given topic prefix, with each
     object type using a specific topic under that prefix.
 
     If the 'prefix' argument is None (the default), the prefix
     'swh.journal.objects' is used.
 
     Clients subscribe to events specific to each object type as listed in
     the `object_types` argument (if unset, defaults to all accepted
     object types).
 
     Clients can be sharded by setting the `group_id` to a common
     value across instances. The journal will share the message
     throughput across the nodes sharing the same group_id.
 
     Messages are processed by the `process` method in batches
     of maximum `max_messages`.
 
     Any other named argument is passed directly to KafkaConsumer().
 
     """
     def __init__(
             self, brokers, group_id, prefix=None, object_types=None,
             max_messages=0, auto_offset_reset='earliest', **kwargs):
         if prefix is None:
             prefix = DEFAULT_PREFIX
         if object_types is None:
             object_types = ACCEPTED_OBJECT_TYPES
         if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
             raise ValueError(
                 'Option \'auto_offset_reset\' only accepts %s.' %
                 ACCEPTED_OFFSET_RESET)
 
         for object_type in object_types:
             if object_type not in ACCEPTED_OBJECT_TYPES:
                 raise ValueError(
                     'Option \'object_types\' only accepts %s.' %
                     ACCEPTED_OBJECT_TYPES)
 
         self.consumer = KafkaConsumer(
             bootstrap_servers=brokers,
             key_deserializer=kafka_to_key,
             value_deserializer=kafka_to_value,
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
             group_id=group_id,
             **kwargs)
 
         self.consumer.subscribe(
             topics=['%s.%s' % (prefix, object_type)
                     for object_type in object_types],
         )
 
         self.max_messages = max_messages
         self._object_types = object_types
 
     def process(self, worker_fn):
         """Polls Kafka for a batch of messages, and calls the worker_fn
         with these messages.
 
         Args:
             worker_fn Callable[Dict[str, List[dict]]]: Function called with
                                                        the messages as
                                                        argument.
         """
         nb_messages = 0
 
         polled = self.consumer.poll()
         for (partition, messages) in polled.items():
             object_type = partition.topic.split('.')[-1]
             # Got a message from a topic we did not subscribe to.
             assert object_type in self._object_types, object_type
 
             worker_fn({object_type: [msg.value for msg in messages]})
 
             nb_messages += len(messages)
 
         self.consumer.commit()
         return nb_messages
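
As a reading aid, and not part of the patch itself: the consumption loop that JournalClient exposes is deliberately small, one poll, one call to the worker function keyed by object type, one commit per process() call. A minimal sketch of how a caller might drive it, where the broker address, group id and handler below are assumptions made up for illustration and a reachable Kafka broker is assumed::

    from swh.journal.client import JournalClient

    def print_batch(all_objects):
        # all_objects maps an object type to the list of deserialized
        # message values, e.g. {'revision': [{...}, ...]}
        for object_type, objects in all_objects.items():
            print('received %d %s objects' % (len(objects), object_type))

    # 'localhost:9092' and the group id are illustrative values only.
    client = JournalClient(brokers='localhost:9092', group_id='example-reader')
    while True:
        client.process(print_batch)  # one poll/commit cycle per call
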
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
index f5cc762..bc44939 100644
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -1,90 +1,99 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from time import time
 import logging
 from concurrent.futures import ThreadPoolExecutor
 
 from swh.storage import HashCollision
 from swh.model.hashutil import hash_to_hex
 from swh.objstorage.objstorage import ID_HASH_ALGO
 from swh.core.statsd import statsd
 
 logger = logging.getLogger(__name__)
 
 
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         _insert_objects(object_type, objects, storage)
 
 
 def _insert_objects(object_type, objects, storage):
     if object_type == 'content':
         # TODO: insert 'content' in batches
         for object_ in objects:
             try:
                 storage.content_add_metadata([object_])
             except HashCollision as e:
                 logger.error('Hash collision: %s', e.args)
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
         # to handle?
         method = getattr(storage, object_type + '_add')
         method(objects)
     elif object_type == 'origin_visit':
-        storage.origin_visit_upsert([
-            {
-                **obj,
-                'origin': storage.origin_add_one(obj['origin'])
-            }
-            for obj in objects])
+        for visit in objects:
+            if isinstance(visit['origin'], str):
+                # old format; note that it will crash with the pg and
+                # in-mem storages if the origin is not already known,
+                # but there is no other choice because we can't add an
+                # origin without knowing its type. Non-pg storages
+                # don't use a numeric FK internally.
+                visit['origin'] = {'url': visit['origin']}
+            else:
+                storage.origin_add_one(visit['origin'])
+                if 'type' not in visit:
+                    # old format
+                    visit['type'] = visit['origin']['type']
+
+        storage.origin_visit_upsert(objects)
     else:
         logger.warning('Received a series of %s, this should not happen',
                        object_type)
 
 
 def copy_object(obj_id, src, dst):
     statsd_name = 'swh_journal_content_replayer_%s_duration_seconds'
     try:
         with statsd.timed(statsd_name % 'get'):
             obj = src.get(obj_id)
         with statsd.timed(statsd_name % 'put'):
             dst.add(obj, obj_id=obj_id, check_presence=False)
             logger.debug('copied %s', hash_to_hex(obj_id))
             statsd.increment(
                 'swh_journal_content_replayer_bytes_total',
                 len(obj))
     except Exception:
         obj = ''
         logger.exception('Failed to copy %s', hash_to_hex(obj_id))
     return len(obj)
 
 
 def process_replay_objects_content(all_objects, *, src, dst, concurrency=8):
     vol = []
     t0 = time()
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
         for (object_type, objects) in all_objects.items():
             if object_type != 'content':
                 logger.warning(
                     'Received a series of %s, this should not happen',
                     object_type)
                 continue
             for obj in objects:
                 obj_id = obj[ID_HASH_ALGO]
                 if obj['status'] == 'visible':
                     fut = executor.submit(copy_object, obj_id, src, dst)
                     fut.add_done_callback(lambda fn: vol.append(fn.result()))
                 else:
                     logger.debug('skipped %s (%s)', hash_to_hex(obj_id),
                                  obj['status'])
     dt = time() - t0
     logger.info(
         'processed %s content objects in %.1fsec '
         '(%.1f obj/sec, %.1fMB/sec) - %s failures',
         len(vol), dt,
         len(vol)/dt,
         sum(vol)/1024/1024/dt,
         len([x for x in vol if not x]))
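
Not part of the patch, but as a reading aid: the new origin_visit branch in _insert_objects has to normalize two legacy message shapes, a visit whose 'origin' is a bare URL string, and a visit carrying a full origin dict but no 'type' of its own. A standalone sketch of what that normalization does before origin_visit_upsert is called, with sample visits invented for illustration and no storage involved::

    legacy_url_only = {'visit': 1, 'origin': 'http://example.com/'}
    legacy_no_type = {'visit': 2,
                      'origin': {'url': 'http://example.com/', 'type': 'git'}}

    for visit in (legacy_url_only, legacy_no_type):
        if isinstance(visit['origin'], str):
            # bare URL: wrap it; the origin itself cannot be created here
            # because its type is unknown
            visit['origin'] = {'url': visit['origin']}
        else:
            # full origin dict: recover the missing visit 'type' from it
            if 'type' not in visit:
                visit['type'] = visit['origin']['type']

    # legacy_url_only is now {'visit': 1, 'origin': {'url': 'http://example.com/'}}
    # legacy_no_type keeps its origin dict and additionally carries 'type': 'git'
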
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
index 9499c99..c4ab9c5 100644
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,102 +1,198 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
 import random
 from subprocess import Popen
 from typing import Tuple
 
 import dateutil
 from kafka import KafkaProducer
 
 from swh.storage import get_storage
+from swh.storage.in_memory import ENABLE_ORIGIN_IDS
 
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects
 
 from .conftest import OBJECT_TYPE_KEYS
+from .utils import MockedJournalClient, MockedKafkaWriter
 
 
 def test_storage_play(
         kafka_prefix: str,
         kafka_server: Tuple[Popen, int]):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
     storage = get_storage('memory', {})
 
     producer = KafkaProducer(
         bootstrap_servers='localhost:{}'.format(port),
         key_serializer=key_to_kafka,
         value_serializer=value_to_kafka,
         client_id='test producer',
     )
 
     now = datetime.datetime.now(tz=datetime.timezone.utc)
 
     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
         topic = kafka_prefix + '.' + object_type
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == 'content':
                 object_['ctime'] = now
             elif object_type == 'origin_visit':
                 nb_visits += 1
                 object_['visit'] = nb_visits
             producer.send(topic, key=key, value=object_)
             nb_sent += 1
 
     # Fill the storage from Kafka
     config = {
         'brokers': 'localhost:%d' % kafka_server[1],
         'group_id': 'replayer',
         'prefix': kafka_prefix,
         'max_messages': nb_sent,
     }
     replayer = JournalClient(**config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted
 
     # Check the objects were actually inserted in the storage
     assert OBJECT_TYPE_KEYS['revision'][1] == \
         list(storage.revision_get(
             [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
     assert OBJECT_TYPE_KEYS['release'][1] == \
         list(storage.release_get(
             [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
 
     origins = list(storage.origin_get(
         [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
     assert OBJECT_TYPE_KEYS['origin'][1] == \
         [{'url': orig['url'], 'type': orig['type']} for orig in origins]
 
     for origin in origins:
         expected_visits = [
             {
                 **visit,
                 'origin': origin['id'],
                 'date': dateutil.parser.parse(visit['date']),
             }
             for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
             if visit['origin']['url'] == origin['url']
             and visit['origin']['type'] == origin['type']
         ]
         actual_visits = list(storage.origin_visit_get(origin['id']))
         for visit in actual_visits:
             del visit['visit']  # opaque identifier
         assert expected_visits == actual_visits
 
     contents = list(storage.content_get_metadata(
         [cont['sha1'] for cont in OBJECT_TYPE_KEYS['content'][1]]))
     assert None not in contents
     assert contents == OBJECT_TYPE_KEYS['content'][1]
+
+
+def test_write_replay_legacy_origin_visit1():
+    """Test origin_visit when the 'origin' is just a string."""
+    queue = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    # Note that flipping the order of these two insertions will crash
+    # the test, because the legacy origin format does not allow creating
+    # the origin when needed (its type is missing)
+    now = datetime.datetime.now()
+    writer.send('origin', 'foo', {
+        'url': 'http://example.com/',
+        'type': 'git',
+    })
+    writer.send('origin_visit', 'foo', {
+        'visit': 1,
+        'origin': 'http://example.com/',
+        'date': now,
+    })
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage = get_storage('memory', {})
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    visits = list(storage.origin_visit_get('http://example.com/'))
+
+    if ENABLE_ORIGIN_IDS:
+        assert visits == [{
+            'visit': 1,
+            'origin': 1,
+            'date': now,
+        }]
+    else:
+        assert visits == [{
+            'visit': 1,
+            'origin': {'url': 'http://example.com/'},
+            'date': now,
+        }]
+
+
+def test_write_replay_legacy_origin_visit2():
+    """Test origin_visit when 'type' is missing."""
+    queue = []
+    replayer = MockedJournalClient(queue)
+    writer = MockedKafkaWriter(queue)
+
+    now = datetime.datetime.now()
+    writer.send('origin', 'foo', {
+        'url': 'http://example.com/',
+        'type': 'git',
+    })
+    writer.send('origin_visit', 'foo', {
+        'visit': 1,
+        'origin': {
+            'url': 'http://example.com/',
+            'type': 'git',
+        },
+        'date': now,
+    })
+
+    queue_size = sum(len(partition)
+                     for batch in queue
+                     for partition in batch.values())
+
+    storage = get_storage('memory', {})
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_messages = 0
+    while nb_messages < queue_size:
+        nb_messages += replayer.process(worker_fn)
+
+    visits = list(storage.origin_visit_get('http://example.com/'))
+
+    if ENABLE_ORIGIN_IDS:
+        assert visits == [{
+            'visit': 1,
+            'origin': 1,
+            'date': now,
+            'type': 'git',
+        }]
+    else:
+        assert visits == [{
+            'visit': 1,
+            'origin': {'url': 'http://example.com/'},
+            'date': now,
+            'type': 'git',
+        }]
diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py
index 86e5e49..caee858 100644
--- a/swh/journal/tests/test_write_replay.py
+++ b/swh/journal/tests/test_write_replay.py
@@ -1,133 +1,91 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from collections import namedtuple
 import functools
 
 from hypothesis import given, settings, HealthCheck
 from hypothesis.strategies import lists
 
 from swh.model.hypothesis_strategies import object_dicts
 from swh.storage.in_memory import Storage
 from swh.storage import HashCollision
 
-from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
-from swh.journal.direct_writer import DirectKafkaWriter
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
-from swh.journal.serializers import (
-    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
-
-FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
-FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
-
-
-class MockedKafkaWriter(DirectKafkaWriter):
-    def __init__(self, queue):
-        self._prefix = 'prefix'
-        self.queue = queue
-
-    def send(self, topic, key, value):
-        key = kafka_to_key(key_to_kafka(key))
-        value = kafka_to_value(value_to_kafka(value))
-        partition = FakeKafkaPartition(topic)
-        msg = FakeKafkaMessage(key=key, value=value)
-        if self.queue and {partition} == set(self.queue[-1]):
-            # The last message is of the same object type, groupping them
-            self.queue[-1][partition].append(msg)
-        else:
-            self.queue.append({partition: [msg]})
-
-
-class MockedKafkaConsumer:
-    def __init__(self, queue):
-        self.queue = queue
-        self.committed = False
-
-    def poll(self):
-        return self.queue.pop(0)
-
-    def commit(self):
-        if self.queue == []:
-            self.committed = True
-
-class MockedJournalClient(JournalClient):
-    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
-        self._object_types = object_types
-        self.consumer = MockedKafkaConsumer(queue)
+from .utils import MockedJournalClient, MockedKafkaWriter
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_same_order_batches(objects):
     queue = []
     replayer = MockedJournalClient(queue)
 
     storage1 = Storage()
     storage1.journal_writer = MockedKafkaWriter(queue)
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'origin_visit':
             origin_id = storage1.origin_add_one(obj.pop('origin'))
             if 'visit' in obj:
                 del obj['visit']
             storage1.origin_visit_add(origin_id, **obj)
         else:
             method = getattr(storage1, obj_type + '_add')
             try:
                 method([obj])
             except HashCollision:
                 pass
 
     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())
 
     storage2 = Storage()
     worker_fn = functools.partial(process_replay_objects, storage=storage2)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
     assert replayer.consumer.committed
 
     for attr_name in ('_contents', '_directories', '_revisions', '_releases',
                       '_snapshots', '_origin_visits', '_origins'):
         assert getattr(storage1, attr_name) == getattr(storage2, attr_name), \
             attr_name
 
 # TODO: add test for hash collision
 
 
 @given(lists(object_dicts(), min_size=1))
 @settings(suppress_health_check=[HealthCheck.too_slow])
 def test_write_replay_content(objects):
     queue = []
     replayer = MockedJournalClient(queue)
 
     storage1 = Storage()
     storage1.journal_writer = MockedKafkaWriter(queue)
 
     for (obj_type, obj) in objects:
         obj = obj.copy()
         if obj_type == 'content':
             storage1.content_add([obj])
 
     queue_size = sum(len(partition)
                      for batch in queue
                      for partition in batch.values())
 
     storage2 = Storage()
 
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=storage1.objstorage,
                                   dst=storage2.objstorage)
     nb_messages = 0
     while nb_messages < queue_size:
         nb_messages += replayer.process(worker_fn)
 
     assert storage1.objstorage.state == storage2.objstorage.state
diff --git a/swh/journal/tests/utils.py b/swh/journal/tests/utils.py
new file mode 100644
index 0000000..4812607
--- /dev/null
+++ b/swh/journal/tests/utils.py
@@ -0,0 +1,45 @@
+from collections import namedtuple
+
+from swh.journal.client import JournalClient, ACCEPTED_OBJECT_TYPES
+from swh.journal.direct_writer import DirectKafkaWriter
+from swh.journal.serializers import (
+    key_to_kafka, kafka_to_key, value_to_kafka, kafka_to_value)
+
+FakeKafkaMessage = namedtuple('FakeKafkaMessage', 'key value')
+FakeKafkaPartition = namedtuple('FakeKafkaPartition', 'topic')
+
+
+class MockedKafkaWriter(DirectKafkaWriter):
+    def __init__(self, queue):
+        self._prefix = 'prefix'
+        self.queue = queue
+
+    def send(self, topic, key, value):
+        key = kafka_to_key(key_to_kafka(key))
+        value = kafka_to_value(value_to_kafka(value))
+        partition = FakeKafkaPartition(topic)
+        msg = FakeKafkaMessage(key=key, value=value)
+        if self.queue and {partition} == set(self.queue[-1]):
+            # The last message is of the same object type, grouping them
+            self.queue[-1][partition].append(msg)
+        else:
+            self.queue.append({partition: [msg]})
+
+
+class MockedKafkaConsumer:
+    def __init__(self, queue):
+        self.queue = queue
+        self.committed = False
+
+    def poll(self):
+        return self.queue.pop(0)
+
+    def commit(self):
+        if self.queue == []:
+            self.committed = True
+
+
+class MockedJournalClient(JournalClient):
+    def __init__(self, queue, object_types=ACCEPTED_OBJECT_TYPES):
+        self._object_types = object_types
+        self.consumer = MockedKafkaConsumer(queue)
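
A closing note, not part of the patch: the helpers moved into swh/journal/tests/utils.py share a plain Python list as an in-memory stand-in for Kafka. MockedKafkaWriter appends batches keyed by a fake partition, MockedKafkaConsumer pops one batch per poll(), and MockedJournalClient reuses JournalClient.process() unchanged on top of that. A rough sketch of the write/replay round trip they enable, mirroring the new tests, with payload values invented for illustration::

    import functools

    from swh.journal.replay import process_replay_objects
    from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
    from swh.storage import get_storage

    queue = []
    writer = MockedKafkaWriter(queue)
    replayer = MockedJournalClient(queue)

    # Messages go through the real kafka serializers, but land in `queue`.
    writer.send('origin', 'foo', {'url': 'http://example.com/', 'type': 'git'})

    storage = get_storage('memory', {})
    worker_fn = functools.partial(process_replay_objects, storage=storage)
    while queue:
        replayer.process(worker_fn)  # pops one batch from the shared queue
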