diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,3 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.60 -swh.storage >= 0.0.172 +swh.storage >= 0.0.177 diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -6,7 +6,7 @@ import copy import logging from time import time -from typing import Callable, Dict, List, Optional +from typing import Any, Callable, Dict, Iterable, List, Optional from sentry_sdk import capture_exception, push_scope try: @@ -22,7 +22,7 @@ from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex -from swh.model.model import SHA1_SIZE +from swh.model.model import BaseContent, Content, SkippedContent, SHA1_SIZE from swh.objstorage.objstorage import ( ID_HASH_ALGO, ObjNotFoundError, ObjStorage, ) @@ -237,21 +237,35 @@ return objects +def content_add(content_add_fn: Callable[[Iterable[Any]], None], + convert_fn: Callable[[Dict], BaseContent], + contents: List[Dict]) -> None: + """Add contents to storage. If a hash collision is detected, an error is + logged. Then this adds the other non colliding contents to the storage. 
+ + Args: + content_add_fn: Storage content callable + convert_fn: Callable converting a content dict into its model object (used to compute hashes for collision filtering) + contents: List of contents or skipped contents to add to storage + + """ + try: + content_add_fn(contents) + except HashCollision as e: + logger.error('Hash collision: %s', e.args) + colliding_contents: List[Dict[str, bytes]] = e.args[2] + content_add_fn( + c for c in contents + if convert_fn(c).hashes() not in colliding_contents + ) + + + def _insert_objects(object_type, objects, storage): objects = fix_objects(object_type, objects) if object_type == 'content': - try: - storage.skipped_content_add( - (obj for obj in objects if obj.get('status') == 'absent')) - except HashCollision as e: - logger.error('(SkippedContent) Hash collision: %s', e.args) - - try: - storage.content_add_metadata( - (obj for obj in objects if obj.get('status') != 'absent')) - except HashCollision as e: - logger.error('(Content) Hash collision: %s', e.args) - + content_add(storage.skipped_content_add, SkippedContent.from_dict, + [obj for obj in objects if obj.get('status') == 'absent']) + content_add(storage.content_add_metadata, Content.from_dict, + [obj for obj in objects if obj.get('status') != 'absent']) elif object_type in ('directory', 'revision', 'release', 'snapshot', 'origin'): # TODO: split batches that are too large for the storage diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -36,6 +36,24 @@ }, ] +duplicate_content1 = { + 'length': 4, + 'sha1': hash_to_bytes( + '44973274ccef6ab4dfaaf86599792fa9c3fe4689'), + 'sha1_git': b'foosha1', + 'blake2s256': b'barblake', + 'sha256': b'bazsha256', + 'status': 'visible', +} + +duplicate_content2 = duplicate_content1.copy() +sha1_array = bytearray(duplicate_content1['sha1']) +sha1_array[0] += 1 +duplicate_content2['sha1'] = bytes(sha1_array) + +DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2] + + COMMITTERS = [ { 'fullname': b'foo', diff --git a/swh/journal/tests/test_replay.py 
b/swh/journal/tests/test_replay.py --- a/swh/journal/tests/test_replay.py +++ b/swh/journal/tests/test_replay.py @@ -1,4 +1,4 @@ -# Copyright (C) 2019 The Software Heritage developers +# Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -20,7 +20,7 @@ from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.replay import process_replay_objects, is_hash_in_bytearray -from .conftest import OBJECT_TYPE_KEYS +from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS from .utils import MockedJournalClient, MockedKafkaWriter @@ -50,11 +50,14 @@ now = datetime.datetime.now(tz=datetime.timezone.utc) + def make_topic(kafka_prefix: str, object_type: str) -> str: + return kafka_prefix + '.' + object_type + # Fill Kafka nb_sent = 0 nb_visits = 0 for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items(): - topic = kafka_prefix + '.' + object_type + topic = make_topic(kafka_prefix, object_type) for object_ in objects: key = bytes(random.randint(0, 255) for _ in range(40)) object_ = object_.copy() @@ -69,6 +72,17 @@ ) nb_sent += 1 + # Create collision in input data + # They are not written in the destination + for content in DUPLICATE_CONTENTS: + topic = make_topic(kafka_prefix, 'content') + producer.produce( + topic=topic, key=key_to_kafka(key), + value=value_to_kafka(content), + ) + + nb_sent += 1 + producer.flush() # Fill the storage from Kafka diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ b/tox.ini @@ -7,6 +7,7 @@ testing deps = pytest-cov + dev: pdbpp setenv = SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka} commands =