swh_storage = <swh.storage.in_memory.InMemoryStorage object at 0x7f89a2c1d4a8>
kafka_prefix = 'mfgubqugtr.swh.journal.objects'
kafka_consumer_group = 'test-consumer-mfgubqugtr'
kafka_server = '127.0.0.1:37923'
def test_replay(
swh_storage, kafka_prefix: str, kafka_consumer_group: str, kafka_server: str,
):
kafka_prefix += ".swh.journal.objects"
producer = Producer(
{
"bootstrap.servers": kafka_server,
"client.id": "test-producer",
"acks": "all",
}
)
snapshot = Snapshot(
branches={
b"HEAD": SnapshotBranch(
target_type=TargetType.REVISION, target=b"\x01" * 20,
)
},
)
snapshot_dict = snapshot.to_dict()
producer.produce(
topic=kafka_prefix + ".snapshot",
key=key_to_kafka(snapshot.id),
value=value_to_kafka(snapshot_dict),
)
producer.flush()
logger.debug("Flushed producer")
result = invoke(
"replay",
"--stop-after-objects",
"1",
journal_config={
"brokers": [kafka_server],
"group_id": kafka_consumer_group,
"prefix": kafka_prefix,
},
)
expected = r"Done.\n"
assert result.exit_code == 0, result.output
assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
> assert swh_storage.snapshot_get(snapshot.id) == {
**snapshot_dict,
"next_branch": None,
}
E AssertionError: assert None == {'branches': ...branch': None}
E +None
E -{'branches': {b'HEAD': {'target': b'\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'target_type': 'revision'}}, 'id': b'I\xefE\xd9E#\xf3\xfbnS\xd3\x0e\x01tE\xf2U\xc0\x9b\x13', 'next_branch': None}
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_cli.py:111: AssertionError
TEST RESULT
TEST RESULT
- Run At
- Aug 12 2020, 7:43 PM