Software Heritage

add a kafka_stream_to_value helper function in serializers.py
Closed, Public

Authored by douardda on Jun 1 2022, 11:38 AM.

Details

Summary

Similar to kafka_to_value, but takes a stream as input and returns a
msgpack.Unpacker object that the user can iterate over.
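As a rough illustration of what such a helper does, the sketch below writes several concatenated msgpack records to a stream and reads them back with msgpack.Unpacker. It assumes the msgpack package (1.0 or later). The names value_to_bytes and stream_to_values are hypothetical stand-ins; the real kafka_to_value and kafka_stream_to_value in swh.journal.serializers presumably pass extra decoding hooks for swh-specific types, which are omitted here.

```python
import io
from typing import Any, BinaryIO

import msgpack


def value_to_bytes(value: Any) -> bytes:
    # Hypothetical stand-in for a per-message serializer: plain msgpack,
    # without any swh-specific type hooks.
    return msgpack.packb(value, use_bin_type=True)


def stream_to_values(file_like: BinaryIO) -> msgpack.Unpacker:
    # Hypothetical stand-in for a stream helper: instead of decoding one
    # bytes message, wrap the whole stream; iterating the Unpacker yields
    # one decoded object per msgpack record until EOF.
    return msgpack.Unpacker(file_like, raw=False, strict_map_key=False)


# Write two concatenated records to an in-memory "file", then read them back.
buf = io.BytesIO()
records = [
    ("content", {"sha1": b"\x00" * 20}),
    ("origin", {"url": "https://example.org"}),
]
for record in records:
    buf.write(value_to_bytes(record))
buf.seek(0)

decoded = list(stream_to_values(buf))
```

msgpack has no tuple type, so the tuples come back as lists. Because the Unpacker reads from file_like in chunks, a large dump does not need to fit in memory at once.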

Also make the stream journal writer accept a string as the output_stream config entry,
and support the "-" value (for stdout).
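One way a stream writer could interpret such an output_stream entry is sketched below: "-" maps to the binary stdout buffer, any other string is opened as a file path, and anything else is assumed to already be a writable binary file-like object. The helper name resolve_output_stream is hypothetical, and the actual StreamJournalWriter code may handle this differently.

```python
import io
import os
import sys
import tempfile
from typing import BinaryIO, Union


def resolve_output_stream(output_stream: Union[str, BinaryIO]) -> BinaryIO:
    """Hypothetical helper: map an output_stream config value to a binary file object."""
    if isinstance(output_stream, str):
        if output_stream == "-":
            # "-" conventionally means stdout; msgpack produces bytes, so use the
            # binary buffer underneath the sys.stdout text wrapper.
            return sys.stdout.buffer
        # Any other string is taken as a path; the file is created or truncated.
        return open(output_stream, "wb")
    # Anything else is assumed to already be a writable binary file-like object.
    return output_stream


# An in-memory stream is passed through unchanged...
buf = io.BytesIO()
assert resolve_output_stream(buf) is buf

# ...while a path string is opened for binary writing.
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "dump.msgpack")
    out = resolve_output_stream(path)
    out.write(b"\x93\x01\x02\x03")  # msgpack encoding of [1, 2, 3]
    out.close()
    with open(path, "rb") as f:
        written = f.read()
```

Because the helper opens the file itself when given a path, whoever holds the returned handle is responsible for flushing or closing it once the dump is complete.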

These changes make it possible to dump the content of a storage into a journal-compatible file,
e.g. using a replayer with the following config file:

storage:
  cls: memory
  journal_writer:
    cls: stream
    output_stream: dump.msgpack
journal_client:
  xxx

or using a backfiller with something like:

storage:
  cls: postgresql
  db: xxx
journal_writer:
  cls: stream
  output_stream: dump.msgpack

Event Timeline

Build is green

Patch application report for D7933 (id=28583)

Rebasing onto d1b7ad5d9d...

First, rewinding head to replay your work on top of it...
Applying: add a kafka_stream_to_value helper function in serializers.py
Applying: make the stream journal client accept a string as output_stream config entry
Changes applied before test
commit 57e11fcc07a617fc240ed09677aad5634e2547cc
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit 3fe7bf36c8c83d039d99b322998407cdb7ad7fb4
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

See https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/207/ for more details.

ardumont added a subscriber: ardumont.

Not sure I get all of this, but it rather LGTM.

swh/journal/tests/test_stream.py
13

Can't this be typed a bit?

70
This revision is now accepted and ready to land (Jun 2 2022, 12:56 PM).
vlorentz added inline comments.
swh/journal/serializers.py
116–117

What is `stream` (iterator/file-like object/...)?

swh/journal/serializers.py
116–117

A file-like object (actually, anything msgpack.Unpacker accepts). Renaming this argument is probably a good idea.

swh/journal/tests/test_stream.py
13

Not sure it's worth it, but I'll try.

70

I did not know either before this :-)

Add type annotations in swh.journal.writer and rename the 'stream' argument to 'file_like'.

Build is green

Patch application report for D7933 (id=28631)

Rebasing onto d1b7ad5d9d...

Current branch diff-target is up to date.
Changes applied before test
commit 569b151094ef3eee3ec6f42f49c9ec46632f51c8
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit 6ea59ed6cc280618a4ae79e822852c45f6d7ea77
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 45e8d0752d2e778f6cf59cbe5aefd8c69c7eef5f
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code

See https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/208/ for more details.

Extract the type annotation part of this diff into a dedicated one.

Depends on D7952.

Build is green

Patch application report for D7933 (id=28638)

Could not rebase; Attempt merge onto d1b7ad5d9d...

Updating d1b7ad5..5c61603
Fast-forward
 swh/journal/serializers.py         | 15 +++++++-
 swh/journal/tests/test_inmemory.py | 26 ++++----------
 swh/journal/tests/test_stream.py   | 72 +++++++++++++++++++++++++++-----------
 swh/journal/writer/__init__.py     | 46 +++++++++++++-----------
 swh/journal/writer/inmemory.py     | 26 ++++++--------
 swh/journal/writer/interface.py    | 41 ++++++++++++++++++++++
 swh/journal/writer/kafka.py        | 10 ++----
 swh/journal/writer/stream.py       | 22 +++++-------
 8 files changed, 159 insertions(+), 99 deletions(-)
 create mode 100644 swh/journal/writer/interface.py
Changes applied before test
commit 5c616035d09243373cd3f4178ba6bdb5f9377fcb
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit f57a86e71e84f1fbe414e28fe2ff4c460c3db725
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 52398ad80336a3d850ca8e7264b4ce20d5c34d89
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code
    
    - add a JournalWriterInterface in a new interface.py module,
    - add a flush() method to this interface,
    - remove (now unused) write_update methods,
    - move all type-related code in this module,
    - fix InMemoryJournalWriter type annotations.

commit db9d20294153430aad0975ecb67d50732fbf7699
Author: David Douard <david.douard@sdfa3.org>
Date:   Fri Jun 3 09:46:16 2022 +0200

    Remove unused 'privileged' argument from journal writer's write_addition(s)
    
    for StreamJournalWriter and InMemoryJournalWriter, and make them
    consistent with KafkaJournalWriter.

See https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/210/ for more details.
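The interface commits above can be pictured with a typing.Protocol sketch. Only the method names write_addition and write_additions, the dropped 'privileged' argument, and the new flush() method come from the commit messages; the exact signatures, the members given to ValueProtocol, and the names Origin, ListJournalWriter and dump are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Protocol, Tuple, runtime_checkable


class ValueProtocol(Protocol):
    """Assumed minimal shape of a value a journal writer can serialize."""

    def unique_key(self) -> Any: ...

    def to_dict(self) -> Dict[str, Any]: ...


@runtime_checkable
class JournalWriterInterface(Protocol):
    """Assumed interface: write_addition(s) without 'privileged', plus flush()."""

    def write_addition(self, object_type: str, object_: ValueProtocol) -> None: ...

    def write_additions(self, object_type: str, objects: Iterable[ValueProtocol]) -> None: ...

    def flush(self) -> None: ...


@dataclass(frozen=True)
class Origin:
    """Hypothetical value type; it does not inherit from ValueProtocol."""

    url: str

    def unique_key(self) -> Any:
        return {"url": self.url}

    def to_dict(self) -> Dict[str, Any]:
        return {"url": self.url}


class ListJournalWriter:
    """Hypothetical toy writer: buffers (object_type, dict) pairs until flush()."""

    def __init__(self) -> None:
        self.pending: List[Tuple[str, Dict[str, Any]]] = []
        self.flushed: List[Tuple[str, Dict[str, Any]]] = []

    def write_addition(self, object_type: str, object_: ValueProtocol) -> None:
        self.pending.append((object_type, object_.to_dict()))

    def write_additions(self, object_type: str, objects: Iterable[ValueProtocol]) -> None:
        for object_ in objects:
            self.write_addition(object_type, object_)

    def flush(self) -> None:
        # Move everything buffered so far to the "written" list.
        self.flushed.extend(self.pending)
        self.pending.clear()


def dump(writer: JournalWriterInterface, origins: Iterable[ValueProtocol]) -> None:
    # Code typed against the interface accepts any structurally matching writer.
    writer.write_additions("origin", origins)
    writer.flush()


writer = ListJournalWriter()
dump(writer, [Origin("https://example.org/a"), Origin("https://example.org/b")])
```

Since Protocol relies on structural typing, in-memory, stream and Kafka writers could all satisfy such an interface without sharing a base class. Annotating parameters directly with ValueProtocol, rather than with a TypeVar, is sufficient whenever a function does not need to link its input type to its output type. This is one plausible motivation for the TValue removal mentioned in a later revision.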

Rebase and update copyright headers.

swh/journal/serializers.py
116–117

I think the convention is to call it fd (e.g. see the os module) rather than file-like.

Build is green

Patch application report for D7933 (id=28640)

Could not rebase; Attempt merge onto d1b7ad5d9d...

Updating d1b7ad5..9b69a08
Fast-forward
 swh/journal/serializers.py         | 17 +++++++--
 swh/journal/tests/test_inmemory.py | 31 +++++++---------
 swh/journal/tests/test_stream.py   | 74 ++++++++++++++++++++++++++------------
 swh/journal/writer/__init__.py     | 48 +++++++++++++------------
 swh/journal/writer/inmemory.py     | 28 +++++++--------
 swh/journal/writer/interface.py    | 41 +++++++++++++++++++++
 swh/journal/writer/kafka.py        | 12 ++-----
 swh/journal/writer/stream.py       | 24 +++++--------
 8 files changed, 170 insertions(+), 105 deletions(-)
 create mode 100644 swh/journal/writer/interface.py
Changes applied before test
commit 9b69a08c31695cc585b919ca5615b3aee736b47f
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit 6b4f9599b80a36435dcc2913db3cd44d0253d596
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 5b91758fd5f7691946d73fa93e0f4e7123cba2a0
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code
    
    - add a JournalWriterInterface in a new interface.py module,
    - add a flush() method to this interface,
    - remove (now unused) write_update methods,
    - move all type-related code in this module,
    - fix InMemoryJournalWriter type annotations.

commit 908f015465a3bcd20b0701e7ec80ca4d5cb0b223
Author: David Douard <david.douard@sdfa3.org>
Date:   Fri Jun 3 09:46:16 2022 +0200

    Remove unused 'privileged' argument from journal writer's write_addition(s)
    
    for StreamJournalWriter and InMemoryJournalWriter, and make them
    consistent with KafkaJournalWriter.

See https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/212/ for more details.

swh/journal/serializers.py
116–117

file_like is the name of the argument in msgpack.Unpacker, so I'll stick with it.

swh/journal/tests/test_stream.py
13

So D7952 it is then.

Build has FAILED

Patch application report for D7933 (id=28650)

Could not rebase; Attempt merge onto d1b7ad5d9d...

Updating d1b7ad5..8eb5a5d
Fast-forward
 swh/journal/serializers.py             | 17 +++++++-
 swh/journal/tests/test_inmemory.py     | 31 ++++++--------
 swh/journal/tests/test_kafka_writer.py | 12 +++---
 swh/journal/tests/test_stream.py       | 74 ++++++++++++++++++++++++----------
 swh/journal/writer/__init__.py         | 48 ++++++++++++----------
 swh/journal/writer/inmemory.py         | 32 +++++++--------
 swh/journal/writer/interface.py        | 40 ++++++++++++++++++
 swh/journal/writer/kafka.py            | 42 +++++++------------
 swh/journal/writer/stream.py           | 24 +++++------
 9 files changed, 190 insertions(+), 130 deletions(-)
 create mode 100644 swh/journal/writer/interface.py
Changes applied before test
commit 8eb5a5d244f13aa99edd5c4721e5cd0e01989de9
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit dc1efc74314e97dcae9c4f75ee9a74aaca7ac5b3
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 2d2c6a7dc487e2a7741a7a7652225fcf90d6d7f9
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code
    
    - add a JournalWriterInterface in a new interface.py module,
    - add a flush() method to this interface,
    - remove (now unused) write_update methods,
    - move all type-related code in this module,
    - fix InMemoryJournalWriter type annotations,
    - remove usage of the TValue TypeVar in favor of using ValueProtocol
      directly.
    
    foldme

commit 908f015465a3bcd20b0701e7ec80ca4d5cb0b223
Author: David Douard <david.douard@sdfa3.org>
Date:   Fri Jun 3 09:46:16 2022 +0200

    Remove unused 'privileged' argument from journal writer's write_addition(s)
    
    for StreamJournalWriter and InMemoryJournalWriter, and make them
    consistent with KafkaJournalWriter.

Link to build: https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/214/
See console output for more information: https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/214/console

Build has FAILED

Patch application report for D7933 (id=28653)

Could not rebase; Attempt merge onto d1b7ad5d9d...

Updating d1b7ad5..8eb5a5d
Fast-forward
 swh/journal/serializers.py             | 17 +++++++-
 swh/journal/tests/test_inmemory.py     | 31 ++++++--------
 swh/journal/tests/test_kafka_writer.py | 12 +++---
 swh/journal/tests/test_stream.py       | 74 ++++++++++++++++++++++++----------
 swh/journal/writer/__init__.py         | 48 ++++++++++++----------
 swh/journal/writer/inmemory.py         | 32 +++++++--------
 swh/journal/writer/interface.py        | 40 ++++++++++++++++++
 swh/journal/writer/kafka.py            | 42 +++++++------------
 swh/journal/writer/stream.py           | 24 +++++------
 9 files changed, 190 insertions(+), 130 deletions(-)
 create mode 100644 swh/journal/writer/interface.py
Changes applied before test
commit 8eb5a5d244f13aa99edd5c4721e5cd0e01989de9
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit dc1efc74314e97dcae9c4f75ee9a74aaca7ac5b3
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 2d2c6a7dc487e2a7741a7a7652225fcf90d6d7f9
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code
    
    - add a JournalWriterInterface in a new interface.py module,
    - add a flush() method to this interface,
    - remove (now unused) write_update methods,
    - move all type-related code in this module,
    - fix InMemoryJournalWriter type annotations,
    - remove usage of the TValue TypeVar in favor of using ValueProtocol
      directly.
    
    foldme

commit 908f015465a3bcd20b0701e7ec80ca4d5cb0b223
Author: David Douard <david.douard@sdfa3.org>
Date:   Fri Jun 3 09:46:16 2022 +0200

    Remove unused 'privileged' argument from journal writer's write_addition(s)
    
    for StreamJournalWriter and InMemoryJournalWriter, and make them
    consistent with KafkaJournalWriter.

Link to build: https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/216/
See console output for more information: https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/216/console

Build is green

Patch application report for D7933 (id=28655)

Could not rebase; Attempt merge onto d1b7ad5d9d...

Updating d1b7ad5..a208f88
Fast-forward
 swh/journal/serializers.py             | 17 +++++++-
 swh/journal/tests/test_inmemory.py     | 39 +++++++-----------
 swh/journal/tests/test_kafka_writer.py | 12 +++---
 swh/journal/tests/test_stream.py       | 74 ++++++++++++++++++++++++----------
 swh/journal/writer/__init__.py         | 48 ++++++++++++----------
 swh/journal/writer/inmemory.py         | 32 +++++++--------
 swh/journal/writer/interface.py        | 40 ++++++++++++++++++
 swh/journal/writer/kafka.py            | 42 +++++++------------
 swh/journal/writer/stream.py           | 24 +++++------
 9 files changed, 192 insertions(+), 136 deletions(-)
 create mode 100644 swh/journal/writer/interface.py
Changes applied before test
commit a208f882a7e5d977ae0198a37b0d99645f34ceb7
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 09:48:17 2022 +0200

    Make the stream journal client accept a string as output_stream config entry
    
    and support the "-" value (for stdout).

commit 0e90328a226044bf1248620e72b5affaafc91810
Author: David Douard <david.douard@sdfa3.org>
Date:   Wed Jun 1 10:15:48 2022 +0200

    Add a kafka_stream_to_value helper function in serializers.py
    
    similar to kafka_to_value but takes a stream as input and return a
    msgpack.Unpacker object on which the user can iterate.

commit 903f8e601aacaaf2d9326a4d7e2595ed3098913c
Author: David Douard <david.douard@sdfa3.org>
Date:   Thu Jun 2 17:17:39 2022 +0200

    Add type annotation for JournalWriter related code
    
    - add a JournalWriterInterface in a new interface.py module,
    - add a flush() method to this interface,
    - remove (now unused) write_update methods,
    - move all type-related code in this module,
    - fix InMemoryJournalWriter type annotations,
    - remove usage of the TValue TypeVar in favor of using ValueProtocol
      directly.

commit 908f015465a3bcd20b0701e7ec80ca4d5cb0b223
Author: David Douard <david.douard@sdfa3.org>
Date:   Fri Jun 3 09:46:16 2022 +0200

    Remove unused 'privileged' argument from journal writer's write_addition(s)
    
    for StreamJournalWriter and InMemoryJournalWriter, and make them
    consistent with KafkaJournalWriter.

See https://jenkins.softwareheritage.org/job/DJNL/job/tests-on-diff/218/ for more details.