Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/test_cli_journal.py
# Copyright (C) 2021 The Software Heritage developers | # Copyright (C) 2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
from typing import Dict, List | from typing import Dict, List | ||||
from click.testing import CliRunner, Result | from click.testing import CliRunner, Result | ||||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||||
import pytest | import pytest | ||||
import yaml | import yaml | ||||
from swh.journal.serializers import value_to_kafka | from swh.journal.serializers import value_to_kafka | ||||
from swh.scheduler import get_scheduler | from swh.scheduler import get_scheduler | ||||
from swh.scheduler.cli import cli | from swh.scheduler.cli import cli | ||||
from swh.scheduler.tests.test_journal_client import VISIT_STATUSES1 | from swh.scheduler.tests.test_journal_client import VISIT_STATUSES_1 | ||||
@pytest.fixture | @pytest.fixture | ||||
def swh_scheduler_cfg(postgresql_scheduler, kafka_server): | def swh_scheduler_cfg(postgresql_scheduler, kafka_server): | ||||
"""Journal client configuration ready""" | """Journal client configuration ready""" | ||||
return { | return { | ||||
"scheduler": {"cls": "local", "db": postgresql_scheduler.dsn,}, | "scheduler": {"cls": "local", "db": postgresql_scheduler.dsn,}, | ||||
"journal": { | "journal": { | ||||
▲ Show 20 Lines • Show All 60 Lines • ▼ Show 20 Lines | ): | ||||
swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) | swh_scheduler = get_scheduler(**swh_scheduler_cfg["scheduler"]) | ||||
producer = Producer( | producer = Producer( | ||||
{ | { | ||||
"bootstrap.servers": kafka_server, | "bootstrap.servers": kafka_server, | ||||
"client.id": "test visit-stats producer", | "client.id": "test visit-stats producer", | ||||
"acks": "all", | "acks": "all", | ||||
} | } | ||||
) | ) | ||||
visit_status = VISIT_STATUSES1[0] | visit_status = VISIT_STATUSES_1[0] | ||||
value = value_to_kafka(visit_status) | value = value_to_kafka(visit_status) | ||||
topic = "swh.journal.objects.origin_visit_status" | topic = "swh.journal.objects.origin_visit_status" | ||||
producer.produce(topic=topic, key=b"bogus-origin", value=value) | producer.produce(topic=topic, key=b"bogus-origin", value=value) | ||||
producer.flush() | producer.flush() | ||||
result = invoke( | result = invoke( | ||||
["journal-client", "--stop-after-objects", "1",], swh_scheduler_cfg_path, | ["journal-client", "--stop-after-objects", "1",], swh_scheduler_cfg_path, | ||||
Show All 12 Lines |