kafka_prefix = 'qpgcvgmanf', kafka_consumer_group = 'test-consumer-qpgcvgmanf'
kafka_server = '127.0.0.1:50137', privileged = False
@pytest.mark.parametrize("privileged", [True, False])
def test_storage_play_anonymized(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, privileged: bool,
):
"""Optimal replayer scenario.
This:
- writes objects to the topic
- replayer consumes objects from the topic and replay them
This tests the behavior with both a privileged and non-privileged replayer
"""
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
"anonymize": True,
}
src_config: Dict[str, Any] = {"cls": "memory", "journal_writer": writer_config}
storage = get_storage(**src_config)
# Fill the src storage
nb_sent = 0
for obj_type, objs in TEST_OBJECTS.items():
if obj_type in ("origin_visit", "origin_visit_status"):
# these are unrelated with what we want to test here
continue
method = getattr(storage, obj_type + "_add")
> method(objs)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:353:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.storage.in_memory.InMemoryStorage object at 0x7f6134126630>
ids = [ExtID(extid_type='git256', extid=b'\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x03\x...', object_type=<ObjectType.REVISION: 'rev'>), id=b'\xb6\xf9ap\x1a\xf5\x89\xea{\xeb\xf8\xf8\xea\x85\x9e\x0b\xc6\xe8#:')]
def extid_add(self, ids: List[ExtID]) -> Dict[str, int]:
if not self._allow_overwrite:
extids = [
extid
for extid in ids
if not self._cql_runner.extid_get_from_pk(
extid_type=extid.extid_type, extid=extid.extid, target=extid.target,
)
]
> self.journal_writer.extid_add(extids)
E UnboundLocalError: local variable 'extids' referenced before assignment
.tox/py3/lib/python3.7/site-packages/swh/storage/cassandra/storage.py:1398: UnboundLocalError
TEST RESULT
TEST RESULT
- Run At
- Apr 29 2021, 11:06 AM