
Jenkins > .tox.py3.lib.python3.7.site-packages.swh.search.tests.test_cli::test__journal_client__origin_intrinsic_metadata
Failed

TEST RESULT

Run At
Jul 16 2021, 8:07 PM
Details
swh_search = <swh.search.elasticsearch.ElasticSearch object at 0x7f37e4e25470>
elasticsearch_host = '127.0.0.1:55117', kafka_prefix = 'dqfetdyblg'
kafka_server = '127.0.0.1:55037'

    def test__journal_client__origin_intrinsic_metadata(
        swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
    ):
        """Subscribing to origin-intrinsic-metadata should result in swh-search
        indexation
        """
        origin_foobar = {"url": "https://github.com/clojure/clojure"}
        origin_intrinsic_metadata = {
            "id": origin_foobar["url"],
            "metadata": {
                "name": "clojure",
                "type": "SoftwareSourceCode",
                "license": "http://opensource.org/licenses/eclipse-1.0.php",
                "version": "1.10.2-master-SNAPSHOT",
                "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
                "identifier": "org.clojure",
                "description": "Clojure core environment and runtime library.",
                "codeRepository": "https://repo.maven.apache.org/maven2/org/clojure/clojure",  # noqa
            },
            "indexer_configuration_id": 1,
            "from_revision": hash_to_bytes("f47c139e20970ee0852166f48ee2a4626632b86e"),
            "mappings": ["maven"],
        }
        producer = Producer(
            {
                "bootstrap.servers": kafka_server,
                "client.id": "test search origin intrinsic metadata producer",
                "acks": "all",
            }
        )
        topic = f"{kafka_prefix}.origin_intrinsic_metadata"
        value = value_to_kafka(origin_intrinsic_metadata)
        producer.produce(topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value)

        journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
            broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
        )
        result = invoke(
            False,
            [
                "journal-client",
                "objects",
                "--stop-after-objects",
                "1",
                "--object-type",
                "origin_intrinsic_metadata",
            ],
            journal_objects_config,
>           elasticsearch_host=elasticsearch_host,
        )

.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:278:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:53: in invoke
    raise result.exception
.tox/py3/lib/python3.7/site-packages/click/testing.py:329: in invoke
    cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:782: in main
    rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1066: in invoke
    return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:610: in invoke
    return callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:21: in new_func
    return f(get_current_context(), *args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/search/cli.py:97: in journal_client_objects
    client = get_journal_client(cls="kafka", **journal_cfg,)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:38: in get_journal_client
    return JournalClient(**kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <swh.journal.client.JournalClient object at 0x7f37e4e262e8>
brokers = ['127.0.0.1:55037'], group_id = 'test-consumer', prefix = 'dqfetdyblg'
object_types = ('origin_intrinsic_metadata',), privileged = False
stop_after_objects = 1, batch_size = 200, process_timeout = None
auto_offset_reset = 'earliest', stop_on_eof = False, kwargs = {}
debug_logging = False, group_instance_id = None
consumer_settings = {'auto.offset.reset': 'earliest', 'bootstrap.servers': '127.0.0.1:55037', 'enable.auto.commit': False, 'error_cb': <function _error_cb at 0x7f37f6236950>, ...}
existing_topics = ['dqfetdyblg.skipped_content', 'dqfetdyblg.origin_visit_status', 'dqfetdyblg.raw_extrinsic_metadata', 'dqfetdyblg.revision', 'dqfetdyblg.directory', 'dqfetdyblg.snapshot', ...]
unknown_types = ['origin_intrinsic_metadata']
object_type = 'origin_intrinsic_metadata'
topics = ('dqfetdyblg.origin_intrinsic_metadata', 'dqfetdyblg.origin_intrinsic_metadata')

    def __init__(
        self,
        brokers: Union[str, List[str]],
        group_id: str,
        prefix: Optional[str] = None,
        object_types: Optional[List[str]] = None,
        privileged: bool = False,
        stop_after_objects: Optional[int] = None,
        batch_size: int = 200,
        process_timeout: Optional[float] = None,
        auto_offset_reset: str = "earliest",
        stop_on_eof: bool = False,
        **kwargs,
    ):
        if prefix is None:
            prefix = DEFAULT_PREFIX
        if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
            raise ValueError(
                "Option 'auto_offset_reset' only accept %s, not %s"
                % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
            )
        if batch_size <= 0:
            raise ValueError("Option 'batch_size' needs to be positive")

        self.value_deserializer = kafka_to_value

        if isinstance(brokers, str):
            brokers = [brokers]

        debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
        if debug_logging and "debug" not in kwargs:
            kwargs["debug"] = "consumer"

        # Static group instance id management
        group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
        if group_instance_id:
            kwargs["group.instance.id"] = group_instance_id

        if "group.instance.id" in kwargs:
            # When doing static consumer group membership, set a higher default
            # session timeout. The session timeout is the duration after which
            # the broker considers that a consumer has left the consumer group
            # for good, and triggers a rebalance. Considering our current
            # processing pattern, 10 minutes gives the consumer ample time to
            # restart before that happens.
            if "session.timeout.ms" not in kwargs:
                kwargs["session.timeout.ms"] = 10 * 60 * 1000  # 10 minutes

        if "session.timeout.ms" in kwargs:
            # When the session timeout is set, rdkafka requires the max poll
            # interval to be set to a higher value; the max poll interval is
            # rdkafka's way of figuring out whether the client's message
            # processing thread has stalled: when the max poll interval lapses
            # between two calls to consumer.poll(), rdkafka leaves the consumer
            # group and terminates the connection to the brokers.
            #
            # We default to 1.5 times the session timeout
            if "max.poll.interval.ms" not in kwargs:
                kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3

        consumer_settings = {
            **kwargs,
            "bootstrap.servers": ",".join(brokers),
            "auto.offset.reset": auto_offset_reset,
            "group.id": group_id,
            "on_commit": _on_commit,
            "error_cb": _error_cb,
            "enable.auto.commit": False,
            "logger": rdkafka_logger,
        }

        self.stop_on_eof = stop_on_eof
        if self.stop_on_eof:
            consumer_settings["enable.partition.eof"] = True

        logger.debug("Consumer settings: %s", consumer_settings)
        self.consumer = Consumer(consumer_settings)

        if privileged:
            privileged_prefix = f"{prefix}_privileged"
        else:
            # do not attempt to subscribe to privileged topics
            privileged_prefix = f"{prefix}"

        existing_topics = [
            topic
            for topic in self.consumer.list_topics(timeout=10).topics.keys()
            if (
                topic.startswith(f"{prefix}.")
                or topic.startswith(f"{privileged_prefix}.")
            )
        ]
        if not existing_topics:
            raise ValueError(
                f"The prefix {prefix} does not match any existing topic "
                "on the kafka broker"
            )

        if not object_types:
            object_types = list({topic.split(".")[-1] for topic in existing_topics})

        self.subscription = []
        unknown_types = []
        for object_type in object_types:
            topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
            for topic in topics:
                if topic in existing_topics:
                    self.subscription.append(topic)
                    break
            else:
                unknown_types.append(object_type)

        if unknown_types:
            raise ValueError(
>               f"Topic(s) for object types {','.join(unknown_types)} "
                "are unknown on the kafka broker"
            )
E           ValueError: Topic(s) for object types origin_intrinsic_metadata are unknown on the kafka broker

.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:202: ValueError
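The captured locals point at the failure mode: existing_topics lists the other dqfetdyblg.* topics but not dqfetdyblg.origin_intrinsic_metadata, even though the test produced a message to exactly that topic just before invoking the CLI. With confluent-kafka, Producer.produce() only enqueues the message in a local buffer; nothing reaches the broker (and no topic gets auto-created) until the producer polls or flushes. A minimal sketch of a possible fix, assuming the test relies on broker-side topic auto-creation:

    # Sketch only: flush after produce() so the broker actually receives the
    # message, and auto-creates the topic, before JournalClient lists topics.
    producer.produce(topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value)
    remaining = producer.flush(10)  # returns the number of still-queued messages
    assert remaining == 0, "message not delivered; topic may not exist yet"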
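For context on why the error names an object type rather than a topic: JournalClient tries the privileged and unprivileged topic for each requested object type, and only records the type as unknown when neither exists. Because this test runs with privileged=False, privileged_prefix equals prefix, which is why the topics local above shows the same topic name twice. A standalone sketch of that matching logic, with values mirroring the locals in the traceback:

    prefix = privileged_prefix = "dqfetdyblg"
    existing_topics = ["dqfetdyblg.revision", "dqfetdyblg.snapshot"]
    object_types = ["origin_intrinsic_metadata"]

    subscription, unknown_types = [], []
    for object_type in object_types:
        candidates = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
        for topic in candidates:
            if topic in existing_topics:
                subscription.append(topic)
                break
        else:  # for/else: the loop never hit break, so no candidate topic exists
            unknown_types.append(object_type)

    assert unknown_types == ["origin_intrinsic_metadata"]  # triggers the ValueError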
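If flushing alone does not close the race (topic metadata can take a moment to propagate after auto-creation), the test could additionally poll the broker until the topic is visible before invoking the journal client. A hypothetical helper, not part of swh.journal, built on the same confluent-kafka metadata call that JournalClient itself uses:

    import time

    from confluent_kafka import Consumer

    def wait_for_topic(bootstrap_servers: str, topic: str, timeout: float = 10.0) -> None:
        # Poll cluster metadata until the topic shows up or we time out.
        consumer = Consumer(
            {"bootstrap.servers": bootstrap_servers, "group.id": "topic-waiter"}
        )
        try:
            deadline = time.monotonic() + timeout
            while time.monotonic() < deadline:
                if topic in consumer.list_topics(timeout=1).topics:
                    return
                time.sleep(0.1)
            raise TimeoutError(f"topic {topic!r} never appeared on the broker")
        finally:
            consumer.close()

    # e.g.: wait_for_topic(kafka_server, f"{kafka_prefix}.origin_intrinsic_metadata")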