diff --git a/PKG-INFO b/PKG-INFO index 492d3de..979311c 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,69 +1,69 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.0.150 +Version: 0.0.151 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO index 492d3de..979311c 100644 --- a/swh.indexer.egg-info/PKG-INFO +++ b/swh.indexer.egg-info/PKG-INFO @@ -1,69 +1,69 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.0.150 +Version: 0.0.151 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer -Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest +Project-URL: Funding, https://www.softwareheritage.org/donate Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py index e03975c..90aff7b 100644 --- a/swh/indexer/journal_client.py +++ b/swh/indexer/journal_client.py @@ -1,37 +1,37 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.core.utils import grouper from swh.scheduler.utils import create_task_dict MAX_ORIGINS_PER_TASK = 100 def process_journal_objects(messages, *, scheduler, task_names): """Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`.""" assert set(messages) == {'origin_visit'}, set(messages) process_origin_visits(messages['origin_visit'], scheduler, task_names) def process_origin_visits(visits, scheduler, task_names): task_dicts = [] logging.debug('processing origin visits %r', visits) if task_names.get('origin_metadata'): - visits = [visit for visit in visits if visit[b'status'] == b'full'] + visits = [visit for visit in visits if visit['status'] == 'full'] visit_batches = grouper(visits, MAX_ORIGINS_PER_TASK) for visit_batch in visit_batches: task_dicts.append(create_task_dict( task_names['origin_metadata'], 'oneshot', - [visit[b'origin'][b'url'] for visit in visit_batch], + [visit['origin']['url'] for visit in visit_batch], policy_update='update-dups', )) if task_dicts: scheduler.create_tasks(task_dicts) diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 45fa528..6f93061 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,372 +1,372 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import namedtuple from functools import reduce import re import tempfile from unittest.mock import patch, MagicMock from click.testing import CliRunner from swh.model.hashutil import hash_to_bytes from swh.indexer.cli import cli CLI_CONFIG = ''' scheduler: cls: foo args: {} storage: cls: memory args: {} indexer_storage: cls: memory args: {} ''' def fill_idx_storage(idx_storage, nb_rows): tools = [ { 'tool_name': 'tool %d' % i, 'tool_version': '0.0.1', 'tool_configuration': {}, } for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ { 'id': origin_id, 'origin_url': 'file:///dev/zero', 'from_revision': hash_to_bytes('abcd{:0>4}'.format(origin_id)), 'indexer_configuration_id': tools[origin_id % 2]['id'], 'metadata': {'name': 'origin %d' % origin_id}, 'mappings': ['mapping%d' % (origin_id % 10)] } for origin_id in range(nb_rows) ] revision_metadata = [ { 'id': hash_to_bytes('abcd{:0>4}'.format(origin_id)), 'indexer_configuration_id': tools[origin_id % 2]['id'], 'metadata': {'name': 'origin %d' % origin_id}, 'mappings': ['mapping%d' % (origin_id % 10)] } for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool['id'] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task['arguments']['args'][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {"policy_update": "update-dups"} assert {task['type'] for task in tasks} == {'index-origin-metadata'} assert all(len(task['arguments']['args']) == 1 for task in tasks) for task in tasks: assert task['arguments']['kwargs'] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(origins) def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \ tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler result = runner.invoke(cli, ['-C' + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_mapping_list(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list', ]) expected_output = '\n'.join([ 'codemeta', 'gemspec', 'maven', 'npm', 'pkg-info', '', ]) assert result.exit_code == 0, result.output assert result.output == expected_output def test_mapping_list_terms(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list-terms', ]) assert result.exit_code == 0, result.output assert re.search(r'http://schema.org/url:\n.*npm', result.output) assert re.search(r'http://schema.org/url:\n.*codemeta', result.output) assert re.search( r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta', result.output) def test_mapping_list_terms_exclude(indexer_scheduler): result = invoke(indexer_scheduler, False, [ 'mapping', 'list-terms', '--exclude-mapping', 'codemeta' ]) assert result.exit_code == 0, result.output assert re.search(r'http://schema.org/url:\n.*npm', result.output) assert not re.search(r'http://schema.org/url:\n.*codemeta', result.output) assert not re.search( r'https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta', result.output) @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_empty_db( indexer_scheduler, idx_storage, storage): result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', ]) expected_output = ( 'Nothing to do (no origin metadata matched the criteria).\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_divisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (60 origins).\n' 'Scheduled 9 tasks (90 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_dry_run( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke(indexer_scheduler, False, [ 'schedule', '--dry-run', 'reindex_origin_metadata', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (60 origins).\n' 'Scheduled 9 tasks (90 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_nondivisor( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--batch-size', '20', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (60 origins).\n' 'Scheduled 4 tasks (70 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_mapping( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--mapping', 'mapping1', ]) # Check the output expected_output = ( 'Scheduled 2 tasks (11 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins( tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_two_mappings( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--mapping', 'mapping1', '--mapping', 'mapping2', ]) # Check the output expected_output = ( 'Scheduled 3 tasks (22 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102]) @patch('swh.scheduler.cli.utils.TASK_BATCH_SIZE', 3) @patch('swh.scheduler.cli_utils.TASK_BATCH_SIZE', 3) def test_origin_metadata_reindex_filter_one_tool( indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = invoke(indexer_scheduler, False, [ 'schedule', 'reindex_origin_metadata', '--tool-id', str(tool_ids[0]), ]) # Check the output expected_output = ( 'Scheduled 3 tasks (30 origins).\n' 'Scheduled 6 tasks (55 origins).\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins( tasks, [x*2 for x in range(55)]) def test_journal_client(storage, indexer_scheduler): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" mock_consumer = MagicMock() partition = namedtuple('_partition', ['topic'])( topic='swh.journal.objects.origin_visit') message = namedtuple('_message', ['value'])( value={ - b'status': b'full', - b'origin': { - b'url': 'file:///dev/zero', + 'status': 'full', + 'origin': { + 'url': 'file:///dev/zero', } } ) mock_consumer.poll.return_value = {partition: [message]} with patch('swh.journal.client.KafkaConsumer', return_value=mock_consumer): result = invoke(indexer_scheduler, False, [ 'journal-client', '--max-messages', '1', '--broker', '192.0.2.1', '--prefix', 'swh.journal.objects', '--group-id', 'test-consumer', ]) mock_consumer.subscribe.assert_called_once_with( topics=['swh.journal.objects.origin_visit']) mock_consumer.poll.assert_called_once_with() mock_consumer.commit.assert_called_once_with() # Check the output expected_output = ( 'Processed 1 messages.\n' 'Done.\n' ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 1 _assert_tasks_for_origins( tasks, ['file:///dev/zero']) diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py index fb6cc1b..f0b9acf 100644 --- a/swh/indexer/tests/test_journal_client.py +++ b/swh/indexer/tests/test_journal_client.py @@ -1,132 +1,132 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import Mock, patch from swh.indexer.journal_client import process_journal_objects class JournalClientTest(unittest.TestCase): def testOneOriginVisit(self): mock_scheduler = Mock() messages = { 'origin_visit': [ { - b'status': b'full', - b'origin': { - b'url': 'file:///dev/zero', + 'status': 'full', + 'origin': { + 'url': 'file:///dev/zero', } }, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={'origin_metadata': 'task-name'}) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]['next_run'] self.assertEqual(args, ([ { 'arguments': { 'kwargs': {'policy_update': 'update-dups'}, 'args': (['file:///dev/zero'],) }, 'policy': 'oneshot', 'type': 'task-name' }, ],)) def testOneOriginVisitBatch(self): mock_scheduler = Mock() messages = { 'origin_visit': [ { - b'status': b'full', - b'origin': { - b'url': 'file:///dev/zero', + 'status': 'full', + 'origin': { + 'url': 'file:///dev/zero', } }, { - b'status': b'full', - b'origin': { - b'url': 'file:///tmp/foobar', + 'status': 'full', + 'origin': { + 'url': 'file:///tmp/foobar', } }, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={'origin_metadata': 'task-name'}) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]['next_run'] self.assertEqual(args, ([ { 'arguments': { 'kwargs': {'policy_update': 'update-dups'}, 'args': (['file:///dev/zero', 'file:///tmp/foobar'],) }, 'policy': 'oneshot', 'type': 'task-name' }, ],)) @patch('swh.indexer.journal_client.MAX_ORIGINS_PER_TASK', 2) def testOriginVisitBatches(self): mock_scheduler = Mock() messages = { 'origin_visit': [ { - b'status': b'full', - b'origin': { - b'url': 'file:///dev/zero', + 'status': 'full', + 'origin': { + 'url': 'file:///dev/zero', } }, { - b'status': b'full', - b'origin': { - b'url': 'file:///tmp/foobar', + 'status': 'full', + 'origin': { + 'url': 'file:///tmp/foobar', } }, { - b'status': b'full', - b'origin': { - b'url': 'file:///tmp/spamegg', + 'status': 'full', + 'origin': { + 'url': 'file:///tmp/spamegg', } }, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={'origin_metadata': 'task-name'}) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]['next_run'] del args[0][1]['next_run'] self.assertEqual(args, ([ { 'arguments': { 'kwargs': {'policy_update': 'update-dups'}, 'args': (['file:///dev/zero', 'file:///tmp/foobar'],) }, 'policy': 'oneshot', 'type': 'task-name' }, { 'arguments': { 'kwargs': {'policy_update': 'update-dups'}, 'args': (['file:///tmp/spamegg'],) }, 'policy': 'oneshot', 'type': 'task-name' }, ],)) diff --git a/version.txt b/version.txt index a2cb480..e54989d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.150-0-g6a9f640 \ No newline at end of file +v0.0.151-0-gcfe61a1 \ No newline at end of file