Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/origin_save.py
Show All 32 Lines | from swh.web.common.models import ( | ||||
SAVE_TASK_RUNNING, | SAVE_TASK_RUNNING, | ||||
SAVE_TASK_SCHEDULED, | SAVE_TASK_SCHEDULED, | ||||
SAVE_TASK_SUCCEEDED, | SAVE_TASK_SUCCEEDED, | ||||
SaveAuthorizedOrigin, | SaveAuthorizedOrigin, | ||||
SaveOriginRequest, | SaveOriginRequest, | ||||
SaveUnauthorizedOrigin, | SaveUnauthorizedOrigin, | ||||
) | ) | ||||
from swh.web.common.origin_visits import get_origin_visits | from swh.web.common.origin_visits import get_origin_visits | ||||
from swh.web.common.typing import OriginInfo | from swh.web.common.typing import OriginInfo, SaveOriginRequestInfo | ||||
from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc | from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc | ||||
scheduler = config.scheduler() | scheduler = config.scheduler() | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
def get_origin_save_authorized_urls() -> List[str]: | def get_origin_save_authorized_urls() -> List[str]: | ||||
▲ Show 20 Lines • Show All 170 Lines • ▼ Show 20 Lines | else: | ||||
time_delta = time_now - save_request.request_date | time_delta = time_now - save_request.request_date | ||||
# consider the task as failed if it is still in scheduled state | # consider the task as failed if it is still in scheduled state | ||||
# 30 days after its submission | # 30 days after its submission | ||||
if time_delta.days > 30: | if time_delta.days > 30: | ||||
save_task_status = SAVE_TASK_FAILED | save_task_status = SAVE_TASK_FAILED | ||||
return visit_date, save_task_status | return visit_date, save_task_status | ||||
def _save_request_dict( | def _update_save_request_info( | ||||
save_request: SaveOriginRequest, | save_request: SaveOriginRequest, | ||||
anlambert: to be moved in the model | |||||
task: Optional[Dict[str, Any]] = None, | task: Optional[Dict[str, Any]] = None, | ||||
task_run: Optional[Dict[str, Any]] = None, | task_run: Optional[Dict[str, Any]] = None, | ||||
) -> Dict[str, Any]: | ) -> SaveOriginRequestInfo: | ||||
"""Update save request information out of task and task_run information. | """Update save request information out of task and task_run information. | ||||
Args: | Args: | ||||
save_request: Save request | save_request: Save request | ||||
task: Associated scheduler task information about the save request | task: Associated scheduler task information about the save request | ||||
task_run: Most recent run occurrence of the associated task | task_run: Most recent run occurrence of the associated task | ||||
Returns: | Returns: | ||||
▲ Show 20 Lines • Show All 47 Lines • ▼ Show 20 Lines | if ( | ||||
and save_request.loading_task_status != save_task_status | and save_request.loading_task_status != save_task_status | ||||
): | ): | ||||
save_request.loading_task_status = save_task_status | save_request.loading_task_status = save_task_status | ||||
must_save = True | must_save = True | ||||
if must_save: | if must_save: | ||||
save_request.save() | save_request.save() | ||||
return { | return save_request.to_dict() | ||||
"id": save_request.id, | |||||
"visit_type": save_request.visit_type, | |||||
"visit_status": save_request.visit_status, | |||||
"origin_url": save_request.origin_url, | |||||
"save_request_date": save_request.request_date.isoformat(), | |||||
"save_request_status": save_request.status, | |||||
"save_task_status": save_request.loading_task_status, | |||||
"visit_date": visit_date.isoformat() if visit_date else None, | |||||
} | |||||
def create_save_origin_request(visit_type: str, origin_url: str) -> Dict[str, Any]: | def create_save_origin_request( | ||||
visit_type: str, origin_url: str | |||||
) -> SaveOriginRequestInfo: | |||||
""" | """ | ||||
Create a loading task to save a software origin into the archive. | Create a loading task to save a software origin into the archive. | ||||
This function aims to create a software origin loading task | This function aims to create a software origin loading task | ||||
trough the use of the swh-scheduler component. | trough the use of the swh-scheduler component. | ||||
First, some checks are performed to see if the visit type and origin | First, some checks are performed to see if the visit type and origin | ||||
url are valid but also if the the save request can be accepted. | url are valid but also if the the save request can be accepted. | ||||
Show All 34 Lines | ) -> SaveOriginRequestInfo: | ||||
# if the origin save request is accepted, create a scheduler | # if the origin save request is accepted, create a scheduler | ||||
# task to load it into the archive | # task to load it into the archive | ||||
if save_request_status == SAVE_REQUEST_ACCEPTED: | if save_request_status == SAVE_REQUEST_ACCEPTED: | ||||
# create a task with high priority | # create a task with high priority | ||||
kwargs = { | kwargs = { | ||||
"priority": "high", | "priority": "high", | ||||
"url": origin_url, | "url": origin_url, | ||||
} | } | ||||
sor = None | sor = None | ||||
Done Inline Actions — ardumont: I did not realize I renamed this; I'll revert. | |||||
# get list of previously sumitted save requests | # get list of previously sumitted save requests | ||||
current_sors = list( | current_sors = list( | ||||
SaveOriginRequest.objects.filter( | SaveOriginRequest.objects.filter( | ||||
visit_type=visit_type, origin_url=origin_url | visit_type=visit_type, origin_url=origin_url | ||||
) | ) | ||||
) | ) | ||||
can_create_task = False | can_create_task = False | ||||
Show All 9 Lines | if save_request_status == SAVE_REQUEST_ACCEPTED: | ||||
can_create_task = True | can_create_task = True | ||||
# a task has already been created to load the origin | # a task has already been created to load the origin | ||||
elif sor.loading_task_id != -1: | elif sor.loading_task_id != -1: | ||||
# get the scheduler task and its status | # get the scheduler task and its status | ||||
tasks = scheduler.get_tasks([sor.loading_task_id]) | tasks = scheduler.get_tasks([sor.loading_task_id]) | ||||
task = tasks[0] if tasks else None | task = tasks[0] if tasks else None | ||||
task_runs = scheduler.get_task_runs([sor.loading_task_id]) | task_runs = scheduler.get_task_runs([sor.loading_task_id]) | ||||
task_run = task_runs[0] if task_runs else None | task_run = task_runs[0] if task_runs else None | ||||
save_request = _save_request_dict(sor, task, task_run) | save_request_info = _update_save_request_info(sor, task, task_run) | ||||
task_status = save_request["save_task_status"] | task_status = save_request_info["save_task_status"] | ||||
# create a new scheduler task only if the previous one has been | # create a new scheduler task only if the previous one has been | ||||
# already executed | # already executed | ||||
if ( | if ( | ||||
task_status == SAVE_TASK_FAILED | task_status == SAVE_TASK_FAILED | ||||
or task_status == SAVE_TASK_SUCCEEDED | or task_status == SAVE_TASK_SUCCEEDED | ||||
): | ): | ||||
can_create_task = True | can_create_task = True | ||||
sor = None | sor = None | ||||
▲ Show 20 Lines • Show All 41 Lines • ▼ Show 20 Lines | if save_request_status == SAVE_REQUEST_REJECTED: | ||||
raise ForbiddenExc( | raise ForbiddenExc( | ||||
( | ( | ||||
'The "save code now" request has been rejected ' | 'The "save code now" request has been rejected ' | ||||
"because the provided origin url is blacklisted." | "because the provided origin url is blacklisted." | ||||
) | ) | ||||
) | ) | ||||
assert sor is not None | assert sor is not None | ||||
return _save_request_dict(sor, task) | return _update_save_request_info(sor, task) | ||||
def get_save_origin_requests_from_queryset( | def update_save_origin_requests_from_queryset( | ||||
requests_queryset: QuerySet, | requests_queryset: QuerySet, | ||||
) -> List[Dict[str, Any]]: | ) -> List[SaveOriginRequestInfo]: | ||||
""" | """Update all save requests from a SaveOriginRequest queryset, update their status in db | ||||
Get all save requests from a SaveOriginRequest queryset. | and return the list of impacted save_requests. | ||||
Args: | Args: | ||||
requests_queryset (django.db.models.QuerySet): input | requests_queryset: input SaveOriginRequest queryset | ||||
SaveOriginRequest queryset | |||||
Returns: | Returns: | ||||
list: A list of save origin requests dict as described in | list: A list of save origin request info dicts as described in | ||||
:func:`swh.web.common.origin_save.create_save_origin_request` | :func:`swh.web.common.origin_save.create_save_origin_request` | ||||
""" | """ | ||||
task_ids = [] | task_ids = [] | ||||
for sor in requests_queryset: | for sor in requests_queryset: | ||||
task_ids.append(sor.loading_task_id) | task_ids.append(sor.loading_task_id) | ||||
save_requests = [] | save_requests = [] | ||||
if task_ids: | if task_ids: | ||||
tasks = scheduler.get_tasks(task_ids) | tasks = scheduler.get_tasks(task_ids) | ||||
tasks = {task["id"]: task for task in tasks} | tasks = {task["id"]: task for task in tasks} | ||||
task_runs = scheduler.get_task_runs(tasks) | task_runs = scheduler.get_task_runs(tasks) | ||||
task_runs = {task_run["task"]: task_run for task_run in task_runs} | task_runs = {task_run["task"]: task_run for task_run in task_runs} | ||||
for sor in requests_queryset: | for sor in requests_queryset: | ||||
sr_dict = _save_request_dict( | sr_dict = _update_save_request_info( | ||||
sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id) | sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id), | ||||
) | ) | ||||
save_requests.append(sr_dict) | save_requests.append(sr_dict) | ||||
return save_requests | return save_requests | ||||
def get_save_origin_requests(visit_type: str, origin_url: str) -> List[Dict[str, Any]]: | def refresh_save_origin_request_statuses() -> List[SaveOriginRequestInfo]: | ||||
Done Inline Actions — ardumont: I'll add a docstring here. | |||||
"""Refresh non-terminal save origin requests (SOR) in the backend. | |||||
Non-terminal SOR are requests whose status is **accepted** and their task status are | |||||
either **created**, **not yet scheduled**, **scheduled** or **running**. | |||||
This shall compute this list of SOR, checks their status in the scheduler and | |||||
optionally elasticsearch for their current status. Then update those in db. | |||||
Finally, this returns the refreshed information on those SOR. | |||||
""" | |||||
non_terminal_statuses = ( | |||||
SAVE_TASK_NOT_CREATED, | |||||
SAVE_TASK_NOT_YET_SCHEDULED, | |||||
SAVE_TASK_RUNNING, | |||||
SAVE_TASK_SCHEDULED, | |||||
) | |||||
save_requests = SaveOriginRequest.objects.filter( | |||||
status=SAVE_REQUEST_ACCEPTED, loading_task_status__in=non_terminal_statuses | |||||
) | |||||
# update save request statuses | |||||
return ( | |||||
update_save_origin_requests_from_queryset(save_requests) | |||||
if save_requests.count() > 0 | |||||
else [] | |||||
) | |||||
def get_save_origin_requests( | |||||
visit_type: str, origin_url: str | |||||
) -> List[SaveOriginRequestInfo]: | |||||
""" | """ | ||||
Get all save requests for a given software origin. | Get all save requests for a given software origin. | ||||
Args: | Args: | ||||
visit_type (str): the type of visit | visit_type (str): the type of visit | ||||
origin_url (str): the url of the origin | origin_url (str): the url of the origin | ||||
Raises: | Raises: | ||||
Show All 10 Lines | ) -> List[SaveOriginRequestInfo]: | ||||
sors = SaveOriginRequest.objects.filter( | sors = SaveOriginRequest.objects.filter( | ||||
visit_type=visit_type, origin_url=origin_url | visit_type=visit_type, origin_url=origin_url | ||||
) | ) | ||||
if sors.count() == 0: | if sors.count() == 0: | ||||
raise NotFoundExc( | raise NotFoundExc( | ||||
("No save requests found for visit of type " "%s on origin with url %s.") | ("No save requests found for visit of type " "%s on origin with url %s.") | ||||
% (visit_type, origin_url) | % (visit_type, origin_url) | ||||
) | ) | ||||
return get_save_origin_requests_from_queryset(sors) | return update_save_origin_requests_from_queryset(sors) | ||||
def get_save_origin_task_info( | def get_save_origin_task_info( | ||||
save_request_id: int, full_info: bool = True | save_request_id: int, full_info: bool = True | ||||
) -> Dict[str, Any]: | ) -> Dict[str, Any]: | ||||
""" | """ | ||||
Get detailed information about an accepted save origin request | Get detailed information about an accepted save origin request | ||||
and its associated loading task. | and its associated loading task. | ||||
▲ Show 20 Lines • Show All 201 Lines • ▼ Show 20 Lines | def compute_save_requests_metrics() -> None: | ||||
) | ) | ||||
for labels in product(duration_load_task_statuses, visit_types): | for labels in product(duration_load_task_statuses, visit_types): | ||||
_accepted_save_requests_delay_gauge.labels(*labels).set(0) | _accepted_save_requests_delay_gauge.labels(*labels).set(0) | ||||
for sor in SaveOriginRequest.objects.all(): | for sor in SaveOriginRequest.objects.all(): | ||||
if sor.status == SAVE_REQUEST_ACCEPTED: | if sor.status == SAVE_REQUEST_ACCEPTED: | ||||
_accepted_save_requests_gauge.labels( | _accepted_save_requests_gauge.labels( | ||||
load_task_status=sor.loading_task_status, visit_type=sor.visit_type | load_task_status=sor.loading_task_status, visit_type=sor.visit_type, | ||||
).inc() | ).inc() | ||||
_submitted_save_requests_gauge.labels( | _submitted_save_requests_gauge.labels( | ||||
status=sor.status, visit_type=sor.visit_type | status=sor.status, visit_type=sor.visit_type | ||||
).inc() | ).inc() | ||||
if ( | if ( | ||||
sor.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED) | sor.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED) | ||||
and sor.visit_date is not None | and sor.visit_date is not None | ||||
and sor.request_date is not None | and sor.request_date is not None | ||||
): | ): | ||||
delay = sor.visit_date.timestamp() - sor.request_date.timestamp() | delay = sor.visit_date.timestamp() - sor.request_date.timestamp() | ||||
_accepted_save_requests_delay_gauge.labels( | _accepted_save_requests_delay_gauge.labels( | ||||
load_task_status=sor.loading_task_status, visit_type=sor.visit_type | load_task_status=sor.loading_task_status, visit_type=sor.visit_type, | ||||
).inc(delay) | ).inc(delay) |
Inline comment (anlambert): to be moved into the model.