swh_storage_backend_config = {'check_config': {'check_write': True}, 'cls': 'local', 'db': "dbname=storage user=postgres host=127.0.0.1 port=31785 ...riter': {'brokers': ['127.0.0.1:47457'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'ndzoegbslf-1'}, ...}
kafka_prefix = 'ndzoegbslf', kafka_consumer_group = 'test-consumer-ndzoegbslf'
kafka_server = '127.0.0.1:47457'
caplog = <_pytest.logging.LogCaptureFixture object at 0x7f025db436a0>
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
caplog,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
method(objects)
# now apply the backfiller on the storage to fill the journal under prefix2
backfiller_config = {
"journal_writer": {
"brokers": [kafka_server],
"client_id": "kafka_writer-2",
"prefix": prefix2,
},
"storage": swh_storage_backend_config,
}
# Backfilling
backfiller = JournalBackfiller(backfiller_config)
for object_type in TEST_OBJECTS:
> backfiller.run(object_type, None, None)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:261:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/storage/backfill.py:637: in run
writer.write_additions(object_type, objects)
.tox/py3/lib/python3.7/site-packages/swh/storage/writer.py:67: in write_additions
self.journal.write_additions(object_type, values)
.tox/py3/lib/python3.7/site-packages/swh/journal/writer/kafka.py:246: in write_additions
for object_ in objects:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
db = <swh.core.db.BaseDb object at 0x7f025db43b00>
obj_type = 'metadata_authority', start = None, end = None
def fetch(db, obj_type, start, end):
"""Fetch all obj_type's identifiers from db.
This opens one connection, streams objects and, when done, closes
the connection.
Args:
db (BaseDb): Db connection object
obj_type (str): Object type
start (Union[bytes|Tuple]): Range start identifier
end (Union[bytes|Tuple]): Range end identifier
Raises:
ValueError if obj_type is not supported
Yields:
Objects in the given range
"""
query, where_args, column_aliases = compute_query(obj_type, start, end)
converter = CONVERTERS.get(obj_type)
with db.cursor() as cursor:
logger.debug("Fetching data for table %s", obj_type)
logger.debug("query: %s %s", query, where_args)
> cursor.execute(query, where_args)
E psycopg2.errors.UndefinedColumn: column "metadata" does not exist
E LINE 2: select type,url,metadata
E ^
.tox/py3/lib/python3.7/site-packages/swh/storage/backfill.py:529: UndefinedColumn
TEST RESULT
TEST RESULT
- Run At
- May 4 2021, 4:16 PM