Changeset View
Changeset View
Standalone View
Standalone View
swh/icinga_plugins/tests/test_vault.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import time | import time | ||||
from click.testing import CliRunner | from swh.icinga_plugins.tests.utils import invoke | ||||
from swh.icinga_plugins.cli import icinga_cli_group | |||||
from .web_scenario import WebScenario | from .web_scenario import WebScenario | ||||
dir_id = "ab" * 20 | dir_id = "ab" * 20 | ||||
response_pending = { | response_pending = { | ||||
"obj_id": dir_id, | "obj_id": dir_id, | ||||
"obj_type": "directory", | "obj_type": "directory", | ||||
Show All 27 Lines | |||||
class FakeStorage: | class FakeStorage: | ||||
def __init__(self, foo, **kwargs): | def __init__(self, foo, **kwargs): | ||||
pass | pass | ||||
def directory_get_random(self): | def directory_get_random(self): | ||||
return bytes.fromhex(dir_id) | return bytes.fromhex(dir_id) | ||||
def invoke(args, catch_exceptions=False): | |||||
runner = CliRunner() | |||||
result = runner.invoke(icinga_cli_group, args) | |||||
if not catch_exceptions and result.exception: | |||||
print(result.output) | |||||
raise result.exception | |||||
return result | |||||
def test_vault_immediate_success(requests_mock, mocker, mocked_time): | def test_vault_immediate_success(requests_mock, mocker, mocked_time): | ||||
scenario = WebScenario() | scenario = WebScenario() | ||||
url = f"mock://swh-web.example.org/api/1/vault/directory/{dir_id}/" | url = f"mock://swh-web.example.org/api/1/vault/directory/{dir_id}/" | ||||
scenario.add_step("get", url, {}, status_code=404) | scenario.add_step("get", url, {}, status_code=404) | ||||
scenario.add_step("post", url, response_pending) | scenario.add_step("post", url, response_pending) | ||||
scenario.add_step("get", url, response_done) | scenario.add_step("get", url, response_done) | ||||
▲ Show 20 Lines • Show All 221 Lines • Show Last 20 Lines |