Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/origin_save.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import json | |||||
import logging | |||||
from bisect import bisect_right | from bisect import bisect_right | ||||
from datetime import datetime, timezone | from datetime import datetime, timezone, timedelta | ||||
import requests | |||||
from django.core.exceptions import ObjectDoesNotExist | from django.core.exceptions import ObjectDoesNotExist | ||||
from django.core.exceptions import ValidationError | from django.core.exceptions import ValidationError | ||||
from django.core.validators import URLValidator | from django.core.validators import URLValidator | ||||
from django.utils.html import escape | from django.utils.html import escape | ||||
from swh.web import config | from swh.web import config | ||||
from swh.web.common import service | from swh.web.common import service | ||||
from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc | from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc | ||||
from swh.web.common.models import ( | from swh.web.common.models import ( | ||||
SaveUnauthorizedOrigin, SaveAuthorizedOrigin, SaveOriginRequest, | SaveUnauthorizedOrigin, SaveAuthorizedOrigin, SaveOriginRequest, | ||||
SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, | SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING, | ||||
SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, | SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED, | ||||
SAVE_TASK_SUCCEED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING | SAVE_TASK_SUCCEED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING | ||||
) | ) | ||||
from swh.web.common.origin_visits import get_origin_visits | from swh.web.common.origin_visits import get_origin_visits | ||||
from swh.web.common.utils import parse_timestamp | from swh.web.common.utils import parse_timestamp | ||||
from swh.scheduler.utils import create_oneshot_task_dict | from swh.scheduler.utils import create_oneshot_task_dict | ||||
scheduler = config.scheduler() | scheduler = config.scheduler() | ||||
logger = logging.getLogger(__name__) | |||||
def get_origin_save_authorized_urls(): | def get_origin_save_authorized_urls(): | ||||
""" | """ | ||||
Get the list of origin url prefixes authorized to be | Get the list of origin url prefixes authorized to be | ||||
immediately loaded into the archive (whitelist). | immediately loaded into the archive (whitelist). | ||||
Returns: | Returns: | ||||
list: The list of authorized origin url prefix | list: The list of authorized origin url prefix | ||||
▲ Show 20 Lines • Show All 326 Lines • ▼ Show 20 Lines | def get_save_origin_requests_from_queryset(requests_queryset): | ||||
Returns: | Returns: | ||||
list: A list of save origin requests dict as described in | list: A list of save origin requests dict as described in | ||||
:func:`swh.web.common.origin_save.create_save_origin_request` | :func:`swh.web.common.origin_save.create_save_origin_request` | ||||
""" | """ | ||||
task_ids = [] | task_ids = [] | ||||
for sor in requests_queryset: | for sor in requests_queryset: | ||||
task_ids.append(sor.loading_task_id) | task_ids.append(sor.loading_task_id) | ||||
requests = [] | save_requests = [] | ||||
if task_ids: | if task_ids: | ||||
tasks = scheduler.get_tasks(task_ids) | tasks = scheduler.get_tasks(task_ids) | ||||
tasks = {task['id']: task for task in tasks} | tasks = {task['id']: task for task in tasks} | ||||
for sor in requests_queryset: | for sor in requests_queryset: | ||||
sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id)) | sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id)) | ||||
requests.append(sr_dict) | save_requests.append(sr_dict) | ||||
return requests | return save_requests | ||||
def get_save_origin_requests(origin_type, origin_url): | def get_save_origin_requests(origin_type, origin_url): | ||||
""" | """ | ||||
Get all save requests for a given software origin. | Get all save requests for a given software origin. | ||||
Args: | Args: | ||||
origin_type (str): the type of the origin | origin_type (str): the type of the origin | ||||
Show All 10 Lines | def get_save_origin_requests(origin_type, origin_url): | ||||
_check_origin_type_savable(origin_type) | _check_origin_type_savable(origin_type) | ||||
_check_origin_url_valid(origin_url) | _check_origin_url_valid(origin_url) | ||||
sors = SaveOriginRequest.objects.filter(origin_type=origin_type, | sors = SaveOriginRequest.objects.filter(origin_type=origin_type, | ||||
origin_url=origin_url) | origin_url=origin_url) | ||||
if sors.count() == 0: | if sors.count() == 0: | ||||
raise NotFoundExc(('No save requests found for origin with type ' | raise NotFoundExc(('No save requests found for origin with type ' | ||||
'%s and url %s.') % (origin_type, origin_url)) | '%s and url %s.') % (origin_type, origin_url)) | ||||
return get_save_origin_requests_from_queryset(sors) | return get_save_origin_requests_from_queryset(sors) | ||||
def get_save_origin_task_info(save_request_id):
    """
    Get detailed information about an accepted save origin request
    and its associated loading task.

    If the save request cannot be found, or if the associated loading
    task info is archived and removed from the scheduler database,
    returns an empty dictionary.

    Args:
        save_request_id (int): identifier of a save origin request

    Returns:
        dict: A dictionary with the following keys:

            - **type**: loading task type
            - **arguments**: loading task arguments
            - **id**: loading task database identifier
            - **backend_id**: loading task celery identifier
            - **scheduled**: loading task scheduling date
            - **ended**: loading task termination date
            - **status**: loading task execution status

        Depending on the availability of the task logs in the elasticsearch
        cluster of Software Heritage, the returned dictionary may also
        contain the following keys:

            - **name**: associated celery task name
            - **message**: relevant log message from task execution
            - **duration**: task execution time (only if it succeeded)
            - **worker**: name of the worker that executed the task
    """
    try:
        save_request = SaveOriginRequest.objects.get(id=save_request_id)
    except ObjectDoesNotExist:
        return {}

    tasks = scheduler.get_tasks([save_request.loading_task_id])
    task = tasks[0] if tasks else None
    if task is None:
        return {}

    task_runs = scheduler.get_task_runs([task['id']])
    task_run = task_runs[0] if task_runs else None
    if task_run is None:
        return {}

    # Reshape the scheduler task run into the documented output format:
    # expose the task identifier as 'id' and drop internal fields.
    task_run['type'] = task['type']
    task_run['arguments'] = task['arguments']
    task_run['id'] = task_run.pop('task')
    for internal_key in ('metadata', 'started'):
        # pop() with a default: a missing optional field must not
        # turn into a KeyError for the caller
        task_run.pop(internal_key, None)

    es_workers_index_url = config.get_config()['es_workers_index_url']
    if not es_workers_index_url:
        # no log index configured, only scheduler info can be returned
        return task_run
    es_workers_index_url += '/_search'

    # Restrict the log search to a time window starting at the visit date
    # when known, otherwise at the request date (with a wider window).
    if save_request.visit_date:
        min_ts = save_request.visit_date
        max_ts = min_ts + timedelta(days=7)
    else:
        min_ts = save_request.request_date
        max_ts = min_ts + timedelta(days=30)
    min_ts = int(min_ts.timestamp()) * 1000
    max_ts = int(max_ts.timestamp()) * 1000

    save_task_status = _save_task_status[task['status']]
    # NOTE(review): '3' / '6' look like syslog severities (error / info)
    # -- confirm against the worker logging configuration.
    priority = '3' if save_task_status == SAVE_TASK_FAILED else '6'

    query = {
        'bool': {
            'must': [
                {
                    'match_phrase': {
                        'priority': {
                            'query': priority
                        }
                    }
                },
                {
                    'match_phrase': {
                        'swh_task_id': {
                            'query': task_run['backend_id']
                        }
                    }
                },
                {
                    'range': {
                        '@timestamp': {
                            'gte': min_ts,
                            'lte': max_ts,
                            'format': 'epoch_millis'
                        }
                    }
                }
            ]
        }
    }

    # Output key -> candidate log fields, first one found wins.
    log_fields = (
        ('duration', ('swh_logging_args_runtime',)),
        ('message', ('message',)),
        ('name', ('swh_logging_args_name', 'swh_task_name')),
        ('worker', ('hostname', 'host')),
    )

    # Never let an unresponsive Elasticsearch block the web worker.
    es_timeout = 10  # seconds

    # Fetching logs is best effort: any failure is logged and the
    # scheduler information gathered so far is still returned.
    try:
        response = requests.post(es_workers_index_url,
                                 json={'query': query,
                                       'sort': ['@timestamp']},
                                 timeout=es_timeout)
        # surface HTTP errors explicitly instead of parsing an error body
        response.raise_for_status()
        results = json.loads(response.text)
        # Test the returned hits rather than results['hits']['total']:
        # the latter is an integer in old Elasticsearch versions but an
        # object in recent ones, which made the comparison fail silently.
        hits = results['hits']['hits']
        if hits:
            task_run_info = hits[-1]['_source']
            for out_key, candidates in log_fields:
                for candidate in candidates:
                    if candidate in task_run_info:
                        task_run[out_key] = task_run_info[candidate]
                        break
    except Exception as e:
        logger.warning('Request to Elasticsearch failed\n%s', e)

    return task_run
its associated...