swh_storage_backend_config = {'cls': 'local', 'db': 'postgresql://postgres@127.0.0.1:21217/tests', 'journal_writer': {'brokers': ['127.0.0.1:56779'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'euqfkagjyj-1'}, 'objstorage': {'args': {}, 'cls': 'memory'}}
kafka_prefix = 'euqfkagjyj', kafka_consumer_group = 'test-consumer-euqfkagjyj'
kafka_server = '127.0.0.1:56779'
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
method(objects)
# now apply the backfiller on the storage to fill the journal under prefix2
backfiller_config = {
"brokers": [kafka_server],
"client_id": "kafka_writer-2",
"prefix": prefix2,
"storage_dbconn": swh_storage_backend_config["db"],
}
# Backfilling
backfiller = JournalBackfiller(backfiller_config)
for object_type in TEST_OBJECTS:
> backfiller.run(object_type, None, None)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:217:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/backfill.py:538: in run
for obj in fetch(db, object_type, start=range_start, end=range_end,):
.tox/py3/lib/python3.7/site-packages/swh/storage/backfill.py:441: in fetch
record = converter(db, record)
.tox/py3/lib/python3.7/site-packages/swh/storage/backfill.py:225: in revision_converter
revision = db_to_revision(revision_d)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
db_revision = {'author_email': b'', 'author_fullname': b'foo', 'author_id': 1, 'author_name': b'foo', ...}
def db_to_revision(db_revision: Dict[str, Any]) -> Optional[Revision]:
"""Convert a database representation of a revision to its swh-model
representation."""
if db_revision["type"] is None:
assert all(
v is None for (k, v) in db_revision.items() if k not in ("id", "parents")
)
return None
author = db_to_author(
db_revision["author_fullname"],
db_revision["author_name"],
db_revision["author_email"],
)
date = db_to_date(
db_revision["date"],
db_revision["date_offset"],
db_revision["date_neg_utc_offset"],
)
committer = db_to_author(
db_revision["committer_fullname"],
db_revision["committer_name"],
db_revision["committer_email"],
)
committer_date = db_to_date(
db_revision["committer_date"],
db_revision["committer_date_offset"],
db_revision["committer_date_neg_utc_offset"],
)
assert author, "author is None"
assert committer, "committer is None"
parents = []
if "parents" in db_revision:
for parent in db_revision["parents"]:
if parent:
parents.append(parent)
metadata = db_revision["metadata"]
> extra_headers = db_revision["extra_headers"]
E KeyError: 'extra_headers'
.tox/py3/lib/python3.7/site-packages/swh/storage/converters.py:225: KeyError
TEST RESULT
TEST RESULT
- Run At
- Aug 19 2020, 2:28 PM