diff --git a/swh/web/api/views/origin_save.py b/swh/web/api/views/origin_save.py
--- a/swh/web/api/views/origin_save.py
+++ b/swh/web/api/views/origin_save.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
diff --git a/swh/web/common/models.py b/swh/web/common/models.py
--- a/swh/web/common/models.py
+++ b/swh/web/common/models.py
@@ -1,10 +1,12 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from django.db import models
 
+from swh.web.common.typing import SaveOriginRequestInfo
+
 
 class SaveAuthorizedOrigin(models.Model):
     """
@@ -105,16 +107,25 @@
         ordering = ["-id"]
         indexes = [models.Index(fields=["origin_url", "status"])]
 
-    def __str__(self):
-        return str(
-            {
-                "id": self.id,
-                "request_date": self.request_date,
-                "visit_type": self.visit_type,
-                "visit_status": self.visit_status,
-                "origin_url": self.origin_url,
-                "status": self.status,
-                "loading_task_id": self.loading_task_id,
-                "visit_date": self.visit_date,
-            }
+    def to_dict(self) -> SaveOriginRequestInfo:
+        """Map the save request model object to a json serializable dict.
+
+        Returns:
+            The corresponding SaveOriginRequestInfo json serializable dict.
+
+        """
+        visit_date = self.visit_date
+        return SaveOriginRequestInfo(
+            id=self.id,
+            origin_url=self.origin_url,
+            visit_type=self.visit_type,
+            save_request_date=self.request_date.isoformat(),
+            save_request_status=self.status,
+            save_task_status=self.loading_task_status,
+            visit_status=self.visit_status,
+            visit_date=visit_date.isoformat() if visit_date else None,
+            loading_task_id=self.loading_task_id,
         )
+
+    def __str__(self) -> str:
+        return str(self.to_dict())
diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py
--- a/swh/web/common/origin_save.py
+++ b/swh/web/common/origin_save.py
@@ -38,7 +38,7 @@
     SaveUnauthorizedOrigin,
 )
 from swh.web.common.origin_visits import get_origin_visits
-from swh.web.common.typing import OriginInfo
+from swh.web.common.typing import OriginInfo, SaveOriginRequestInfo
 from swh.web.common.utils import SWH_WEB_METRICS_REGISTRY, parse_iso8601_date_to_utc
 
 scheduler = config.scheduler()
@@ -225,11 +225,11 @@
     return visit_date, save_task_status
 
 
-def _save_request_dict(
+def _update_save_request_info(
     save_request: SaveOriginRequest,
     task: Optional[Dict[str, Any]] = None,
     task_run: Optional[Dict[str, Any]] = None,
-) -> Dict[str, Any]:
+) -> SaveOriginRequestInfo:
     """Update save request information out of task and task_run information.
 
     Args:
@@ -293,19 +293,12 @@
     if must_save:
         save_request.save()
 
-    return {
-        "id": save_request.id,
-        "visit_type": save_request.visit_type,
-        "visit_status": save_request.visit_status,
-        "origin_url": save_request.origin_url,
-        "save_request_date": save_request.request_date.isoformat(),
-        "save_request_status": save_request.status,
-        "save_task_status": save_request.loading_task_status,
-        "visit_date": visit_date.isoformat() if visit_date else None,
-    }
+    return save_request.to_dict()
 
 
-def create_save_origin_request(visit_type: str, origin_url: str) -> Dict[str, Any]:
+def create_save_origin_request(
+    visit_type: str, origin_url: str
+) -> SaveOriginRequestInfo:
     """
     Create a loading task to save a software origin into the archive.
 
@@ -382,8 +375,8 @@
             task = tasks[0] if tasks else None
             task_runs = scheduler.get_task_runs([sor.loading_task_id])
             task_run = task_runs[0] if task_runs else None
-            save_request = _save_request_dict(sor, task, task_run)
-            task_status = save_request["save_task_status"]
+            save_request_info = _update_save_request_info(sor, task, task_run)
+            task_status = save_request_info["save_task_status"]
             # create a new scheduler task only if the previous one has been
             # already executed
             if (
@@ -441,22 +434,22 @@
         )
 
     assert sor is not None
-    return _save_request_dict(sor, task)
+    return _update_save_request_info(sor, task)
 
 
-def get_save_origin_requests_from_queryset(
+def update_save_origin_requests_from_queryset(
     requests_queryset: QuerySet,
-) -> List[Dict[str, Any]]:
-    """
-    Get all save requests from a SaveOriginRequest queryset.
+) -> List[SaveOriginRequestInfo]:
+    """Update the status of all save requests from a SaveOriginRequest queryset,
+    store it in db and return the list of impacted save requests info.
 
     Args:
-        requests_queryset (django.db.models.QuerySet): input
-            SaveOriginRequest queryset
+        requests_queryset: input SaveOriginRequest queryset
 
     Returns:
-        list: A list of save origin requests dict as described in
+        list: A list of save origin request info dicts as described in
         :func:`swh.web.common.origin_save.create_save_origin_request`
+
     """
     task_ids = []
     for sor in requests_queryset:
