diff --git a/setup.py b/setup.py
index 01a9bde..6925684 100755
--- a/setup.py
+++ b/setup.py
@@ -1,81 +1,81 @@
#!/usr/bin/env python3
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from setuptools import setup, find_packages
-
-from os import path, walk
from io import open
+from os import path, walk
+
+from setuptools import find_packages, setup
here = path.abspath(path.dirname(__file__))
# Get the long description from the README file
with open(path.join(here, 'README.md'), encoding='utf-8') as f:
long_description = f.read()
def parse_requirements(name=None):
if name:
reqf = 'requirements-%s.txt' % name
else:
reqf = 'requirements.txt'
requirements = []
if not path.exists(reqf):
return requirements
with open(reqf) as f:
for line in f.readlines():
line = line.strip()
if not line or line.startswith('#'):
continue
requirements.append(line)
return requirements
# package generated static assets as module data files
data_files = []
for root, _, files in walk('data/'):
root_files = [path.join(root, i) for i in files]
data_files.append((path.join('share/swh/icinga-plugins', root),
root_files))
setup(
name='swh.icinga_plugins',
description='Icinga plugins for Software Heritage infrastructure '
'monitoring',
long_description=long_description,
long_description_content_type='text/markdown',
python_requires=">=3.7",
author='Software Heritage developers',
author_email='swh-devel@inria.fr',
url='https://forge.softwareheritage.org/diffusion/swh-icinga-plugins',
packages=find_packages(), # packages's modules
install_requires=parse_requirements() + parse_requirements('swh'),
tests_require=parse_requirements('test'),
setup_requires=['setuptools-scm'],
use_scm_version=True,
extras_require={'testing': parse_requirements('test')},
include_package_data=True,
entry_points='''
[swh.cli.subcommands]
icinga_plugins=swh.icinga_plugins.cli:cli
''',
classifiers=[
"Programming Language :: Python :: 3",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
"Development Status :: 3 - Alpha",
],
project_urls={
'Bug Reports': 'https://forge.softwareheritage.org/maniphest',
'Funding': 'https://www.softwareheritage.org/donate',
'Source':
'https://forge.softwareheritage.org/source/swh-icinga-plugins',
},
data_files=data_files
)
diff --git a/swh/icinga_plugins/deposit.py b/swh/icinga_plugins/deposit.py
index 92fffe1..03da3e7 100644
--- a/swh/icinga_plugins/deposit.py
+++ b/swh/icinga_plugins/deposit.py
@@ -1,122 +1,122 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import sys
import time
-from .base_check import BaseCheck
-
from swh.deposit.client import PublicApiDepositClient
+from .base_check import BaseCheck
+
class DepositCheck(BaseCheck):
TYPE = 'DEPOSIT'
DEFAULT_WARNING_THRESHOLD = 120
DEFAULT_CRITICAL_THRESHOLD = 3600
def __init__(self, obj):
super().__init__(obj)
self._poll_interval = obj['poll_interval']
self._archive_path = obj['archive']
self._metadata_path = obj['metadata']
self._collection = obj['collection']
self._client = PublicApiDepositClient({
'url': obj['server'],
'auth': {
'username': obj['username'],
'password': obj['password'],
},
})
def upload_deposit(self):
result = self._client.deposit_create(
archive=self._archive_path, metadata=self._metadata_path,
collection=self._collection, in_progress=False,
slug='check-deposit-%s' % datetime.datetime.now().isoformat())
self._deposit_id = result['deposit_id']
return result
def get_deposit_status(self):
return self._client.deposit_status(
collection=self._collection,
deposit_id=self._deposit_id)
def wait_while_status(self, statuses, start_time, metrics, result):
while result['deposit_status'] in statuses:
metrics['total_time'] = time.time() - start_time
if metrics['total_time'] > self.critical_threshold:
self.print_result(
'CRITICAL',
f'Timed out while in status '
f'{result["deposit_status"]} '
f'({metrics["total_time"]}s seconds since deposit '
f'started)',
**metrics)
sys.exit(2)
time.sleep(self._poll_interval)
result = self.get_deposit_status()
return result
def main(self):
start_time = time.time()
metrics = {}
# Upload the archive and metadata
result = self.upload_deposit()
metrics['upload_time'] = time.time() - start_time
# Wait for validation
result = self.wait_while_status(
['deposited'], start_time, metrics, result)
metrics['total_time'] = time.time() - start_time
metrics['validation_time'] = \
metrics['total_time'] - metrics['upload_time']
# Check validation succeeded
if result['deposit_status'] == 'rejected':
self.print_result(
'CRITICAL',
f'Deposit was rejected: {result["deposit_status_detail"]}',
**metrics)
return 2
# Wait for loading
result = self.wait_while_status(
['verified', 'loading'], start_time, metrics, result)
metrics['total_time'] = time.time() - start_time
metrics['load_time'] = (
metrics['total_time'] - metrics['upload_time']
- metrics['validation_time'])
# Check loading succeeded
if result['deposit_status'] == 'failed':
self.print_result(
'CRITICAL',
f'Deposit loading failed: {result["deposit_status_detail"]}',
**metrics)
return 2
# Check for unexpected status
if result['deposit_status'] != 'done':
self.print_result(
'CRITICAL',
f'Deposit got unexpected status: {result["deposit_status"]} '
f'({result["deposit_status_detail"]})',
**metrics)
return 2
# Everything went fine, check total time wasn't too large and
# print result
(status_code, status) = self.get_status(metrics['total_time'])
self.print_result(
status,
f'Deposit took {metrics["total_time"]:.2f}s and succeeded.',
**metrics)
return status_code
diff --git a/swh/icinga_plugins/tests/test_deposit.py b/swh/icinga_plugins/tests/test_deposit.py
index 7df3095..453e145 100644
--- a/swh/icinga_plugins/tests/test_deposit.py
+++ b/swh/icinga_plugins/tests/test_deposit.py
@@ -1,375 +1,375 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import io
import os
import tarfile
import time
from click.testing import CliRunner
import pytest
from swh.icinga_plugins.cli import cli
-from .web_scenario import WebScenario
+from .web_scenario import WebScenario
BASE_URL = 'http://swh-deposit.example.org/1'
COMMON_OPTIONS = [
'--server', BASE_URL,
'--username', 'test',
'--password', 'test',
'--collection', 'testcol',
]
SAMPLE_METADATA = '''
Test Software
swh
test-software
No One
'''
ENTRY_TEMPLATE = '''
42
2019-12-19 18:11:00
foo.tar.gz
{status}
http://purl.org/net/sword/package/SimpleZip
'''
STATUS_TEMPLATE = '''
42
{status}
{status_detail}
'''
@pytest.fixture(scope='session')
def tmp_path(tmp_path_factory):
return tmp_path_factory.mktemp(__name__)
@pytest.fixture(scope='session')
def sample_metadata(tmp_path):
"""Returns a sample metadata file's path
"""
path = os.path.join(tmp_path, 'metadata.xml')
with open(path, 'w') as fd:
fd.write(SAMPLE_METADATA)
return path
@pytest.fixture(scope='session')
def sample_archive(tmp_path):
"""Returns a sample archive's path
"""
path = os.path.join(tmp_path, 'archive.tar.gz')
with tarfile.open(path, 'w:gz') as tf:
tf.addfile(
tarfile.TarInfo('hello.py'),
io.BytesIO(b'print("Hello world")'))
return path
def invoke(args, catch_exceptions=False):
runner = CliRunner()
result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_deposit_immediate_success(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='done'))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
])
assert result.output == (
"DEPOSIT OK - Deposit took 0.00s and succeeded.\n"
"| 'load_time' = 0.00s\n"
"| 'total_time' = 0.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 0.00s\n")
assert result.exit_code == 0, result.output
def test_deposit_delays(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='loading', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='done', status_detail=''))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
])
assert result.output == (
"DEPOSIT OK - Deposit took 30.00s and succeeded.\n"
"| 'load_time' = 20.00s\n"
"| 'total_time' = 30.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 0, result.output
def test_deposit_delay_warning(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='done', status_detail=''))
scenario.install_mock(requests_mock)
result = invoke([
'--warning', '15',
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT WARNING - Deposit took 20.00s and succeeded.\n"
"| 'load_time' = 10.00s\n"
"| 'total_time' = 20.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 1, result.output
def test_deposit_delay_critical(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='done', status_detail=''),
callback=lambda: time.sleep(60))
scenario.install_mock(requests_mock)
result = invoke([
'--critical', '50',
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT CRITICAL - Deposit took 80.00s and succeeded.\n"
"| 'load_time' = 70.00s\n"
"| 'total_time' = 80.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 2, result.output
def test_deposit_timeout(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'),
callback=lambda: time.sleep(1500))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''),
callback=lambda: time.sleep(1500))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='loading', status_detail=''),
callback=lambda: time.sleep(1500))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT CRITICAL - Timed out while in status loading "
"(4520.0s seconds since deposit started)\n"
"| 'total_time' = 4520.00s\n"
"| 'upload_time' = 1500.00s\n"
"| 'validation_time' = 1510.00s\n")
assert result.exit_code == 2, result.output
def test_deposit_rejected(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='rejected', status_detail='booo'))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT CRITICAL - Deposit was rejected: booo\n"
"| 'total_time' = 10.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 2, result.output
def test_deposit_failed(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='loading', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='failed', status_detail='booo'))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT CRITICAL - Deposit loading failed: booo\n"
"| 'load_time' = 20.00s\n"
"| 'total_time' = 30.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 2, result.output
def test_deposit_unexpected_status(
requests_mock, mocker, sample_archive, sample_metadata, mocked_time):
scenario = WebScenario()
scenario.add_step(
'post', BASE_URL + '/testcol/',
ENTRY_TEMPLATE.format(status='deposited'))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='verified', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='loading', status_detail=''))
scenario.add_step(
'get', BASE_URL + '/testcol/42/status/',
STATUS_TEMPLATE.format(status='what', status_detail='booo'))
scenario.install_mock(requests_mock)
result = invoke([
'check-deposit',
*COMMON_OPTIONS,
'single',
'--archive', sample_archive,
'--metadata', sample_metadata,
], catch_exceptions=True)
assert result.output == (
"DEPOSIT CRITICAL - Deposit got unexpected status: what (booo)\n"
"| 'load_time' = 20.00s\n"
"| 'total_time' = 30.00s\n"
"| 'upload_time' = 0.00s\n"
"| 'validation_time' = 10.00s\n")
assert result.exit_code == 2, result.output
diff --git a/swh/icinga_plugins/tests/test_vault.py b/swh/icinga_plugins/tests/test_vault.py
index ee99db4..b6ef51a 100644
--- a/swh/icinga_plugins/tests/test_vault.py
+++ b/swh/icinga_plugins/tests/test_vault.py
@@ -1,256 +1,256 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import time
from click.testing import CliRunner
from swh.icinga_plugins.cli import cli
-from .web_scenario import WebScenario
+from .web_scenario import WebScenario
dir_id = 'ab'*20
response_pending = {
"obj_id": dir_id,
"obj_type": "directory",
"progress_message": "foo",
"status": "pending"
}
response_done = {
"fetch_url": f"/api/1/vault/directory/{dir_id}/raw/",
"id": 9,
"obj_id": dir_id,
"obj_type": "directory",
"status": "done"
}
response_failed = {
"obj_id": dir_id,
"obj_type": "directory",
"progress_message": "foobar",
"status": "failed"
}
response_unknown_status = {
"obj_id": dir_id,
"obj_type": "directory",
"progress_message": "what",
"status": "boo"
}
class FakeStorage:
def __init__(self, foo, **kwargs):
pass
def directory_get_random(self):
return bytes.fromhex(dir_id)
def invoke(args, catch_exceptions=False):
runner = CliRunner()
result = runner.invoke(cli, args)
if not catch_exceptions and result.exception:
print(result.output)
raise result.exception
return result
def test_vault_immediate_success(requests_mock, mocker, mocked_time):
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_done)
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
])
assert result.output == (
f"VAULT OK - cooking directory {dir_id} took "
f"10.00s and succeeded.\n"
f"| 'total_time' = 10.00s\n")
assert result.exit_code == 0, result.output
def test_vault_delayed_success(requests_mock, mocker, mocked_time):
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_pending)
scenario.add_step('get', url, response_done)
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
])
assert result.output == (
f"VAULT OK - cooking directory {dir_id} took "
f"20.00s and succeeded.\n"
f"| 'total_time' = 20.00s\n")
assert result.exit_code == 0, result.output
def test_vault_failure(requests_mock, mocker, mocked_time):
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_failed)
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
], catch_exceptions=True)
assert result.output == (
f"VAULT CRITICAL - cooking directory {dir_id} took "
f"10.00s and failed with: foobar\n"
f"| 'total_time' = 10.00s\n")
assert result.exit_code == 2, result.output
def test_vault_unknown_status(requests_mock, mocker, mocked_time):
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_unknown_status)
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
], catch_exceptions=True)
assert result.output == (
f"VAULT CRITICAL - cooking directory {dir_id} took "
f"10.00s and resulted in unknown status: boo\n"
f"| 'total_time' = 10.00s\n")
assert result.exit_code == 2, result.output
def test_vault_timeout(requests_mock, mocker, mocked_time):
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_pending)
scenario.add_step('get', url, response_pending,
callback=lambda: time.sleep(4000))
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
], catch_exceptions=True)
assert result.output == (
f"VAULT CRITICAL - cooking directory {dir_id} took more than "
f"4020.00s and has status: foo\n"
f"| 'total_time' = 4020.00s\n")
assert result.exit_code == 2, result.output
def test_vault_cached_directory(requests_mock, mocker, mocked_time):
"""First serves a directory that's already in the cache, to
test that vault_check requests another one."""
scenario = WebScenario()
url = f'mock://swh-web.example.org/api/1/vault/directory/{dir_id}/'
scenario.add_step('get', url, {}, status_code=200)
scenario.add_step('get', url, {}, status_code=404)
scenario.add_step('post', url, response_pending)
scenario.add_step('get', url, response_done)
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
])
assert result.output == (
f"VAULT OK - cooking directory {dir_id} took "
f"10.00s and succeeded.\n"
f"| 'total_time' = 10.00s\n")
assert result.exit_code == 0, result.output
def test_vault_no_directory(requests_mock, mocker, mocked_time):
"""Tests with an empty storage"""
scenario = WebScenario()
scenario.install_mock(requests_mock)
get_storage_mock = mocker.patch('swh.icinga_plugins.vault.get_storage')
get_storage_mock.side_effect = FakeStorage
mocker.patch(
f'{__name__}.FakeStorage.directory_get_random', return_value=None)
result = invoke([
'check-vault',
'--swh-web-url', 'mock://swh-web.example.org',
'--swh-storage-url', 'foo://example.org',
'directory',
], catch_exceptions=True)
assert result.output == (
"VAULT CRITICAL - No directory exists in the archive.\n")
assert result.exit_code == 2, result.output
diff --git a/swh/icinga_plugins/tests/web_scenario.py b/swh/icinga_plugins/tests/web_scenario.py
index 0a0c57e..846ee6e 100644
--- a/swh/icinga_plugins/tests/web_scenario.py
+++ b/swh/icinga_plugins/tests/web_scenario.py
@@ -1,90 +1,90 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
"""Wrapper around requests-mock to mock successive responses
from a web service.
Tests can build successive steps by calling :py:meth:`WebScenario.add_step`
with specifications of what endpoints should be called and in what order."""
from dataclasses import dataclass
import json
-from typing import List, Set, Optional, Callable
+from typing import Callable, List, Optional, Set
import requests_mock
@dataclass(frozen=True)
class Step:
expected_method: str
expected_url: str
response: object
status_code: int = 200
callback: Optional[Callable[[], int]] = None
@dataclass(frozen=True)
class Endpoint:
method: str
url: str
class WebScenario:
"""Stores the state of the successive calls to the web service
expected by tests."""
_steps: List[Step]
_endpoints: Set[Endpoint]
_current_step: int
def __init__(self):
self._steps = []
self._endpoints = set()
self._current_step = 0
def add_endpoint(self, *args, **kwargs):
"""Adds an endpoint to be mocked.
Arguments are the same as :py:class:Endpoint.
"""
self._endpoints.add(Endpoint(*args, **kwargs))
def add_step(self, *args, **kwargs):
"""Adds an expected call to the list of expected calls.
Also automatically calls :py:meth:`add_endpoint` so the
associated endpoint is mocked.
Arguments are the same as :py:class:`Step`.
"""
step = Step(*args, **kwargs)
self._steps.append(step)
self.add_endpoint(step.expected_method, step.expected_url)
def install_mock(self, mocker: requests_mock.Mocker):
"""Mocks entrypoints registered with :py:meth:`add_endpoint`
(or :py:meth:`add_step`) using the provided mocker.
"""
for endpoint in self._endpoints:
mocker.register_uri(
endpoint.method.upper(), endpoint.url,
text=self._request_callback)
def _request_callback(self, request, context):
step = self._steps[self._current_step]
assert request.url == step.expected_url
assert request.method.upper() == step.expected_method.upper()
self._current_step += 1
context.status_code = step.status_code
if step.callback:
step.callback()
if isinstance(step.response, str):
return step.response
else:
return json.dumps(step.response)