Page MenuHomeSoftware Heritage

[swh-search] Document version conflict during parallel indexation
Closed, ResolvedPublic

Description

[discovered during the reindexation for T3433]

If several journal clients try to index two visits for the same origin at the same time, a version conflict can occur.
For example:

'reason': '[1be714e9031338f8bf5f2b9b8d76d1c6499116f1]: version conflict, required seqNo [17673068], primary term [1]. current document has seqNo [17687322] and primary term [1]

The full stack trace:

Traceback (most recent call last):
  File "/usr/bin/swh", line 11, in <module>
    load_entry_point('swh.core==0.14.5', 'console_scripts', 'swh')()
  File "/usr/lib/python3/dist-packages/swh/core/cli/__init__.py", line 185, in main
    return swh(auto_envvar_prefix="SWH")
  File "/usr/lib/python3/dist-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/lib/python3/dist-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/lib/python3/dist-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/lib/python3/dist-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/usr/lib/python3/dist-packages/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/usr/lib/python3/dist-packages/swh/search/cli.py", line 143, in journal_client_objects
    nb_messages = client.process(worker_fn)
  File "/usr/lib/python3/dist-packages/swh/journal/client.py", line 265, in process
    batch_processed, at_eof = self.handle_messages(messages, worker_fn)
  File "/usr/lib/python3/dist-packages/swh/journal/client.py", line 292, in handle_messages
    worker_fn(dict(objects))
  File "/usr/lib/python3/dist-packages/swh/search/journal_client.py", line 78, in process_journal_objects
    process_origin_visit_statuses(messages["origin_visit_status"], search, storage)
  File "/usr/lib/python3/dist-packages/swh/search/journal_client.py", line 120, in process_origin_visit_statuses
    search.origin_update(processed_visit_statuses)
  File "/usr/lib/python3/dist-packages/swh/core/api/__init__.py", line 181, in meth_
    return self.post(meth._endpoint_path, post_data)
  File "/usr/lib/python3/dist-packages/swh/core/api/__init__.py", line 278, in post
    return self._decode_response(response)
  File "/usr/lib/python3/dist-packages/swh/core/api/__init__.py", line 354, in _decode_response
    self.raise_for_status(response)
  File "/usr/lib/python3/dist-packages/swh/core/api/__init__.py", line 344, in raise_for_status
    raise exception from None
