swh_storage_backend_config = {'cls': 'local', 'db': 'postgresql://postgres@127.0.0.1:13153/tests', 'journal_writer': {'brokers': ['127.0.0.1:50499'], 'client_id': 'kafka_writer-1', 'cls': 'kafka', 'prefix': 'ztdpgwfqas-1'}, 'objstorage': {'args': {}, 'cls': 'memory'}}
kafka_prefix = 'ztdpgwfqas', kafka_consumer_group = 'test-consumer-ztdpgwfqas'
kafka_server = '127.0.0.1:50499'
@patch("swh.storage.backfill.RANGE_GENERATORS", RANGE_GENERATORS)
def test_backfiller(
swh_storage_backend_config,
kafka_prefix: str,
kafka_consumer_group: str,
kafka_server: str,
):
prefix1 = f"{kafka_prefix}-1"
prefix2 = f"{kafka_prefix}-2"
journal1 = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer-1",
"prefix": prefix1,
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
method(objects)
# now apply the backfiller on the storage to fill the journal under prefix2
backfiller_config = {
"brokers": [kafka_server],
"client_id": "kafka_writer-2",
"prefix": prefix2,
"storage_dbconn": swh_storage_backend_config["db"],
}
# Backfilling
backfiller = JournalBackfiller(backfiller_config)
for object_type in TEST_OBJECTS:
backfiller.run(object_type, None, None)
# now check that the journal contents are the same under both topics
# use the replayer scaffolding to fill storages to make it a bit easier
# Replaying #1
sto1 = get_storage(cls="memory")
replayer1 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-1",
prefix=prefix1,
stop_on_eof=True,
)
worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
> replayer1.process(worker_fn1)
.tox/py3/lib/python3.7/site-packages/swh/storage/tests/test_backfill.py:230:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:264: in process
batch_processed, at_eof = self.handle_messages(messages, worker_fn)
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:291: in handle_messages
worker_fn(dict(objects))
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:55: in process_replay_objects
_insert_objects(object_type, objects, storage)
.tox/py3/lib/python3.7/site-packages/swh/storage/replay.py:127: in _insert_objects
model_objs.append(converter_fn(obj))
.tox/py3/lib/python3.7/site-packages/swh/model/model.py:73: in from_dict
return cls(**d)
<attrs generated init swh.model.model.OriginVisit>:12: in __init__
__attr_validator_date(self, __attr_date, self.date)
.tox/py3/lib/python3.7/site-packages/attrs_strict/_type_validation.py:63: in _validator
_validate_elements(attribute, field, attribute.type)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
attribute = Attribute(name='date', default=NOTHING, validator=<function type_validator.<locals>._validator at 0x7fcd840ed510>, rep...True, hash=None, init=True, metadata=mappingproxy({}), type=<class 'datetime.datetime'>, converter=None, kw_only=False)
value = '2018-11-27 17:20:39+00:00', expected_type = <class 'datetime.datetime'>
def _validate_elements(attribute, value, expected_type):
if expected_type is None:
return
base_type = _get_base_type(expected_type)
if base_type == typing.Any:
return
if base_type != typing.Union and not isinstance( # type: ignore
value, base_type
):
> raise AttributeTypeError(value, attribute)
E attrs_strict._error.AttributeTypeError: date must be <class 'datetime.datetime'> (got 2018-11-27 17:20:39+00:00 that is a <class 'str'>)
.tox/py3/lib/python3.7/site-packages/attrs_strict/_type_validation.py:80: AttributeTypeError
TEST RESULT
TEST RESULT
- Run At
- Jun 24 2020, 3:16 PM