
Migrate to confluent-kafka instead of kafka-python
Closed, Public

Authored by olasd on Sep 11 2019, 2:58 PM.

Details

Summary

While kafka-python is a useful library, confluent-kafka is based on librdkafka,
the upstream-supported Kafka client library, and supports more
features (e.g. proper authentication modes), which we should be using when
opening our Kafka brokers to the internet.

Test Plan

tox runs

Diff Detail

Repository
rDJNL Journal infrastructure
Branch
confluent-kafka
Lint
No Linters Available
Unit
No Unit Test Coverage
Build Status
Buildable 7764
Build 11164: tox-on-jenkins (Jenkins)
Build 11163: arc lint + arc unit

Event Timeline

vlorentz added a subscriber: vlorentz.
vlorentz added inline comments.
swh/journal/cli.py
106–107

If the point is to have a log before the first call to process, you could just move the other logging statement here.

swh/journal/client.py
76–84

Nitpick:

consumer_settings = {
    **consumer_settings,
    'bootstrap.servers': ','.join(brokers),
    # ... remaining settings
}

(for consistency with other places in the code base)

119–120

Could be an attribute of the object.

136–139

Are you sure about the error handling? I can't find doc on what a non-fatal error is, so I have no idea if it's correct...

A log message may be nice.
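For reference, a minimal sketch of the kind of handling being discussed: raise on fatal errors, log and skip non-fatal ones. It assumes the message object exposes `error()` returning `None` or an error object with a `fatal()` method, as confluent-kafka's `Message` and `KafkaError` do. `FakeError`, `FakeMessage` and `handle_message` are hypothetical names used only for illustration; this is not the code under review.

```python
import logging

logger = logging.getLogger(__name__)


class FakeError:
    """Hypothetical stand-in for confluent_kafka.KafkaError."""

    def __init__(self, fatal):
        self._fatal = fatal

    def fatal(self):
        return self._fatal

    def __str__(self):
        return "fatal error" if self._fatal else "transient error"


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def error(self):
        return self._error

    def value(self):
        return self._value


def handle_message(message, process):
    """Return True if the message was processed, False if it was skipped.

    Fatal errors are raised; non-fatal ones are logged and skipped.
    """
    error = message.error()
    if error is not None:
        if error.fatal():
            raise RuntimeError(str(error))
        logger.info("Non-fatal kafka error, skipping message: %s", error)
        return False
    process(message.value())
    return True
```

With a real consumer, `message` would be whatever `Consumer.poll()` returns, after checking that it is not `None`.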

swh/journal/direct_writer.py
71–85

swh-storage uses write_addition for content, snapshot, origin_visit, and origin. They need to be flushed too.

A boolean flag to write_addition defaulting to flushing would do it.
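A rough sketch of what such a flag could look like, assuming a producer with `produce()` and `flush()` methods. `RecordingProducer`, `SketchWriter` and the `example` topic prefix are hypothetical and only serve to show the flush behaviour; the real writer's signature may differ.

```python
class RecordingProducer:
    """Hypothetical stand-in for a Kafka producer; it only records calls."""

    def __init__(self):
        self.sent = []
        self.flush_count = 0

    def produce(self, topic, key, value):
        self.sent.append((topic, key, value))

    def flush(self):
        self.flush_count += 1


class SketchWriter:
    """Illustrative writer: single additions flush by default."""

    def __init__(self, producer, prefix="example"):
        self.producer = producer
        self.prefix = prefix

    def write_addition(self, object_type, key, value, flush=True):
        topic = f"{self.prefix}.{object_type}"
        self.producer.produce(topic=topic, key=key, value=value)
        if flush:
            self.producer.flush()

    def write_additions(self, object_type, items):
        # Batch path: skip the per-message flush and flush once at the end.
        for key, value in items:
            self.write_addition(object_type, key, value, flush=False)
        self.producer.flush()
```

Callers that add a single object get a flush without having to ask for it, while the batch path still flushes only once.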

swh/journal/tests/conftest.py
237–238

This comment should be moved a few lines below

swh/journal/tests/test_direct_writer.py
26–49

Why not a JournalClient with max_messages=expected_messages?

swh/journal/tests/test_replay.py
61

missing producer.flush()

138

Isn't this simplification working only as a side-effect of the current implementation?

This revision now requires changes to proceed. Sep 11 2019, 3:29 PM
olasd marked 5 inline comments as done.

Apply comments from @vlorentz

swh/journal/cli.py
106–107

The point was to get a log message on every loop iteration: process was blocking because I had written buggy code. These logging statements are useless now.

swh/journal/client.py
76–84

*bleh*

119–120

Are we ever going to need to override that?

136–139

Pretty sure, yes.

I've added a log message for non-fatal errors.

swh/journal/direct_writer.py
71–85

You're right. They also needed to do that in the previous implementation (which flushed only when the library felt like it), so there's no real degradation of guarantees here.

swh/journal/tests/conftest.py
237–238

Turns out this isn't needed anymore so I scrapped it.

swh/journal/tests/test_direct_writer.py
26–49

(I kept the original rationale here)

Because we only want to test the JournalWriter in this test, not the combination of the JournalWriter *and* the Client.

swh/journal/tests/test_replay.py
138

I don't understand this question.

The whole point of this queue business was to mock what the upstream kafka consumer produces, so we don't need to add a kafka in the loop. With kafka-python, the consumer produces batches of messages grouped by partition; with confluent-kafka, it just produces a bunch of individual messages.
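To illustrate the difference, here is a minimal sketch of a queue-backed mock that mimics the confluent-kafka shape, where each `poll()` call returns a single message or `None`. `QueueMessage`, `QueueConsumer` and `drain` are hypothetical names for this example, not the test helpers from this diff.

```python
from collections import deque


class QueueMessage:
    """Hypothetical message exposing the accessors a consumer loop would use."""

    def __init__(self, topic, key, value):
        self._topic = topic
        self._key = key
        self._value = value

    def error(self):
        return None

    def topic(self):
        return self._topic

    def key(self):
        return self._key

    def value(self):
        return self._value


class QueueConsumer:
    """Hypothetical mock consumer: hands out one message per poll() call."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def poll(self, timeout=None):
        # Like confluent-kafka, return None when nothing is available.
        if not self._queue:
            return None
        return self._queue.popleft()


def drain(consumer):
    """Collect (topic, key, value) tuples until the mock runs dry."""
    collected = []
    while True:
        message = consumer.poll(timeout=0)
        if message is None:
            break
        collected.append((message.topic(), message.key(), message.value()))
    return collected
```

A kafka-python style mock would instead have to return a mapping from partition to a list of records on each poll, which is why the earlier test needed the extra grouping.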

Run tox before pushing next time

vlorentz added inline comments.
swh/journal/client.py
119–120

I don't know

swh/journal/tests/test_direct_writer.py
26–49

You're reimplementing a large part of the Client though

This revision is now accepted and ready to land. Sep 11 2019, 5:20 PM
This revision was automatically updated to reflect the committed changes.