diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,5 +1,7 @@ +hypothesis pytest pytest-django -hypothesis +pytest-mock +requests-mock swh.core[http] >= 0.0.61 swh.loader.git >= 0.0.47 diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py --- a/swh/web/common/origin_save.py +++ b/swh/web/common/origin_save.py @@ -3,8 +3,12 @@ # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +import json + from bisect import bisect_right -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta + +import requests from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ValidationError @@ -365,14 +369,14 @@ task_ids = [] for sor in requests_queryset: task_ids.append(sor.loading_task_id) - requests = [] + save_requests = [] if task_ids: tasks = scheduler.get_tasks(task_ids) tasks = {task['id']: task for task in tasks} for sor in requests_queryset: sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id)) - requests.append(sr_dict) - return requests + save_requests.append(sr_dict) + return save_requests def get_save_origin_requests(origin_type, origin_url): @@ -399,3 +403,125 @@ raise NotFoundExc(('No save requests found for origin with type ' '%s and url %s.') % (origin_type, origin_url)) return get_save_origin_requests_from_queryset(sors) + + +def get_save_origin_task_info(save_request_id): + """ + Get detailed information about an accepted save origin request + and it associated loading task. + + If the associated loading task info have been archived and removed + from the scheduler database, an empty dictionary will be returned. + + Args: + save_request_id (int): identifier of a save origin request + + Returns: + dict: A dictionary with the following keys: + - **type**: type of the loading task + - **arguments**: arguments provided to the task + - **id**: database identifier of the loading task + - **backend_id**: celery identifier of the loading task + - **scheduled**: scheduling date of the loading task + - **ended**: termination date of the loading task + - **status**: execution status of the loading task + Depending on the availability of the task logs in the elasticsearch + cluster of Software Heritage, the returned dictionary may also + contain the following keys: + - **name**: name of the associated celery task + - **message**: relevant log message from the task execution + - **duration**: task execution time (only if the task succeeded) + - **worker**: name of the worker that executed the task + """ + try: + save_request = SaveOriginRequest.objects.get(id=save_request_id) + except ObjectDoesNotExist: + return {} + + task = scheduler.get_tasks([save_request.loading_task_id]) + task = task[0] if task else None + if task is None: + return {} + + task_run = scheduler.get_task_runs([task['id']]) + task_run = task_run[0] if task_run else None + if task_run is None: + return {} + task_run['type'] = task['type'] + task_run['arguments'] = task['arguments'] + task_run['id'] = task_run['task'] + del task_run['task'] + del task_run['metadata'] + del task_run['started'] + + es_workers_index_url = config.get_config()['es_workers_index_url'] + if not es_workers_index_url: + return task_run + es_workers_index_url += '/_search' + + if save_request.visit_date: + min_ts = save_request.visit_date + max_ts = min_ts + timedelta(days=7) + else: + min_ts = save_request.request_date + max_ts = min_ts + timedelta(days=30) + min_ts = int(min_ts.timestamp()) * 1000 + max_ts = int(max_ts.timestamp()) * 1000 + + save_task_status = _save_task_status[task['status']] + priority = '3' if save_task_status == SAVE_TASK_FAILED else '6' + + query = { + 'bool': { + 'must': [ + { + 'match_phrase': { + 'priority': { + 'query': priority + } + } + }, + { + 'match_phrase': { + 'swh_task_id': { + 'query': task_run['backend_id'] + } + } + }, + { + 'range': { + '@timestamp': { + 'gte': min_ts, + 'lte': max_ts, + 'format': 'epoch_millis' + } + } + } + ] + } + } + + try: + response = requests.post(es_workers_index_url, + json={'query': query, + 'sort': ['@timestamp']}) + results = json.loads(response.text) + if results['hits']['total'] >= 1: + task_run_info = results['hits']['hits'][-1]['_source'] + if 'swh_logging_args_runtime' in task_run_info: + duration = task_run_info['swh_logging_args_runtime'] + task_run['duration'] = duration + if 'message' in task_run_info: + task_run['message'] = task_run_info['message'] + if 'swh_logging_args_name' in task_run_info: + task_run['name'] = task_run_info['swh_logging_args_name'] + elif 'swh_task_name' in task_run_info: + task_run['name'] = task_run_info['swh_task_name'] + if 'hostname' in task_run_info: + task_run['worker'] = task_run_info['hostname'] + elif 'host' in task_run_info: + task_run['worker'] = task_run_info['host'] + except Exception: + pass + + return task_run diff --git a/swh/web/config.py b/swh/web/config.py --- a/swh/web/config.py +++ b/swh/web/config.py @@ -92,7 +92,8 @@ 'private_api_password': '' }), 'coverage_count_origins': ('bool', False), - 'e2e_tests_mode': ('bool', False) + 'e2e_tests_mode': ('bool', False), + 'es_workers_index_url': ('string', ''), } swhweb_config = {} diff --git a/swh/web/tests/common/test_origin_save.py b/swh/web/tests/common/test_origin_save.py new file mode 100644 --- /dev/null +++ b/swh/web/tests/common/test_origin_save.py @@ -0,0 +1,127 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU Affero General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import json +import os + +from datetime import datetime, timedelta, timezone + +import pytest +import requests_mock + +from swh.web.common.models import ( + SaveOriginRequest +) +from swh.web.common.origin_save import get_save_origin_task_info +from swh.web.config import get_config + + +_RESOURCES_PATH = os.path.join(os.path.dirname(__file__), '../resources') + +_es_url = 'http://esnode1.internal.softwareheritage.org:9200' +_es_workers_index_url = '%s/swh_workers-*' % _es_url + + +def _get_save_origin_task_info_test(mocker, task_archived=False, + es_available=True): + + swh_web_config = get_config() + + if es_available: + swh_web_config.update({'es_workers_index_url': _es_workers_index_url}) + else: + swh_web_config.update({'es_workers_index_url': ''}) + + sor_id = 4473 + + SaveOriginRequest.objects.create( + id=sor_id, + request_date=datetime(2019, 8, 30, 23, 7, 3, 474294, + tzinfo=timezone.utc), + origin_type='git', + origin_url='https://gitlab.com/inkscape/inkscape', + status='accepted', + loading_task_id=203525448, + visit_date=datetime(2019, 8, 30, 23, 18, 11, 54341, + tzinfo=timezone.utc) + ) + + mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler') + task = { + 'arguments': { + 'args': [], + 'kwargs': { + 'repo_url': 'https://gitlab.com/inkscape/inkscape' + } + }, + 'current_interval': timedelta(days=64), + 'id': 203525448, + 'next_run': datetime(2019, 8, 30, 23, 7, 1, 614823), + 'policy': 'oneshot', + 'priority': 'high', + 'retries_left': 0, + 'status': 'disabled', + 'type': 'load-git' + } if not task_archived else None + mock_scheduler.get_tasks.return_value = [task] + + task_run = { + 'backend_id': 'f00c712c-e820-41ce-a07c-9bf8df914205', + 'ended': datetime(2019, 8, 30, 23, 18, 13, 770800), + 'id': 654270631, + 'metadata': {}, + 'scheduled': datetime(2019, 8, 30, 23, 8, 34, 282021), + 'started': None, + 'status': 'failed', + 'task': 203525448 + } + mock_scheduler.get_task_runs.return_value = [task_run] + + es_response = os.path.join(_RESOURCES_PATH, + 'json/es_task_info_response.json') + with open(es_response) as json_fd: + es_response = json.load(json_fd) + + task_exec_data = es_response['hits']['hits'][-1]['_source'] + + with requests_mock.Mocker() as requests_mocker: + requests_mocker.register_uri('POST', _es_workers_index_url+'/_search', + json=es_response) + + sor_task_info = get_save_origin_task_info(sor_id) + + expected_result = { + 'type': task['type'], + 'arguments': task['arguments'], + 'id': task['id'], + 'backend_id': task_run['backend_id'], + 'scheduled': task_run['scheduled'], + 'ended': task_run['ended'], + 'status': task_run['status'], + } if not task_archived else {} + + if es_available and not task_archived: + expected_result.update({ + 'message': task_exec_data['message'], + 'name': task_exec_data['swh_task_name'], + 'worker': task_exec_data['hostname'] + }) + + assert sor_task_info == expected_result + + +@pytest.mark.django_db +def test_get_save_origin_archived_task_info(mocker): + _get_save_origin_task_info_test(mocker, task_archived=True) + + +@pytest.mark.django_db +def test_get_save_origin_task_info_with_es(mocker): + _get_save_origin_task_info_test(mocker, es_available=True) + + +@pytest.mark.django_db +def test_get_save_origin_task_info_without_es(mocker): + _get_save_origin_task_info_test(mocker, es_available=False) diff --git a/swh/web/tests/resources/json/es_task_info_response.json b/swh/web/tests/resources/json/es_task_info_response.json new file mode 100644 --- /dev/null +++ b/swh/web/tests/resources/json/es_task_info_response.json @@ -0,0 +1,62 @@ +{ + "took": 19, + "timed_out": false, + "_shards": { + "total": 194, + "successful": 194, + "skipped": 186, + "failed": 0 + }, + "hits": { + "total": 1, + "max_score": null, + "hits": [{ + "_index": "swh_workers-2019.08.30", + "_type": "doc", + "_id": "uHrS5GwBjk15w1A-eZNK", + "_score": null, + "_source": { + "comm": "python3", + "code_line": "909", + "type": "journal", + "code_func": "load", + "transport": "journal", + "swh_task_name": "swh.loader.git.tasks.UpdateGitRepository", + "logger": "swh.loader.git.BulkLoader", + "swh_task_args_0": "https://gitlab.com/inkscape/inkscape", + "source_realtime_timestamp": "1567207093348189", + "code_file": "/usr/lib/python3/dist-packages/swh/loader/core/loader.py", + "systemd_slice": "system-swh\\x2dworker.slice", + "@version": "1", + "cap_effective": "0", + "boot_id": "b82af8ba13ee48258109a7dfd5058e53", + "machine_id": "563ec85b8bcd4ec289b9af4f52b6fa41", + "swh_task_id": "f00c712c-e820-41ce-a07c-9bf8df914205", + "gid": "1004", + "beat": { + "name": "worker13", + "version": "5.5.0", + "hostname": "worker13" + }, + "priority": "3", + "systemd_invocation_id": "18bb45cd515d4e1794ddd4d391389045", + "@realtime_timestamp": 1567207093348366, + "pid": "675", + "exe": "/usr/bin/python3.5", + "@timestamp": "2019-08-30T23:18:13.348Z", + "systemd_unit": "swh-worker@loader_git.service", + "tags": ["beats_input_codec_plain_applied"], + "systemd_cgroup": "/system.slice/system-swh\\x2dworker.slice/swh-worker@loader_git.service", + "host": "worker13", + "thread_name": "MainThread", + "message": "[2019-08-30 23:18:13,342: ERROR/ForkPoolWorker-64335] Loading failure, updating to `partial` status\nTraceback (most recent call last):\n File \"/usr/lib/python3/dist-packages/swh/loader/core/loader.py\", line 895, in load\n more_data_to_fetch = self.fetch_data()\n File \"/usr/lib/python3/dist-packages/swh/loader/git/loader.py\", line 311, in fetch_data\n do_progress)\n File \"/usr/lib/python3/dist-packages/swh/loader/git/loader.py\", line 243, in fetch_pack_from_origin\n progress=do_activity).refs\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1557, in fetch_pack\n \"git-upload-pack\", url, data=req_data.getvalue())\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1467, in _smart_request\n resp, read = self._http_request(url, headers, data)\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1402, in _http_request\n raise NotGitRepository()\ndulwich.errors.NotGitRepository", + "uid": "1004", + "syslog_identifier": "python3", + "swh_task_kwargs_base_url": "None", + "hostname": "worker13", + "cmdline": "/usr/bin/python3 -m celery worker --app=swh.scheduler.celery_backend.config.app --pool=prefork --events --concurrency=1 --maxtasksperchild=5 -Ofair --loglevel=info --without-gossip --without-mingle --without-heartbeat -n loader_git.%h" + }, + "sort": [1567207093348] + }] + } +} \ No newline at end of file