diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py
index e03975c..90aff7b 100644
--- a/swh/indexer/journal_client.py
+++ b/swh/indexer/journal_client.py
@@ -1,37 +1,37 @@
 # Copyright (C) 2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 
 from swh.core.utils import grouper
 from swh.scheduler.utils import create_task_dict
 
 MAX_ORIGINS_PER_TASK = 100
 
 
 def process_journal_objects(messages, *, scheduler, task_names):
     """Worker function for `JournalClient.process(worker_fn)`, after
     currification of `scheduler` and `task_names`."""
     assert set(messages) == {'origin_visit'}, set(messages)
     process_origin_visits(messages['origin_visit'], scheduler, task_names)
 
 
 def process_origin_visits(visits, scheduler, task_names):
     task_dicts = []
     logging.debug('processing origin visits %r', visits)
     if task_names.get('origin_metadata'):
-        visits = [visit for visit in visits if visit[b'status'] == b'full']
+        visits = [visit for visit in visits if visit['status'] == 'full']
         visit_batches = grouper(visits, MAX_ORIGINS_PER_TASK)
         for visit_batch in visit_batches:
             task_dicts.append(create_task_dict(
                 task_names['origin_metadata'],
                 'oneshot',
-                [visit[b'origin'][b'url'] for visit in visit_batch],
+                [visit['origin']['url'] for visit in visit_batch],
                 policy_update='update-dups',
             ))
     if task_dicts:
         scheduler.create_tasks(task_dicts)
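
For context on the docstring above: `process_journal_objects` is meant to be handed to the journal client with `scheduler` and `task_names` bound ahead of time. A minimal sketch of that wiring, not part of this patch, assuming an already-configured `JournalClient` instance named `client` and a scheduler backend named `scheduler` (the task name shown is illustrative):

    import functools

    from swh.indexer.journal_client import process_journal_objects

    # Bind the keyword-only arguments up front ("currification"), so the
    # journal client can call worker_fn(messages) on each message batch.
    worker_fn = functools.partial(
        process_journal_objects,
        scheduler=scheduler,
        task_names={'origin_metadata': 'index-origin-metadata'},
    )
    client.process(worker_fn)
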
diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py
index 45fa528..6f93061 100644
--- a/swh/indexer/tests/test_cli.py
+++ b/swh/indexer/tests/test_cli.py
@@ -1,372 +1,372 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import namedtuple
 from functools import reduce
 import re
 import tempfile
 from unittest.mock import patch, MagicMock
 
 from click.testing import CliRunner
 
 from swh.model.hashutil import hash_to_bytes
 
 from swh.indexer.cli import cli
 
 
 CLI_CONFIG = '''
 scheduler:
     cls: foo
     args: {}
 storage:
     cls: memory
     args: {}
 indexer_storage:
     cls: memory
     args: {}
 '''
 
 
 def fill_idx_storage(idx_storage, nb_rows):
     tools = [
         {
             'tool_name': 'tool %d' % i,
             'tool_version': '0.0.1',
             'tool_configuration': {},
         }
         for i in range(2)
     ]
     tools = idx_storage.indexer_configuration_add(tools)
 
     origin_metadata = [
         {
             'id': origin_id,
             'origin_url': 'file:///dev/zero',
             'from_revision': hash_to_bytes('abcd{:0>4}'.format(origin_id)),
             'indexer_configuration_id': tools[origin_id % 2]['id'],
             'metadata': {'name': 'origin %d' % origin_id},
             'mappings': ['mapping%d' % (origin_id % 10)]
         }
         for origin_id in range(nb_rows)
     ]
     revision_metadata = [
         {
             'id': hash_to_bytes('abcd{:0>4}'.format(origin_id)),
             'indexer_configuration_id': tools[origin_id % 2]['id'],
             'metadata': {'name': 'origin %d' % origin_id},
             'mappings': ['mapping%d' % (origin_id % 10)]
         }
         for origin_id in range(nb_rows)
     ]
 
     idx_storage.revision_intrinsic_metadata_add(revision_metadata)
     idx_storage.origin_intrinsic_metadata_add(origin_metadata)
 
     return [tool['id'] for tool in tools]
 
 
 def _origins_in_task_args(tasks):
     """Returns the set of origins contained in the arguments of the
     provided tasks (assumed to be of type index-origin-metadata)."""
     return reduce(
         set.union,
         (set(task['arguments']['args'][0]) for task in tasks),
         set()
     )
 
 
 def _assert_tasks_for_origins(tasks, origins):
     expected_kwargs = {"policy_update": "update-dups"}
     assert {task['type'] for task in tasks} == {'index-origin-metadata'}
     assert all(len(task['arguments']['args']) == 1 for task in tasks)
     for task in tasks:
         assert task['arguments']['kwargs'] == expected_kwargs, task
     assert _origins_in_task_args(tasks) == set(origins)
 
 
 def invoke(scheduler, catch_exceptions, args):
     runner = CliRunner()
     with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \
             tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd:
         config_fd.write(CLI_CONFIG)
         config_fd.seek(0)
         get_scheduler_mock.return_value = scheduler
         result = runner.invoke(cli, ['-C' + config_fd.name] + args)
     if not catch_exceptions and result.exception:
         print(result.output)
         raise result.exception
     return result
 
 
 def test_mapping_list(indexer_scheduler):
     result = invoke(indexer_scheduler, False, [
         'mapping', 'list',
     ])
     expected_output = '\n'.join([
         'codemeta',
         'gemspec',
         'maven',
         'npm',
         'pkg-info',
         '',
     ])
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
 
 def test_mapping_list_terms(indexer_scheduler):
     result = invoke(indexer_scheduler, False, [
         'mapping', 'list-terms',
     ])
     assert result.exit_code == 0, result.output
     assert re.search(r'http://schema.org/url:\n.*npm', result.output)
     assert re.search(r'http://schema.org/url:\n.*codemeta', result.output)
     assert re.search(
         r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta',
         result.output)
 
 
 def test_mapping_list_terms_exclude(indexer_scheduler):
     result = invoke(indexer_scheduler, False, [
         'mapping', 'list-terms', '--exclude-mapping', 'codemeta'
     ])
     assert result.exit_code == 0, result.output
     assert re.search(r'http://schema.org/url:\n.*npm', result.output)
     assert not re.search(r'http://schema.org/url:\n.*codemeta', result.output)
     assert not re.search(
         r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta',
         result.output)
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_empty_db(
         indexer_scheduler, idx_storage, storage):
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
     ])
     expected_output = (
         'Nothing to do (no origin metadata matched the criteria).\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 0
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_divisor(
         indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing when origin_batch_size*task_batch_size is a
     divisor of nb_origins."""
     fill_idx_storage(idx_storage, 90)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 3 tasks (30 origins).\n'
         'Scheduled 6 tasks (60 origins).\n'
         'Scheduled 9 tasks (90 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 9
     _assert_tasks_for_origins(tasks, range(90))
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_dry_run(
         indexer_scheduler, idx_storage, storage):
     """Tests that --dry-run reports the same progress as a real run,
     but does not actually schedule any task."""
     fill_idx_storage(idx_storage, 90)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', '--dry-run', 'reindex_origin_metadata',
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 3 tasks (30 origins).\n'
         'Scheduled 6 tasks (60 origins).\n'
         'Scheduled 9 tasks (90 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 0
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_nondivisor(
         indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing when neither origin_batch_size nor
     task_batch_size is a divisor of nb_origins."""
     fill_idx_storage(idx_storage, 70)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
         '--batch-size', '20',
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 3 tasks (60 origins).\n'
         'Scheduled 4 tasks (70 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 4
     _assert_tasks_for_origins(tasks, range(70))
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_filter_one_mapping(
         indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing filtered on a single mapping."""
     fill_idx_storage(idx_storage, 110)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
         '--mapping', 'mapping1',
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 2 tasks (11 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 2
     _assert_tasks_for_origins(
         tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101])
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_filter_two_mappings(
         indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing filtered on two mappings."""
     fill_idx_storage(idx_storage, 110)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
         '--mapping', 'mapping1', '--mapping', 'mapping2',
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 3 tasks (22 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 3
     _assert_tasks_for_origins(
         tasks,
         [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101,
          2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102])
 
 
 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3)
 @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3)
 def test_origin_metadata_reindex_filter_one_tool(
         indexer_scheduler, idx_storage, storage):
     """Tests the re-indexing filtered on a single tool."""
     tool_ids = fill_idx_storage(idx_storage, 110)
 
     result = invoke(indexer_scheduler, False, [
         'schedule', 'reindex_origin_metadata',
         '--tool-id', str(tool_ids[0]),
     ])
 
     # Check the output
     expected_output = (
         'Scheduled 3 tasks (30 origins).\n'
         'Scheduled 6 tasks (55 origins).\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 6
     _assert_tasks_for_origins(
         tasks, [x*2 for x in range(55)])
 
 
 def test_journal_client(storage, indexer_scheduler):
     """Tests the journal-client command end-to-end: consuming one
     origin-visit message should schedule one indexing task."""
     mock_consumer = MagicMock()
 
     partition = namedtuple('_partition', ['topic'])(
         topic='swh.journal.objects.origin_visit')
     message = namedtuple('_message', ['value'])(
         value={
-            b'status': b'full',
-            b'origin': {
-                b'url': 'file:///dev/zero',
+            'status': 'full',
+            'origin': {
+                'url': 'file:///dev/zero',
             }
         }
     )
     mock_consumer.poll.return_value = {partition: [message]}
 
     with patch('swh.journal.client.KafkaConsumer',
                return_value=mock_consumer):
         result = invoke(indexer_scheduler, False, [
             'journal-client',
             '--max-messages', '1',
             '--broker', '192.0.2.1',
             '--prefix', 'swh.journal.objects',
             '--group-id', 'test-consumer',
         ])
 
     mock_consumer.subscribe.assert_called_once_with(
         topics=['swh.journal.objects.origin_visit'])
     mock_consumer.poll.assert_called_once_with()
     mock_consumer.commit.assert_called_once_with()
 
     # Check the output
     expected_output = (
         'Processed 1 messages.\n'
         'Done.\n'
     )
     assert result.exit_code == 0, result.output
     assert result.output == expected_output
 
     # Check scheduled tasks
     tasks = indexer_scheduler.search_tasks()
     assert len(tasks) == 1
     _assert_tasks_for_origins(
         tasks, ['file:///dev/zero'])
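
The progress lines asserted throughout the tests above come from two nested batch sizes: the `schedule` command packs origins into tasks (10 per task unless `--batch-size` overrides it) and flushes accumulated tasks to the scheduler every `TASK_BATCH_SIZE` tasks (patched to 3 here). A rough sketch of that arithmetic, assuming those two knobs; this is illustrative, not the CLI's actual code:

    def progress_lines(nb_origins, origin_batch_size=10, task_batch_size=3):
        """Print the 'Scheduled N tasks (M origins).' progression."""
        tasks = origins = 0
        remaining = nb_origins
        while remaining > 0:
            # Create up to task_batch_size tasks, then flush and report.
            for _ in range(task_batch_size):
                if remaining <= 0:
                    break
                batch = min(origin_batch_size, remaining)
                remaining -= batch
                tasks += 1
                origins += batch
            print('Scheduled %d tasks (%d origins).' % (tasks, origins))
        print('Done.')

    progress_lines(90)      # 3 (30), 6 (60), 9 (90) -- the divisor test
    progress_lines(70, 20)  # 3 (60), 4 (70)         -- the nondivisor test
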
diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py
index fb6cc1b..f0b9acf 100644
--- a/swh/indexer/tests/test_journal_client.py
+++ b/swh/indexer/tests/test_journal_client.py
@@ -1,132 +1,132 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 from unittest.mock import Mock, patch
 
 from swh.indexer.journal_client import process_journal_objects
 
 
 class JournalClientTest(unittest.TestCase):
     def testOneOriginVisit(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///dev/zero',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///dev/zero',
                     }
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))
 
     def testOneOriginVisitBatch(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///dev/zero',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///dev/zero',
                     }
                 },
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///tmp/foobar',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///tmp/foobar',
                     }
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero', 'file:///tmp/foobar'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))
 
     @patch('swh.indexer.journal_client.MAX_ORIGINS_PER_TASK', 2)
     def testOriginVisitBatches(self):
         mock_scheduler = Mock()
         messages = {
             'origin_visit': [
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///dev/zero',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///dev/zero',
                     }
                 },
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///tmp/foobar',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///tmp/foobar',
                     }
                 },
                 {
-                    b'status': b'full',
-                    b'origin': {
-                        b'url': 'file:///tmp/spamegg',
+                    'status': 'full',
+                    'origin': {
+                        'url': 'file:///tmp/spamegg',
                     }
                 },
             ]
         }
         process_journal_objects(
             messages, scheduler=mock_scheduler,
             task_names={'origin_metadata': 'task-name'})
         self.assertTrue(mock_scheduler.create_tasks.called)
         call_args = mock_scheduler.create_tasks.call_args
         (args, kwargs) = call_args
         self.assertEqual(kwargs, {})
         del args[0][0]['next_run']
         del args[0][1]['next_run']
         self.assertEqual(args, ([
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///dev/zero', 'file:///tmp/foobar'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
             {
                 'arguments': {
                     'kwargs': {'policy_update': 'update-dups'},
                     'args': (['file:///tmp/spamegg'],)
                 },
                 'policy': 'oneshot',
                 'type': 'task-name'
             },
         ],))
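
As a usage note, the batching behaviour that `testOriginVisitBatches` pins down can be reproduced standalone. This sketch assumes nothing beyond the module under test; the 'task-name' string and the file:// URLs are arbitrary:

    from unittest.mock import Mock, patch

    from swh.indexer.journal_client import process_journal_objects

    scheduler = Mock()
    visits = [{'status': 'full', 'origin': {'url': 'file:///tmp/%d' % i}}
              for i in range(3)]
    # With at most 2 origins per task, 3 full visits yield 2 task dicts,
    # handed to the scheduler in a single create_tasks() call.
    with patch('swh.indexer.journal_client.MAX_ORIGINS_PER_TASK', 2):
        process_journal_objects({'origin_visit': visits},
                                scheduler=scheduler,
                                task_names={'origin_metadata': 'task-name'})
    task_dicts = scheduler.create_tasks.call_args[0][0]
    assert len(task_dicts) == 2
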