Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/origin_save.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from bisect import bisect_right | from bisect import bisect_right | ||||
from datetime import datetime, timezone, timedelta | from datetime import datetime, timezone, timedelta | ||||
from itertools import product | from itertools import product | ||||
import json | import json | ||||
import logging | import logging | ||||
from typing import Any, Dict | |||||
from django.core.exceptions import ObjectDoesNotExist | from django.core.exceptions import ObjectDoesNotExist | ||||
from django.core.exceptions import ValidationError | from django.core.exceptions import ValidationError | ||||
from django.core.validators import URLValidator | from django.core.validators import URLValidator | ||||
from django.utils.html import escape | from django.utils.html import escape | ||||
from prometheus_client import Gauge | from prometheus_client import Gauge | ||||
▲ Show 20 Lines • Show All 400 Lines • ▼ Show 20 Lines | def get_save_origin_requests(visit_type, origin_url): | ||||
if sors.count() == 0: | if sors.count() == 0: | ||||
raise NotFoundExc( | raise NotFoundExc( | ||||
("No save requests found for visit of type " "%s on origin with url %s.") | ("No save requests found for visit of type " "%s on origin with url %s.") | ||||
% (visit_type, origin_url) | % (visit_type, origin_url) | ||||
) | ) | ||||
return get_save_origin_requests_from_queryset(sors) | return get_save_origin_requests_from_queryset(sors) | ||||
def get_save_origin_task_info(save_request_id): | def get_save_origin_task_info( | ||||
save_request_id: int, full_info: bool = True | |||||
) -> Dict[str, Any]: | |||||
""" | """ | ||||
Get detailed information about an accepted save origin request | Get detailed information about an accepted save origin request | ||||
and its associated loading task. | and its associated loading task. | ||||
If the associated loading task info is archived and removed | If the associated loading task info is archived and removed | ||||
from the scheduler database, returns an empty dictionary. | from the scheduler database, returns an empty dictionary. | ||||
Args: | Args: | ||||
save_request_id (int): identifier of a save origin request | save_request_id: identifier of a save origin request | ||||
full_info: whether to return detailed info for staff users | |||||
Returns: | Returns: | ||||
dict: A dictionary with the following keys: | A dictionary with the following keys: | ||||
- **type**: loading task type | - **type**: loading task type | ||||
- **arguments**: loading task arguments | - **arguments**: loading task arguments | ||||
- **id**: loading task database identifier | - **id**: loading task database identifier | ||||
- **backend_id**: loading task celery identifier | - **backend_id**: loading task celery identifier | ||||
- **scheduled**: loading task scheduling date | - **scheduled**: loading task scheduling date | ||||
- **ended**: loading task termination date | - **ended**: loading task termination date | ||||
- **status**: loading task execution status | - **status**: loading task execution status | ||||
Show All 21 Lines | ) -> Dict[str, Any]: | ||||
task_run = task_run[0] if task_run else None | task_run = task_run[0] if task_run else None | ||||
if task_run is None: | if task_run is None: | ||||
return {} | return {} | ||||
task_run["type"] = task["type"] | task_run["type"] = task["type"] | ||||
task_run["arguments"] = task["arguments"] | task_run["arguments"] = task["arguments"] | ||||
task_run["id"] = task_run["task"] | task_run["id"] = task_run["task"] | ||||
del task_run["task"] | del task_run["task"] | ||||
del task_run["metadata"] | del task_run["metadata"] | ||||
del task_run["started"] | |||||
es_workers_index_url = config.get_config()["es_workers_index_url"] | es_workers_index_url = config.get_config()["es_workers_index_url"] | ||||
if not es_workers_index_url: | if not es_workers_index_url: | ||||
return task_run | return task_run | ||||
es_workers_index_url += "/_search" | es_workers_index_url += "/_search" | ||||
if save_request.visit_date: | if save_request.visit_date: | ||||
min_ts = save_request.visit_date | min_ts = save_request.visit_date | ||||
max_ts = min_ts + timedelta(days=7) | max_ts = min_ts + timedelta(days=7) | ||||
else: | else: | ||||
min_ts = save_request.request_date | min_ts = save_request.request_date | ||||
max_ts = min_ts + timedelta(days=30) | max_ts = min_ts + timedelta(days=30) | ||||
min_ts = int(min_ts.timestamp()) * 1000 | min_ts_unix = int(min_ts.timestamp()) * 1000 | ||||
max_ts = int(max_ts.timestamp()) * 1000 | max_ts_unix = int(max_ts.timestamp()) * 1000 | ||||
save_task_status = _save_task_status[task["status"]] | save_task_status = _save_task_status[task["status"]] | ||||
priority = "3" if save_task_status == SAVE_TASK_FAILED else "6" | priority = "3" if save_task_status == SAVE_TASK_FAILED else "6" | ||||
query = { | query = { | ||||
"bool": { | "bool": { | ||||
"must": [ | "must": [ | ||||
{"match_phrase": {"priority": {"query": priority}}}, | {"match_phrase": {"priority": {"query": priority}}}, | ||||
{"match_phrase": {"swh_task_id": {"query": task_run["backend_id"]}}}, | {"match_phrase": {"swh_task_id": {"query": task_run["backend_id"]}}}, | ||||
{ | { | ||||
"range": { | "range": { | ||||
"@timestamp": { | "@timestamp": { | ||||
"gte": min_ts, | "gte": min_ts_unix, | ||||
"lte": max_ts, | "lte": max_ts_unix, | ||||
"format": "epoch_millis", | "format": "epoch_millis", | ||||
} | } | ||||
} | } | ||||
}, | }, | ||||
] | ] | ||||
} | } | ||||
} | } | ||||
Show All 18 Lines | try: | ||||
if "hostname" in task_run_info: | if "hostname" in task_run_info: | ||||
task_run["worker"] = task_run_info["hostname"] | task_run["worker"] = task_run_info["hostname"] | ||||
elif "host" in task_run_info: | elif "host" in task_run_info: | ||||
task_run["worker"] = task_run_info["host"] | task_run["worker"] = task_run_info["host"] | ||||
except Exception as exc: | except Exception as exc: | ||||
logger.warning("Request to Elasticsearch failed\n%s", exc) | logger.warning("Request to Elasticsearch failed\n%s", exc) | ||||
sentry_sdk.capture_exception(exc) | sentry_sdk.capture_exception(exc) | ||||
if not full_info: | |||||
for field in ("id", "backend_id", "worker"): | |||||
# remove some staff only fields | |||||
task_run.pop(field, None) | |||||
if "message" in task_run and "Loading failure" in task_run["message"]: | |||||
# hide traceback for non staff users, only display exception | |||||
message_lines = task_run["message"].split("\n") | |||||
message = "" | |||||
for line in message_lines: | |||||
if line.startswith("Traceback"): | |||||
break | |||||
message += f"{line}\n" | |||||
message += message_lines[-1] | |||||
task_run["message"] = message | |||||
return task_run | return task_run | ||||
SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" | SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" | ||||
_submitted_save_requests_gauge = Gauge( | _submitted_save_requests_gauge = Gauge( | ||||
name=SUBMITTED_SAVE_REQUESTS_METRIC, | name=SUBMITTED_SAVE_REQUESTS_METRIC, | ||||
documentation="Number of submitted origin save requests", | documentation="Number of submitted origin save requests", | ||||
▲ Show 20 Lines • Show All 55 Lines • Show Last 20 Lines |