Software Heritage

Commit Kafka messages whose offset has reached the high limit
Abandoned, Public

Authored by douardda on Sep 9 2021, 5:59 PM.

Details

Reviewers
vlorentz
Group Reviewers
Reviewers
Summary

This is necessary to ensure these messages are committed in Kafka;
otherwise, since the (considered) empty partition is unsubscribed from,
it never gets committed in JournalClient.handle_messages() (since the
latter only commits assigned partitions).

WARNING: doing this, we DO commit a message BEFORE actually handling it. Since the latter operation may fail, said message can be de facto lost...

[the second revision in this diff handles that latter aspect; not sure
whether I want to squash these 2 revisions or not...]
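A minimal, self-contained Python model of the problem described above may help. It is not the swh.journal or swh.dataset code: messages are plain (partition, offset) pairs, and the helpers `unsubscribe_eof()` and `commit_assigned_only()` are hypothetical, written only from the description of `JournalClient.handle_messages()` committing assigned partitions only.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical model: a consumed message is just a (partition, offset) pair.
Message = Tuple[int, int]


def unsubscribe_eof(batch: List[Message], high: Dict[int, int], assigned: Set[int]) -> None:
    """Drop partitions whose last message (offset high - 1) has been read.

    ``high`` holds each partition's high watermark as recorded at the
    beginning of the export; the last existing message has offset high - 1.
    """
    for partition, offset in batch:
        if offset >= high[partition] - 1:
            assigned.discard(partition)  # partition is now considered empty


def commit_assigned_only(
    batch: List[Message], assigned: Set[int], committed: Dict[int, int]
) -> None:
    """Commit offsets, but only for partitions that are still assigned."""
    for partition, offset in batch:
        if partition in assigned:
            # Kafka convention: the committed offset is the next one to read.
            committed[partition] = max(committed.get(partition, 0), offset + 1)


high = {0: 3, 1: 5}
assigned = {0, 1}
committed: Dict[int, int] = {}
batch = [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1)]

unsubscribe_eof(batch, high, assigned)            # partition 0 reaches its end
commit_assigned_only(batch, assigned, committed)  # ...and is skipped here
lag = {p: high[p] - committed.get(p, 0) for p in high}
print(committed, lag)
```

Partition 0 keeps a lag of 3 forever, since nothing will ever commit it again. Committing it directly in the unsubscribe step would remove that lag, but it would happen before worker_fn has handled the batch, which is exactly the risk the WARNING above points out.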

Ensure offsets are committed only after worker_fn is executed without error

This requires overriding the JournalClient.handle_messages() method in
JournalClientOffsetRanges to make sure "pending" messages are
committed after the proper execution of worker_fn.

Doing so, we can both unsubscribe from "eof" partitions on the fly
("eof" meaning that the partition has been consumed up to the high
watermark offset recorded at the beginning of the export), and commit ALL
offsets that need to be committed, but only after proper execution of the
worker_fn callback.

This should guarantee proper and consistent behavior (famous last
words...).
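The ordering described above (unsubscribe on the fly, commit pending offsets only once worker_fn has succeeded) can be sketched with a toy class. `OffsetRangeClient` and its attributes are hypothetical and only model the idea; the actual override of `handle_messages()` in `JournalClientOffsetRanges` is not shown on this page.

```python
from typing import Callable, Dict, List, Set, Tuple

Message = Tuple[int, int]  # hypothetical (partition, offset) pair


class OffsetRangeClient:
    """Toy model: commit "pending" offsets only after worker_fn succeeds."""

    def __init__(self, high: Dict[int, int]) -> None:
        self.high = high                      # high watermarks at export start
        self.assigned: Set[int] = set(high)   # partitions still consumed
        self.committed: Dict[int, int] = {}   # partition -> next offset to read

    def handle_messages(
        self, batch: List[Message], worker_fn: Callable[[List[Message]], None]
    ) -> None:
        pending: Dict[int, int] = {}
        for partition, offset in batch:
            # Remember the offset even if we unsubscribe from the partition,
            # so that "eof" partitions still get committed below.
            pending[partition] = max(pending.get(partition, 0), offset + 1)
            if offset >= self.high[partition] - 1:
                self.assigned.discard(partition)
        # If worker_fn raises, the exception propagates and nothing from this
        # batch is committed, so a restarted client would read it again.
        worker_fn(batch)
        for partition, next_offset in pending.items():
            self.committed[partition] = max(self.committed.get(partition, 0), next_offset)


client = OffsetRangeClient({0: 3, 1: 5})
client.handle_messages([(0, 0), (0, 1), (0, 2), (1, 0), (1, 1)], lambda batch: None)
print(client.committed, client.assigned)  # partition 0 is committed although unsubscribed


def failing_worker(batch: List[Message]) -> None:
    raise RuntimeError("export failed")


try:
    client.handle_messages([(1, 2), (1, 3), (1, 4)], failing_worker)
except RuntimeError:
    pass
print(client.committed)  # unchanged: {0: 3, 1: 2}
```

Keeping `pending` local to each call is a deliberate choice in this sketch: if worker_fn fails, the offsets of that batch are dropped instead of being committed later along with a successful batch.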

Depends on D6234

Diff Detail

Event Timeline

Build is green

Patch application report for D6235 (id=22557)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..cf209a9
Fast-forward
 swh/dataset/cli.py              |  9 ++++-
 swh/dataset/journalprocessor.py | 87 ++++++++++++++++++++++++++++-------------
 2 files changed, 67 insertions(+), 29 deletions(-)
Changes applied before test
commit cf209a99638ecfb169d52be9cfd14b56fea12bc2
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 17:18:46 2021 +0200

    Ensure offset are committed ony after worker_fn is executed without tb
    
    this requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "oef" partitions on the fly (with
    "oef" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that needs to be but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit ea71b235b9028ed806828d3a7af4e886a1110f79
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages wich offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    WARNING: doing this, we DO commit a message BEFORE actually handling it.
    Since this later operation may fail, said message can be de facto
    lost...

commit 4d70697c63955e90efeecbd90f1da7994c89cdab
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:20:26 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consumong kafka topics from the beginning rather than
    stored offsets.

commit 3f331e1823e3329085f01f073fe8a6bd6f43473a
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 48d246f178851dfe06e47b6c12555fcd095f5641
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit 3a2f5076dcbf791d1ef43982b70551f048ee7c3e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 45126fd621e8b75c592d7c6cd3d8d1337f95c97e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/7/ for more details.
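The "Simplify the lo/high partition offset computation", "--reset" and "progress bar reaches 100%" commits above all revolve around per-partition offset ranges. The sketch below shows that arithmetic under stated assumptions: the (low, high) pair is what confluent-kafka's `Consumer.get_watermark_offsets()` returns, the committed offset is negative (`OFFSET_INVALID`) when nothing was committed, and the function names and the progress formula are hypothetical, not taken from `swh/dataset/journalprocessor.py`.

```python
from typing import Optional, Tuple

# Sentinel used by confluent-kafka (confluent_kafka.OFFSET_INVALID) for
# "no committed offset"; redefined here to keep the sketch dependency-free.
OFFSET_INVALID = -1001


def offset_range(
    watermarks: Tuple[int, int], committed: int, reset: bool = False
) -> Tuple[int, int]:
    """Return the (lo, hi) range of offsets still to be exported.

    ``high`` is the offset the *next* produced message will get, so the
    last existing message has offset ``high - 1``.
    """
    low, high = watermarks
    if reset or committed < 0:  # --reset, or nothing committed yet
        return low, high
    # A committed offset below ``low`` means old messages were deleted.
    return max(low, committed), high


def progress(lo: int, hi: int, last_offset: Optional[int]) -> float:
    """Fraction of [lo, hi) consumed once ``last_offset`` has been handled."""
    if hi <= lo:
        return 1.0  # empty range: nothing to export
    if last_offset is None:
        return 0.0
    # +1 because offsets are inclusive: handling offset hi - 1 means done.
    return (last_offset - lo + 1) / (hi - lo)


lo, hi = offset_range((0, 10), committed=4)
print(lo, hi)                    # 4 10
print(progress(lo, hi, hi - 1))  # 1.0
print(offset_range((0, 10), committed=4, reset=True))  # (0, 10)
```

Forgetting the `+ 1` in `progress()` is one plausible way to end up with a bar that stops just short of 100%.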

vlorentz added a subscriber: vlorentz.

There's a bunch of typos in your commit/diff msg: "wich", "oef", "ony", "ALL offsets that needs to be", "stash" -> "squash"


> This is necessary to ensure these messages are committed in Kafka;
> otherwise, since the (considered) empty partition is unsubscribed from,
> it never gets committed in JournalClient.handle_messages() (since the
> latter only commits assigned partitions).

Why is this a problem?

swh/dataset/journalprocessor.py
91–92

needs to be updated

This revision is now accepted and ready to land. (Sep 10 2021, 1:24 PM)

humm, good question. I guess the main reason I did this is that I was puzzled while testing/playing with swh dataset export in a small docker session: lags reported by cmak were completely inconsistent, so I dug in to understand and fix the "problem".

Which indeed in the end is not that much of a problem: a few objects are replayed, but should be ignored since they are listed in the local leveldb "cache".

But since this can be confusing, it should be either "fixed" (this diff) or properly documented somehow...
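Replayed messages are harmless, as explained above, because already-exported objects are skipped. A minimal sketch of that idempotent behavior, with a Python set standing in for the local leveldb "cache" (the function name and the object identifiers are hypothetical):

```python
from typing import Iterable, List, Set


def export_new_objects(objects: Iterable[bytes], seen: Set[bytes]) -> List[bytes]:
    """Export each object at most once, skipping ones already in ``seen``.

    ``seen`` stands in for the local leveldb "cache"; a real cache must be
    persistent so that it survives restarts, which a plain set does not.
    """
    exported = []
    for obj_id in objects:
        if obj_id in seen:
            continue  # replayed message: already exported
        seen.add(obj_id)
        exported.append(obj_id)
    return exported


seen: Set[bytes] = set()
first = export_new_objects([b"a", b"b", b"c"], seen)
# The uncommitted tail of the previous run is consumed again after a restart:
second = export_new_objects([b"c", b"d"], seen)
print(first, second)  # [b'a', b'b', b'c'] [b'd']
```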

> lags reported by cmak were completely inconsistent

only because you have a small dataset, right?
With a larger one, the last batch of each partition should have a negligible size.

But fair enough

yep, with a default 200 batch size and 16 partitions, this is very obvious when dealing with a small test database...
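The figures quoted above give a rough upper bound on the lag left behind by the original behavior, assuming that `batch_size` bounds the number of messages fetched per call and that only each partition's share of the batch in which it reaches its high watermark stays uncommitted. The topic sizes below are made-up examples:

```python
def max_uncommitted(partitions: int, batch_size: int) -> int:
    """Upper bound on messages left uncommitted for one topic.

    Each partition contributes at most one batch's worth of messages: those
    read in the batch where it reaches its high watermark.
    """
    return partitions * batch_size


bound = max_uncommitted(partitions=16, batch_size=200)
print(bound)  # 3200
for total in (10_000, 100_000_000):  # hypothetical topic sizes
    print(f"{total} messages: at most {bound / total:.4%} left as lag")
```

On a small test database the bound is a sizable fraction of the topic, while on a large one it is negligible, which matches both remarks above.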

swh/dataset/journalprocessor.py
91–92

IMHO, this is still a valid description of what the method does, don't you think?

rebase, fix typos, squash revisions

Build is green

Patch application report for D6235 (id=22583)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..eff0e0b
Fast-forward
 swh/dataset/cli.py              | 12 ++++-
 swh/dataset/journalprocessor.py | 97 +++++++++++++++++++++++++++++------------
 2 files changed, 79 insertions(+), 30 deletions(-)
Changes applied before test
commit eff0e0b0124184112ad29c9b406b8f65dadf8ac7
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages which offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    Ensure offset are committed only after worker_fn is executed without
    error.
    
    This requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "eof" partitions on the fly (with
    "eof" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that need to be, but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit 40c24464da3a2af5918fc309d282e72b08b10c60
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:20:26 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consumong kafka topics from the beginning rather than
    stored offsets.

commit 5881ae06f636a74e7fb0addca04127bfe18b687d
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 47713ee38c9498a0548535e5b8361d8158ee3e09
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 2760e322af7c5862e0329198671b49d2755491ef
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/11/ for more details.
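The "Explicitly close the temporary kafka consumer in `get_offsets`" commit above boils down to guaranteeing that `close()` is called, even on errors. A hedged sketch using `contextlib.closing`; `get_watermarks()`, `OffsetConsumer` and `FakeConsumer` are hypothetical, and partitions are plain ints here, whereas confluent-kafka's `Consumer.get_watermark_offsets()` takes a `TopicPartition`:

```python
from contextlib import closing
from typing import Callable, Dict, List, Protocol, Tuple


class OffsetConsumer(Protocol):
    """Subset of a Kafka consumer interface needed by this sketch (hypothetical)."""

    def get_watermark_offsets(self, partition: int) -> Tuple[int, int]: ...

    def close(self) -> None: ...


def get_watermarks(
    make_consumer: Callable[[], OffsetConsumer], partitions: List[int]
) -> Dict[int, Tuple[int, int]]:
    # closing() calls close() on exit, even if a query raises, so the
    # temporary consumer does not linger until garbage collection.
    with closing(make_consumer()) as consumer:
        return {p: consumer.get_watermark_offsets(p) for p in partitions}


class FakeConsumer:
    """In-memory stand-in used to exercise get_watermarks()."""

    def __init__(self) -> None:
        self.closed = False

    def get_watermark_offsets(self, partition: int) -> Tuple[int, int]:
        return (0, 10 * (partition + 1))

    def close(self) -> None:
        self.closed = True


fake = FakeConsumer()
result = get_watermarks(lambda: fake, [0, 1])
print(result, fake.closed)  # {0: (0, 10), 1: (0, 20)} True
```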

swh/dataset/journalprocessor.py
91–92

Yes, you're right. handle_messages needs a new one, though! (summarize the motivation in this diff's description)

Rebase (remove D6234 from dependencies)

but extracted a small part of D6234 into a dedicated git revision (included in this diff).

Build is green

Patch application report for D6235 (id=22605)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..c8db769
Fast-forward
 swh/dataset/journalprocessor.py | 88 +++++++++++++++++++++++++++--------------
 1 file changed, 59 insertions(+), 29 deletions(-)
Changes applied before test
commit c8db7698bc82f1472729fe4148803004b6c93b93
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages which offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    Ensure offset are committed only after worker_fn is executed without
    error.
    
    This requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "eof" partitions on the fly (with
    "eof" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that need to be, but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit bd888a6c75d216bdef4653d9cbfdf7147a32ee05
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:04:51 2021 +0200

    Add a JournalClientOffsetRanges.unsubscribe() method
    
    to make the code a bit clearer.

commit 5881ae06f636a74e7fb0addca04127bfe18b687d
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 47713ee38c9498a0548535e5b8361d8158ee3e09
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 2760e322af7c5862e0329198671b49d2755491ef
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/12/ for more details.

Abandoned in favor of D6247 because phab/arcanist won't let me update this one any more (sorry).