swh_storage_backend_config = {'check_config': {'check_write': True}, 'cls': 'local', 'db': "dbname=storage user=postgres host=127.0.0.1 port=22755 ...riter': {'brokers': ['127.0.0.1:50137'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'yogcppttap-1'}, ...}
kafka_prefix = 'yogcppttap', kafka_consumer_group = 'test-consumer-yogcppttap'
kafka_server = '127.0.0.1:50137'
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f6247e44978>
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
caplog,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
method(objects)
# now apply the backfiller on the storage to fill the journal under prefix2
backfiller_config = {
"journal_writer": {
"brokers": [kafka_server],
"client_id": "kafka_writer-2",
"prefix": prefix2,
},
"storage": swh_storage_backend_config,
}
# Backfilling
backfiller = JournalBackfiller(backfiller_config)
for object_type in TEST_OBJECTS:
backfiller.run(object_type, None, None)
# Trace log messages for unhandled object types in the replayer
caplog.set_level(logging.DEBUG, "swh.storage.replay")
# now check journal contents are the same under both topics
# use the replayer scaffolding to fill storages to make it a bit easier
# Replaying #1
sto1 = get_storage(cls="memory")
replayer1 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-1",
prefix=prefix1,
stop_on_eof=True,
)
worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
> replayer1.process(worker_fn1)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:277:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:265: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:292: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:62: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:177: in _insert_objects
method([object_converter_fn[object_type](o) for o in objects])
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f6248086940>
ids = [ExtID(extid_type='git256', extid=b'\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x...', object_type=<ObjectType.REVISION: 'rev'>), id=b'\xb6\xf9ap\x1a\xf5\x89\xea{\xeb\xf8\xf8\xea\x85\x9e\x0b\xc6\xe8#:')]
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]:
if not self._allow_overwrite:
extids = [
extid
for extid in ids
if not self._cql_runner.extid_get_from_pk(
extid_type=extid.extid_type, extid=extid.extid, target=extid.target,
)
]
> self.journal_writer.extid_add(extids)
E UnboundLocalError: local variable 'extids' referenced before assignment
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:1398: UnboundLocalError
TEST RESULT
TEST RESULT
- Run At
- Apr 29 2021, 11:06 AM