swh_storage = <swh.storage.in_memory.InMemoryStorage object at 0x7f5339dd22e8>
kafka_prefix = 'istrfdcsrf.swh.journal.objects'
kafka_consumer_group = 'test-consumer-istrfdcsrf'
kafka_server = '127.0.0.1:47173'
def test_replay(
swh_storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
snapshot = Snapshot(
branches={
b"HEAD": SnapshotBranch(
target_type=TargetType.REVISION, target=b"\x01" * 20,
)
},
)
snapshot_dict = snapshot.to_dict()
producer.produce(
topic=kafka_prefix + ".snapshot",
key=key_to_kafka(snapshot.id),
value=value_to_kafka(snapshot_dict),
)
producer.flush()
logger.debug("Flushed producer")
result = invoke(
"replay",
"--stop-after-objects",
"1",
journal_config={
"brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
> assert result.exit_code == 0, result.output
E AssertionError:
E assert 1 == 0
E +1
E -0
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_cli.py:106: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Dec 9 2021, 5:04 PM