diff --git a/mypy.ini b/mypy.ini --- a/mypy.ini +++ b/mypy.ini @@ -22,3 +22,6 @@ [mypy-systemd.daemon.*] ignore_missing_imports = True + +[mypy-tenacity.*] +ignore_missing_imports = True diff --git a/requirements.txt b/requirements.txt --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ # dependency lines, see https://pip.readthedocs.org/en/1.1/requirements.html confluent-kafka msgpack +tenacity vcversioner diff --git a/swh/journal/replay.py b/swh/journal/replay.py --- a/swh/journal/replay.py +++ b/swh/journal/replay.py @@ -13,6 +13,8 @@ except ImportError: notify = None +from tenacity import retry, stop_after_attempt, wait_random_exponential + from swh.core.statsd import statsd from swh.model.identifiers import normalize_timestamp from swh.model.hashutil import hash_to_hex @@ -322,6 +324,14 @@ return len(obj) +@retry(stop=stop_after_attempt(3), + reraise=True, + wait=wait_random_exponential(multiplier=1, max=60)) +def obj_in_objstorage(obj_id, dst): + """Check if an object is already in an objstorage, tenaciously""" + return obj_id in dst + + def process_replay_objects_content( all_objects: Dict[str, List[dict]], *, @@ -401,7 +411,7 @@ hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"}) - elif check_dst and obj_id in dst: + elif check_dst and obj_in_objstorage(obj_id, dst): nb_skipped += 1 logger.debug('skipped %s (in dst)', hash_to_hex(obj_id)) statsd.increment(CONTENT_OPERATIONS_METRIC, diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py --- a/swh/journal/tests/test_cli.py +++ b/swh/journal/tests/test_cli.py @@ -3,6 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import Counter import functools import logging import re @@ -338,3 +339,59 @@ for (sha1, content) in contents.items(): assert sha1 in objstorages['dst'], sha1 assert objstorages['dst'].get(sha1) == content + + 
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_check_dst_retry(
+        objstorages,
+        storage,
+        kafka_prefix: str,
+        kafka_consumer_group: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    class FlakyMemoryObjStorage(InMemoryObjStorage):
+        def __init__(self, *args, **kwargs):
+            state = kwargs.pop('state')
+            super().__init__(*args, **kwargs)
+            if state:
+                self.state = state
+            self.contains_failed = Counter()
+
+        def __contains__(self, id):
+            if self.contains_failed[id] == 0:
+                self.contains_failed[id] += 1
+                raise ValueError('This contains is flaky')
+
+            return super().__contains__(id)
+
+    for i, (sha1, content) in enumerate(contents.items()):
+        if i >= NUM_CONTENTS_DST:
+            break
+
+        objstorages['dst'].add(content, obj_id=sha1)
+
+    dst_state = objstorages['dst'].state
+    flaky_objstorage = FlakyMemoryObjStorage(state=dst_state)
+    objstorages['dst'] = flaky_objstorage
+    dst_contains_failed = flaky_objstorage.contains_failed
+
+    result = invoke(False, [
+        'content-replay',
+        '--broker', '127.0.0.1:%d' % kafka_port,
+        '--group-id', kafka_consumer_group,
+        '--prefix', kafka_prefix,
+        '--max-messages', str(NUM_CONTENTS),
+        '--check-dst',
+    ])
+    expected = r'Done.\n'
+    assert result.exit_code == 0, result.output
+    assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        assert dst_contains_failed[sha1] >= 1
+        assert dst_state[sha1] == content