diff --git a/PKG-INFO b/PKG-INFO index a2cb919..9ebd27a 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,69 +1,69 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.0.137 +Version: 0.0.138 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/requirements-swh.txt b/requirements-swh.txt index e390a08..be61102 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,6 +1,6 @@ swh.core >= 0.0.53 swh.model >= 0.0.15 swh.objstorage >= 0.0.28 -swh.scheduler >= 0.0.39 +swh.scheduler >= 0.0.47 swh.storage >= 0.0.123 swh.journal >= 0.0.6 diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO index a2cb919..9ebd27a 100644 --- a/swh.indexer.egg-info/PKG-INFO +++ b/swh.indexer.egg-info/PKG-INFO @@ -1,69 +1,69 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.0.137 +Version: 0.0.138 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. 
Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/SOURCES.txt b/swh.indexer.egg-info/SOURCES.txt index d7ce799..3974b12 100644 --- a/swh.indexer.egg-info/SOURCES.txt +++ b/swh.indexer.egg-info/SOURCES.txt @@ -1,83 +1,84 @@ MANIFEST.in Makefile README.md requirements-swh.txt requirements.txt setup.py version.txt sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/indexer_configuration.tool_configuration.schema.json sql/doc/json/revision_metadata.translated_metadata.json sql/json/.gitignore sql/json/Makefile sql/json/indexer_configuration.tool_configuration.schema.json sql/json/revision_metadata.translated_metadata.json sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql swh/__init__.py swh.indexer.egg-info/PKG-INFO swh.indexer.egg-info/SOURCES.txt swh.indexer.egg-info/dependency_links.txt swh.indexer.egg-info/entry_points.txt swh.indexer.egg-info/requires.txt swh.indexer.egg-info/top_level.txt swh/indexer/__init__.py swh/indexer/cli.py swh/indexer/codemeta.py swh/indexer/ctags.py swh/indexer/fossology_license.py swh/indexer/indexer.py swh/indexer/journal_client.py swh/indexer/language.py swh/indexer/metadata.py swh/indexer/metadata_detector.py swh/indexer/metadata_dictionary.py swh/indexer/mimetype.py swh/indexer/origin_head.py swh/indexer/rehash.py swh/indexer/tasks.py swh/indexer/data/codemeta/CITATION swh/indexer/data/codemeta/LICENSE swh/indexer/data/codemeta/codemeta.jsonld swh/indexer/data/codemeta/crosswalk.csv swh/indexer/sql/10-swh-init.sql swh/indexer/sql/20-swh-enums.sql swh/indexer/sql/30-swh-schema.sql swh/indexer/sql/40-swh-func.sql swh/indexer/sql/50-swh-data.sql swh/indexer/sql/60-swh-indexes.sql swh/indexer/storage/__init__.py swh/indexer/storage/converters.py swh/indexer/storage/db.py swh/indexer/storage/in_memory.py swh/indexer/storage/api/__init__.py swh/indexer/storage/api/client.py swh/indexer/storage/api/server.py swh/indexer/storage/api/wsgi.py swh/indexer/tests/__init__.py swh/indexer/tests/conftest.py swh/indexer/tests/tasks.py +swh/indexer/tests/test_cli.py swh/indexer/tests/test_ctags.py swh/indexer/tests/test_fossology_license.py swh/indexer/tests/test_language.py swh/indexer/tests/test_metadata.py swh/indexer/tests/test_mimetype.py swh/indexer/tests/test_origin_head.py swh/indexer/tests/test_origin_metadata.py swh/indexer/tests/utils.py swh/indexer/tests/storage/__init__.py swh/indexer/tests/storage/generate_data_test.py swh/indexer/tests/storage/test_api_client.py swh/indexer/tests/storage/test_converters.py swh/indexer/tests/storage/test_in_memory.py swh/indexer/tests/storage/test_server.py swh/indexer/tests/storage/test_storage.py \ No newline at end of file diff --git a/swh.indexer.egg-info/requires.txt b/swh.indexer.egg-info/requires.txt index 04368bc..9d81572 100644 --- a/swh.indexer.egg-info/requires.txt +++ b/swh.indexer.egg-info/requires.txt @@ -1,18 +1,18 @@ vcversioner pygments click chardet file_magic pyld xmltodict swh.core>=0.0.53 swh.model>=0.0.15 swh.objstorage>=0.0.28 -swh.scheduler>=0.0.39 +swh.scheduler>=0.0.47 swh.storage>=0.0.123 
swh.journal>=0.0.6

[testing]
pytest<4
pytest-postgresql
hypothesis>=3.11.0
diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py
index 78b0737..cb82793 100644
--- a/swh/indexer/cli.py
+++ b/swh/indexer/cli.py
@@ -1,25 +1,186 @@
-# Copyright (C) 2015-2019 The Software Heritage developers
+# Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import click

+from swh.core import config
+from swh.scheduler import get_scheduler
+from swh.scheduler.utils import create_task_dict
+from swh.storage import get_storage
+
+from swh.indexer.metadata_dictionary import MAPPINGS
+from swh.indexer.storage import get_indexer_storage
 from swh.indexer.storage.api.server import load_and_check_config, app


-@click.command()
+CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
+
+TASK_BATCH_SIZE = 1000  # Number of tasks per query to the scheduler
+
+
+@click.group(context_settings=CONTEXT_SETTINGS)
+@click.option('--config-file', '-C', default=None,
+              type=click.Path(exists=True, dir_okay=False,),
+              help="Configuration file.")
+@click.pass_context
+def cli(ctx, config_file):
+    """Software Heritage Indexer CLI interface
+    """
+    ctx.ensure_object(dict)
+
+    conf = config.read(config_file)
+    ctx.obj['config'] = conf
+
+
+def _get_api(getter, config, config_key, url):
+    if url:
+        config[config_key] = {
+            'cls': 'remote',
+            'args': {'url': url}
+        }
+    elif config_key not in config:
+        raise click.ClickException(
+            'Missing configuration for {}'.format(config_key))
+    return getter(**config[config_key])
+
+
+@cli.group('mapping')
+def mapping():
+    pass
+
+
+@mapping.command('list')
+def mapping_list():
+    """Prints the list of known mappings."""
+    mapping_names = [mapping.name for mapping in MAPPINGS.values()]
+    mapping_names.sort()
+    for mapping_name in mapping_names:
+        click.echo(mapping_name)
+
+
+@cli.group('schedule')
+@click.option('--scheduler-url', '-s', default=None,
+              help="URL of the scheduler API")
+@click.option('--indexer-storage-url', '-i', default=None,
+              help="URL of the indexer storage API")
+@click.option('--storage-url', '-g', default=None,
+              help="URL of the (graph) storage API")
+@click.option('--dry-run/--no-dry-run', is_flag=True,
+              default=False,
+              help='Default to list only what would be scheduled.')
+@click.pass_context
+def schedule(ctx, scheduler_url, storage_url, indexer_storage_url,
+             dry_run):
+    """Manipulate indexer tasks via SWH Scheduler's API."""
+    ctx.obj['indexer_storage'] = _get_api(
+        get_indexer_storage,
+        ctx.obj['config'],
+        'indexer_storage',
+        indexer_storage_url
+    )
+    ctx.obj['storage'] = _get_api(
+        get_storage,
+        ctx.obj['config'],
+        'storage',
+        storage_url
+    )
+    ctx.obj['scheduler'] = _get_api(
+        get_scheduler,
+        ctx.obj['config'],
+        'scheduler',
+        scheduler_url
+    )
+    if dry_run:
+        ctx.obj['scheduler'] = None
+
+
+def list_origins_by_producer(idx_storage, mappings, tool_ids):
+    start = 0
+    limit = 10000
+    while True:
+        origins = list(
+            idx_storage.origin_intrinsic_metadata_search_by_producer(
+                start=start, limit=limit, ids_only=True,
+                mappings=mappings or None, tool_ids=tool_ids or None))
+        if not origins:
+            break
+        start = origins[-1]+1
+        yield from origins
+
+
+@schedule.command('reindex_origin_metadata')
+@click.option('--batch-size', '-b', 'origin_batch_size',
+              default=10, show_default=True, type=int,
+              help="Number of origins per task")
+@click.option('--tool-id', '-t', 'tool_ids', type=int, multiple=True,
+              help="Restrict search of old metadata to this/these tool ids.")
+@click.option('--mapping', '-m', 'mappings', multiple=True,
+              help="Mapping(s) that should be re-scheduled (eg. 'npm', "
+                   "'gemspec', 'maven')")
+@click.option('--task-type',
+              default='indexer_origin_metadata', show_default=True,
+              help="Name of the task type to schedule.")
+@click.pass_context
+def schedule_origin_metadata_reindex(
+        ctx, origin_batch_size, mappings, tool_ids, task_type):
+    """Schedules indexing tasks for origins that were already indexed."""
+    idx_storage = ctx.obj['indexer_storage']
+    scheduler = ctx.obj['scheduler']
+
+    origins = list_origins_by_producer(idx_storage, mappings, tool_ids)
+    kwargs = {"policy_update": "update-dups", "parse_ids": False}
+    nb_origins = 0
+    nb_tasks = 0
+
+    while True:
+        task_batch = []
+        for _ in range(TASK_BATCH_SIZE):
+            # Group origins
+            origin_batch = []
+            for (_, origin) in zip(range(origin_batch_size), origins):
+                origin_batch.append(origin)
+            nb_origins += len(origin_batch)
+            if not origin_batch:
+                break
+
+            # Create a task for these origins
+            args = [origin_batch]
+            task_dict = create_task_dict(task_type, 'oneshot', *args, **kwargs)
+            task_batch.append(task_dict)
+
+        # Schedule a batch of tasks
+        if not task_batch:
+            break
+        nb_tasks += len(task_batch)
+        if scheduler:
+            scheduler.create_tasks(task_batch)
+        click.echo('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
+
+    # Print final status.
+    if nb_tasks:
+        click.echo('Done.')
+    else:
+        click.echo('Nothing to do (no origin metadata matched the criteria).')
+
+
+@cli.command('api-server')
 @click.argument('config-path', required=1)
 @click.option('--host', default='0.0.0.0', help="Host to run the server")
 @click.option('--port', default=5007, type=click.INT,
               help="Binding port of the server")
 @click.option('--debug/--nodebug', default=True,
               help="Indicates if the server should run in debug mode")
-def main(config_path, host, port, debug):
+def api_server(config_path, host, port, debug):
     api_cfg = load_and_check_config(config_path, type='any')
     app.config.update(api_cfg)
     app.run(host, port=int(port), debug=bool(debug))


+def main():
+    return cli(auto_envvar_prefix='SWH_INDEXER')
+
+
 if __name__ == '__main__':
     main()
diff --git a/swh/indexer/metadata.py b/swh/indexer/metadata.py
index 672afa0..83f5a93 100644
--- a/swh/indexer/metadata.py
+++ b/swh/indexer/metadata.py
@@ -1,310 +1,310 @@
# Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from copy import deepcopy from swh.indexer.indexer import ContentIndexer, RevisionIndexer, OriginIndexer from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.metadata_dictionary import MAPPINGS from swh.indexer.metadata_detector import detect_metadata from swh.indexer.metadata_detector import extract_minimal_metadata_dict from swh.indexer.storage import INDEXER_CFG_KEY from swh.model import hashutil class ContentMetadataIndexer(ContentIndexer): """Content-level indexer This indexer is in charge of: - filtering out content already indexed in content_metadata - reading content from objstorage with the content's id sha1 - computing translated_metadata by given context - using the metadata_dictionary as the 'swh-metadata-translator' tool - store result in content_metadata table """ def filter(self, ids): """Filter out known sha1s and
return only missing ones. """ yield from self.idx_storage.content_metadata_missing(( { 'id': sha1, 'indexer_configuration_id': self.tool['id'], } for sha1 in ids )) def index(self, id, data, log_suffix='unknown revision'): """Index sha1s' content and store result. Args: id (bytes): content's identifier data (bytes): raw content in bytes Returns: dict: dictionary representing a content_metadata. If the translation wasn't successful the translated_metadata keys will be returned as None """ result = { 'id': id, 'indexer_configuration_id': self.tool['id'], 'translated_metadata': None } try: mapping_name = self.tool['tool_configuration']['context'] log_suffix += ', content_id=%s' % hashutil.hash_to_hex(id) result['translated_metadata'] = \ MAPPINGS[mapping_name](log_suffix).translate(data) except Exception: self.log.exception( "Problem during metadata translation " "for content %s" % hashutil.hash_to_hex(id)) if result['translated_metadata'] is None: return None return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. Args: results ([dict]): list of content_metadata, dict with the following keys: - id (bytes): content's identifier (sha1) - translated_metadata (jsonb): detected metadata policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ self.idx_storage.content_metadata_add( results, conflict_update=(policy_update == 'update-dups')) class RevisionMetadataIndexer(RevisionIndexer): """Revision-level indexer This indexer is in charge of: - filtering revisions already indexed in revision_metadata table with defined computation tool - retrieve all entry_files in root directory - use metadata_detector for file_names containing metadata - compute metadata translation if necessary and possible (depends on tool) - send sha1s to content indexing if possible - store the results for revision """ ADDITIONAL_CONFIG = { 'tools': ('dict', { 'name': 'swh-metadata-detector', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': list(MAPPINGS), }, }), } def filter(self, sha1_gits): """Filter out known sha1s and return only missing ones. """ yield from self.idx_storage.revision_metadata_missing(( { 'id': sha1_git, 'indexer_configuration_id': self.tool['id'], } for sha1_git in sha1_gits )) def index(self, rev): """Index rev by processing it and organizing result. use metadata_detector to iterate on filenames - if one filename detected -> sends file to content indexer - if multiple file detected -> translation needed at revision level Args: rev (dict): revision artifact from storage Returns: dict: dictionary representing a revision_metadata, with keys: - id (str): rev's identifier (sha1_git) - indexer_configuration_id (bytes): tool used - translated_metadata: dict of retrieved metadata """ result = { 'id': rev['id'], 'indexer_configuration_id': self.tool['id'], 'mappings': None, 'translated_metadata': None } try: root_dir = rev['directory'] dir_ls = self.storage.directory_ls(root_dir, recursive=False) files = [entry for entry in dir_ls if entry['type'] == 'file'] detected_files = detect_metadata(files) (mappings, metadata) = self.translate_revision_metadata( detected_files, log_suffix='revision=%s' % hashutil.hash_to_hex(rev['id'])) result['mappings'] = mappings result['translated_metadata'] = metadata except Exception as e: self.log.exception( 'Problem when indexing rev: %r', e) return result def persist_index_computations(self, results, policy_update): """Persist the results in storage. 
Args: results ([dict]): list of content_mimetype, dict with the following keys: - id (bytes): content's identifier (sha1) - mimetype (bytes): mimetype in bytes - encoding (bytes): encoding in bytes policy_update ([str]): either 'update-dups' or 'ignore-dups' to respectively update duplicates or ignore them """ # TODO: add functions in storage to keep data in revision_metadata self.idx_storage.revision_metadata_add( results, conflict_update=(policy_update == 'update-dups')) def translate_revision_metadata(self, detected_files, log_suffix): """ Determine plan of action to translate metadata when containing one or multiple detected files: Args: detected_files (dict): dictionary mapping context names (e.g., "npm", "authors") to list of sha1 Returns: (List[str], dict): list of mappings used and dict with translated metadata according to the CodeMeta vocabulary """ used_mappings = [MAPPINGS[context].name for context in detected_files] translated_metadata = [] tool = { 'name': 'swh-metadata-translator', 'version': '0.0.2', 'configuration': { 'type': 'local', 'context': None }, } # TODO: iterate on each context, on each file # -> get raw_contents # -> translate each content config = { k: self.config[k] for k in [INDEXER_CFG_KEY, 'objstorage', 'storage'] } config['tools'] = [tool] for context in detected_files.keys(): cfg = deepcopy(config) cfg['tools'][0]['configuration']['context'] = context c_metadata_indexer = ContentMetadataIndexer(config=cfg) # sha1s that are in content_metadata table sha1s_in_storage = [] metadata_generator = self.idx_storage.content_metadata_get( detected_files[context]) for c in metadata_generator: # extracting translated_metadata sha1 = c['id'] sha1s_in_storage.append(sha1) local_metadata = c['translated_metadata'] # local metadata is aggregated if local_metadata: translated_metadata.append(local_metadata) sha1s_filtered = [item for item in detected_files[context] if item not in sha1s_in_storage] if sha1s_filtered: # content indexing try: c_metadata_indexer.run(sha1s_filtered, policy_update='ignore-dups', log_suffix=log_suffix) # on the fly possibility: for result in c_metadata_indexer.results: local_metadata = result['translated_metadata'] translated_metadata.append(local_metadata) except Exception: self.log.exception( "Exception while indexing metadata on contents") # transform translated_metadata into min set with swh-metadata-detector min_metadata = extract_minimal_metadata_dict(translated_metadata) return (used_mappings, min_metadata) class OriginMetadataIndexer(OriginIndexer): ADDITIONAL_CONFIG = RevisionMetadataIndexer.ADDITIONAL_CONFIG USE_TOOLS = False - def __init__(self, config, **kwargs): + def __init__(self, config=None, **kwargs): super().__init__(config=config, **kwargs) self.origin_head_indexer = OriginHeadIndexer(config=config) self.revision_metadata_indexer = RevisionMetadataIndexer(config=config) def index_list(self, origins): head_rev_ids = [] origins_with_head = [] for origin in origins: head_result = self.origin_head_indexer.index(origin) if head_result: origins_with_head.append(origin) head_rev_ids.append(head_result['revision_id']) head_revs = list(self.storage.revision_get(head_rev_ids)) assert len(head_revs) == len(head_rev_ids) results = [] for (origin, rev) in zip(origins_with_head, head_revs): if not rev: self.log.warning('Missing head revision of origin %r', origin) continue rev_metadata = self.revision_metadata_indexer.index(rev) orig_metadata = { 'from_revision': rev_metadata['id'], 'origin_id': origin['id'], 'metadata': 
rev_metadata['translated_metadata'], 'mappings': rev_metadata['mappings'], 'indexer_configuration_id': rev_metadata['indexer_configuration_id'], } results.append((orig_metadata, rev_metadata)) return results def persist_index_computations(self, results, policy_update): conflict_update = (policy_update == 'update-dups') # Deduplicate revisions rev_metadata = [] orig_metadata = [] for (orig_item, rev_item) in results: if rev_item not in rev_metadata: rev_metadata.append(rev_item) if orig_item not in orig_metadata: orig_metadata.append(orig_item) self.idx_storage.revision_metadata_add( rev_metadata, conflict_update=conflict_update) self.idx_storage.origin_intrinsic_metadata_add( orig_metadata, conflict_update=conflict_update) diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py new file mode 100644 index 0000000..d14b186 --- /dev/null +++ b/swh/indexer/tests/test_cli.py @@ -0,0 +1,289 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from functools import reduce +import tempfile +from unittest.mock import patch + +from click.testing import CliRunner + +from swh.model.hashutil import hash_to_bytes + +from swh.indexer.cli import cli + + +CLI_CONFIG = ''' +scheduler: + cls: foo + args: {} +storage: + cls: memory + args: {} +indexer_storage: + cls: memory + args: {} +''' + + +def fill_idx_storage(idx_storage, nb_rows): + tools = [ + { + 'tool_name': 'tool %d' % i, + 'tool_version': '0.0.1', + 'tool_configuration': {}, + } + for i in range(2) + ] + tools = idx_storage.indexer_configuration_add(tools) + + origin_metadata = [ + { + 'origin_id': origin_id, + 'from_revision': hash_to_bytes('abcd{:0>4}'.format(origin_id)), + 'indexer_configuration_id': tools[origin_id % 2]['id'], + 'metadata': {'name': 'origin %d' % origin_id}, + 'mappings': ['mapping%d' % (origin_id % 10)] + } + for origin_id in range(nb_rows) + ] + revision_metadata = [ + { + 'id': hash_to_bytes('abcd{:0>4}'.format(origin_id)), + 'indexer_configuration_id': tools[origin_id % 2]['id'], + 'metadata': {'name': 'origin %d' % origin_id}, + 'mappings': ['mapping%d' % (origin_id % 10)] + } + for origin_id in range(nb_rows) + ] + + idx_storage.revision_metadata_add(revision_metadata) + idx_storage.origin_intrinsic_metadata_add(origin_metadata) + + return [tool['id'] for tool in tools] + + +def _origins_in_task_args(tasks): + """Returns the set of origins contained in the arguments of the + provided tasks (assumed to be of type indexer_origin_metadata).""" + return reduce( + set.union, + (set(task['arguments']['args'][0]) for task in tasks), + set() + ) + + +def _assert_tasks_for_origins(tasks, origins): + expected_kwargs = {"policy_update": "update-dups", "parse_ids": False} + assert {task['type'] for task in tasks} == {'indexer_origin_metadata'} + assert all(len(task['arguments']['args']) == 1 for task in tasks) + assert all(task['arguments']['kwargs'] == expected_kwargs + for task in tasks) + assert _origins_in_task_args(tasks) == set(origins) + + +def invoke(scheduler, catch_exceptions, args): + runner = CliRunner() + with patch('swh.indexer.cli.get_scheduler') as get_scheduler_mock, \ + tempfile.NamedTemporaryFile('a', suffix='.yml') as config_fd: + config_fd.write(CLI_CONFIG) + config_fd.seek(0) + get_scheduler_mock.return_value = scheduler + result = runner.invoke(cli, ['-C' + config_fd.name] + args) + if not 
catch_exceptions and result.exception: + print(result.output) + raise result.exception + return result + + +def test_mapping_list(indexer_scheduler): + result = invoke(indexer_scheduler, False, [ + 'mapping', 'list', + ]) + expected_output = '\n'.join([ + 'codemeta', 'gemspec', 'maven', 'npm', 'pkg-info', '', + ]) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_empty_db( + indexer_scheduler, idx_storage, storage): + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + ]) + expected_output = ( + 'Nothing to do (no origin metadata matched the criteria).\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 0 + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_divisor( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when origin_batch_size*task_batch_size is a + divisor of nb_origins.""" + fill_idx_storage(idx_storage, 90) + + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + ]) + + # Check the output + expected_output = ( + 'Scheduled 3 tasks (30 origins).\n' + 'Scheduled 6 tasks (60 origins).\n' + 'Scheduled 9 tasks (90 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 9 + _assert_tasks_for_origins(tasks, range(90)) + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_dry_run( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when origin_batch_size*task_batch_size is a + divisor of nb_origins.""" + fill_idx_storage(idx_storage, 90) + + result = invoke(indexer_scheduler, False, [ + 'schedule', '--dry-run', 'reindex_origin_metadata', + ]) + + # Check the output + expected_output = ( + 'Scheduled 3 tasks (30 origins).\n' + 'Scheduled 6 tasks (60 origins).\n' + 'Scheduled 9 tasks (90 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 0 + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_nondivisor( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when neither origin_batch_size or + task_batch_size is a divisor of nb_origins.""" + fill_idx_storage(idx_storage, 70) + + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + '--batch-size', '20', + ]) + + # Check the output + expected_output = ( + 'Scheduled 3 tasks (60 origins).\n' + 'Scheduled 4 tasks (70 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 4 + _assert_tasks_for_origins(tasks, range(70)) + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_filter_one_mapping( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when origin_batch_size*task_batch_size is a + divisor of nb_origins.""" + fill_idx_storage(idx_storage, 110) + + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + '--mapping', 
'mapping1', + ]) + + # Check the output + expected_output = ( + 'Scheduled 2 tasks (11 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 2 + _assert_tasks_for_origins( + tasks, + [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_filter_two_mappings( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when origin_batch_size*task_batch_size is a + divisor of nb_origins.""" + fill_idx_storage(idx_storage, 110) + + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + '--mapping', 'mapping1', '--mapping', 'mapping2', + ]) + + # Check the output + expected_output = ( + 'Scheduled 3 tasks (22 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 3 + _assert_tasks_for_origins( + tasks, + [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, + 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102]) + + +@patch('swh.indexer.cli.TASK_BATCH_SIZE', 3) +def test_origin_metadata_reindex_filter_one_tool( + indexer_scheduler, idx_storage, storage): + """Tests the re-indexing when origin_batch_size*task_batch_size is a + divisor of nb_origins.""" + tool_ids = fill_idx_storage(idx_storage, 110) + + result = invoke(indexer_scheduler, False, [ + 'schedule', 'reindex_origin_metadata', + '--tool-id', str(tool_ids[0]), + ]) + + # Check the output + expected_output = ( + 'Scheduled 3 tasks (30 origins).\n' + 'Scheduled 6 tasks (55 origins).\n' + 'Done.\n' + ) + assert result.exit_code == 0, result.output + assert result.output == expected_output + + # Check scheduled tasks + tasks = indexer_scheduler.search_tasks() + assert len(tasks) == 6 + _assert_tasks_for_origins( + tasks, + [x*2 for x in range(55)]) diff --git a/version.txt b/version.txt index b302a09..1768319 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -v0.0.137-0-g8de1d6a \ No newline at end of file +v0.0.138-0-gd3684b8 \ No newline at end of file
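
Reviewer notes on the new scheduling CLI and its tests follow; the snippets are illustrative plain-Python sketches, not part of the patch.

Note on the scheduling loop added in swh/indexer/cli.py: each pass through the outer loop creates at most TASK_BATCH_SIZE tasks of at most --batch-size origins and then reports cumulative totals, which is exactly what the expected outputs in test_cli.py encode. A minimal sketch of that arithmetic, with no swh dependencies (progress_lines is an illustrative name, not part of the patch):

def progress_lines(nb_origins_total, origin_batch_size, task_batch_size):
    # Mirror the cumulative counters of schedule_origin_metadata_reindex:
    # each pass schedules at most task_batch_size tasks of at most
    # origin_batch_size origins, then prints the running totals.
    lines = []
    nb_tasks = nb_origins = 0
    remaining = nb_origins_total
    while remaining:
        tasks_this_round = 0
        for _ in range(task_batch_size):
            batch = min(origin_batch_size, remaining)
            if not batch:
                break
            remaining -= batch
            nb_origins += batch
            tasks_this_round += 1
        nb_tasks += tasks_this_round
        lines.append('Scheduled %d tasks (%d origins).' % (nb_tasks, nb_origins))
    return lines


# The 'Scheduled ...' progress lines expected by
# test_origin_metadata_reindex_nondivisor: 70 origins, --batch-size 20,
# TASK_BATCH_SIZE patched to 3.
assert progress_lines(70, 20, 3) == ['Scheduled 3 tasks (60 origins).',
                                     'Scheduled 4 tasks (70 origins).']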
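
Note on list_origins_by_producer: it pages through origin_intrinsic_metadata_search_by_producer with ids_only=True, resuming each page after the last id returned. A self-contained sketch of the same pattern against a fake search function (list_ids_paginated and fake_search are illustrative names):

def list_ids_paginated(search, page_size=10000):
    # Same pagination pattern as list_origins_by_producer in swh/indexer/cli.py.
    start = 0
    while True:
        page = list(search(start=start, limit=page_size))
        if not page:
            break
        start = page[-1] + 1
        yield from page


def fake_search(start, limit):
    # Stand-in for origin_intrinsic_metadata_search_by_producer(ids_only=True)
    # over origin ids 0..24.
    return [i for i in range(25) if i >= start][:limit]


assert list(list_ids_paginated(fake_search, page_size=10)) == list(range(25))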
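
Note on the task payloads: the assertions in test_cli.py pin down the parts of each scheduled task that matter here — one positional argument holding a batch of origin ids, plus fixed kwargs. The example below shows only those fields; whatever else create_task_dict fills in (policy, run dates, ...) is omitted, so treat this as an inferred shape rather than the full swh.scheduler task format:

example_task = {
    'type': 'indexer_origin_metadata',
    'arguments': {
        'args': [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]],  # one batch of origin ids
        'kwargs': {'policy_update': 'update-dups', 'parse_ids': False},
    },
}

# These are the same checks _assert_tasks_for_origins performs:
assert example_task['type'] == 'indexer_origin_metadata'
assert len(example_task['arguments']['args']) == 1
assert example_task['arguments']['kwargs'] == {'policy_update': 'update-dups',
                                               'parse_ids': False}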
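
Note on the expected origin lists in the filter tests: fill_idx_storage tags each seeded origin with mapping 'mapping%d' % (origin_id % 10) and tool tools[origin_id % 2], so with 110 seeded rows the filters select exactly the origins asserted above. A quick check in plain Python:

origins = range(110)

# --mapping mapping1 -> 11 origins (hence 'Scheduled 2 tasks (11 origins).')
assert [o for o in origins if o % 10 == 1] == \
    [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]

# --mapping mapping1 --mapping mapping2 -> 22 origins, 3 tasks of at most 10
assert len([o for o in origins if o % 10 in (1, 2)]) == 22

# --tool-id tool_ids[0] -> the even origin ids, i.e. [x*2 for x in range(55)]
assert [o for o in origins if o % 2 == 0] == [x * 2 for x in range(55)]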