Software Heritage

Migrate to confluent-kafka instead of kafka-python
Closed, Public

Authored by olasd on Wed, Sep 11, 2:58 PM.

Details

Summary

While kafka-python is a useful library, confluent-kafka is based on rdkafka
which is the upstream-supported kafka client library, and supports more
features (e.g. proper authentication modes) which we should be using when
opening our kafka brokers to the internet.
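A minimal sketch of the kind of authenticated consumer configuration this migration makes possible. It assumes SASL/SCRAM over TLS. The helper name `build_consumer_settings` and the choice of `SCRAM-SHA-512` are illustrative assumptions, not part of this diff. `bootstrap.servers`, `group.id`, `security.protocol`, `sasl.mechanisms`, `sasl.username` and `sasl.password` are standard librdkafka configuration properties.

```python
from typing import Dict, List, Optional


def build_consumer_settings(
    brokers: List[str],
    group_id: str,
    sasl_username: Optional[str] = None,
    sasl_password: Optional[str] = None,
) -> Dict[str, str]:
    """Build a librdkafka-style config dict (hypothetical helper)."""
    settings = {
        # librdkafka wants one comma-separated string of host:port pairs
        'bootstrap.servers': ','.join(brokers),
        'group.id': group_id,
    }
    if sasl_username is not None:
        # Assumed setup: SCRAM credentials sent over a TLS connection
        settings.update({
            'security.protocol': 'SASL_SSL',
            'sasl.mechanisms': 'SCRAM-SHA-512',
            'sasl.username': sasl_username,
            'sasl.password': sasl_password or '',
        })
    return settings
```

The resulting dict would be passed unchanged to `confluent_kafka.Consumer(...)`. Keeping the construction in a plain function means it can be tested without a broker or the library installed.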

Test Plan

tox runs

Diff Detail

Repository
rDJNL Journal infrastructure
Lint
Automatic diff as part of commit; lint not applicable.
Unit
Automatic diff as part of commit; unit tests not applicable.

Event Timeline

olasd created this revision. Wed, Sep 11, 2:58 PM
vlorentz requested changes to this revision. Wed, Sep 11, 3:29 PM
vlorentz added a subscriber: vlorentz.
vlorentz added inline comments.
swh/journal/cli.py
106–107 (On Diff #6678)

If the point is to have a log before the first call to process, you could just move the other logging statement here.

swh/journal/client.py
76–99

Nitpick:

consumer_settings = {
    **consumer_settings,
    'bootstrap.servers': ','.join(brokers),
    # ... other settings
}

(for consistency with other places in the code base)
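One consequence of the suggested style worth spelling out: keys written after the `**consumer_settings` unpacking take precedence over caller-supplied values, while every other caller-supplied option passes through. A small sketch, where the function name `merge_consumer_settings` is hypothetical and `group.id` stands in for the elided keys:

```python
from typing import Any, Dict, List


def merge_consumer_settings(
    consumer_settings: Dict[str, Any], brokers: List[str], group_id: str
) -> Dict[str, Any]:
    # Later keys win: 'bootstrap.servers' and 'group.id' override any
    # values the caller passed, other librdkafka options are kept as-is.
    # A new dict is built, so the caller's dict is never mutated.
    return {
        **consumer_settings,
        'bootstrap.servers': ','.join(brokers),
        'group.id': group_id,
    }
```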

120–121

Could be an attribute of the object.
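A sketch of the pattern being suggested, assuming the value on those lines is a fixed setting such as a timeout. The class names and the `poll_timeout` attribute are hypothetical, since the diff itself is not shown here. A class-level default stays as cheap as a module constant but can be overridden per subclass or per instance.

```python
class JournalClientSketch:
    # Hypothetical class-level default, read through self so that it
    # can be overridden without touching the methods that use it.
    poll_timeout: float = 1.0

    def effective_timeout(self) -> float:
        return self.poll_timeout


class SlowJournalClientSketch(JournalClientSketch):
    # Subclass override of the default
    poll_timeout = 10.0
```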

137–140

Are you sure about the error handling? I can't find any documentation on what a non-fatal error is, so I have no idea if it's correct...

A log message may be nice.

swh/journal/direct_writer.py
73–94

swh-storage uses write_addition for content, snapshot, origin_visit, and origin. They need to be flushed too.

A boolean flag to write_addition defaulting to flushing would do it.
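A sketch of the suggested flag, assuming a producer object with confluent-kafka's `produce()`/`flush()` shape. `FakeProducer`, `DirectWriterSketch`, the `write_additions` batch method and keying messages on `obj['id']` are all hypothetical, and serialization to bytes is omitted. Single writes flush by default, while batch writes queue everything and flush once.

```python
from typing import Any, Iterable, List, Tuple


class FakeProducer:
    """Stand-in for confluent_kafka.Producer: buffers until flush()."""

    def __init__(self) -> None:
        self.pending: List[Tuple[str, Any, Any]] = []
        self.delivered: List[Tuple[str, Any, Any]] = []

    def produce(self, topic: str, value: Any = None, key: Any = None) -> None:
        self.pending.append((topic, key, value))

    def flush(self, timeout: float = -1) -> int:
        self.delivered.extend(self.pending)
        self.pending.clear()
        return 0  # number of messages still queued


class DirectWriterSketch:
    def __init__(self, producer: Any, prefix: str) -> None:
        self.producer = producer
        self.prefix = prefix

    def write_addition(self, object_type: str, obj: Any,
                       flush: bool = True) -> None:
        topic = '%s.%s' % (self.prefix, object_type)
        # Serialization to bytes is omitted in this sketch
        self.producer.produce(topic=topic, key=obj['id'], value=obj)
        if flush:
            # Wait until everything queued so far has been delivered
            self.producer.flush()

    def write_additions(self, object_type: str, objs: Iterable[Any]) -> None:
        # Queue the whole batch, then flush once at the end
        for obj in objs:
            self.write_addition(object_type, obj, flush=False)
        self.producer.flush()
```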

swh/journal/tests/conftest.py
229–230

This comment should be moved a few lines below

swh/journal/tests/test_direct_writer.py
26–49

Why not a JournalClient with max_messages=expected_messages?

swh/journal/tests/test_replay.py
61–63

missing producer.flush()
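Why the flush matters with confluent-kafka: `Producer.produce()` only enqueues the message in librdkafka's local buffer, and `Producer.flush(timeout)` blocks until outstanding messages are delivered, returning how many are still queued. A test that produces and then immediately consumes can therefore miss messages without it. A hedged sketch, where the helper name `produce_all` is hypothetical:

```python
from typing import Any, Iterable, Tuple


def produce_all(producer: Any, topic: str,
                items: Iterable[Tuple[bytes, bytes]],
                timeout: float = 10.0) -> None:
    for key, value in items:
        # Only enqueues locally; delivery happens in the background
        producer.produce(topic, key=key, value=value)
    # flush() returns the number of messages still queued when it
    # returns; anything other than 0 means the timeout expired first.
    remaining = producer.flush(timeout)
    if remaining:
        raise RuntimeError(
            '%d messages not delivered to %s' % (remaining, topic))
```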

140

Isn't this simplification working only as a side-effect of the current implementation?

This revision now requires changes to proceed. Wed, Sep 11, 3:29 PM
olasd updated this revision to Diff 6681. Wed, Sep 11, 4:42 PM
olasd marked 5 inline comments as done.

Apply comments from @vlorentz

swh/journal/cli.py
106–107 (On Diff #6678)

The point was to get a log message on every loop iteration, because process was blocking due to some buggy code I had written. These logging statements are useless now.

swh/journal/client.py
76–99

*bleh*

120–121

Are we ever going to need to override that?

137–140

Pretty sure, yes.

I've added a log message for non-fatal errors.
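A sketch of the error-handling pattern under discussion, assuming confluent-kafka's `Message.error()` (returns `None` or a `KafkaError`) and `KafkaError.fatal()`. The function name and `FatalKafkaError` are hypothetical; a real client would more likely raise `confluent_kafka.KafkaException`. Fatal errors leave the client instance unusable, so they abort; other errors, which librdkafka generally recovers from on its own, are logged and skipped.

```python
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class FatalKafkaError(Exception):
    """Hypothetical stand-in for confluent_kafka.KafkaException."""


def usable_messages(messages: List[Any]) -> List[Any]:
    """Filter a batch from Consumer.consume(), handling per-message errors."""
    good = []
    for message in messages:
        error = message.error()
        if error is None:
            good.append(message)
        elif error.fatal():
            # The client instance cannot recover from a fatal error
            raise FatalKafkaError(str(error))
        else:
            # Non-fatal: log it and keep processing the rest of the batch
            logger.warning('Received non-fatal kafka error: %s', error)
    return good
```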

swh/journal/direct_writer.py
73–94

You're right. They also needed to do that in the previous implementation (which flushed only when the library felt like it), so there's no real degradation of guarantees here.

swh/journal/tests/conftest.py
229–230

Turns out this isn't needed anymore so I scrapped it.

swh/journal/tests/test_direct_writer.py
26–49

(I kept the original rationale here)

Because we only want to test the JournalWriter in this test, not the combination of the JournalWriter *and* the Client.
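For context on the trade-off, testing the writer alone means the test needs its own small read loop. A sketch, assuming a consumer with confluent-kafka's `poll(timeout)` shape (returns a message or `None`); the helper name `consume_n` is hypothetical. This is roughly the logic vlorentz points out would otherwise be duplicated from the Client.

```python
import time
from typing import Any, List


def consume_n(consumer: Any, n: int, timeout: float = 10.0) -> List[Any]:
    """Poll until n messages have arrived or the deadline passes."""
    deadline = time.monotonic() + timeout
    received: List[Any] = []
    while len(received) < n:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                'got %d of %d messages' % (len(received), n))
        msg = consumer.poll(min(remaining, 1.0))
        if msg is None:
            continue  # poll timed out with nothing to read
        if msg.error() is not None:
            raise RuntimeError(str(msg.error()))
        received.append(msg)
    return received
```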

swh/journal/tests/test_replay.py
140

I don't understand this question.

The whole point of this queue business was to mock what the upstream kafka consumer produces, so we don't need to add a kafka in the loop. With kafka-python, the consumer produces batches of messages grouped by partition; with confluent-kafka, it just produces a bunch of individual messages.
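The shape difference described above, as a sketch. kafka-python's `KafkaConsumer.poll()` returns a dict mapping each `TopicPartition` to a list of records, whereas confluent-kafka's `Consumer.consume(num_messages, timeout)` returns a flat list of messages. `flatten_partition_batches` and `MockConsumer` are hypothetical test helpers, not the ones in `test_replay.py`.

```python
from collections import deque
from typing import Any, Deque, Dict, List


def flatten_partition_batches(batches: Dict[Any, List[Any]]) -> List[Any]:
    """Turn kafka-python's {partition: [records]} into a flat list."""
    return [record for records in batches.values() for record in records]


class MockConsumer:
    """Test double mimicking Consumer.consume(num_messages, timeout)."""

    def __init__(self, messages: List[Any]) -> None:
        self.queue: Deque[Any] = deque(messages)

    def consume(self, num_messages: int = 1,
                timeout: float = -1) -> List[Any]:
        # Hand out up to num_messages individual messages, no grouping
        batch = []
        while self.queue and len(batch) < num_messages:
            batch.append(self.queue.popleft())
        return batch
```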

olasd updated this revision to Diff 6682. Wed, Sep 11, 4:43 PM

Run tox before pushing next time

vlorentz accepted this revision. Wed, Sep 11, 5:20 PM
vlorentz added inline comments.
swh/journal/client.py
120–121

I don't know

swh/journal/tests/test_direct_writer.py
26–49

You're reimplementing a large part of the Client, though.

This revision is now accepted and ready to land. Wed, Sep 11, 5:20 PM
This revision was automatically updated to reflect the committed changes.