kafka_prefix = 'zdpqjvezem', kafka_server = '127.0.0.1:50137'
consumer = <cimpl.Consumer object at 0x7f6247d7c688>
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": False,
}
storage_config: Dict[str, Any] = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
for obj_type, objs in TEST_OBJECTS.items():
method = getattr(storage, obj_type + "_add")
if obj_type in (
"content",
"skipped_content",
"directory",
"extid",
"metadata_authority",
"metadata_fetcher",
"revision",
"release",
"snapshot",
"origin",
"origin_visit_status",
"raw_extrinsic_metadata",
):
> method(objs)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_kafka_writer.py:54:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f605d654fd0>
ids = [ExtID(extid_type='git256', extid=b'\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x...', object_type=<ObjectType.REVISION: 'rev'>), id=b'\xb6\xf9ap\x1a\xf5\x89\xea{\xeb\xf8\xf8\xea\x85\x9e\x0b\xc6\xe8#:')]
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]:
if not self._allow_overwrite:
extids = [
extid
for extid in ids
if not self._cql_runner.extid_get_from_pk(
extid_type=extid.extid_type, extid=extid.extid, target=extid.target,
)
]
> self.journal_writer.extid_add(extids)
E UnboundLocalError: local variable 'extids' referenced before assignment
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:1398: UnboundLocalError
TEST RESULT
TEST RESULT
- Run At
- Apr 29 2021, 11:06 AM