D1936: Add function to get task execution info for a 'Save code now' request

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,5 +1,7 @@
+hypothesis
pytest
pytest-django
-hypothesis
+pytest-mock
+requests-mock
swh.core[http] >= 0.0.61
swh.loader.git >= 0.0.47
diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py
--- a/swh/web/common/origin_save.py
+++ b/swh/web/common/origin_save.py
@@ -3,8 +3,13 @@
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import json
+import logging
+
from bisect import bisect_right
-from datetime import datetime, timezone
+from datetime import datetime, timezone, timedelta
+
+import requests
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError
@@ -27,6 +32,8 @@
scheduler = config.scheduler()
+logger = logging.getLogger(__name__)
+
def get_origin_save_authorized_urls():
"""
@@ -369,14 +376,14 @@
task_ids = []
for sor in requests_queryset:
task_ids.append(sor.loading_task_id)
- requests = []
+ save_requests = []
if task_ids:
tasks = scheduler.get_tasks(task_ids)
tasks = {task['id']: task for task in tasks}
for sor in requests_queryset:
sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id))
- requests.append(sr_dict)
- return requests
+ save_requests.append(sr_dict)
+ return save_requests
def get_save_origin_requests(origin_type, origin_url):
@@ -403,3 +410,125 @@
raise NotFoundExc(('No save requests found for origin with type '
'%s and url %s.') % (origin_type, origin_url))
return get_save_origin_requests_from_queryset(sors)
+
+
+def get_save_origin_task_info(save_request_id):
+ """
+ Get detailed information about an accepted save origin request
+ and its associated loading task.
+
+ If the associated loading task info is archived and removed
+ from the scheduler database, returns an empty dictionary.
+
+ Args:
+ save_request_id (int): identifier of a save origin request
+
+ Returns:
+ dict: A dictionary with the following keys:
+ - **type**: loading task type
+ - **arguments**: loading task arguments
+ - **id**: loading task database identifier
+ - **backend_id**: loading task celery identifier
+ - **scheduled**: loading task scheduling date
+ - **ended**: loading task termination date
+ - **status**: loading task execution status
+            Depending on the availability of the task logs in the Elasticsearch
+ cluster of Software Heritage, the returned dictionary may also
+ contain the following keys:
+ - **name**: associated celery task name
+ - **message**: relevant log message from task execution
+ - **duration**: task execution time (only if it succeeded)
+ - **worker**: name of the worker that executed the task
+ """
+ try:
+ save_request = SaveOriginRequest.objects.get(id=save_request_id)
+ except ObjectDoesNotExist:
+ return {}
+
+ task = scheduler.get_tasks([save_request.loading_task_id])
+ task = task[0] if task else None
+ if task is None:
+ return {}
+
+ task_run = scheduler.get_task_runs([task['id']])
+ task_run = task_run[0] if task_run else None
+ if task_run is None:
+ return {}
+ task_run['type'] = task['type']
+ task_run['arguments'] = task['arguments']
+ task_run['id'] = task_run['task']
+ del task_run['task']
+ del task_run['metadata']
+ del task_run['started']
+
+ es_workers_index_url = config.get_config()['es_workers_index_url']
+ if not es_workers_index_url:
+ return task_run
+ es_workers_index_url += '/_search'
+
+ if save_request.visit_date:
+ min_ts = save_request.visit_date
+ max_ts = min_ts + timedelta(days=7)
+ else:
+ min_ts = save_request.request_date
+ max_ts = min_ts + timedelta(days=30)
+ min_ts = int(min_ts.timestamp()) * 1000
+ max_ts = int(max_ts.timestamp()) * 1000
+
+ save_task_status = _save_task_status[task['status']]
+ priority = '3' if save_task_status == SAVE_TASK_FAILED else '6'
+
+ query = {
+ 'bool': {
+ 'must': [
+ {
+ 'match_phrase': {
+ 'priority': {
+ 'query': priority
+ }
+ }
+ },
+ {
+ 'match_phrase': {
+ 'swh_task_id': {
+ 'query': task_run['backend_id']
+ }
+ }
+ },
+ {
+ 'range': {
+ '@timestamp': {
+ 'gte': min_ts,
+ 'lte': max_ts,
+ 'format': 'epoch_millis'
+ }
+ }
+ }
+ ]
+ }
+ }
+
+ try:
+ response = requests.post(es_workers_index_url,
+ json={'query': query,
+ 'sort': ['@timestamp']})
+ results = json.loads(response.text)
+ if results['hits']['total'] >= 1:
+ task_run_info = results['hits']['hits'][-1]['_source']
+ if 'swh_logging_args_runtime' in task_run_info:
+ duration = task_run_info['swh_logging_args_runtime']
+ task_run['duration'] = duration
+ if 'message' in task_run_info:
+ task_run['message'] = task_run_info['message']
+ if 'swh_logging_args_name' in task_run_info:
+ task_run['name'] = task_run_info['swh_logging_args_name']
+ elif 'swh_task_name' in task_run_info:
+ task_run['name'] = task_run_info['swh_task_name']
+ if 'hostname' in task_run_info:
+ task_run['worker'] = task_run_info['hostname']
+ elif 'host' in task_run_info:
+ task_run['worker'] = task_run_info['host']
+    except Exception as e:
+        logger.warning('Request to Elasticsearch failed\n%s', e)
+
+ return task_run
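
As an aside, a minimal sketch of how the new helper could be exercised, e.g. from a Django shell with the swh.web settings loaded; the save request id below is hypothetical, and an empty result means the request is unknown or its task info was archived out of the scheduler database:

    from swh.web.common.origin_save import get_save_origin_task_info

    # Hypothetical save request id, for illustration only.
    info = get_save_origin_task_info(123)
    if info:
        print(info['status'], info.get('message', '<no log message found>'))
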
diff --git a/swh/web/config.py b/swh/web/config.py
--- a/swh/web/config.py
+++ b/swh/web/config.py
@@ -92,7 +92,8 @@
'private_api_password': ''
}),
'coverage_count_origins': ('bool', False),
- 'e2e_tests_mode': ('bool', False)
+ 'e2e_tests_mode': ('bool', False),
+ 'es_workers_index_url': ('string', ''),
}
swhweb_config = {}
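
The new 'es_workers_index_url' entry defaults to an empty string, which makes get_save_origin_task_info skip the Elasticsearch lookup entirely. A deployment (or a test, as below) can point it at a workers index by updating the loaded configuration; the URL in this sketch is an assumption, mirroring the pattern used in the tests:

    from swh.web.config import get_config

    # Assumed local URL, for illustration only; an empty string
    # (the default) disables the Elasticsearch task-log lookup.
    get_config().update({
        'es_workers_index_url': 'http://localhost:9200/swh_workers-*',
    })
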
diff --git a/swh/web/tests/common/test_origin_save.py b/swh/web/tests/common/test_origin_save.py
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/common/test_origin_save.py
@@ -0,0 +1,127 @@
+# Copyright (C) 2019 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import os
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+import requests_mock
+
+from swh.web.common.models import (
+ SaveOriginRequest
+)
+from swh.web.common.origin_save import get_save_origin_task_info
+from swh.web.config import get_config
+
+
+_RESOURCES_PATH = os.path.join(os.path.dirname(__file__), '../resources')
+
+_es_url = 'http://esnode1.internal.softwareheritage.org:9200'
+_es_workers_index_url = '%s/swh_workers-*' % _es_url
+
+
+def _get_save_origin_task_info_test(mocker, task_archived=False,
+ es_available=True):
+
+ swh_web_config = get_config()
+
+ if es_available:
+ swh_web_config.update({'es_workers_index_url': _es_workers_index_url})
+ else:
+ swh_web_config.update({'es_workers_index_url': ''})
+
+ sor_id = 4473
+
+ SaveOriginRequest.objects.create(
+ id=sor_id,
+ request_date=datetime(2019, 8, 30, 23, 7, 3, 474294,
+ tzinfo=timezone.utc),
+ origin_type='git',
+ origin_url='https://gitlab.com/inkscape/inkscape',
+ status='accepted',
+ loading_task_id=203525448,
+ visit_date=datetime(2019, 8, 30, 23, 18, 11, 54341,
+ tzinfo=timezone.utc)
+ )
+
+ mock_scheduler = mocker.patch('swh.web.common.origin_save.scheduler')
+ task = {
+ 'arguments': {
+ 'args': [],
+ 'kwargs': {
+ 'repo_url': 'https://gitlab.com/inkscape/inkscape'
+ }
+ },
+ 'current_interval': timedelta(days=64),
+ 'id': 203525448,
+ 'next_run': datetime(2019, 8, 30, 23, 7, 1, 614823),
+ 'policy': 'oneshot',
+ 'priority': 'high',
+ 'retries_left': 0,
+ 'status': 'disabled',
+ 'type': 'load-git'
+ } if not task_archived else None
+ mock_scheduler.get_tasks.return_value = [task]
+
+ task_run = {
+ 'backend_id': 'f00c712c-e820-41ce-a07c-9bf8df914205',
+ 'ended': datetime(2019, 8, 30, 23, 18, 13, 770800),
+ 'id': 654270631,
+ 'metadata': {},
+ 'scheduled': datetime(2019, 8, 30, 23, 8, 34, 282021),
+ 'started': None,
+ 'status': 'failed',
+ 'task': 203525448
+ }
+ mock_scheduler.get_task_runs.return_value = [task_run]
+
+ es_response = os.path.join(_RESOURCES_PATH,
+ 'json/es_task_info_response.json')
+ with open(es_response) as json_fd:
+ es_response = json.load(json_fd)
+
+ task_exec_data = es_response['hits']['hits'][-1]['_source']
+
+ with requests_mock.Mocker() as requests_mocker:
+ requests_mocker.register_uri('POST', _es_workers_index_url+'/_search',
+ json=es_response)
+
+ sor_task_info = get_save_origin_task_info(sor_id)
+
+ expected_result = {
+ 'type': task['type'],
+ 'arguments': task['arguments'],
+ 'id': task['id'],
+ 'backend_id': task_run['backend_id'],
+ 'scheduled': task_run['scheduled'],
+ 'ended': task_run['ended'],
+ 'status': task_run['status'],
+ } if not task_archived else {}
+
+ if es_available and not task_archived:
+ expected_result.update({
+ 'message': task_exec_data['message'],
+ 'name': task_exec_data['swh_task_name'],
+ 'worker': task_exec_data['hostname']
+ })
+
+ assert sor_task_info == expected_result
+
+
+@pytest.mark.django_db
+def test_get_save_origin_archived_task_info(mocker):
+ _get_save_origin_task_info_test(mocker, task_archived=True)
+
+
+@pytest.mark.django_db
+def test_get_save_origin_task_info_with_es(mocker):
+ _get_save_origin_task_info_test(mocker, es_available=True)
+
+
+@pytest.mark.django_db
+def test_get_save_origin_task_info_without_es(mocker):
+ _get_save_origin_task_info_test(mocker, es_available=False)
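
Assuming the repository's existing pytest-django setup plus the pytest-mock and requests-mock additions from requirements-test.txt above, this module should be runnable on its own, e.g. through pytest's Python entry point:

    # Sketch: run only this test module via pytest's programmatic API.
    import pytest
    pytest.main(['swh/web/tests/common/test_origin_save.py'])
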
diff --git a/swh/web/tests/resources/json/es_task_info_response.json b/swh/web/tests/resources/json/es_task_info_response.json
new file mode 100644
--- /dev/null
+++ b/swh/web/tests/resources/json/es_task_info_response.json
@@ -0,0 +1,62 @@
+{
+ "took": 19,
+ "timed_out": false,
+ "_shards": {
+ "total": 194,
+ "successful": 194,
+ "skipped": 186,
+ "failed": 0
+ },
+ "hits": {
+ "total": 1,
+ "max_score": null,
+ "hits": [{
+ "_index": "swh_workers-2019.08.30",
+ "_type": "doc",
+ "_id": "uHrS5GwBjk15w1A-eZNK",
+ "_score": null,
+ "_source": {
+ "comm": "python3",
+ "code_line": "909",
+ "type": "journal",
+ "code_func": "load",
+ "transport": "journal",
+ "swh_task_name": "swh.loader.git.tasks.UpdateGitRepository",
+ "logger": "swh.loader.git.BulkLoader",
+ "swh_task_args_0": "https://gitlab.com/inkscape/inkscape",
+ "source_realtime_timestamp": "1567207093348189",
+ "code_file": "/usr/lib/python3/dist-packages/swh/loader/core/loader.py",
+ "systemd_slice": "system-swh\\x2dworker.slice",
+ "@version": "1",
+ "cap_effective": "0",
+ "boot_id": "b82af8ba13ee48258109a7dfd5058e53",
+ "machine_id": "563ec85b8bcd4ec289b9af4f52b6fa41",
+ "swh_task_id": "f00c712c-e820-41ce-a07c-9bf8df914205",
+ "gid": "1004",
+ "beat": {
+ "name": "worker13",
+ "version": "5.5.0",
+ "hostname": "worker13"
+ },
+ "priority": "3",
+ "systemd_invocation_id": "18bb45cd515d4e1794ddd4d391389045",
+ "@realtime_timestamp": 1567207093348366,
+ "pid": "675",
+ "exe": "/usr/bin/python3.5",
+ "@timestamp": "2019-08-30T23:18:13.348Z",
+ "systemd_unit": "swh-worker@loader_git.service",
+ "tags": ["beats_input_codec_plain_applied"],
+ "systemd_cgroup": "/system.slice/system-swh\\x2dworker.slice/swh-worker@loader_git.service",
+ "host": "worker13",
+ "thread_name": "MainThread",
+ "message": "[2019-08-30 23:18:13,342: ERROR/ForkPoolWorker-64335] Loading failure, updating to `partial` status\nTraceback (most recent call last):\n File \"/usr/lib/python3/dist-packages/swh/loader/core/loader.py\", line 895, in load\n more_data_to_fetch = self.fetch_data()\n File \"/usr/lib/python3/dist-packages/swh/loader/git/loader.py\", line 311, in fetch_data\n do_progress)\n File \"/usr/lib/python3/dist-packages/swh/loader/git/loader.py\", line 243, in fetch_pack_from_origin\n progress=do_activity).refs\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1557, in fetch_pack\n \"git-upload-pack\", url, data=req_data.getvalue())\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1467, in _smart_request\n resp, read = self._http_request(url, headers, data)\n File \"/usr/lib/python3/dist-packages/dulwich/client.py\", line 1402, in _http_request\n raise NotGitRepository()\ndulwich.errors.NotGitRepository",
+ "uid": "1004",
+ "syslog_identifier": "python3",
+ "swh_task_kwargs_base_url": "None",
+ "hostname": "worker13",
+ "cmdline": "/usr/bin/python3 -m celery worker --app=swh.scheduler.celery_backend.config.app --pool=prefork --events --concurrency=1 --maxtasksperchild=5 -Ofair --loglevel=info --without-gossip --without-mingle --without-heartbeat -n loader_git.%h"
+ },
+ "sort": [1567207093348]
+ }]
+ }
+}
\ No newline at end of file