diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index e2c904d..986f2f3 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,284 +1,284 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import functools import json import click from swh.core import config from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler from swh.scheduler.cli_utils import schedule_origin_batches from swh.storage import get_storage from swh.indexer import metadata_dictionary from swh.indexer.journal_client import process_journal_objects from swh.indexer.storage import get_indexer_storage from swh.indexer.storage.api.server import load_and_check_config, app @click.group(name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def cli(ctx, config_file): """Software Heritage Indexer tools. The Indexer is used to mine the content of the archive and extract derived information from archive source code artifacts. """ ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = {"cls": "remote", "args": {"url": url}} elif config_key not in config: raise click.ClickException("Missing configuration for {}".format(config_key)) return getter(**config[config_key]) @cli.group("mapping") def mapping(): """Manage Software Heritage Indexer mappings.""" pass @mapping.command("list") def mapping_list(): """Prints the list of known mappings.""" mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command("list-terms") @click.option( "--exclude-mapping", multiple=True, help="Exclude the given mapping from the output" ) @click.option( "--concise", is_flag=True, default=False, help="Don't print the list of mappings supporting each term.", ) def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo("{}:".format(property_name)) click.echo("\t" + ", ".join(sorted(supported_mappings))) @mapping.command("translate") @click.argument("mapping-name") @click.argument("file", type=click.File("rb")) def mapping_translate(mapping_name, file): """Prints the list of known mappings.""" mapping_cls = [ cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name ] if not mapping_cls: raise click.ClickException("Unknown mapping {}".format(mapping_name)) assert len(mapping_cls) == 1 mapping_cls = mapping_cls[0] mapping = mapping_cls() codemeta_doc = mapping.translate(file.read()) click.echo(json.dumps(codemeta_doc, indent=4)) @cli.group("schedule") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API" ) @click.option( "--storage-url", "-g", default=None, help="URL of the (graph) storage API" ) @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate Software Heritage Indexer tasks. Via SWH Scheduler's API.""" ctx.obj["indexer_storage"] = _get_api( get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url ) ctx.obj["storage"] = _get_api( get_storage, ctx.obj["config"], "storage", storage_url ) ctx.obj["scheduler"] = _get_api( get_scheduler, ctx.obj["config"], "scheduler", scheduler_url ) if dry_run: ctx.obj["scheduler"] = None def list_origins_by_producer(idx_storage, mappings, tool_ids): next_page_token = "" limit = 10000 while next_page_token is not None: result = idx_storage.origin_intrinsic_metadata_search_by_producer( page_token=next_page_token, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None, ) next_page_token = result.get("next_page_token") yield from result["origins"] @schedule.command("reindex_origin_metadata") @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--tool-id", "-t", "tool_ids", type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.", ) @click.option( "--mapping", "-m", "mappings", multiple=True, - help="Mapping(s) that should be re-scheduled (eg. 'npm', " "'gemspec', 'maven')", + help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')", ) @click.option( "--task-type", default="index-origin-metadata", show_default=True, help="Name of the task type to schedule.", ) @click.pass_context def schedule_origin_metadata_reindex( ctx, origin_batch_size, tool_ids, mappings, task_type ): """Schedules indexing tasks for origins that were already indexed.""" idx_storage = ctx.obj["indexer_storage"] scheduler = ctx.obj["scheduler"] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) kwargs = {"policy_update": "update-dups"} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) @cli.command("journal-client") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--origin-metadata-task-type", default="index-origin-metadata", help="Name of the task running the origin metadata indexer.", ) @click.option( "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." ) @click.option( "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." ) @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") @click.option( "--stop-after-objects", "-m", default=None, type=int, - help="Maximum number of objects to replay. Default is to " "run forever.", + help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client( ctx, scheduler_url, origin_metadata_task_type, brokers, prefix, group_id, stop_after_objects, ): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin-intrinsic-metadata) on these new objects.""" scheduler = _get_api(get_scheduler, ctx.obj["config"], "scheduler", scheduler_url) client = get_journal_client( cls="kafka", brokers=brokers, prefix=prefix, group_id=group_id, object_types=["origin_visit"], stop_after_objects=stop_after_objects, ) worker_fn = functools.partial( process_journal_objects, scheduler=scheduler, task_names={"origin_metadata": origin_metadata_task_type,}, ) try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @cli.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5007, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return cli(auto_envvar_prefix="SWH_INDEXER") if __name__ == "__main__": main() diff --git a/swh/indexer/codemeta.py b/swh/indexer/codemeta.py index a0fb805..a900104 100644 --- a/swh/indexer/codemeta.py +++ b/swh/indexer/codemeta.py @@ -1,203 +1,203 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import collections import csv import itertools import json import os.path import re import swh.indexer from pyld import jsonld _DATA_DIR = os.path.join(os.path.dirname(swh.indexer.__file__), "data") CROSSWALK_TABLE_PATH = os.path.join(_DATA_DIR, "codemeta", "crosswalk.csv") CODEMETA_CONTEXT_PATH = os.path.join(_DATA_DIR, "codemeta", "codemeta.jsonld") with open(CODEMETA_CONTEXT_PATH) as fd: CODEMETA_CONTEXT = json.load(fd) CODEMETA_CONTEXT_URL = "https://doi.org/10.5063/schema/codemeta-2.0" CODEMETA_ALTERNATE_CONTEXT_URLS = { - ("https://raw.githubusercontent.com/codemeta/codemeta/" "master/codemeta.jsonld") + ("https://raw.githubusercontent.com/codemeta/codemeta/master/codemeta.jsonld") } CODEMETA_URI = "https://codemeta.github.io/terms/" SCHEMA_URI = "http://schema.org/" PROPERTY_BLACKLIST = { # CodeMeta properties that we cannot properly represent. SCHEMA_URI + "softwareRequirements", CODEMETA_URI + "softwareSuggestions", # Duplicate of 'author' SCHEMA_URI + "creator", } _codemeta_field_separator = re.compile(r"\s*[,/]\s*") def make_absolute_uri(local_name): definition = CODEMETA_CONTEXT["@context"][local_name] if isinstance(definition, str): return definition elif isinstance(definition, dict): prefixed_name = definition["@id"] (prefix, local_name) = prefixed_name.split(":") if prefix == "schema": canonical_name = SCHEMA_URI + local_name elif prefix == "codemeta": canonical_name = CODEMETA_URI + local_name else: assert False, prefix return canonical_name else: assert False, definition def _read_crosstable(fd): reader = csv.reader(fd) try: header = next(reader) except StopIteration: raise ValueError("empty file") data_sources = set(header) - {"Parent Type", "Property", "Type", "Description"} assert "codemeta-V1" in data_sources codemeta_translation = {data_source: {} for data_source in data_sources} terms = set() for line in reader: # For each canonical name local_name = dict(zip(header, line))["Property"] if not local_name: continue canonical_name = make_absolute_uri(local_name) if canonical_name in PROPERTY_BLACKLIST: continue terms.add(canonical_name) for (col, value) in zip(header, line): # For each cell in the row if col in data_sources: # If that's not the parentType/property/type/description for local_name in _codemeta_field_separator.split(value): # For each of the data source's properties that maps # to this canonical name if local_name.strip(): codemeta_translation[col][local_name.strip()] = canonical_name return (terms, codemeta_translation) with open(CROSSWALK_TABLE_PATH) as fd: (CODEMETA_TERMS, CROSSWALK_TABLE) = _read_crosstable(fd) def _document_loader(url, options=None): """Document loader for pyld. Reads the local codemeta.jsonld file instead of fetching it from the Internet every single time.""" if url == CODEMETA_CONTEXT_URL or url in CODEMETA_ALTERNATE_CONTEXT_URLS: return { "contextUrl": None, "documentUrl": url, "document": CODEMETA_CONTEXT, } elif url == CODEMETA_URI: raise Exception( "{} is CodeMeta's URI, use {} as context url".format( CODEMETA_URI, CODEMETA_CONTEXT_URL ) ) else: raise Exception(url) def compact(doc): """Same as `pyld.jsonld.compact`, but in the context of CodeMeta.""" return jsonld.compact( doc, CODEMETA_CONTEXT_URL, options={"documentLoader": _document_loader} ) def expand(doc): """Same as `pyld.jsonld.expand`, but in the context of CodeMeta.""" return jsonld.expand(doc, options={"documentLoader": _document_loader}) def merge_values(v1, v2): """If v1 and v2 are of the form `{"@list": l1}` and `{"@list": l2}`, returns `{"@list": l1 + l2}`. Otherwise, make them lists (if they are not already) and concatenate them. >>> merge_values('a', 'b') ['a', 'b'] >>> merge_values(['a', 'b'], 'c') ['a', 'b', 'c'] >>> merge_values({'@list': ['a', 'b']}, {'@list': ['c']}) {'@list': ['a', 'b', 'c']} """ if v1 is None: return v2 elif v2 is None: return v1 elif isinstance(v1, dict) and set(v1) == {"@list"}: assert isinstance(v1["@list"], list) if isinstance(v2, dict) and set(v2) == {"@list"}: assert isinstance(v2["@list"], list) return {"@list": v1["@list"] + v2["@list"]} else: raise ValueError("Cannot merge %r and %r" % (v1, v2)) else: if isinstance(v2, dict) and "@list" in v2: raise ValueError("Cannot merge %r and %r" % (v1, v2)) if not isinstance(v1, list): v1 = [v1] if not isinstance(v2, list): v2 = [v2] return v1 + v2 def merge_documents(documents): """Takes a list of metadata dicts, each generated from a different metadata file, and merges them. Removes duplicates, if any.""" documents = list(itertools.chain.from_iterable(map(expand, documents))) merged_document = collections.defaultdict(list) for document in documents: for (key, values) in document.items(): if key == "@id": # @id does not get expanded to a list value = values # Only one @id is allowed, move it to sameAs if "@id" not in merged_document: merged_document["@id"] = value elif value != merged_document["@id"]: if value not in merged_document[SCHEMA_URI + "sameAs"]: merged_document[SCHEMA_URI + "sameAs"].append(value) else: for value in values: if isinstance(value, dict) and set(value) == {"@list"}: # Value is of the form {'@list': [item1, item2]} # instead of the usual [item1, item2]. # We need to merge the inner lists (and mostly # preserve order). merged_value = merged_document.setdefault(key, {"@list": []}) for subvalue in value["@list"]: # merged_value must be of the form # {'@list': [item1, item2]}; as it is the same # type as value, which is an @list. if subvalue not in merged_value["@list"]: merged_value["@list"].append(subvalue) elif value not in merged_document[key]: merged_document[key].append(value) return compact(merged_document) diff --git a/swh/indexer/tests/test_cli.py b/swh/indexer/tests/test_cli.py index 08ad477..1974d2a 100644 --- a/swh/indexer/tests/test_cli.py +++ b/swh/indexer/tests/test_cli.py @@ -1,373 +1,373 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from functools import reduce import re import tempfile from unittest.mock import patch from click.testing import CliRunner from swh.journal.tests.utils import FakeKafkaMessage, MockedKafkaConsumer from swh.model.hashutil import hash_to_bytes from swh.indexer.cli import cli CLI_CONFIG = """ scheduler: cls: foo args: {} storage: cls: memory indexer_storage: cls: memory args: {} """ def fill_idx_storage(idx_storage, nb_rows): tools = [ {"tool_name": "tool %d" % i, "tool_version": "0.0.1", "tool_configuration": {},} for i in range(2) ] tools = idx_storage.indexer_configuration_add(tools) origin_metadata = [ { "id": "file://dev/%04d" % origin_id, "from_revision": hash_to_bytes("abcd{:0>4}".format(origin_id)), "indexer_configuration_id": tools[origin_id % 2]["id"], "metadata": {"name": "origin %d" % origin_id}, "mappings": ["mapping%d" % (origin_id % 10)], } for origin_id in range(nb_rows) ] revision_metadata = [ { "id": hash_to_bytes("abcd{:0>4}".format(origin_id)), "indexer_configuration_id": tools[origin_id % 2]["id"], "metadata": {"name": "origin %d" % origin_id}, "mappings": ["mapping%d" % (origin_id % 10)], } for origin_id in range(nb_rows) ] idx_storage.revision_intrinsic_metadata_add(revision_metadata) idx_storage.origin_intrinsic_metadata_add(origin_metadata) return [tool["id"] for tool in tools] def _origins_in_task_args(tasks): """Returns the set of origins contained in the arguments of the provided tasks (assumed to be of type index-origin-metadata).""" return reduce( set.union, (set(task["arguments"]["args"][0]) for task in tasks), set() ) def _assert_tasks_for_origins(tasks, origins): expected_kwargs = {"policy_update": "update-dups"} assert {task["type"] for task in tasks} == {"index-origin-metadata"} assert all(len(task["arguments"]["args"]) == 1 for task in tasks) for task in tasks: assert task["arguments"]["kwargs"] == expected_kwargs, task assert _origins_in_task_args(tasks) == set(["file://dev/%04d" % i for i in origins]) def invoke(scheduler, catch_exceptions, args): runner = CliRunner() with patch( "swh.indexer.cli.get_scheduler" ) as get_scheduler_mock, tempfile.NamedTemporaryFile( "a", suffix=".yml" ) as config_fd: config_fd.write(CLI_CONFIG) config_fd.seek(0) get_scheduler_mock.return_value = scheduler result = runner.invoke(cli, ["-C" + config_fd.name] + args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_mapping_list(indexer_scheduler): result = invoke(indexer_scheduler, False, ["mapping", "list",]) expected_output = "\n".join( ["codemeta", "gemspec", "maven", "npm", "pkg-info", "",] ) assert result.exit_code == 0, result.output assert result.output == expected_output def test_mapping_list_terms(indexer_scheduler): result = invoke(indexer_scheduler, False, ["mapping", "list-terms",]) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) def test_mapping_list_terms_exclude(indexer_scheduler): result = invoke( indexer_scheduler, False, ["mapping", "list-terms", "--exclude-mapping", "codemeta"], ) assert result.exit_code == 0, result.output assert re.search(r"http://schema.org/url:\n.*npm", result.output) assert not re.search(r"http://schema.org/url:\n.*codemeta", result.output) assert not re.search( r"https://codemeta.github.io/terms/developmentStatus:\n\tcodemeta", result.output, ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_empty_db(indexer_scheduler, idx_storage, storage): result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",]) expected_output = "Nothing to do (no origin metadata matched the criteria).\n" assert result.exit_code == 0, result.output assert result.output == expected_output tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_divisor(indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke(indexer_scheduler, False, ["schedule", "reindex_origin_metadata",]) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 9 _assert_tasks_for_origins(tasks, range(90)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_dry_run(indexer_scheduler, idx_storage, storage): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 90) result = invoke( indexer_scheduler, False, ["schedule", "--dry-run", "reindex_origin_metadata",] ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (60 origins).\n" "Scheduled 9 tasks (90 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 0 @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_nondivisor(indexer_scheduler, idx_storage, storage): """Tests the re-indexing when neither origin_batch_size or task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 70) result = invoke( indexer_scheduler, False, ["schedule", "reindex_origin_metadata", "--batch-size", "20",], ) # Check the output expected_output = ( "Scheduled 3 tasks (60 origins).\n" "Scheduled 4 tasks (70 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 4 _assert_tasks_for_origins(tasks, range(70)) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_filter_one_mapping( indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke( indexer_scheduler, False, ["schedule", "reindex_origin_metadata", "--mapping", "mapping1",], ) # Check the output - expected_output = "Scheduled 2 tasks (11 origins).\n" "Done.\n" + expected_output = "Scheduled 2 tasks (11 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 2 _assert_tasks_for_origins(tasks, [1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101]) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_filter_two_mappings( indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" fill_idx_storage(idx_storage, 110) result = invoke( indexer_scheduler, False, [ "schedule", "reindex_origin_metadata", "--mapping", "mapping1", "--mapping", "mapping2", ], ) # Check the output - expected_output = "Scheduled 3 tasks (22 origins).\n" "Done.\n" + expected_output = "Scheduled 3 tasks (22 origins).\nDone.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 3 _assert_tasks_for_origins( tasks, [ 1, 11, 21, 31, 41, 51, 61, 71, 81, 91, 101, 2, 12, 22, 32, 42, 52, 62, 72, 82, 92, 102, ], ) @patch("swh.scheduler.cli.utils.TASK_BATCH_SIZE", 3) @patch("swh.scheduler.cli_utils.TASK_BATCH_SIZE", 3) def test_origin_metadata_reindex_filter_one_tool( indexer_scheduler, idx_storage, storage ): """Tests the re-indexing when origin_batch_size*task_batch_size is a divisor of nb_origins.""" tool_ids = fill_idx_storage(idx_storage, 110) result = invoke( indexer_scheduler, False, ["schedule", "reindex_origin_metadata", "--tool-id", str(tool_ids[0]),], ) # Check the output expected_output = ( "Scheduled 3 tasks (30 origins).\n" "Scheduled 6 tasks (55 origins).\n" "Done.\n" ) assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 6 _assert_tasks_for_origins(tasks, [x * 2 for x in range(55)]) def test_journal_client(storage, indexer_scheduler): """Test the 'swh indexer journal-client' cli tool.""" message = FakeKafkaMessage( "swh.journal.objects.origin_visit", "bogus", {"status": "full", "origin": {"url": "file://dev/0000",}}, ) consumer = MockedKafkaConsumer([message]) with patch("swh.journal.client.Consumer", return_value=consumer): result = invoke( indexer_scheduler, False, [ "journal-client", "--stop-after-objects", "1", "--broker", "192.0.2.1", "--prefix", "swh.journal.objects", "--group-id", "test-consumer", ], ) # Check the output expected_output = "Done.\n" assert result.exit_code == 0, result.output assert result.output == expected_output # Check scheduled tasks tasks = indexer_scheduler.search_tasks() assert len(tasks) == 1 _assert_tasks_for_origins(tasks, [0]) diff --git a/swh/indexer/tests/test_ctags.py b/swh/indexer/tests/test_ctags.py index fca6498..baec382 100644 --- a/swh/indexer/tests/test_ctags.py +++ b/swh/indexer/tests/test_ctags.py @@ -1,155 +1,154 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from unittest.mock import patch import pytest import swh.indexer.ctags from swh.indexer.ctags import CtagsIndexer, run_ctags from swh.indexer.tests.utils import ( CommonContentIndexerTest, SHA1_TO_CTAGS, BASE_TEST_CONFIG, OBJ_STORAGE_DATA, fill_storage, fill_obj_storage, filter_dict, ) class BasicTest(unittest.TestCase): @patch("swh.indexer.ctags.subprocess") def test_run_ctags(self, mock_subprocess): """Computing licenses from a raw content should return results """ output0 = """ {"name":"defun","kind":"function","line":1,"language":"scheme"} {"name":"name","kind":"symbol","line":5,"language":"else"}""" output1 = """ {"name":"let","kind":"var","line":10,"language":"something"}""" expected_result0 = [ {"name": "defun", "kind": "function", "line": 1, "lang": "scheme"}, {"name": "name", "kind": "symbol", "line": 5, "lang": "else"}, ] expected_result1 = [ {"name": "let", "kind": "var", "line": 10, "lang": "something"} ] for path, lang, intermediary_result, expected_result in [ (b"some/path", "lisp", output0, expected_result0), (b"some/path/2", "markdown", output1, expected_result1), ]: mock_subprocess.check_output.return_value = intermediary_result actual_result = list(run_ctags(path, lang=lang)) self.assertEqual(actual_result, expected_result) class InjectCtagsIndexer: """Override ctags computations. """ def compute_ctags(self, path, lang): """Inject fake ctags given path (sha1 identifier). """ return {"lang": lang, **SHA1_TO_CTAGS.get(path)} CONFIG = { **BASE_TEST_CONFIG, "tools": { "name": "universal-ctags", "version": "~git7859817b", "configuration": { "command_line": """ctags --fields=+lnz --sort=no """ """ --links=no """, "max_content_size": 1000, }, }, "languages": {"python": "python", "haskell": "haskell", "bar": "bar",}, "workdir": "/tmp", } class TestCtagsIndexer(CommonContentIndexerTest, unittest.TestCase): """Ctags indexer test scenarios: - Known sha1s in the input list have their data indexed - Unknown sha1 in the input list are not indexed """ legacy_get_format = True def get_indexer_results(self, ids): yield from self.idx_storage.content_ctags_get(ids) def setUp(self): super().setUp() self.indexer = CtagsIndexer(config=CONFIG) self.indexer.catch_exceptions = False self.idx_storage = self.indexer.idx_storage fill_storage(self.indexer.storage) fill_obj_storage(self.indexer.objstorage) # Prepare test input self.id0 = "01c9379dfc33803963d07c1ccc748d3fe4c96bb5" self.id1 = "d4c647f0fc257591cc9ba1722484229780d1c607" self.id2 = "688a5ef812c53907562fe379d4b3851e69c7cb15" tool = {k.replace("tool_", ""): v for (k, v) in self.indexer.tool.items()} self.expected_results = { self.id0: {"id": self.id0, "tool": tool, **SHA1_TO_CTAGS[self.id0][0],}, self.id1: {"id": self.id1, "tool": tool, **SHA1_TO_CTAGS[self.id1][0],}, self.id2: {"id": self.id2, "tool": tool, **SHA1_TO_CTAGS[self.id2][0],}, } self._set_mocks() def _set_mocks(self): def find_ctags_for_content(raw_content): for (sha1, ctags) in SHA1_TO_CTAGS.items(): if OBJ_STORAGE_DATA[sha1] == raw_content: return ctags else: raise ValueError( - ("%r not found in objstorage, can't mock " "its ctags.") - % raw_content + ("%r not found in objstorage, can't mock its ctags.") % raw_content ) def fake_language(raw_content, *args, **kwargs): ctags = find_ctags_for_content(raw_content) return {"lang": ctags[0]["lang"]} self._real_compute_language = swh.indexer.ctags.compute_language swh.indexer.ctags.compute_language = fake_language def fake_check_output(cmd, *args, **kwargs): id_ = cmd[-1].split("/")[-1] return "\n".join( json.dumps({"language": ctag["lang"], **ctag}) for ctag in SHA1_TO_CTAGS[id_] ) self._real_check_output = swh.indexer.ctags.subprocess.check_output swh.indexer.ctags.subprocess.check_output = fake_check_output def tearDown(self): swh.indexer.ctags.compute_language = self._real_compute_language swh.indexer.ctags.subprocess.check_output = self._real_check_output super().tearDown() def test_ctags_w_no_tool(): with pytest.raises(ValueError): CtagsIndexer(config=filter_dict(CONFIG, "tools")) diff --git a/swh/indexer/tests/test_origin_head.py b/swh/indexer/tests/test_origin_head.py index 10c380a..dcaab27 100644 --- a/swh/indexer/tests/test_origin_head.py +++ b/swh/indexer/tests/test_origin_head.py @@ -1,171 +1,171 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from datetime import datetime, timezone from swh.indexer.origin_head import OriginHeadIndexer from swh.indexer.tests.utils import BASE_TEST_CONFIG, fill_storage ORIGIN_HEAD_CONFIG = { **BASE_TEST_CONFIG, "tools": {"name": "origin-metadata", "version": "0.0.1", "configuration": {},}, "tasks": {"revision_intrinsic_metadata": None, "origin_intrinsic_metadata": None,}, } class OriginHeadTestIndexer(OriginHeadIndexer): """Specific indexer whose configuration is enough to satisfy the indexing tests. """ def parse_config_file(self, *args, **kwargs): return ORIGIN_HEAD_CONFIG def persist_index_computations(self, results, policy_update): self.results = results class OriginHead(unittest.TestCase): def setUp(self): self.indexer = OriginHeadTestIndexer() self.indexer.catch_exceptions = False fill_storage(self.indexer.storage) def test_git(self): self.indexer.run(["https://github.com/SoftwareHeritage/swh-storage"]) self.assertEqual( self.indexer.results, [ { "revision_id": b"8K\x12\x00d\x03\xcc\xe4]bS\xe3\x8f{" b"\xd7}\xac\xefrm", "origin_url": "https://github.com/SoftwareHeritage/swh-storage", } ], ) def test_git_partial_snapshot(self): """Checks partial snapshots are ignored.""" origin_url = "https://github.com/SoftwareHeritage/swh-core" self.indexer.storage.origin_add_one( {"url": origin_url,} ) visit = self.indexer.storage.origin_visit_add( origin_url, datetime(2019, 2, 27, tzinfo=timezone.utc), type="git" ) self.indexer.storage.snapshot_add( [ { "id": b"foo", "branches": { b"foo": None, b"HEAD": {"target_type": "alias", "target": b"foo",}, }, } ] ) self.indexer.storage.origin_visit_update( origin_url, visit.visit, status="partial", snapshot=b"foo" ) self.indexer.run([origin_url]) self.assertEqual(self.indexer.results, []) def test_vcs_missing_snapshot(self): self.indexer.storage.origin_add( [{"url": "https://github.com/SoftwareHeritage/swh-indexer",}] ) self.indexer.run(["https://github.com/SoftwareHeritage/swh-indexer"]) self.assertEqual(self.indexer.results, []) def test_pypi_missing_branch(self): origin_url = "https://pypi.org/project/abcdef/" self.indexer.storage.origin_add_one( {"url": origin_url,} ) visit = self.indexer.storage.origin_visit_add( origin_url, datetime(2019, 2, 27, tzinfo=timezone.utc), type="pypi" ) self.indexer.storage.snapshot_add( [ { "id": b"foo", "branches": { b"foo": None, b"HEAD": {"target_type": "alias", "target": b"foo",}, }, } ] ) self.indexer.storage.origin_visit_update( origin_url, visit.visit, status="full", snapshot=b"foo" ) self.indexer.run(["https://pypi.org/project/abcdef/"]) self.assertEqual(self.indexer.results, []) def test_ftp(self): self.indexer.run(["rsync://ftp.gnu.org/gnu/3dldf"]) self.assertEqual( self.indexer.results, [ { "revision_id": b"\x8e\xa9\x8e/\xea}\x9feF\xf4\x9f\xfd\xee" b"\xcc\x1a\xb4`\x8c\x8by", "origin_url": "rsync://ftp.gnu.org/gnu/3dldf", } ], ) def test_ftp_missing_snapshot(self): self.indexer.storage.origin_add([{"url": "rsync://ftp.gnu.org/gnu/foobar",}]) self.indexer.run(["rsync://ftp.gnu.org/gnu/foobar"]) self.assertEqual(self.indexer.results, []) def test_deposit(self): - self.indexer.run(["https://forge.softwareheritage.org/source/" "jesuisgpl/"]) + self.indexer.run(["https://forge.softwareheritage.org/source/jesuisgpl/"]) self.assertEqual( self.indexer.results, [ { "revision_id": b"\xe7n\xa4\x9c\x9f\xfb\xb7\xf76\x11\x08{" b"\xa6\xe9\x99\xb1\x9e]q\xeb", "origin_url": "https://forge.softwareheritage.org/source/" "jesuisgpl/", } ], ) def test_deposit_missing_snapshot(self): self.indexer.storage.origin_add( [{"url": "https://forge.softwareheritage.org/source/foobar",}] ) self.indexer.run(["https://forge.softwareheritage.org/source/foobar"]) self.assertEqual(self.indexer.results, []) def test_pypi(self): self.indexer.run(["https://pypi.org/project/limnoria/"]) self.assertEqual( self.indexer.results, [ { "revision_id": b"\x83\xb9\xb6\xc7\x05\xb1%\xd0\xfem\xd8k" b"A\x10\x9d\xc5\xfa2\xf8t", "origin_url": "https://pypi.org/project/limnoria/", } ], ) def test_svn(self): self.indexer.run(["http://0-512-md.googlecode.com/svn/"]) self.assertEqual( self.indexer.results, [ { "revision_id": b"\xe4?r\xe1,\x88\xab\xec\xe7\x9a\x87\xb8" b"\xc9\xad#.\x1bw=\x18", "origin_url": "http://0-512-md.googlecode.com/svn/", } ], )