@@ -468,14 +461,45 @@
     task_runs = scheduler.get_task_runs(tasks)
     task_runs = {task_run["task"]: task_run for task_run in task_runs}
     for sor in requests_queryset:
-        sr_dict = _save_request_dict(
-            sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id)
+        sr_dict = _update_save_request_info(
+            sor, tasks.get(sor.loading_task_id), task_runs.get(sor.loading_task_id),
         )
         save_requests.append(sr_dict)
     return save_requests
 
 
-def get_save_origin_requests(visit_type: str, origin_url: str) -> List[Dict[str, Any]]:
+def refresh_save_origin_request_statuses() -> List[SaveOriginRequestInfo]:
+    """Refresh non-terminal save origin requests (SOR) in the backend.
+
+    Non-terminal SOR are requests whose status is **accepted** and whose task status is
+    either **not created**, **not yet scheduled**, **scheduled** or **running**.
+
+    This computes the list of such SOR, checks their current status in the scheduler
+    (and optionally elasticsearch), then updates them in db.
+
+    Finally, this returns the refreshed information on those SOR.
+
+    """
+    non_terminal_statuses = (
+        SAVE_TASK_NOT_CREATED,
+        SAVE_TASK_NOT_YET_SCHEDULED,
+        SAVE_TASK_RUNNING,
+        SAVE_TASK_SCHEDULED,
+    )
+    save_requests = SaveOriginRequest.objects.filter(
+        status=SAVE_REQUEST_ACCEPTED, loading_task_status__in=non_terminal_statuses
+    )
+    # update save request statuses (skip the scheduler round-trip when empty)
+    return (
+        update_save_origin_requests_from_queryset(save_requests)
+        if save_requests.exists()
+        else []
+    )
+
+
+def get_save_origin_requests(
+    visit_type: str, origin_url: str
+) -> List[SaveOriginRequestInfo]:
     """
     Get all save requests for a given software origin.
 
@@ -502,7 +526,7 @@
             ("No save requests found for visit of type " "%s on origin with url %s.")
             % (visit_type, origin_url)
         )
-    return get_save_origin_requests_from_queryset(sors)
+    return update_save_origin_requests_from_queryset(sors)
 
 
 def get_save_origin_task_info(
@@ -717,22 +741,27 @@
     for labels in product(duration_load_task_statuses, visit_types):
         _accepted_save_requests_delay_gauge.labels(*labels).set(0)
 
-    for sor in SaveOriginRequest.objects.all():
-        if sor.status == SAVE_REQUEST_ACCEPTED:
+    for save_request in SaveOriginRequest.objects.all():
+        if save_request.status == SAVE_REQUEST_ACCEPTED:
             _accepted_save_requests_gauge.labels(
-                load_task_status=sor.loading_task_status, visit_type=sor.visit_type
+                load_task_status=save_request.loading_task_status,
+                visit_type=save_request.visit_type,
             ).inc()
 
         _submitted_save_requests_gauge.labels(
-            status=sor.status, visit_type=sor.visit_type
+            status=save_request.status, visit_type=save_request.visit_type
         ).inc()
 
         if (
-            sor.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED)
-            and sor.visit_date is not None
-            and sor.request_date is not None
+            save_request.loading_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED)
+            and save_request.visit_date is not None
+            and save_request.request_date is not None
         ):
-            delay = sor.visit_date.timestamp() - sor.request_date.timestamp()
+            delay = (
+                save_request.visit_date.timestamp()
+                - save_request.request_date.timestamp()
+            )
             _accepted_save_requests_delay_gauge.labels(
-                load_task_status=sor.loading_task_status, visit_type=sor.visit_type
+                load_task_status=save_request.loading_task_status,
+                visit_type=save_request.visit_type,
             ).inc(delay)
diff --git a/swh/web/common/typing.py b/swh/web/common/typing.py
--- a/swh/web/common/typing.py
+++ b/swh/web/common/typing.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2020 The Software Heritage developers
+# Copyright (C) 2020-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -224,3 +224,24 @@
 
 
 PagedResult = CorePagedResult[TResult, str]
+
+
+class SaveOriginRequestInfo(TypedDict):
+    id: int
+    """Unique key"""
+    save_request_date: str
+    """Date of the creation request"""
+    visit_type: str
+    """Type of the visit"""
+    visit_status: Optional[str]
+    """Status of the visit"""
+    origin_url: str
+    """Origin to ingest"""
+    save_request_status: str
+    """Status of the request"""
+    loading_task_id: Optional[int]
+    """Identifier of the loading task in the scheduler if scheduled"""
+    visit_date: Optional[str]
+    """End of the visit if terminated"""
+    save_task_status: str
+    """Status of the scheduled task"""
diff --git a/swh/web/management/__init__.py b/swh/web/management/__init__.py
new file mode 100644
diff --git a/swh/web/management/commands/__init__.py b/swh/web/management/commands/__init__.py
new file mode 100644
diff --git a/swh/web/management/commands/refresh_save_origin_request_status.py b/swh/web/management/commands/refresh_save_origin_request_status.py
new file mode 100644
--- /dev/null
+++ b/swh/web/management/commands/refresh_save_origin_request_status.py
@@ -0,0 +1,28 @@
+# Copyright (C) 2021 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU Affero General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from django.core.management.base import BaseCommand
+
+from swh.web.common.origin_save import refresh_save_origin_request_statuses
+
+
+class Command(BaseCommand):
+    """Refresh the status of all non-terminal save origin requests.
+
+    Django only loads a management command through a class named ``Command``
+    defined in the command module, hence the class name.
+
+    """
+
+    help = "Refresh save code now origin requests periodically"
+
+    def handle(self, *args, **options):
+        refreshed_statuses = refresh_save_origin_request_statuses()
+        if refreshed_statuses:
+            msg = f"Successfully updated {len(refreshed_statuses)} save request(s)."
+        else:
+            msg = "Nothing to do."
+
+        self.stdout.write(self.style.SUCCESS(msg))
diff --git a/swh/web/misc/origin_save.py b/swh/web/misc/origin_save.py
--- a/swh/web/misc/origin_save.py
+++ b/swh/web/misc/origin_save.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-2019 The Software Heritage developers
+# Copyright (C) 2018-2021 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -16,7 +16,6 @@
 from swh.web.common.origin_save import (
     create_save_origin_request,
     get_savable_visit_types,
-    get_save_origin_requests_from_queryset,
     get_save_origin_task_info,
 )
 from swh.web.common.utils import EnforceCSRFAuthentication
@@ -87,9 +86,7 @@
         table_data["recordsFiltered"] = save_requests.count()
         paginator = Paginator(save_requests, length)
 
-        table_data["data"] = get_save_origin_requests_from_queryset(
-            paginator.page(page).object_list
-        )
+        table_data["data"] = [sor.to_dict() for sor in paginator.page(page).object_list]
 
     return JsonResponse(table_data)
 
diff --git a/swh/web/tests/common/test_origin_save.py b/swh/web/tests/common/test_origin_save.py
--- a/swh/web/tests/common/test_origin_save.py
+++ b/swh/web/tests/common/test_origin_save.py
@@ -16,14 +16,17 @@
     SAVE_REQUEST_ACCEPTED,
     SAVE_TASK_FAILED,
     SAVE_TASK_RUNNING,
+    SAVE_TASK_SCHEDULED,
     SAVE_TASK_SUCCEEDED,
+    VISIT_STATUS_FULL,
     SaveOriginRequest,
 )
 from swh.web.common.origin_save import (
     get_save_origin_requests,
     get_save_origin_task_info,
+    refresh_save_origin_request_statuses,
 )
-from swh.web.common.typing import OriginVisitInfo
+from swh.web.common.typing import OriginVisitInfo, SaveOriginRequestInfo
 from swh.web.config import get_config
 
 _es_url = "http://esnode1.internal.softwareheritage.org:9200"
@@ -277,6 +280,37 @@
     return sors
 
 
+@pytest.mark.parametrize("visit_date", [None, "some-date"])
+def test_from_save_origin_request_to_save_request_info_dict(visit_date):
+    """Ensure save request to json serializable dict is fine
+
+    """
+    request_date = datetime.now(tz=timezone.utc)
+    _visit_date = request_date + timedelta(minutes=5) if visit_date else None
+    sor = SaveOriginRequest(
+        request_date=request_date,
+        visit_type=_visit_type,
+        visit_status=VISIT_STATUS_FULL,
+        origin_url=_origin_url,
+        status=SAVE_REQUEST_ACCEPTED,
+        loading_task_status=None,
+        visit_date=_visit_date,
+        loading_task_id=1,
+    )
+
+    assert sor.to_dict() == SaveOriginRequestInfo(
+        id=sor.id,
+        origin_url=sor.origin_url,
+        visit_type=sor.visit_type,
+        save_request_date=sor.request_date.isoformat(),
+        save_request_status=sor.status,
+        save_task_status=sor.loading_task_status,
+        visit_status=sor.visit_status,
+        visit_date=_visit_date.isoformat() if _visit_date else None,
+        loading_task_id=sor.loading_task_id,
+    )
+
+
 @pytest.mark.django_db
 @pytest.mark.parametrize("visit_status", ["created", "ongoing",])
 def test_get_save_origin_requests_no_visit_date_found(mocker, visit_status):
@@ -352,3 +386,38 @@
     assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
     assert sors[0]["visit_date"] is None
     assert sors[0]["visit_status"] is None
+
+    # nothing to refresh so nothing to return
+    assert len(refresh_save_origin_request_statuses()) == 0
+
+
+@pytest.mark.django_db
+def test_refresh_save_request_statuses(mocker, api_client):
+    """Refresh filters non-terminal save origin requests and updates them on changes
+
+    """
+    sors = _get_save_origin_requests(
+        mocker, load_status=SAVE_TASK_SCHEDULED, visit_status=None,
+    )
+    assert len(sors) == 1
+
+    # no changes so refresh does detect the entry but does nothing
+    sors = refresh_save_origin_request_statuses()
+
+    assert len(sors) == 1
+    for sor in sors:
+        # as it turns out, in this test, this won't update anything as no new status got
+        # returned by the scheduler
+        assert sor["save_task_status"] == SAVE_TASK_SCHEDULED
+
+    # make the scheduler return eventful for that task
+    _mock_scheduler(mocker)
+
+    # Detected entry, this time it should be updated
+    sors = refresh_save_origin_request_statuses()
+
+    assert len(sors) == 1
+    for sor in sors:
+        # this time the scheduler reports the task as completed, so the refreshed
+        # save request info carries the new task status
+        assert sor["save_task_status"] == SAVE_TASK_SUCCEEDED