swh_search = <swh.search.elasticsearch.ElasticSearch object at 0x7f91184182b0>
elasticsearch_host = '127.0.0.1:40783', kafka_prefix = 'pjsbvrxofe'
kafka_server = '127.0.0.1:42973'
def test__journal_client__origin_visit_status(
swh_search, elasticsearch_host, kafka_prefix: str, kafka_server
):
"""Subscribing to origin-visit-status should result in swh-search indexation
"""
origin_foobar = {"url": "http://baz.foobar"}
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test search origin visit status producer",
"acks": "all",
}
)
topic = f"{kafka_prefix}.origin_visit_status"
value = value_to_kafka(
{
"origin": origin_foobar["url"],
"visit": 1,
"date": datetime.now(tz=timezone.utc),
"snapshot": None,
"status": "full",
}
)
producer.produce(topic=topic, key=b"bogus-origin-visit-status", value=value)
journal_objects_config = JOURNAL_OBJECTS_CONFIG_TEMPLATE.format(
broker=kafka_server, prefix=kafka_prefix, group_id="test-consumer"
)
result = invoke(
False,
[
"journal-client",
"objects",
"--stop-after-objects",
"1",
"--prefix",
kafka_prefix,
"--object-type",
"origin_visit_status",
],
journal_objects_config,
> elasticsearch_host=elasticsearch_host,
)
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:206:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/search/tests/test_cli.py:51: in invoke
raise result.exception
.tox/py3/lib/python3.7/site-packages/click/testing.py:329: in invoke
cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:782: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1259: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1066: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:610: in invoke
return callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:21: in new_func
return f(get_current_context(), *args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/search/cli.py:102: in journal_client_objects
nb_messages = client.process(worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:265: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:292: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/search/journal_client.py:40: in process_journal_objects
process_origin_visit_statuses(messages["origin_visit_status"], search)
.tox/py3/lib/python3.7/site-packages/swh/search/journal_client.py:136: in process_origin_visit_statuses
search.origin_update(full_visit_status)
.tox/py3/lib/python3.7/site-packages/swh/search/metrics.py:23: in d
return f(*a, **kw)
.tox/py3/lib/python3.7/site-packages/swh/search/elasticsearch.py:272: in origin_update
indexed_count, errors = helpers.bulk(self._backend, actions, index=write_index)
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:411: in bulk
client, actions, ignore_status=ignore_status, *args, **kwargs
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:339: in streaming_bulk
**kwargs
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:256: in _process_bulk_chunk
for item in gen:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
resp = {'errors': True, 'items': [{}], 'took': 4}
bulk_data = [({'update': {'_id': '0cb823f7982094beb3ca277867e3a14a0dd559c2', '_index': 'test-write'}}, {'script': {'lang': 'painle...6-17T10:58:15.629393+00:00', 'last_revision_date': None, 'last_visit_date': '2021-06-17T10:58:15.629393+00:00', ...}})]
ignore_status = (), raise_on_error = True
def _process_bulk_chunk_success(resp, bulk_data, ignore_status, raise_on_error=True):
# if raise on error is set, we need to collect errors per chunk before raising them
errors = []
# go through request-response pairs and detect failures
for data, (op_type, item) in zip(
bulk_data, map(methodcaller("popitem"), resp["items"])
):
status_code = item.get("status", 500)
ok = 200 <= status_code < 300
if not ok and raise_on_error and status_code not in ignore_status:
# include original document source
if len(data) > 1:
item["data"] = data[1]
errors.append({op_type: item})
if ok or not errors:
# if we are not just recording all errors to be able to raise
# them all at once, yield items individually
yield ok, {op_type: item}
if errors:
> raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
E elasticsearch.helpers.errors.BulkIndexError: ('1 document(s) failed to index.', [{'update': {'_index': 'test', '_type': '_doc', '_id': '0cb823f7982094beb3ca277867e3a14a0dd559c2', 'status': 400, 'error': {'type': 'illegal_argument_exception', 'reason': 'failed to execute script', 'caused_by': {'type': 'script_exception', 'reason': 'runtime error', 'script_stack': ['java.base/java.util.Objects.requireNonNull(Objects.java:233)', 'java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1951)', 'java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:600)', 'java.base/java.time.ZonedDateTime.parse(ZonedDateTime.java:585)', 'last_revision_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_revision_date", "0001-01-01T00:00:00Z"));\n\n// update origin document with new field values\n', ' ^---- HERE'], 'script': ' ...', 'lang': 'painless', 'position': {'offset': 559, 'start': 507, 'end': 670}, 'caused_by': {'type': 'null_pointer_exception', 'reason': 'text'}}}, 'data': {'scripted_upsert': True, 'upsert': {'url': 'http://baz.foobar', 'has_visits': True, 'nb_visits': 1, 'snapshot_id': None, 'last_visit_date': '2021-06-17T10:58:15.629393+00:00', 'last_eventful_visit_date': '2021-06-17T10:58:15.629393+00:00', 'last_revision_date': None, 'sha1': '0cb823f7982094beb3ca277867e3a14a0dd559c2'}, 'script': {'source': '\n// backup current visit_types field value\nList visit_types = ctx._source.getOrDefault("visit_types", []);\nint nb_visits = ctx._source.getOrDefault("nb_visits", 0);\nZonedDateTime last_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_visit_date", "0001-01-01T00:00:00Z"));\nString snapshot_id = ctx._source.getOrDefault("snapshot_id", "");\nZonedDateTime last_eventful_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_eventful_visit_date", "0001-01-01T00:00:00Z"));\nZonedDateTime last_revision_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_revision_date", "0001-01-01T00:00:00Z"));\n\n// update origin document with new field values\nctx._source.putAll(params);\n\n// restore previous visit types after visit_types field overriding\nif (ctx._source.containsKey("visit_types")) {\n for (int i = 0; i < visit_types.length; ++i) {\n if (!ctx._source.visit_types.contains(visit_types[i])) {\n ctx._source.visit_types.add(visit_types[i]);\n }\n }\n}\n\n// Undo overwrite if incoming nb_visits is smaller\nif (ctx._source.containsKey("nb_visits")) {\n int incoming_nb_visits = ctx._source.getOrDefault("nb_visits", 0);\n if(incoming_nb_visits < nb_visits){\n ctx._source.nb_visits = nb_visits;\n }\n}\n\n// Undo overwrite if incoming last_visit_date is older\nif (ctx._source.containsKey("last_visit_date")) {\n ZonedDateTime incoming_last_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_visit_date", "0001-01-01T00:00:00Z"));\n int difference = incoming_last_visit_date.compareTo(last_visit_date); // returns -1, 0 or 1\n if(difference < 0){\n ctx._source.last_visit_date = last_visit_date;\n }\n}\n\n// Undo update of last_eventful_date and snapshot_id if\n// snapshot_id hasn\'t changed OR incoming_last_eventful_visit_date is older\nif (ctx._source.containsKey("snapshot_id")) {\n String incoming_snapshot_id = ctx._source.getOrDefault("snapshot_id", "");\n ZonedDateTime incoming_last_eventful_visit_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_eventful_visit_date", "0001-01-01T00:00:00Z"));\n int difference = incoming_last_eventful_visit_date.compareTo(last_eventful_visit_date); // returns -1, 0 or 1\n if(snapshot_id == incoming_snapshot_id || difference < 0){\n ctx._source.snapshot_id = snapshot_id;\n ctx._source.last_eventful_visit_date = last_eventful_visit_date;\n }\n}\n\n// Undo overwrite if incoming last_revision_date is older\nif (ctx._source.containsKey("last_revision_date")) {\n ZonedDateTime incoming_last_revision_date = ZonedDateTime.parse(ctx._source.getOrDefault("last_revision_date", "0001-01-01T00:00:00Z"));\n int difference = incoming_last_revision_date.compareTo(last_revision_date); // returns -1, 0 or 1\n if(difference < 0){\n ctx._source.last_revision_date = last_revision_date;\n }\n}\n', 'lang': 'painless', 'params': {'url': 'http://baz.foobar', 'has_visits': True, 'nb_visits': 1, 'snapshot_id': None, 'last_visit_date': '2021-06-17T10:58:15.629393+00:00', 'last_eventful_visit_date': '2021-06-17T10:58:15.629393+00:00', 'last_revision_date': None}}}}}])
.tox/py3/lib/python3.7/site-packages/elasticsearch/helpers/actions.py:187: BulkIndexError
TEST RESULT
TEST RESULT
- Run At
- Jun 17 2021, 12:59 PM