diff --git a/requirements-swh.txt b/requirements-swh.txt --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,3 +1,2 @@ swh.core[db,http] >= 0.0.60 swh.model >= 0.0.61 -swh.storage >= 0.0.181 diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,2 @@ pytest -swh.model >= 0.0.34 hypothesis diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -47,10 +47,6 @@ packages=find_packages(), scripts=[], entry_points=""" - [console_scripts] - swh-journal=swh.journal.cli:main - [swh.cli.subcommands] - journal=swh.journal.cli:cli [pytest11] pytest_swh_journal = swh.journal.pytest_plugin """, diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -3,27 +3,11 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import functools import logging -import mmap -import os -import warnings import click -try: - from systemd.daemon import notify -except ImportError: - notify = None - -from swh.core import config from swh.core.cli import CONTEXT_SETTINGS -from swh.model.model import SHA1_SIZE -from swh.objstorage import get_objstorage - -from swh.journal.client import get_journal_client as get_client -from swh.journal.replay import is_hash_in_bytearray -from swh.journal.replay import process_replay_objects_content @click.group(name="journal", context_settings=CONTEXT_SETTINGS) @@ -36,44 +20,9 @@ ) @click.pass_context def cli(ctx, config_file): - """Software Heritage Journal tools. - - The journal is a persistent logger of changes to the archive, with - publish-subscribe support. - + """DEPRECATED Software Heritage Journal tools. """ - if not config_file: - config_file = os.environ.get("SWH_CONFIG_FILENAME") - - if config_file: - if not os.path.exists(config_file): - raise ValueError("%s does not exist" % config_file) - conf = config.read(config_file) - else: - conf = {} - - ctx.ensure_object(dict) - - ctx.obj["config"] = conf - - -def get_journal_client(ctx, **kwargs): - conf = ctx.obj["config"].copy() - if "journal" in conf: - warnings.warn( - "Journal client configuration should now be under the " - "`journal_client` field and have a `cls` argument.", - DeprecationWarning, - ) - conf["journal_client"] = {"cls": "kafka", **conf.pop("journal")} - - client_conf = conf.get("journal_client").copy() - client_conf.update(kwargs) - - try: - return get_client(**client_conf) - except ValueError as exc: - ctx.fail(exc) + pass @cli.command() @@ -129,79 +78,10 @@ ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): - """Fill a destination Object Storage (typically a mirror) by reading a Journal - and retrieving objects from an existing source ObjStorage. - - There can be several 'replayers' filling a given ObjStorage as long as they - use the same `group-id`. You can use the `KAFKA_GROUP_INSTANCE_ID` - environment variable to use KIP-345 static group membership. - - This service retrieves object ids to copy from the 'content' topic. It will - only copy object's content if the object's description in the kafka - nmessage has the status:visible set. - - `--exclude-sha1-file` may be used to exclude some hashes to speed-up the - replay in case many of the contents are already in the destination - objstorage. It must contain a concatenation of all (sha1) hashes, - and it must be sorted. 
- This file will not be fully loaded into memory at any given time, - so it can be arbitrarily large. - - `--check-dst` sets whether the replayer should check in the destination - ObjStorage before copying an object. You can turn that off if you know - you're copying to an empty ObjStorage. - """ - conf = ctx.obj["config"] - try: - objstorage_src = get_objstorage(**conf.pop("objstorage_src")) - except KeyError: - ctx.fail("You must have a source objstorage configured in " "your config file.") - try: - objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) - except KeyError: - ctx.fail( - "You must have a destination objstorage configured " "in your config file." - ) - - if exclude_sha1_file: - map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) - if map_.size() % SHA1_SIZE != 0: - ctx.fail( - "--exclude-sha1 must link to a file whose size is an " - "exact multiple of %d bytes." % SHA1_SIZE - ) - nb_excluded_hashes = int(map_.size() / SHA1_SIZE) - - def exclude_fn(obj): - return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) - - else: - exclude_fn = None - - client = get_journal_client( - ctx, stop_after_objects=stop_after_objects, object_types=("content",) - ) - worker_fn = functools.partial( - process_replay_objects_content, - src=objstorage_src, - dst=objstorage_dst, - exclude_fn=exclude_fn, - check_dst=check_dst, - ) - - if notify: - notify("READY=1") - - try: - client.process(worker_fn) - except KeyboardInterrupt: - ctx.exit(0) - else: - print("Done.") - finally: - if notify: - notify("STOPPING=1") - client.close() + """DEPRECATED: use `swh objstorage replay` instead. + + This needs the swh.objstorage.replayer package.""" + ctx.fail("DEPRECATED") def main(): diff --git a/swh/journal/replay.py b/swh/journal/replay.py deleted file mode 100644 --- a/swh/journal/replay.py +++ /dev/null @@ -1,303 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging -from time import time -from typing import Callable, Dict, List, Optional - -from sentry_sdk import capture_exception, push_scope - -try: - from systemd.daemon import notify -except ImportError: - notify = None - -from tenacity import ( - retry, - retry_if_exception_type, - stop_after_attempt, - wait_random_exponential, -) - -from swh.core.statsd import statsd -from swh.model.hashutil import hash_to_hex -from swh.model.model import SHA1_SIZE - -from swh.objstorage.objstorage import ( - ID_HASH_ALGO, - ObjNotFoundError, - ObjStorage, -) - -logger = logging.getLogger(__name__) - -CONTENT_OPERATIONS_METRIC = "swh_content_replayer_operations_total" -CONTENT_RETRY_METRIC = "swh_content_replayer_retries_total" -CONTENT_BYTES_METRIC = "swh_content_replayer_bytes" -CONTENT_DURATION_METRIC = "swh_content_replayer_duration_seconds" - - -def is_hash_in_bytearray(hash_, array, nb_hashes, hash_size=SHA1_SIZE): - """ - Checks if the given hash is in the provided `array`. The array must be - a *sorted* list of sha1 hashes, and contain `nb_hashes` hashes - (so its size must by `nb_hashes*hash_size` bytes). - - Args: - hash_ (bytes): the hash to look for - array (bytes): a sorted concatenated array of hashes (may be of - any type supporting slice indexing, eg. 
:class:`mmap.mmap`) - nb_hashes (int): number of hashes in the array - hash_size (int): size of a hash (defaults to 20, for SHA1) - - Example: - - >>> import os - >>> hash1 = os.urandom(20) - >>> hash2 = os.urandom(20) - >>> hash3 = os.urandom(20) - >>> array = b''.join(sorted([hash1, hash2])) - >>> is_hash_in_bytearray(hash1, array, 2) - True - >>> is_hash_in_bytearray(hash2, array, 2) - True - >>> is_hash_in_bytearray(hash3, array, 2) - False - """ - if len(hash_) != hash_size: - raise ValueError("hash_ does not match the provided hash_size.") - - def get_hash(position): - return array[position * hash_size : (position + 1) * hash_size] - - # Regular dichotomy: - left = 0 - right = nb_hashes - while left < right - 1: - middle = int((right + left) / 2) - pivot = get_hash(middle) - if pivot == hash_: - return True - elif pivot < hash_: - left = middle - else: - right = middle - return get_hash(left) == hash_ - - -class ReplayError(Exception): - """An error occurred during the replay of an object""" - - def __init__(self, operation, *, obj_id, exc): - self.operation = operation - self.obj_id = hash_to_hex(obj_id) - self.exc = exc - - def __str__(self): - return "ReplayError(doing %s, %s, %s)" % (self.operation, self.obj_id, self.exc) - - -def log_replay_retry(retry_obj, sleep, last_result): - """Log a retry of the content replayer""" - exc = last_result.exception() - logger.debug( - "Retry operation %(operation)s on %(obj_id)s: %(exc)s", - {"operation": exc.operation, "obj_id": exc.obj_id, "exc": str(exc.exc)}, - ) - - statsd.increment( - CONTENT_RETRY_METRIC, - tags={ - "operation": exc.operation, - "attempt": str(retry_obj.statistics["attempt_number"]), - }, - ) - - -def log_replay_error(last_attempt): - """Log a replay error to sentry""" - exc = last_attempt.exception() - with push_scope() as scope: - scope.set_tag("operation", exc.operation) - scope.set_extra("obj_id", exc.obj_id) - capture_exception(exc.exc) - - logger.error( - "Failed operation %(operation)s on %(obj_id)s after %(retries)s" - " retries: %(exc)s", - { - "obj_id": exc.obj_id, - "operation": exc.operation, - "exc": str(exc.exc), - "retries": last_attempt.attempt_number, - }, - ) - - return None - - -CONTENT_REPLAY_RETRIES = 3 - -content_replay_retry = retry( - retry=retry_if_exception_type(ReplayError), - stop=stop_after_attempt(CONTENT_REPLAY_RETRIES), - wait=wait_random_exponential(multiplier=1, max=60), - before_sleep=log_replay_retry, - retry_error_callback=log_replay_error, -) - - -@content_replay_retry -def copy_object(obj_id, src, dst): - hex_obj_id = hash_to_hex(obj_id) - obj = "" - try: - with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "get"}): - obj = src.get(obj_id) - logger.debug("retrieved %(obj_id)s", {"obj_id": hex_obj_id}) - - with statsd.timed(CONTENT_DURATION_METRIC, tags={"request": "put"}): - dst.add(obj, obj_id=obj_id, check_presence=False) - logger.debug("copied %(obj_id)s", {"obj_id": hex_obj_id}) - statsd.increment(CONTENT_BYTES_METRIC, len(obj)) - except ObjNotFoundError: - logger.error( - "Failed to copy %(obj_id)s: object not found", {"obj_id": hex_obj_id} - ) - raise - except Exception as exc: - raise ReplayError("copy", obj_id=obj_id, exc=exc) from None - return len(obj) - - -@content_replay_retry -def obj_in_objstorage(obj_id, dst): - """Check if an object is already in an objstorage, tenaciously""" - try: - return obj_id in dst - except Exception as exc: - raise ReplayError("in_dst", obj_id=obj_id, exc=exc) from None - - -def process_replay_objects_content( - all_objects: 
Dict[str, List[dict]], - *, - src: ObjStorage, - dst: ObjStorage, - exclude_fn: Optional[Callable[[dict], bool]] = None, - check_dst: bool = True, -): - """ - Takes a list of records from Kafka (see - :py:func:`swh.journal.client.JournalClient.process`) and copies them - from the `src` objstorage to the `dst` objstorage, if: - - * `obj['status']` is `'visible'` - * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided) - * `obj['sha1'] not in dst` (if `check_dst` is True) - - Args: - all_objects: Objects passed by the Kafka client. Most importantly, - `all_objects['content'][*]['sha1']` is the sha1 hash of each - content. - src: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`) - exclude_fn: Determines whether an object should be copied. - check_dst: Determines whether we should check the destination - objstorage before copying. - - Example: - - >>> from swh.objstorage import get_objstorage - >>> src = get_objstorage('memory', {}) - >>> dst = get_objstorage('memory', {}) - >>> id1 = src.add(b'foo bar') - >>> id2 = src.add(b'baz qux') - >>> kafka_partitions = { - ... 'content': [ - ... { - ... 'sha1': id1, - ... 'status': 'visible', - ... }, - ... { - ... 'sha1': id2, - ... 'status': 'visible', - ... }, - ... ] - ... } - >>> process_replay_objects_content( - ... kafka_partitions, src=src, dst=dst, - ... exclude_fn=lambda obj: obj['sha1'] == id1) - >>> id1 in dst - False - >>> id2 in dst - True - """ - vol = [] - nb_skipped = 0 - nb_failures = 0 - t0 = time() - - for (object_type, objects) in all_objects.items(): - if object_type != "content": - logger.warning( - "Received a series of %s, this should not happen", object_type - ) - continue - for obj in objects: - obj_id = obj[ID_HASH_ALGO] - if obj["status"] != "visible": - nb_skipped += 1 - logger.debug( - "skipped %s (status=%s)", hash_to_hex(obj_id), obj["status"] - ) - statsd.increment( - CONTENT_OPERATIONS_METRIC, - tags={"decision": "skipped", "status": obj["status"]}, - ) - elif exclude_fn and exclude_fn(obj): - nb_skipped += 1 - logger.debug("skipped %s (manually excluded)", hash_to_hex(obj_id)) - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "excluded"} - ) - elif check_dst and obj_in_objstorage(obj_id, dst): - nb_skipped += 1 - logger.debug("skipped %s (in dst)", hash_to_hex(obj_id)) - statsd.increment(CONTENT_OPERATIONS_METRIC, tags={"decision": "in_dst"}) - else: - try: - copied = copy_object(obj_id, src, dst) - except ObjNotFoundError: - nb_skipped += 1 - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "not_in_src"} - ) - else: - if copied is None: - nb_failures += 1 - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "failed"} - ) - else: - vol.append(copied) - statsd.increment( - CONTENT_OPERATIONS_METRIC, tags={"decision": "copied"} - ) - - dt = time() - t0 - logger.info( - "processed %s content objects in %.1fsec " - "(%.1f obj/sec, %.1fMB/sec) - %d failed - %d skipped", - len(vol), - dt, - len(vol) / dt, - sum(vol) / 1024 / 1024 / dt, - nb_failures, - nb_skipped, - ) - - if notify: - notify("WATCHDOG=1") diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py deleted file mode 100644 --- a/swh/journal/tests/test_cli.py +++ /dev/null @@ -1,610 +0,0 @@ -# Copyright (C) 2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level 
LICENSE file for more information - -from collections import Counter -import copy -import functools -import logging -import re -import tempfile -from unittest.mock import patch, MagicMock - -from click.testing import CliRunner -from confluent_kafka import Producer -import pytest -import yaml - -from swh.model.hashutil import hash_to_hex -from swh.objstorage.backends.in_memory import InMemoryObjStorage -from swh.storage import get_storage - -from swh.journal.cli import cli, get_journal_client -from swh.journal.replay import CONTENT_REPLAY_RETRIES -from swh.journal.serializers import key_to_kafka - - -logger = logging.getLogger(__name__) - - -CLI_CONFIG = { - "storage": {"cls": "memory",}, - "objstorage_src": {"cls": "mocked", "name": "src",}, - "objstorage_dst": {"cls": "mocked", "name": "dst",}, -} - - -@pytest.fixture -def storage(): - """An swh-storage object that gets injected into the CLI functions.""" - storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} - storage = get_storage(**storage_config) - with patch("swh.storage.get_storage") as get_storage_mock: - get_storage_mock.return_value = storage - yield storage - - -@pytest.fixture -def monkeypatch_retry_sleep(monkeypatch): - from swh.journal.replay import copy_object, obj_in_objstorage - - monkeypatch.setattr(copy_object.retry, "sleep", lambda x: None) - monkeypatch.setattr(obj_in_objstorage.retry, "sleep", lambda x: None) - - -def invoke(*args, env=None, journal_config=None): - config = copy.deepcopy(CLI_CONFIG) - if journal_config: - config["journal_client"] = journal_config.copy() - config["journal_client"]["cls"] = "kafka" - - runner = CliRunner() - with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: - yaml.dump(config, config_fd) - config_fd.seek(0) - args = ["-C" + config_fd.name] + list(args) - return runner.invoke(cli, args, obj={"log_level": logging.DEBUG}, env=env,) - - -def test_get_journal_client_config_bwcompat(kafka_server): - cfg = { - "journal": { - "brokers": [kafka_server], - "group_id": "toto", - "prefix": "xiferp", - "object_types": ["content"], - "batch_size": 50, - } - } - ctx = MagicMock(obj={"config": cfg}) - with pytest.deprecated_call(): - client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") - assert client.subscription == ["prefix.content"] - assert client.stop_after_objects == 10 - assert client.batch_size == 50 - - -def test_get_journal_client_config(kafka_server): - cfg = { - "journal_client": { - "cls": "kafka", - "brokers": [kafka_server], - "group_id": "toto", - "prefix": "xiferp", - "object_types": ["content"], - "batch_size": 50, - } - } - ctx = MagicMock(obj={"config": cfg}) - client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") - assert client.subscription == ["prefix.content"] - assert client.stop_after_objects == 10 - assert client.batch_size == 50 - - -def _patch_objstorages(names): - objstorages = {name: InMemoryObjStorage() for name in names} - - def get_mock_objstorage(cls, **args): - assert cls == "mocked", cls - return objstorages[args["name"]] - - def decorator(f): - @functools.wraps(f) - @patch("swh.journal.cli.get_objstorage") - def newf(get_objstorage_mock, *args, **kwargs): - get_objstorage_mock.side_effect = get_mock_objstorage - f(*args, objstorages=objstorages, **kwargs) - - return newf - - return decorator - - -NUM_CONTENTS = 10 - - -def _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages): - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test-producer", - "acks": 
"all", - } - ) - - contents = {} - for i in range(NUM_CONTENTS): - content = b"\x00" * 19 + bytes([i]) - sha1 = objstorages["src"].add(content) - contents[sha1] = content - producer.produce( - topic=kafka_prefix + ".content", - key=key_to_kafka(sha1), - value=key_to_kafka({"sha1": sha1, "status": "visible",}), - ) - - producer.flush() - - return contents - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_structured_log( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - expected_obj_ids = set(hash_to_hex(sha1) for sha1 in contents) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied.add(record.args["obj_id"]) - - assert ( - copied == expected_obj_ids - ), "Mismatched logging; see captured log output for details." - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_static_group_id( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - # Setup log capture to fish the consumer settings out of the log messages - caplog.set_level(logging.DEBUG, "swh.journal.client") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - env={"KAFKA_GROUP_INSTANCE_ID": "static-group-instance-id"}, - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - consumer_settings = None - for record in caplog.records: - if "Consumer settings" in record.message: - consumer_settings = record.args - break - - assert consumer_settings is not None, ( - "Failed to get consumer settings out of the consumer log. " - "See log capture for details." 
- ) - assert consumer_settings["group.instance.id"] == "static-group-instance-id" - assert consumer_settings["session.timeout.ms"] == 60 * 10 * 1000 - assert consumer_settings["max.poll.interval.ms"] == 90 * 10 * 1000 - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_exclude( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - excluded_contents = list(contents)[0::2] # picking half of them - with tempfile.NamedTemporaryFile(mode="w+b") as fd: - fd.write(b"".join(sorted(excluded_contents))) - - fd.seek(0) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--exclude-sha1-file", - fd.name, - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - if sha1 in excluded_contents: - assert sha1 not in objstorages["dst"], sha1 - else: - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -NUM_CONTENTS_DST = 5 - - -@_patch_objstorages(["src", "dst"]) -@pytest.mark.parametrize( - "check_dst,expected_copied,expected_in_dst", - [ - (True, NUM_CONTENTS - NUM_CONTENTS_DST, NUM_CONTENTS_DST), - (False, NUM_CONTENTS, 0), - ], -) -def test_replay_content_check_dst( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - check_dst: bool, - expected_copied: int, - expected_in_dst: int, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - for i, (sha1, content) in enumerate(contents.items()): - if i >= NUM_CONTENTS_DST: - break - - objstorages["dst"].add(content, obj_id=sha1) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--check-dst" if check_dst else "--no-check-dst", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - in_dst = 0 - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "in dst" in logtext: - in_dst += 1 - - assert ( - copied == expected_copied and in_dst == expected_in_dst - ), "Unexpected amount of objects copied, see the captured log for details" - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -class FlakyObjStorage(InMemoryObjStorage): - def __init__(self, *args, **kwargs): - state = kwargs.pop("state") - self.failures_left = Counter(kwargs.pop("failures")) - super().__init__(*args, **kwargs) - if state: - self.state = state - - def flaky_operation(self, op, obj_id): - if self.failures_left[op, obj_id] > 0: - self.failures_left[op, obj_id] -= 1 - raise RuntimeError("Failed %s on %s" % (op, hash_to_hex(obj_id))) - - def get(self, obj_id): - 
self.flaky_operation("get", obj_id) - return super().get(obj_id) - - def add(self, data, obj_id=None, check_presence=True): - self.flaky_operation("add", obj_id) - return super().add(data, obj_id=obj_id, check_presence=check_presence) - - def __contains__(self, obj_id): - self.flaky_operation("in", obj_id) - return super().__contains__(obj_id) - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_check_dst_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - monkeypatch_retry_sleep, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - failures = {} - for i, (sha1, content) in enumerate(contents.items()): - if i >= NUM_CONTENTS_DST: - break - - objstorages["dst"].add(content, obj_id=sha1) - failures["in", sha1] = 1 - - orig_dst = objstorages["dst"] - objstorages["dst"] = FlakyObjStorage(state=orig_dst.state, failures=failures) - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - "--check-dst", - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - for (sha1, content) in contents.items(): - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_failed_copy_retry( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, - monkeypatch_retry_sleep, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - add_failures = {} - get_failures = {} - definitely_failed = set() - - # We want to generate operations failing 1 to CONTENT_REPLAY_RETRIES times. - # We generate failures for 2 different operations, get and add. - num_retry_contents = 2 * CONTENT_REPLAY_RETRIES - - assert ( - num_retry_contents < NUM_CONTENTS - ), "Need to generate more test contents to properly test retry behavior" - - for i, sha1 in enumerate(contents): - if i >= num_retry_contents: - break - - # This generates a number of failures, up to CONTENT_REPLAY_RETRIES - num_failures = (i % CONTENT_REPLAY_RETRIES) + 1 - - # This generates failures of add for the first CONTENT_REPLAY_RETRIES - # objects, then failures of get. 
- if i < CONTENT_REPLAY_RETRIES: - add_failures["add", sha1] = num_failures - else: - get_failures["get", sha1] = num_failures - - # Only contents that have CONTENT_REPLAY_RETRIES or more are - # definitely failing - if num_failures >= CONTENT_REPLAY_RETRIES: - definitely_failed.add(hash_to_hex(sha1)) - - objstorages["dst"] = FlakyObjStorage( - state=objstorages["dst"].state, failures=add_failures, - ) - objstorages["src"] = FlakyObjStorage( - state=objstorages["src"].state, failures=get_failures, - ) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - actually_failed = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "Failed operation" in logtext: - assert record.levelno == logging.ERROR - assert record.args["retries"] == CONTENT_REPLAY_RETRIES - actually_failed.add(record.args["obj_id"]) - - assert ( - actually_failed == definitely_failed - ), "Unexpected object copy failures; see captured log for details" - - for (sha1, content) in contents.items(): - if hash_to_hex(sha1) in definitely_failed: - assert sha1 not in objstorages["dst"] - continue - - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content - - -@_patch_objstorages(["src", "dst"]) -def test_replay_content_objnotfound( - objstorages, - storage, - kafka_prefix: str, - kafka_consumer_group: str, - kafka_server: str, - caplog, -): - kafka_prefix += ".swh.journal.objects" - - contents = _fill_objstorage_and_kafka(kafka_server, kafka_prefix, objstorages) - - num_contents_deleted = 5 - contents_deleted = set() - - for i, sha1 in enumerate(contents): - if i >= num_contents_deleted: - break - - del objstorages["src"].state[sha1] - contents_deleted.add(hash_to_hex(sha1)) - - caplog.set_level(logging.DEBUG, "swh.journal.replay") - - result = invoke( - "content-replay", - "--stop-after-objects", - str(NUM_CONTENTS), - journal_config={ - "brokers": [kafka_server], - "group_id": kafka_consumer_group, - "prefix": kafka_prefix, - }, - ) - expected = r"Done.\n" - assert result.exit_code == 0, result.output - assert re.fullmatch(expected, result.output, re.MULTILINE), result.output - - copied = 0 - not_in_src = set() - for record in caplog.records: - logtext = record.getMessage() - if "copied" in logtext: - copied += 1 - elif "object not found" in logtext: - # Check that the object id can be recovered from logs - assert record.levelno == logging.ERROR - not_in_src.add(record.args["obj_id"]) - - assert ( - copied == NUM_CONTENTS - num_contents_deleted - ), "Unexpected number of contents copied" - - assert ( - not_in_src == contents_deleted - ), "Mismatch between deleted contents and not_in_src logs" - - for (sha1, content) in contents.items(): - if sha1 not in objstorages["src"]: - continue - assert sha1 in objstorages["dst"], sha1 - assert objstorages["dst"].get(sha1) == content diff --git a/swh/journal/tests/test_replay.py b/swh/journal/tests/test_replay.py deleted file mode 100644 --- a/swh/journal/tests/test_replay.py +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this 
distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -from hypothesis import strategies, given, settings - -from swh.journal.replay import is_hash_in_bytearray - - -storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} - - -def make_topic(kafka_prefix: str, object_type: str) -> str: - return kafka_prefix + "." + object_type - - -hash_strategy = strategies.binary(min_size=20, max_size=20) - - -@settings(max_examples=500) -@given( - strategies.sets(hash_strategy, min_size=0, max_size=500), - strategies.sets(hash_strategy, min_size=10), -) -def test_is_hash_in_bytearray(haystack, needles): - array = b"".join(sorted(haystack)) - needles |= haystack # Exhaustively test for all objects in the array - for needle in needles: - assert is_hash_in_bytearray(needle, array, len(haystack)) == ( - needle in haystack - ) diff --git a/swh/journal/tests/test_write_replay.py b/swh/journal/tests/test_write_replay.py deleted file mode 100644 --- a/swh/journal/tests/test_write_replay.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright (C) 2019-2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import functools - -from hypothesis import given, settings, HealthCheck -from hypothesis.strategies import lists - -from swh.model.hypothesis_strategies import present_contents -from swh.objstorage import get_objstorage - -from swh.journal.replay import process_replay_objects_content - -from .utils import MockedJournalClient, MockedKafkaWriter - - -@given(lists(present_contents(), min_size=1)) -@settings(suppress_health_check=[HealthCheck.too_slow]) -def test_write_replay_content(objects): - - queue = [] - replayer = MockedJournalClient(queue) - writer = MockedKafkaWriter(queue) - - objstorage1 = get_objstorage(cls="memory", args={}) - objstorage2 = get_objstorage(cls="memory", args={}) - - contents = [] - for obj in objects: - objstorage1.add(obj.data) - contents.append(obj) - writer.write_addition("content", obj) - - # Bail out early if we didn't insert any relevant objects... - queue_size = len(queue) - assert queue_size != 0, "No test objects found; hypothesis strategy bug?" - - assert replayer.stop_after_objects is None - replayer.stop_after_objects = queue_size - - worker_fn = functools.partial( - process_replay_objects_content, src=objstorage1, dst=objstorage2 - ) - - replayer.process(worker_fn) - - # only content with status visible will be copied in storage2 - expected_objstorage_state = { - c.sha1: c.data for c in contents if c.status == "visible" - } - - assert expected_objstorage_state == objstorage2.state