swh_search = <swh.search.elasticsearch.ElasticSearch object at 0x7f37e4e25470>
elasticsearch_host = '127.0.0.1:55117', kafka_prefix = 'dqfetdyblg'
kafka_server = '127.0.0.1:55037'
def test__journal_client__origin_intrinsic_metadata(
    swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
):
    """Subscribing to origin-intrinsic-metadata should result in swh-search indexation
    """
    origin_foobar = {"url": "https://github.com/clojure/clojure"}
    origin_intrinsic_metadata = {
        "id": origin_foobar["url"],
        "metadata": {
            "name": "clojure",
            "type": "SoftwareSourceCode",
            "license": "http://opensource.org/licenses/eclipse-1.0.php",
            "version": "1.10.2-master-SNAPSHOT",
            "@context": "https://doi.org/10.5063/schema/codemeta-2.0",
            "identifier": "org.clojure",
            "description": "Clojure core environment and runtime library.",
            "codeRepository": "https://repo.maven.apache.org/maven2/org/clojure/clojure",  # noqa
        },
        "indexer_configuration_id": 1,
        "from_revision": hash_to_bytes("f47c139e20970ee0852166f48ee2a4626632b86e"),
        "mappings": ["maven"],
    }
    producer = Producer(
        {
            "bootstrap.servers": kafka_server,
            "client.id": "test search origin intrinsic metadata producer",
            "acks": "all",
        }
    )
    topic = f"{kafka_prefix}.origin_intrinsic_metadata"
    value = value_to_kafka(origin_intrinsic_metadata)
    producer.produce(topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value)
    journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
        broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
    )
    result = invoke(
        False,
        [
            "journal-client",
            "objects",
            "--stop-after-objects",
            "1",
            "--object-type",
            "origin_intrinsic_metadata",
        ],
        journal_objects_config,
>       elasticsearch_host=elasticsearch_host,
    )
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:278:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:53: in invoke
    raise result.exception
.tox/py3/lib/python3.7/site-packages/click/testing.py:329: in invoke
    cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:782: in main
    rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1066: in invoke
    return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:610: in invoke
    return callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:21: in new_func
    return f(get_current_context(), *args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/search/cli.py:97: in journal_client_objects
    client = get_journal_client(cls="kafka", **journal_cfg,)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:38: in get_journal_client
    return JournalClient(**kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.journal.client.JournalClient object at 0x7f37e4e262e8>
brokers = ['127.0.0.1:55037'], group_id = 'test-consumer', prefix = 'dqfetdyblg'
object_types = ('origin_intrinsic_metadata',), privileged = False
stop_after_objects = 1, batch_size = 200, process_timeout = None
auto_offset_reset = 'earliest', stop_on_eof = False, kwargs = {}
debug_logging = False, group_instance_id = None
consumer_settings = {'auto.offset.reset': 'earliest', 'bootstrap.servers': '127.0.0.1:55037', 'enable.auto.commit': False, 'error_cb': <function _error_cb at 0x7f37f6236950>, ...}
existing_topics = ['dqfetdyblg.skipped_content', 'dqfetdyblg.origin_visit_status', 'dqfetdyblg.raw_extrinsic_metadata', 'dqfetdyblg.revision', 'dqfetdyblg.directory', 'dqfetdyblg.snapshot', ...]
unknown_types = ['origin_intrinsic_metadata']
object_type = 'origin_intrinsic_metadata'
topics = ('dqfetdyblg.origin_intrinsic_metadata', 'dqfetdyblg.origin_intrinsic_metadata')
def __init__(
    self,
    brokers: Union[str, List[str]],
    group_id: str,
    prefix: Optional[str] = None,
    object_types: Optional[List[str]] = None,
    privileged: bool = False,
    stop_after_objects: Optional[int] = None,
    batch_size: int = 200,
    process_timeout: Optional[float] = None,
    auto_offset_reset: str = "earliest",
    stop_on_eof: bool = False,
    **kwargs,
):
    if prefix is None:
        prefix = DEFAULT_PREFIX
    if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
        raise ValueError(
            "Option 'auto_offset_reset' only accept %s, not %s"
            % (ACCEPTED_OFFSET_RESET, auto_offset_reset)
        )
    if batch_size <= 0:
        raise ValueError("Option 'batch_size' needs to be positive")
    self.value_deserializer = kafka_to_value
    if isinstance(brokers, str):
        brokers = [brokers]
    debug_logging = rdkafka_logger.isEnabledFor(logging.DEBUG)
    if debug_logging and "debug" not in kwargs:
        kwargs["debug"] = "consumer"
    # Static group instance id management
    group_instance_id = os.environ.get("KAFKA_GROUP_INSTANCE_ID")
    if group_instance_id:
        kwargs["group.instance.id"] = group_instance_id
    if "group.instance.id" in kwargs:
        # When doing static consumer group membership, set a higher default
        # session timeout. The session timeout is the duration after which
        # the broker considers that a consumer has left the consumer group
        # for good, and triggers a rebalance. Considering our current
        # processing pattern, 10 minutes gives the consumer ample time to
        # restart before that happens.
        if "session.timeout.ms" not in kwargs:
            kwargs["session.timeout.ms"] = 10 * 60 * 1000  # 10 minutes
    if "session.timeout.ms" in kwargs:
        # When the session timeout is set, rdkafka requires the max poll
        # interval to be set to a higher value; the max poll interval is
        # rdkafka's way of figuring out whether the client's message
        # processing thread has stalled: when the max poll interval lapses
        # between two calls to consumer.poll(), rdkafka leaves the consumer
        # group and terminates the connection to the brokers.
        #
        # We default to 1.5 times the session timeout
        if "max.poll.interval.ms" not in kwargs:
            kwargs["max.poll.interval.ms"] = kwargs["session.timeout.ms"] // 2 * 3
    consumer_settings = {
        **kwargs,
        "bootstrap.servers": ",".join(brokers),
        "auto.offset.reset": auto_offset_reset,
        "group.id": group_id,
        "on_commit": _on_commit,
        "error_cb": _error_cb,
        "enable.auto.commit": False,
        "logger": rdkafka_logger,
    }
    self.stop_on_eof = stop_on_eof
    if self.stop_on_eof:
        consumer_settings["enable.partition.eof"] = True
    logger.debug("Consumer settings: %s", consumer_settings)
    self.consumer = Consumer(consumer_settings)
    if privileged:
        privileged_prefix = f"{prefix}_privileged"
    else:  # do not attempt to subscribe to privileged topics
        privileged_prefix = f"{prefix}"
    existing_topics = [
        topic
        for topic in self.consumer.list_topics(timeout=10).topics.keys()
        if (
            topic.startswith(f"{prefix}.")
            or topic.startswith(f"{privileged_prefix}.")
        )
    ]
    if not existing_topics:
        raise ValueError(
            f"The prefix {prefix} does not match any existing topic "
            "on the kafka broker"
        )
    if not object_types:
        object_types = list({topic.split(".")[-1] for topic in existing_topics})
    self.subscription = []
    unknown_types = []
    for object_type in object_types:
        topics = (f"{privileged_prefix}.{object_type}", f"{prefix}.{object_type}")
        for topic in topics:
            if topic in existing_topics:
                self.subscription.append(topic)
                break
        else:
            unknown_types.append(object_type)
    if unknown_types:
        raise ValueError(
>           f"Topic(s) for object types {','.join(unknown_types)} "
            "are unknown on the kafka broker"
        )
E ValueError: Topic(s) for object types origin_intrinsic_metadata are unknown on the kafka broker
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:202: ValueError
TEST RESULT
Run At: Jul 16 2021, 8:07 PM
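
Note: the locals captured above show that the test prefix matched several existing topics (origin_visit_status, revision, snapshot, ...) but not dqfetdyblg.origin_intrinsic_metadata, even though the test had just produced a message to it. One plausible explanation is a race in the test setup: Producer.produce() only enqueues the message in the client's local buffer, so the auto-created topic may not yet exist on the broker by the time JournalClient.__init__ calls consumer.list_topics(). A minimal sketch of that hypothesis follows, using confluent-kafka's standard Producer.flush() to block until delivery before invoking the CLI; this is an illustrative adjustment to the test, not necessarily the fix adopted upstream.

    producer.produce(
        topic=topic, key=b"bogus-origin-intrinsic-metadata", value=value
    )
    # Block until the broker acknowledges the message; only then is the
    # auto-created topic guaranteed to be visible to list_topics().
    producer.flush()

    result = invoke(
        False,
        [
            "journal-client",
            "objects",
            "--stop-after-objects",
            "1",
            "--object-type",
            "origin_intrinsic_metadata",
        ],
        journal_objects_config,
        elasticsearch_host=elasticsearch_host,
    )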