diff --git a/assets/src/bundles/save/index.js b/assets/src/bundles/save/index.js
index 06876012..d2c54ac7 100644
--- a/assets/src/bundles/save/index.js
+++ b/assets/src/bundles/save/index.js
@@ -1,567 +1,561 @@
/**
 * Copyright (C) 2018-2021 The Software Heritage developers
 * See the AUTHORS file at the top-level directory of this distribution
 * License: GNU Affero General Public License version 3, or any later version
 * See top-level LICENSE file for more information
 */

import {csrfPost, handleFetchError, isGitRepoUrl, htmlAlert, removeUrlFragment,
        getCanonicalOriginURL} from 'utils/functions';
import {swhSpinnerSrc} from 'utils/constants';
import artifactFormRowTemplate from './artifact-form-row.ejs';

let saveRequestsTable;

async function originSaveRequest(
  originType, originUrl, extraData,
  acceptedCallback, pendingCallback, errorCallback
) {
  // Actually trigger the origin save request
  const addSaveOriginRequestUrl = Urls.api_1_save_origin(originType, originUrl);
  $('.swh-processing-save-request').css('display', 'block');
  let headers = {};
  let body = null;
  if (extraData !== {}) {
    body = JSON.stringify(extraData);
    headers = {'Content-Type': 'application/json'};
  };
  try {
    const response = await csrfPost(addSaveOriginRequestUrl, headers, body);
    handleFetchError(response);
    const data = await response.json();
    $('.swh-processing-save-request').css('display', 'none');
    if (data.save_request_status === 'accepted') {
      acceptedCallback();
    } else {
      pendingCallback();
    }
  } catch (response) {
    $('.swh-processing-save-request').css('display', 'none');
    const errorData = await response.json();
    errorCallback(response.status, errorData);
  };
}

function addArtifactVersionAutofillHandler(formId) {
  // autofill artifact version input with the filename from
  // the artifact url without extensions
  $(`#swh-input-artifact-url-${formId}`).on('input', function(event) {
    const artifactUrl = $(this).val().trim();
    let filename = artifactUrl.split('/').slice(-1)[0];
    if (filename !== artifactUrl) {
      filename = filename.replace(/tar.*$/, 'tar');
      const filenameNoExt = filename.split('.').slice(0, -1).join('.');
      const artifactVersion = $(`#swh-input-artifact-version-${formId}`);
      if (filenameNoExt !== filename) {
        artifactVersion.val(filenameNoExt);
      }
    }
  });
}

export function maybeRequireExtraInputs() {
  // Read the actual selected value and depending on the origin type, display
  // some extra inputs or hide them. This makes the extra inputs disabled when
  // not displayed.
  const originType = $('#swh-input-visit-type').val();
  let display = 'none';
  let disabled = true;
  if (originType === 'archives') {
    display = 'flex';
    disabled = false;
  }
  $('.swh-save-origin-archives-form').css('display', display);
  if (!disabled) {
    // help paragraph must have block display for proper rendering
    $('#swh-save-origin-archives-help').css('display', 'block');
  }
  $('.swh-save-origin-archives-form .form-control').prop('disabled', disabled);
  if (originType === 'archives' && $('.swh-save-origin-archives-form').length === 1) {
    // insert first artifact row when the archives visit type is selected
    // for the first time
    $('.swh-save-origin-archives-form').last().after(
      artifactFormRowTemplate({deletableRow: false, formId: 0}));
    addArtifactVersionAutofillHandler(0);
  }
}

export function addArtifactFormRow() {
  const formId = $('.swh-save-origin-artifact-form').length;
  $('.swh-save-origin-artifact-form').last().after(
    artifactFormRowTemplate({
      deletableRow: true,
      formId: formId
    })
  );
  addArtifactVersionAutofillHandler(formId);
}

export function deleteArtifactFormRow(event) {
  $(event.target).closest('.swh-save-origin-artifact-form').remove();
}

const userRequestsFilterCheckbox = `
[Template markup elided here: the rest of the bundle defines a "show only my
requests" filter checkbox, the save requests table and a task-info popover that
shows "Fetching task information ..." while loading, then renders each
${info.key} / ${info.value} pair as a table row.]

Help text from the accompanying "Save code now" template:

You can contribute to extend the content of the Software Heritage archive by
submitting an origin save request. To do so, fill the required info in the form
below.

A "Save code now" request takes the following parameters:

  * a visit type: git, for origins using Git; hg, for origins using Mercurial;
    svn, for origins using Subversion;
  * the origin URL; for instance, before submitting a git origin into the
    archive, you should check that the command $ git clone <origin_url>
    succeeds.

Once submitted, your save request is either accepted or left pending. Once a
save request has been accepted, you can follow its current status in the
submitted save requests list. If you submitted requests while authenticated,
you will be able to only display your own requests.

The submitted save requests table has the following columns:
Date | Type | Url | Request | Status | Info
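For reference, the form above posts to the same endpoint that is exposed in the
public Web API (Urls.api_1_save_origin in the bundle). A minimal sketch of
submitting a save request over HTTP, assuming the documented
/api/1/origin/save/<visit_type>/url/<origin_url>/ layout for that route:

    import requests

    origin_url = "https://gitlab.com/inkscape/inkscape"  # any archivable origin URL
    save_url = (
        "https://archive.softwareheritage.org/api/1/origin/save/git/url/"
        f"{origin_url}/"
    )
    response = requests.post(save_url)
    response.raise_for_status()
    # The bundle above branches on this same field ('accepted' vs. pending).
    print(response.json()["save_request_status"])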
' % content["hljs_language"])
assert_contains(resp, escape(content["data"]))
split_path = content_path.split("/")
filename = split_path[-1]
path = content_path.replace(filename, "")[:-1]
path_info = gen_path_info(path)
del query_params["path"]
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
root_dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '', count=len(path_info) + 1)
assert_contains(resp, '%s' % (root_dir_url, root_dir_sha1[:7]))
for p in path_info:
query_params["path"] = p["path"]
dir_url = reverse("browse-origin-directory", query_params=query_params)
assert_contains(resp, '%s' % (dir_url, p["name"]))
assert_contains(resp, " %s " % filename)
query_string = "sha1_git:" + content["sha1_git"]
url_raw = reverse(
"browse-content-raw",
url_args={"query_string": query_string},
query_params={"filename": filename},
)
assert_contains(resp, url_raw)
if "path" in query_params:
del query_params["path"]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_releases_url)}">')
assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
assert_contains(resp, '', count=len(origin_branches))
query_params["path"] = content_path
for branch in origin_branches:
root_dir_branch_url = reverse(
"browse-origin-content",
query_params={"branch": branch["name"], **query_params},
)
assert_contains(resp, '' % root_dir_branch_url)
assert_contains(resp, '', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
root_dir_release_url = reverse(
"browse-origin-content",
query_params={"release": release["name"], **query_params},
)
assert_contains(resp, '' % root_dir_release_url)
url = reverse("browse-origin-content", query_params=query_params)
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/content.html"
)
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
swhid_context = {
"origin": origin_info["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(REVISION, head_rev_id),
"path": f"/{content_path}",
}
swh_cnt_id = gen_swhid(CONTENT, content["sha1_git"], metadata=swhid_context)
swh_cnt_id_url = reverse("browse-swhid", url_args={"swhid": swh_cnt_id})
assert_contains(resp, swh_cnt_id)
assert_contains(resp, swh_cnt_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_link(resp, origin_info["url"])
assert_not_contains(resp, "swh-metadata-popover")
def _origin_directory_view_test_helper(
client,
archive_data,
origin_info,
origin_visit,
snapshot_sizes,
origin_branches,
origin_releases,
root_directory_sha1,
directory_entries,
visit_id=None,
timestamp=None,
snapshot_id=None,
path=None,
):
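"""Fetch the browse-origin-directory view for the given origin visit and check
its contents: directory and file listings, breadcrumb links, the branches and
releases dropdowns, vault cooking buttons, the computed directory SWHID and
the link back to the origin.
"""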
dirs = [e for e in directory_entries if e["type"] in ("dir", "rev")]
files = [e for e in directory_entries if e["type"] == "file"]
if not visit_id and not snapshot_id:
visit_id = origin_visit["visit"]
query_params = {"origin_url": origin_info["url"]}
if timestamp:
query_params["timestamp"] = timestamp
elif visit_id:
query_params["visit_id"] = visit_id
else:
query_params["snapshot"] = snapshot_id
if path:
query_params["path"] = path
url = reverse("browse-origin-directory", query_params=query_params)
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/directory.html"
)
assert_contains(resp, '', count=len(dirs))
assert_contains(resp, ' ', count=len(files))
if timestamp:
query_params["timestamp"] = format_utc_iso_date(
parse_iso8601_date_to_utc(timestamp).isoformat(), "%Y-%m-%dT%H:%M:%SZ"
)
for d in dirs:
if d["type"] == "rev":
dir_url = reverse("browse-revision", url_args={"sha1_git": d["target"]})
else:
dir_path = d["name"]
if path:
dir_path = "%s/%s" % (path, d["name"])
query_params["path"] = dir_path
dir_url = reverse("browse-origin-directory", query_params=query_params,)
assert_contains(resp, dir_url)
for f in files:
file_path = f["name"]
if path:
file_path = "%s/%s" % (path, f["name"])
query_params["path"] = file_path
file_url = reverse("browse-origin-content", query_params=query_params)
assert_contains(resp, file_url)
if "path" in query_params:
del query_params["path"]
root_dir_branch_url = reverse("browse-origin-directory", query_params=query_params)
nb_bc_paths = 1
if path:
nb_bc_paths = len(path.split("/")) + 1
assert_contains(resp, '', count=nb_bc_paths)
assert_contains(
resp, '%s' % (root_dir_branch_url, root_directory_sha1[:7])
)
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}"')
assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
if path:
query_params["path"] = path
assert_contains(resp, ' ', count=len(origin_branches))
for branch in origin_branches:
query_params["branch"] = branch["name"]
root_dir_branch_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, '' % root_dir_branch_url)
assert_contains(resp, '', count=len(origin_releases))
query_params["branch"] = None
for release in origin_releases:
query_params["release"] = release["name"]
root_dir_release_url = reverse(
"browse-origin-directory", query_params=query_params
)
assert_contains(resp, 'href="%s"' % root_dir_release_url)
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
snapshot = archive_data.snapshot_get(origin_visit["snapshot"])
head_rev_id = archive_data.snapshot_get_head(snapshot)
swhid_context = {
"origin": origin_info["url"],
"visit": gen_swhid(SNAPSHOT, snapshot["id"]),
"anchor": gen_swhid(REVISION, head_rev_id),
"path": f"/{path}" if path else None,
}
swh_dir_id = gen_swhid(
DIRECTORY, directory_entries[0]["dir_id"], metadata=swhid_context
)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
assert_contains(resp, "swh-take-new-snapshot")
_check_origin_link(resp, origin_info["url"])
assert_not_contains(resp, "swh-metadata-popover")
def _origin_branches_test_helper(
client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None
):
query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
url = reverse("browse-origin-branches", query_params=query_params)
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/branches.html"
)
origin_branches = origin_snapshot[0]
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}">')
assert_contains(resp, f"Releases ({snapshot_sizes['release']})")
assert_contains(resp, '' % escape(browse_branch_url))
browse_revision_url = reverse(
"browse-revision",
url_args={"sha1_git": branch["revision"]},
query_params=query_params,
)
assert_contains(resp, '' % escape(browse_revision_url))
_check_origin_link(resp, origin_info["url"])
def _origin_releases_test_helper(
client, origin_info, origin_snapshot, snapshot_sizes, snapshot_id=None
):
query_params = {"origin_url": origin_info["url"], "snapshot": snapshot_id}
url = reverse("browse-origin-releases", query_params=query_params)
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/releases.html"
)
origin_releases = origin_snapshot[1]
origin_branches_url = reverse("browse-origin-branches", query_params=query_params)
assert_contains(resp, f'href="{escape(origin_branches_url)}"')
assert_contains(resp, f"Branches ({snapshot_sizes['revision']})")
origin_releases_url = reverse("browse-origin-releases", query_params=query_params)
nb_releases = len(origin_releases)
if nb_releases > 0:
assert_contains(resp, f'href="{escape(origin_releases_url)}"')
assert_contains(resp, f"Releases ({snapshot_sizes['release']}")
assert_contains(resp, '' % escape(browse_release_url))
assert_contains(resp, '' % escape(browse_revision_url))
_check_origin_link(resp, origin_info["url"])
@given(
new_origin(), visit_dates(), revisions(min_size=10, max_size=10), existing_release()
)
def test_origin_branches_pagination_with_alias(
client, archive_data, mocker, new_origin, visit_dates, revisions, existing_release
):
"""
When a snapshot contains a branch or a release alias, pagination links
in the branches / releases view should be displayed.
"""
mocker.patch("swh.web.browse.snapshot_context.PER_PAGE", len(revisions) / 2)
snp_dict = {"branches": {}, "id": hash_to_bytes(random_sha1())}
for i in range(len(revisions)):
branch = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][branch.encode()] = {
"target_type": "revision",
"target": hash_to_bytes(revisions[i]),
}
release = "".join(random.choices(string.ascii_lowercase, k=8))
snp_dict["branches"][b"RELEASE_ALIAS"] = {
"target_type": "alias",
"target": release.encode(),
}
snp_dict["branches"][release.encode()] = {
"target_type": "release",
"target": hash_to_bytes(existing_release),
}
archive_data.origin_add([new_origin])
archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
visit = archive_data.origin_visit_add(
[OriginVisit(origin=new_origin.url, date=visit_dates[0], type="git",)]
)[0]
visit_status = OriginVisitStatus(
origin=new_origin.url,
visit=visit.visit,
date=now(),
status="full",
snapshot=snp_dict["id"],
)
archive_data.origin_visit_status_add([visit_status])
url = reverse("browse-origin-branches", query_params={"origin_url": new_origin.url})
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/branches.html"
)
assert_contains(resp, 'Newer')
if len(revision_log_sorted) > per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
for log in revision_log_sorted[:per_page]:
revision_url = reverse("browse-revision", url_args={"sha1_git": log["id"]})
assert_contains(resp, log["id"][:7])
assert_contains(resp, log["author"]["name"])
assert_contains(resp, format_utc_iso_date(log["date"]))
assert_contains(resp, escape(log["message"]))
assert_contains(resp, format_utc_iso_date(log["committer_date"]))
assert_contains(resp, revision_url)
if len(revision_log_sorted) <= per_page:
return
resp = check_html_get_response(
client, next_page_url, status_code=200, template_used="browse/revision-log.html"
)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 0, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 2 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert_contains(resp, ' Newer' % escape(prev_page_url)
)
if len(revision_log_sorted) > 2 * per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
if len(revision_log_sorted) <= 2 * per_page:
return
resp = check_html_get_response(
client, next_page_url, status_code=200, template_used="browse/revision-log.html"
)
prev_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": per_page, "per_page": per_page},
)
next_page_url = reverse(
"browse-revision-log",
url_args={"sha1_git": revision},
query_params={"offset": 3 * per_page, "per_page": per_page},
)
nb_log_entries = len(revision_log_sorted) - 2 * per_page
if nb_log_entries > per_page:
nb_log_entries = per_page
assert_contains(resp, ' Newer' % escape(prev_page_url)
)
if len(revision_log_sorted) > 3 * per_page:
assert_contains(
resp, 'Older' % escape(next_page_url),
)
@given(revision(), unknown_revision(), new_origin())
def test_revision_request_errors(client, revision, unknown_revision, new_origin):
url = reverse("browse-revision", url_args={"sha1_git": unknown_revision})
resp = check_html_get_response(
client, url, status_code=404, template_used="error.html"
)
assert_contains(
resp, "Revision with sha1_git %s not found" % unknown_revision, status_code=404
)
url = reverse(
"browse-revision",
url_args={"sha1_git": revision},
query_params={"origin_url": new_origin.url},
)
resp = check_html_get_response(
client, url, status_code=404, template_used="error.html"
)
assert_contains(
resp, "the origin mentioned in your request" " appears broken", status_code=404
)
@given(revision())
def test_revision_uppercase(client, revision):
url = reverse(
"browse-revision-uppercase-checksum", url_args={"sha1_git": revision.upper()}
)
resp = check_html_get_response(client, url, status_code=302)
redirect_url = reverse("browse-revision", url_args={"sha1_git": revision})
assert resp["location"] == redirect_url
def _revision_browse_checks(
client, archive_data, revision, origin_url=None, snapshot=None
):
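"""Common assertions for the browse-revision view: author and committer names,
formatted dates, commit message, parent revision links, the history link,
vault cooking buttons and the revision/directory SWHIDs (with origin or
snapshot context when one is provided).
"""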
query_params = {}
if origin_url:
query_params["origin_url"] = origin_url
if snapshot:
query_params["snapshot"] = snapshot["id"]
url = reverse(
"browse-revision", url_args={"sha1_git": revision}, query_params=query_params
)
revision_data = archive_data.revision_get(revision)
author_name = revision_data["author"]["name"]
committer_name = revision_data["committer"]["name"]
dir_id = revision_data["directory"]
if origin_url:
snapshot = archive_data.snapshot_get_latest(origin_url)
history_url = reverse(
"browse-origin-log", query_params={"revision": revision, **query_params},
)
elif snapshot:
history_url = reverse(
"browse-snapshot-log",
url_args={"snapshot_id": snapshot["id"]},
query_params={"revision": revision},
)
else:
history_url = reverse("browse-revision-log", url_args={"sha1_git": revision})
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/revision.html"
)
assert_contains(resp, author_name)
assert_contains(resp, committer_name)
assert_contains(resp, history_url)
for parent in revision_data["parents"]:
parent_url = reverse(
"browse-revision", url_args={"sha1_git": parent}, query_params=query_params
)
assert_contains(resp, '%s' % (escape(parent_url), parent[:7]))
author_date = revision_data["date"]
committer_date = revision_data["committer_date"]
message_lines = revision_data["message"].split("\n")
assert_contains(resp, format_utc_iso_date(author_date))
assert_contains(resp, format_utc_iso_date(committer_date))
assert_contains(resp, escape(message_lines[0]))
assert_contains(resp, escape("\n".join(message_lines[1:])))
assert_contains(resp, "vault-cook-directory")
assert_contains(resp, "vault-cook-revision")
swh_rev_id = gen_swhid("revision", revision)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swh_dir_id = gen_swhid("directory", dir_id)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
if origin_url:
assert_contains(resp, "swh-take-new-snapshot")
swh_rev_id = gen_swhid(REVISION, revision)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
if origin_url:
browse_origin_url = reverse(
"browse-origin", query_params={"origin_url": origin_url}
)
assert_contains(resp, f'href="{browse_origin_url}"')
elif snapshot:
swh_snp_id = gen_swhid("snapshot", snapshot["id"])
swh_snp_id_url = reverse("browse-swhid", url_args={"swhid": swh_snp_id})
assert_contains(resp, f'href="{swh_snp_id_url}"')
swhid_context = {}
if origin_url:
swhid_context["origin"] = origin_url
if snapshot:
swhid_context["visit"] = gen_swhid(SNAPSHOT, snapshot["id"])
swh_rev_id = gen_swhid(REVISION, revision, metadata=swhid_context)
swh_rev_id_url = reverse("browse-swhid", url_args={"swhid": swh_rev_id})
assert_contains(resp, swh_rev_id)
assert_contains(resp, swh_rev_id_url)
swhid_context["anchor"] = gen_swhid(REVISION, revision)
swh_dir_id = gen_swhid(DIRECTORY, dir_id, metadata=swhid_context)
swh_dir_id_url = reverse("browse-swhid", url_args={"swhid": swh_dir_id})
assert_contains(resp, swh_dir_id)
assert_contains(resp, swh_dir_id_url)
@given(revision())
def test_revision_invalid_path(client, archive_data, revision):
path = "foo/bar"
url = reverse(
"browse-revision", url_args={"sha1_git": revision}, query_params={"path": path}
)
resp = check_html_get_response(
client, url, status_code=404, template_used="browse/revision.html"
)
directory = archive_data.revision_get(revision)["directory"]
error_message = (
f"Directory entry with path {path} from root directory {directory} not found"
)
assert_contains(resp, error_message, status_code=404)
assert_not_contains(resp, "swh-metadata-popover", status_code=404)
@given(directory(), new_person(), new_swh_date())
def test_revision_metadata_display(archive_data, client, directory, person, date):
metadata = {"foo": "bar"}
revision = Revision(
directory=hash_to_bytes(directory),
author=person,
committer=person,
message=b"commit message",
date=TimestampWithTimezone.from_datetime(date),
committer_date=TimestampWithTimezone.from_datetime(date),
synthetic=False,
type=RevisionType.GIT,
metadata=metadata,
)
archive_data.revision_add([revision])
url = reverse("browse-revision", url_args={"sha1_git": hash_to_hex(revision.id)})
resp = check_html_get_response(
client, url, status_code=200, template_used="browse/revision.html"
)
assert_contains(resp, "swh-metadata-popover")
assert_contains(resp, escape(json.dumps(metadata, indent=4)))
diff --git a/swh/web/tests/common/test_origin_save.py b/swh/web/tests/common/test_origin_save.py
index 1bf03f80..d9faf684 100644
--- a/swh/web/tests/common/test_origin_save.py
+++ b/swh/web/tests/common/test_origin_save.py
@@ -1,759 +1,762 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
from functools import partial
import re
from typing import Optional
+import uuid
import iso8601
import pytest
import requests
from swh.core.pytest_plugin import get_response_cb
+from swh.scheduler.utils import create_oneshot_task_dict
from swh.web.common.exc import BadInputExc
from swh.web.common.models import (
SAVE_REQUEST_ACCEPTED,
SAVE_TASK_FAILED,
SAVE_TASK_RUNNING,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEEDED,
VISIT_STATUS_CREATED,
VISIT_STATUS_FULL,
VISIT_STATUS_ONGOING,
VISIT_STATUS_PARTIAL,
SaveOriginRequest,
)
from swh.web.common.origin_save import (
_check_origin_exists,
_check_visit_type_savable,
_visit_type_task,
_visit_type_task_privileged,
get_savable_visit_types,
get_save_origin_requests,
get_save_origin_task_info,
origin_exists,
refresh_save_origin_request_statuses,
)
from swh.web.common.typing import (
OriginExistenceCheckInfo,
OriginVisitInfo,
SaveOriginRequestInfo,
)
from swh.web.config import get_config
_es_url = "http://esnode1.internal.softwareheritage.org:9200"
_es_workers_index_url = "%s/swh_workers-*" % _es_url
_origin_url = "https://gitlab.com/inkscape/inkscape"
_visit_type = "git"
-_task_id = 203525448
+_task_id = 1
@pytest.fixture(autouse=True)
def requests_mock_datadir(datadir, requests_mock_datadir):
"""Override default behavior to deal with post method"""
cb = partial(get_response_cb, datadir=datadir)
requests_mock_datadir.post(re.compile("https?://"), body=cb)
return requests_mock_datadir
@pytest.mark.django_db
-def test_get_save_origin_archived_task_info(mocker):
- _get_save_origin_task_info_test(mocker, task_archived=True)
+def test_get_save_origin_archived_task_info(swh_scheduler):
+ _get_save_origin_task_info_test(swh_scheduler, task_archived=True)
@pytest.mark.django_db
-def test_get_save_origin_task_full_info_with_es(mocker):
- _get_save_origin_task_info_test(mocker, es_available=True)
+def test_get_save_origin_task_info_without_es(swh_scheduler):
+ _get_save_origin_task_info_test(swh_scheduler, es_available=False)
-@pytest.mark.django_db
-def test_get_save_origin_task_info_with_es(mocker):
- _get_save_origin_task_info_test(mocker, es_available=True, full_info=False)
-
-
-@pytest.mark.django_db
-def test_get_save_origin_task_info_without_es(mocker):
- _get_save_origin_task_info_test(mocker, es_available=False)
-
-
-def _mock_scheduler(
- mocker,
+def _fill_scheduler_db(
+ swh_scheduler,
task_status="completed",
task_run_status="eventful",
task_archived=False,
visit_started_date=None,
):
- mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler")
- task = {
- "arguments": {"args": [], "kwargs": {"repo_url": _origin_url},},
- "current_interval": timedelta(days=64),
- "id": _task_id,
- "next_run": datetime.now(tz=timezone.utc) + timedelta(days=64),
- "policy": "oneshot",
- "priority": "high",
- "retries_left": 0,
- "status": task_status,
- "type": "load-git",
- }
- mock_scheduler.get_tasks.return_value = [dict(task) if not task_archived else None]
-
- task_run = {
- "backend_id": "f00c712c-e820-41ce-a07c-9bf8df914205",
- "ended": datetime.now(tz=timezone.utc) + timedelta(minutes=5),
- "id": 654270631,
- "metadata": {},
- "scheduled": datetime.now(tz=timezone.utc),
- "started": visit_started_date,
- "status": task_run_status,
- "task": _task_id,
- }
- mock_scheduler.get_task_runs.return_value = [
- dict(task_run) if not task_archived else None
- ]
+ task = task_run = None
+ if not task_archived:
+ task = swh_scheduler.create_tasks(
+ [create_oneshot_task_dict("load-git", repo_url=_origin_url)]
+ )[0]
+ backend_id = str(uuid.uuid4())
+
+ if task_status != "next_run_not_scheduled":
+ swh_scheduler.schedule_task_run(task["id"], backend_id)
+
+ if task_run_status is not None:
+ swh_scheduler.start_task_run(backend_id)
+ task_run = dict(
+ swh_scheduler.end_task_run(backend_id, task_run_status).items()
+ )
return task, task_run
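# A usage sketch (assumption: the swh_scheduler fixture provides a scheduler
# backend with the relevant load task types registered):
#
#   task, task_run = _fill_scheduler_db(
#       swh_scheduler, task_status="completed", task_run_status="eventful"
#   )
#
# _task_id is set to 1 above, presumably so that it matches the id of the first
# task created in the freshly initialized scheduler backend.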
@pytest.mark.parametrize(
"wrong_type,privileged_user",
[
("dummy", True),
("dumb", False),
("archives", False), # when no privilege, this is rejected
],
)
-def test__check_visit_type_savable(wrong_type, privileged_user):
+def test_check_visit_type_savable(wrong_type, privileged_user, swh_scheduler):
+
+ swh_scheduler.add_load_archive_task_type()
with pytest.raises(BadInputExc, match="Allowed types"):
_check_visit_type_savable(wrong_type, privileged_user)
# when privileged_user, the following is accepted though
_check_visit_type_savable("archives", True)
-def test_get_savable_visit_types():
+def test_get_savable_visit_types(swh_scheduler):
+
+ swh_scheduler.add_load_archive_task_type()
+
default_list = list(_visit_type_task.keys())
assert set(get_savable_visit_types()) == set(default_list)
privileged_list = default_list.copy()
privileged_list += list(_visit_type_task_privileged.keys())
assert set(get_savable_visit_types(privileged_user=True)) == set(privileged_list)
def _get_save_origin_task_info_test(
- mocker, task_archived=False, es_available=True, full_info=True
+ swh_scheduler, task_archived=False, es_available=True, full_info=True
):
swh_web_config = get_config()
if es_available:
swh_web_config.update({"es_workers_index_url": _es_workers_index_url})
else:
swh_web_config.update({"es_workers_index_url": ""})
sor = SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=_visit_type,
origin_url="https://gitlab.com/inkscape/inkscape",
status=SAVE_REQUEST_ACCEPTED,
visit_date=datetime.now(tz=timezone.utc) + timedelta(hours=1),
loading_task_id=_task_id,
)
- task, task_run = _mock_scheduler(mocker, task_archived=task_archived)
+ task, task_run = _fill_scheduler_db(swh_scheduler, task_archived=task_archived)
es_response = requests.post("%s/_search" % _es_workers_index_url).json()
task_exec_data = es_response["hits"]["hits"][-1]["_source"]
sor_task_info = get_save_origin_task_info(sor.id, full_info=full_info)
expected_result = (
{
"type": task["type"],
"arguments": task["arguments"],
"id": task["id"],
"backend_id": task_run["backend_id"],
"scheduled": task_run["scheduled"],
"started": task_run["started"],
"ended": task_run["ended"],
"status": task_run["status"],
"visit_status": sor.visit_status,
}
if not task_archived
else {}
)
if es_available and not task_archived:
expected_result.update(
{
"message": task_exec_data["message"],
"name": task_exec_data["swh_task_name"],
"worker": task_exec_data["hostname"],
}
)
if not full_info:
expected_result.pop("id", None)
expected_result.pop("backend_id", None)
expected_result.pop("worker", None)
if "message" in expected_result:
message = ""
message_lines = expected_result["message"].split("\n")
for line in message_lines:
if line.startswith("Traceback"):
break
message += f"{line}\n"
message += message_lines[-1]
expected_result["message"] = message
assert sor_task_info == expected_result
@pytest.mark.django_db
-def test_get_save_origin_requests_find_visit_date(mocker):
+def test_get_save_origin_requests_find_visit_date(mocker, swh_scheduler):
# create a save request
SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=_visit_type,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
visit_date=None,
loading_task_id=_task_id,
)
# mock scheduler and archive
- _mock_scheduler(mocker)
+ _fill_scheduler_db(swh_scheduler)
mock_archive = mocker.patch("swh.web.common.origin_save.archive")
mock_archive.lookup_origin.return_value = {"url": _origin_url}
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_save.get_origin_visits"
)
# create a visit for the save request
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info = OriginVisitInfo(
date=visit_date,
formatted_date="",
metadata={},
origin=_origin_url,
snapshot="",
status=VISIT_STATUS_FULL,
type=_visit_type,
url="",
visit=34,
)
mock_get_origin_visits.return_value = [visit_info]
# check visit date has been correctly found
sors = get_save_origin_requests(_visit_type, _origin_url)
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sors[0]["visit_date"] == visit_date
mock_get_origin_visits.assert_called_once()
# check visit is not searched again when it has been found
get_save_origin_requests(_visit_type, _origin_url)
mock_get_origin_visits.assert_called_once()
# check visit dates are not searched for save requests older than
# one month
sor = SaveOriginRequest.objects.create(
visit_type=_visit_type,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
loading_task_id=_task_id,
visit_date=None,
)
sor.request_date = datetime.now(tz=timezone.utc) - timedelta(days=31)
sor.save()
- _mock_scheduler(mocker, task_status="disabled", task_run_status="failed")
+ _fill_scheduler_db(swh_scheduler, task_status="disabled", task_run_status="failed")
sors = get_save_origin_requests(_visit_type, _origin_url)
assert len(sors) == 2
assert sors[0]["save_task_status"] == SAVE_TASK_FAILED
assert sors[0]["visit_date"] is None
mock_get_origin_visits.assert_called_once()
def _get_save_origin_requests(
- mocker, load_status, visit_status, request_date: Optional[datetime] = None
+ mocker,
+ swh_scheduler,
+ load_status,
+ visit_status,
+ request_date: Optional[datetime] = None,
):
"""Wrapper around the get_origin_save_origin_request call.
"""
SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=_visit_type,
visit_status=visit_status,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
visit_date=None,
loading_task_id=_task_id,
)
# mock scheduler and archives
- _mock_scheduler(
- mocker, task_status="next_run_scheduled", task_run_status=load_status
+ _fill_scheduler_db(
+ swh_scheduler, task_status="next_run_scheduled", task_run_status=load_status
)
mock_archive = mocker.patch("swh.web.common.origin_save.archive")
mock_archive.lookup_origin.return_value = {"url": _origin_url}
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_save.get_origin_visits"
)
# create a visit for the save request with status created
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info = OriginVisitInfo(
date=visit_date,
formatted_date="",
metadata={},
origin=_origin_url,
snapshot="", # make mypy happy
status=visit_status,
type=_visit_type,
url="",
visit=34,
)
mock_get_origin_visits.return_value = [visit_info]
sors = get_save_origin_requests(_visit_type, _origin_url)
mock_get_origin_visits.assert_called_once()
return sors
@pytest.mark.parametrize("visit_date", [None, "some-date"])
def test_from_save_origin_request_to_save_request_info_dict(visit_date):
"""Ensure save request to json serializable dict is fine
"""
request_date = datetime.now(tz=timezone.utc)
_visit_date = request_date + timedelta(minutes=5) if visit_date else None
request_date = datetime.now(tz=timezone.utc)
sor = SaveOriginRequest(
request_date=request_date,
visit_type=_visit_type,
visit_status=VISIT_STATUS_FULL,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
loading_task_status=None,
visit_date=_visit_date,
loading_task_id=1,
)
assert sor.to_dict() == SaveOriginRequestInfo(
id=sor.id,
origin_url=sor.origin_url,
visit_type=sor.visit_type,
save_request_date=sor.request_date.isoformat(),
save_request_status=sor.status,
save_task_status=sor.loading_task_status,
visit_status=sor.visit_status,
visit_date=_visit_date.isoformat() if _visit_date else None,
loading_task_id=sor.loading_task_id,
)
def test__check_origin_exists_404(requests_mock):
url_ko = "https://example.org/some-inexistant-url"
requests_mock.head(url_ko, status_code=404)
with pytest.raises(BadInputExc, match="not exist"):
_check_origin_exists(url_ko)
def test__check_origin_exists_200(requests_mock):
url = "https://example.org/url"
requests_mock.head(url, status_code=200)
# passes the check
actual_metadata = _check_origin_exists(url)
# and we actually may have retrieved some metadata on the origin
assert actual_metadata == origin_exists(url)
def test_origin_exists_404(requests_mock):
"""Origin which does not exist should be reported as inexistent"""
url_ko = "https://example.org/some-inexistant-url"
requests_mock.head(url_ko, status_code=404)
actual_result = origin_exists(url_ko)
assert actual_result == OriginExistenceCheckInfo(
origin_url=url_ko, exists=False, last_modified=None, content_length=None,
)
def test_origin_exists_200_no_data(requests_mock):
"""Existing origin should be reported as such (no extra information)"""
url = "http://example.org/real-url"
requests_mock.head(
url, status_code=200,
)
actual_result = origin_exists(url)
assert actual_result == OriginExistenceCheckInfo(
origin_url=url, exists=True, last_modified=None, content_length=None,
)
def test_origin_exists_200_with_data(requests_mock):
"""Existing origin should be reported as such (+ extra information)"""
url = "http://example.org/real-url"
requests_mock.head(
url,
status_code=200,
headers={
"content-length": "10",
"last-modified": "Sun, 21 Aug 2011 16:26:32 GMT",
},
)
actual_result = origin_exists(url)
assert actual_result == OriginExistenceCheckInfo(
origin_url=url,
exists=True,
content_length=10,
last_modified="2011-08-21T16:26:32",
)
def test_origin_exists_internet_archive(requests_mock):
"""Edge case where an artifact URL to check existence is hosted on the
Internet Archive"""
url = (
"https://web.archive.org/web/20100705043309/"
"http://www.cs.unm.edu/~mccune/old-ftp/eqp-09e.tar.gz"
)
redirect_url = (
"https://web.archive.org/web/20100610004108/"
"http://www.cs.unm.edu/~mccune/old-ftp/eqp-09e.tar.gz"
)
requests_mock.head(
url, status_code=302, headers={"Location": redirect_url,},
)
requests_mock.head(
redirect_url,
status_code=200,
headers={
"X-Archive-Orig-Last-Modified": "Tue, 12 May 2009 22:09:43 GMT",
"X-Archive-Orig-Content-Length": "121421",
},
)
actual_result = origin_exists(url)
assert actual_result == OriginExistenceCheckInfo(
origin_url=url,
exists=True,
content_length=121421,
last_modified="2009-05-12T22:09:43",
)
def test_origin_exists_200_with_data_unexpected_date_format(requests_mock):
"""Existing origin should be ok, unexpected last modif time result in no time"""
url = "http://example.org/real-url2"
# this is parsable but not as expected
unexpected_format_date = "Sun, 21 Aug 2021 16:26:32"
requests_mock.head(
url, status_code=200, headers={"last-modified": unexpected_format_date,},
)
actual_result = origin_exists(url)
# so the resulting date is None
assert actual_result == OriginExistenceCheckInfo(
origin_url=url, exists=True, content_length=None, last_modified=None,
)
@pytest.mark.django_db
@pytest.mark.parametrize("visit_status", [VISIT_STATUS_CREATED, VISIT_STATUS_ONGOING,])
-def test_get_save_origin_requests_no_visit_date_found(mocker, visit_status):
+def test_get_save_origin_requests_no_visit_date_found(
+ mocker, swh_scheduler, visit_status
+):
"""Uneventful visits with failed visit status are marked as failed
"""
sors = _get_save_origin_requests(
- mocker, load_status="scheduled", visit_status=visit_status,
+ mocker, swh_scheduler, load_status="scheduled", visit_status=visit_status,
)
# check no visit date has been found
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_RUNNING
assert sors[0]["visit_date"] is not None
assert sors[0]["visit_status"] == visit_status
@pytest.mark.django_db
@pytest.mark.parametrize("visit_status", ["not_found", "failed",])
-def test_get_save_origin_requests_no_failed_status_override(mocker, visit_status):
+def test_get_save_origin_requests_no_failed_status_override(
+ mocker, swh_scheduler, visit_status
+):
"""Uneventful visits with failed statuses (failed, not found) are marked as failed
"""
sors = _get_save_origin_requests(
- mocker, load_status="uneventful", visit_status=visit_status
+ mocker, swh_scheduler, load_status="uneventful", visit_status=visit_status
)
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_FAILED
visit_date = sors[0]["visit_date"]
assert visit_date is not None
sors = get_save_origin_requests(_visit_type, _origin_url)
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_FAILED
assert sors[0]["visit_status"] == visit_status
@pytest.mark.django_db
@pytest.mark.parametrize(
"load_status,visit_status",
[
("eventful", VISIT_STATUS_FULL),
("eventful", VISIT_STATUS_PARTIAL),
("uneventful", VISIT_STATUS_PARTIAL),
],
)
-def test_get_visit_info_for_save_request_succeeded(mocker, load_status, visit_status):
+def test_get_visit_info_for_save_request_succeeded(
+ mocker, swh_scheduler, load_status, visit_status
+):
"""Nominal scenario, below 30 days, returns something"""
sors = _get_save_origin_requests(
- mocker, load_status=load_status, visit_status=visit_status
+ mocker, swh_scheduler, load_status=load_status, visit_status=visit_status
)
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sors[0]["visit_date"] is not None
assert sors[0]["visit_status"] == visit_status
sors = get_save_origin_requests(_visit_type, _origin_url)
assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sors[0]["visit_status"] == visit_status
@pytest.mark.django_db
@pytest.mark.parametrize("load_status", ["eventful", "uneventful",])
-def test_get_visit_info_incomplete_visit_still_successful(mocker, load_status):
+def test_get_visit_info_incomplete_visit_still_successful(
+ mocker, swh_scheduler, load_status
+):
"""Incomplete visit information, yet the task is updated partially
"""
sors = _get_save_origin_requests(
- mocker, load_status=load_status, visit_status=None,
+ mocker, swh_scheduler, load_status=load_status, visit_status=None,
)
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
# As the entry is missing the following information though
assert sors[0]["visit_date"] is not None
assert sors[0]["visit_status"] is None
# It's still detected as to be updated by the refresh routine
sors = refresh_save_origin_request_statuses()
assert len(sors) == 1
assert sors[0]["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sors[0]["visit_date"] is not None
assert sors[0]["visit_status"] is None
@pytest.mark.django_db
-def test_refresh_in_progress_save_request_statuses(mocker, api_client, archive_data):
+def test_refresh_in_progress_save_request_statuses(
+ mocker, swh_scheduler, api_client, archive_data
+):
"""Refresh a pending save origins requests and update if the status changes
"""
date_now = datetime.now(tz=timezone.utc)
date_pivot = date_now - timedelta(days=30)
visit_started_date = date_now - timedelta(minutes=1)
# returned visit status
SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=_visit_type,
visit_status=VISIT_STATUS_CREATED,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
visit_date=None,
loading_task_id=_task_id,
)
# mock scheduler and archives
- _mock_scheduler(
- mocker, task_status="next_run_scheduled", task_run_status=SAVE_TASK_SCHEDULED
+ _fill_scheduler_db(
+ swh_scheduler,
+ task_status="next_run_scheduled",
+ task_run_status=SAVE_TASK_SCHEDULED,
)
mock_archive = mocker.patch("swh.web.common.origin_save.archive")
mock_archive.lookup_origin.return_value = {"url": _origin_url}
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_save.get_origin_visits"
)
# create a visit for the save request with status created
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info = OriginVisitInfo(
date=visit_date,
formatted_date="",
metadata={},
origin=_origin_url,
snapshot="", # make mypy happy
status=VISIT_STATUS_CREATED,
type=_visit_type,
url="",
visit=34,
)
mock_get_origin_visits.return_value = [visit_info]
# make the scheduler return a running event
- _mock_scheduler(
- mocker,
+ _fill_scheduler_db(
+ swh_scheduler,
task_status="next_run_scheduled",
task_run_status="started",
visit_started_date=visit_started_date,
)
# The visit is detected but still running
sors = refresh_save_origin_request_statuses()
assert mock_get_origin_visits.called and mock_get_origin_visits.call_count == 1
assert len(sors) == 1
for sor in sors:
assert iso8601.parse_date(sor["save_request_date"]) >= date_pivot
# The status is updated
assert sor["save_task_status"] == SAVE_TASK_RUNNING
# but the following entries are missing so it's not updated
assert sor["visit_date"] is not None
assert sor["visit_status"] == VISIT_STATUS_CREATED
# make the visit status completed
# make the scheduler return a running event
- _mock_scheduler(
- mocker,
+ _fill_scheduler_db(
+ swh_scheduler,
task_status="completed",
task_run_status="eventful",
visit_started_date=visit_started_date,
)
# This time around, the origin returned will have all required information updated
# (visit date and visit status in final state)
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info.update({"date": visit_date, "status": VISIT_STATUS_FULL})
mock_get_origin_visits.return_value = [visit_info]
# Detected entry, this time it should be updated
sors = refresh_save_origin_request_statuses()
assert len(sors) == 1
assert mock_get_origin_visits.called and mock_get_origin_visits.call_count == 1 + 1
for sor in sors:
assert iso8601.parse_date(sor["save_request_date"]) >= date_pivot
# as it turns out, in this test, this won't update anything as no new status got
# returned by the scheduler
assert sor["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sor["visit_date"] == visit_date
assert sor["visit_status"] == VISIT_STATUS_FULL
# Once in final state, a sor should not be updated anymore
sors = refresh_save_origin_request_statuses()
assert len(sors) == 0
@pytest.mark.django_db
-def test_refresh_save_request_statuses(mocker, api_client, archive_data):
+def test_refresh_save_request_statuses(mocker, swh_scheduler, api_client, archive_data):
"""Refresh filters save origins requests and update if changes
"""
date_now = datetime.now(tz=timezone.utc)
date_pivot = date_now - timedelta(days=30)
# returned visit status
SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=_visit_type,
visit_status=None,
origin_url=_origin_url,
status=SAVE_REQUEST_ACCEPTED,
visit_date=None,
loading_task_id=_task_id,
)
# mock scheduler and archives
- _mock_scheduler(
- mocker, task_status="next_run_scheduled", task_run_status=SAVE_TASK_SCHEDULED
+ _fill_scheduler_db(
+ swh_scheduler,
+ task_status="next_run_scheduled",
+ task_run_status=SAVE_TASK_SCHEDULED,
)
mock_archive = mocker.patch("swh.web.common.origin_save.archive")
mock_archive.lookup_origin.return_value = {"url": _origin_url}
mock_get_origin_visits = mocker.patch(
"swh.web.common.origin_save.get_origin_visits"
)
# create a visit for the save request with status created
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info = OriginVisitInfo(
date=visit_date,
formatted_date="",
metadata={},
origin=_origin_url,
snapshot="", # make mypy happy
status=VISIT_STATUS_CREATED,
type=_visit_type,
url="",
visit=34,
)
mock_get_origin_visits.return_value = [visit_info]
# no changes so refresh does detect the entry but does nothing
sors = refresh_save_origin_request_statuses()
assert len(sors) == 1
for sor in sors:
assert iso8601.parse_date(sor["save_request_date"]) >= date_pivot
# as it turns out, in this test, this won't update anything as no new status got
# returned by the scheduler
assert sor["save_task_status"] == SAVE_TASK_RUNNING
# Information is empty
assert sor["visit_date"] == visit_date
assert sor["visit_status"] == VISIT_STATUS_CREATED
# A save code now entry is detected for update, but as nothing changes, the entry
# remains in the same state
sors = refresh_save_origin_request_statuses()
assert len(sors) == 1
for sor in sors:
assert iso8601.parse_date(sor["save_request_date"]) >= date_pivot
# Status is not updated as no new information is available on the visit status
# and the task status has not moved
assert sor["save_task_status"] == SAVE_TASK_RUNNING
# Information is empty
assert sor["visit_date"] == visit_date
assert sor["visit_status"] == VISIT_STATUS_CREATED
# This time around, the origin returned will have all information updated
# create a visit for the save request with status created
visit_date = datetime.now(tz=timezone.utc).isoformat()
visit_info = OriginVisitInfo(
date=visit_date,
formatted_date="",
metadata={},
origin=_origin_url,
snapshot="", # make mypy happy
status=VISIT_STATUS_FULL,
type=_visit_type,
url="",
visit=34,
)
mock_get_origin_visits.return_value = [visit_info]
# Detected entry, this time it should be updated
sors = refresh_save_origin_request_statuses()
assert len(sors) == 1
for sor in sors:
assert iso8601.parse_date(sor["save_request_date"]) >= date_pivot
# as it turns out, in this test, this won't update anything as no new status got
# returned by the scheduler
assert sor["save_task_status"] == SAVE_TASK_SUCCEEDED
assert sor["visit_date"] == visit_date
assert sor["visit_status"] == VISIT_STATUS_FULL
# This time, nothing left to update
sors = refresh_save_origin_request_statuses()
assert len(sors) == 0
diff --git a/swh/web/tests/conftest.py b/swh/web/tests/conftest.py
index b443dd8a..fead2e30 100644
--- a/swh/web/tests/conftest.py
+++ b/swh/web/tests/conftest.py
@@ -1,431 +1,482 @@
# Copyright (C) 2018-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from datetime import timedelta
import json
import os
import shutil
from subprocess import PIPE, run
import sys
from typing import Any, Dict, List, Optional
from _pytest.python import Function
from hypothesis import HealthCheck, settings
import pytest
from django.core.cache import cache
from rest_framework.test import APIClient, APIRequestFactory
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
+from swh.scheduler.tests.common import TASK_TYPES
from swh.storage.algos.origin import origin_get_latest_visit_status
from swh.storage.algos.snapshot import snapshot_get_all_branches, snapshot_get_latest
from swh.web.auth.utils import OIDC_SWH_WEB_CLIENT_ID
from swh.web.common import converters
+from swh.web.common.origin_save import get_scheduler_load_task_types
from swh.web.common.typing import OriginVisitInfo
from swh.web.config import get_config
from swh.web.tests.data import get_tests_data, override_storages
# Used to skip some tests
ctags_json_missing = (
shutil.which("ctags") is None
or b"+json" not in run(["ctags", "--version"], stdout=PIPE).stdout
)
fossology_missing = shutil.which("nomossa") is None
# Register some hypothesis profiles
settings.register_profile("default", settings())
# we use getattr here to keep mypy happy regardless hypothesis version
function_scoped_fixture_check = (
[getattr(HealthCheck, "function_scoped_fixture")]
if hasattr(HealthCheck, "function_scoped_fixture")
else []
)
suppress_health_check = [
HealthCheck.too_slow,
HealthCheck.filter_too_much,
] + function_scoped_fixture_check
settings.register_profile(
"swh-web", settings(deadline=None, suppress_health_check=suppress_health_check,),
)
settings.register_profile(
"swh-web-fast",
settings(
deadline=None, max_examples=5, suppress_health_check=suppress_health_check,
),
)
def pytest_configure(config):
# Use fast hypothesis profile by default if none has been
# explicitly specified in pytest option
if config.getoption("--hypothesis-profile") is None:
settings.load_profile("swh-web-fast")
# Small hack in order to be able to run the unit tests
# without static assets generated by webpack.
# Those assets are not really needed for the Python tests
# but the django templates will fail to load due to missing
# generated file webpack-stats.json describing the js and css
# files to include.
# So generate a dummy webpack-stats.json file to overcome
# that issue.
test_dir = os.path.dirname(__file__)
# location of the static folder when running tests through tox
data_dir = os.path.join(sys.prefix, "share/swh/web")
static_dir = os.path.join(data_dir, "static")
if not os.path.exists(static_dir):
# location of the static folder when running tests locally with pytest
static_dir = os.path.join(test_dir, "../../../static")
webpack_stats = os.path.join(static_dir, "webpack-stats.json")
if os.path.exists(webpack_stats):
return
bundles_dir = os.path.join(test_dir, "../../../assets/src/bundles")
if not os.path.exists(bundles_dir):
# location of the bundles folder when running tests with tox
bundles_dir = os.path.join(data_dir, "assets/src/bundles")
_, bundles, _ = next(os.walk(bundles_dir))
mock_webpack_stats = {
"status": "done",
"publicPath": "/static",
"chunks": {},
"assets": {},
}
for bundle in bundles:
asset = f"js/{bundle}.js"
mock_webpack_stats["chunks"][bundle] = [asset]
mock_webpack_stats["assets"][asset] = {
"name": asset,
"publicPath": f"/static/{asset}",
}
with open(webpack_stats, "w") as outfile:
json.dump(mock_webpack_stats, outfile)
# Clear Django cache before each test
@pytest.fixture(autouse=True)
def django_cache_cleared():
cache.clear()
# Alias rf fixture from pytest-django
@pytest.fixture
def request_factory(rf):
return rf
# Fixture to get test client from Django REST Framework
@pytest.fixture
def api_client():
return APIClient()
# Fixture to get API request factory from Django REST Framework
@pytest.fixture
def api_request_factory():
return APIRequestFactory()
# Initialize tests data
@pytest.fixture(scope="function", autouse=True)
def tests_data():
data = get_tests_data(reset=True)
# Update swh-web configuration to use the in-memory storages
# instantiated in the tests.data module
override_storages(
data["storage"], data["idx_storage"], data["search"], data["counters"]
)
return data
# Fixture to manipulate data from a sample archive used in the tests
@pytest.fixture(scope="function")
def archive_data(tests_data):
return _ArchiveData(tests_data)
# Fixture to manipulate indexer data from a sample archive used in the tests
@pytest.fixture(scope="function")
def indexer_data(tests_data):
return _IndexerData(tests_data)
# Custom data directory for requests_mock
@pytest.fixture
def datadir():
return os.path.join(os.path.abspath(os.path.dirname(__file__)), "resources")
class _ArchiveData:
"""
Helper class to manage data from a sample test archive.
It is initialized with a reference to an in-memory storage
containing raw tests data.
It is basically a proxy to Storage interface but it overrides some methods
to retrieve those tests data in a json serializable format in order to ease
tests implementation.
"""
def __init__(self, tests_data):
self.storage = tests_data["storage"]
def __getattr__(self, key):
if key == "storage":
raise AttributeError(key)
# Forward calls to non overridden Storage methods to wrapped
# storage instance
return getattr(self.storage, key)
def content_find(self, content: Dict[str, Any]) -> Dict[str, Any]:
cnt_ids_bytes = {
algo_hash: hash_to_bytes(content[algo_hash])
for algo_hash in ALGORITHMS
if content.get(algo_hash)
}
cnt = self.storage.content_find(cnt_ids_bytes)
return converters.from_content(cnt[0].to_dict()) if cnt else cnt
def content_get(self, cnt_id: str) -> Dict[str, Any]:
cnt_id_bytes = hash_to_bytes(cnt_id)
content = self.storage.content_get([cnt_id_bytes])[0]
if content:
content_d = content.to_dict()
content_d.pop("ctime", None)
else:
content_d = None
return converters.from_swh(
content_d, hashess={"sha1", "sha1_git", "sha256", "blake2s256"}
)
def content_get_data(self, cnt_id: str) -> Optional[Dict[str, Any]]:
cnt_id_bytes = hash_to_bytes(cnt_id)
cnt_data = self.storage.content_get_data(cnt_id_bytes)
if cnt_data is None:
return None
return converters.from_content({"data": cnt_data, "sha1": cnt_id_bytes})
def directory_get(self, dir_id):
return {"id": dir_id, "content": self.directory_ls(dir_id)}
def directory_ls(self, dir_id):
cnt_id_bytes = hash_to_bytes(dir_id)
dir_content = map(
converters.from_directory_entry, self.storage.directory_ls(cnt_id_bytes)
)
return list(dir_content)
def release_get(self, rel_id: str) -> Optional[Dict[str, Any]]:
rel_id_bytes = hash_to_bytes(rel_id)
rel_data = self.storage.release_get([rel_id_bytes])[0]
return converters.from_release(rel_data) if rel_data else None
def revision_get(self, rev_id: str) -> Optional[Dict[str, Any]]:
rev_id_bytes = hash_to_bytes(rev_id)
rev_data = self.storage.revision_get([rev_id_bytes])[0]
return converters.from_revision(rev_data) if rev_data else None
def revision_log(self, rev_id, limit=None):
rev_id_bytes = hash_to_bytes(rev_id)
return list(
map(
converters.from_revision,
self.storage.revision_log([rev_id_bytes], limit=limit),
)
)
def snapshot_get_latest(self, origin_url):
snp = snapshot_get_latest(self.storage, origin_url)
return converters.from_snapshot(snp.to_dict())
def origin_get(self, origin_urls):
origins = self.storage.origin_get(origin_urls)
return [converters.from_origin(o.to_dict()) for o in origins]
def origin_visit_get(self, origin_url):
next_page_token = None
visits = []
while True:
visit_page = self.storage.origin_visit_get(
origin_url, page_token=next_page_token
)
next_page_token = visit_page.next_page_token
for visit in visit_page.results:
visit_status = self.storage.origin_visit_status_get_latest(
origin_url, visit.visit
)
visits.append(
converters.from_origin_visit(
{**visit_status.to_dict(), "type": visit.type}
)
)
if not next_page_token:
break
return visits
def origin_visit_get_by(self, origin_url: str, visit_id: int) -> OriginVisitInfo:
visit = self.storage.origin_visit_get_by(origin_url, visit_id)
assert visit is not None
visit_status = self.storage.origin_visit_status_get_latest(origin_url, visit_id)
assert visit_status is not None
return converters.from_origin_visit(
{**visit_status.to_dict(), "type": visit.type}
)
def origin_visit_status_get_latest(
self,
origin_url,
type: Optional[str] = None,
allowed_statuses: Optional[List[str]] = None,
require_snapshot: bool = False,
):
visit_status = origin_get_latest_visit_status(
self.storage,
origin_url,
type=type,
allowed_statuses=allowed_statuses,
require_snapshot=require_snapshot,
)
return (
converters.from_origin_visit(visit_status.to_dict())
if visit_status
else None
)
def snapshot_get(self, snapshot_id):
snp = snapshot_get_all_branches(self.storage, hash_to_bytes(snapshot_id))
return converters.from_snapshot(snp.to_dict())
def snapshot_get_branches(
self, snapshot_id, branches_from="", branches_count=1000, target_types=None
):
partial_branches = self.storage.snapshot_get_branches(
hash_to_bytes(snapshot_id),
branches_from.encode(),
branches_count,
target_types,
)
return converters.from_partial_branches(partial_branches)
def snapshot_get_head(self, snapshot):
if snapshot["branches"]["HEAD"]["target_type"] == "alias":
target = snapshot["branches"]["HEAD"]["target"]
head = snapshot["branches"][target]["target"]
else:
head = snapshot["branches"]["HEAD"]["target"]
return head
def snapshot_count_branches(self, snapshot_id):
counts = dict.fromkeys(("alias", "release", "revision"), 0)
counts.update(self.storage.snapshot_count_branches(hash_to_bytes(snapshot_id)))
counts.pop(None, None)
return counts
class _IndexerData:
"""
Helper class to manage indexer test data.
It is initialized with a reference to an in-memory indexer storage
containing raw test data.
It also defines methods to retrieve that test data in a
JSON-serializable format in order to ease test implementation
(see the usage sketch after this class).
"""
def __init__(self, tests_data):
self.idx_storage = tests_data["idx_storage"]
self.mimetype_indexer = tests_data["mimetype_indexer"]
self.license_indexer = tests_data["license_indexer"]
self.ctags_indexer = tests_data["ctags_indexer"]
def content_add_mimetype(self, cnt_id):
self.mimetype_indexer.run([hash_to_bytes(cnt_id)])
def content_get_mimetype(self, cnt_id):
mimetype = self.idx_storage.content_mimetype_get([hash_to_bytes(cnt_id)])[
0
].to_dict()
return converters.from_filetype(mimetype)
def content_add_license(self, cnt_id):
self.license_indexer.run([hash_to_bytes(cnt_id)])
def content_get_license(self, cnt_id):
cnt_id_bytes = hash_to_bytes(cnt_id)
licenses = self.idx_storage.content_fossology_license_get([cnt_id_bytes])
for license in licenses:
yield converters.from_swh(license.to_dict(), hashess={"id"})
def content_add_ctags(self, cnt_id):
self.ctags_indexer.run([hash_to_bytes(cnt_id)])
def content_get_ctags(self, cnt_id):
cnt_id_bytes = hash_to_bytes(cnt_id)
ctags = self.idx_storage.content_ctags_get([cnt_id_bytes])
for ctag in ctags:
yield converters.from_swh(ctag, hashess={"id"})
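# Illustrative usage sketch for _IndexerData; the fixture name, the content
# hash variable and the asserted key are assumptions made for illustration:
#
#   indexer_data.content_add_mimetype(cnt_sha1_hex)
#   mimetype = indexer_data.content_get_mimetype(cnt_sha1_hex)
#   assert "mimetype" in mimetype   # converted to a JSON-serializable dict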
@pytest.fixture
def keycloak_oidc(keycloak_oidc, mocker):
keycloak_config = get_config()["keycloak"]
keycloak_oidc.server_url = keycloak_config["server_url"]
keycloak_oidc.realm_name = keycloak_config["realm_name"]
keycloak_oidc.client_id = OIDC_SWH_WEB_CLIENT_ID
keycloak_oidc_client = mocker.patch("swh.web.auth.views.keycloak_oidc_client")
keycloak_oidc_client.return_value = keycloak_oidc
return keycloak_oidc
@pytest.fixture
def subtest(request):
"""A hack to explicitly set up and tear down fixtures.
This fixture allows you to set up and tear down fixtures within the test
function itself. This is useful (necessary!) for using Hypothesis inside
pytest, as Hypothesis calls the test function multiple times without
setting up or tearing down fixture state as is normally the case
(see the usage sketch after this fixture).
Copied from the pytest-subtesthack project, public domain license
(https://github.com/untitaker/pytest-subtesthack).
"""
parent_test = request.node
def inner(func):
if hasattr(Function, "from_parent"):
item = Function.from_parent(
parent_test,
name=request.function.__name__ + "[]",
originalname=request.function.__name__,
callobj=func,
)
else:
item = Function(
name=request.function.__name__ + "[]", parent=parent_test, callobj=func
)
nextitem = parent_test # prevents pytest from tearing down module fixtures
item.ihook.pytest_runtest_setup(item=item)
item.ihook.pytest_runtest_call(item=item)
item.ihook.pytest_runtest_teardown(item=item, nextitem=nextitem)
return inner
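# Illustrative usage sketch for the subtest fixture, assuming Hypothesis is
# available; the test name, strategy and inner check are hypothetical:
#
#   from hypothesis import given, strategies as st
#
#   @given(st.integers(min_value=0))
#   def test_example(subtest, value):
#       def check():
#           assert value >= 0
#       # runs check() as its own pytest item, with fresh fixture
#       # setup and teardown for every Hypothesis-generated example
#       subtest(check)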
+
+
+@pytest.fixture
+def swh_scheduler(swh_scheduler):
+ config = get_config()
+ scheduler = config["scheduler"]
+ config["scheduler"] = swh_scheduler
+ # create load-git and load-hg task types
+ for task_type in TASK_TYPES.values():
+ swh_scheduler.create_task_type(task_type)
+ # create load-svn task type
+ swh_scheduler.create_task_type(
+ {
+ "type": "load-svn",
+ "description": "Update a mercurial repository",
+ "backend_name": "swh.loader.svn.tasks.DumpMountAndLoadSvnRepository",
+ "default_interval": timedelta(days=64),
+ "min_interval": timedelta(hours=12),
+ "max_interval": timedelta(days=64),
+ "backoff_factor": 2,
+ "max_queue_length": None,
+ "num_retries": 7,
+ "retry_delay": timedelta(hours=2),
+ }
+ )
+
+ # add a method allowing tests to register the load-archive-files task type
+ # on demand (see the usage sketch after this fixture)
+ def add_load_archive_task_type():
+ swh_scheduler.create_task_type(
+ {
+ "type": "load-archive-files",
+ "description": "Load tarballs",
+ "backend_name": "swh.loader.package.archive.tasks.LoadArchive",
+ "default_interval": timedelta(days=64),
+ "min_interval": timedelta(hours=12),
+ "max_interval": timedelta(days=64),
+ "backoff_factor": 2,
+ "max_queue_length": None,
+ "num_retries": 7,
+ "retry_delay": timedelta(hours=2),
+ }
+ )
+
+ swh_scheduler.add_load_archive_task_type = add_load_archive_task_type
+
+ yield swh_scheduler
+ config["scheduler"] = scheduler
+ get_scheduler_load_task_types.cache_clear()
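+ # Illustrative usage sketch: a test exercising the "archives" visit type
+ # would first register the extra task type through the hook above; the
+ # test name and body are hypothetical:
+ #
+ #   def test_save_archives_origin(client, swh_scheduler):
+ #       swh_scheduler.add_load_archive_task_type()
+ #       ...  # then submit a save request with visit type "archives"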
diff --git a/swh/web/tests/misc/test_metrics.py b/swh/web/tests/misc/test_metrics.py
index 8b39b5b1..995ed451 100644
--- a/swh/web/tests/misc/test_metrics.py
+++ b/swh/web/tests/misc/test_metrics.py
@@ -1,131 +1,131 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import timedelta
from itertools import product
import random
from prometheus_client.exposition import CONTENT_TYPE_LATEST
import pytest
from swh.web.common.models import (
SAVE_REQUEST_ACCEPTED,
SAVE_REQUEST_PENDING,
SAVE_REQUEST_REJECTED,
SAVE_TASK_FAILED,
SAVE_TASK_NOT_CREATED,
SAVE_TASK_NOT_YET_SCHEDULED,
SAVE_TASK_RUNNING,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEEDED,
SaveOriginRequest,
)
from swh.web.common.origin_save import (
ACCEPTED_SAVE_REQUESTS_DELAY_METRIC,
ACCEPTED_SAVE_REQUESTS_METRIC,
SUBMITTED_SAVE_REQUESTS_METRIC,
get_savable_visit_types,
)
from swh.web.common.utils import reverse
from swh.web.tests.django_asserts import assert_contains
from swh.web.tests.utils import check_http_get_response
@pytest.mark.django_db
-def test_origin_save_metrics(client):
+def test_origin_save_metrics(client, swh_scheduler):
visit_types = get_savable_visit_types()
request_statuses = (
SAVE_REQUEST_ACCEPTED,
SAVE_REQUEST_REJECTED,
SAVE_REQUEST_PENDING,
)
load_task_statuses = (
SAVE_TASK_NOT_CREATED,
SAVE_TASK_NOT_YET_SCHEDULED,
SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEEDED,
SAVE_TASK_FAILED,
SAVE_TASK_RUNNING,
)
for _ in range(random.randint(50, 100)):
visit_type = random.choice(visit_types)
request_status = random.choice(request_statuses)
load_task_status = random.choice(load_task_statuses)
sor = SaveOriginRequest.objects.create(
origin_url="origin",
visit_type=visit_type,
status=request_status,
loading_task_status=load_task_status,
)
if load_task_status in (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED):
delay = random.choice(range(60))
sor.visit_date = sor.request_date + timedelta(seconds=delay)
sor.save()
# Note that this injects dates in the future for the sake of the test only
url = reverse("metrics-prometheus")
resp = check_http_get_response(
client, url, status_code=200, content_type=CONTENT_TYPE_LATEST
)
accepted_requests = SaveOriginRequest.objects.filter(status=SAVE_REQUEST_ACCEPTED)
labels_set = product(visit_types, load_task_statuses)
for labels in labels_set:
sor_count = accepted_requests.filter(
visit_type=labels[0], loading_task_status=labels[1]
).count()
metric_text = (
f"{ACCEPTED_SAVE_REQUESTS_METRIC}{{"
f'load_task_status="{labels[1]}",'
f'visit_type="{labels[0]}"}} {float(sor_count)}\n'
)
assert_contains(resp, metric_text)
labels_set = product(visit_types, request_statuses)
for labels in labels_set:
sor_count = SaveOriginRequest.objects.filter(
visit_type=labels[0], status=labels[1]
).count()
metric_text = (
f"{SUBMITTED_SAVE_REQUESTS_METRIC}{{"
f'status="{labels[1]}",'
f'visit_type="{labels[0]}"}} {float(sor_count)}\n'
)
assert_contains(resp, metric_text)
# delay metrics
save_requests = SaveOriginRequest.objects.all()
labels_set = product(visit_types, (SAVE_TASK_SUCCEEDED, SAVE_TASK_FAILED,))
for labels in labels_set:
sors = save_requests.filter(
visit_type=labels[0],
loading_task_status=labels[1],
visit_date__isnull=False,
)
delay = 0
for sor in sors:
delay += sor.visit_date.timestamp() - sor.request_date.timestamp()
metric_delay_text = (
f"{ACCEPTED_SAVE_REQUESTS_DELAY_METRIC}{{"
f'load_task_status="{labels[1]}",'
f'visit_type="{labels[0]}"}} {float(delay)}\n'
)
assert_contains(resp, metric_delay_text)
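# For reference, each asserted exposition line follows the Prometheus text
# format; schematically (actual metric names come from the imported constants,
# the placeholders below are illustrative):
#
#   <ACCEPTED_SAVE_REQUESTS_DELAY_METRIC>{load_task_status="<status>",visit_type="<type>"} <delay as float>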
diff --git a/swh/web/tests/misc/test_origin_save.py b/swh/web/tests/misc/test_origin_save.py
index 6eaa4787..b357a138 100644
--- a/swh/web/tests/misc/test_origin_save.py
+++ b/swh/web/tests/misc/test_origin_save.py
@@ -1,180 +1,153 @@
# Copyright (C) 2019-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from datetime import datetime, timedelta, timezone
import json
import pytest
from swh.auth.django.utils import oidc_user_from_profile
-from swh.web.auth.utils import SWH_AMBASSADOR_PERMISSION
from swh.web.common.models import SaveOriginRequest
from swh.web.common.origin_save import SAVE_REQUEST_ACCEPTED, SAVE_TASK_SUCCEEDED
from swh.web.common.utils import reverse
from swh.web.tests.utils import check_http_get_response
VISIT_TYPES = ("git", "svn", "hg")
PRIVILEGED_VISIT_TYPES = tuple(list(VISIT_TYPES) + ["archives"])
def test_old_save_url_redirection(client):
url = reverse("browse-origin-save")
redirect_url = reverse("origin-save")
resp = check_http_get_response(client, url, status_code=302)
assert resp["location"] == redirect_url
-def test_save_types_list_default(client):
- """Unprivileged listing should display default list of visit types.
-
- """
- url = reverse("origin-save-types-list")
- resp = check_http_get_response(client, url, status_code=200)
-
- actual_response = resp.json()
- assert set(actual_response) == set(VISIT_TYPES)
-
-
-@pytest.mark.django_db
-def test_save_types_list_privileged(client, keycloak_oidc):
- """Privileged listing should display all visit types.
-
- """
- keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION]
- client.login(code="", code_verifier="", redirect_uri="")
-
- url = reverse("origin-save-types-list")
- resp = check_http_get_response(client, url, status_code=200)
-
- actual_response = resp.json()
- assert set(actual_response) == set(PRIVILEGED_VISIT_TYPES)
-
-
@pytest.mark.django_db
def test_save_origin_requests_list(client, mocker):
nb_origins_per_type = 10
for visit_type in VISIT_TYPES:
for i in range(nb_origins_per_type):
SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type=visit_type,
origin_url=f"https://{visit_type}.example.org/project{i}",
status=SAVE_REQUEST_ACCEPTED,
visit_date=datetime.now(tz=timezone.utc) + timedelta(hours=1),
loading_task_id=i,
loading_task_status=SAVE_TASK_SUCCEEDED,
)
mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler")
mock_scheduler.get_tasks.return_value = []
mock_scheduler.get_task_runs.return_value = []
# retrieve all save requests in 3 pages, sorted in descending order
# of request creation
for i, visit_type in enumerate(reversed(VISIT_TYPES)):
url = reverse(
"origin-save-requests-list",
url_args={"status": "all"},
query_params={
"draw": i + 1,
"search[value]": "",
"order[0][column]": "0",
"columns[0][name]": "request_date",
"order[0][dir]": "desc",
"length": nb_origins_per_type,
"start": i * nb_origins_per_type,
},
)
resp = check_http_get_response(
client, url, status_code=200, content_type="application/json"
)
sors = json.loads(resp.content.decode("utf-8"))
assert sors["draw"] == i + 1
assert sors["recordsFiltered"] == len(VISIT_TYPES) * nb_origins_per_type
assert sors["recordsTotal"] == len(VISIT_TYPES) * nb_origins_per_type
assert len(sors["data"]) == nb_origins_per_type
assert all(d["visit_type"] == visit_type for d in sors["data"])
# retrieve save requests filtered by visit type in a single page
for i, visit_type in enumerate(reversed(VISIT_TYPES)):
url = reverse(
"origin-save-requests-list",
url_args={"status": "all"},
query_params={
"draw": i + 1,
"search[value]": visit_type,
"order[0][column]": "0",
"columns[0][name]": "request_date",
"order[0][dir]": "desc",
"length": nb_origins_per_type,
"start": 0,
},
)
resp = check_http_get_response(
client, url, status_code=200, content_type="application/json"
)
sors = json.loads(resp.content.decode("utf-8"))
assert sors["draw"] == i + 1
assert sors["recordsFiltered"] == nb_origins_per_type
assert sors["recordsTotal"] == len(VISIT_TYPES) * nb_origins_per_type
assert len(sors["data"]) == nb_origins_per_type
assert all(d["visit_type"] == visit_type for d in sors["data"])
@pytest.mark.django_db
def test_save_origin_requests_list_user_filter(client, mocker, keycloak_oidc):
# anonymous user created a save request
sor = SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type="svn",
origin_url="https://svn.example.org/user/project",
status=SAVE_REQUEST_ACCEPTED,
visit_date=datetime.now(tz=timezone.utc) + timedelta(hours=1),
loading_task_id=1,
loading_task_status=SAVE_TASK_SUCCEEDED,
)
# authenticated user created a save request
user = oidc_user_from_profile(keycloak_oidc, keycloak_oidc.login())
client.login(code="", code_verifier="", redirect_uri="")
sor = SaveOriginRequest.objects.create(
request_date=datetime.now(tz=timezone.utc),
visit_type="git",
origin_url="https://git.example.org/user/project",
status=SAVE_REQUEST_ACCEPTED,
visit_date=datetime.now(tz=timezone.utc) + timedelta(hours=1),
loading_task_id=2,
loading_task_status=SAVE_TASK_SUCCEEDED,
user_ids=f'"{user.id}"',
)
# filter save requests according to user id
url = reverse(
"origin-save-requests-list",
url_args={"status": "all"},
query_params={
"draw": 1,
"search[value]": "",
"order[0][column]": "0",
"columns[0][name]": "request_date",
"order[0][dir]": "desc",
"length": 10,
"start": "0",
"user_requests_only": "1",
},
)
resp = check_http_get_response(
client, url, status_code=200, content_type="application/json"
)
sors = json.loads(resp.content.decode("utf-8"))
assert sors["recordsFiltered"] == 1
assert sors["recordsTotal"] == 2
assert sors["data"][0] == sor.to_dict()