Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/tests/test_cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import Counter | from collections import Counter | ||||
import copy | import copy | ||||
import functools | import functools | ||||
import logging | import logging | ||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from typing import Any, Dict | |||||
from unittest.mock import patch, MagicMock | from unittest.mock import patch, MagicMock | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
import yaml | import yaml | ||||
from swh.model.hashutil import hash_to_hex | from swh.model.hashutil import hash_to_hex | ||||
from swh.objstorage.backends.in_memory import InMemoryObjStorage | from swh.objstorage.backends.in_memory import InMemoryObjStorage | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.journal.cli import cli, get_journal_client | from swh.journal.cli import cli, get_journal_client | ||||
from swh.journal.replay import CONTENT_REPLAY_RETRIES | from swh.journal.replay import CONTENT_REPLAY_RETRIES | ||||
from swh.journal.serializers import key_to_kafka, value_to_kafka | from swh.journal.serializers import key_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
CLI_CONFIG = { | CLI_CONFIG = { | ||||
"storage": {"cls": "memory",}, | "storage": {"cls": "memory",}, | ||||
"objstorage_src": {"cls": "mocked", "name": "src",}, | "objstorage_src": {"cls": "mocked", "name": "src",}, | ||||
"objstorage_dst": {"cls": "mocked", "name": "dst",}, | "objstorage_dst": {"cls": "mocked", "name": "dst",}, | ||||
} | } | ||||
@pytest.fixture | @pytest.fixture | ||||
def storage(): | def storage(): | ||||
"""An swh-storage object that gets injected into the CLI functions.""" | """An swh-storage object that gets injected into the CLI functions.""" | ||||
storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} | storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]} | ||||
storage = get_storage(**storage_config) | storage = get_storage(**storage_config) | ||||
with patch("swh.journal.cli.get_storage") as get_storage_mock: | with patch("swh.storage.get_storage") as get_storage_mock: | ||||
get_storage_mock.return_value = storage | get_storage_mock.return_value = storage | ||||
yield storage | yield storage | ||||
@pytest.fixture | @pytest.fixture | ||||
def monkeypatch_retry_sleep(monkeypatch): | def monkeypatch_retry_sleep(monkeypatch): | ||||
from swh.journal.replay import copy_object, obj_in_objstorage | from swh.journal.replay import copy_object, obj_in_objstorage | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def test_get_journal_client_config(kafka_server): | ||||
} | } | ||||
ctx = MagicMock(obj={"config": cfg}) | ctx = MagicMock(obj={"config": cfg}) | ||||
client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") | client = get_journal_client(ctx, stop_after_objects=10, prefix="prefix") | ||||
assert client.subscription == ["prefix.content"] | assert client.subscription == ["prefix.content"] | ||||
assert client.stop_after_objects == 10 | assert client.stop_after_objects == 10 | ||||
assert client.batch_size == 50 | assert client.batch_size == 50 | ||||
def test_replay( | |||||
storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, | |||||
): | |||||
kafka_prefix += ".swh.journal.objects" | |||||
producer = Producer( | |||||
{ | |||||
"bootstrap.servers": kafka_server, | |||||
"client.id": "test-producer", | |||||
"acks": "all", | |||||
} | |||||
) | |||||
snapshot = { | |||||
"id": b"foo", | |||||
"branches": {b"HEAD": {"target_type": "revision", "target": b"\x01" * 20,}}, | |||||
} # type: Dict[str, Any] | |||||
producer.produce( | |||||
topic=kafka_prefix + ".snapshot", | |||||
key=key_to_kafka(snapshot["id"]), | |||||
value=value_to_kafka(snapshot), | |||||
) | |||||
producer.flush() | |||||
logger.debug("Flushed producer") | |||||
result = invoke( | |||||
"replay", | |||||
"--stop-after-objects", | |||||
"1", | |||||
journal_config={ | |||||
"brokers": [kafka_server], | |||||
"group_id": kafka_consumer_group, | |||||
"prefix": kafka_prefix, | |||||
}, | |||||
) | |||||
expected = r"Done.\n" | |||||
assert result.exit_code == 0, result.output | |||||
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output | |||||
assert storage.snapshot_get(snapshot["id"]) == {**snapshot, "next_branch": None} | |||||
def _patch_objstorages(names): | def _patch_objstorages(names): | ||||
objstorages = {name: InMemoryObjStorage() for name in names} | objstorages = {name: InMemoryObjStorage() for name in names} | ||||
def get_mock_objstorage(cls, **args): | def get_mock_objstorage(cls, **args): | ||||
assert cls == "mocked", cls | assert cls == "mocked", cls | ||||
return objstorages[args["name"]] | return objstorages[args["name"]] | ||||
def decorator(f): | def decorator(f): | ||||
▲ Show 20 Lines • Show All 497 Lines • Show Last 20 Lines |