diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.core[db,http] >= 0.0.60
 swh.model >= 0.0.60
-swh.storage >= 0.0.172
+swh.storage >= 0.0.177
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -6,7 +6,7 @@
 import copy
 import logging
 from time import time
-from typing import Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, Optional
 
 from sentry_sdk import capture_exception, push_scope
 try:
@@ -22,7 +22,7 @@
 from swh.core.statsd import statsd
 from swh.model.identifiers import normalize_timestamp
 from swh.model.hashutil import hash_to_hex
-from swh.model.model import SHA1_SIZE
+from swh.model.model import BaseContent, Content, SkippedContent, SHA1_SIZE
 from swh.objstorage.objstorage import (
     ID_HASH_ALGO, ObjNotFoundError, ObjStorage,
 )
@@ -237,21 +237,47 @@
     return objects
 
 
-def _insert_objects(object_type, objects, storage):
-    objects = fix_objects(object_type, objects)
-    if object_type == 'content':
-        try:
-            storage.skipped_content_add(
-                (obj for obj in objects if obj.get('status') == 'absent'))
-        except HashCollision as e:
-            logger.error('(SkippedContent) Hash collision: %s', e.args)
+def collision_aware_content_add(
+        content_add_fn: Callable[[Iterable[Any]], None],
+        contents: List[BaseContent]) -> None:
+    """Add contents to storage. If a hash collision is detected, an error is
+    logged; the remaining non-colliding contents are then added to the
+    storage.
+
+    Args:
+        content_add_fn: Storage content callable
+        contents: List of contents or skipped contents to add to storage
+    """
+    colliding_content_hashes: Dict[str, List[Dict[str, bytes]]] = {}
+    while True:
+        logger.debug('content-add: %s', len(contents))
         try:
-            storage.content_add_metadata(
-                (obj for obj in objects if obj.get('status') != 'absent'))
+            content_add_fn(c.to_dict() for c in contents)
         except HashCollision as e:
-            logger.error('(Content) Hash collision: %s', e.args)
+            algo, hash_id, colliding_hashes = e.args
+            hash_id = hash_to_hex(hash_id)
+            colliding_content_hashes[f'{algo}-{hash_id}'] = colliding_hashes
+            # Drop the colliding contents from the batch, then retry
+            contents = [c for c in contents
+                        if c.hashes() not in colliding_hashes]
+        else:
+            # Successfully added contents, we are done
+            break
+    if colliding_content_hashes:
+        logger.error('Colliding contents: %s', colliding_content_hashes)
+
+
+def _insert_objects(object_type, objects, storage):
+    objects = fix_objects(object_type, objects)
+    if object_type == 'content':
+        collision_aware_content_add(
+            storage.skipped_content_add,
+            [SkippedContent.from_dict(obj)
+             for obj in objects if obj.get('status') == 'absent'])
+        collision_aware_content_add(
+            storage.content_add_metadata,
+            [Content.from_dict(obj)
+             for obj in objects if obj.get('status') != 'absent'])
     elif object_type in ('directory', 'revision', 'release',
                          'snapshot', 'origin'):
         # TODO: split batches that are too large for the storage
diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py
--- a/swh/journal/tests/conftest.py
+++ b/swh/journal/tests/conftest.py
@@ -36,6 +36,24 @@
     },
 ]
 
+duplicate_content1 = {
+    'length': 4,
+    'sha1': hash_to_bytes(
+        '44973274ccef6ab4dfaaf86599792fa9c3fe4689'),
+    'sha1_git': b'foosha1',
+    'blake2s256': b'barblake',
+    'sha256': b'bazsha256',
+    'status': 'visible',
+}
+
+duplicate_content2 = duplicate_content1.copy()
+sha1_array = bytearray(duplicate_content1['sha1'])
+sha1_array[0] += 1
+duplicate_content2['sha1'] = bytes(sha1_array)
+
+DUPLICATE_CONTENTS = [duplicate_content1, duplicate_content2]
+
+
 COMMITTERS = [
     {
         'fullname': b'foo',
diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py
--- a/swh/journal/tests/test_replay.py
+++ b/swh/journal/tests/test_replay.py
@@ -1,10 +1,11 @@
-# Copyright (C) 2019 The Software Heritage developers
+# Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 import functools
+import logging
 import random
 from subprocess import Popen
 from typing import Tuple
@@ -20,7 +21,7 @@
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.replay import process_replay_objects, is_hash_in_bytearray
 
-from .conftest import OBJECT_TYPE_KEYS
+from .conftest import OBJECT_TYPE_KEYS, DUPLICATE_CONTENTS
 from .utils import MockedJournalClient, MockedKafkaWriter
 
 
@@ -33,10 +34,112 @@
 }
 
 
+def make_topic(kafka_prefix: str, object_type: str) -> str:
+    return kafka_prefix + '.' + object_type
+
+
 def test_storage_play(
         kafka_prefix: str,
         kafka_consumer_group: str,
-        kafka_server: Tuple[Popen, int]):
+        kafka_server: Tuple[Popen, int],
+        caplog):
+    (_, port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    storage = get_storage(**storage_config)
+
+    producer = Producer({
+        'bootstrap.servers': 'localhost:{}'.format(port),
+        'client.id': 'test producer',
+        'enable.idempotence': 'true',
+    })
+
+    now = datetime.datetime.now(tz=datetime.timezone.utc)
+
+    # Fill Kafka
+    nb_sent = 0
+    nb_visits = 0
+    for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
+        topic = make_topic(kafka_prefix, object_type)
+        for object_ in objects:
+            key = bytes(random.randint(0, 255) for _ in range(40))
+            object_ = object_.copy()
+            if object_type == 'content':
+                object_['ctime'] = now
+            elif object_type == 'origin_visit':
+                nb_visits += 1
+                object_['visit'] = nb_visits
+            producer.produce(
+                topic=topic, key=key_to_kafka(key),
+                value=value_to_kafka(object_),
+            )
+            nb_sent += 1
+
+    producer.flush()
+
+    # Fill the storage from Kafka
+    replayer = JournalClient(
+        brokers='localhost:%d' % kafka_server[1],
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_after_objects=nb_sent,
+    )
+    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    nb_inserted = 0
+    while nb_inserted < nb_sent:
+        nb_inserted += replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    # Check the objects were actually inserted in the storage
+    assert OBJECT_TYPE_KEYS['revision'][1] == \
+        list(storage.revision_get(
+            [rev['id'] for rev in OBJECT_TYPE_KEYS['revision'][1]]))
+    assert OBJECT_TYPE_KEYS['release'][1] == \
+        list(storage.release_get(
+            [rel['id'] for rel in OBJECT_TYPE_KEYS['release'][1]]))
+
+    origins = list(storage.origin_get(
+        [orig for orig in OBJECT_TYPE_KEYS['origin'][1]]))
+    assert OBJECT_TYPE_KEYS['origin'][1] == \
+        [{'url': orig['url']} for orig in origins]
+    for origin in origins:
+        origin_url = origin['url']
+        expected_visits = [
+            {
+                **visit,
+                'origin': origin_url,
+                'date': dateutil.parser.parse(visit['date']),
+            }
+            for visit in OBJECT_TYPE_KEYS['origin_visit'][1]
+            if visit['origin'] == origin['url']
+        ]
+        actual_visits = list(storage.origin_visit_get(
+            origin_url))
+        for visit in actual_visits:
+            del visit['visit']  # opaque identifier
+        assert expected_visits == actual_visits
+
+    input_contents = OBJECT_TYPE_KEYS['content'][1]
+    contents = storage.content_get_metadata(
+        [cont['sha1'] for cont in input_contents])
+    assert len(contents) == len(input_contents)
+    assert contents == {cont['sha1']: [cont] for cont in input_contents}
+
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
+    collision = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Colliding contents:' in logtext:
+            collision += 1
+
+    assert collision == 0, "No collision should be detected"
+
+
+def test_storage_play_with_collision(
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int],
+        caplog):
     (_, port) = kafka_server
     kafka_prefix += '.swh.journal.objects'
 
@@ -54,7 +157,7 @@
     nb_sent = 0
     nb_visits = 0
     for (object_type, (_, objects)) in OBJECT_TYPE_KEYS.items():
-        topic = kafka_prefix + '.' + object_type
+        topic = make_topic(kafka_prefix, object_type)
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
@@ -69,6 +172,17 @@
             )
             nb_sent += 1
 
+    # Create a collision in the input data; these duplicate contents
+    # must not be written to the destination storage
+    for content in DUPLICATE_CONTENTS:
+        topic = make_topic(kafka_prefix, 'content')
+        producer.produce(
+            topic=topic, key=key_to_kafka(key),
+            value=value_to_kafka(content),
+        )
+
+        nb_sent += 1
+
     producer.flush()
 
     # Fill the storage from Kafka
@@ -119,6 +233,15 @@
     assert len(contents) == len(input_contents)
     assert contents == {cont['sha1']: [cont] for cont in input_contents}
 
+    caplog.set_level(logging.ERROR, 'swh.journal.replay')
+    collision = 0
+    for record in caplog.records:
+        logtext = record.getMessage()
+        if 'Colliding contents' in logtext:
+            collision += 1
+
+    assert collision == 1, "One collision should be detected"
+
 
 def _test_write_replay_origin_visit(visits):
     """Helper function to write tests for origin_visit.
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -7,6 +7,7 @@
   testing
 deps =
   pytest-cov
+  dev: pdbpp
 setenv =
   SWH_KAFKA_ROOT = {env:SWH_KAFKA_ROOT:swh/journal/tests/kafka}
 commands =
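
Note for reviewers: below is a minimal, self-contained sketch of the retry-on-collision
pattern that collision_aware_content_add introduces. The HashCollision class, toy_add
and collision_aware_add here are hypothetical stand-ins written for this note, not the
swh.storage API: the real code catches swh.storage's HashCollision and filters with
BaseContent.hashes() against the reported hash dicts, while this sketch compares whole
records. Only the control flow mirrors the patch.

    # sketch_collision_retry.py -- illustrative only, see caveats above
    import logging
    from typing import Any, Callable, Dict, List

    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger('replay-sketch')


    class HashCollision(Exception):
        """Stand-in for the storage error; args == (algo, hash_id, colliding)."""


    def collision_aware_add(
            add_fn: Callable[[List[Dict[str, Any]]], None],
            contents: List[Dict[str, Any]]) -> None:
        """Retry the batch, dropping colliding entries, until add_fn accepts it."""
        collisions: Dict[bytes, List[Dict[str, Any]]] = {}
        while True:
            try:
                add_fn(contents)
            except HashCollision as e:
                algo, hash_id, colliding = e.args
                collisions[hash_id] = colliding
                # Drop the reported colliding contents, then retry the rest
                contents = [c for c in contents if c not in colliding]
            else:
                break  # batch accepted, we are done
        if collisions:
            logger.error('Colliding contents: %s', collisions)


    def toy_add(contents: List[Dict[str, Any]]) -> None:
        """Toy storage endpoint: rejects any batch containing sha1 == b'dup'."""
        colliding = [c for c in contents if c['sha1'] == b'dup']
        if colliding:
            raise HashCollision('sha1', b'dup', colliding)
        print('inserted:', [c['sha1'] for c in contents])


    collision_aware_add(
        toy_add,
        [{'sha1': b'ok1'}, {'sha1': b'dup'}, {'sha1': b'ok2'}],
    )
    # The first attempt raises for b'dup'; the retry inserts the two remaining
    # contents, and the collision is logged once at ERROR level.

Each HashCollision removes at least the colliding entries from the batch, so the loop
terminates. This is why the patch can retry and log all collisions after the loop,
instead of losing the whole batch the way the old _insert_objects error handling did.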