swh_search = <swh.search.elasticsearch.ElasticSearch object at 0x7f4d5019e3c8>
elasticsearch_host = '127.0.0.1:38625', kafka_prefix = 'wgldzywuhj'
kafka_server = '127.0.0.1:59971'
def test__journal_client__origin_visit(
swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
):
"""Tests the re-indexing when origin_batch_size*task_batch_size is a
divisor of nb_origins."""
origin_foobar = {"url": "http://baz.foobar"}
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test search origin visit producer",
"acks": "all",
}
)
topic = f"{kafka_prefix}.origin_visit"
value = value_to_kafka({"origin": origin_foobar["url"], "type": "git"})
producer.produce(topic=topic, key=b"bogus-origin-visit", value=value)
journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
)
result = invoke(
False,
[
"journal-client",
"objects",
"--stop-after-objects",
"1",
"--object-type",
"origin_visit",
],
journal_objects_config,
> elasticsearch_host=elasticsearch_host,
)
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:143:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:50: in invoke
raise result.exception
.tox/py3/lib/python3.7/site-packages/click/testing.py:329: in invoke
cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:782: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1066: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:610: in invoke
return callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:21: in new_func
return f(get_current_context(), *args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/search/cli.py:102: in journal_client_objects
nb_messages = client.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:265: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:292: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/search/journal_client.py:25: in process_journal_objects
process_origin_visits(messages["origin_visit"], search)
.tox/py3/lib/python3.7/site-packages/swh/search/journal_client.py:53: in process_origin_visits
for visit in visits
.tox/py3/lib/python3.7/site-packages/swh/search/metrics.py:23: in d
return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/search/elasticsearch.py:207: in origin_update
indexed_count, errors = helpers.bulk(self._backend, actions, index=write_index)
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:411: in bulk
client, actions, ignore_status=ignore_status, *args, **kwargs
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:339: in streaming_bulk
**kwargs
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:256: in _process_bulk_chunk
for item in gen:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
resp = {'errors': True, 'items': [{}], 'took': 2}
bulk_data = [({'update': {'_id': '0cb823f7982094beb3ca277867e3a14a0dd559c2', '_index': 'test-write'}}, {'script': {'lang': 'painle..., 'upsert': {'sha1': '0cb823f7982094beb3ca277867e3a14a0dd559c2', 'url': 'http://baz.foobar', 'visit_types': ['git']}})]
ignore_status = (), raise_on_error = True
def _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error=True):
# if raise on error is set, we need to collect errors per chunk before raising them
errors = []
# go through request-response pairs and detect failures
for data, (op_type, item) in zip(
bulk_data, map(methodcaller("popitem"), resp["items"])
):
status_code = item.get("status", 500)
ok = 200 <= status_code < 300
if not ok and raise_on_error and status_code not in ignore_status:
# include original document source
if len(data) > 1:
item["data"] = data[1]
errors.append({op_type: item})
if ok or not errors:
# if we are not just recording all errors to be able to raise
# them all at once, yield items individually
yield ok, {op_type: item}
if errors:
> raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
E elasticsearch.helpers.errors.BulkIndexError: ('1 document(s) failed to index.', [{'update': {'_index': 'test', '_type': '_doc', '_id': '0cb823f7982094beb3ca277867e3a14a0dd559c2', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'failed to execute script', 'caused_by': {'type': 'script_exception', 'reason': 'compile error', 'script_stack': ['... it_types field value\nLit visit_types = ctx._source ...', ' ^---- HERE'], 'script': '\n// backup current visit_types field value\nLit visit_types = ctx._source.getOrDefault("visit_types", []);\n\n// update origin document with new field values\nctx._source.putAll(params);\n\n// restore previous visit types after visit_types field overriding\nif (ctx._source.containsKey("visit_types")) {\n for (int i = 0; i < visit_types.length; ++i) {\n if (!ctx._source.visit_types.contains(visit_types[i])) {\n ctx._source.visit_types.add(visit_types[i]);\n }\n }\n}\n', 'lang': 'painless', 'position': {'offset': 47, 'start': 22, 'end': 72}, 'caused_by': {'type': 'illegal_argument_exception', 'reason': 'invalid declaration: cannot resolve type [Lit]'}}}, 'data': {'scripted_upsert': True, 'upsert': {'url': 'http://baz.foobar', 'visit_types': ['git'], 'sha1': '0cb823f7982094beb3ca277867e3a14a0dd559c2'}, 'script': {'source': '\n// backup current visit_types field value\nLit visit_types = ctx._source.getOrDefault("visit_types", []);\n\n// update origin document with new field values\nctx._source.putAll(params);\n\n// restore previous visit types after visit_types field overriding\nif (ctx._source.containsKey("visit_types")) {\n for (int i = 0; i < visit_types.length; ++i) {\n if (!ctx._source.visit_types.contains(visit_types[i])) {\n ctx._source.visit_types.add(visit_types[i]);\n }\n }\n}\n', 'lang': 'painless', 'params': {'url': 'http://baz.foobar', 'visit_types': ['git']}}}}}])
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:187: BulkIndexError
TEST RESULT
- Run At
- Jun 11 2021, 12:30 PM