swh_storage_backend_config = {'cls': 'local', 'db': 'postgresql://postgres@127.0.0.1:27340/tests', 'journal_writer': {'brokers': ['127.0.0.1:37923'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'rohaqrhlcp-1'}, 'objstorage': {'args': {}, 'cls': 'memory'}}
kafka_prefix = 'rohaqrhlcp', kafka_consumer_group = 'test-consumer-rohaqrhlcp'
kafka_server = '127.0.0.1:37923'
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
method(objects)
# now apply the backfiller on the storage to fill the journal under prefix2
backfiller_config = {
"brokers": [kafka_server],
"client_id": "kafka_writer-2",
"prefix": prefix2,
"storage_dbconn": swh_storage_backend_config["db"],
}
# Backfilling
backfiller = JournalBackfiller(backfiller_config)
for object_type in TEST_OBJECTS:
backfiller.run(object_type, None, None)
# now check that the journal contents are the same under both topics
# use the replayer scaffolding to fill storages to make it a bit easier
# Replaying #1
sto1 = get_storage(cls="memory")
replayer1 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-1",
prefix=prefix1,
stop_on_eof=True,
)
worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
replayer1.process(worker_fn1)
# Replaying #2
sto2 = get_storage(cls="memory")
replayer2 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-2",
prefix=prefix2,
stop_on_eof=True,
)
worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
replayer2.process(worker_fn2)
# Compare storages
> check_replayed(sto1, sto2)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:244:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = <swh.storage.in_memory.InMemoryStorage object at 0x7f8b76a75860>
dst = <swh.storage.in_memory.InMemoryStorage object at 0x7f8b76a75e80>
expected_anonymized = False
def check_replayed(src, dst, expected_anonymized=False):
"""Simple utility function to compare the content of 2 in_memory storages
If expected_anonymized is True, objects from the source storage are anonymized
before comparing with the destination storage.
"""
def maybe_anonymize(obj):
if expected_anonymized:
return obj.anonymize() or obj
return obj
expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
got_persons = set(dst._persons.values())
assert got_persons == expected_persons
for attr in (
"contents",
"skipped_contents",
"directories",
"revisions",
"releases",
"snapshots",
"origins",
"origin_visit_statuses",
):
expected_objects = [
(id, maybe_anonymize(obj))
> for id, obj in sorted(getattr(src, f"_{attr}").items())
]
E AttributeError: 'InMemoryStorage' object has no attribute '_contents'
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_replay.py:353: AttributeError
TEST RESULT
TEST RESULT
- Run At
- Aug 12 2020, 7:43 PM