
Commit kafka messages whose offset has reached the high limit
Closed, Public

Authored by douardda on Sep 13 2021, 4:02 PM.

Details

Summary

This is necessary to ensure these messages are committed in kafka.
Otherwise, since the partition (considered empty) is unsubscribed from,
its offset never gets committed in JournalClient.handle_messages() (since
the latter only commits assigned partitions).

Ensure offsets are committed only after worker_fn has executed without
error.

This requires overriding the JournalClient.handle_messages() method in
JournalClientOffsetRanges to make sure "pending" messages are
committed after the proper execution of worker_fn.

Doing so, we can both unsubscribe from "eof" partitions on the fly (with
"eof" meaning the partition has been consumed up to the high watermark
offset recorded at the beginning of the export), and commit ALL offsets
that need to be committed, but only after proper execution of the
worker_fn callback.

This should guarantee proper and consistent behavior (famous last
words...).
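
As an illustration of the ordering described above, here is a minimal sketch, not the actual swh.dataset code. It uses a hypothetical FakeConsumer stand-in instead of a real Kafka client, and the names handle_batch, high_offsets and pending are assumptions made for this example. It also assumes that a partition counts as "eof" once the next offset to read reaches the recorded high watermark.

```python
from typing import Callable, Dict, Iterable, List, Tuple

# Hypothetical message shape for this sketch: (partition, offset, payload).
Message = Tuple[int, int, str]


class FakeConsumer:
    """Stand-in for a Kafka consumer; it only records what gets committed."""

    def __init__(self, partitions: Iterable[int]) -> None:
        self.assigned = set(partitions)
        self.committed: Dict[int, int] = {}

    def unassign(self, partition: int) -> None:
        self.assigned.discard(partition)

    def commit(self, offsets: Dict[int, int]) -> None:
        # By Kafka convention, the committed offset is the next offset to read.
        self.committed.update(offsets)


def handle_batch(
    consumer: FakeConsumer,
    batch: List[Message],
    high_offsets: Dict[int, int],
    worker_fn: Callable[[List[Message]], None],
) -> None:
    """Process one batch and commit offsets only if worker_fn succeeds."""
    pending: Dict[int, int] = {}
    for partition, offset, _payload in batch:
        # Remember the offset to commit, even for partitions we drop below.
        pending[partition] = max(pending.get(partition, 0), offset + 1)
        if offset + 1 >= high_offsets[partition]:
            # "eof": consumed up to the high watermark, stop reading it.
            consumer.unassign(partition)
    # If worker_fn raises, the commit below is never reached.
    worker_fn(batch)
    # Commit ALL pending offsets, including those of unassigned partitions.
    consumer.commit(pending)
```

In this sketch, a partition that reaches its high watermark is dropped right away, yet its offset is still committed explicitly once worker_fn returns, which is the gap that committing only assigned partitions would leave open.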

Depends on D6246

(used to be D6235 but phab is freaking the latter out for some reason...)

Event Timeline

Build is green

Patch application report for D6247 (id=22622)

Could not rebase; Attempt merge onto 358d84938d...

Updating 358d849..fb6c2e7
Fast-forward
 swh/dataset/journalprocessor.py | 47 +++++++++++++++++++++++++++++++++--------
 1 file changed, 38 insertions(+), 9 deletions(-)
Changes applied before test
commit fb6c2e7246c58ff91c1cc75613635447c3397785
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages which offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    Ensure offset are committed only after worker_fn is executed without
    error.
    
    This requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "eof" partitions on the fly (with
    "eof" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that need to be, but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit a507024c5ff4989720044aa3fc42ebbc341659e0
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:04:51 2021 +0200

    Add a JournalClientOffsetRanges.unsubscribe() method
    
    to make the code a bit clearer.

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/17/ for more details.

This revision is now accepted and ready to land. Sep 13 2021, 4:05 PM