Changeset View
Changeset View
Standalone View
Standalone View
swh/web/common/origin_save.py
Show All 27 Lines | from swh.web.common.models import ( | ||||
SAVE_REQUEST_PENDING, | SAVE_REQUEST_PENDING, | ||||
SAVE_REQUEST_REJECTED, | SAVE_REQUEST_REJECTED, | ||||
SAVE_TASK_FAILED, | SAVE_TASK_FAILED, | ||||
SAVE_TASK_NOT_CREATED, | SAVE_TASK_NOT_CREATED, | ||||
SAVE_TASK_NOT_YET_SCHEDULED, | SAVE_TASK_NOT_YET_SCHEDULED, | ||||
SAVE_TASK_RUNNING, | SAVE_TASK_RUNNING, | ||||
SAVE_TASK_SCHEDULED, | SAVE_TASK_SCHEDULED, | ||||
SAVE_TASK_SUCCEEDED, | SAVE_TASK_SUCCEEDED, | ||||
VISIT_STATUS_CREATED, | |||||
VISIT_STATUS_ONGOING, | |||||
SaveAuthorizedOrigin, | SaveAuthorizedOrigin, | ||||
SaveOriginRequest, | SaveOriginRequest, | ||||
SaveUnauthorizedOrigin, | SaveUnauthorizedOrigin, | ||||
) | ) | ||||
from swh.web.common.origin_visits import get_origin_visits | from swh.web.common.origin_visits import get_origin_visits | ||||
from swh.web.common.typing import ( | from swh.web.common.typing import ( | ||||
OriginExistenceCheckInfo, | OriginExistenceCheckInfo, | ||||
OriginInfo, | OriginInfo, | ||||
SaveOriginRequestInfo, | SaveOriginRequestInfo, | ||||
) | ) | ||||
from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc | from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc | ||||
scheduler = config.scheduler() | scheduler = config.scheduler() | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
# Number of days in the past to lookup for information | # Number of days in the past to lookup for information | ||||
MAX_THRESHOLD_DAYS = 30 | MAX_THRESHOLD_DAYS = 30 | ||||
# Non terminal visit statuses which needs updates | |||||
NON_TERMINAL_STATUSES = [ | |||||
VISIT_STATUS_CREATED, | |||||
VISIT_STATUS_ONGOING, | |||||
] | |||||
def get_origin_save_authorized_urls() -> List[str]: | def get_origin_save_authorized_urls() -> List[str]: | ||||
""" | """ | ||||
Get the list of origin url prefixes authorized to be | Get the list of origin url prefixes authorized to be | ||||
immediately loaded into the archive (whitelist). | immediately loaded into the archive (whitelist). | ||||
Returns: | Returns: | ||||
list: The list of authorized origin url prefix | list: The list of authorized origin url prefix | ||||
▲ Show 20 Lines • Show All 200 Lines • ▼ Show 20 Lines | if time_delta.days <= MAX_THRESHOLD_DAYS: | ||||
try: | try: | ||||
origin_info = archive.lookup_origin(OriginInfo(url=save_request.origin_url)) | origin_info = archive.lookup_origin(OriginInfo(url=save_request.origin_url)) | ||||
origin_visits = get_origin_visits(origin_info) | origin_visits = get_origin_visits(origin_info) | ||||
visit_dates = [parse_iso8601_date_to_utc(v["date"]) for v in origin_visits] | visit_dates = [parse_iso8601_date_to_utc(v["date"]) for v in origin_visits] | ||||
i = bisect_right(visit_dates, save_request.request_date) | i = bisect_right(visit_dates, save_request.request_date) | ||||
if i != len(visit_dates): | if i != len(visit_dates): | ||||
visit_date = visit_dates[i] | visit_date = visit_dates[i] | ||||
visit_status = origin_visits[i]["status"] | visit_status = origin_visits[i]["status"] | ||||
if visit_status not in ("full", "partial", "not_found"): | |||||
visit_date = None | |||||
except Exception as exc: | except Exception as exc: | ||||
sentry_sdk.capture_exception(exc) | sentry_sdk.capture_exception(exc) | ||||
return visit_date, visit_status | return visit_date, visit_status | ||||
def _check_visit_update_status( | def _check_visit_update_status( | ||||
save_request: SaveOriginRequest, save_task_status: str | save_request: SaveOriginRequest, | ||||
) -> Tuple[Optional[datetime], str]: | ) -> Tuple[Optional[datetime], Optional[str], Optional[str]]: | ||||
"""Given a save request and a save task status, determine whether a save request was | """Given a save request, determine whether a save request was successful or failed. | ||||
successful or failed. | |||||
Args: | Args: | ||||
save_request: Input save origin request to retrieve information for. | save_request: Input save origin request to retrieve information for. | ||||
Returns: | Returns: | ||||
Tuple of (optional visit date, save task status) for such save request origin | Tuple of (optional visit date, optional visit status, optional save task status) | ||||
for such save request origin | |||||
""" | """ | ||||
visit_date, visit_status = _get_visit_info_for_save_request(save_request) | visit_date, visit_status = _get_visit_info_for_save_request(save_request) | ||||
save_request.visit_date = visit_date | loading_task_status = None | ||||
save_request.visit_status = visit_status | |||||
if visit_date and visit_status in ("full", "partial"): | if visit_date and visit_status in ("full", "partial"): | ||||
# visit has been performed, mark the saving task as succeeded | # visit has been performed, mark the saving task as succeeded | ||||
save_task_status = SAVE_TASK_SUCCEEDED | loading_task_status = SAVE_TASK_SUCCEEDED | ||||
elif visit_status in ("created", "ongoing"): | elif visit_status in ("created", "ongoing"): | ||||
# visit is currently running | # visit is currently running | ||||
save_task_status = SAVE_TASK_RUNNING | loading_task_status = SAVE_TASK_RUNNING | ||||
elif visit_status in ("not_found", "failed"): | elif visit_status in ("not_found", "failed"): | ||||
save_task_status = SAVE_TASK_FAILED | loading_task_status = SAVE_TASK_FAILED | ||||
else: | else: | ||||
time_now = datetime.now(tz=timezone.utc) | time_now = datetime.now(tz=timezone.utc) | ||||
time_delta = time_now - save_request.request_date | time_delta = time_now - save_request.request_date | ||||
# consider the task as failed if it is still in scheduled state | # consider the task as failed if it is still in scheduled state | ||||
# 30 days after its submission | # 30 days after its submission | ||||
if time_delta.days > MAX_THRESHOLD_DAYS: | if time_delta.days > MAX_THRESHOLD_DAYS: | ||||
save_task_status = SAVE_TASK_FAILED | loading_task_status = SAVE_TASK_FAILED | ||||
return visit_date, save_task_status | return visit_date, visit_status, loading_task_status | ||||
def _compute_task_loading_status( | |||||
task: Optional[Dict[str, Any]] = None, task_run: Optional[Dict[str, Any]] = None, | |||||
) -> Optional[str]: | |||||
loading_task_status: Optional[str] = None | |||||
# First determine the loading task status out of task information | |||||
if task: | |||||
loading_task_status = _save_task_status[task["status"]] | |||||
if task_run: | |||||
loading_task_status = _save_task_run_status[task_run["status"]] | |||||
return loading_task_status | |||||
def _update_save_request_info( | def _update_save_request_info( | ||||
save_request: SaveOriginRequest, | save_request: SaveOriginRequest, | ||||
task: Optional[Dict[str, Any]] = None, | task: Optional[Dict[str, Any]] = None, | ||||
task_run: Optional[Dict[str, Any]] = None, | task_run: Optional[Dict[str, Any]] = None, | ||||
) -> SaveOriginRequestInfo: | ) -> SaveOriginRequestInfo: | ||||
"""Update save request information out of task and task_run information. | """Update save request information out of the visit status and fallback to the task and | ||||
task_run information if the visit status is missing. | |||||
Args: | Args: | ||||
save_request: Save request | save_request: Save request | ||||
task: Associated scheduler task information about the save request | task: Associated scheduler task information about the save request | ||||
task_run: Most recent run occurrence of the associated task | task_run: Most recent run occurrence of the associated task | ||||
Returns: | Returns: | ||||
Summary of the save request information updated. | Summary of the save request information updated. | ||||
""" | """ | ||||
must_save = False | must_save = False | ||||
visit_date = save_request.visit_date | |||||
# save task still in scheduler db | |||||
if task: | |||||
save_task_status = _save_task_status[task["status"]] | |||||
if task_run: | |||||
save_task_status = _save_task_run_status[task_run["status"]] | |||||
# Consider request from which a visit date has already been found | # To determine the save code now request's final status, the visit date must be set | ||||
anlambert: s/must be in a final status/must be in a final one/ to avoid repetition | |||||
# as succeeded to avoid retrieving it again | # and the visit status must be a final one. Once they do, the save code now is | ||||
if save_task_status == SAVE_TASK_SCHEDULED and visit_date: | # definitely done. | ||||
save_task_status = SAVE_TASK_SUCCEEDED | |||||
if ( | if ( | ||||
save_task_status in (SAVE_TASK_FAILED, SAVE_TASK_SUCCEEDED) | not save_request.visit_date | ||||
and not visit_date | or not save_request.visit_status | ||||
or save_request.visit_status in NON_TERMINAL_STATUSES | |||||
): | ): | ||||
visit_date, visit_status = _get_visit_info_for_save_request(save_request) | visit_date, visit_status, loading_task_status = _check_visit_update_status( | ||||
save_request.visit_date = visit_date | save_request | ||||
save_request.visit_status = visit_status | |||||
if visit_status in ("failed", "not_found"): | |||||
save_task_status = SAVE_TASK_FAILED | |||||
must_save = True | |||||
# Check tasks still marked as scheduled / not yet scheduled | |||||
if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED): | |||||
visit_date, save_task_status = _check_visit_update_status( | |||||
save_request, save_task_status | |||||
) | ) | ||||
# save task may have been archived | if not loading_task_status: # fallback when not provided | ||||
else: | loading_task_status = _compute_task_loading_status(task, task_run) | ||||
save_task_status = save_request.loading_task_status | |||||
if save_task_status in (SAVE_TASK_SCHEDULED, SAVE_TASK_NOT_YET_SCHEDULED): | |||||
visit_date, save_task_status = _check_visit_update_status( | |||||
save_request, save_task_status | |||||
) | |||||
else: | if visit_date != save_request.visit_date: | ||||
save_task_status = save_request.loading_task_status | must_save = True | ||||
save_request.visit_date = visit_date | |||||
if visit_status != save_request.visit_status: | |||||
must_save = True | |||||
save_request.visit_status = visit_status | |||||
if ( | if ( | ||||
# avoid to override final loading task status when already found | loading_task_status is not None | ||||
# as visit status is no longer checked once a visit date has been found | and loading_task_status != save_request.loading_task_status | ||||
save_request.loading_task_status not in (SAVE_TASK_FAILED, SAVE_TASK_SUCCEEDED) | |||||
and save_request.loading_task_status != save_task_status | |||||
): | ): | ||||
save_request.loading_task_status = save_task_status | |||||
must_save = True | must_save = True | ||||
save_request.loading_task_status = loading_task_status | |||||
if must_save: | if must_save: | ||||
save_request.save() | save_request.save() | ||||
return save_request.to_dict() | return save_request.to_dict() | ||||
def create_save_origin_request( | def create_save_origin_request( | ||||
▲ Show 20 Lines • Show All 78 Lines • ▼ Show 20 Lines | if save_request_status == SAVE_REQUEST_ACCEPTED: | ||||
"url": artifact_url, | "url": artifact_url, | ||||
"version": artifact_version, | "version": artifact_version, | ||||
"time": metadata["last_modified"], | "time": metadata["last_modified"], | ||||
"length": metadata["content_length"], | "length": metadata["content_length"], | ||||
} | } | ||||
) | ) | ||||
task_kwargs = dict(**task_kwargs, artifacts=artifacts, snapshot_append=True) | task_kwargs = dict(**task_kwargs, artifacts=artifacts, snapshot_append=True) | ||||
sor = None | sor = None | ||||
# get list of previously sumitted save requests | # get list of previously submitted save requests (most recent first) | ||||
current_sors = list( | current_sors = list( | ||||
SaveOriginRequest.objects.filter( | SaveOriginRequest.objects.filter( | ||||
visit_type=visit_type, origin_url=origin_url | visit_type=visit_type, origin_url=origin_url | ||||
) | ).order_by("-request_date") | ||||
) | ) | ||||
can_create_task = False | can_create_task = False | ||||
# if no save requests previously submitted, create the scheduler task | # if no save requests previously submitted, create the scheduler task | ||||
if not current_sors: | if not current_sors: | ||||
can_create_task = True | can_create_task = True | ||||
else: | else: | ||||
# get the latest submitted save request | # get the latest submitted save request | ||||
▲ Show 20 Lines • Show All 131 Lines • ▼ Show 20 Lines | def refresh_save_origin_request_statuses() -> List[SaveOriginRequestInfo]: | ||||
Finally, this returns the refreshed information on those SOR. | Finally, this returns the refreshed information on those SOR. | ||||
""" | """ | ||||
pivot_date = datetime.now(tz=timezone.utc) - timedelta(days=MAX_THRESHOLD_DAYS) | pivot_date = datetime.now(tz=timezone.utc) - timedelta(days=MAX_THRESHOLD_DAYS) | ||||
save_requests = SaveOriginRequest.objects.filter( | save_requests = SaveOriginRequest.objects.filter( | ||||
# Retrieve accepted request statuses (all statuses) | # Retrieve accepted request statuses (all statuses) | ||||
Q(status=SAVE_REQUEST_ACCEPTED), | Q(status=SAVE_REQUEST_ACCEPTED), | ||||
# those without the required information we need to update | # those without the required information we need to update | ||||
Q(visit_date__isnull=True) | Q(visit_status__isnull=True), | Q(visit_date__isnull=True) | ||||
| Q(visit_status__isnull=True) | |||||
| Q(visit_status__in=NON_TERMINAL_STATUSES), | |||||
# limit results to recent ones (that is roughly 30 days old at best) | # limit results to recent ones (that is roughly 30 days old at best) | ||||
Q(request_date__gte=pivot_date), | Q(request_date__gte=pivot_date), | ||||
) | ) | ||||
return ( | return ( | ||||
update_save_origin_requests_from_queryset(save_requests) | update_save_origin_requests_from_queryset(save_requests) | ||||
if save_requests.count() > 0 | if save_requests.count() > 0 | ||||
else [] | else [] | ||||
) | ) | ||||
def get_save_origin_requests( | def get_save_origin_requests( | ||||
▲ Show 20 Lines • Show All 263 Lines • Show Last 20 Lines |
s/must be in a final status/must be in a final one/ to avoid repetition