cli_runner = <click.testing.CliRunner object at 0x7fcb16c9b320>
swh_config = '/tmp/pytest-of-jenkins/pytest-0/test_cli_journal_client_index_1/indexer.yml'
kafka_prefix = 'poywbzwmhy', kafka_server = '127.0.0.1:40417'
consumer = <cimpl.Consumer object at 0x7fcb16a51b88>
idx_storage = <swh.indexer.storage.IndexerStorage object at 0x7fcb14591e10>
storage = <swh.storage.postgresql.storage.Storage object at 0x7fcb14591d68>
mocker = <pytest_mock.plugin.MockerFixture object at 0x7fcb14591e48>
swh_indexer_config = {'compute_checksums': ['blake2b512'], 'indexer_storage': {'cls': 'local', 'db': "user=postgres password=xxx dbname=tes...cheduler': {'cls': 'local', 'db': "user=postgres password=xxx dbname=tests host=127.0.0.1 port=16585 options=''"}, ...}
indexer_name = '*'
@pytest.mark.parametrize("indexer_name", ["origin_intrinsic_metadata", "*"])
def test_cli_journal_client_index__origin_intrinsic_metadata(
cli_runner,
swh_config,
kafka_prefix: str,
kafka_server,
consumer: Consumer,
idx_storage,
storage,
mocker,
swh_indexer_config,
indexer_name: str,
):
"""Test the 'swh indexer journal-client' cli tool."""
journal_writer = get_journal_writer(
"kafka",
brokers=[kafka_server],
prefix=kafka_prefix,
client_id="test producer",
value_sanitizer=lambda object_type, value: value,
flush_timeout=3, # fail early if something is going wrong
)
visit_statuses = [
OriginVisitStatus(
origin="file:///dev/zero",
visit=1,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/foobar",
visit=2,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///tmp/spamegg",
visit=3,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus(
origin="file:///dev/0002",
visit=6,
date=now(),
status="full",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'partial' status
origin="file:///dev/0000",
visit=4,
date=now(),
status="partial",
snapshot=None,
),
OriginVisitStatus( # will be filtered out due to its 'ongoing' status
origin="file:///dev/0001",
visit=5,
date=now(),
status="ongoing",
snapshot=None,
),
]
journal_writer.write_additions("origin_visit_status", visit_statuses)
visit_statuses_full = [vs for vs in visit_statuses if vs.status == "full"]
storage.revision_add([REVISION])
mocker.patch(
"swh.indexer.metadata.get_head_swhid",
return_value=REVISION.swhid(),
)
mocker.patch(
"swh.indexer.metadata.DirectoryMetadataIndexer.index",
return_value=[
DirectoryIntrinsicMetadataRow(
id=DIRECTORY2.id,
indexer_configuration_id=1,
mappings=["cff"],
metadata={"foo": "bar"},
)
],
)
result = cli_runner.invoke(
indexer_cli_group,
[
"-C",
swh_config,
"journal-client",
indexer_name,
"--broker",
kafka_server,
"--prefix",
kafka_prefix,
"--group-id",
"test-consumer",
"--stop-after-objects",
len(visit_statuses),
],
> catch_exceptions=False,
)
.tox/py3/lib/python3.7/site-packages/swh/indexer/tests/test_cli.py:664:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.tox/py3/lib/python3.7/site-packages/click/testing.py:408: in invoke
return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
.tox/py3/lib/python3.7/site-packages/click/core.py:1055: in main
rv = self.invoke(ctx)
.tox/py3/lib/python3.7/site-packages/click/core.py:1657: in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
.tox/py3/lib/python3.7/site-packages/click/core.py:1404: in invoke
return ctx.invoke(self.callback, **ctx.params)
.tox/py3/lib/python3.7/site-packages/click/core.py:760: in invoke
return __callback(*args, **kwargs)
.tox/py3/lib/python3.7/site-packages/click/decorators.py:26: in new_func
return f(get_current_context(), *args, **kwargs)
.tox/py3/lib/python3.7/site-packages/swh/indexer/cli.py:367: in journal_client
batch_size=batch_size,
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:67: in get_journal_client
return JournalClient(**kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <swh.journal.client.JournalClient object at 0x7fcb146fffd0>
brokers = ('127.0.0.1:40417',), group_id = 'test-consumer'
prefix = 'poywbzwmhy'
object_types = ['origin_visit_status', 'raw_extrinsic_metadata', 'content']
privileged = False, stop_after_objects = 6, batch_size = None
process_timeout = None, auto_offset_reset = 'earliest', stop_on_eof = False
value_deserializer = None, kwargs = {}
def __init__(
self,
brokers: Union[str, List[str]],
group_id: str,
prefix: Optional[str] = None,
object_types: Optional[List[str]] = None,
privileged: bool = False,
stop_after_objects: Optional[int] = None,
batch_size: int = 200,
process_timeout: Optional[float] = None,
auto_offset_reset: str = "earliest",
stop_on_eof: bool = False,
value_deserializer: Optional[Callable[[str, bytes], Any]] = None,
**kwargs,
):
if prefix is None:
prefix = DEFAULT_PREFIX
if auto_offset_reset not in ACCEPTED_OFFSET_RESET:
raise ValueError(
"Option 'auto_offset_reset' only accept %s, not %s"
% (ACCEPTED_OFFSET_RESET, auto_offset_reset)
)
> if batch_size <= 0:
E TypeError: '<=' not supported between instances of 'NoneType' and 'int'
.tox/py3/lib/python3.7/site-packages/swh/journal/client.py:152: TypeError
TEST RESULT
- Run At: Aug 31 2022, 5:26 PM