diff --git a/PKG-INFO b/PKG-INFO index 8f413ad..5e76949 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,71 +1,71 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.4.1 +Version: 0.4.2 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/ Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/PKG-INFO b/swh.indexer.egg-info/PKG-INFO index 8f413ad..5e76949 100644 --- a/swh.indexer.egg-info/PKG-INFO +++ b/swh.indexer.egg-info/PKG-INFO @@ -1,71 +1,71 @@ Metadata-Version: 2.1 Name: swh.indexer -Version: 0.4.1 +Version: 0.4.2 Summary: Software Heritage Content Indexer Home-page: https://forge.softwareheritage.org/diffusion/78/ Author: Software Heritage developers Author-email: swh-devel@inria.fr License: UNKNOWN Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest Project-URL: Funding, https://www.softwareheritage.org/donate Project-URL: Source, https://forge.softwareheritage.org/source/swh-indexer Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-indexer/ Description: swh-indexer ============ Tools to compute multiple indexes on SWH's raw contents: - content: - mimetype - ctags - language - fossology-license - metadata - revision: - metadata An indexer is in charge of: - looking up objects - extracting information from those objects - store those information in the swh-indexer db There are multiple indexers working on different object types: - content indexer: works with content sha1 hashes - revision indexer: works with revision sha1 hashes - origin indexer: works with origin identifiers Indexation procedure: - receive batch of ids - retrieve the associated data depending on object type - compute for that object some index - store the result to swh's storage Current content indexers: - mimetype (queue swh_indexer_content_mimetype): detect the encoding and mimetype - language (queue swh_indexer_content_language): detect the programming language - ctags (queue swh_indexer_content_ctags): compute tags information - fossology-license (queue swh_indexer_fossology_license): compute the license - metadata: translate file into translated_metadata dict Current revision indexers: - metadata: detects files containing metadata and retrieves translated_metadata in content_metadata table in storage or run content indexer to translate files. Platform: UNKNOWN Classifier: Programming Language :: Python :: 3 Classifier: Intended Audience :: Developers Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3) Classifier: Operating System :: OS Independent Classifier: Development Status :: 5 - Production/Stable Requires-Python: >=3.7 Description-Content-Type: text/markdown Provides-Extra: testing diff --git a/swh.indexer.egg-info/SOURCES.txt b/swh.indexer.egg-info/SOURCES.txt index 1233494..6f7120d 100644 --- a/swh.indexer.egg-info/SOURCES.txt +++ b/swh.indexer.egg-info/SOURCES.txt @@ -1,138 +1,138 @@ .gitignore .pre-commit-config.yaml AUTHORS CODE_OF_CONDUCT.md CONTRIBUTORS LICENSE MANIFEST.in Makefile Makefile.local README.md codemeta.json conftest.py mypy.ini pyproject.toml pytest.ini requirements-swh.txt requirements-test.txt requirements.txt setup.cfg setup.py tox.ini docs/.gitignore docs/Makefile docs/Makefile.local docs/README.md docs/conf.py docs/dev-info.rst docs/index.rst docs/metadata-workflow.rst docs/_static/.placeholder docs/_templates/.placeholder docs/images/.gitignore docs/images/Makefile docs/images/tasks-metadata-indexers.uml sql/bin/db-upgrade sql/bin/dot_add_content sql/doc/json sql/doc/json/.gitignore sql/doc/json/Makefile sql/doc/json/indexer_configuration.tool_configuration.schema.json sql/doc/json/revision_metadata.translated_metadata.json sql/json/.gitignore sql/json/Makefile sql/json/indexer_configuration.tool_configuration.schema.json sql/json/revision_metadata.translated_metadata.json sql/upgrades/115.sql sql/upgrades/116.sql sql/upgrades/117.sql sql/upgrades/118.sql sql/upgrades/119.sql sql/upgrades/120.sql sql/upgrades/121.sql sql/upgrades/122.sql sql/upgrades/123.sql sql/upgrades/124.sql sql/upgrades/125.sql sql/upgrades/126.sql sql/upgrades/127.sql sql/upgrades/128.sql sql/upgrades/129.sql sql/upgrades/130.sql sql/upgrades/131.sql sql/upgrades/132.sql swh/__init__.py swh.indexer.egg-info/PKG-INFO swh.indexer.egg-info/SOURCES.txt swh.indexer.egg-info/dependency_links.txt swh.indexer.egg-info/entry_points.txt swh.indexer.egg-info/requires.txt swh.indexer.egg-info/top_level.txt swh/indexer/__init__.py swh/indexer/cli.py swh/indexer/codemeta.py swh/indexer/ctags.py swh/indexer/fossology_license.py swh/indexer/indexer.py swh/indexer/journal_client.py swh/indexer/metadata.py swh/indexer/metadata_detector.py swh/indexer/mimetype.py swh/indexer/origin_head.py swh/indexer/py.typed swh/indexer/rehash.py swh/indexer/tasks.py swh/indexer/data/codemeta/CITATION swh/indexer/data/codemeta/LICENSE swh/indexer/data/codemeta/codemeta.jsonld swh/indexer/data/codemeta/crosswalk.csv swh/indexer/metadata_dictionary/__init__.py swh/indexer/metadata_dictionary/base.py swh/indexer/metadata_dictionary/codemeta.py swh/indexer/metadata_dictionary/maven.py swh/indexer/metadata_dictionary/npm.py swh/indexer/metadata_dictionary/python.py swh/indexer/metadata_dictionary/ruby.py -swh/indexer/sql/10-swh-init.sql -swh/indexer/sql/20-swh-enums.sql -swh/indexer/sql/30-swh-schema.sql -swh/indexer/sql/40-swh-func.sql -swh/indexer/sql/50-swh-data.sql -swh/indexer/sql/60-swh-indexes.sql +swh/indexer/sql/10-superuser-init.sql +swh/indexer/sql/20-enums.sql +swh/indexer/sql/30-schema.sql +swh/indexer/sql/50-data.sql +swh/indexer/sql/50-func.sql +swh/indexer/sql/60-indexes.sql swh/indexer/storage/__init__.py swh/indexer/storage/converters.py swh/indexer/storage/db.py swh/indexer/storage/exc.py swh/indexer/storage/in_memory.py swh/indexer/storage/interface.py swh/indexer/storage/metrics.py swh/indexer/storage/model.py swh/indexer/storage/api/__init__.py swh/indexer/storage/api/client.py swh/indexer/storage/api/serializers.py swh/indexer/storage/api/server.py swh/indexer/tests/__init__.py swh/indexer/tests/conftest.py swh/indexer/tests/tasks.py swh/indexer/tests/test_cli.py swh/indexer/tests/test_codemeta.py swh/indexer/tests/test_ctags.py swh/indexer/tests/test_fossology_license.py swh/indexer/tests/test_indexer.py swh/indexer/tests/test_journal_client.py swh/indexer/tests/test_metadata.py swh/indexer/tests/test_mimetype.py swh/indexer/tests/test_origin_head.py swh/indexer/tests/test_origin_metadata.py swh/indexer/tests/test_tasks.py swh/indexer/tests/utils.py swh/indexer/tests/storage/__init__.py swh/indexer/tests/storage/conftest.py swh/indexer/tests/storage/generate_data_test.py swh/indexer/tests/storage/test_api_client.py swh/indexer/tests/storage/test_converters.py swh/indexer/tests/storage/test_in_memory.py swh/indexer/tests/storage/test_init.py swh/indexer/tests/storage/test_metrics.py swh/indexer/tests/storage/test_server.py swh/indexer/tests/storage/test_storage.py \ No newline at end of file diff --git a/swh/indexer/cli.py b/swh/indexer/cli.py index 27d2a27..1b2b9d3 100644 --- a/swh/indexer/cli.py +++ b/swh/indexer/cli.py @@ -1,301 +1,301 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Iterator # WARNING: do not import unnecessary things here to keep cli startup time under # control import click from swh.core.cli import CONTEXT_SETTINGS, AliasedGroup from swh.core.cli import swh as swh_cli_group @swh_cli_group.group( name="indexer", context_settings=CONTEXT_SETTINGS, cls=AliasedGroup ) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def indexer_cli_group(ctx, config_file): """Software Heritage Indexer tools. The Indexer is used to mine the content of the archive and extract derived information from archive source code artifacts. """ from swh.core import config ctx.ensure_object(dict) conf = config.read(config_file) ctx.obj["config"] = conf def _get_api(getter, config, config_key, url): if url: config[config_key] = {"cls": "remote", "args": {"url": url}} elif config_key not in config: raise click.ClickException("Missing configuration for {}".format(config_key)) return getter(**config[config_key]) @indexer_cli_group.group("mapping") def mapping(): """Manage Software Heritage Indexer mappings.""" pass @mapping.command("list") def mapping_list(): """Prints the list of known mappings.""" from swh.indexer import metadata_dictionary mapping_names = [mapping.name for mapping in metadata_dictionary.MAPPINGS.values()] mapping_names.sort() for mapping_name in mapping_names: click.echo(mapping_name) @mapping.command("list-terms") @click.option( "--exclude-mapping", multiple=True, help="Exclude the given mapping from the output" ) @click.option( "--concise", is_flag=True, default=False, help="Don't print the list of mappings supporting each term.", ) def mapping_list_terms(concise, exclude_mapping): """Prints the list of known CodeMeta terms, and which mappings support them.""" from swh.indexer import metadata_dictionary properties = metadata_dictionary.list_terms() for (property_name, supported_mappings) in sorted(properties.items()): supported_mappings = {m.name for m in supported_mappings} supported_mappings -= set(exclude_mapping) if supported_mappings: if concise: click.echo(property_name) else: click.echo("{}:".format(property_name)) click.echo("\t" + ", ".join(sorted(supported_mappings))) @mapping.command("translate") @click.argument("mapping-name") @click.argument("file", type=click.File("rb")) def mapping_translate(mapping_name, file): """Prints the list of known mappings.""" import json from swh.indexer import metadata_dictionary mapping_cls = [ cls for cls in metadata_dictionary.MAPPINGS.values() if cls.name == mapping_name ] if not mapping_cls: raise click.ClickException("Unknown mapping {}".format(mapping_name)) assert len(mapping_cls) == 1 mapping_cls = mapping_cls[0] mapping = mapping_cls() codemeta_doc = mapping.translate(file.read()) click.echo(json.dumps(codemeta_doc, indent=4)) @indexer_cli_group.group("schedule") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--indexer-storage-url", "-i", default=None, help="URL of the indexer storage API" ) @click.option( "--storage-url", "-g", default=None, help="URL of the (graph) storage API" ) @click.option( "--dry-run/--no-dry-run", is_flag=True, default=False, help="List only what would be scheduled.", ) @click.pass_context def schedule(ctx, scheduler_url, storage_url, indexer_storage_url, dry_run): """Manipulate Software Heritage Indexer tasks. Via SWH Scheduler's API.""" from swh.indexer.storage import get_indexer_storage from swh.scheduler import get_scheduler from swh.storage import get_storage ctx.obj["indexer_storage"] = _get_api( get_indexer_storage, ctx.obj["config"], "indexer_storage", indexer_storage_url ) ctx.obj["storage"] = _get_api( get_storage, ctx.obj["config"], "storage", storage_url ) ctx.obj["scheduler"] = _get_api( get_scheduler, ctx.obj["config"], "scheduler", scheduler_url ) if dry_run: ctx.obj["scheduler"] = None def list_origins_by_producer(idx_storage, mappings, tool_ids) -> Iterator[str]: next_page_token = "" limit = 10000 while next_page_token is not None: result = idx_storage.origin_intrinsic_metadata_search_by_producer( page_token=next_page_token, limit=limit, ids_only=True, mappings=mappings or None, tool_ids=tool_ids or None, ) next_page_token = result.next_page_token yield from result.results @schedule.command("reindex_origin_metadata") @click.option( "--batch-size", "-b", "origin_batch_size", default=10, show_default=True, type=int, help="Number of origins per task", ) @click.option( "--tool-id", "-t", "tool_ids", type=int, multiple=True, help="Restrict search of old metadata to this/these tool ids.", ) @click.option( "--mapping", "-m", "mappings", multiple=True, help="Mapping(s) that should be re-scheduled (eg. 'npm', 'gemspec', 'maven')", ) @click.option( "--task-type", default="index-origin-metadata", show_default=True, help="Name of the task type to schedule.", ) @click.pass_context def schedule_origin_metadata_reindex( ctx, origin_batch_size, tool_ids, mappings, task_type ): """Schedules indexing tasks for origins that were already indexed.""" from swh.scheduler.cli_utils import schedule_origin_batches idx_storage = ctx.obj["indexer_storage"] scheduler = ctx.obj["scheduler"] origins = list_origins_by_producer(idx_storage, mappings, tool_ids) - kwargs = {"policy_update": "update-dups"} + kwargs = {"policy_update": "update-dups", "retries_left": 1} schedule_origin_batches(scheduler, task_type, origins, origin_batch_size, kwargs) @indexer_cli_group.command("journal-client") @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API") @click.option( "--origin-metadata-task-type", default="index-origin-metadata", help="Name of the task running the origin metadata indexer.", ) @click.option( "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to." ) @click.option( "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from." ) @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.") @click.option( "--stop-after-objects", "-m", default=None, type=int, help="Maximum number of objects to replay. Default is to run forever.", ) @click.pass_context def journal_client( ctx, scheduler_url, origin_metadata_task_type, brokers, prefix, group_id, stop_after_objects, ): """Listens for new objects from the SWH Journal, and schedules tasks to run relevant indexers (currently, only origin-intrinsic-metadata) on these new objects.""" import functools from swh.indexer.journal_client import process_journal_objects from swh.journal.client import get_journal_client from swh.scheduler import get_scheduler scheduler = _get_api(get_scheduler, ctx.obj["config"], "scheduler", scheduler_url) client = get_journal_client( cls="kafka", brokers=brokers, prefix=prefix, group_id=group_id, object_types=["origin_visit"], stop_after_objects=stop_after_objects, ) worker_fn = functools.partial( process_journal_objects, scheduler=scheduler, task_names={"origin_metadata": origin_metadata_task_type,}, ) try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: client.close() @indexer_cli_group.command("rpc-serve") @click.argument("config-path", required=True) @click.option("--host", default="0.0.0.0", help="Host to run the server") @click.option("--port", default=5007, type=click.INT, help="Binding port of the server") @click.option( "--debug/--nodebug", default=True, help="Indicates if the server should run in debug mode", ) def rpc_server(config_path, host, port, debug): """Starts a Software Heritage Indexer RPC HTTP server.""" from swh.indexer.storage.api.server import app, load_and_check_config api_cfg = load_and_check_config(config_path, type="any") app.config.update(api_cfg) app.run(host, port=int(port), debug=bool(debug)) def main(): return indexer_cli_group(auto_envvar_prefix="SWH_INDEXER") if __name__ == "__main__": main() diff --git a/swh/indexer/journal_client.py b/swh/indexer/journal_client.py index 623dce2..9331e1e 100644 --- a/swh/indexer/journal_client.py +++ b/swh/indexer/journal_client.py @@ -1,44 +1,45 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from swh.core.utils import grouper from swh.scheduler.utils import create_task_dict MAX_ORIGINS_PER_TASK = 100 def process_journal_objects(messages, *, scheduler, task_names): """Worker function for `JournalClient.process(worker_fn)`, after currification of `scheduler` and `task_names`.""" assert set(messages) == {"origin_visit"}, set(messages) process_origin_visits(messages["origin_visit"], scheduler, task_names) def process_origin_visits(visits, scheduler, task_names): task_dicts = [] logging.debug("processing origin visits %r", visits) if task_names.get("origin_metadata"): visits = [visit for visit in visits if visit["status"] == "full"] visit_batches = grouper(visits, MAX_ORIGINS_PER_TASK) for visit_batch in visit_batches: visit_urls = [] for visit in visit_batch: if isinstance(visit["origin"], str): visit_urls.append(visit["origin"]) else: visit_urls.append(visit["origin"]["url"]) task_dicts.append( create_task_dict( task_names["origin_metadata"], "oneshot", visit_urls, policy_update="update-dups", + retries_left=1, ) ) if task_dicts: scheduler.create_tasks(task_dicts) diff --git a/swh/indexer/sql/10-swh-init.sql b/swh/indexer/sql/10-superuser-init.sql similarity index 100% rename from swh/indexer/sql/10-swh-init.sql rename to swh/indexer/sql/10-superuser-init.sql diff --git a/swh/indexer/sql/20-swh-enums.sql b/swh/indexer/sql/20-enums.sql similarity index 100% rename from swh/indexer/sql/20-swh-enums.sql rename to swh/indexer/sql/20-enums.sql diff --git a/swh/indexer/sql/30-swh-schema.sql b/swh/indexer/sql/30-schema.sql similarity index 100% rename from swh/indexer/sql/30-swh-schema.sql rename to swh/indexer/sql/30-schema.sql diff --git a/swh/indexer/sql/50-swh-data.sql b/swh/indexer/sql/50-data.sql similarity index 100% rename from swh/indexer/sql/50-swh-data.sql rename to swh/indexer/sql/50-data.sql diff --git a/swh/indexer/sql/40-swh-func.sql b/swh/indexer/sql/50-func.sql similarity index 100% rename from swh/indexer/sql/40-swh-func.sql rename to swh/indexer/sql/50-func.sql diff --git a/swh/indexer/sql/60-swh-indexes.sql b/swh/indexer/sql/60-indexes.sql similarity index 100% rename from swh/indexer/sql/60-swh-indexes.sql rename to swh/indexer/sql/60-indexes.sql diff --git a/swh/indexer/tests/conftest.py b/swh/indexer/tests/conftest.py index 53abb5b..fc8acea 100644 --- a/swh/indexer/tests/conftest.py +++ b/swh/indexer/tests/conftest.py @@ -1,99 +1,105 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from datetime import timedelta import os +from typing import List, Tuple from unittest.mock import patch import pytest import yaml from swh.indexer.storage import get_indexer_storage from swh.objstorage.factory import get_objstorage from swh.storage import get_storage from .utils import fill_obj_storage, fill_storage -TASK_NAMES = ["revision_intrinsic_metadata", "origin_intrinsic_metadata"] +TASK_NAMES: List[Tuple[str, str]] = [ + # (scheduler-task-type, task-class-test-name) + ("index-revision-metadata", "revision_intrinsic_metadata"), + ("index-origin-metadata", "origin_intrinsic_metadata"), +] @pytest.fixture def indexer_scheduler(swh_scheduler): - for taskname in TASK_NAMES: + # Insert the expected task types within the scheduler + for task_name, task_class_name in TASK_NAMES: swh_scheduler.create_task_type( { - "type": taskname, - "description": "The {} indexer testing task".format(taskname), - "backend_name": "swh.indexer.tests.tasks.{}".format(taskname), + "type": task_name, + "description": f"The {task_class_name} indexer testing task", + "backend_name": f"swh.indexer.tests.tasks.{task_class_name}", "default_interval": timedelta(days=1), "min_interval": timedelta(hours=6), "max_interval": timedelta(days=12), "num_retries": 3, } ) return swh_scheduler @pytest.fixture def idx_storage(): """An instance of in-memory indexer storage that gets injected into all indexers classes. """ idx_storage = get_indexer_storage("memory") with patch("swh.indexer.storage.in_memory.IndexerStorage") as idx_storage_mock: idx_storage_mock.return_value = idx_storage yield idx_storage @pytest.fixture def storage(): """An instance of in-memory storage that gets injected into all indexers classes. """ storage = get_storage(cls="memory") fill_storage(storage) with patch("swh.storage.in_memory.InMemoryStorage") as storage_mock: storage_mock.return_value = storage yield storage @pytest.fixture def obj_storage(): """An instance of in-memory objstorage that gets injected into all indexers classes. """ objstorage = get_objstorage("memory", {}) fill_obj_storage(objstorage) with patch.dict( "swh.objstorage.factory._STORAGE_CLASSES", {"memory": lambda: objstorage} ): yield objstorage @pytest.fixture def swh_indexer_config(): return { "storage": {"cls": "memory"}, "objstorage": {"cls": "memory", "args": {},}, "indexer_storage": {"cls": "memory", "args": {},}, "tools": { "name": "file", "version": "1:5.30-1+deb9u1", "configuration": {"type": "library", "debian-package": "python3-magic"}, }, "compute_checksums": ["blake2b512"], # for rehash indexer } @pytest.fixture def swh_config(swh_indexer_config, monkeypatch, tmp_path): conffile = os.path.join(str(tmp_path), "indexer.yml") with open(conffile, "w") as f: f.write(yaml.dump(swh_indexer_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conffile) return conffile diff --git a/swh/indexer/tests/test_journal_client.py b/swh/indexer/tests/test_journal_client.py index 2651ada..c27bb01 100644 --- a/swh/indexer/tests/test_journal_client.py +++ b/swh/indexer/tests/test_journal_client.py @@ -1,154 +1,159 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import Mock, patch from swh.indexer.journal_client import process_journal_objects class JournalClientTest(unittest.TestCase): def testOneOriginVisit(self): mock_scheduler = Mock() messages = { "origin_visit": [{"status": "full", "origin": "file:///dev/zero",},] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, ) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]["next_run"] self.assertEqual( args, ( [ { "arguments": { "kwargs": {"policy_update": "update-dups"}, "args": (["file:///dev/zero"],), }, "policy": "oneshot", "type": "task-name", + "retries_left": 1, }, ], ), ) def testOriginVisitLegacy(self): mock_scheduler = Mock() messages = { "origin_visit": [ {"status": "full", "origin": {"url": "file:///dev/zero",}}, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, ) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]["next_run"] self.assertEqual( args, ( [ { "arguments": { "kwargs": {"policy_update": "update-dups"}, "args": (["file:///dev/zero"],), }, "policy": "oneshot", "type": "task-name", + "retries_left": 1, }, ], ), ) def testOneOriginVisitBatch(self): mock_scheduler = Mock() messages = { "origin_visit": [ {"status": "full", "origin": "file:///dev/zero",}, {"status": "full", "origin": "file:///tmp/foobar",}, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, ) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]["next_run"] self.assertEqual( args, ( [ { "arguments": { "kwargs": {"policy_update": "update-dups"}, "args": (["file:///dev/zero", "file:///tmp/foobar"],), }, "policy": "oneshot", "type": "task-name", + "retries_left": 1, }, ], ), ) @patch("swh.indexer.journal_client.MAX_ORIGINS_PER_TASK", 2) def testOriginVisitBatches(self): mock_scheduler = Mock() messages = { "origin_visit": [ {"status": "full", "origin": "file:///dev/zero",}, {"status": "full", "origin": "file:///tmp/foobar",}, {"status": "full", "origin": "file:///tmp/spamegg",}, ] } process_journal_objects( messages, scheduler=mock_scheduler, task_names={"origin_metadata": "task-name"}, ) self.assertTrue(mock_scheduler.create_tasks.called) call_args = mock_scheduler.create_tasks.call_args (args, kwargs) = call_args self.assertEqual(kwargs, {}) del args[0][0]["next_run"] del args[0][1]["next_run"] self.assertEqual( args, ( [ { "arguments": { "kwargs": {"policy_update": "update-dups"}, "args": (["file:///dev/zero", "file:///tmp/foobar"],), }, "policy": "oneshot", "type": "task-name", + "retries_left": 1, }, { "arguments": { "kwargs": {"policy_update": "update-dups"}, "args": (["file:///tmp/spamegg"],), }, "policy": "oneshot", "type": "task-name", + "retries_left": 1, }, ], ), )