diff --git a/setup.py b/setup.py index 01a9bde..6925684 100755 --- a/setup.py +++ b/setup.py @@ -1,81 +1,81 @@ #!/usr/bin/env python3 # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from setuptools import setup, find_packages - -from os import path, walk from io import open +from os import path, walk + +from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, 'README.md'), encoding='utf-8') as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = 'requirements-%s.txt' % name else: reqf = 'requirements.txt' requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith('#'): continue requirements.append(line) return requirements # package generated static assets as module data files data_files = [] for root, _, files in walk('data/'): root_files = [path.join(root, i) for i in files] data_files.append((path.join('share/swh/icinga-plugins', root), root_files)) setup( name='swh.icinga_plugins', description='Icinga plugins for Software Heritage infrastructure ' 'monitoring', long_description=long_description, long_description_content_type='text/markdown', python_requires=">=3.7", author='Software Heritage developers', author_email='swh-devel@inria.fr', url='https://forge.softwareheritage.org/diffusion/swh-icinga-plugins', packages=find_packages(), # packages's modules install_requires=parse_requirements() + parse_requirements('swh'), tests_require=parse_requirements('test'), setup_requires=['setuptools-scm'], use_scm_version=True, extras_require={'testing': parse_requirements('test')}, include_package_data=True, entry_points=''' [swh.cli.subcommands] icinga_plugins=swh.icinga_plugins.cli:cli ''', classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ], project_urls={ 'Bug Reports': 'https://forge.softwareheritage.org/maniphest', 'Funding': 'https://www.softwareheritage.org/donate', 'Source': 'https://forge.softwareheritage.org/source/swh-icinga-plugins', }, data_files=data_files ) diff --git a/swh/icinga_plugins/deposit.py b/swh/icinga_plugins/deposit.py index 92fffe1..03da3e7 100644 --- a/swh/icinga_plugins/deposit.py +++ b/swh/icinga_plugins/deposit.py @@ -1,122 +1,122 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import sys import time -from .base_check import BaseCheck - from swh.deposit.client import PublicApiDepositClient +from .base_check import BaseCheck + class DepositCheck(BaseCheck): TYPE = 'DEPOSIT' DEFAULT_WARNING_THRESHOLD = 120 DEFAULT_CRITICAL_THRESHOLD = 3600 def __init__(self, obj): super().__init__(obj) self._poll_interval = obj['poll_interval'] self._archive_path = obj['archive'] self._metadata_path = obj['metadata'] self._collection = obj['collection'] self._client = PublicApiDepositClient({ 'url': obj['server'], 'auth': { 'username': obj['username'], 'password': obj['password'], }, }) def upload_deposit(self): result = self._client.deposit_create( archive=self._archive_path, metadata=self._metadata_path, collection=self._collection, in_progress=False, slug='check-deposit-%s' % datetime.datetime.now().isoformat()) self._deposit_id = result['deposit_id'] return result def get_deposit_status(self): return self._client.deposit_status( collection=self._collection, deposit_id=self._deposit_id) def wait_while_status(self, statuses, start_time, metrics, result): while result['deposit_status'] in statuses: metrics['total_time'] = time.time() - start_time if metrics['total_time'] > self.critical_threshold: self.print_result( 'CRITICAL', f'Timed out while in status ' f'{result["deposit_status"]} ' f'({metrics["total_time"]}s seconds since deposit ' f'started)', **metrics) sys.exit(2) time.sleep(self._poll_interval) result = self.get_deposit_status() return result def main(self): start_time = time.time() metrics = {} # Upload the archive and metadata result = self.upload_deposit() metrics['upload_time'] = time.time() - start_time # Wait for validation result = self.wait_while_status( ['deposited'], start_time, metrics, result) metrics['total_time'] = time.time() - start_time metrics['validation_time'] = \ metrics['total_time'] - metrics['upload_time'] # Check validation succeeded if result['deposit_status'] == 'rejected': self.print_result( 'CRITICAL', f'Deposit was rejected: {result["deposit_status_detail"]}', **metrics) return 2 # Wait for loading result = self.wait_while_status( ['verified', 'loading'], start_time, metrics, result) metrics['total_time'] = time.time() - start_time metrics['load_time'] = ( metrics['total_time'] - metrics['upload_time'] - metrics['validation_time']) # Check loading succeeded if result['deposit_status'] == 'failed': self.print_result( 'CRITICAL', f'Deposit loading failed: {result["deposit_status_detail"]}', **metrics) return 2 # Check for unexpected status if result['deposit_status'] != 'done': self.print_result( 'CRITICAL', f'Deposit got unexpected status: {result["deposit_status"]} ' f'({result["deposit_status_detail"]})', **metrics) return 2 # Everything went fine, check total time wasn't too large and # print result (status_code, status) = self.get_status(metrics['total_time']) self.print_result( status, f'Deposit took {metrics["total_time"]:.2f}s and succeeded.', **metrics) return status_code diff --git a/swh/icinga_plugins/tests/test_deposit.py b/swh/icinga_plugins/tests/test_deposit.py index 7df3095..453e145 100644 --- a/swh/icinga_plugins/tests/test_deposit.py +++ b/swh/icinga_plugins/tests/test_deposit.py @@ -1,375 +1,375 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io import os import tarfile import time from click.testing import CliRunner import pytest from swh.icinga_plugins.cli import cli -from .web_scenario import WebScenario +from .web_scenario import WebScenario BASE_URL = 'http://swh-deposit.example.org/1' COMMON_OPTIONS = [ '--server', BASE_URL, '--username', 'test', '--password', 'test', '--collection', 'testcol', ] SAMPLE_METADATA = ''' Test Software swh test-software No One ''' ENTRY_TEMPLATE = ''' 42 2019-12-19 18:11:00 foo.tar.gz {status} http://purl.org/net/sword/package/SimpleZip ''' STATUS_TEMPLATE = ''' 42 {status} {status_detail} ''' @pytest.fixture(scope='session') def tmp_path(tmp_path_factory): return tmp_path_factory.mktemp(__name__) @pytest.fixture(scope='session') def sample_metadata(tmp_path): """Returns a sample metadata file's path """ path = os.path.join(tmp_path, 'metadata.xml') with open(path, 'w') as fd: fd.write(SAMPLE_METADATA) return path @pytest.fixture(scope='session') def sample_archive(tmp_path): """Returns a sample archive's path """ path = os.path.join(tmp_path, 'archive.tar.gz') with tarfile.open(path, 'w:gz') as tf: tf.addfile( tarfile.TarInfo('hello.py'), io.BytesIO(b'print("Hello world")')) return path def invoke(args, catch_exceptions=False): runner = CliRunner() result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_deposit_immediate_success( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='done')) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ]) assert result.output == ( "DEPOSIT OK - Deposit took 0.00s and succeeded.\n" "| 'load_time' = 0.00s\n" "| 'total_time' = 0.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 0.00s\n") assert result.exit_code == 0, result.output def test_deposit_delays( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='loading', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='done', status_detail='')) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ]) assert result.output == ( "DEPOSIT OK - Deposit took 30.00s and succeeded.\n" "| 'load_time' = 20.00s\n" "| 'total_time' = 30.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 0, result.output def test_deposit_delay_warning( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='done', status_detail='')) scenario.install_mock(requests_mock) result = invoke([ '--warning', '15', 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT WARNING - Deposit took 20.00s and succeeded.\n" "| 'load_time' = 10.00s\n" "| 'total_time' = 20.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 1, result.output def test_deposit_delay_critical( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='done', status_detail=''), callback=lambda: time.sleep(60)) scenario.install_mock(requests_mock) result = invoke([ '--critical', '50', 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT CRITICAL - Deposit took 80.00s and succeeded.\n" "| 'load_time' = 70.00s\n" "| 'total_time' = 80.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 2, result.output def test_deposit_timeout( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited'), callback=lambda: time.sleep(1500)) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail=''), callback=lambda: time.sleep(1500)) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='loading', status_detail=''), callback=lambda: time.sleep(1500)) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT CRITICAL - Timed out while in status loading " "(4520.0s seconds since deposit started)\n" "| 'total_time' = 4520.00s\n" "| 'upload_time' = 1500.00s\n" "| 'validation_time' = 1510.00s\n") assert result.exit_code == 2, result.output def test_deposit_rejected( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='rejected', status_detail='booo')) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT CRITICAL - Deposit was rejected: booo\n" "| 'total_time' = 10.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 2, result.output def test_deposit_failed( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='loading', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='failed', status_detail='booo')) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT CRITICAL - Deposit loading failed: booo\n" "| 'load_time' = 20.00s\n" "| 'total_time' = 30.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 2, result.output def test_deposit_unexpected_status( requests_mock, mocker, sample_archive, sample_metadata, mocked_time): scenario = WebScenario() scenario.add_step( 'post', BASE_URL + '/testcol/', ENTRY_TEMPLATE.format(status='deposited')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='verified', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='loading', status_detail='')) scenario.add_step( 'get', BASE_URL + '/testcol/42/status/', STATUS_TEMPLATE.format(status='what', status_detail='booo')) scenario.install_mock(requests_mock) result = invoke([ 'check-deposit', *COMMON_OPTIONS, 'single', '--archive', sample_archive, '--metadata', sample_metadata, ], catch_exceptions=True) assert result.output == ( "DEPOSIT CRITICAL - Deposit got unexpected status: what (booo)\n" "| 'load_time' = 20.00s\n" "| 'total_time' = 30.00s\n" "| 'upload_time' = 0.00s\n" "| 'validation_time' = 10.00s\n") assert result.exit_code == 2, result.output diff --git a/swh/icinga_plugins/tests/test_vault.py b/swh/icinga_plugins/tests/test_vault.py index ee99db4..b6ef51a 100644 --- a/swh/icinga_plugins/tests/test_vault.py +++ b/swh/icinga_plugins/tests/test_vault.py @@ -1,256 +1,256 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import time from click.testing import CliRunner from swh.icinga_plugins.cli import cli -from .web_scenario import WebScenario +from .web_scenario import WebScenario dir_id = 'ab'*20 response_pending = { "obj_id": dir_id, "obj_type": "directory", "progress_message": "foo", "status": "pending" } response_done = { "fetch_url": f"/api/1/vault/directory/{dir_id}/raw/", "id": 9, "obj_id": dir_id, "obj_type": "directory", "status": "done" } response_failed = { "obj_id": dir_id, "obj_type": "directory", "progress_message": "foobar", "status": "failed" } response_unknown_status = { "obj_id": dir_id, "obj_type": "directory", "progress_message": "what", "status": "boo" } class FakeStorage: def __init__(self, foo, **kwargs): pass def directory_get_random(self): return bytes.fromhex(dir_id) def invoke(args, catch_exceptions=False): runner = CliRunner() result = runner.invoke(cli, args) if not catch_exceptions and result.exception: print(result.output) raise result.exception return result def test_vault_immediate_success(requests_mock, mocker, mocked_time): scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_done) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ]) assert result.output == ( f"VAULT OK - cooking directory {dir_id} took " f"10.00s and succeeded.\n" f"| 'total_time' = 10.00s\n") assert result.exit_code == 0, result.output def test_vault_delayed_success(requests_mock, mocker, mocked_time): scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_pending) scenario.add_step('get', url, response_done) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ]) assert result.output == ( f"VAULT OK - cooking directory {dir_id} took " f"20.00s and succeeded.\n" f"| 'total_time' = 20.00s\n") assert result.exit_code == 0, result.output def test_vault_failure(requests_mock, mocker, mocked_time): scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_failed) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ], catch_exceptions=True) assert result.output == ( f"VAULT CRITICAL - cooking directory {dir_id} took " f"10.00s and failed with: foobar\n" f"| 'total_time' = 10.00s\n") assert result.exit_code == 2, result.output def test_vault_unknown_status(requests_mock, mocker, mocked_time): scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_unknown_status) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ], catch_exceptions=True) assert result.output == ( f"VAULT CRITICAL - cooking directory {dir_id} took " f"10.00s and resulted in unknown status: boo\n" f"| 'total_time' = 10.00s\n") assert result.exit_code == 2, result.output def test_vault_timeout(requests_mock, mocker, mocked_time): scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_pending) scenario.add_step('get', url, response_pending, callback=lambda: time.sleep(4000)) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ], catch_exceptions=True) assert result.output == ( f"VAULT CRITICAL - cooking directory {dir_id} took more than " f"4020.00s and has status: foo\n" f"| 'total_time' = 4020.00s\n") assert result.exit_code == 2, result.output def test_vault_cached_directory(requests_mock, mocker, mocked_time): """First serves a directory that's already in the cache, to test that vault_check requests another one.""" scenario = WebScenario() url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/' scenario.add_step('get', url, {}, status_code=200) scenario.add_step('get', url, {}, status_code=404) scenario.add_step('post', url, response_pending) scenario.add_step('get', url, response_done) scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ]) assert result.output == ( f"VAULT OK - cooking directory {dir_id} took " f"10.00s and succeeded.\n" f"| 'total_time' = 10.00s\n") assert result.exit_code == 0, result.output def test_vault_no_directory(requests_mock, mocker, mocked_time): """Tests with an empty storage""" scenario = WebScenario() scenario.install_mock(requests_mock) get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage') get_storage_mock.side_effect = FakeStorage mocker.patch( f'{__name__}.FakeStorage.directory_get_random', return_value=None) result = invoke([ 'check-vault', '--swh-web-url', 'mock://swh-web.example.org', '--swh-storage-url', 'foo://example.org', 'directory', ], catch_exceptions=True) assert result.output == ( "VAULT CRITICAL - No directory exists in the archive.\n") assert result.exit_code == 2, result.output diff --git a/swh/icinga_plugins/tests/web_scenario.py b/swh/icinga_plugins/tests/web_scenario.py index 0a0c57e..846ee6e 100644 --- a/swh/icinga_plugins/tests/web_scenario.py +++ b/swh/icinga_plugins/tests/web_scenario.py @@ -1,90 +1,90 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Wrapper around requests-mock to mock successive responses from a web service. Tests can build successive steps by calling :py:meth:`WebScenario.add_step` with specifications of what endpoints should be called and in what order.""" from dataclasses import dataclass import json -from typing import List, Set, Optional, Callable +from typing import Callable, List, Optional, Set import requests_mock @dataclass(frozen=True) class Step: expected_method: str expected_url: str response: object status_code: int = 200 callback: Optional[Callable[[], int]] = None @dataclass(frozen=True) class Endpoint: method: str url: str class WebScenario: """Stores the state of the successive calls to the web service expected by tests.""" _steps: List[Step] _endpoints: Set[Endpoint] _current_step: int def __init__(self): self._steps = [] self._endpoints = set() self._current_step = 0 def add_endpoint(self, *args, **kwargs): """Adds an endpoint to be mocked. Arguments are the same as :py:class:Endpoint. """ self._endpoints.add(Endpoint(*args, **kwargs)) def add_step(self, *args, **kwargs): """Adds an expected call to the list of expected calls. Also automatically calls :py:meth:`add_endpoint` so the associated endpoint is mocked. Arguments are the same as :py:class:`Step`. """ step = Step(*args, **kwargs) self._steps.append(step) self.add_endpoint(step.expected_method, step.expected_url) def install_mock(self, mocker: requests_mock.Mocker): """Mocks entrypoints registered with :py:meth:`add_endpoint` (or :py:meth:`add_step`) using the provided mocker. """ for endpoint in self._endpoints: mocker.register_uri( endpoint.method.upper(), endpoint.url, text=self._request_callback) def _request_callback(self, request, context): step = self._steps[self._current_step] assert request.url == step.expected_url assert request.method.upper() == step.expected_method.upper() self._current_step += 1 context.status_code = step.status_code if step.callback: step.callback() if isinstance(step.response, str): return step.response else: return json.dumps(step.response)