diff --git a/swh/deposit/tests/api/test_collection_post_binary.py b/swh/deposit/tests/api/test_collection_post_binary.py index c1bedfbf..a6a6b501 100644 --- a/swh/deposit/tests/api/test_collection_post_binary.py +++ b/swh/deposit/tests/api/test_collection_post_binary.py @@ -1,382 +1,337 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests the handling of the binary content when doing a POST Col-IRI.""" from io import BytesIO import uuid from django.urls import reverse import pytest from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import check_archive, create_arborescence_archive +from swh.deposit.tests.common import ( + check_archive, + create_arborescence_archive, + post_archive, +) def test_post_deposit_binary_no_slug( authenticated_client, deposit_collection, sample_archive, deposit_user, mocker ): """Posting a binary deposit without slug header should generate one """ id_ = str(uuid.uuid4()) mocker.patch("uuid.uuid4", return_value=id_) url = reverse(COL_IRI, args=[deposit_collection.name]) # when - response = authenticated_client.post( - url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", - HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", + response = post_archive( + authenticated_client, url, sample_archive, in_progress="false", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.collection == deposit_collection assert deposit.origin_url == deposit_user.provider_url + id_ assert deposit.status == DEPOSIT_STATUS_DEPOSITED def test_post_deposit_binary_support( authenticated_client, deposit_collection, sample_archive ): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when response = authenticated_client.post( url, - content_type="application/octet-stream", - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", + content_type="application/octet-stream", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_ok( authenticated_client, deposit_collection, sample_archive ): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], - # other headers needs HTTP_ prefix to be taken into account + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"],), ) # then response_content = parse_xml(BytesIO(response.content)) assert response.status_code == status.HTTP_201_CREATED deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) check_archive(sample_archive["name"], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None response_content = parse_xml(BytesIO(response.content)) assert response_content["swh:deposit_archive"] == sample_archive["name"] assert int(response_content["swh:deposit_id"]) == deposit.id assert response_content["swh:deposit_status"] == deposit.status # deprecated tags assert response_content["atom:deposit_archive"] == sample_archive["name"] assert int(response_content["atom:deposit_id"]) == deposit.id assert response_content["atom:deposit_status"] == deposit.status edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit.id]) assert response._headers["location"] == ( "Location", "http://testserver" + edit_iri, ) def test_post_deposit_binary_failure_unsupported_packaging_header( authenticated_client, deposit_collection, sample_archive ): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], HTTP_PACKAGING="something-unsupported", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert ( b"The packaging provided something-unsupported is not supported" in response.content ) with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_no_content_disposition_header( authenticated_client, deposit_collection, sample_archive ): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", + HTTP_CONTENT_DISPOSITION=None, ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"CONTENT_DISPOSITION header is mandatory" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_mediation_not_supported( authenticated_client, deposit_collection, sample_archive ): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", HTTP_ON_BEHALF_OF="someone", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_412_PRECONDITION_FAILED with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Binary upload must not exceed the limit set up... """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file", up_to_size=500 ) external_id = "some-external-id" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", - data=archive["data"], - # + headers - CONTENT_LENGTH=archive["length"], + archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b"Upload size limit exceeded" in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_content_length_missing( authenticated_client, deposit_collection, sample_archive, tmp_path ): """The Content-Length header is mandatory """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file", up_to_size=500 ) external_id = "some-external-id" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", - data=archive["data"], - # + headers + archive, CONTENT_LENGTH=None, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_400_BAD_REQUEST assert b"the CONTENT_LENGTH header must be sent." in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_2_post_2_different_deposits( authenticated_client, deposit_collection, sample_archive ): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG="some-external-id-1", - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() assert len(deposits) == 1 assert deposits[0] == deposit # second post - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/x-tar", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, + content_type="application/x-tar", HTTP_SLUG="another-external-id", - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename1", ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content["swh:deposit_id"] deposit2 = Deposit.objects.get(pk=deposit_id2) assert deposit != deposit2 deposits = Deposit.objects.all().order_by("id") assert len(deposits) == 2 assert list(deposits), [deposit == deposit2] diff --git a/swh/deposit/tests/api/test_deposit_update_binary.py b/swh/deposit/tests/api/test_deposit_update_binary.py index 0829d418..bad6ee65 100644 --- a/swh/deposit/tests/api/test_deposit_update_binary.py +++ b/swh/deposit/tests/api/test_deposit_update_binary.py @@ -1,437 +1,406 @@ # Copyright (C) 2017-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Tests updates on EM-IRI""" from io import BytesIO from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from rest_framework import status from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED, EM_IRI, SE_IRI from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import check_archive, create_arborescence_archive +from swh.deposit.tests.common import ( + check_archive, + create_arborescence_archive, + post_archive, + put_archive, +) def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path ): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="true", - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"],), ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == "partial" assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == "archive" check_archive(sample_archive["name"], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it - response = authenticated_client.post( - update_uri, - content_type="application/zip", # as zip - data=archive2["data"], - # + headers - CONTENT_LENGTH=archive2["length"], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"]), + response = post_archive( + authenticated_client, update_uri, archive2, HTTP_SLUG=external_id, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_requests = list( DepositRequest.objects.filter(deposit=deposit).order_by("id") ) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == "archive" check_archive(sample_archive["name"], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == "archive" check_archive(archive2["name"], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_replace_archive_to_deposit_is_possible( tmp_path, partial_deposit, deposit_collection, authenticated_client, sample_archive, atom_dataset, ): """Replace all archive with another one should return a 204 response """ tmp_path = str(tmp_path) # given deposit = partial_deposit requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(sample_archive["name"], requests[0].archive.name) # we have no metadata for that deposit requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 0 response = authenticated_client.post( reverse(SE_IRI, args=[deposit_collection.name, deposit.id]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS=True, ) requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) - response = authenticated_client.put( + response = put_archive( + authenticated_client, update_uri, - content_type="application/zip", # as zip - data=archive2["data"], - # + headers - CONTENT_LENGTH=archive2["length"], + archive2, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_204_NO_CONTENT requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(list(requests)) == 1 check_archive(archive2["name"], requests[0].archive.name) # check we did not touch the other parts requests = list(DepositRequest.objects.filter(deposit=deposit, type="metadata")) assert len(requests) == 1 def test_add_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Adding metadata to unknown deposit should return a 404 response """ unknown_deposit_id = 997 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.post( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_replace_archive_to_unknown_deposit( authenticated_client, deposit_collection, atom_dataset ): """Replacing archive to unknown deposit should return a 404 response """ unknown_deposit_id = 996 try: Deposit.objects.get(pk=unknown_deposit_id) except Deposit.DoesNotExist: assert True url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) response = authenticated_client.put( url, content_type="application/zip", data=atom_dataset["entry-data1"] ) assert response.status_code == status.HTTP_404_NOT_FOUND response_content = parse_xml(response.content) assert ( "Deposit %s does not exist" % unknown_deposit_id == response_content["sword:error"]["atom:summary"] ) def test_add_archive_to_deposit_is_possible( tmp_path, authenticated_client, deposit_collection, partial_deposit_with_metadata, sample_archive, ): """Add another archive to a deposit return a 201 response """ tmp_path = str(tmp_path) deposit = partial_deposit_with_metadata requests = DepositRequest.objects.filter(deposit=deposit, type="archive") assert len(requests) == 1 check_archive(sample_archive["name"], requests[0].archive.name) requests_meta0 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta0) == 1 update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) external_id = "some-external-id-1" archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some other content in file" ) - response = authenticated_client.post( + response = post_archive( + authenticated_client, update_uri, - content_type="application/zip", # as zip - data=archive2["data"], - # + headers - CONTENT_LENGTH=archive2["length"], + archive2, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), ) assert response.status_code == status.HTTP_201_CREATED requests = DepositRequest.objects.filter(deposit=deposit, type="archive").order_by( "id" ) assert len(requests) == 2 # first archive still exists check_archive(sample_archive["name"], requests[0].archive.name) # a new one was added check_archive(archive2["name"], requests[1].archive.name) # check we did not touch the other parts requests_meta1 = DepositRequest.objects.filter(deposit=deposit, type="metadata") assert len(requests_meta1) == 1 assert set(requests_meta0) == set(requests_meta1) def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path ): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = "some-external-id-1" # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swhid is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit - check_archive("filename0", deposit_request.archive.name) + check_archive(sample_archive["name"], deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_iri = reverse("edit_iri", args=[deposit_collection.name, deposit_id]) se_iri = reverse("se_iri", args=[deposit_collection.name, deposit_id]) em_iri = reverse("em_iri", args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, "archive2", "file2", b"some content in file 2" ) # replacing file is no longer possible since the deposit's # status is ready - r = authenticated_client.put( + r = put_archive( + authenticated_client, em_iri, - content_type="application/zip", - data=archive2["data"], - CONTENT_LENGTH=archive2["length"], + archive2, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding file is no longer possible since the deposit's status # is ready - r = authenticated_client.post( + r = post_archive( + authenticated_client, em_iri, - content_type="application/zip", - data=archive2["data"], - CONTENT_LENGTH=archive2["length"], + archive2, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS="false", - HTTP_CONTENT_DISPOSITION="attachment; filename=filename0", ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # replacing metadata is no longer possible since the deposit's # status is ready r = authenticated_client.put( edit_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data-deposit-binary"], CONTENT_LENGTH=len(atom_dataset["entry-data-deposit-binary"]), HTTP_SLUG=external_id, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content archive_content = b"some content representing archive" archive = InMemoryUploadedFile( BytesIO(archive_content), field_name="archive0", name="archive0", content_type="application/zip", size=len(archive_content), charset=None, ) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset["entry-data-deposit-binary"].encode("utf-8")), field_name="atom0", name="atom0", content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset["entry-data-deposit-binary"]), charset="utf-8", ) # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( se_iri, format="multipart", data={"archive": archive, "atom_entry": atom_entry,}, ) assert r.status_code == status.HTTP_400_BAD_REQUEST assert b"You can only act on deposit with status 'partial'" in r.content diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index 3b02d7bf..399f6481 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,146 +1,165 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os import re import tarfile import tempfile from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, "rb") as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b"" for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { "dir": os.path.dirname(archive_path), "name": os.path.basename(archive_path), "path": archive_path, "length": length, "sha1sum": sha1sum.hexdigest(), "md5sum": md5sum.hexdigest(), "data": data, } def _compress(path, extension, dir_path): """Compress path according to extension """ if extension == "zip" or extension == "tar": return tarball.compress(path, extension, dir_path) elif "." in extension: split_ext = extension.split(".") if split_ext[0] != "tar": raise ValueError( "Development error, only zip or tar archive supported, " "%s not supported" % extension ) # deal with specific tar mode = split_ext[1] supported_mode = ["xz", "gz", "bz2"] if mode not in supported_mode: raise ValueError( "Development error, only %s supported, %s not supported" % (supported_mode, mode) ) files = tarball._ls(dir_path) with tarfile.open(path, "w:%s" % mode) as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) return path def create_arborescence_archive( root_path, archive_name, filename, content, up_to_size=None, extension="zip" ): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Args: root_path (str): Location path of the archive to create archive_name (str): Archive's name (without extension) filename (str): Archive's content is only one filename content (bytes): Content of the filename up_to_size (int | None): Fill in the blanks size to oversize or complete an archive's size extension (str): Extension of the archive to write (default is zip) Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, "wb") as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b"0" * batch_size) count += batch_size _path = "%s.%s" % (dir_path, extension) _path = _compress(_path, extension, dir_path) return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, "w:gz") as _archive: _archive.add(archive["path"], arcname=archive["name"]) return compute_info(invalid_archive_path) def check_archive(archive_name: str, archive_name_to_check: str): """Helper function to ensure archive_name is present within the archive_name_to_check. Raises: AssertionError if archive_name is not present within archive_name_to_check """ ARCHIVE_FILEPATH_PATTERN = re.compile( r"client_[0-9].*/[0-9]{8}-[0-9]{6}\.[0-9]{6}/[a-zA-Z0-9.].*" ) assert ARCHIVE_FILEPATH_PATTERN.match(archive_name_to_check) if "." in archive_name: filename, extension = archive_name.split(".") pattern = re.compile(".*/%s.*\\.%s" % (filename, extension)) else: pattern = re.compile(".*/%s" % archive_name) assert pattern.match(archive_name_to_check) is not None + + +def _post_or_put_archive(f, url, archive, slug=None, in_progress=None, **kwargs): + default_kwargs = dict( + content_type="application/zip", + CONTENT_LENGTH=archive["length"], + HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive["name"],), + HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", + ) + kwargs = {**default_kwargs, **kwargs} + return f(url, data=archive["data"], HTTP_CONTENT_MD5=archive["md5sum"], **kwargs,) + + +def post_archive(authenticated_client, *args, **kwargs): + return _post_or_put_archive(authenticated_client.post, *args, **kwargs) + + +def put_archive(authenticated_client, *args, **kwargs): + return _post_or_put_archive(authenticated_client.put, *args, **kwargs) diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py index 9e420696..d6b27759 100644 --- a/swh/deposit/tests/conftest.py +++ b/swh/deposit/tests/conftest.py @@ -1,480 +1,475 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 from functools import partial from io import BytesIO import os import re from typing import Mapping from django.test.utils import setup_databases # type: ignore from django.urls import reverse import psycopg2 from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT import pytest from rest_framework import status from rest_framework.test import APIClient import yaml from swh.core.config import read from swh.core.pytest_plugin import get_response_cb from swh.deposit.config import ( COL_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_LOAD_FAILURE, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_VERIFIED, SE_IRI, setup_django_for, ) from swh.deposit.parsers import parse_xml -from swh.deposit.tests.common import create_arborescence_archive +from swh.deposit.tests.common import create_arborescence_archive, post_archive from swh.model.identifiers import DIRECTORY, REVISION, SNAPSHOT, swhid from swh.scheduler import get_scheduler # mypy is asked to ignore the import statement above because setup_databases # is not part of the d.t.utils.__all__ variable. TEST_USER = { "username": "test", "password": "password", "email": "test@example.org", "provider_url": "https://hal-test.archives-ouvertes.fr/", "domain": "archives-ouvertes.fr/", "collection": {"name": "test"}, } ANOTHER_TEST_USER = { "username": "test2", "password": "password2", "email": "test@example2.org", "provider_url": "https://hal-test.archives-ouvertes.example/", "domain": "archives-ouvertes.example/", "collection": {"name": "another-collection"}, } def pytest_configure(): setup_django_for("testing") @pytest.fixture def requests_mock_datadir(datadir, requests_mock_datadir): """Override default behavior to deal with put/post methods """ cb = partial(get_response_cb, datadir=datadir) requests_mock_datadir.put(re.compile("https://"), body=cb) requests_mock_datadir.post(re.compile("https://"), body=cb) return requests_mock_datadir @pytest.fixture() def deposit_config(swh_scheduler_config, swh_storage_backend_config): return { "max_upload_size": 500, "extraction_dir": "/tmp/swh-deposit/test/extraction-dir", "checks": False, "scheduler": {"cls": "local", **swh_scheduler_config,}, "storage_metadata": swh_storage_backend_config, } @pytest.fixture() def deposit_config_path(tmp_path, monkeypatch, deposit_config): conf_path = os.path.join(tmp_path, "deposit.yml") with open(conf_path, "w") as f: f.write(yaml.dump(deposit_config)) monkeypatch.setenv("SWH_CONFIG_FILENAME", conf_path) return conf_path @pytest.fixture(autouse=True) def deposit_autoconfig(deposit_config_path): """Enforce config for deposit classes inherited from APIConfig.""" cfg = read(deposit_config_path) if "scheduler" in cfg: # scheduler setup: require the check-deposit and load-deposit tasks scheduler = get_scheduler(**cfg["scheduler"]) task_types = [ { "type": "check-deposit", "backend_name": "swh.deposit.loader.tasks.ChecksDepositTsk", "description": "Check deposit metadata/archive before loading", "num_retries": 3, }, { "type": "load-deposit", "backend_name": "swh.loader.package.deposit.tasks.LoadDeposit", "description": "Loading deposit archive into swh archive", "num_retries": 3, }, ] for task_type in task_types: scheduler.create_task_type(task_type) @pytest.fixture(scope="session") def django_db_setup(request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES["default"].update( { ("ENGINE", "django.db.backends.postgresql"), ("NAME", "tests"), ("USER", postgresql_proc.user), # noqa ("HOST", postgresql_proc.host), # noqa ("PORT", postgresql_proc.port), # noqa } ) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False ) def execute_sql(sql): """Execute sql to postgres db""" with psycopg2.connect(database="postgres") as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor() cur.execute(sql) @pytest.fixture(autouse=True, scope="session") def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ["http_proxy"] = "http://localhost:999" os.environ["https_proxy"] = "http://localhost:999" def create_deposit_collection(collection_name: str): """Create a deposit collection with name collection_name """ from swh.deposit.models import DepositCollection try: collection = DepositCollection._default_manager.get(name=collection_name) except DepositCollection.DoesNotExist: collection = DepositCollection(name=collection_name) collection.save() return collection def deposit_collection_factory(collection_name=TEST_USER["collection"]["name"]): @pytest.fixture def _deposit_collection(db, collection_name=collection_name): return create_deposit_collection(collection_name) return _deposit_collection deposit_collection = deposit_collection_factory() deposit_another_collection = deposit_collection_factory("another-collection") def _create_deposit_user(db, collection, user_data): """Create/Return the test_user "test" """ from swh.deposit.models import DepositClient try: user = DepositClient._default_manager.get(username=user_data["username"]) except DepositClient.DoesNotExist: user = DepositClient._default_manager.create_user( username=user_data["username"], email=user_data["email"], password=user_data["password"], provider_url=user_data["provider_url"], domain=user_data["domain"], ) user.collections = [collection.id] user.save() return user @pytest.fixture def deposit_user(db, deposit_collection): return _create_deposit_user(db, deposit_collection, TEST_USER) @pytest.fixture def deposit_another_user(db, deposit_another_collection): return _create_deposit_user(db, deposit_another_collection, ANOTHER_TEST_USER) @pytest.fixture def client(): """Override pytest-django one which does not work for djangorestframework. """ return APIClient() # <- drf's client def _create_authenticated_client(client, user, user_data): """Returned a logged client This also patched the client instance to keep a reference on the associated deposit_user. """ _token = "%s:%s" % (user.username, user_data["password"]) token = base64.b64encode(_token.encode("utf-8")) authorization = "Basic %s" % token.decode("utf-8") client.credentials(HTTP_AUTHORIZATION=authorization) client.deposit_client = user yield client client.logout() @pytest.fixture def authenticated_client(client, deposit_user): yield from _create_authenticated_client(client, deposit_user, TEST_USER) @pytest.fixture def another_authenticated_client(deposit_another_user): client = APIClient() yield from _create_authenticated_client( client, deposit_another_user, ANOTHER_TEST_USER ) @pytest.fixture def sample_archive(tmp_path): """Returns a sample archive """ tmp_path = str(tmp_path) # pytest version limitation in previous version archive = create_arborescence_archive( tmp_path, "archive1", "file1", b"some content in file" ) return archive @pytest.fixture def atom_dataset(datadir) -> Mapping[str, str]: """Compute the paths to atom files. Returns: Dict of atom name per content (bytes) """ atom_path = os.path.join(datadir, "atom") data = {} for filename in os.listdir(atom_path): filepath = os.path.join(atom_path, filename) with open(filepath, "rb") as f: raw_content = f.read().decode("utf-8") # Keep the filename without extension atom_name = filename.split(".")[0] data[atom_name] = raw_content return data def create_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False, ): """Create a skeleton shell deposit """ url = reverse(COL_IRI, args=[collection_name]) # when - response = authenticated_client.post( + response = post_archive( + authenticated_client, url, - content_type="application/zip", # as zip - data=sample_archive["data"], - # + headers - CONTENT_LENGTH=sample_archive["length"], + sample_archive, HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=sample_archive["md5sum"], - HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", HTTP_IN_PROGRESS=str(in_progress).lower(), - HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (sample_archive["name"]), ) # then assert response.status_code == status.HTTP_201_CREATED, response.content.decode() from swh.deposit.models import Deposit response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content["swh:deposit_id"] deposit = Deposit._default_manager.get(id=deposit_id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def create_binary_deposit( authenticated_client, collection_name: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, atom_dataset: Mapping[str, bytes] = {}, **kwargs, ): """Create a deposit with both metadata and archive set. Then alters its status to `deposit_status`. """ deposit = create_deposit( authenticated_client, collection_name, deposit_status=DEPOSIT_STATUS_PARTIAL, **kwargs, ) origin_url = deposit.client.provider_url + deposit.external_id response = authenticated_client.post( reverse(SE_IRI, args=[collection_name, deposit.id]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data0"] % origin_url, HTTP_IN_PROGRESS="true", ) assert response.status_code == status.HTTP_201_CREATED assert deposit.status == DEPOSIT_STATUS_PARTIAL from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit.id) assert deposit.status == deposit_status return deposit def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED, in_progress=False): """Build deposit with a specific status """ @pytest.fixture() def _deposit( sample_archive, deposit_collection, authenticated_client, deposit_status=deposit_status, ): external_id = "external-id-%s" % deposit_status return create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id=external_id, deposit_status=deposit_status, in_progress=in_progress, ) return _deposit deposited_deposit = deposit_factory() rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) partial_deposit = deposit_factory( deposit_status=DEPOSIT_STATUS_PARTIAL, in_progress=True ) verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED) completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) @pytest.fixture def partial_deposit_with_metadata( sample_archive, deposit_collection, authenticated_client, atom_dataset ): """Returns deposit with archive and metadata provided, status 'partial' """ return create_binary_deposit( authenticated_client, deposit_collection.name, sample_archive=sample_archive, external_id="external-id-partial", in_progress=True, deposit_status=DEPOSIT_STATUS_PARTIAL, atom_dataset=atom_dataset, ) @pytest.fixture def partial_deposit_only_metadata( deposit_collection, authenticated_client, atom_dataset ): response = authenticated_client.post( reverse(COL_IRI, args=[deposit_collection.name]), content_type="application/atom+xml;type=entry", data=atom_dataset["entry-data1"], HTTP_SLUG="external-id-partial", HTTP_IN_PROGRESS=True, ) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content["swh:deposit_id"] from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_PARTIAL return deposit @pytest.fixture def complete_deposit(sample_archive, deposit_collection, authenticated_client): """Returns a completed deposit (load success) """ deposit = create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id="external-id-complete", deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS, ) origin = "https://hal.archives-ouvertes.fr/hal-01727745" directory_id = "42a13fc721c8716ff695d0d62fc851d641f3a12b" revision_id = "548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10" snapshot_id = "e5e82d064a9c3df7464223042e0c55d72ccff7f0" deposit.swhid = swhid(DIRECTORY, directory_id) deposit.swhid_context = swhid( DIRECTORY, directory_id, metadata={ "origin": origin, "visit": swhid(SNAPSHOT, snapshot_id), "anchor": swhid(REVISION, revision_id), "path": "/", }, ) deposit.save() return deposit @pytest.fixture() def tmp_path(tmp_path): return str(tmp_path) # issue with oldstable's pytest version