
Simplify the lo/high partition offset computation
Closed, Public

Authored by douardda on Sep 9 2021, 5:52 PM.

Details

Summary

The computation of lo and high offsets used to be done in 2 steps:

  • first, get the watermark offsets (i.e. the absolute min and max offsets of the whole partition);
  • then, as a "hook" in process(), retrieve the last committed offset for the partition and "push" these current offsets into the progress queue.

Instead, this simplifies the process a bit by querying the committed
offsets while computing the hi/lo offsets.
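
For illustration, here is a minimal sketch of the single-pass approach, assuming the confluent-kafka client API; the function name, signature and timeouts are hypothetical, not the ones used in the actual diff:

    from confluent_kafka import Consumer, TopicPartition, OFFSET_INVALID

    def get_offset_range(consumer: Consumer, topic: str, partition_id: int):
        """Return (lo, high) for one partition, computed in a single pass."""
        tp = TopicPartition(topic, partition_id)
        # Watermarks: the absolute min and max offsets of the whole partition.
        lo, high = consumer.get_watermark_offsets(tp, timeout=10)
        # Last committed offset for the consumer group, queried right here
        # instead of later in a process() hook.
        (committed,) = consumer.committed([tp], timeout=10)
        if committed.offset != OFFSET_INVALID:
            # Resume from the committed position rather than the low watermark.
            lo = committed.offset
        return lo, high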

Also in this Diff:

  • Use proper signature for JournalClientOffsetRanges.process()
  • Explicitly close the temporary Kafka consumer in get_offsets, used to retrieve partitions and lo/hi offsets.

    Leaving it open could cause deadlock/long-timeout situations at times (especially in the developer Docker environment); a sketch of the change follows this list.
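
A minimal sketch of that change, again assuming the confluent-kafka client; get_offsets below is a simplified stand-in for the real method, and only the configuration keys needed for the example are shown:

    from confluent_kafka import Consumer

    def get_offsets(brokers: str, group_id: str) -> dict:
        # Temporary consumer used only to list partitions and their lo/hi offsets.
        consumer = Consumer({"bootstrap.servers": brokers, "group.id": group_id})
        try:
            offsets = {}
            ...  # query partitions and their watermark/committed offsets here
            return offsets
        finally:
            # Close explicitly: waiting for the object to be garbage collected
            # can leave the consumer hanging and block shutdown for a while.
            consumer.close()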

Diff Detail

Repository
rDDATASET Datasets
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

Build is green

Patch application report for D6232 (id=22548)

Rebasing onto 002ee70b99...

Current branch diff-target is up to date.
Changes applied before test
commit 3a2f5076dcbf791d1ef43982b70551f048ee7c3e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offsets.
    
    It could cause some dead-lock/long timeout kind of situation sometimes
    (especially in the developer docker environment).

commit 45126fd621e8b75c592d7c6cd3d8d1337f95c97e
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermark offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by querying the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/3/ for more details.

I just tried it with an empty docker instance, and I'm getting this:

Exporting edges and nodes
Exporting origin:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 9007.90it/s]
  - Journal export (origin): 0it [00:00, ?it/s]
Exporting origin_visit:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 1283.47it/s]
  - Journal export (origin_visit): 0it [00:00, ?it/s]
Exporting origin_visit_status:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 3153.32it/s]
  - Journal export (origin_visit_status): 0it [00:00, ?it/s]
Exporting snapshot:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 4813.09it/s]
  - Journal export (snapshot): 0it [00:00, ?it/s]
Exporting release:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 1780.88it/s]
  - Journal export (release): 0it [00:00, ?it/s]
Exporting revision:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 1698.40it/s]
  - Journal export (revision): 0it [00:00, ?it/s]
Exporting directory:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 3340.91it/s]
  - Journal export (directory): 0it [00:00, ?it/s]
Exporting content:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 3497.44it/s]
  - Journal export (content): 0it [00:00, ?it/s]
Exporting skipped_content:
  - Partition offsets: 100%|████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 1993.37it/s]
  - Journal export (skipped_content): 0it [00:00, ?it/s]
Sorting edges and nodes
pv: /srv/softwareheritage/graph/g/edges/*/*.edges.csv.zst: No such file or directory
zstd: can't stat /srv/softwareheritage/graph/g/edges/*/*.nodes.csv.zst : No such file or directory -- ignored 
zstd: /*stdin*\: unexpected end of file

Seems to be a preexisting issue. Never mind!

This revision is now accepted and ready to land. Sep 10 2021, 4:26 PM

I think this makes sense, but I'd appreciate a look from @seirl on this as well. I made a few cosmetic comments regardless.

swh/dataset/journalprocessor.py
Line 174

spurious comment :-)

Line 178

Maybe worth a comment saying that you're only processing the partition if there have been new messages since your last commit, as the condition is the same as the previous one.
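
A hypothetical illustration of that suggestion (the names below are made up, not the actual code at that line):

    def should_process_partition(committed_offset: int, high_watermark: int) -> bool:
        # Only process the partition if there have been new messages since
        # the last committed offset (same condition as the earlier check,
        # hence the suggested comment).
        return committed_offset < high_watermark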

Add an explicit "skipped" message if nothing is to be consumed for a topic (sketched below),

and fix several typos as reported by reviewers (thanks).
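
A sketch of what such a "skipped" message could look like; the function and message wording below are illustrative, not taken from the diff:

    def export_topic(topic: str, lo: int, high: int) -> None:
        if high <= lo:
            # Nothing new to consume for this topic: say so explicitly
            # instead of leaving an empty progress bar.
            print(f"  - Journal export ({topic}): skipped (nothing to consume)")
            return
        ...  # normal export path for the topic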

Build is green

Patch application report for D6232 (id=22580)

Rebasing onto 002ee70b99...

Current branch diff-target is up to date.
Changes applied before test
commit d07b2a632256da4e7778bf7b1f4a02acd03f9ca0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:47:57 2021 +0200

    Explicitly close the temporary kafka consumer in `get_offsets`
    
    used to retrieve partitions and lo/hi offsets.
    
    It could cause some dead-lock/long timeout kind of situation sometimes
    (especially in the developer docker environment).

commit 2760e322af7c5862e0329198671b49d2755491ef
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 11:39:44 2021 +0200

    Simplify the lo/high partition offset computation
    
    The computation of lo and high offsets used to be done in 2 steps:
    - first get the watermark offsets (thus the absolute min and max offsets
      of the whole partition)
    - then, as a "hook" in `process()`, retrieve the last committed offset
      for the partition and "push" these current offsets in the progress
      queue.
    
    Instead, this simplifies a bit this process by querying the committed
    offsets while computing the hi/low offsets.

commit e47a3db1287b3f6ada32c3afb3270ef0947a7659
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Sep 9 14:22:37 2021 +0200

    Use proper signature for JournalClientOffsetRanges.process()

See https://jenkins.softwareheritage.org/job/DDATASET/job/tests-on-diff/8/ for more details.