Allows enforcing consumption of Kafka topics from the beginning rather than
from the stored offsets.
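A minimal sketch of how such a flag could be wired up (illustrative only: the real tool's CLI lives in swh/dataset/cli.py and uses click; the names below are hypothetical):

```python
import argparse

# Hypothetical sketch of the CLI wiring for a --reset flag; the actual
# export_graph tool is built with click, not argparse.
def make_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="export_graph")
    parser.add_argument(
        "--reset",
        action="store_true",
        help="consume Kafka topics from the beginning instead of the "
             "offsets stored for the consumer group",
    )
    return parser

args = make_parser().parse_args(["--reset"])
print(args.reset)  # True when the flag is given, False otherwise
```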
Depends on D6233
Differential D6234
Add a --reset option to export_graph cli tool
Authored by douardda on Sep 9 2021, 5:57 PM.
Event Timeline

Comment Actions
Build is green. Patch application report for D6234 (id=22556): could not rebase; attempted merge onto 002ee70b99...

  Updating 002ee70..4d70697
  Fast-forward
   swh/dataset/cli.py              |  9 +++++-
   swh/dataset/journalprocessor.py | 66 +++++++++++++++++++++++++----------------
   2 files changed, 49 insertions(+), 26 deletions(-)

Changes applied before test:

commit 4d70697c63955e90efeecbd90f1da7994c89cdab
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:20:26 2021 +0200

    Add a --reset option to export_graph cli tool

    Allows enforcing consumption of kafka topics from the beginning rather
    than from the stored offsets.

commit 3f331e1823e3329085f01f073fe8a6bd6f43473a
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:30:25 2021 +0200

    Reduce the size of the progress bar

    so we get a chance to actually have a visible progress bar:
    - reduce the label size (shorter desc),
    - use a single 'workers' postfix (like "workers=n/m").

commit 48d246f178851dfe06e47b6c12555fcd095f5641
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:54:15 2021 +0200

    Make sure the progress bar for the export reaches 100%

    - ensure the last offset is sent to the queue,
    - fix the computation of the progress value (off-by-one).

commit 3a2f5076dcbf791d1ef43982b70551f048ee7c3e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`

    used to retrieve partitions and lo/hi offsets. It could sometimes
    cause a dead-lock/long-timeout situation (especially in the developer
    docker environment).
commit 45126fd621e8b75c592d7c6cd3d8d1337f95c97e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation

    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermark offsets (i.e. the absolute min and max
      offsets of the whole partition),
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    Instead, this simplifies the process a bit by querying the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/6/ for more details.

Comment Actions
Can we keep the reset stuff outside the journalprocessor.py logic? It's already complex enough.
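The single-pass lo/high computation described in the "Simplify" commit might look roughly like the sketch below. The consumer is a stub so the snippet runs without a broker; the real code uses confluent-kafka's `get_watermark_offsets()` and `committed()` calls, and `get_offsets` here is only an illustration, not the actual journalprocessor.py implementation:

```python
# Pure-Python sketch: fetch the watermark offsets and the committed
# offset for each partition in a single pass, instead of the earlier
# two-step (watermarks first, committed offsets later in a hook).

class StubConsumer:
    """Stand-in for a Kafka consumer, for illustration only."""
    def __init__(self, watermarks, committed):
        self._watermarks = watermarks  # partition -> (low, high)
        self._committed = committed    # partition -> committed offset

    def get_watermark_offsets(self, partition):
        return self._watermarks[partition]

    def committed(self, partition):
        return self._committed.get(partition)

def get_offsets(consumer, partitions):
    """Return {partition: (start, high)}: start is the committed offset
    when one exists, else the low watermark."""
    offsets = {}
    for p in partitions:
        low, high = consumer.get_watermark_offsets(p)
        committed = consumer.committed(p)
        start = committed if committed is not None else low
        offsets[p] = (start, high)
    return offsets

consumer = StubConsumer({0: (0, 100), 1: (0, 50)}, {0: 42})
print(get_offsets(consumer, [0, 1]))  # {0: (42, 100), 1: (0, 50)}
```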
Comment Actions
Build is green. Patch application report for D6234 (id=22582): could not rebase; attempted merge onto 002ee70b99...

  Updating 002ee70..40c2446
  Fast-forward
   swh/dataset/cli.py              | 12 ++++++-
   swh/dataset/journalprocessor.py | 76 +++++++++++++++++++++++++++--------------
   2 files changed, 61 insertions(+), 27 deletions(-)

Changes applied before test (the same commits as above, rebased):

commit 40c24464da3a2af5918fc309d282e72b08b10c60: Add a --reset option to export_graph cli tool
commit 5881ae06f636a74e7fb0addca04127bfe18b687d: Reduce the size of the progress bar
commit 47713ee38c9498a0548535e5b8361d8158ee3e09: Make sure the progress bar for the export reaches 100%
commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0: Explicitly close the temporary kafka consumer in `get_offsets`
commit 2760e322af7c5862e0329198671b49d2755491ef: Simplify the lo/high partition offset computation
commit e47a3db1287b3f6ada32c3afb3270ef0947a7659: Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/10/ for more details.

Comment Actions
Actually I am not sure I can easily do that: in order for a consumer to start consuming from the beginning, it must declare so during the assignment of partitions (and then commit after having started consuming). There is no Consumer API (that I could find) allowing to "reset stored offsets" and then start consuming from there (see https://github.com/confluentinc/confluent-kafka-dotnet/issues/995#issuecomment-521320691 for example).

So either I kill this diff or it stays "intricate" with the setup of the consumer (and thus the whole journalprocessor.py).

Note: this feature is mainly useful for testing purposes IMHO, so I suppose it's not that critical to keep it; I just find it handy when "playing" with swh dataset export.

Comment Actions
Build is green. Patch application report for D6234 (id=22606): could not rebase; attempted merge onto 002ee70b99...
  Updating 002ee70..ee72213
  Fast-forward
   swh/dataset/cli.py              | 12 ++++-
   swh/dataset/journalprocessor.py | 97 +++++++++++++++++++++++++++------------
   2 files changed, 79 insertions(+), 30 deletions(-)

Changes applied before test:

commit ee72213c8b1d7ff31b3eab71fac62368840cd406
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:06:53 2021 +0200

    Add a --reset option to export_graph cli tool

    Allows enforcing consumption of kafka topics from the beginning rather
    than from the current position of the consumer group (aka the stored
    offsets).

commit c8db7698bc82f1472729fe4148803004b6c93b93
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:24:32 2021 +0200

    Commit kafka messages whose offset has reached the high limit

    This is necessary to ensure these messages are committed in kafka:
    since the (considered) empty partition is unsubscribed from, it never
    gets committed in `JournalClient.handle_messages()` (the latter only
    commits assigned partitions).

    Ensure offsets are committed only after worker_fn has executed without
    error. This requires overloading the `JournalClient.handle_messages()`
    method in `JournalClientOffsetRanges` to make sure "pending" messages
    are committed after the proper execution of `worker_fn`. Doing so, we
    can both unsubscribe from "eof" partitions on the fly ("eof" meaning
    the partition has been consumed up to the high watermark offset
    recorded at the beginning of the export), and commit ALL offsets that
    need to be, but only after proper execution of the `worker_fn`
    callback. This should guarantee proper and consistent behavior (famous
    last words...).

commit bd888a6c75d216bdef4653d9cbfdf7147a32ee05
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Sep 13 15:04:51 2021 +0200

    Add a JournalClientOffsetRanges.unsubscribe() method

    to make the code a bit clearer.
commit 5881ae06f636a74e7fb0addca04127bfe18b687d: Reduce the size of the progress bar
commit 47713ee38c9498a0548535e5b8361d8158ee3e09: Make sure the progress bar for the export reaches 100%
commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0: Explicitly close the temporary kafka consumer in `get_offsets`
commit 2760e322af7c5862e0329198671b49d2755491ef: Simplify the lo/high partition offset computation
commit e47a3db1287b3f6ada32c3afb3270ef0947a7659: Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/13/ for more details.

Comment Actions
Meh. How much easier does it make testing, compared to using Kafka's CLI (from the linked comment)?
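As noted in an earlier comment, a consumer can only start from the beginning by declaring so while partitions are being assigned. A minimal sketch of that on_assign pattern follows; only the `OFFSET_BEGINNING` value and the callback shape mirror confluent-kafka's Python API (where the callback is passed to `Consumer.subscribe()`), while the consumer and partition classes are stand-ins so the snippet runs without a broker:

```python
# Sketch of the "reset during partition assignment" pattern.
OFFSET_BEGINNING = -2  # value of confluent_kafka.OFFSET_BEGINNING

class TopicPartition:
    """Stand-in for confluent_kafka.TopicPartition."""
    def __init__(self, topic, partition, offset=-1001):
        self.topic = topic
        self.partition = partition
        self.offset = offset

def on_assign(consumer, partitions):
    # Force every assigned partition back to the earliest offset, then
    # hand the modified assignment back to the consumer.
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

class StubConsumer:
    """Stand-in consumer that just records its assignment."""
    def __init__(self):
        self.assigned = None
    def assign(self, partitions):
        self.assigned = partitions

c = StubConsumer()
on_assign(c, [TopicPartition("swh.journal.objects.origin", i) for i in range(3)])
print([p.offset for p in c.assigned])  # [-2, -2, -2]
```

With the real library this would be `consumer.subscribe(topics, on_assign=on_assign)`; the point is that the reset has to happen inside the assignment callback, which is why it is hard to keep out of journalprocessor.py.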
Comment Actions
Build is green. Patch application report for D6234 (id=22623): could not rebase; attempted merge onto 358d84938d...

  Updating 358d849..b039bc4
  Fast-forward
   swh/dataset/cli.py              | 12 ++++++++-
   swh/dataset/journalprocessor.py | 58 ++++++++++++++++++++++++++++++++++------
   2 files changed, 59 insertions(+), 11 deletions(-)

Changes applied before test (the same commits as above, rebased):

commit b039bc4fb55170f600fbaf77789e30db8da8df4e: Add a --reset option to export_graph cli tool
commit fb6c2e7246c58ff91c1cc75613635447c3397785: Commit kafka messages whose offset has reached the high limit
commit a507024c5ff4989720044aa3fc42ebbc341659e0: Add a JournalClientOffsetRanges.unsubscribe() method
See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/18/ for more details.

Comment Actions
Now I remember, that's one reason I wrote this: in my docker env, I could not execute this command (it generated an error about a port already being in use), but that's a valid remark indeed.

Comment Actions
You could also add a command in swh-dataset's entrypoint.sh that calls whatever Kafka's script does.

Comment Actions
Well, it works just fine when executed from another container:

  docker-compose run kafka kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
      --group swh-dataset-export-test20 --topic swh.journal.objects.origin \
      --reset-offsets --to-earliest --execute

  Starting docker_zookeeper_1 ... done

  GROUP                      TOPIC                       PARTITION  NEW-OFFSET
  swh-dataset-export-test20  swh.journal.objects.origin  4          0
  swh-dataset-export-test20  swh.journal.objects.origin  14         0
  swh-dataset-export-test20  swh.journal.objects.origin  9          0
  swh-dataset-export-test20  swh.journal.objects.origin  8          0
  swh-dataset-export-test20  swh.journal.objects.origin  10         0
  swh-dataset-export-test20  swh.journal.objects.origin  13         0
  swh-dataset-export-test20  swh.journal.objects.origin  12         0
  swh-dataset-export-test20  swh.journal.objects.origin  1          0
  swh-dataset-export-test20  swh.journal.objects.origin  11         0
  swh-dataset-export-test20  swh.journal.objects.origin  6          0
  swh-dataset-export-test20  swh.journal.objects.origin  3          0
  swh-dataset-export-test20  swh.journal.objects.origin  2          0
  swh-dataset-export-test20  swh.journal.objects.origin  15         0
  swh-dataset-export-test20  swh.journal.objects.origin  7          0
  swh-dataset-export-test20  swh.journal.objects.origin  5          0
  swh-dataset-export-test20  swh.journal.objects.origin  0          0

So let's forget about this diff (and maybe add some documentation somewhere).
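For reference, the commit-after-worker_fn behaviour described in the "Commit kafka messages whose offset has reached the high limit" commit message can be sketched in pure Python. All names and stub classes below are illustrative, not the actual journalprocessor.py code:

```python
# Sketch: offsets are committed only once worker_fn has run without
# raising, and partitions consumed up to their high watermark ("eof")
# are reported so the caller can unsubscribe from them.

class StubConsumer:
    """Stand-in consumer that records committed (partition, offset) pairs."""
    def __init__(self):
        self.committed = []
    def commit(self, message):
        self.committed.append(message)

def handle_messages(consumer, messages, worker_fn, high_watermarks):
    """Run worker_fn over messages, then commit; return eof partitions."""
    pending = list(messages)        # messages as (partition, offset) pairs
    worker_fn(pending)              # may raise: then nothing is committed
    eof = set()
    for partition, offset in pending:
        consumer.commit((partition, offset))
        if offset + 1 >= high_watermarks[partition]:
            eof.add(partition)      # consumed up to the high limit
    return eof

c = StubConsumer()
eof = handle_messages(c, [(0, 9), (1, 3)], lambda msgs: None, {0: 10, 1: 50})
print(eof)               # {0}: partition 0 reached its high watermark
print(len(c.committed))  # 2: both offsets were committed after worker_fn
```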