diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.core[db,http] >= 0.0.60
 swh.model >= 0.0.60
-swh.storage >= 0.0.172
+swh.storage >= 0.0.177
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,7 @@
 import copy
 import logging
 from time import time
-from typing import Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
 
 from sentry_sdk import capture_exception, push_scope
 try:
@@ -22,7 +22,7 @@
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import BaseContent, SkippedContent, SHA1_SIZE
 from swh.objstorage.objstorage import (
     ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
 )
@@ -237,20 +237,60 @@
     return objects
 
 
+def collision_aware_content_add(
+        content_add_fn: Callable[[Iterable[Any]], None],
+        contents: List[BaseContent]) -> None:
+    """Add contents to storage. If a hash collision is detected, an error is
+    logged and the remaining non-colliding contents are added to the storage.
+
+    Args:
+        content_add_fn: Storage content callable
+        contents: List of contents or skipped contents to add to storage
+
+    """
+    if not contents:
+        return
+    colliding_content_hashes: List[Dict[str, Any]] = []
+    while True:
+        try:
+            content_add_fn(c.to_dict() for c in contents)
+        except HashCollision as e:
+            algo, hash_id, colliding_hashes = e.args
+            hash_id = hash_to_hex(hash_id)
+            colliding_content_hashes.append({
+                'algo': algo,
+                'hash': hash_id,
+                'objects': [{k: hash_to_hex(v) for k, v in collision.items()}
+                            for collision in colliding_hashes]
+            })
+            # Drop the colliding contents from the transaction
+            contents = [c for c in contents
+                        if c.hashes() not in colliding_hashes]
+        else:
+            # Successfully added contents, we are done
+            break
+    if colliding_content_hashes:
+        for collision in colliding_content_hashes:
+            logger.error('Collision detected: %(collision)s', {
+                'collision': collision
+            })
+
+
 def _insert_objects(object_type, objects, storage):
     objects = fix_objects(object_type, objects)
     if object_type == 'content':
-        try:
-            storage.skipped_content_add(
-                (obj for obj in objects if obj.get('status') == 'absent'))
-        except HashCollision as e:
-            logger.error('(SkippedContent) Hash collision: %s', e.args)
+        contents, skipped_contents = [], []
+        for content in objects:
+            c = BaseContent.from_dict(content)
+            if isinstance(c, SkippedContent):
+                skipped_contents.append(c)
+            else:
+                contents.append(c)
 
-        try:
-            storage.content_add_metadata(
-                (obj for obj in objects if obj.get('status') != 'absent'))
-        except HashCollision as e:
-            logger.error('(Content) Hash collision: %s', e.args)
+        collision_aware_content_add(
+            storage.skipped_content_add, skipped_contents)
+        collision_aware_content_add(
+            storage.content_add_metadata, contents)
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -23,7 +23,6 @@
 
 logger = logging.getLogger(__name__)
 
-
 CONTENTS = [
     {
         'length': 3,
@@ -36,6 +35,26 @@
     },
 ]
 
+duplicate_content1 = {
+    'length': 4,
+    'sha1': hash_to_bytes(
+        '44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
+    'sha1_git': b'another-foo',
+    'blake2s256': b'another-bar',
+    'sha256': b'another-baz',
+    'status': 'visible',
+}
+
+# Craft a sha1 collision (same sha1, different sha1_git)
+duplicate_content2 = duplicate_content1.copy()
+sha1_array = bytearray(duplicate_content1['sha1_git'])
+sha1_array[0] += 1
+duplicate_content2['sha1_git'] = bytes(sha1_array)
+
+
+DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
+
+
 COMMITTERS = [
     {
         'fullname': b'foo',
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,13 +1,14 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
+import logging
 import random
 from subprocess import Popen
-from typing import Tuple
+from typing import Dict, Tuple
 
 import dateutil
 from confluent_kafka import Producer
@@ -19,8 +20,10 @@
 from swh.journal.client import JournalClient
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
+from swh.model.hashutil import hash_to_hex
+from swh.model.model import Content
 
-from .conftest import OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
 from .utils import MockedJournalClient, MockedKafkaWriter
@@ -33,10 +36,22 @@
 }
 
 
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+    return kafka_prefix + '.' + object_type
+
+
 def test_storage_play(
         kafka_prefix: str,
         kafka_consumer_group: str,
-        kafka_server: Tuple[Popen, int]):
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    """Nominal replayer scenario.
+
+    This:
+    - writes objects to the topic
+    - replayer consumes objects from the topic and replays them
+
+    """
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
@@ -54,7 +69,7 @@
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = kafka_prefix + '.' + object_type
+        topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
@@ -71,6 +86,7 @@
 
     producer.flush()
 
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers='localhost:%d' % kafka_server[1],
@@ -119,6 +135,147 @@
     assert len(contents) == len(input_contents)
     assert contents == {cont['sha1']: [cont] for cont in input_contents}
 
+    collision = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Collision detected:' in logtext:
+            collision += 1
+
+    assert collision == 0, "No collision should be detected"
+
+
+def test_storage_play_with_collision(
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    """Replayer scenario with a hash collision.
+
+    This:
+    - writes objects to the topic, including colliding contents
+    - replayer consumes objects from the topic and replays them
+    - the colliding contents are dropped from the replay when detected
+
+    """
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    storage = get_storage(**storage_config)
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    now = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    # Fill Kafka
+    nb_sent = 0
+    nb_visits = 0
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        topic = make_topic(kafka_prefix, object_type)
+        for object_ in objects:
+            key = bytes(random.randint(0, 255) for _ in range(40))
+            object_ = object_.copy()
+            if object_type == 'content':
+                object_['ctime'] = now
+            elif object_type == 'origin_visit':
+                nb_visits += 1
+                object_['visit'] = nb_visits
+            producer.produce(
+                topic=topic, key=key_to_kafka(key),
+                value=value_to_kafka(object_),
+            )
+            nb_sent += 1
+
+    # Create a collision in the input data
+    # These duplicates are not written to the destination storage
+    for content in DUPLICATE_CONTENTS:
+        topic = make_topic(kafka_prefix, 'content')
+        producer.produce(
+            topic=topic, key=key_to_kafka(key),
+            value=value_to_kafka(content),
+        )
+        nb_sent += 1
+
+    producer.flush()
+
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
+    # Fill the storage from Kafka
+    replayer = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = 0
+    while nb_inserted < nb_sent:
+        nb_inserted += replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    # Check the objects were actually inserted in the storage
+    assert OBJECT_TYPE_KEYS['revision'][1] == \
+        list(storage.revision_get(
+            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
+    assert OBJECT_TYPE_KEYS['release'][1] == \
+        list(storage.release_get(
+            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
+
+    origins = list(storage.origin_get(
+        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
+    assert OBJECT_TYPE_KEYS['origin'][1] == \
+        [{'url': orig['url']} for orig in origins]
+    for origin in origins:
+        origin_url = origin['url']
+        expected_visits = [
+            {
+                **visit,
+                'origin': origin_url,
+                'date': dateutil.parser.parse(visit['date']),
+            }
+            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
+            if visit['origin'] == origin['url']
+        ]
+        actual_visits = list(storage.origin_visit_get(
+            origin_url))
+        for visit in actual_visits:
+            del visit['visit']  # opaque identifier
+        assert expected_visits == actual_visits
+
+    input_contents = OBJECT_TYPE_KEYS['content'][1]
+    contents = storage.content_get_metadata(
+        [cont['sha1'] for cont in input_contents])
+    assert len(contents) == len(input_contents)
+    assert contents == {cont['sha1']: [cont] for cont in input_contents}
+
+    nb_collisions = 0
+
+    actual_collision: Dict
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Collision detected:' in logtext:
+            nb_collisions += 1
+            actual_collision = record.args['collision']
+
+    assert nb_collisions == 1, "1 collision should be detected"
+
+    algo = 'sha1'
+    assert actual_collision['algo'] == algo
+    expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
+    assert actual_collision['hash'] == expected_colliding_hash
+
+    actual_colliding_hashes = actual_collision['objects']
+    assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
+    for content in DUPLICATE_CONTENTS:
+        expected_content_hashes = {
+            k: hash_to_hex(v)
+            for k, v in Content.from_dict(content).hashes().items()
+        }
+        assert expected_content_hashes in actual_colliding_hashes
+
 
 def _test_write_replay_origin_visit(visits):
     """Helper function to write tests for origin_visit.
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
   testing
 deps =
   pytest-cov
+  dev: pdbpp
 setenv =
   SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
 commands =
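For reference, the drop-and-retry behaviour introduced by collision_aware_content_add can be illustrated with the following self-contained sketch. It is not part of the patch: FakeContent, fake_storage_add and the local HashCollision class are stand-ins rather than the real swh.model / swh.storage APIs, and they are only kept close enough to show the control flow (on a collision, the colliding contents are dropped and the remaining ones are re-submitted).

from typing import Any, Dict, List


class HashCollision(Exception):
    """Stand-in for swh.storage's HashCollision exception."""


class FakeContent:
    """Stand-in for swh.model.model.BaseContent."""

    def __init__(self, sha1: bytes) -> None:
        self.sha1 = sha1

    def hashes(self) -> Dict[str, bytes]:
        return {'sha1': self.sha1}

    def to_dict(self) -> Dict[str, Any]:
        return {'sha1': self.sha1, 'status': 'visible'}


added: List[Dict[str, Any]] = []
already_present = {b'\x01' * 20}  # pretend this sha1 is already in storage


def fake_storage_add(contents) -> None:
    """Mimic a content_add_* endpoint that rejects already-known hashes."""
    batch = list(contents)
    for d in batch:
        if d['sha1'] in already_present:
            # Same .args layout as the replayer expects:
            # (algo, hash_id, colliding_hashes)
            raise HashCollision('sha1', d['sha1'], [{'sha1': d['sha1']}])
    added.extend(batch)


def add_dropping_collisions(content_add_fn, contents) -> None:
    """Same retry loop as collision_aware_content_add, minus the logging."""
    while contents:
        try:
            content_add_fn(c.to_dict() for c in contents)
        except HashCollision as e:
            _algo, _hash_id, colliding_hashes = e.args
            # Drop the colliding contents, then retry with the rest
            contents = [c for c in contents
                        if c.hashes() not in colliding_hashes]
        else:
            break


add_dropping_collisions(
    fake_storage_add, [FakeContent(b'\x01' * 20), FakeContent(b'\x02' * 20)])
assert added == [{'sha1': b'\x02' * 20, 'status': 'visible'}]

Dropping only the colliding contents keeps the rest of the batch flowing instead of aborting the whole replay on a single bad object, which is the behaviour test_storage_play_with_collision checks for.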