Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/origin_save.py
Show First 20 Lines • Show All 724 Lines • ▼ Show 20 Lines | Returns: | ||||
- **duration**: task execution time (only if it succeeded) | - **duration**: task execution time (only if it succeeded) | ||||
- **worker**: name of the worker that executed the task | - **worker**: name of the worker that executed the task | ||||
""" | """ | ||||
try: | try: | ||||
save_request = SaveOriginRequest.objects.get(id=save_request_id) | save_request = SaveOriginRequest.objects.get(id=save_request_id) | ||||
except ObjectDoesNotExist: | except ObjectDoesNotExist: | ||||
return {} | return {} | ||||
task_info: Dict[str, Any] = {} | |||||
if save_request.note is not None: | |||||
task_info["note"] = save_request.note | |||||
try: | |||||
task = scheduler().get_tasks([save_request.loading_task_id]) | task = scheduler().get_tasks([save_request.loading_task_id]) | ||||
except Exception: | |||||
# to avoid mocking GET responses of /save/task/info/ endpoint when running | |||||
# cypress tests as scheduler is not available in that case | |||||
task = None | |||||
task = task[0] if task else None | task = task[0] if task else None | ||||
if task is None: | if task is None: | ||||
return {} | return task_info | ||||
task_run = scheduler().get_task_runs([task["id"]]) | task_run = scheduler().get_task_runs([task["id"]]) | ||||
task_run = task_run[0] if task_run else None | task_run = task_run[0] if task_run else None | ||||
if task_run is None: | if task_run is None: | ||||
return {} | return task_info | ||||
task_run["type"] = task["type"] | task_info.update(task_run) | ||||
task_run["arguments"] = task["arguments"] | task_info["type"] = task["type"] | ||||
task_run["id"] = task_run["task"] | task_info["arguments"] = task["arguments"] | ||||
del task_run["task"] | task_info["id"] = task_run["task"] | ||||
del task_run["metadata"] | del task_info["task"] | ||||
# Enrich the task run with the loading visit status | del task_info["metadata"] | ||||
task_run["visit_status"] = save_request.visit_status | # Enrich the task info with the loading visit status | ||||
task_info["visit_status"] = save_request.visit_status | |||||
es_workers_index_url = get_config()["es_workers_index_url"] | es_workers_index_url = get_config()["es_workers_index_url"] | ||||
if not es_workers_index_url: | if not es_workers_index_url: | ||||
return task_run | return task_info | ||||
es_workers_index_url += "/_search" | es_workers_index_url += "/_search" | ||||
if save_request.visit_date: | if save_request.visit_date: | ||||
min_ts = save_request.visit_date | min_ts = save_request.visit_date | ||||
max_ts = min_ts + timedelta(days=7) | max_ts = min_ts + timedelta(days=7) | ||||
else: | else: | ||||
min_ts = save_request.request_date | min_ts = save_request.request_date | ||||
max_ts = min_ts + timedelta(days=MAX_THRESHOLD_DAYS) | max_ts = min_ts + timedelta(days=MAX_THRESHOLD_DAYS) | ||||
Show All 27 Lines | try: | ||||
json={"query": query, "sort": ["@timestamp"]}, | json={"query": query, "sort": ["@timestamp"]}, | ||||
timeout=30, | timeout=30, | ||||
) | ) | ||||
results = json.loads(response.text) | results = json.loads(response.text) | ||||
if results["hits"]["total"]["value"] >= 1: | if results["hits"]["total"]["value"] >= 1: | ||||
task_run_info = results["hits"]["hits"][-1]["_source"] | task_run_info = results["hits"]["hits"][-1]["_source"] | ||||
if "swh_logging_args_runtime" in task_run_info: | if "swh_logging_args_runtime" in task_run_info: | ||||
duration = task_run_info["swh_logging_args_runtime"] | duration = task_run_info["swh_logging_args_runtime"] | ||||
task_run["duration"] = duration | task_info["duration"] = duration | ||||
if "message" in task_run_info: | if "message" in task_run_info: | ||||
task_run["message"] = task_run_info["message"] | task_info["message"] = task_run_info["message"] | ||||
if "swh_logging_args_name" in task_run_info: | if "swh_logging_args_name" in task_run_info: | ||||
task_run["name"] = task_run_info["swh_logging_args_name"] | task_info["name"] = task_run_info["swh_logging_args_name"] | ||||
elif "swh_task_name" in task_run_info: | elif "swh_task_name" in task_run_info: | ||||
task_run["name"] = task_run_info["swh_task_name"] | task_info["name"] = task_run_info["swh_task_name"] | ||||
if "hostname" in task_run_info: | if "hostname" in task_run_info: | ||||
task_run["worker"] = task_run_info["hostname"] | task_info["worker"] = task_run_info["hostname"] | ||||
elif "host" in task_run_info: | elif "host" in task_run_info: | ||||
task_run["worker"] = task_run_info["host"] | task_info["worker"] = task_run_info["host"] | ||||
except Exception as exc: | except Exception as exc: | ||||
logger.warning("Request to Elasticsearch failed\n%s", exc) | logger.warning("Request to Elasticsearch failed\n%s", exc) | ||||
sentry_sdk.capture_exception(exc) | sentry_sdk.capture_exception(exc) | ||||
if not full_info: | if not full_info: | ||||
for field in ("id", "backend_id", "worker"): | for field in ("id", "backend_id", "worker"): | ||||
# remove some staff only fields | # remove some staff only fields | ||||
task_run.pop(field, None) | task_info.pop(field, None) | ||||
if "message" in task_run and "Loading failure" in task_run["message"]: | if "message" in task_info and "Loading failure" in task_info["message"]: | ||||
# hide traceback for non staff users, only display exception | # hide traceback for non staff users, only display exception | ||||
message_lines = task_run["message"].split("\n") | message_lines = task_info["message"].split("\n") | ||||
message = "" | message = "" | ||||
for line in message_lines: | for line in message_lines: | ||||
if line.startswith("Traceback"): | if line.startswith("Traceback"): | ||||
break | break | ||||
message += f"{line}\n" | message += f"{line}\n" | ||||
message += message_lines[-1] | message += message_lines[-1] | ||||
task_run["message"] = message | task_info["message"] = message | ||||
return task_run | return task_info | ||||
SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" | SUBMITTED_SAVE_REQUESTS_METRIC = "swh_web_submitted_save_requests" | ||||
_submitted_save_requests_gauge = Gauge( | _submitted_save_requests_gauge = Gauge( | ||||
name=SUBMITTED_SAVE_REQUESTS_METRIC, | name=SUBMITTED_SAVE_REQUESTS_METRIC, | ||||
documentation="Number of submitted origin save requests", | documentation="Number of submitted origin save requests", | ||||
labelnames=["status", "visit_type"], | labelnames=["status", "visit_type"], | ||||
▲ Show 20 Lines • Show All 90 Lines • Show Last 20 Lines |