swh.core.api.RemoteException: <RemoteException 500 BulkIndexError: ['1 document(s) failed to index.', [{'update': {'_index': 'origin-v0.11', '_type': '_doc', '_id': 'd4fd7b8d6bfb3623bdb49ea42b00adaed683d55f', 'status': 409, 'error': {'type': 'version_conflict_engine_exception', 'reason': '[d4fd7b8d6bfb3623bdb49ea42b00adaed683d55f]: version conflict, required seqNo [18388279], primary term [1]. current document has seqNo
[18455317] and primary term [1]', 'index_uuid': 'XOUR_jKcTtWKjlPk_8EAlA', 'shard': '11', 'index': 'origin-v0.11'}, 'data': {'scripted_upsert': True, 'upsert': {'url': 'https://www.npmjs.com/package/generator-patternslib', 'has_visits': True, 'visit_types': ['npm'], 'nb_visits': 175, 'snapshot_id': '5d23890e61f19de25c94fa307a79d237a45ed140', 'last_visit_date': '2021-02-26T18:03:15.067357+00:00', 'last_eventful_visit_date': '2021-02-26T18:03:15.067357+00:00', 'sha1': 'd4fd7b8d6bfb3623bdb49ea42b00adaed683d55f'}, 'script': {'source': '\n// utility function to get and parse date\nZonedDateTime getDate(def ctx, String date_field) {\n    String default_date = "0001-01-01T00:00:00Z";\n    String date = ctx._source.getOrDefault(date_field, default_date);\n    return ZonedDateTime.parse(date);\n}\n\n// backup current visit_types field value\nList visit_types = ctx._source.getOrDefault("visit_types", []);\nint nb_visits = ctx._source.getOrDefault("nb_visits", 0);\n\nZonedDateTime last_visit_date = getDate(ctx, "last_visit_date");\n\nString snapshot_id = ctx._source.getOrDefault("snapshot_id", "");\nZonedDateTime last_eventful_visit_date =\n    getDate(ctx, "last_eventful_visit_date");\nZonedDateTime last_revision_date = getDate(ctx, "last_revision_date");\nZonedDateTime last_release_date = getDate(ctx, "last_release_date");\n\n// update origin document with new field values\nctx._source.putAll(params);\n\n// restore previous visit types after visit_types field overriding\nif (ctx._source.containsKey("visit_types")) {\n    for (int i = 0; i < visit_types.length; ++i) {\n        if (!ctx._source.visit_types.contains(visit_types[i])) {\n            ctx._source.visit_types.add(visit_types[i]);\n        }\n    }\n}\n\n// Undo overwrite if incoming nb_visits is smaller\nif (ctx._source.containsKey("nb_visits")) {\n    int incoming_nb_visits = ctx._source.getOrDefault("nb_visits", 0);\n    if(incoming_nb_visits < nb_visits){\n        ctx._source.nb_visits = nb_visits;\n    
}\n}\n\n// Undo overwrite if incoming last_visit_date is older\nif (ctx._source.containsKey("last_visit_date")) {\n    ZonedDateTime incoming_last_visit_date = getDate(ctx, "last_visit_date");\n    int difference =\n        // returns -1, 0 or 1\n        incoming_last_visit_date.compareTo(last_visit_date);\n    if(difference < 0){\n        ctx._source.last_visit_date = last_visit_date;\n    }\n}\n\n// Undo update of last_eventful_date and snapshot_id if\n// snapshot_id hasn\'t changed OR incoming_last_eventful_visit_date is older\nif (ctx._source.containsKey("snapshot_id")) {\n    String incoming_snapshot_id = ctx._source.getOrDefault("snapshot_id", "");\n    ZonedDateTime incoming_last_eventful_visit_date =\n        getDate(ctx, "last_eventful_visit_date");\n    int difference =\n        // returns -1, 0 or 1\n        incoming_last_eventful_visit_date.compareTo(last_eventful_visit_date);\n    if(snapshot_id == incoming_snapshot_id || difference < 0){\n        ctx._source.snapshot_id = snapshot_id;\n        ctx._source.last_eventful_visit_date = last_eventful_visit_date;\n    }\n}\n\n// Undo overwrite if incoming last_revision_date is older\nif (ctx._source.containsKey("last_revision_date")) {\n    ZonedDateTime incoming_last_revision_date =\n        getDate(ctx, "last_revision_date");\n    int difference =\n        // returns -1, 0 or 1\n        incoming_last_revision_date.compareTo(last_revision_date);\n    if(difference < 0){\n        ctx._source.last_revision_date = last_revision_date;\n    }\n}\n\n// Undo overwrite if incoming last_release_date is older\nif (ctx._source.containsKey("last_release_date")) {\n    ZonedDateTime incoming_last_release_date =\n        getDate(ctx, "last_release_date");\n    // returns -1, 0 or 1\n    int difference = incoming_last_release_date.compareTo(last_release_date);\n    if(difference < 0){\n        ctx._source.last_release_date = last_release_date;\n    }\n}\n', 'lang': 'painless', 'params': {'url': 
'https://www.npmjs.com/package/generator-patternslib', 'has_visits': True, 'visit_types': ['npm'], 'nb_visits': 175, 'snapshot_id': '5d23890e61f19de25c94fa307a79d237a45ed140', 'last_visit_date': '2021-02-26T18:03:15.067357+00:00', 'last_eventful_visit_date': '2021-02-26T18:03:15.067357+00:00'}}}}}]]>

As @olasd suggested, it would be great to support an automatic replay when such conflicts happen.

Related Objects