Software Heritage

Jenkins > .tox.py3.lib.python3.7.site-packages.swh.indexer.tests.test_cli::test_cli_journal_client_schedule
Failed

TEST RESULT

Run At
Jul 20 2022, 7:23 PM
Details
cli_runner = <click.testing.CliRunner object at 0x7f8cf2e52b70>
swh_config = '/tmp/pytest-of-jenkins/pytest-0/test_cli_journal_client_schedu0/indexer.yml'
indexer_scheduler = <swh.scheduler.backend.SchedulerBackend object at 0x7f8cf2f36400>
kafka_prefix = 'nfkdjvesnd', kafka_server = '127.0.0.1:36401'
consumer = <cimpl.Consumer object at 0x7f8cf2e5e408>

    def test_cli_journal_client_schedule(
        cli_runner,
        swh_config,
        indexer_scheduler,
        kafka_prefix: str,
        kafka_server,
        consumer: Consumer,
    ):
        """Test the 'swh indexer journal-client' cli tool."""
        journal_writer = get_journal_writer(
            "kafka",
            brokers=[kafka_server],
            prefix=kafka_prefix,
            client_id="test producer",
            value_sanitizer=lambda object_type, value: value,
            flush_timeout=3,  # fail early if something is going wrong
        )
        visit_statuses = [
            OriginVisitStatus(
                origin="file:///dev/zero",
                visit=1,
                date=now(),
                status="full",
                snapshot=None,
            ),
            OriginVisitStatus(
                origin="file:///dev/foobar",
                visit=2,
                date=now(),
                status="full",
                snapshot=None,
            ),
            OriginVisitStatus(
                origin="file:///tmp/spamegg",
                visit=3,
                date=now(),
                status="full",
                snapshot=None,
            ),
            OriginVisitStatus(
                origin="file:///dev/0002",
                visit=6,
                date=now(),
                status="full",
                snapshot=None,
            ),
            OriginVisitStatus(
                # will be filtered out due to its 'partial' status
                origin="file:///dev/0000",
                visit=4,
                date=now(),
                status="partial",
                snapshot=None,
            ),
            OriginVisitStatus(
                # will be filtered out due to its 'ongoing' status
                origin="file:///dev/0001",
                visit=5,
                date=now(),
                status="ongoing",
                snapshot=None,
            ),
        ]
        journal_writer.write_additions("origin_visit_status", visit_statuses)
        visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
        result = cli_runner.invoke(
            indexer_cli_group,
            [
                "-C",
                swh_config,
                "journal-client",
                "--broker",
                kafka_server,
                "--prefix",
                kafka_prefix,
                "--group-id",
                "test-consumer",
                "--stop-after-objects",
                len(visit_statuses),
                "--origin-metadata-task-type",
                "index-origin-metadata",
            ],
>           catch_exceptions=False,
        )

.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:491:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/click/testing.py:408: in invoke
    return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:1055: in main
    rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1657: in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1404: in invoke
    return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:760: in invoke
    return __callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:26: in new_func
    return f(get_current_context(), *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

ctx = <click.core.Context object at 0x7f8cf2dda358>, indexer = None
scheduler_url = None, origin_metadata_task_type = 'index-origin-metadata'
brokers = ('127.0.0.1:36401',), prefix = 'nfkdjvesnd'
group_id = 'test-consumer', stop_after_objects = 6

    @indexer_cli_group.command("journal-client")
    @click.argument(
        "indexer",
        type=click.Choice(["origin-intrinsic-metadata", "*"]),
        required=False  # TODO: remove required=False after we stop using it
    )
    @click.option("--scheduler-url", "-s", default=None, help="URL of the scheduler API")
    @click.option(
        "--origin-metadata-task-type",
        default="index-origin-metadata",
        help="Name of the task running the origin metadata indexer.",
    )
    @click.option(
        "--broker", "brokers", type=str, multiple=True, help="Kafka broker to connect to."
    )
    @click.option(
        "--prefix", type=str, default=None, help="Prefix of Kafka topic names to read from."
    )
    @click.option("--group-id", type=str, help="Consumer/group id for reading from Kafka.")
    @click.option(
        "--stop-after-objects",
        "-m",
        default=None,
        type=int,
        help="Maximum number of objects to replay. Default is to run forever.",
    )
    @click.pass_context
    def journal_client(
        ctx,
        indexer: Optional[str],
        scheduler_url: str,
        origin_metadata_task_type: str,
        brokers: List[str],
        prefix: str,
        group_id: str,
        stop_after_objects: Optional[int],
    ):
        """
        Listens for new objects from the SWH Journal, and either:

        * runs the indexer with the name passed as argument, if any
        * schedules tasks to run relevant indexers (currently, only
          origin-intrinsic-metadata) on these new objects otherwise.

        Passing '*' as indexer name runs all indexers.
        """
        import functools
        import warnings

        from swh.indexer.indexer import ObjectsDict
        from swh.indexer.journal_client import process_journal_objects
        from swh.journal.client import get_journal_client
        from swh.scheduler import get_scheduler

        cfg = ctx.obj["config"]
        journal_cfg = cfg.get("journal", {})

        scheduler = _get_api(get_scheduler, cfg, "scheduler", scheduler_url)

        brokers = brokers or journal_cfg.get("brokers")
        if not brokers:
            raise ValueError("The brokers configuration is mandatory.")

        prefix = prefix or journal_cfg.get("prefix")
        group_id = group_id or journal_cfg.get("group_id")
        origin_metadata_task_type = origin_metadata_task_type or journal_cfg.get(
            "origin_metadata_task_type"
        )
        stop_after_objects = stop_after_objects or journal_cfg.get("stop_after_objects")

        object_types = set()
        worker_fns: List[Callable[[ObjectsDict], Dict]] = []

        if indexer is None:
            warnings.warn(
                "'swh indexer journal-client' with no argument creates scheduler tasks "
                "to index, rather than index directly.",
                DeprecationWarning,
            )
            object_types.add("origin_visit_status")
            worker_fns.append(
                functools.partial(
                    process_journal_objects,
                    scheduler=scheduler,
                    task_names={
                        "origin_metadata": origin_metadata_task_type,
                    },
                )
            )

        from swh.indexer.indexer import BaseIndexer

        idx: BaseIndexer
        if indexer in ("origin-intrinsic-metadata", "*"):
            from swh.indexer.metadata import OriginMetadataIndexer

            object_types.add("origin_visit_status")
            idx = OriginMetadataIndexer()

        if indexer in ("content-mimetype", "*"):
            from swh.indexer.mimetype import MimetypeIndexer

            object_types.add("content")
            idx = MimetypeIndexer()

        if indexer in ("content-fossology-license", "*"):
            from swh.indexer.fossology_license import FossologyLicenseIndexer

            object_types.add("content")
            idx = FossologyLicenseIndexer()

>       if idx:
E       UnboundLocalError: local variable 'idx' referenced before assignment

.tox/py3/lib/python3.7/site-packages/swh/indexer/cli.py:327: UnboundLocalError
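
Root cause: in swh/indexer/cli.py, idx is only annotated (idx: BaseIndexer) before the dispatch branches and only assigned inside the "if indexer in (...)" blocks. This test invokes the command without an INDEXER argument, so indexer is None, no branch runs, and the final "if idx:" check reads a local name that was never bound, hence the UnboundLocalError. The snippet below is a minimal, self-contained reproduction of that pattern together with one possible guard (binding idx to a None default up front); the names are illustrative stand-ins and this is not necessarily the fix applied upstream.

    # Hypothetical reproduction of the failure pattern; not the actual swh.indexer code.
    from typing import Optional


    def dispatch(indexer: Optional[str]) -> str:
        idx: str  # annotation only; the name is bound solely inside the branches below
        if indexer in ("origin-intrinsic-metadata", "*"):
            idx = "OriginMetadataIndexer"
        if indexer in ("content-mimetype", "*"):
            idx = "MimetypeIndexer"
        if idx:  # UnboundLocalError when indexer is None and no branch assigned idx
            return idx
        return "scheduler-only mode"


    def dispatch_guarded(indexer: Optional[str]) -> str:
        # Assumed guard: give idx a default so the final check is always defined.
        idx: Optional[str] = None
        if indexer in ("origin-intrinsic-metadata", "*"):
            idx = "OriginMetadataIndexer"
        if indexer in ("content-mimetype", "*"):
            idx = "MimetypeIndexer"
        if idx is not None:
            return idx
        return "scheduler-only mode"


    if __name__ == "__main__":
        print(dispatch_guarded(None))  # "scheduler-only mode", no exception
        try:
            dispatch(None)
        except UnboundLocalError as exc:
            print("reproduced:", exc)  # same class of error as in the test run

An equivalent guard in the real command would let the deprecated no-argument invocation (the mode this test exercises) fall through to the scheduler-task path instead of crashing.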