Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/api/views/test_origin_save.py
Show First 20 Lines • Show All 454 Lines • ▼ Show 20 Lines | ): | ||||
response = check_api_post_responses(api_client, url, status_code=200) | response = check_api_post_responses(api_client, url, status_code=200) | ||||
assert response.data["save_request_status"] == SAVE_REQUEST_PENDING | assert response.data["save_request_status"] == SAVE_REQUEST_PENDING | ||||
with pytest.raises(ObjectDoesNotExist): | with pytest.raises(ObjectDoesNotExist): | ||||
SaveAuthorizedOrigin.objects.get(url=origin_to_review) | SaveAuthorizedOrigin.objects.get(url=origin_to_review) | ||||
def test_create_save_request_accepted_ambassador_user( | def test_create_save_request_bundle_with_ambassador_user( | ||||
api_client, origin_to_review, keycloak_oidc, mocker, requests_mock, | |||||
): | |||||
keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] | |||||
oidc_profile = keycloak_oidc.login() | |||||
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") | |||||
originUrl = "https://somewhere.org/simple" | |||||
artifact_version = "1.2.3" | |||||
artifact_filename = f"tarball-{artifact_version}.tar.gz" | |||||
artifact_url = f"{originUrl}/{artifact_filename}" | |||||
content_length = "100" | |||||
last_modified = "Sun, 21 Aug 2011 16:26:32 GMT" | |||||
requests_mock.head( | |||||
artifact_url, | |||||
status_code=200, | |||||
headers={"content-length": content_length, "last-modified": last_modified,}, | |||||
) | |||||
mock_scheduler = mocker.patch("swh.web.common.origin_save.scheduler") | |||||
mock_scheduler.get_task_runs.return_value = [] | |||||
mock_scheduler.create_tasks.return_value = [ | |||||
{ | |||||
"id": 10, | |||||
"priority": "high", | |||||
"policy": "oneshot", | |||||
"status": "next_run_not_scheduled", | |||||
"type": "load-archive-files", | |||||
"arguments": { | |||||
"args": [], | |||||
"kwargs": { | |||||
"url": originUrl, | |||||
"artifacts": [ | |||||
{ | |||||
"url": artifact_url, | |||||
"filename": artifact_filename, | |||||
"version": artifact_version, | |||||
"time": last_modified, | |||||
"length": content_length, | |||||
} | |||||
], | |||||
}, | |||||
}, | |||||
}, | |||||
] | |||||
# then | |||||
url = reverse( | |||||
"api-1-save-origin", | |||||
url_args={"visit_type": "bundle", "origin_url": originUrl,}, | |||||
) | |||||
response = check_api_post_response( | |||||
api_client, | |||||
url, | |||||
status_code=200, | |||||
data={ | |||||
"artifact_url": artifact_url, | |||||
"artifact_filename": artifact_filename, | |||||
"artifact_version": artifact_version, | |||||
}, | |||||
) | |||||
assert response.data["save_request_status"] == SAVE_REQUEST_ACCEPTED | |||||
assert SaveAuthorizedOrigin.objects.get(url=originUrl) | |||||
def test_create_save_request_bundle_accepted_ambassador_user( | |||||
api_client, origin_to_review, keycloak_oidc, mocker | api_client, origin_to_review, keycloak_oidc, mocker | ||||
): | ): | ||||
keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] | keycloak_oidc.realm_permissions = [SWH_AMBASSADOR_PERMISSION] | ||||
oidc_profile = keycloak_oidc.login() | oidc_profile = keycloak_oidc.login() | ||||
api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") | api_client.credentials(HTTP_AUTHORIZATION=f"Bearer {oidc_profile['refresh_token']}") | ||||
check_created_save_request_status( | check_created_save_request_status( | ||||
▲ Show 20 Lines • Show All 59 Lines • Show Last 20 Lines |