Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_origin_save.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | import re | ||||
import os | |||||
from datetime import datetime, timedelta, timezone | from datetime import datetime, timedelta, timezone | ||||
from functools import partial | |||||
import pytest | import pytest | ||||
import requests_mock | import requests | ||||
from swh.web.common.models import ( | from swh.core.pytest_plugin import get_response_cb | ||||
SaveOriginRequest | |||||
) | from swh.web.common.models import SaveOriginRequest | ||||
from swh.web.common.origin_save import get_save_origin_task_info | from swh.web.common.origin_save import get_save_origin_task_info | ||||
from swh.web.config import get_config | from swh.web.config import get_config | ||||
_RESOURCES_PATH = os.path.join(os.path.dirname(__file__), '../resources') | |||||
_es_url = 'http://esnode1.internal.softwareheritage.org:9200' | _es_url = 'http://esnode1.internal.softwareheritage.org:9200' | ||||
_es_workers_index_url = '%s/swh_workers-*' % _es_url | _es_workers_index_url = '%s/swh_workers-*' % _es_url | ||||
@pytest.fixture(autouse=True) | |||||
def requests_mock_datadir(datadir, requests_mock_datadir): | |||||
"""Override default behavior to deal with post method | |||||
""" | |||||
cb = partial(get_response_cb, datadir=datadir) | |||||
requests_mock_datadir.post(re.compile('https?://'), body=cb) | |||||
return requests_mock_datadir | |||||
@pytest.mark.django_db | |||||
def test_get_save_origin_archived_task_info(mocker): | |||||
_get_save_origin_task_info_test(mocker, task_archived=True) | |||||
@pytest.mark.django_db | |||||
def test_get_save_origin_task_info_with_es(mocker): | |||||
_get_save_origin_task_info_test(mocker, es_available=True) | |||||
@pytest.mark.django_db | |||||
def test_get_save_origin_task_info_without_es(mocker): | |||||
_get_save_origin_task_info_test(mocker, es_available=False) | |||||
def _get_save_origin_task_info_test(mocker, task_archived=False, | def _get_save_origin_task_info_test(mocker, task_archived=False, | ||||
es_available=True): | es_available=True): | ||||
swh_web_config = get_config() | swh_web_config = get_config() | ||||
if es_available: | if es_available: | ||||
swh_web_config.update({'es_workers_index_url': _es_workers_index_url}) | swh_web_config.update({'es_workers_index_url': _es_workers_index_url}) | ||||
else: | else: | ||||
swh_web_config.update({'es_workers_index_url': ''}) | swh_web_config.update({'es_workers_index_url': ''}) | ||||
sor_id = 4473 | sor_id = 4473 | ||||
Show All 36 Lines | task_run = { | ||||
'metadata': {}, | 'metadata': {}, | ||||
'scheduled': datetime(2019, 8, 30, 23, 8, 34, 282021), | 'scheduled': datetime(2019, 8, 30, 23, 8, 34, 282021), | ||||
'started': None, | 'started': None, | ||||
'status': 'failed', | 'status': 'failed', | ||||
'task': 203525448 | 'task': 203525448 | ||||
} | } | ||||
mock_scheduler.get_task_runs.return_value = [task_run] | mock_scheduler.get_task_runs.return_value = [task_run] | ||||
es_response = os.path.join(_RESOURCES_PATH, | es_response = requests.post('%s/_search' % _es_workers_index_url).json() | ||||
'json/es_task_info_response.json') | |||||
with open(es_response) as json_fd: | |||||
es_response = json.load(json_fd) | |||||
task_exec_data = es_response['hits']['hits'][-1]['_source'] | task_exec_data = es_response['hits']['hits'][-1]['_source'] | ||||
with requests_mock.Mocker() as requests_mocker: | |||||
requests_mocker.register_uri('POST', _es_workers_index_url+'/_search', | |||||
json=es_response) | |||||
sor_task_info = get_save_origin_task_info(sor_id) | sor_task_info = get_save_origin_task_info(sor_id) | ||||
expected_result = { | expected_result = { | ||||
'type': task['type'], | 'type': task['type'], | ||||
'arguments': task['arguments'], | 'arguments': task['arguments'], | ||||
'id': task['id'], | 'id': task['id'], | ||||
'backend_id': task_run['backend_id'], | 'backend_id': task_run['backend_id'], | ||||
'scheduled': task_run['scheduled'], | 'scheduled': task_run['scheduled'], | ||||
'ended': task_run['ended'], | 'ended': task_run['ended'], | ||||
'status': task_run['status'], | 'status': task_run['status'], | ||||
} if not task_archived else {} | } if not task_archived else {} | ||||
if es_available and not task_archived: | if es_available and not task_archived: | ||||
expected_result.update({ | expected_result.update({ | ||||
'message': task_exec_data['message'], | 'message': task_exec_data['message'], | ||||
'name': task_exec_data['swh_task_name'], | 'name': task_exec_data['swh_task_name'], | ||||
'worker': task_exec_data['hostname'] | 'worker': task_exec_data['hostname'] | ||||
}) | }) | ||||
assert sor_task_info == expected_result | assert sor_task_info == expected_result | ||||
@pytest.mark.django_db | |||||
def test_get_save_origin_archived_task_info(mocker): | |||||
_get_save_origin_task_info_test(mocker, task_archived=True) | |||||
@pytest.mark.django_db | |||||
def test_get_save_origin_task_info_with_es(mocker): | |||||
_get_save_origin_task_info_test(mocker, es_available=True) | |||||
@pytest.mark.django_db | |||||
def test_get_save_origin_task_info_without_es(mocker): | |||||
_get_save_origin_task_info_test(mocker, es_available=False) |