Add a --reset option to export_graph cli tool
Abandoned, Public

Authored by douardda on Thu, Sep 9, 5:57 PM.

Details

Reviewers: None
Group Reviewers: Reviewers
Summary

Allows forcing the consumption of Kafka topics from the beginning rather than
from the stored offsets.

Depends on D6233

Event Timeline

Build is green

Patch application report for D6234 (id=22556)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..4d70697
Fast-forward
 swh/dataset/cli.py              |  9 +++++-
 swh/dataset/journalprocessor.py | 66 +++++++++++++++++++++++++----------------
 2 files changed, 49 insertions(+), 26 deletions(-)
Changes applied before test
commit 4d70697c63955e90efeecbd90f1da7994c89cdab
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:20:26 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consumong kafka topics from the beginning rather than
    stored offsets.

commit 3f331e1823e3329085f01f073fe8a6bd6f43473a
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 48d246f178851dfe06e47b6c12555fcd095f5641
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit 3a2f5076dcbf791d1ef43982b70551f048ee7c3e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 45126fd621e8b75c592d7c6cd3d8d1337f95c97e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/6/ for more details.
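
The single-pass lo/hi computation described in the "Simplify the lo/high partition offset computation" commit above could look roughly like the sketch below. It is not the code from this diff: `compute_offset_ranges` is a hypothetical name, and the function only assumes that `consumer` behaves like a `confluent_kafka.Consumer` (with `get_watermark_offsets()` and `committed()`) and that each item behaves like a `confluent_kafka.TopicPartition`.

```python
def compute_offset_ranges(consumer, topic_partitions):
    """Return {partition_id: (lo, hi)} for partitions that still hold unread messages.

    Hypothetical helper: `consumer` is expected to behave like
    confluent_kafka.Consumer, `topic_partitions` like a list of
    confluent_kafka.TopicPartition.
    """
    ranges = {}
    for tp in topic_partitions:
        # Absolute min and max offsets of the whole partition.
        lo, hi = consumer.get_watermark_offsets(tp)
        if hi <= lo:
            # Empty partition: nothing to consume.
            continue
        # Last committed offset for the consumer group; the client reports a
        # negative sentinel when nothing has been committed yet, so max()
        # keeps the low watermark in that case.
        committed = consumer.committed([tp])[0]
        lo = max(lo, committed.offset)
        if hi > lo:
            # Only keep partitions with messages left to process.
            ranges[tp.partition] = (lo, hi)
    return ranges
```

Querying the committed offset in the same loop is what removes the need for a second step inside `process()`: the progress queue can be seeded directly from the returned ranges.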

Can we keep the reset stuff outside the journalprocessor.py logic? It's already complex enough

swh/dataset/cli.py, line 72

copy-pasted too fast

I'll give it a try

rebase and fix --reset help message

Build is green

Patch application report for D6234 (id=22582)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..40c2446
Fast-forward
 swh/dataset/cli.py              | 12 ++++++-
 swh/dataset/journalprocessor.py | 76 +++++++++++++++++++++++++++--------------
 2 files changed, 61 insertions(+), 27 deletions(-)
Changes applied before test
commit 40c24464da3a2af5918fc309d282e72b08b10c60
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:20:26 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consumong kafka topics from the beginning rather than
    stored offsets.

commit 5881ae06f636a74e7fb0addca04127bfe18b687d
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 47713ee38c9498a0548535e5b8361d8158ee3e09
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 2760e322af7c5862e0329198671b49d2755491ef
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/10/ for more details.

Can we keep the reset stuff outside the journalprocessor.py logic? It's already complex enough

I'll give it a try

Actually I am not sure I can easily do that: in order for a consumer to start consuming from the beginning, it must declare so during the assignment of partitions (then commit after having started consuming).

There is no Consumer API (that I found) allowing to "reset stored offsets" then start consuming from there.

(see https://github.com/confluentinc/confluent-kafka-dotnet/issues/995#issuecomment-521320691 for example)
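
For readers unfamiliar with the constraint, here is a minimal sketch of what "declaring so during the assignment of partitions" can look like, assuming the confluent-kafka Python client. The names `reset_on_assign` and `subscribe_from_beginning` are hypothetical and this is not the code from this diff; it only illustrates why the reset ends up tied to the consumer setup.

```python
try:
    from confluent_kafka import OFFSET_BEGINNING
except ImportError:
    # Fallback so the sketch can be read and exercised without the client
    # installed; -2 is the value of librdkafka's "beginning" sentinel.
    OFFSET_BEGINNING = -2


def reset_on_assign(consumer, partitions):
    """on_assign callback: rewind every newly assigned partition to its start."""
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    # Re-assign with the modified offsets so they override the stored ones.
    consumer.assign(partitions)


def subscribe_from_beginning(consumer, topics):
    """Subscribe `consumer` to `topics`, ignoring the group's stored offsets."""
    consumer.subscribe(topics, on_assign=reset_on_assign)
```

The stored offsets on the broker are only overwritten once the consumer commits after it has started consuming, which is why the reset cannot be done as a separate step before the consumer is set up.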

So either I kill this diff or it stays intertwined with the setup of the consumer (and thus with the whole of journalprocessor.py)

Note: this feature is mainly useful for testing purposes IMHO, so I suppose it's not that critical to keep it; I just find it handy when "playing" with swh dataset export

Build is green

Patch application report for D6234 (id=22606)

Could not rebase; Attempt merge onto 002ee70b99...

Updating 002ee70..ee72213
Fast-forward
 swh/dataset/cli.py              | 12 ++++-
 swh/dataset/journalprocessor.py | 97 +++++++++++++++++++++++++++++------------
 2 files changed, 79 insertions(+), 30 deletions(-)
Changes applied before test
commit ee72213c8b1d7ff31b3eab71fac62368840cd406
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:06:53 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consuming kafka topics from the beginning rather than
    current position for the consumer group (aka stored offsets).

commit c8db7698bc82f1472729fe4148803004b6c93b93
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages which offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    Ensure offset are committed only after worker_fn is executed without
    error.
    
    This requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "eof" partitions on the fly (with
    "eof" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that need to be, but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit bd888a6c75d216bdef4653d9cbfdf7147a32ee05
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:04:51 2021 +0200

    Add a JournalClientOffsetRanges.unsubscribe() method
    
    to make the code a bit clearer.

commit 5881ae06f636a74e7fb0addca04127bfe18b687d
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar
    
    so we get a chance to actually have a visible progress bar:
    
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 47713ee38c9498a0548535e5b8361d8158ee3e09
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%
    
    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offets.
    
    It could cause some dead-lock/long timeout kind of situation sometime
    (especially in the developper docker environment).

commit 2760e322af7c5862e0329198671b49d2755491ef
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermak offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by quering the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/13/ for more details.

So either I kill this diff or it stays intertwined with the setup of the consumer (and thus with the whole of journalprocessor.py)

Note: this feature is mainly useful for testing purposes IMHO, so I suppose it's not that critical to keep it; I just find it handy when "playing" with swh dataset export

Meh. How much easier does it make testing, compared to using Kafka's CLI (from the linked comment)?

Build is green

Patch application report for D6234 (id=22623)

Could not rebase; Attempt merge onto 358d84938d...

Updating 358d849..b039bc4
Fast-forward
 swh/dataset/cli.py              | 12 ++++++++-
 swh/dataset/journalprocessor.py | 58 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 59 insertions(+), 11 deletions(-)
Changes applied before test
commit b039bc4fb55170f600fbaf77789e30db8da8df4e
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:06:53 2021 +0200

    Add a --reset option to export_graph cli tool
    
    allows to enforce consuming kafka topics from the beginning rather than
    current position for the consumer group (aka stored offsets).

commit fb6c2e7246c58ff91c1cc75613635447c3397785
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages which offset has reach the high limit
    
    this is necessary to ensure these messages are committed in kafka,
    otherwise, since the (considered) empty partition is unsubscribed from,
    it never gets committed in `JournalClient.handle_messages()` (since this
    later only commit assigned partitions).
    
    Ensure offset are committed only after worker_fn is executed without
    error.
    
    This requires to overload the `JournalClient.handle_messages()` method in
    `JournalClientOffsetRanges` to make sure "pending" messages are
    committed after the proper execution of `worker_fn`.
    
    Doing so, we can both unsubscribe from "eof" partitions on the fly (with
    "eof" meaning when the partition has been consumed up to the high
    watermark offset at the beginning of the export), and commit ALL offsets
    that need to be, but only after proper execution of the `worker_fn`
    callback.
    
    This should guarantee proper and consistent behavior (famous last
    word...).

commit a507024c5ff4989720044aa3fc42ebbc341659e0
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:04:51 2021 +0200

    Add a JournalClientOffsetRanges.unsubscribe() method
    
    to make the code a bit clearer.

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/18/ for more details.

Meh. How much easier does it make testing, compared to using Kafka's CLI (from the linked comment)?

Now I remember: that's one reason I wrote this. In my docker env, I cannot execute this command (it generates an error about a port already being in use...), but that's a valid remark indeed.
Let me check executing this command from a container other than the kafka one itself

You could also add a command in swh-dataset's entrypoint.sh that calls whatever Kafka's script does

well it works just fine when executed from another container:

docker-compose run kafka kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group swh-dataset-export-test20 --topic swh.journal.objects.origin  --reset-offsets --to-earliest --execute 
Starting docker_zookeeper_1 ... done

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
swh-dataset-export-test20      swh.journal.objects.origin     4          0              
swh-dataset-export-test20      swh.journal.objects.origin     14         0              
swh-dataset-export-test20      swh.journal.objects.origin     9          0              
swh-dataset-export-test20      swh.journal.objects.origin     8          0              
swh-dataset-export-test20      swh.journal.objects.origin     10         0              
swh-dataset-export-test20      swh.journal.objects.origin     13         0              
swh-dataset-export-test20      swh.journal.objects.origin     12         0              
swh-dataset-export-test20      swh.journal.objects.origin     1          0              
swh-dataset-export-test20      swh.journal.objects.origin     11         0              
swh-dataset-export-test20      swh.journal.objects.origin     6          0              
swh-dataset-export-test20      swh.journal.objects.origin     3          0              
swh-dataset-export-test20      swh.journal.objects.origin     2          0              
swh-dataset-export-test20      swh.journal.objects.origin     15         0              
swh-dataset-export-test20      swh.journal.objects.origin     7          0              
swh-dataset-export-test20      swh.journal.objects.origin     5          0              
swh-dataset-export-test20      swh.journal.objects.origin     0          0
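
If this server-side reset is to be scripted for repeated test runs, a thin Python wrapper around the exact command shown above is one option. This is only a sketch: `build_reset_offsets_command` and `reset_offsets` are hypothetical names, and the default `kafka:9092` bootstrap server and the `docker-compose run kafka` prefix are simply copied from the invocation above.

```python
import subprocess


def build_reset_offsets_command(group, topic, bootstrap_server="kafka:9092"):
    """Build the argv for resetting a consumer group's offsets to the earliest one."""
    return [
        "docker-compose", "run", "kafka",
        "kafka-consumer-groups.sh",
        "--bootstrap-server", bootstrap_server,
        "--group", group,
        "--topic", topic,
        "--reset-offsets", "--to-earliest", "--execute",
    ]


def reset_offsets(group, topic, bootstrap_server="kafka:9092"):
    """Run the reset; raises CalledProcessError if the Kafka script fails."""
    subprocess.run(
        build_reset_offsets_command(group, topic, bootstrap_server),
        check=True,
    )
```

For example, `reset_offsets("swh-dataset-export-test20", "swh.journal.objects.origin")` would issue the same command as in the session above.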

so let's forget about this diff (and maybe add some documentation somewhere)

It's not worth the trouble, and there is a better solution (server-side)