Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/tests/test_cli.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import namedtuple | |||||
from functools import reduce | from functools import reduce | ||||
import re | import re | ||||
import tempfile | import tempfile | ||||
from unittest.mock import patch | from unittest.mock import patch, MagicMock | ||||
from click.testing import CliRunner | from click.testing import CliRunner | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.indexer.cli import cli | from swh.indexer.cli import cli | ||||
▲ Show 20 Lines • Show All 53 Lines • ▼ Show 20 Lines | def _origins_in_task_args(tasks): | ||||
return reduce( | return reduce( | ||||
set.union, | set.union, | ||||
(set(task['arguments']['args'][0]) for task in tasks), | (set(task['arguments']['args'][0]) for task in tasks), | ||||
set() | set() | ||||
) | ) | ||||
def _assert_tasks_for_origins(tasks, origins): | def _assert_tasks_for_origins(tasks, origins): | ||||
expected_kwargs = {"policy_update": "update-dups", "parse_ids": False} | expected_kwargs = {"policy_update": "update-dups"} | ||||
assert {task['type'] for task in tasks} == {'index-origin-metadata'} | assert {task['type'] for task in tasks} == {'index-origin-metadata'} | ||||
assert all(len(task['arguments']['args']) == 1 for task in tasks) | assert all(len(task['arguments']['args']) == 1 for task in tasks) | ||||
assert all(task['arguments']['kwargs'] == expected_kwargs | for task in tasks: | ||||
for task in tasks) | assert task['arguments']['kwargs'] == expected_kwargs, task | ||||
assert _origins_in_task_args(tasks) == set(origins) | assert _origins_in_task_args(tasks) == set(origins) | ||||
def invoke(scheduler, catch_exceptions, args): | def invoke(scheduler, catch_exceptions, args): | ||||
runner = CliRunner() | runner = CliRunner() | ||||
with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \ | with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \ | ||||
tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: | ||||
config_fd.write(CLI_CONFIG) | config_fd.write(CLI_CONFIG) | ||||
▲ Show 20 Lines • Show All 223 Lines • ▼ Show 20 Lines | def test_origin_metadata_reindex_filter_one_tool( | ||||
assert result.output == expected_output | assert result.output == expected_output | ||||
# Check scheduled tasks | # Check scheduled tasks | ||||
tasks = indexer_scheduler.search_tasks() | tasks = indexer_scheduler.search_tasks() | ||||
assert len(tasks) == 6 | assert len(tasks) == 6 | ||||
_assert_tasks_for_origins( | _assert_tasks_for_origins( | ||||
tasks, | tasks, | ||||
[x*2 for x in range(55)]) | [x*2 for x in range(55)]) | ||||
def test_journal_client(storage, indexer_scheduler): | |||||
"""Tests the re-indexing when origin_batch_size*task_batch_size is a | |||||
divisor of nb_origins.""" | |||||
mock_consumer = MagicMock() | |||||
partition = namedtuple('_partition', ['topic'])( | |||||
topic='swh.journal.objects.origin_visit') | |||||
message = namedtuple('_message', ['value'])( | |||||
value={ | |||||
b'status': b'full', | |||||
b'origin': { | |||||
b'url': 'file:///dev/zero', | |||||
} | |||||
} | |||||
) | |||||
mock_consumer.poll.return_value = {partition: [message]} | |||||
with patch('swh.journal.client.KafkaConsumer', | |||||
return_value=mock_consumer): | |||||
result = invoke(indexer_scheduler, False, [ | |||||
'journal-client', | |||||
'--max-messages', '1', | |||||
'--broker', '192.0.2.1', | |||||
'--prefix', 'swh.journal.objects', | |||||
'--group-id', 'test-consumer', | |||||
]) | |||||
mock_consumer.subscribe.assert_called_once_with( | |||||
topics=['swh.journal.objects.origin_visit']) | |||||
mock_consumer.poll.assert_called_once_with() | |||||
mock_consumer.commit.assert_called_once_with() | |||||
# Check the output | |||||
expected_output = ( | |||||
'Processed 1 messages.\n' | |||||
'Done.\n' | |||||
) | |||||
assert result.exit_code == 0, result.output | |||||
assert result.output == expected_output | |||||
# Check scheduled tasks | |||||
tasks = indexer_scheduler.search_tasks() | |||||
assert len(tasks) == 1 | |||||
_assert_tasks_for_origins( | |||||
tasks, | |||||
['file:///dev/zero']) |