diff --git a/MANIFEST.in b/MANIFEST.in --- a/MANIFEST.in +++ b/MANIFEST.in @@ -8,3 +8,4 @@ recursive-include swh/deposit/static * recursive-include swh/deposit/fixtures * recursive-include swh/deposit/templates * +recursive-include swh/deposit/tests/*/data * diff --git a/Makefile.local b/Makefile.local --- a/Makefile.local +++ b/Makefile.local @@ -25,6 +25,3 @@ run: gunicorn3 -b 127.0.0.1:5006 swh.deposit.wsgi - -test: - ./swh/deposit/manage.py test diff --git a/pytest.ini b/pytest.ini --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,7 @@ [pytest] norecursedirs = docs DJANGO_SETTINGS_MODULE = swh.deposit.settings.testing + +markers = + db: execute tests using a postgresql database + fs: execute tests using the filesystem diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,3 +1,5 @@ pytest<4 pytest-django swh.scheduler[testing] +pytest-postgresql >= 2.1.0 + diff --git a/swh/deposit/loader/tasks.py b/swh/deposit/loader/tasks.py --- a/swh/deposit/loader/tasks.py +++ b/swh/deposit/loader/tasks.py @@ -1,15 +1,15 @@ -# Copyright (C) 2015-2018 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from celery import current_app as app +from celery import shared_task from swh.deposit.loader.loader import DepositLoader from swh.deposit.loader.checker import DepositChecker -@app.task(name=__name__ + '.LoadDepositArchiveTsk') +@shared_task(name=__name__ + '.LoadDepositArchiveTsk') def load_deposit_archive(archive_url, deposit_meta_url, deposit_update_url): """Deposit archive loading task described by the following steps: @@ -27,7 +27,7 @@ deposit_update_url=deposit_update_url) -@app.task(name=__name__ + '.ChecksDepositTsk') +@shared_task(name=__name__ + '.ChecksDepositTsk') def 
check_deposit(deposit_check_url): """Check a deposit's status diff --git a/swh/deposit/settings/development.py b/swh/deposit/settings/development.py --- a/swh/deposit/settings/development.py +++ b/swh/deposit/settings/development.py @@ -50,7 +50,7 @@ DATABASES = { 'default': { 'ENGINE': 'django.db.backends.postgresql', - 'NAME': 'swh-deposit-dev', + 'NAME': 'swh-deposit-dev', # this is no longer used in test env } } diff --git a/swh/deposit/tests/api/conftest.py b/swh/deposit/tests/api/conftest.py new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/conftest.py @@ -0,0 +1,108 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import hashlib +import pytest + +from django.urls import reverse +from os import path, listdir +from typing import Mapping + +from swh.deposit.config import ( + DEPOSIT_STATUS_DEPOSITED, COL_IRI, DEPOSIT_STATUS_VERIFIED +) +from swh.deposit.models import Deposit +from swh.deposit.parsers import parse_xml + +from swh.deposit.api.private.deposit_check import SWHChecksDeposit + + +@pytest.fixture +def atom_dataset(datadir) -> Mapping[str, bytes]: + """Compute the paths to atom files. + + Returns: + Dict of atom name per content (bytes) + + """ + atom_path = path.join(datadir, 'atom') + data = {} + for filename in listdir(atom_path): + filepath = path.join(atom_path, filename) + with open(filepath, 'rb') as f: + raw_content = f.read() + + # Keep the filename without extension + atom_name = filename.split('.')[0] + data[atom_name] = raw_content + + return data + + +@pytest.fixture +def ready_deposit_ok(partial_deposit_with_metadata): + """Returns a deposit ready for checks (it will pass the checks). 
+ + """ + deposit = partial_deposit_with_metadata + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() + return deposit + + +@pytest.fixture +def ready_deposit_verified(partial_deposit_with_metadata): + """Returns a deposit whose status is set to verified. + + """ + deposit = partial_deposit_with_metadata + deposit.status = DEPOSIT_STATUS_VERIFIED + deposit.save() + return deposit + + +@pytest.fixture +def ready_deposit_only_metadata(partial_deposit_only_metadata): + """Deposit in status ready that will fail the checks (because missing + archive). + + """ + deposit = partial_deposit_only_metadata + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() + return deposit + + +@pytest.fixture +def ready_deposit_invalid_archive(authenticated_client, deposit_collection): + url = reverse(COL_IRI, args=[deposit_collection.name]) + + data = b'some data which is clearly not a zip file' + md5sum = hashlib.md5(data).hexdigest() + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=data, + # + headers + CONTENT_LENGTH=len(data), + # other headers need an HTTP_ prefix to be taken into account + HTTP_SLUG='external-id-invalid', + HTTP_CONTENT_MD5=md5sum, + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + response_content = parse_xml(response.content) + deposit_id = int(response_content['deposit_id']) + deposit = Deposit.objects.get(pk=deposit_id) + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() + return deposit + + +@pytest.fixture +def swh_checks_deposit(): + return SWHChecksDeposit() diff --git a/swh/deposit/tests/api/data/atom/codemeta-sample.xml b/swh/deposit/tests/api/data/atom/codemeta-sample.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/codemeta-sample.xml @@ -0,0 +1,51 @@ + + + %s + hal-01587361 + https://hal.inria.fr/hal-01587361 + https://hal.inria.fr/hal-01587361/document + 
https://hal.inria.fr/hal-01587361/file/AffectationRO-v1.0.0.zip + doi:10.5281/zenodo.438684 + The assignment problem + AffectationRO + Gruenpeter, Morane + [INFO] Computer Science [cs] + [INFO.INFO-RO] Computer Science [cs]/Operations Research [cs.RO] + SOFTWARE + Project in OR: The assignment problemA java implementation for the assignment problem first release + description fr + 2015-06-01 + 2017-10-19 + en + + + url stable + Version sur hal + Version entre par lutilisateur + Mots-cls + Commentaire + Rfrence interne + + Collaboration/Projet + nom du projet + id + + Voir aussi + Financement + Projet ANR + Projet Europen + Platform/OS + Dpendances + Etat du dveloppement + + license + url spdx + + Outils de dveloppement- outil no1 + Outils de dveloppement- outil no2 + http://code.com + language 1 + language 2 + diff --git a/swh/deposit/tests/api/data/atom/entry-data-badly-formatted.xml b/swh/deposit/tests/api/data/atom/entry-data-badly-formatted.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-badly-formatted.xml @@ -0,0 +1,2 @@ + + diff --git a/swh/deposit/tests/api/data/atom/entry-data-deposit-binary.xml b/swh/deposit/tests/api/data/atom/entry-data-deposit-binary.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-deposit-binary.xml @@ -0,0 +1,29 @@ + + + Title + urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a + 2005-10-07T17:17:08Z + Contributor + The abstract + + + The abstract + Access Rights + Alternative Title + Date Available + Bibliographic Citation # noqa + Contributor + Description + Has Part + Has Version + Identifier + Is Part Of + Publisher + References + Rights Holder + Source + Title + Type + + diff --git a/swh/deposit/tests/api/data/atom/entry-data-empty-body.xml b/swh/deposit/tests/api/data/atom/entry-data-empty-body.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-empty-body.xml @@ -0,0 +1,2 @@ + + diff --git 
a/swh/deposit/tests/api/data/atom/entry-data-ko.xml b/swh/deposit/tests/api/data/atom/entry-data-ko.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-ko.xml @@ -0,0 +1,6 @@ + + + + urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a + diff --git a/swh/deposit/tests/api/data/atom/entry-data-minimal.xml b/swh/deposit/tests/api/data/atom/entry-data-minimal.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-minimal.xml @@ -0,0 +1,4 @@ + + + %s + diff --git a/swh/deposit/tests/api/data/atom/entry-data-parsing-error-prone.xml b/swh/deposit/tests/api/data/atom/entry-data-parsing-error-prone.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data-parsing-error-prone.xml @@ -0,0 +1,5 @@ + + + Composing a Web of Audio Applications + + diff --git a/swh/deposit/tests/api/data/atom/entry-data0.xml b/swh/deposit/tests/api/data/atom/entry-data0.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data0.xml @@ -0,0 +1,26 @@ + + + Awesome Compiler + hal + urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a + %s + 2017-10-07T15:17:08Z + some awesome author + something + awesome-compiler + This is an awesome compiler destined to +awesomely compile stuff +and other stuff + compiler,programming,language + 2005-10-07T17:17:08Z + 2005-10-07T17:17:08Z + release note + related link + + Awesome + https://hoster.org/awesome-compiler + GNU/Linux + 0.0.1 + running + all + diff --git a/swh/deposit/tests/api/data/atom/entry-data1.xml b/swh/deposit/tests/api/data/atom/entry-data1.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data1.xml @@ -0,0 +1,24 @@ + + + hal + urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a + 2017-10-07T15:17:08Z + some awesome author + something + awesome-compiler + This is an awesome compiler destined to +awesomely compile stuff +and other stuff + compiler,programming,language + 2005-10-07T17:17:08Z + 
2005-10-07T17:17:08Z + release note + related link + + Awesome + https://hoster.org/awesome-compiler + GNU/Linux + 0.0.1 + running + all + diff --git a/swh/deposit/tests/api/data/atom/entry-data2.xml b/swh/deposit/tests/api/data/atom/entry-data2.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data2.xml @@ -0,0 +1,6 @@ + + + some-external-id + https://hal-test.archives-ouvertes.fr/some-external-id + some awesome author + diff --git a/swh/deposit/tests/api/data/atom/entry-data3.xml b/swh/deposit/tests/api/data/atom/entry-data3.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-data3.xml @@ -0,0 +1,6 @@ + + + another one + no one + 2017-10-07T15:17:08Z + diff --git a/swh/deposit/tests/api/data/atom/entry-update-in-place.xml b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/entry-update-in-place.xml @@ -0,0 +1,7 @@ + + + urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b + Title + Type + diff --git a/swh/deposit/tests/api/data/atom/error-with-decimal.xml b/swh/deposit/tests/api/data/atom/error-with-decimal.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/error-with-decimal.xml @@ -0,0 +1,38 @@ + + + Composing a Web of Audio Applications + hal + hal-01243065 + hal-01243065 + https://hal-test.archives-ouvertes.fr/hal-01243065 + test + + + DSP programming,Web,Composability,Faust + 2017-05-03T16:08:47+02:00 + The Web offers a great opportunity to share, deploy and use programs without installation difficulties. In this article we explore the idea of freely combining/composing real-time audio applications deployed on the Web using Faust audio DSP language. 
+ 1 + 10.4 + phpstorm + stable + + linux + php + python + C + + GNU General Public License v3.0 only + + + CeCILL Free Software License Agreement v1.1 + + + HAL + hal@ccsd.cnrs.fr + + + Someone Nice + someone@nice.fr + FFJ + + diff --git a/swh/deposit/tests/api/data/atom/metadata.xml b/swh/deposit/tests/api/data/atom/metadata.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/metadata.xml @@ -0,0 +1,32 @@ + + + Composing a Web of Audio Applications + hal + hal-01243065 + hal-01243065 + https://hal-test.archives-ouvertes.fr/hal-01243065 + test + DSP programming + this is the description + 1 + phpstorm + stable + php + python + C + + GNU General Public License v3.0 only + + + CeCILL Free Software License Agreement v1.1 + + + HAL + hal@ccsd.cnrs.fr + + + Morane Gruenpeter + +%s + diff --git a/swh/deposit/tests/api/data/atom/tei-sample.xml b/swh/deposit/tests/api/data/atom/tei-sample.xml new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/api/data/atom/tei-sample.xml @@ -0,0 +1 @@ +HAL TEI export of hal-01587083CCSDDistributed under a Creative Commons Attribution 4.0 International License

HAL API platform

questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733MoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.com2017-09-29 11:21:322017-10-03 17:20:132017-10-03 17:20:132017-09-292017-09-29contributorMoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.comCCSDhal-01587083https://hal.inria.fr/hal-01587083gruenpeter:hal-0158708320172017questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733EnglishComputer Science [cs]SoftwareIRILLInitiative pour la Recherche et l'Innovation sur le Logiciel Libre
https://www.irill.org/
Universite Pierre et Marie Curie - Paris 6UPMC
4 place Jussieu - 75005 Paris
http://www.upmc.fr/
Institut National de Recherche en Informatique et en AutomatiqueInria
Domaine de VoluceauRocquencourt - BP 10578153 Le Chesnay Cedex
http://www.inria.fr/en/
Universite Paris Diderot - Paris 7UPD7
5 rue Thomas-Mann - 75205 Paris cedex 13
http://www.univ-paris-diderot.fr
diff --git a/swh/deposit/tests/api/test_converters.py b/swh/deposit/tests/api/test_converters.py --- a/swh/deposit/tests/api/test_converters.py +++ b/swh/deposit/tests/api/test_converters.py @@ -1,132 +1,123 @@ -# Copyright (C) 2017-2018 The Software Heritage developers +# Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from rest_framework.test import APITestCase - from swh.deposit.api.converters import convert_status_detail -class ConvertersTestCase(APITestCase): - - def test_convert_status_detail_empty(self): - actual_status_detail = convert_status_detail({}) - self.assertIsNone(actual_status_detail) - - actual_status_detail = convert_status_detail({'dummy-keys': []}) - self.assertIsNone(actual_status_detail) +def test_convert_status_detail_empty(): + for status_detail in [{}, {'dummy-keys': []}, None]: + assert convert_status_detail(status_detail) is None - actual_status_detail = convert_status_detail(None) - self.assertIsNone(actual_status_detail) - def test_convert_status_detail(self): - status_detail = { - 'url': { - 'summary': "At least one url field must be compatible with the client\'s domain name. The following url fields failed the check", # noqa - 'fields': ['blahurl', 'testurl'], +def test_convert_status_detail(): + status_detail = { + 'url': { + 'summary': "At least one url field must be compatible with the client\'s domain name. 
The following url fields failed the check", # noqa + 'fields': ['blahurl', 'testurl'], + }, + 'metadata': [ + { + 'summary': 'Mandatory fields missing', + 'fields': ['url', 'title'], }, - 'metadata': [ - { - 'summary': 'Mandatory fields missing', - 'fields': ['url', 'title'], - }, - { - 'summary': 'Alternate fields missing', - 'fields': ['name or title', 'url or badurl'] - } - ], - 'archive': [{ - 'summary': 'Unreadable archive', - 'fields': ['1'], - }], - } - - expected_status_detail = '''- Mandatory fields missing (url, title) + { + 'summary': 'Alternate fields missing', + 'fields': ['name or title', 'url or badurl'] + } + ], + 'archive': [{ + 'summary': 'Unreadable archive', + 'fields': ['1'], + }], + } + + expected_status_detail = '''- Mandatory fields missing (url, title) - Alternate fields missing (name or title, url or badurl) - Unreadable archive (1) - At least one url field must be compatible with the client's domain name. The following url fields failed the check (blahurl, testurl) ''' # noqa - actual_status_detail = convert_status_detail(status_detail) + actual_status_detail = convert_status_detail(status_detail) + assert actual_status_detail == expected_status_detail - self.assertEqual(actual_status_detail, expected_status_detail) - def test_convert_status_detail_2(self): - status_detail = { - 'url': { - 'summary': 'At least one compatible url field. Failed', - 'fields': ['testurl'], +def test_convert_status_detail_2(): + status_detail = { + 'url': { + 'summary': 'At least one compatible url field. 
Failed', + 'fields': ['testurl'], + }, + 'metadata': [ + { + 'summary': 'Mandatory fields missing', + 'fields': ['name'], + }, + ], + 'archive': [ + { + 'summary': 'Invalid archive', + 'fields': ['2'], }, - 'metadata': [ - { - 'summary': 'Mandatory fields missing', - 'fields': ['name'], - }, - ], - 'archive': [ - { - 'summary': 'Invalid archive', - 'fields': ['2'], - }, - { - 'summary': 'Unsupported archive', - 'fields': ['1'], - } - ], - } - - expected_status_detail = '''- Mandatory fields missing (name) + { + 'summary': 'Unsupported archive', + 'fields': ['1'], + } + ], + } + + expected_status_detail = '''- Mandatory fields missing (name) - Invalid archive (2) - Unsupported archive (1) - At least one compatible url field. Failed (testurl) ''' - actual_status_detail = convert_status_detail(status_detail) + actual_status_detail = convert_status_detail(status_detail) + assert actual_status_detail == expected_status_detail - self.assertEqual(actual_status_detail, expected_status_detail) - def test_convert_status_detail_3(self): - status_detail = { - 'url': { - 'summary': 'At least one compatible url field', - }, - } +def test_convert_status_detail_3(): + status_detail = { + 'url': { + 'summary': 'At least one compatible url field', + }, + } - expected_status_detail = '- At least one compatible url field\n' - actual_status_detail = convert_status_detail(status_detail) - self.assertEqual(actual_status_detail, expected_status_detail) + expected_status_detail = '- At least one compatible url field\n' + actual_status_detail = convert_status_detail(status_detail) + assert actual_status_detail == expected_status_detail - def test_convert_status_detail_edge_case(self): - status_detail = { - 'url': { - 'summary': 'At least one compatible url field. Failed', - 'fields': ['testurl'], + +def test_convert_status_detail_edge_case(): + status_detail = { + 'url': { + 'summary': 'At least one compatible url field. 
Failed', + 'fields': ['testurl'], + }, + 'metadata': [ + { + 'summary': 'Mandatory fields missing', + 'fields': ['9', 10, 1.212], + }, + ], + 'archive': [ + { + 'summary': 'Invalid archive', + 'fields': ['3'], }, - 'metadata': [ - { - 'summary': 'Mandatory fields missing', - 'fields': ['9', 10, 1.212], - }, - ], - 'archive': [ - { - 'summary': 'Invalid archive', - 'fields': ['3'], - }, - { - 'summary': 'Unsupported archive', - 'fields': [2], - } - ], - } - - expected_status_detail = '''- Mandatory fields missing (9, 10, 1.212) + { + 'summary': 'Unsupported archive', + 'fields': [2], + } + ], + } + + expected_status_detail = '''- Mandatory fields missing (9, 10, 1.212) - Invalid archive (3) - Unsupported archive (2) - At least one compatible url field. Failed (testurl) ''' - actual_status_detail = convert_status_detail(status_detail) - - self.assertEqual(actual_status_detail, expected_status_detail) + actual_status_detail = convert_status_detail(status_detail) + assert actual_status_detail == expected_status_detail diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py --- a/swh/deposit/tests/api/test_deposit.py +++ b/swh/deposit/tests/api/test_deposit.py @@ -8,153 +8,182 @@ from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase -from swh.deposit.config import COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED -from swh.deposit.config import DEPOSIT_STATUS_PARTIAL -from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS -from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE -from swh.deposit.models import Deposit, DepositClient, DepositCollection +from swh.deposit.config import ( + COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED, + DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS, + DEPOSIT_STATUS_LOAD_FAILURE +) + +from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase, 
CommonCreationRoutine +def test_deposit_post_will_fail_with_401(client): + """Without authentication, endpoint refuses access with 401 response + + """ + url = reverse(COL_IRI, args=['hal']) + response = client.post(url) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +def test_access_to_another_user_collection_is_forbidden( + authenticated_client, deposit_another_collection, deposit_user): + """Access to another user collection should return a 403 -class DepositNoAuthCase(APITestCase, BasicTestCase): - """Deposit access are protected with basic authentication. + """ + coll2 = deposit_another_collection + url = reverse(COL_IRI, args=[coll2.name]) + response = authenticated_client.post(url) + assert response.status_code == status.HTTP_403_FORBIDDEN + msg = 'Client %s cannot access collection %s' % ( + deposit_user.username, coll2.name, ) + assert msg in response.content.decode('utf-8') + + +def test_delete_on_col_iri_not_supported( + authenticated_client, deposit_collection): + """Delete on col iri should return a 405 response + + """ + url = reverse(COL_IRI, args=[deposit_collection.name]) + response = authenticated_client.delete(url) + assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED + assert 'DELETE method is not supported on this endpoint' in \ + response.content.decode('utf-8') + + +def create_deposit_with_rejection_status( + authenticated_client, deposit_collection): + url = reverse(COL_IRI, args=[deposit_collection.name]) + + data = b'some data which is clearly not a zip file' + md5sum = hashlib.md5(data).hexdigest() + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=data, + # + headers + CONTENT_LENGTH=len(data), + # other headers needs HTTP_ prefix to be taken into account + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=md5sum, + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_CONTENT_DISPOSITION='attachment; 
filename=filename0') + + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(BytesIO(response.content)) + actual_state = response_content['deposit_status'] + assert actual_state == DEPOSIT_STATUS_REJECTED + + +def test_act_on_deposit_rejected_is_not_permitted( + authenticated_client, deposit_collection, rejected_deposit, + atom_dataset): + deposit = rejected_deposit + + response = authenticated_client.post( + reverse(EDIT_SE_IRI, args=[deposit.collection.name, deposit.id]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1'], + HTTP_SLUG=deposit.external_id) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + msg = 'You can only act on deposit with status '%s'' % ( + DEPOSIT_STATUS_PARTIAL, ) + assert msg in response.content.decode('utf-8') + + +def test_add_deposit_when_partial_makes_new_deposit( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """Posting deposit on collection when previous is partial makes new deposit """ - def test_post_will_fail_with_401(self): - """Without authentication, endpoint refuses access with 401 response + deposit = partial_deposit + assert deposit.status == DEPOSIT_STATUS_PARTIAL + + # adding a new deposit with the same external id + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'), + HTTP_SLUG=deposit.external_id + ) + + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + assert deposit_id != deposit.id # new deposit + + new_deposit = Deposit.objects.get(pk=deposit_id) + assert new_deposit != deposit + assert new_deposit.parent is None + + +def test_add_deposit_when_failed_makes_new_deposit_with_no_parent( + authenticated_client, deposit_collection, 
failed_deposit, + atom_dataset): + """Posting deposit on collection when deposit failed makes new deposit + without parent + + """ + deposit = failed_deposit + assert deposit.status == DEPOSIT_STATUS_LOAD_FAILURE + + # adding a new deposit with the same external id as a failed deposit + # does not create a parenting chain + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'), + HTTP_SLUG=deposit.external_id) - """ - url = reverse(COL_IRI, args=[self.collection.name]) + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] - # when - response = self.client.post(url) + assert deposit_id != deposit.id - # then - self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) + new_deposit = Deposit.objects.get(pk=deposit_id) + assert new_deposit != deposit + assert new_deposit.parent is None -class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase, - CommonCreationRoutine): - """Deposit access are protected with basic authentication. 
+def test_add_deposit_when_done_makes_new_deposit_with_parent_old_one( + authenticated_client, deposit_collection, completed_deposit, + atom_dataset): + """Posting deposit on collection when deposit done makes new deposit with + parent """ - def setUp(self): - super().setUp() - # Add another user - _collection2 = DepositCollection(name='some') - _collection2.save() - _user = DepositClient.objects.create_user(username='user', - password='user') - _user.collections = [_collection2.id] - self.collection2 = _collection2 - - def test_access_to_another_user_collection_is_forbidden(self): - """Access to another user collection should return a 403 - - """ - url = reverse(COL_IRI, args=[self.collection2.name]) - response = self.client.post(url) - self.assertEqual(response.status_code, - status.HTTP_403_FORBIDDEN) - self.assertRegex(response.content.decode('utf-8'), - 'Client hal cannot access collection %s' % ( - self.collection2.name, )) - - def test_delete_on_col_iri_not_supported(self): - """Delete on col iri should return a 405 response - - """ - url = reverse(COL_IRI, args=[self.collection.name]) - response = self.client.delete(url) - self.assertEqual(response.status_code, - status.HTTP_405_METHOD_NOT_ALLOWED) - self.assertRegex(response.content.decode('utf-8'), - 'DELETE method is not supported on this endpoint') - - def create_deposit_with_rejection_status(self): - url = reverse(COL_IRI, args=[self.collection.name]) - - data = b'some data which is clearly not a zip file' - md5sum = hashlib.md5(data).hexdigest() - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=data, - # + headers - CONTENT_LENGTH=len(data), - # other headers needs HTTP_ prefix to be taken into account - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=md5sum, - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - 
self.assertEqual(response.status_code, status.HTTP_201_CREATED) - response_content = parse_xml(BytesIO(response.content)) - actual_state = response_content['deposit_status'] - self.assertEqual(actual_state, DEPOSIT_STATUS_REJECTED) - - def test_act_on_deposit_rejected_is_not_permitted(self): - deposit_id = self.create_deposit_with_status(DEPOSIT_STATUS_REJECTED) - - deposit = Deposit.objects.get(pk=deposit_id) - assert deposit.status == DEPOSIT_STATUS_REJECTED - - response = self.client.post( - reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data1, - HTTP_SLUG='external-id') - - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertRegex( - response.content.decode('utf-8'), - "You can only act on deposit with status '%s'" % ( - DEPOSIT_STATUS_PARTIAL, )) - - def test_add_deposit_with_parent(self): - # given multiple deposit already loaded - deposit_id = self.create_deposit_with_status( - status=DEPOSIT_STATUS_LOAD_SUCCESS, - external_id='some-external-id') - - deposit1 = Deposit.objects.get(pk=deposit_id) - self.assertIsNotNone(deposit1) - self.assertEqual(deposit1.external_id, 'some-external-id') - self.assertEqual(deposit1.status, DEPOSIT_STATUS_LOAD_SUCCESS) - - deposit_id2 = self.create_deposit_with_status( - status=DEPOSIT_STATUS_LOAD_SUCCESS, - external_id='some-external-id') - - deposit2 = Deposit.objects.get(pk=deposit_id2) - self.assertIsNotNone(deposit2) - self.assertEqual(deposit2.external_id, 'some-external-id') - self.assertEqual(deposit2.status, DEPOSIT_STATUS_LOAD_SUCCESS) - - deposit_id3 = self.create_deposit_with_status( - status=DEPOSIT_STATUS_LOAD_FAILURE, - external_id='some-external-id') - - deposit3 = Deposit.objects.get(pk=deposit_id3) - self.assertIsNotNone(deposit3) - self.assertEqual(deposit3.external_id, 'some-external-id') - self.assertEqual(deposit3.status, DEPOSIT_STATUS_LOAD_FAILURE) - - # when - deposit_id3 = 
self.create_simple_deposit_partial( - external_id='some-external-id') - - # then - deposit4 = Deposit.objects.get(pk=deposit_id3) - - self.assertIsNotNone(deposit4) - self.assertEqual(deposit4.external_id, 'some-external-id') - self.assertEqual(deposit4.status, DEPOSIT_STATUS_PARTIAL) - self.assertEqual(deposit4.parent, deposit2) + # given multiple deposit already loaded + deposit = completed_deposit + assert deposit.status == DEPOSIT_STATUS_LOAD_SUCCESS + + # adding a new deposit with the same external id as a completed deposit + # creates the parenting chain + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'), + HTTP_SLUG=deposit.external_id + ) + + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + assert deposit_id != deposit.id + + new_deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == new_deposit.collection + assert deposit.external_id == new_deposit.external_id + + assert new_deposit != deposit + assert new_deposit.parent == deposit diff --git a/swh/deposit/tests/api/test_deposit_atom.py b/swh/deposit/tests/api/test_deposit_atom.py --- a/swh/deposit/tests/api/test_deposit_atom.py +++ b/swh/deposit/tests/api/test_deposit_atom.py @@ -3,471 +3,311 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED -from swh.deposit.models import Deposit, DepositRequest +from swh.deposit.models import Deposit, DepositRequest, DepositCollection from swh.deposit.parsers import parse_xml -from ..common 
import BasicTestCase, WithAuthTestCase + +def test_post_deposit_atom_201_even_with_decimal( + authenticated_client, deposit_collection, atom_dataset): + """Posting an initial atom entry should return 201 with deposit receipt + + """ + atom_error_with_decimal = atom_dataset['error-with-decimal'] + + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_error_with_decimal, + HTTP_SLUG='external-id', + HTTP_IN_PROGRESS='false') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + dr = DepositRequest.objects.get(deposit=deposit) + + assert dr.metadata is not None + sw_version = dr.metadata.get('codemeta:softwareVersion') + assert sw_version == '10.4' + + +def test_post_deposit_atom_400_with_empty_body( + authenticated_client, deposit_collection, atom_dataset): + """Posting empty body request should return a 400 response + + """ + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-empty-body']) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_atom_400_badly_formatted_atom( + authenticated_client, deposit_collection, atom_dataset): + """Posting a badly formatted atom should return a 400 response + + """ + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-badly-formatted']) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_atom_parsing_error( + authenticated_client, deposit_collection, atom_dataset): + """Posting parsing error prone atom should return 400 + + """ + response = 
authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-parsing-error-prone']) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_atom_no_slug_header( + authenticated_client, deposit_collection, atom_dataset): + """Posting an atom entry without a slug header should return a 400 + + """ + url = reverse(COL_IRI, args=[deposit_collection.name]) + + # when + response = authenticated_client.post( + url, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'], + # + headers + HTTP_IN_PROGRESS='false') + + assert b'Missing SLUG header' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_atom_unknown_collection( + authenticated_client, atom_dataset): + """Posting an atom entry to an unknown collection should return a 404 + + """ + unknown_collection = 'unknown-one' + with pytest.raises(DepositCollection.DoesNotExist): + DepositCollection.objects.get(name=unknown_collection) + + response = authenticated_client.post( + reverse(COL_IRI, args=[unknown_collection]), # <- unknown collection + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'], + HTTP_SLUG='something') + assert response.status_code == status.HTTP_404_NOT_FOUND + + +def test_post_deposit_atom_entry_initial( + authenticated_client, deposit_collection, atom_dataset): + """Posting an initial atom entry should return 201 with deposit receipt + + """ + # given + external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + atom_entry_data = atom_dataset['entry-data0'] % external_id.encode('utf-8') + + # when + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + 
data=atom_entry_data, + HTTP_SLUG=external_id, + HTTP_IN_PROGRESS='false') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == deposit_collection + assert deposit.external_id == external_id + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + + # one associated request to a deposit + deposit_request = DepositRequest.objects.get(deposit=deposit) + assert deposit_request.metadata is not None + assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8') + assert bool(deposit_request.archive) is False + + +def test_post_deposit_atom_entry_with_codemeta( + authenticated_client, deposit_collection, atom_dataset): + """Posting an initial atom entry should return 201 with deposit receipt + + """ + # given + external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + atom_entry_data = atom_dataset['codemeta-sample'] % external_id.encode('utf-8') # noqa + + # when + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_entry_data, + HTTP_SLUG=external_id, + HTTP_IN_PROGRESS='false') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == deposit_collection + assert deposit.external_id == external_id + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + + # one associated request to a deposit + deposit_request = DepositRequest.objects.get(deposit=deposit) + assert deposit_request.metadata is not None + assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8') + 
assert bool(deposit_request.archive) is False + + +def test_post_deposit_atom_entry_tei( + authenticated_client, deposit_collection, atom_dataset): + """Posting initial atom entry as TEI should return 201 with receipt + + """ + # given + external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + atom_entry_data = atom_dataset['tei-sample'] + + # when + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_entry_data, + HTTP_SLUG=external_id, + HTTP_IN_PROGRESS='false') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == deposit_collection + assert deposit.external_id == external_id + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + + # one associated request to a deposit + deposit_request = DepositRequest.objects.get(deposit=deposit) + assert deposit_request.metadata is not None + assert deposit_request.raw_metadata == atom_entry_data.decode('utf-8') + assert bool(deposit_request.archive) is False -class DepositAtomEntryTestCase(APITestCase, WithAuthTestCase, BasicTestCase): - """Try and post atom entry deposit. 
+def test_post_deposit_atom_entry_multiple_steps( + authenticated_client, deposit_collection, atom_dataset): + """After initial deposit, updating a deposit should return a 201 """ - def setUp(self): - super().setUp() - - self.atom_entry_data0 = b""" - - Awesome Compiler - hal - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a - %s - 2017-10-07T15:17:08Z - some awesome author - something - awesome-compiler - This is an awesome compiler destined to -awesomely compile stuff -and other stuff - compiler,programming,language - 2005-10-07T17:17:08Z - 2005-10-07T17:17:08Z - release note - related link - - Awesome - https://hoster.org/awesome-compiler - GNU/Linux - 0.0.1 - running - all -""" - - self.atom_entry_data1 = b""" - - hal - urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a - 2017-10-07T15:17:08Z - some awesome author - something - awesome-compiler - This is an awesome compiler destined to -awesomely compile stuff -and other stuff - compiler,programming,language - 2005-10-07T17:17:08Z - 2005-10-07T17:17:08Z - release note - related link - - Awesome - https://hoster.org/awesome-compiler - GNU/Linux - 0.0.1 - running - all -""" - - self.atom_entry_data_badly_formatted = b""" -""" - - self.atom_error_with_decimal = b""" - - Composing a Web of Audio Applications - hal - hal-01243065 - hal-01243065 - https://hal-test.archives-ouvertes.fr/hal-01243065 - test - - - DSP programming,Web,Composability,Faust - 2017-05-03T16:08:47+02:00 - The Web offers a great opportunity to share, deploy and use programs without installation difficulties. In this article we explore the idea of freely combining/composing real-time audio applications deployed on the Web using Faust audio DSP language. 
- 1 - 10.4 - phpstorm - stable - - linux - php - python - C - - GNU General Public License v3.0 only - - - CeCILL Free Software License Agreement v1.1 - - - HAL - hal@ccsd.cnrs.fr - - - Someone Nice - someone@nice.fr - FFJ - - -""" # noqa - - def test_post_deposit_atom_201_even_with_decimal(self): - """Posting an initial atom entry should return 201 with deposit receipt - - """ - # given - # when - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=self.atom_error_with_decimal, - HTTP_SLUG='external-id', - HTTP_IN_PROGRESS='false') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - dr = DepositRequest.objects.get(deposit=deposit) - - self.assertIsNotNone(dr.metadata) - sw_version = dr.metadata.get('codemeta:softwareVersion') - self.assertEqual(sw_version, '10.4') - - def test_post_deposit_atom_400_with_empty_body(self): - """Posting empty body request should return a 400 response - - """ - atom_entry_data_empty_body = b""" -""" - - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=atom_entry_data_empty_body) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_atom_400_badly_formatted_atom(self): - """Posting a badly formatted atom should return a 400 response - - """ - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data_badly_formatted) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_atom_400_with_parsing_error(self): - """Posting parsing error prone atom should return 400 - - """ - atom_entry_data_parsing_error_prone = b""" - 
- Composing a Web of Audio Applications - - -""" - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=atom_entry_data_parsing_error_prone) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_atom_400_without_slug_header(self): - """Posting an atom entry without a slug header should return a 400 - - """ - url = reverse(COL_IRI, args=[self.collection.name]) - - # when - response = self.client.post( - url, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data0, - # + headers - HTTP_IN_PROGRESS='false') - - self.assertIn(b'Missing SLUG header', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_atom_404_unknown_collection(self): - """Posting an atom entry to an unknown collection should return a 404 - - """ - atom_entry_data3 = b""" - - something -""" - - response = self.client.post( - reverse(COL_IRI, args=['unknown-one']), - content_type='application/atom+xml;type=entry', - data=atom_entry_data3, - HTTP_SLUG='something') - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - - def test_post_deposit_atom_entry_initial(self): - """Posting an initial atom entry should return 201 with deposit receipt - - """ - # given - external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' - - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - atom_entry_data = self.atom_entry_data0 % external_id.encode('utf-8') - - # when - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=atom_entry_data, - HTTP_SLUG='external-id', - HTTP_IN_PROGRESS='false') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] 
- - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.client, self.user) - - # one associated request to a deposit - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertIsNotNone(deposit_request.metadata) - self.assertEqual( - deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) - self.assertFalse(bool(deposit_request.archive)) - - def test_post_deposit_atom_entry_with_codemeta(self): - """Posting an initial atom entry should return 201 with deposit receipt - - """ - # given - external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' - - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - atom_entry_data = b""" - - - - %s - hal-01587361 - https://hal.inria.fr/hal-01587361 - https://hal.inria.fr/hal-01587361/document - https://hal.inria.fr/hal-01587361/file/AffectationRO-v1.0.0.zip - doi:10.5281/zenodo.438684 - The assignment problem - AffectationRO - Gruenpeter, Morane - [INFO] Computer Science [cs] - [INFO.INFO-RO] Computer Science [cs]/Operations Research [cs.RO] - SOFTWARE - Project in OR: The assignment problemA java implementation for the assignment problem first release - description fr - 2015-06-01 - 2017-10-19 - en - - - url stable - Version sur hal - Version entre par lutilisateur - Mots-cls - Commentaire - Rfrence interne - - Collaboration/Projet - nom du projet - id - - Voir aussi - Financement - Projet ANR - Projet Europen - Platform/OS - Dpendances - Etat du dveloppement - - license - url spdx - - Outils de dveloppement- outil no1 - Outils de dveloppement- outil no2 - http://code.com - language 1 - language 2 - """ % external_id.encode('utf-8') # noqa - - # when - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - 
content_type='application/atom+xml;type=entry', - data=atom_entry_data, - HTTP_SLUG='external-id', - HTTP_IN_PROGRESS='false') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.client, self.user) - - # one associated request to a deposit - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertIsNotNone(deposit_request.metadata) - self.assertEqual( - deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) - - self.assertFalse(bool(deposit_request.archive)) - - def test_post_deposit_atom_entry_tei(self): - """Posting initial atom entry as TEI should return 201 with receipt - - """ - # given - external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - atom_entry_data = b"""HAL TEI export of hal-01587083CCSDDistributed under a Creative Commons Attribution 4.0 International License

HAL API platform

questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733MoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.com2017-09-29 11:21:322017-10-03 17:20:132017-10-03 17:20:132017-09-292017-09-29contributorMoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.comCCSDhal-01587083https://hal.inria.fr/hal-01587083gruenpeter:hal-0158708320172017questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733EnglishComputer Science [cs]SoftwareIRILLInitiative pour la Recherche et l'Innovation sur le Logiciel Libre
https://www.irill.org/
Universite Pierre et Marie Curie - Paris 6UPMC
4 place Jussieu - 75005 Paris
http://www.upmc.fr/
Institut National de Recherche en Informatique et en AutomatiqueInria
Domaine de VoluceauRocquencourt - BP 10578153 Le Chesnay Cedex
http://www.inria.fr/en/
Universite Paris Diderot - Paris 7UPD7
5 rue Thomas-Mann - 75205 Paris cedex 13
http://www.univ-paris-diderot.fr
""" # noqa - - # when - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=atom_entry_data, - HTTP_SLUG=external_id, - HTTP_IN_PROGRESS='false') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.client, self.user) - - # one associated request to a deposit - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertIsNotNone(deposit_request.metadata) - self.assertEqual( - deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) - self.assertFalse(bool(deposit_request.archive)) - - def test_post_deposit_atom_entry_multiple_steps(self): - """After initial deposit, updating a deposit should return a 201 - - """ - # given - external_id = 'urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a' - - with self.assertRaises(Deposit.DoesNotExist): - deposit = Deposit.objects.get(external_id=external_id) - - # when - response = self.client.post( - reverse(COL_IRI, args=[self.collection.name]), - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data1, - HTTP_IN_PROGRESS='True', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = int(response_content['deposit_id']) - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.status, 'partial') - self.assertEqual(deposit.client, self.user) - - # one associated request to a deposit - 
deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 1) - - atom_entry_data = b""" - - %s -""" % external_id.encode('utf-8') - - update_uri = response._headers['location'][1] - - # when updating the first deposit post - response = self.client.post( - update_uri, - content_type='application/atom+xml;type=entry', - data=atom_entry_data, - HTTP_IN_PROGRESS='False') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = int(response_content['deposit_id']) - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.client, self.user) - - self.assertEqual(len(Deposit.objects.all()), 1) - - # now 2 associated requests to a same deposit - deposit_requests = DepositRequest.objects.filter( - deposit=deposit).order_by('id') - self.assertEqual(len(deposit_requests), 2) - - expected_meta = [ - { - 'metadata': parse_xml(self.atom_entry_data1), - 'raw_metadata': self.atom_entry_data1.decode('utf-8'), - }, - { - 'metadata': parse_xml(atom_entry_data), - 'raw_metadata': atom_entry_data.decode('utf-8'), - } - ] - - for i, deposit_request in enumerate(deposit_requests): - actual_metadata = deposit_request.metadata - self.assertEqual(actual_metadata, - expected_meta[i]['metadata']) - self.assertEqual(deposit_request.raw_metadata, - expected_meta[i]['raw_metadata']) - self.assertFalse(bool(deposit_request.archive)) + # given + external_id = 'urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a' + + with pytest.raises(Deposit.DoesNotExist): + deposit = Deposit.objects.get(external_id=external_id) + + # when + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + 
data=atom_dataset['entry-data1'], + HTTP_IN_PROGRESS='True', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = int(response_content['deposit_id']) + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == deposit_collection + assert deposit.external_id == external_id + assert deposit.status == 'partial' + + # one associated request to a deposit + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 1 + + atom_entry_data = atom_dataset['entry-data-minimal'] % external_id.encode('utf-8') # noqa + + update_uri = response._headers['location'][1] + + # when updating the first deposit post + response = authenticated_client.post( + update_uri, + content_type='application/atom+xml;type=entry', + data=atom_entry_data, + HTTP_IN_PROGRESS='False') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = int(response_content['deposit_id']) + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.collection == deposit_collection + assert deposit.external_id == external_id + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + + assert len(Deposit.objects.all()) == 1 + + # now 2 associated requests to a same deposit + deposit_requests = DepositRequest.objects.filter( + deposit=deposit).order_by('id') + assert len(deposit_requests) == 2 + + atom_entry_data1 = atom_dataset['entry-data1'] + expected_meta = [ + { + 'metadata': parse_xml(atom_entry_data1), + 'raw_metadata': atom_entry_data1.decode('utf-8'), + }, + { + 'metadata': parse_xml(atom_entry_data), + 'raw_metadata': atom_entry_data.decode('utf-8'), + } + ] + + for i, deposit_request in enumerate(deposit_requests): + actual_metadata = deposit_request.metadata + assert actual_metadata == expected_meta[i]['metadata'] + assert 
deposit_request.raw_metadata == expected_meta[i]['raw_metadata'] + assert bool(deposit_request.archive) is False diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -3,643 +3,542 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import pytest + from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.tests import TEST_CONFIG -from swh.deposit.config import COL_IRI, EM_IRI -from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED +from swh.deposit.config import ( + COL_IRI, EM_IRI, DEPOSIT_STATUS_DEPOSITED, +) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from ..common import ( - BasicTestCase, WithAuthTestCase, create_arborescence_archive, - FileSystemCreationRoutine -) +from swh.deposit.tests.common import create_arborescence_archive, check_archive + + +def test_post_deposit_binary_no_slug( + authenticated_client, deposit_collection, sample_archive): + """Posting a binary deposit without slug header should return 400 + + """ + url = reverse(COL_IRI, args=[deposit_collection.name]) + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + assert b'Missing SLUG header' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_binary_support( + 
authenticated_client, deposit_collection, sample_archive): + """Binary upload with content-type not in [zip,x-tar] should return 415 + + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/octet-stream', + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE + + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + +def test_post_deposit_binary_upload_ok( + authenticated_client, deposit_collection, sample_archive): + """Binary upload with correct headers should return 201 with receipt + + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + # other headers needs HTTP_ prefix to be taken into account + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + sample_archive['name'], )) + + # then + response_content = parse_xml(BytesIO(response.content)) + assert response.status_code == status.HTTP_201_CREATED + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + 
assert deposit.swh_id is None + + deposit_request = DepositRequest.objects.get(deposit=deposit) + check_archive(sample_archive['name'], deposit_request.archive.name) + + assert deposit_request.metadata is None + assert deposit_request.raw_metadata is None + + response_content = parse_xml(BytesIO(response.content)) + assert response_content['deposit_archive'] == sample_archive['name'] + assert int(response_content['deposit_id']) == deposit.id + assert response_content['deposit_status'] == deposit.status + + edit_se_iri = reverse('edit_se_iri', + args=[deposit_collection.name, deposit.id]) + + assert response._headers['location'] == ( + 'Location', 'http://testserver' + edit_se_iri) + + +def test_post_deposit_binary_failure_unsupported_packaging_header( + authenticated_client, deposit_collection, sample_archive): + """Bin deposit with unsupported packaging header should return 400 + + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='something-unsupported', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_400_BAD_REQUEST + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + +def test_post_deposit_binary_upload_no_content_disposition_header( + authenticated_client, deposit_collection, sample_archive): + """Binary upload without content_disposition header should return 400 + + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', + data=sample_archive['data'], + # + headers + 
CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false') + + # then + assert response.status_code == status.HTTP_400_BAD_REQUEST + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + +def test_post_deposit_mediation_not_supported( + authenticated_client, deposit_collection, sample_archive): + """Binary upload with mediation should return a 412 response -class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase, - FileSystemCreationRoutine): - """Try and upload one single deposit + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_ON_BEHALF_OF='someone', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_412_PRECONDITION_FAILED + + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + +def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( + authenticated_client, deposit_collection, sample_archive, tmp_path): + """Binary upload must not exceed the limit set up... 
+ + """ + tmp_path = str(tmp_path) + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some content in file', + up_to_size=TEST_CONFIG['max_upload_size']) + + external_id = 'some-external-id' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', + data=archive['data'], + # + headers + CONTENT_LENGTH=archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE + assert b'Upload size limit exceeded' in response.content + + with pytest.raises(Deposit.DoesNotExist): + Deposit.objects.get(external_id=external_id) + + +def test_post_deposit_2_post_2_different_deposits( + authenticated_client, deposit_collection, sample_archive): + """2 posting deposits should return 2 different 201 with receipt + + """ + url = reverse(COL_IRI, args=[deposit_collection.name]) + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG='some-external-id-1', + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + + deposits = Deposit.objects.all() + assert len(deposits) == 1 + assert deposits[0] == deposit + + # second post + response = authenticated_client.post( + url, + content_type='application/x-tar', # as zip + 
data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG='another-external-id', + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') + + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id2 = response_content['deposit_id'] + + deposit2 = Deposit.objects.get(pk=deposit_id2) + + assert deposit != deposit2 + + deposits = Deposit.objects.all().order_by('id') + assert len(deposits) == 2 + assert list(deposits) == [deposit, deposit2] + + +def test_post_deposit_binary_and_post_to_add_another_archive( + authenticated_client, deposit_collection, sample_archive, tmp_path): + """Updating a deposit should return a 201 with receipt + + """ + tmp_path = str(tmp_path) + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='true', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + sample_archive['name'], )) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == 'partial' + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_request = DepositRequest.objects.get(deposit=deposit) + assert deposit_request.deposit == deposit + assert deposit_request.type == 'archive' + 
check_archive(sample_archive['name'], deposit_request.archive.name) + + # 2nd archive to upload + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some other content in file') + + # uri to update the content + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) + + # adding another archive for the deposit and finalizing it + response = authenticated_client.post( + update_uri, + content_type='application/zip', # as zip + data=archive2['data'], + # + headers + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + archive2['name'])) + + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(BytesIO(response.content)) + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). 
+ order_by('id')) + + # 2 deposit requests for the same deposit + assert len(deposit_requests) == 2 + assert deposit_requests[0].deposit == deposit + assert deposit_requests[0].type == 'archive' + check_archive(sample_archive['name'], deposit_requests[0].archive.name) + + assert deposit_requests[1].deposit == deposit + assert deposit_requests[1].type == 'archive' + check_archive(archive2['name'], deposit_requests[1].archive.name) + + # only 1 deposit in db + deposits = Deposit.objects.all() + assert len(deposits) == 1 + + +def test_post_deposit_then_update_refused( + authenticated_client, deposit_collection, + sample_archive, atom_dataset, tmp_path): + """Updating a deposit with status 'ready' should return a 400 """ - def setUp(self): - super().setUp() - - self.atom_entry_data0 = b""" - - Awesome Compiler - hal - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a - %s - 2017-10-07T15:17:08Z - some awesome author - something - awesome-compiler - This is an awesome compiler destined to -awesomely compile stuff -and other stuff - compiler,programming,language - 2005-10-07T17:17:08Z - 2005-10-07T17:17:08Z - release note - related link - - Awesome - https://hoster.org/awesome-compiler - GNU/Linux - 0.0.1 - running - all -""" - - self.atom_entry_data1 = b""" - - hal - urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a - 2017-10-07T15:17:08Z - some awesome author - something - awesome-compiler - This is an awesome compiler destined to -awesomely compile stuff -and other stuff - compiler,programming,language - 2005-10-07T17:17:08Z - 2005-10-07T17:17:08Z - release note - related link - - Awesome - https://hoster.org/awesome-compiler - GNU/Linux - 0.0.1 - running - all -""" - - self.atom_entry_data2 = b""" - - %s -""" - - self.atom_entry_data_empty_body = b""" -""" - - self.atom_entry_data3 = b""" - - something -""" - - self.data_atom_entry_ok = b""" - - Title - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a - 2005-10-07T17:17:08Z - Contributor - The abstract - - - The abstract - 
Access Rights - Alternative Title - Date Available - Bibliographic Citation # noqa - Contributor - Description - Has Part - Has Version - Identifier - Is Part Of - Publisher - References - Rights Holder - Source - Title - Type - -""" - - def test_post_deposit_binary_without_slug_header_is_bad_request(self): - """Posting a binary deposit without slug header should return 400 - - """ - url = reverse(COL_IRI, args=[self.collection.name]) - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - self.assertIn(b'Missing SLUG header', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_binary_upload_final_and_status_check(self): - """Binary upload with correct headers should return 201 with receipt - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - # other headers needs HTTP_ prefix to be taken into account - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( - self.archive['name'], )) - - # then - response_content = parse_xml(BytesIO(response.content)) - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - 
self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertEqual(deposit_request.deposit, deposit) - self.assertRegex(deposit_request.archive.name, self.archive['name']) - self.assertIsNone(deposit_request.metadata) - self.assertIsNone(deposit_request.raw_metadata) - - response_content = parse_xml(BytesIO(response.content)) - self.assertEqual(response_content['deposit_archive'], - self.archive['name']) - self.assertEqual(int(response_content['deposit_id']), - deposit.id) - self.assertEqual(response_content['deposit_status'], - deposit.status) - - edit_se_iri = reverse('edit_se_iri', - args=[self.collection.name, deposit.id]) - - self.assertEqual(response._headers['location'], - ('Location', 'http://testserver' + edit_se_iri)) - - def test_post_deposit_binary_upload_supports_zip_or_tar(self): - """Binary upload with content-type not in [zip,x-tar] should return 415 - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/octet-stream', - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, - status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - def test_post_deposit_binary_fails_if_unsupported_packaging_header( - self): - """Bin deposit without supported content_disposition header returns 400 - - """ - # given - url = reverse(COL_IRI, 
args=[self.collection.name]) - - external_id = 'some-external-id' - - # when - response = self.client.post( - url, - content_type='application/zip', - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='something-unsupported', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - def test_post_deposit_binary_upload_fail_if_no_content_disposition_header( - self): - """Binary upload without content_disposition header should return 400 - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id' - - # when - response = self.client.post( - url, - content_type='application/zip', - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false') - - # then - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - def test_post_deposit_mediation_not_supported(self): - """Binary upload with mediation should return a 412 response - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_ON_BEHALF_OF='someone', - HTTP_CONTENT_DISPOSITION='attachment; 
filename=filename0') - - # then - self.assertEqual(response.status_code, - status.HTTP_412_PRECONDITION_FAILED) - - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( - self): - """Binary upload must not exceed the limit set up... - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - archive = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some content in file', - up_to_size=TEST_CONFIG['max_upload_size']) - - external_id = 'some-external-id' - - # when - response = self.client.post( - url, - content_type='application/zip', - data=archive['data'], - # + headers - CONTENT_LENGTH=archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, - status.HTTP_413_REQUEST_ENTITY_TOO_LARGE) - self.assertRegex(response.content, b'Upload size limit exceeded') - - with self.assertRaises(Deposit.DoesNotExist): - Deposit.objects.get(external_id=external_id) - - def test_post_deposit_2_post_2_different_deposits(self): - """2 posting deposits should return 2 different 201 with receipt - - """ - url = reverse(COL_IRI, args=[self.collection.name]) - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG='some-external-id-1', - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = 
response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - - deposits = Deposit.objects.all() - self.assertEqual(len(deposits), 1) - self.assertEqual(deposits[0], deposit) - - # second post - response = self.client.post( - url, - content_type='application/x-tar', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG='another-external-id', - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') - - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id2 = response_content['deposit_id'] - - deposit2 = Deposit.objects.get(pk=deposit_id2) - - self.assertNotEqual(deposit, deposit2) - - deposits = Deposit.objects.all().order_by('id') - self.assertEqual(len(deposits), 2) - self.assertEqual(list(deposits), [deposit, deposit2]) - - def test_post_deposit_binary_and_post_to_add_another_archive(self): - """Updating a deposit should return a 201 with receipt - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='true', - HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( - self.archive['name'], )) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, 'partial') - 
self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertEqual(deposit_request.deposit, deposit) - self.assertEqual(deposit_request.type, 'archive') - self.assertRegex(deposit_request.archive.name, self.archive['name']) - - # 2nd archive to upload - archive2 = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some other content in file') - - # uri to update the content - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - - # adding another archive for the deposit and finalizing it - response = self.client.post( - update_uri, - content_type='application/zip', # as zip - data=archive2['data'], - # + headers - CONTENT_LENGTH=archive2['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( - archive2['name'])) - - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - response_content = parse_xml(BytesIO(response.content)) - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). 
- order_by('id')) - - # 2 deposit requests for the same deposit - self.assertEqual(len(deposit_requests), 2) - self.assertEqual(deposit_requests[0].deposit, deposit) - self.assertEqual(deposit_requests[0].type, 'archive') - self.assertRegex(deposit_requests[0].archive.name, - self.archive['name']) - - self.assertEqual(deposit_requests[1].deposit, deposit) - self.assertEqual(deposit_requests[1].type, 'archive') - self.assertRegex(deposit_requests[1].archive.name, - archive2['name']) - - # only 1 deposit in db - deposits = Deposit.objects.all() - self.assertEqual(len(deposits), 1) - - def test_post_deposit_then_post_or_put_is_refused_when_status_ready(self): - """Updating a deposit with status 'ready' should return a 400 - - """ - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_request = DepositRequest.objects.get(deposit=deposit) - self.assertEqual(deposit_request.deposit, deposit) - self.assertRegex(deposit_request.archive.name, 'filename0') - - # updating/adding is forbidden - - # uri to update the content - edit_se_iri = reverse( - 
'edit_se_iri', args=[self.collection.name, deposit_id]) - em_iri = reverse( - 'em_iri', args=[self.collection.name, deposit_id]) - - # Testing all update/add endpoint should fail - # since the status is ready - - archive2 = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some content in file 2') - - # replacing file is no longer possible since the deposit's - # status is ready - r = self.client.put( - em_iri, - content_type='application/zip', - data=archive2['data'], - CONTENT_LENGTH=archive2['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - - # adding file is no longer possible since the deposit's status - # is ready - r = self.client.post( - em_iri, - content_type='application/zip', - data=archive2['data'], - CONTENT_LENGTH=archive2['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=archive2['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - - # replacing metadata is no longer possible since the deposit's - # status is ready - r = self.client.put( - edit_se_iri, - content_type='application/atom+xml;type=entry', - data=self.data_atom_entry_ok, - CONTENT_LENGTH=len(self.data_atom_entry_ok), - HTTP_SLUG=external_id) - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - - # adding new metadata is no longer possible since the - # deposit's status is ready - r = self.client.post( - edit_se_iri, - content_type='application/atom+xml;type=entry', - data=self.data_atom_entry_ok, - CONTENT_LENGTH=len(self.data_atom_entry_ok), - HTTP_SLUG=external_id) - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - - 
archive_content = b'some content representing archive' - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/zip', - size=len(archive_content), - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(self.data_atom_entry_ok), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(self.data_atom_entry_ok), - charset='utf-8') - - # replacing multipart metadata is no longer possible since the - # deposit's status is ready - r = self.client.put( - edit_se_iri, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }) - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) - - # adding new metadata is no longer possible since the - # deposit's status is ready - r = self.client.post( - edit_se_iri, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }) - - self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST) + tmp_path = str(tmp_path) + url = reverse(COL_IRI, args=[deposit_collection.name]) + + external_id = 'some-external-id-1' + + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_request = 
DepositRequest.objects.get(deposit=deposit) + assert deposit_request.deposit == deposit + check_archive('filename0', deposit_request.archive.name) + + # updating/adding is forbidden + + # uri to update the content + edit_se_iri = reverse( + 'edit_se_iri', args=[deposit_collection.name, deposit_id]) + em_iri = reverse( + 'em_iri', args=[deposit_collection.name, deposit_id]) + + # Testing all update/add endpoint should fail + # since the status is ready + + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some content in file 2') + + # replacing file is no longer possible since the deposit's + # status is ready + r = authenticated_client.put( + em_iri, + content_type='application/zip', + data=archive2['data'], + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + assert r.status_code == status.HTTP_400_BAD_REQUEST + + # adding file is no longer possible since the deposit's status + # is ready + r = authenticated_client.post( + em_iri, + content_type='application/zip', + data=archive2['data'], + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') + + assert r.status_code == status.HTTP_400_BAD_REQUEST + + # replacing metadata is no longer possible since the deposit's + # status is ready + r = authenticated_client.put( + edit_se_iri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-deposit-binary'], + CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), + HTTP_SLUG=external_id) + + assert r.status_code == status.HTTP_400_BAD_REQUEST + + # adding new metadata is no longer possible since the + # deposit's 
status is ready + r = authenticated_client.post( + edit_se_iri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-deposit-binary'], + CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), + HTTP_SLUG=external_id) + + assert r.status_code == status.HTTP_400_BAD_REQUEST + + archive_content = b'some content representing archive' + archive = InMemoryUploadedFile( + BytesIO(archive_content), + field_name='archive0', + name='archive0', + content_type='application/zip', + size=len(archive_content), + charset=None) + + atom_entry = InMemoryUploadedFile( + BytesIO(atom_dataset['entry-data-deposit-binary']), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(atom_dataset['entry-data-deposit-binary']), + charset='utf-8') + + # replacing multipart metadata is no longer possible since the + # deposit's status is ready + r = authenticated_client.put( + edit_se_iri, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }) + + assert r.status_code == status.HTTP_400_BAD_REQUEST + + # adding new metadata is no longer possible since the + # deposit's status is ready + r = authenticated_client.post( + edit_se_iri, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }) + + assert r.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_delete.py b/swh/deposit/tests/api/test_deposit_delete.py --- a/swh/deposit/tests/api/test_deposit_delete.py +++ b/swh/deposit/tests/api/test_deposit_delete.py @@ -3,111 +3,119 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +from collections import defaultdict from django.urls import reverse - from rest_framework import status -from rest_framework.test import APITestCase +from typing import Mapping -from swh.deposit.config import EDIT_SE_IRI, EM_IRI, ARCHIVE_KEY, METADATA_KEY -from 
swh.deposit.config import DEPOSIT_STATUS_DEPOSITED +from swh.deposit.config import ( + EDIT_SE_IRI, EM_IRI, ARCHIVE_KEY, METADATA_KEY, + DEPOSIT_STATUS_DEPOSITED +) from swh.deposit.models import Deposit, DepositRequest -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine - - -class DepositDeleteTest(APITestCase, WithAuthTestCase, BasicTestCase, - CommonCreationRoutine): - - def test_delete_archive_on_partial_deposit_works(self): - """Removing partial deposit's archive should return a 204 response - - """ - # given - deposit_id = self.create_deposit_partial() - deposit = Deposit.objects.get(pk=deposit_id) - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - - self.assertEqual(len(deposit_requests), 2) - for dr in deposit_requests: - if dr.type == ARCHIVE_KEY: - continue - elif dr.type == METADATA_KEY: - continue - else: - self.fail('only archive and metadata type should exist ' - 'in this test context') - - # when - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - response = self.client.delete(update_uri) - # then - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - - deposit = Deposit.objects.get(pk=deposit_id) - requests = list(DepositRequest.objects.filter(deposit=deposit)) - - self.assertEqual(len(requests), 2) - self.assertEqual(requests[0].type, 'metadata') - self.assertEqual(requests[1].type, 'metadata') - - def test_delete_archive_on_undefined_deposit_fails(self): - """Delete undefined deposit returns a 404 response - - """ - # when - update_uri = reverse(EM_IRI, args=[self.collection.name, 999]) - response = self.client.delete(update_uri) - # then - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - - def test_delete_archive_on_non_partial_deposit_fails(self): - """Delete !partial status deposit should return a 400 response""" - deposit_id = self.create_deposit_ready() - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, 
DEPOSIT_STATUS_DEPOSITED) - - # when - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - response = self.client.delete(update_uri) - # then - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - deposit = Deposit.objects.get(pk=deposit_id) - self.assertIsNotNone(deposit) - - def test_delete_partial_deposit_works(self): - """Delete deposit should return a 204 response - - """ - # given - deposit_id = self.create_simple_deposit_partial() - deposit = Deposit.objects.get(pk=deposit_id) - assert deposit.id == deposit_id - - # when - url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) - response = self.client.delete(url) - # then - self.assertEqual(response.status_code, - status.HTTP_204_NO_CONTENT) - deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) - self.assertEqual(deposit_requests, []) - deposits = list(Deposit.objects.filter(pk=deposit_id)) - self.assertEqual(deposits, []) - - def test_delete_on_edit_se_iri_cannot_delete_non_partial_deposit(self): - """Delete !partial deposit should return a 400 response - - """ - # given - deposit_id = self.create_deposit_ready() - deposit = Deposit.objects.get(pk=deposit_id) - assert deposit.id == deposit_id - - # when - url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) - response = self.client.delete(url) - # then - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - deposit = Deposit.objects.get(pk=deposit_id) - self.assertIsNotNone(deposit) + + +def count_deposit_request_types(deposit_requests) -> Mapping[str, int]: + deposit_request_types = defaultdict(int) + for dr in deposit_requests: + deposit_request_types[dr.type] += 1 + return deposit_request_types + + +def test_delete_archive_on_partial_deposit_works( + authenticated_client, partial_deposit_with_metadata, + deposit_collection): + """Removing partial deposit's archive should return a 204 response + + """ + deposit_id = partial_deposit_with_metadata.id + 
deposit = Deposit.objects.get(pk=deposit_id) + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + + # deposit request types: 1 'archive', 1 'metadata' + deposit_request_types = count_deposit_request_types(deposit_requests) + assert deposit_request_types == { + ARCHIVE_KEY: 1, + METADATA_KEY: 1 + } + + # when + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) + response = authenticated_client.delete(update_uri) + + # then + assert response.status_code == status.HTTP_204_NO_CONTENT + + deposit = Deposit.objects.get(pk=deposit_id) + deposit_requests2 = DepositRequest.objects.filter(deposit=deposit) + + deposit_request_types = count_deposit_request_types(deposit_requests2) + assert deposit_request_types == { + METADATA_KEY: 1 + } + + +def test_delete_archive_on_undefined_deposit_fails( + authenticated_client, deposit_collection, sample_archive): + """Delete undefined deposit returns a 404 response + + """ + # when + update_uri = reverse(EM_IRI, args=[deposit_collection.name, 999]) + response = authenticated_client.delete(update_uri) + # then + assert response.status_code == status.HTTP_404_NOT_FOUND + + +def test_delete_non_partial_deposit( + authenticated_client, deposit_collection, deposited_deposit): + """Delete !partial status deposit should return a 400 response + + """ + deposit = deposited_deposit + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + + # when + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.delete(update_uri) + # then + assert response.status_code == status.HTTP_400_BAD_REQUEST + deposit = Deposit.objects.get(pk=deposit.id) + assert deposit is not None + + +def test_delete_partial_deposit( + authenticated_client, deposit_collection, partial_deposit): + """Delete deposit should return a 204 response + + """ + # given + deposit = partial_deposit + + # when + url = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) + response = 
authenticated_client.delete(url) + # then + assert response.status_code == status.HTTP_204_NO_CONTENT + deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) + assert deposit_requests == [] + deposits = list(Deposit.objects.filter(pk=deposit.id)) + assert deposits == [] + + +def test_delete_on_edit_se_iri_cannot_delete_non_partial_deposit( + authenticated_client, deposit_collection, complete_deposit): + """Delete !partial deposit should return a 400 response + + """ + # given + deposit = complete_deposit + + # when + url = reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.delete(url) + # then + assert response.status_code == status.HTTP_400_BAD_REQUEST + deposit = Deposit.objects.get(pk=deposit.id) + assert deposit is not None diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py --- a/swh/deposit/tests/api/test_deposit_list.py +++ b/swh/deposit/tests/api/test_deposit_list.py @@ -3,92 +3,84 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from django.urls import reverse import pytest + +from django.urls import reverse from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.api.converters import convert_status_detail - -from ...config import DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ...models import Deposit +from swh.deposit.config import ( + DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS, DEPOSIT_STATUS_DEPOSITED +) -@pytest.mark.fs -class CheckDepositListTest(APITestCase, WithAuthTestCase, - BasicTestCase, CommonCreationRoutine): - """Check deposit list endpoints. 
+@pytest.mark.django_db +def test_deposit_list( + partial_deposit, deposited_deposit, authenticated_client): + """Deposit list api should return the deposits """ - def setUp(self): - super().setUp() + status_detail = { + 'url': { + 'summary': 'At least one compatible url field. Failed', + 'fields': ['testurl'], + }, + 'metadata': [ + { + 'summary': 'Mandatory fields missing', + 'fields': ['9', 10, 1.212], + }, + ], + 'archive': [ + { + 'summary': 'Invalid archive', + 'fields': ['3'], + }, + { + 'summary': 'Unsupported archive', + 'fields': [2], + } + ], + } + partial_deposit.status_detail = status_detail + partial_deposit.save() - def test_deposit_list(self): - """Deposit list api should return the deposits + deposit_id = partial_deposit.id + deposit_id2 = deposited_deposit.id - """ - deposit_id = self.create_deposit_partial() - # amend the deposit with a status_detail - deposit = Deposit.objects.get(pk=deposit_id) - status_detail = { - 'url': { - 'summary': 'At least one compatible url field. 
Failed', - 'fields': ['testurl'], - }, - 'metadata': [ - { - 'summary': 'Mandatory fields missing', - 'fields': ['9', 10, 1.212], - }, - ], - 'archive': [ - { - 'summary': 'Invalid archive', - 'fields': ['3'], - }, - { - 'summary': 'Unsupported archive', - 'fields': [2], - } - ], - } - deposit.status_detail = status_detail - deposit.save() + # NOTE: does not work as documented + # https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa + # url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1}) + main_url = reverse(PRIVATE_LIST_DEPOSITS) + url = '%s?page_size=1' % main_url + response = authenticated_client.get(url) - deposit_id2 = self.create_deposit_partial() + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data['count'] == 2 # 2 deposits + expected_next = '%s?page=2&page_size=1' % main_url + assert data['next'].endswith(expected_next) is True + assert data['previous'] is None + assert len(data['results']) == 1 # page of size 1 + deposit = data['results'][0] + assert deposit['id'] == deposit_id + assert deposit['status'] == DEPOSIT_STATUS_PARTIAL + expected_status_detail = convert_status_detail(status_detail) + assert deposit['status_detail'] == expected_status_detail - # NOTE: does not work as documented - # https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa - # url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1}) - main_url = reverse(PRIVATE_LIST_DEPOSITS) - url = '%s?page_size=1' % main_url - response = self.client.get(url) + # then 2nd page + response2 = authenticated_client.get(expected_next) - self.assertEqual(response.status_code, status.HTTP_200_OK) - data = response.json() - self.assertEqual(data['count'], 2) # 2 deposits - expected_next = '%s?page=2&page_size=1' % main_url - self.assertTrue(data['next'].endswith(expected_next)) - self.assertIsNone(data['previous']) - self.assertEqual(len(data['results']), 1) # page of 
size 1 - deposit = data['results'][0] - self.assertEqual(deposit['id'], deposit_id) - self.assertEqual(deposit['status'], DEPOSIT_STATUS_PARTIAL) - expected_status_detail = convert_status_detail(status_detail) - self.assertEqual(deposit['status_detail'], expected_status_detail) + assert response2.status_code == status.HTTP_200_OK + data2 = response2.json() - # then 2nd page - response2 = self.client.get(expected_next) + assert data2['count'] == 2 # still 2 deposits + assert data2['next'] is None - self.assertEqual(response2.status_code, status.HTTP_200_OK) - data2 = response2.json() + expected_previous = '%s?page_size=1' % main_url + assert data2['previous'].endswith(expected_previous) is True + assert len(data2['results']) == 1 # page of size 1 - self.assertEqual(data2['count'], 2) # still 2 deposits - self.assertIsNone(data2['next']) - expected_previous = '%s?page_size=1' % main_url - self.assertTrue(data2['previous'].endswith(expected_previous)) - self.assertEqual(len(data2['results']), 1) # page of size 1 - deposit2 = data2['results'][0] - self.assertEqual(deposit2['id'], deposit_id2) - self.assertEqual(deposit2['status'], DEPOSIT_STATUS_PARTIAL) + deposit2 = data2['results'][0] + assert deposit2['id'] == deposit_id2 + assert deposit2['status'] == DEPOSIT_STATUS_DEPOSITED diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py --- a/swh/deposit/tests/api/test_deposit_multipart.py +++ b/swh/deposit/tests/api/test_deposit_multipart.py @@ -7,442 +7,383 @@ from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase -from swh.deposit.config import COL_IRI -from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED +from swh.deposit.config import ( + COL_IRI, DEPOSIT_STATUS_DEPOSITED +) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase -from 
..common import FileSystemCreationRoutine +from swh.deposit.tests.common import check_archive + + +def test_post_deposit_multipart_without_slug_header_is_bad_request( + authenticated_client, deposit_collection, atom_dataset): + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = b'some content representing archive' + archive = InMemoryUploadedFile( + BytesIO(archive_content), + field_name='archive0', + name='archive0', + content_type='application/zip', + size=len(archive_content), + charset=None) + + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false') + + assert b'Missing SLUG header' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +def test_post_deposit_multipart_zip( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """one multipart deposit (zip+xml) should be accepted + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=sample_archive['length'], + charset=None) + + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 
'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + assert deposit_request.metadata is None + assert deposit_request.raw_metadata is None + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + +def test_post_deposit_multipart_tar( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """one multipart deposit (tar+xml) should be accepted -class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase, - FileSystemCreationRoutine): - """Post multipart deposit scenario + """ + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + # from django.core.files import uploadedfile + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/x-tar', + size=sample_archive['length'], + charset=None) + + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + 
charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + assert deposit_request.metadata is None + assert deposit_request.raw_metadata is None + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + +def test_post_deposit_multipart_put_to_replace_metadata( + authenticated_client, deposit_collection, + atom_dataset, sample_archive): + """One multipart deposit followed by a metadata update should be + accepted """ - def setUp(self): - super().setUp() - - self.data_atom_entry_ok = b""" - - Title - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a - 2005-10-07T17:17:08Z - Contributor - The abstract - - - The abstract - Access Rights - Alternative Title - Date Available - Bibliographic Citation # noqa - Contributor - Description - Has Part - Has Version - Identifier - Is Part Of - Publisher - References - Rights Holder - Source - Title - Type - -""" - - self.data_atom_entry_update_in_place = """ - - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b - Title - 
Type -""" - - def test_post_deposit_multipart_without_slug_header_is_bad_request(self): - # given - url = reverse(COL_IRI, args=[self.collection.name]) - data_atom_entry = self.data_atom_entry_ok - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/zip', - size=len(archive_content), - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false') - - self.assertIn(b'Missing SLUG header', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) - - def test_post_deposit_multipart_zip(self): - """one multipart deposit (zip+xml) should be accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - # from django.core.files import uploadedfile - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/zip', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = 
parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - self.assertIsNone(deposit_request.metadata) - self.assertIsNone(deposit_request.raw_metadata) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - def test_post_deposit_multipart_tar(self): - """one multipart deposit (tar+xml) should be accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - # from django.core.files import uploadedfile - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/x-tar', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG=external_id) - - # then - self.assertEqual(response.status_code, 
status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - self.assertIsNone(deposit_request.metadata) - self.assertIsNone(deposit_request.raw_metadata) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - def test_post_deposit_multipart_put_to_replace_metadata(self): - """One multipart deposit followed by a metadata update should be - accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - data_atom_entry = self.data_atom_entry_ok - - archive = InMemoryUploadedFile( - BytesIO(self.archive['data']), - field_name=self.archive['name'], - name=self.archive['name'], - content_type='application/zip', - size=self.archive['length'], - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry), - charset='utf-8') - - external_id = 'external-id' - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='true', - HTTP_SLUG=external_id) - - # then - 
self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - response_content = parse_xml(BytesIO(response.content)) - deposit_id = response_content['deposit_id'] - - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, 'partial') - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - else: - self.assertEqual( - deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') - self.assertEqual(deposit_request.raw_metadata, - data_atom_entry.decode('utf-8')) - - replace_metadata_uri = response._headers['location'][1] - response = self.client.put( - replace_metadata_uri, - content_type='application/atom+xml;type=entry', - data=self.data_atom_entry_update_in_place, - HTTP_IN_PROGRESS='false') - - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - - # deposit_id did not change - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(deposit.external_id, external_id) - self.assertEqual(deposit.collection, self.collection) - self.assertEqual(deposit.client, self.user) - self.assertIsNone(deposit.swh_id) - - deposit_requests = DepositRequest.objects.filter(deposit=deposit) - self.assertEqual(len(deposit_requests), 2) - for deposit_request in deposit_requests: - self.assertEqual(deposit_request.deposit, deposit) - if deposit_request.type == 'archive': - self.assertRegex(deposit_request.archive.name, - self.archive['name']) - else: - self.assertEqual( - 
deposit_request.metadata['id'], - 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b') - self.assertEqual( - deposit_request.raw_metadata, - self.data_atom_entry_update_in_place) - - # FAILURE scenarios - - def test_post_deposit_multipart_only_archive_and_atom_entry(self): - """Multipart deposit only accepts one archive and one atom+xml""" - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile(BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/x-tar', - size=len(archive_content), - charset=None) - - other_archive_content = b"some-other-content" - other_archive = InMemoryUploadedFile(BytesIO(other_archive_content), - field_name='atom0', - name='atom0', - content_type='application/x-tar', - size=len(other_archive_content), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': other_archive, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG='external-id') - - # then - self.assertEqual(response.status_code, - status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - self.assertTrue( - 'Only 1 application/zip (or application/x-tar) archive' in - response.content.decode('utf-8')) - - # when - archive.seek(0) - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG='external-id') - - # then - self.assertEqual(response.status_code, - status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) - self.assertTrue( - 'You must provide both 1 application/zip (or ' - 'application/x-tar) and 1 atom+xml entry for ' - 'multipart deposit' in response.content.decode('utf-8') - ) - - def test_post_deposit_multipart_400_when_badly_formatted_xml(self): - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - data_atom_entry_ko = b""" - - - urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a - 
-""" - - archive_content = b'some content representing archive' - archive = InMemoryUploadedFile( - BytesIO(archive_content), - field_name='archive0', - name='archive0', - content_type='application/zip', - size=len(archive_content), - charset=None) - - atom_entry = InMemoryUploadedFile( - BytesIO(data_atom_entry_ko), - field_name='atom0', - name='atom0', - content_type='application/atom+xml; charset="utf-8"', - size=len(data_atom_entry_ko), - charset='utf-8') - - # when - response = self.client.post( - url, - format='multipart', - data={ - 'archive': archive, - 'atom_entry': atom_entry, - }, - # + headers - HTTP_IN_PROGRESS='false', - HTTP_SLUG='external-id', - ) - - self.assertIn(b'Malformed xml metadata', response.content) - self.assertEqual(response.status_code, - status.HTTP_400_BAD_REQUEST) + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + data_atom_entry = atom_dataset['entry-data-deposit-binary'] + + archive = InMemoryUploadedFile( + BytesIO(sample_archive['data']), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=sample_archive['length'], + charset=None) + + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry), + charset='utf-8') + + external_id = 'external-id' + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='true', + HTTP_SLUG=external_id) + + # then + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(BytesIO(response.content)) + deposit_id = response_content['deposit_id'] + + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == 'partial' + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + 
+ deposit_requests = DepositRequest.objects.filter(deposit=deposit) + + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + data_atom_entry.decode('utf-8') + + replace_metadata_uri = response._headers['location'][1] + response = authenticated_client.put( + replace_metadata_uri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data-deposit-binary'], + HTTP_IN_PROGRESS='false') + + assert response.status_code == status.HTTP_204_NO_CONTENT + + # deposit_id did not change + deposit = Deposit.objects.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_DEPOSITED + assert deposit.external_id == external_id + assert deposit.collection == deposit_collection + assert deposit.swh_id is None + + deposit_requests = DepositRequest.objects.filter(deposit=deposit) + assert len(deposit_requests) == 2 + for deposit_request in deposit_requests: + assert deposit_request.deposit == deposit + if deposit_request.type == 'archive': + check_archive(sample_archive['name'], deposit_request.archive.name) + else: + assert deposit_request.metadata['id'] == \ + 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' + assert deposit_request.raw_metadata == \ + atom_dataset['entry-data-deposit-binary'].decode('utf-8') + +# FAILURE scenarios + + +def test_post_deposit_multipart_only_archive_and_atom_entry( + authenticated_client, deposit_collection): + """Multipart deposit only accepts one archive and one atom+xml""" + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = b'some content representing archive' + archive = InMemoryUploadedFile(BytesIO(archive_content), + field_name='archive0', + name='archive0', + 
content_type='application/x-tar', + size=len(archive_content), + charset=None) + + other_archive_content = b"some-other-content" + other_archive = InMemoryUploadedFile(BytesIO(other_archive_content), + field_name='atom0', + name='atom0', + content_type='application/x-tar', + size=len(other_archive_content), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + 'atom_entry': other_archive, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id') + + # then + assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE + assert 'Only 1 application/zip (or application/x-tar) archive' in \ + response.content.decode('utf-8') + + # when + archive.seek(0) + response = authenticated_client.post( + url, + format='multipart', + data={ + 'archive': archive, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id') + + # then + assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE + assert ( + 'You must provide both 1 application/zip (or ' + 'application/x-tar) and 1 atom+xml entry for ' + 'multipart deposit' in response.content.decode('utf-8') + ) is True + + +def test_post_deposit_multipart_400_when_badly_formatted_xml( + authenticated_client, deposit_collection, + sample_archive, atom_dataset): + # given + url = reverse(COL_IRI, args=[deposit_collection.name]) + + archive_content = sample_archive['data'] + archive = InMemoryUploadedFile( + BytesIO(archive_content), + field_name=sample_archive['name'], + name=sample_archive['name'], + content_type='application/zip', + size=len(archive_content), + charset=None) + + data_atom_entry_ko = atom_dataset['entry-data-ko'] + atom_entry = InMemoryUploadedFile( + BytesIO(data_atom_entry_ko), + field_name='atom0', + name='atom0', + content_type='application/atom+xml; charset="utf-8"', + size=len(data_atom_entry_ko), + charset='utf-8') + + # when + response = authenticated_client.post( + url, + 
format='multipart', + data={ + 'archive': archive, + 'atom_entry': atom_entry, + }, + # + headers + HTTP_IN_PROGRESS='false', + HTTP_SLUG='external-id', + ) + + assert b'Malformed xml metadata' in response.content + assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py --- a/swh/deposit/tests/api/test_deposit_private_check.py +++ b/swh/deposit/tests/api/test_deposit_private_check.py @@ -3,234 +3,261 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest - from django.urls import reverse -import pytest from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, - DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED + DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, COL_IRI ) from swh.deposit.api.private.deposit_check import ( - SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID, - MANDATORY_FIELDS_MISSING, + MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING, MANDATORY_ARCHIVE_MISSING ) from swh.deposit.models import Deposit +from swh.deposit.parsers import parse_xml +from swh.deposit.tests.common import ( + create_arborescence_archive, create_archive_with_archive +) -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ..common import FileSystemCreationRoutine - - -@pytest.mark.fs -class CheckDepositTest(APITestCase, WithAuthTestCase, - BasicTestCase, CommonCreationRoutine, - FileSystemCreationRoutine): - """Check deposit endpoints. 
- """ - def setUp(self): - super().setUp() +PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + '-nc' - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_CHECK_DEPOSIT, - args=[self.collection.name, deposit_id]) - def test_deposit_ok(self): - """Proper deposit should succeed the checks (-> status ready) +def private_check_url_endpoints(collection, deposit): + """There are 2 endpoints to check (one with collection, one without)""" + return [ + reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]), + reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id]) + ] - """ - deposit_id = self.create_simple_binary_deposit(status_partial=True) - deposit_id = self.update_binary_deposit(deposit_id, - status_partial=False) - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) +def test_deposit_ok( + authenticated_client, deposit_collection, ready_deposit_ok): + """Proper deposit should succeed the checks (-> status ready) - url = self.private_deposit_url(deposit.id) - response = self.client.get(url) + """ + deposit = ready_deposit_ok + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, status.HTTP_200_OK) + assert response.status_code == status.HTTP_200_OK data = response.json() - self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) + assert data['status'] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_VERIFIED) + assert deposit.status == DEPOSIT_STATUS_VERIFIED - def test_deposit_invalid_tarball(self): - """Deposit with tarball (of 1 tarball) should fail the checks: rejected + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() - """ - for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: - deposit_id = self.create_deposit_archive_with_archive( - archive_extension) - deposit = 
Deposit.objects.get(pk=deposit_id) - self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status) +def test_deposit_invalid_tarball( + tmp_path, authenticated_client, deposit_collection): + """Deposit with tarball (of 1 tarball) should fail the checks: rejected - url = self.private_deposit_url(deposit.id) - response = self.client.get(url) - - self.assertEqual(response.status_code, status.HTTP_200_OK) + """ + for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: + deposit = create_deposit_archive_with_archive( + tmp_path, archive_extension, + authenticated_client, + deposit_collection.name) + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) + assert response.status_code == status.HTTP_200_OK data = response.json() - self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) + assert data['status'] == DEPOSIT_STATUS_REJECTED details = data['details'] # archive checks failure - self.assertEqual(len(details['archive']), 1) - self.assertEqual(details['archive'][0]['summary'], - MANDATORY_ARCHIVE_INVALID) + assert len(details['archive']) == 1 + assert details['archive'][0]['summary'] == \ + MANDATORY_ARCHIVE_INVALID deposit = Deposit.objects.get(pk=deposit.id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED) + assert deposit.status == DEPOSIT_STATUS_REJECTED - def test_deposit_ko_missing_tarball(self): - """Deposit without archive should fail the checks: rejected - """ - deposit_id = self.create_deposit_ready() # no archive, only atom - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status) +def test_deposit_ko_missing_tarball( + authenticated_client, deposit_collection, ready_deposit_only_metadata): + """Deposit without archive should fail the checks: rejected - url = self.private_deposit_url(deposit.id) - response = self.client.get(url) + """ + deposit = ready_deposit_only_metadata + assert deposit.status == 
DEPOSIT_STATUS_DEPOSITED + + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, status.HTTP_200_OK) + assert response.status_code == status.HTTP_200_OK data = response.json() - self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) + assert data['status'] == DEPOSIT_STATUS_REJECTED details = data['details'] # archive checks failure - self.assertEqual(len(details['archive']), 1) - self.assertEqual(details['archive'][0]['summary'], - MANDATORY_ARCHIVE_MISSING) + assert len(details['archive']) == 1 + assert details['archive'][0]['summary'] == MANDATORY_ARCHIVE_MISSING deposit = Deposit.objects.get(pk=deposit.id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED) + assert deposit.status == DEPOSIT_STATUS_REJECTED - def test_deposit_ko_unsupported_tarball(self): - """Deposit with an unsupported tarball should fail the checks: rejected + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() - """ - deposit_id = self.create_deposit_with_invalid_archive() - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status) +def test_deposit_ko_unsupported_tarball( + tmp_path, authenticated_client, deposit_collection, + ready_deposit_invalid_archive): + """Deposit with an unsupported tarball should fail the checks: rejected + + """ + deposit = ready_deposit_invalid_archive + assert DEPOSIT_STATUS_DEPOSITED == deposit.status - url = self.private_deposit_url(deposit.id) - response = self.client.get(url) + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, status.HTTP_200_OK) + assert response.status_code == status.HTTP_200_OK data = response.json() - self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) + assert data['status'] == DEPOSIT_STATUS_REJECTED details = data['details'] + # archive checks failure - 
self.assertEqual(len(details['archive']), 1) - self.assertEqual(details['archive'][0]['summary'], - MANDATORY_ARCHIVE_UNSUPPORTED) + assert len(details['archive']) == 1 + assert details['archive'][0]['summary'] == \ + MANDATORY_ARCHIVE_UNSUPPORTED # metadata check failure - self.assertEqual(len(details['metadata']), 2) + assert len(details['metadata']) == 2 mandatory = details['metadata'][0] - self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) - self.assertEqual(set(mandatory['fields']), - set(['author'])) + assert mandatory['summary'] == MANDATORY_FIELDS_MISSING + assert set(mandatory['fields']) == set(['author']) alternate = details['metadata'][1] - self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) - self.assertEqual(alternate['fields'], ['name or title']) + assert alternate['summary'] == ALTERNATE_FIELDS_MISSING + assert alternate['fields'] == ['name or title'] deposit = Deposit.objects.get(pk=deposit.id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED) + assert deposit.status == DEPOSIT_STATUS_REJECTED - def test_check_deposit_metadata_ok(self): - """Proper deposit should succeed the checks (-> status ready) - with all **MUST** metadata + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() - using the codemeta metadata test set - """ - deposit_id = self.create_simple_binary_deposit(status_partial=True) - deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) - self.assertEqual(deposit_id, deposit_id_metadata) - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) +def test_check_deposit_metadata_ok( + authenticated_client, deposit_collection, ready_deposit_ok): + """Proper deposit should succeed the checks (-> status ready) + with all **MUST** metadata - url = self.private_deposit_url(deposit.id) + using the codemeta metadata test set + """ + deposit = ready_deposit_ok + assert deposit.status == DEPOSIT_STATUS_DEPOSITED - response = self.client.get(url) + 
for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, status.HTTP_200_OK) + assert response.status_code == status.HTTP_200_OK data = response.json() - self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) + assert data['status'] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) - self.assertEqual(deposit.status, DEPOSIT_STATUS_VERIFIED) - - -@pytest.mark.fs -class CheckDepositTest2(CheckDepositTest): - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_CHECK_DEPOSIT+'-nc', - args=[deposit_id]) - - -class CheckMetadata(unittest.TestCase, SWHChecksDeposit): - def test_check_metadata_ok(self): - actual_check, detail = self._check_metadata({ - 'url': 'something', - 'external_identifier': 'something-else', - 'name': 'foo', - 'author': 'someone', - }) - - self.assertTrue(actual_check) - self.assertIsNone(detail) - - def test_check_metadata_ok2(self): - actual_check, detail = self._check_metadata({ - 'url': 'something', - 'external_identifier': 'something-else', - 'title': 'bar', - 'author': 'someone', - }) - - self.assertTrue(actual_check) - self.assertIsNone(detail) - - def test_check_metadata_ko(self): - """Missing optional field should be caught - - """ - actual_check, error_detail = self._check_metadata({ - 'url': 'something', - 'external_identifier': 'something-else', - 'author': 'someone', - }) - - expected_error = { - 'metadata': [{ - 'summary': 'Mandatory alternate fields are missing', - 'fields': ['name or title'], - }] - } - self.assertFalse(actual_check) - self.assertEqual(error_detail, expected_error) - - def test_check_metadata_ko2(self): - """Missing mandatory fields should be caught - - """ - actual_check, error_detail = self._check_metadata({ - 'url': 'something', - 'external_identifier': 'something-else', - 'title': 'foobar', - }) - - expected_error = { - 'metadata': [{ - 'summary': 'Mandatory fields are missing', 
- 'fields': ['author'], - }] - } - - self.assertFalse(actual_check) - self.assertEqual(error_detail, expected_error) + assert deposit.status == DEPOSIT_STATUS_VERIFIED + + deposit.status = DEPOSIT_STATUS_DEPOSITED + deposit.save() + + +def test_check_metadata_ok(swh_checks_deposit): + actual_check, detail = swh_checks_deposit._check_metadata({ + 'url': 'something', + 'external_identifier': 'something-else', + 'name': 'foo', + 'author': 'someone', + }) + + assert actual_check is True + assert detail is None + + +def test_check_metadata_ok2(swh_checks_deposit): + actual_check, detail = swh_checks_deposit._check_metadata({ + 'url': 'something', + 'external_identifier': 'something-else', + 'title': 'bar', + 'author': 'someone', + }) + + assert actual_check is True + assert detail is None + + +def test_check_metadata_ko(swh_checks_deposit): + """Missing optional field should be caught + + """ + actual_check, error_detail = swh_checks_deposit._check_metadata({ + 'url': 'something', + 'external_identifier': 'something-else', + 'author': 'someone', + }) + + expected_error = { + 'metadata': [{ + 'summary': 'Mandatory alternate fields are missing', + 'fields': ['name or title'], + }] + } + assert actual_check is False + assert error_detail == expected_error + + +def test_check_metadata_ko2(swh_checks_deposit): + """Missing mandatory fields should be caught + + """ + actual_check, error_detail = swh_checks_deposit._check_metadata({ + 'url': 'something', + 'external_identifier': 'something-else', + 'title': 'foobar', + }) + + expected_error = { + 'metadata': [{ + 'summary': 'Mandatory fields are missing', + 'fields': ['author'], + }] + } + + assert actual_check is False + assert error_detail == expected_error + + +def create_deposit_archive_with_archive( + root_path, archive_extension, client, collection_name): + # we create the holding archive to a given extension + archive = create_arborescence_archive( + root_path, 'archive1', 'file1', b'some content in file', + 
extension=archive_extension) + + # now we create an archive holding the first created archive + invalid_archive = create_archive_with_archive( + root_path, 'invalid.tar.gz', archive) + + # we deposit it + response = client.post( + reverse(COL_IRI, args=[collection_name]), + content_type='application/x-tar', + data=invalid_archive['data'], + CONTENT_LENGTH=invalid_archive['length'], + HTTP_MD5SUM=invalid_archive['md5sum'], + HTTP_SLUG='external-id', + HTTP_IN_PROGRESS=False, + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + invalid_archive['name'], )) + + # then + assert response.status_code == status.HTTP_201_CREATED + response_content = parse_xml(response.content) + deposit_status = response_content['deposit_status'] + assert deposit_status == DEPOSIT_STATUS_DEPOSITED + deposit_id = int(response_content['deposit_id']) + + deposit = Deposit.objects.get(pk=deposit_id) + assert DEPOSIT_STATUS_DEPOSITED == deposit.status + return deposit diff --git a/swh/deposit/tests/api/test_deposit_private_read_archive.py b/swh/deposit/tests/api/test_deposit_private_read_archive.py --- a/swh/deposit/tests/api/test_deposit_private_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_private_read_archive.py @@ -4,95 +4,108 @@ # See top-level LICENSE file for more information import hashlib -import os +import shutil from django.urls import reverse -import pytest +from os import listdir, path, mkdir from rest_framework import status -from rest_framework.test import APITestCase from swh.core import tarball -from swh.deposit.config import PRIVATE_GET_RAW_CONTENT -from swh.deposit.tests import TEST_CONFIG +from swh.deposit.config import PRIVATE_GET_RAW_CONTENT, EM_IRI -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ..common import FileSystemCreationRoutine, create_arborescence_archive +from swh.deposit.tests.common import create_arborescence_archive -@pytest.mark.fs -class DepositReadArchivesTest(APITestCase, WithAuthTestCase, - 
BasicTestCase, CommonCreationRoutine, - FileSystemCreationRoutine): +PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + '-nc' - def setUp(self): - super().setUp() - self.archive2 = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some other content in file') - self.workdir = os.path.join(self.root_path, 'workdir') - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_RAW_CONTENT, - args=[self.collection.name, deposit_id]) +def private_get_raw_url_endpoints(collection, deposit): + """There are 2 endpoints to check (one with collection, one without)""" + return [ + reverse(PRIVATE_GET_RAW_CONTENT, args=[collection.name, deposit.id]), + reverse(PRIVATE_GET_RAW_CONTENT_NC, args=[deposit.id]) + ] - def test_access_to_existing_deposit_with_one_archive(self): - """Access to deposit should stream a 200 response with its raw content - """ - deposit_id = self.create_simple_binary_deposit() +def test_access_to_existing_deposit_with_one_archive( + authenticated_client, deposit_collection, complete_deposit, + sample_archive): + """Access to deposit should stream a 200 response with its raw content - url = self.private_deposit_url(deposit_id) - r = self.client.get(url) + """ + deposit = complete_deposit - self.assertEqual(r.status_code, status.HTTP_200_OK) - self.assertEqual(r._headers['content-type'][1], - 'application/octet-stream') + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + r = authenticated_client.get(url) + + assert r.status_code == status.HTTP_200_OK + assert r._headers['content-type'][1] == 'application/octet-stream' # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() - self.assertEqual(actual_sha1, self.archive['sha1sum']) - - # this does not touch the extraction dir so this should stay empty - self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), []) - - def _check_tarball_consistency(self, actual_sha1): - 
tarball.uncompress(self.archive['path'], self.workdir) - self.assertEqual(os.listdir(self.workdir), ['file1']) - tarball.uncompress(self.archive2['path'], self.workdir) - lst = set(os.listdir(self.workdir)) - self.assertEqual(lst, {'file1', 'file2'}) - - new_path = self.workdir + '.zip' - tarball.compress(new_path, 'zip', self.workdir) - with open(new_path, 'rb') as f: - h = hashlib.sha1(f.read()).hexdigest() - - self.assertEqual(actual_sha1, h) - self.assertNotEqual(actual_sha1, self.archive['sha1sum']) - self.assertNotEqual(actual_sha1, self.archive2['sha1sum']) - - def test_access_to_existing_deposit_with_multiple_archives(self): - """Access to deposit should stream a 200 response with its raw contents - - """ - deposit_id = self.create_complex_binary_deposit() - url = self.private_deposit_url(deposit_id) - r = self.client.get(url) - - self.assertEqual(r.status_code, status.HTTP_200_OK) - self.assertEqual(r._headers['content-type'][1], - 'application/octet-stream') + assert actual_sha1 == sample_archive['sha1sum'] + + +def test_access_to_existing_deposit_with_multiple_archives( + tmp_path, authenticated_client, deposit_collection, partial_deposit, + sample_archive): + """Access to deposit should stream a 200 response with its raw contents + + """ + deposit = partial_deposit + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some content in file') + + # Add a second archive to deposit + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.post( + update_uri, + content_type='application/zip', # as zip + data=archive2['data'], + # + headers + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=deposit.external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + archive2['name'], )) + assert response.status_code == status.HTTP_201_CREATED + + for url in 
private_get_raw_url_endpoints(deposit_collection, deposit): + r = authenticated_client.get(url) + + assert r.status_code == status.HTTP_200_OK + assert r._headers['content-type'][1] == 'application/octet-stream' # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() - self._check_tarball_consistency(actual_sha1) - - # this touches the extraction directory but should clean up - # after itself - self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), []) - - -@pytest.mark.fs -class DepositReadArchivesTest2(DepositReadArchivesTest): - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_RAW_CONTENT+'-nc', args=[deposit_id]) + check_tarball_consistency( + tmp_path, sample_archive, archive2, actual_sha1) + + +def check_tarball_consistency(tmp_path, archive, archive2, actual_sha1): + """Check the tarballs are ok + + """ + workdir = path.join(tmp_path, 'workdir') + mkdir(workdir) + lst = set(listdir(workdir)) + assert lst == set() + tarball.uncompress(archive['path'], dest=workdir) + assert listdir(workdir) == ['file1'] + tarball.uncompress(archive2['path'], dest=workdir) + lst = set(listdir(workdir)) + assert lst == {'file1', 'file2'} + + new_path = workdir + '.zip' + tarball.compress(new_path, 'zip', workdir) + with open(new_path, 'rb') as f: + h = hashlib.sha1(f.read()).hexdigest() + + assert actual_sha1 == h + assert actual_sha1 != archive['sha1sum'] + assert actual_sha1 != archive2['sha1sum'] + + shutil.rmtree(workdir) diff --git a/swh/deposit/tests/api/test_deposit_private_read_metadata.py b/swh/deposit/tests/api/test_deposit_private_read_metadata.py --- a/swh/deposit/tests/api/test_deposit_private_read_metadata.py +++ b/swh/deposit/tests/api/test_deposit_private_read_metadata.py @@ -3,480 +3,446 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information - from django.urls import reverse from rest_framework import status -from 
rest_framework.test import APITestCase from swh.deposit.models import Deposit -from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA -from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS -from swh.deposit.config import DEPOSIT_STATUS_PARTIAL +from swh.deposit.config import ( + PRIVATE_GET_DEPOSIT_METADATA, SWH_PERSON, EDIT_SE_IRI +) -from ...config import SWH_PERSON -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine +PRIVATE_GET_DEPOSIT_METADATA_NC = PRIVATE_GET_DEPOSIT_METADATA + '-nc' -class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase, - CommonCreationRoutine): - """Deposit access to read metadata information on deposit. +def private_get_raw_url_endpoints(collection, deposit): + """There are 2 endpoints to check (one with collection, one without)""" + deposit_id = deposit if isinstance(deposit, int) else deposit.id + return [ + reverse(PRIVATE_GET_DEPOSIT_METADATA, + args=[collection.name, deposit_id]), + reverse(PRIVATE_GET_DEPOSIT_METADATA_NC, + args=[deposit_id]) + ] + + +def update_deposit(authenticated_client, collection, deposit, atom_dataset): + for atom_data in ['entry-data2', 'entry-data3']: + update_deposit_with_metadata( + authenticated_client, collection, deposit, atom_dataset[atom_data] + ) + return deposit + + +def update_deposit_with_metadata(authenticated_client, collection, deposit, + metadata): + # update deposit's metadata + response = authenticated_client.post( + reverse(EDIT_SE_IRI, args=[collection.name, deposit.id]), + content_type='application/atom+xml;type=entry', + data=metadata, + HTTP_SLUG=deposit.external_id, + HTTP_IN_PROGRESS=True) + assert response.status_code == status.HTTP_201_CREATED + return deposit - """ - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - self.template_metadata = """ - - Composing a Web of Audio Applications - hal - hal-01243065 - hal-01243065 - https://hal-test.archives-ouvertes.fr/hal-01243065 - test - DSP programming - 
this is the description - 1 - phpstorm - stable - php - python - C - - GNU General Public License v3.0 only - - - CeCILL Free Software License Agreement v1.1 - - - HAL - hal@ccsd.cnrs.fr - - - Morane Gruenpeter - -%s -""" - - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_DEPOSIT_METADATA, - args=[self.collection.name, deposit_id]) - - def test_read_metadata(self): - """Private metadata read api to existing deposit should return metadata - - """ - deposit_id = self.create_deposit_partial() - - url = self.private_deposit_url(deposit_id) - - response = self.client.get(url) - - self.assertEqual(response.status_code, - status.HTTP_200_OK) - self.assertEqual(response._headers['content-type'][1], - 'application/json') +def test_read_metadata( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """Private metadata read api to existing deposit should return metadata + + """ + deposit = partial_deposit + deposit.external_id = 'some-external-id' + deposit.save() + deposit = update_deposit(authenticated_client, deposit_collection, deposit, + atom_dataset) + + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) + assert response.status_code == status.HTTP_200_OK + assert response._headers['content-type'][1] == 'application/json' data = response.json() expected_meta = { + 'branch_name': 'master', 'origin': { - 'url': 'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id', - 'type': 'deposit' + 'type': 'deposit', + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' }, 'origin_metadata': { 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], - 'author': ['some awesome author', 'another one', 'no one'], + 'author': [ + 'some awesome author', + 'another one', + 'no one' + ], 'codemeta:dateCreated': '2017-10-07T15:17:08Z', 'external_identifier': 'some-external-id', - 'url': 'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id' + 
'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa }, 'provider': { - 'provider_name': 'hal', + 'metadata': {}, + 'provider_name': '', 'provider_type': 'deposit_client', - 'provider_url': 'https://hal-test.archives-ouvertes.fr/', - 'metadata': {} + 'provider_url': 'https://hal-test.archives-ouvertes.fr/' }, 'tool': { + 'configuration': {'sword_version': '2'}, 'name': 'swh-deposit', - 'version': '0.0.1', - 'configuration': { - 'sword_version': '2' - } + 'version': '0.0.1' } }, 'revision': { - 'synthetic': True, + 'author': SWH_PERSON, + 'committer': SWH_PERSON, 'committer_date': { - 'timestamp': { - 'seconds': 1507389428, - 'microseconds': 0 - }, + 'negative_utc': False, 'offset': 0, - 'negative_utc': False + 'timestamp': { + 'microseconds': 0, + 'seconds': 1507389428 + } }, - 'message': 'hal: Deposit %s in collection hal' % deposit_id, - 'author': SWH_PERSON, - 'committer': SWH_PERSON, 'date': { - 'timestamp': { - 'seconds': 1507389428, - 'microseconds': 0 - }, + 'negative_utc': False, 'offset': 0, - 'negative_utc': False + 'timestamp': {'microseconds': 0, 'seconds': 1507389428} }, + 'message': 'test: Deposit %s in collection test' % deposit.id, 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], - 'author': ['some awesome author', 'another one', 'no one'], - 'external_identifier': 'some-external-id', + 'author': ['some awesome author', + 'another one', + 'no one'], 'codemeta:dateCreated': '2017-10-07T15:17:08Z', - 'url': 'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id' + 'external_identifier': 'some-external-id', + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa }, + 'synthetic': True, 'type': 'tar' - }, - 'branch_name': 'master', + } } - self.assertEqual(data, expected_meta) - - def test_read_metadata_revision_with_parent(self): - """Private read metadata to a deposit (with parent) returns metadata - - """ - swh_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa' - swh_persistent_id = 'swh:1:rev:%s' % 
swh_id - deposit_id1 = self.create_deposit_with_status( - status=DEPOSIT_STATUS_LOAD_SUCCESS, - external_id='some-external-id', - swh_id=swh_persistent_id) - - deposit_parent = Deposit.objects.get(pk=deposit_id1) - self.assertEqual(deposit_parent.swh_id, swh_persistent_id) - self.assertEqual(deposit_parent.external_id, 'some-external-id') - self.assertEqual(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS) + assert data == expected_meta - deposit_id = self.create_deposit_partial( - external_id='some-external-id') - deposit = Deposit.objects.get(pk=deposit_id) - self.assertEqual(deposit.external_id, 'some-external-id') - self.assertEqual(deposit.swh_id, None) - self.assertEqual(deposit.parent, deposit_parent) - self.assertEqual(deposit.status, DEPOSIT_STATUS_PARTIAL) +def test_read_metadata_revision_with_parent( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """Private read metadata to a deposit (with parent) returns metadata - url = self.private_deposit_url(deposit_id) - - response = self.client.get(url) - - self.assertEqual(response.status_code, - status.HTTP_200_OK) - self.assertEqual(response._headers['content-type'][1], - 'application/json') + """ + deposit = partial_deposit + deposit.external_id = 'some-external-id' + deposit.save() + deposit = update_deposit(authenticated_client, deposit_collection, deposit, + atom_dataset) + rev_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa' + swh_id = 'swh:1:rev:%s' % rev_id + fake_parent = Deposit(swh_id=swh_id, + client=deposit.client, collection=deposit.collection) + fake_parent.save() + deposit.parent = fake_parent + deposit.save() + + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) + + assert response.status_code == status.HTTP_200_OK + assert response._headers['content-type'][1] == 'application/json' data = response.json() expected_meta = { + 'branch_name': 'master', 'origin': { - 'url': 
'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id', - 'type': 'deposit' + 'type': 'deposit', + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' }, 'origin_metadata': { 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], - 'author': ['some awesome author', 'another one', 'no one'], + 'author': [ + 'some awesome author', + 'another one', + 'no one' + ], 'codemeta:dateCreated': '2017-10-07T15:17:08Z', 'external_identifier': 'some-external-id', - 'url': 'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id' + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa }, 'provider': { - 'provider_name': 'hal', + 'metadata': {}, + 'provider_name': '', 'provider_type': 'deposit_client', - 'provider_url': 'https://hal-test.archives-ouvertes.fr/', - 'metadata': {} + 'provider_url': 'https://hal-test.archives-ouvertes.fr/' }, 'tool': { + 'configuration': {'sword_version': '2'}, 'name': 'swh-deposit', - 'version': '0.0.1', - 'configuration': { - 'sword_version': '2' - } + 'version': '0.0.1' } }, 'revision': { - 'synthetic': True, - 'date': { - 'timestamp': { - 'seconds': 1507389428, - 'microseconds': 0 - }, - 'offset': 0, - 'negative_utc': False - }, + 'author': SWH_PERSON, + 'committer': SWH_PERSON, 'committer_date': { + 'negative_utc': False, + 'offset': 0, 'timestamp': { - 'seconds': 1507389428, - 'microseconds': 0 - }, + 'microseconds': 0, + 'seconds': 1507389428 + } + }, + 'date': { + 'negative_utc': False, 'offset': 0, - 'negative_utc': False + 'timestamp': {'microseconds': 0, 'seconds': 1507389428} }, - 'author': SWH_PERSON, - 'committer': SWH_PERSON, - 'type': 'tar', - 'message': 'hal: Deposit %s in collection hal' % deposit_id, + 'message': 'test: Deposit %s in collection test' % deposit.id, 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], - 'author': ['some awesome author', 'another one', 'no one'], + 'author': ['some awesome author', + 'another one', + 'no one'], 'codemeta:dateCreated': 
'2017-10-07T15:17:08Z', 'external_identifier': 'some-external-id', - 'url': 'https://hal-test.archives-ouvertes.fr/' + - 'some-external-id' + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' # noqa }, - 'parents': [swh_id] - }, - 'branch_name': 'master', + 'synthetic': True, + 'type': 'tar', + 'parents': [rev_id], + } } - self.assertEqual(data, expected_meta) + assert data == expected_meta + - def test_read_metadata_3(self): - """date(Created|Published) provided, uses author/committer date +def test_read_metadata_3( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """date(Created|Published) provided, uses author/committer date - """ - # add metadata to the deposit with datePublished and dateCreated - codemeta_entry_data = self.template_metadata % """ + """ + deposit = partial_deposit + deposit.external_id = 'hal-01243065' + deposit.save() + deposit = update_deposit( + authenticated_client, deposit_collection, deposit, + atom_dataset) + # add metadata to the deposit with datePublished and dateCreated + codemeta_entry_data = atom_dataset['metadata'] % b""" 2015-04-06T17:08:47+02:00 2017-05-03T16:08:47+02:00 """ + update_deposit_with_metadata( + authenticated_client, deposit_collection, deposit, + codemeta_entry_data + ) - deposit_id = self.create_deposit_partial_with_data_in_args( - codemeta_entry_data) - url = self.private_deposit_url(deposit_id) - response = self.client.get(url) + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, - status.HTTP_200_OK) - self.assertEqual(response._headers['content-type'][1], - 'application/json') + assert response.status_code == status.HTTP_200_OK + assert response._headers['content-type'][1] == 'application/json' data = response.json() - expected_origin = { - 'type': 'deposit', - 'url': 'https://hal-test.archives-ouvertes.fr/hal-01243065' - } - expected_metadata = { - '@xmlns': 
'http://www.w3.org/2005/Atom', - '@xmlns:codemeta': - 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', - 'author': { - 'email': 'hal@ccsd.cnrs.fr', - 'name': 'HAL' - }, + metadata = { + '@xmlns': ['http://www.w3.org/2005/Atom'], + '@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', + 'author': [ + 'some awesome author', + 'another one', + 'no one', + { + 'email': 'hal@ccsd.cnrs.fr', + 'name': 'HAL' + } + ], 'client': 'hal', 'codemeta:applicationCategory': 'test', 'codemeta:author': { 'codemeta:name': 'Morane Gruenpeter' }, - 'codemeta:dateCreated': '2015-04-06T17:08:47+02:00', + 'codemeta:dateCreated': ['2017-10-07T15:17:08Z', + '2015-04-06T17:08:47+02:00'], 'codemeta:datePublished': '2017-05-03T16:08:47+02:00', 'codemeta:description': 'this is the description', 'codemeta:developmentStatus': 'stable', 'codemeta:keywords': 'DSP programming', 'codemeta:license': [ - { - 'codemeta:name': 'GNU General Public License v3.0 only' - }, - { - 'codemeta:name': - 'CeCILL Free Software License Agreement v1.1' - } - ], + {'codemeta:name': 'GNU General Public License v3.0 only'}, + {'codemeta:name': 'CeCILL ' + 'Free ' + 'Software ' + 'License ' + 'Agreement ' + 'v1.1'}], 'codemeta:programmingLanguage': [ - 'php', 'python', 'C' + 'php', + 'python', + 'C' ], 'codemeta:runtimePlatform': 'phpstorm', 'codemeta:url': 'https://hal-test.archives-ouvertes.fr/hal-01243065', # noqa 'codemeta:version': '1', - 'external_identifier': 'hal-01243065', + 'external_identifier': [ + 'some-external-id', + 'hal-01243065' + ], 'id': 'hal-01243065', - 'title': 'Composing a Web of Audio Applications' + 'title': 'Composing a Web of Audio ' + 'Applications', + 'url': 'https://hal-test.archives-ouvertes.fr/some-external-id' } - - expected_origin_metadata = { - 'metadata': expected_metadata, - 'provider': { - 'metadata': {}, - 'provider_name': 'hal', - 'provider_type': 'deposit_client', - 'provider_url': 'https://hal-test.archives-ouvertes.fr/' + expected_meta = { + 'branch_name': 'master', + 
'origin': { + 'type': 'deposit', + 'url': 'https://hal-test.archives-ouvertes.fr/hal-01243065' }, - 'tool': { - 'configuration': { - 'sword_version': '2' + 'origin_metadata': { + 'metadata': metadata, + 'provider': { + 'metadata': {}, + 'provider_name': '', + 'provider_type': 'deposit_client', + 'provider_url': 'https://hal-test.archives-ouvertes.fr/' }, - 'name': 'swh-deposit', - 'version': '0.0.1' - } - } - - expected_revision = { - 'author': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer_date': { - 'negative_utc': False, - 'offset': 120, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1493820527 - } - }, - 'date': { - 'negative_utc': False, - 'offset': 120, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1428332927 + 'tool': { + 'configuration': {'sword_version': '2'}, + 'name': 'swh-deposit', + 'version': '0.0.1' } }, - 'message': 'hal: Deposit %s in collection hal' % deposit_id, - 'metadata': expected_metadata, - 'synthetic': True, - 'type': 'tar' - } - - expected_meta = { - 'branch_name': 'master', - 'origin': expected_origin, - 'origin_metadata': expected_origin_metadata, - 'revision': expected_revision, + 'revision': { + 'author': SWH_PERSON, + 'committer': SWH_PERSON, + 'committer_date': {'negative_utc': False, + 'offset': 120, + 'timestamp': {'microseconds': 0, + 'seconds': 1493820527}}, + 'date': { + 'negative_utc': False, + 'offset': 0, + 'timestamp': {'microseconds': 0, 'seconds': 1507389428} + }, + 'message': '%s: Deposit %s in collection %s' % ( + deposit_collection.name, + deposit.id, + deposit_collection.name + ), + 'metadata': metadata, + 'synthetic': True, + 'type': 'tar' + } } + assert data == expected_meta - self.assertEqual(data, expected_meta) - def test_read_metadata_4(self): - """dateCreated/datePublished not provided, revision 
uses complete_date +def test_read_metadata_4( + authenticated_client, deposit_collection, atom_dataset, + partial_deposit): + """dateCreated/datePublished not provided, revision uses complete_date - """ - codemeta_entry_data = self.template_metadata % '' - - deposit_id = self.create_deposit_partial_with_data_in_args( - codemeta_entry_data) + """ + deposit = partial_deposit + codemeta_entry_data = atom_dataset['metadata'] % b'' + deposit = update_deposit_with_metadata( + authenticated_client, deposit_collection, deposit, + codemeta_entry_data) - # will use the deposit completed date as fallback date - deposit = Deposit.objects.get(pk=deposit_id) - deposit.complete_date = '2016-04-06' - deposit.save() + # will use the deposit completed date as fallback date + deposit.complete_date = '2016-04-06' + deposit.save() - url = self.private_deposit_url(deposit_id) - response = self.client.get(url) + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, - status.HTTP_200_OK) - self.assertEqual(response._headers['content-type'][1], - 'application/json') + assert response.status_code == status.HTTP_200_OK + assert response._headers['content-type'][1] == 'application/json' data = response.json() - expected_origin = { - 'type': 'deposit', - 'url': 'https://hal-test.archives-ouvertes.fr/hal-01243065' - } - expected_metadata = { + metadata = { '@xmlns': 'http://www.w3.org/2005/Atom', - '@xmlns:codemeta': - 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', - 'author': { - 'email': 'hal@ccsd.cnrs.fr', - 'name': 'HAL' - }, + '@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', + 'author': {'email': 'hal@ccsd.cnrs.fr', + 'name': 'HAL'}, 'client': 'hal', 'codemeta:applicationCategory': 'test', - 'codemeta:author': { - 'codemeta:name': 'Morane Gruenpeter' - }, - 'codemeta:description': 'this is the description', + 'codemeta:author': {'codemeta:name': 'Morane ' + 'Gruenpeter'}, + 
'codemeta:description': 'this is the ' + 'description', 'codemeta:developmentStatus': 'stable', 'codemeta:keywords': 'DSP programming', - 'codemeta:license': [ - { - 'codemeta:name': 'GNU General Public License v3.0 only' - }, - { - 'codemeta:name': - 'CeCILL Free Software License Agreement v1.1' - } - ], - 'codemeta:programmingLanguage': [ - 'php', 'python', 'C' - ], + 'codemeta:license': [{'codemeta:name': 'GNU ' + 'General ' + 'Public ' + 'License ' + 'v3.0 ' + 'only'}, + {'codemeta:name': 'CeCILL ' + 'Free ' + 'Software ' + 'License ' + 'Agreement ' + 'v1.1'}], + 'codemeta:programmingLanguage': ['php', + 'python', + 'C'], 'codemeta:runtimePlatform': 'phpstorm', - 'codemeta:url': 'https://hal-test.archives-ouvertes.fr/hal-01243065', # noqa + 'codemeta:url': + 'https://hal-test.archives-ouvertes.fr/hal-01243065', 'codemeta:version': '1', 'external_identifier': 'hal-01243065', 'id': 'hal-01243065', - 'title': 'Composing a Web of Audio Applications' + 'title': 'Composing a Web of Audio ' + 'Applications' + } + + expected_origin = { + 'type': 'deposit', + 'url': 'https://hal-test.archives-ouvertes.fr/%s' % ( + deposit.external_id) } expected_origin_metadata = { - 'metadata': expected_metadata, + 'metadata': metadata, 'provider': { 'metadata': {}, - 'provider_name': 'hal', + 'provider_name': '', 'provider_type': 'deposit_client', 'provider_url': 'https://hal-test.archives-ouvertes.fr/' }, 'tool': { - 'configuration': { - 'sword_version': '2' - }, + 'configuration': {'sword_version': '2'}, 'name': 'swh-deposit', 'version': '0.0.1' } } expected_revision = { - 'author': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer_date': { - 'negative_utc': False, - 'offset': 0, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1459900800 - } - }, + 'author': {'email': 
'robot@softwareheritage.org', + 'fullname': 'Software Heritage', + 'name': 'Software Heritage'}, + 'committer': {'email': 'robot@softwareheritage.org', + 'fullname': 'Software Heritage', + 'name': 'Software Heritage'}, + 'committer_date': {'negative_utc': False, + 'offset': 0, + 'timestamp': {'microseconds': 0, + 'seconds': 1459900800}}, 'date': { 'negative_utc': False, 'offset': 0, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1459900800 - } - }, - 'message': 'hal: Deposit %s in collection hal' % deposit_id, - 'metadata': expected_metadata, + 'timestamp': {'microseconds': 0, 'seconds': 1459900800}}, + 'message': '%s: Deposit %s in collection %s' % ( + deposit_collection.name, deposit.id, deposit_collection.name + ), + 'metadata': metadata, 'synthetic': True, 'type': 'tar' } @@ -488,130 +454,121 @@ 'revision': expected_revision, } - self.assertEqual(data, expected_meta) + assert data == expected_meta + - def test_read_metadata_5(self): - """dateCreated/datePublished provided, revision uses author/committer - date +def test_read_metadata_5( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """dateCreated/datePublished provided, revision uses author/committer + date - If multiple dateCreated provided, the first occurrence (of - dateCreated) is selected. If multiple datePublished provided, - the first occurrence (of datePublished) is selected. + If multiple dateCreated provided, the first occurrence (of + dateCreated) is selected. If multiple datePublished provided, + the first occurrence (of datePublished) is selected. 
- """ - # add metadata to the deposit with multiple datePublished/dateCreated - codemeta_entry_data = self.template_metadata % """ + """ + deposit = partial_deposit + # add metadata to the deposit with multiple datePublished/dateCreated + codemeta_entry_data = atom_dataset['metadata'] % b""" 2015-04-06T17:08:47+02:00 2017-05-03T16:08:47+02:00 2016-04-06T17:08:47+02:00 2018-05-03T16:08:47+02:00 """ + deposit = update_deposit_with_metadata( + authenticated_client, deposit_collection, deposit, + codemeta_entry_data) - deposit_id = self.create_deposit_partial_with_data_in_args( - codemeta_entry_data) - url = self.private_deposit_url(deposit_id) - response = self.client.get(url) + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) - self.assertEqual(response.status_code, - status.HTTP_200_OK) - self.assertEqual(response._headers['content-type'][1], - 'application/json') + assert response.status_code == status.HTTP_200_OK + assert response._headers['content-type'][1] == 'application/json' data = response.json() expected_origin = { 'type': 'deposit', - 'url': 'https://hal-test.archives-ouvertes.fr/hal-01243065' + 'url': 'https://hal-test.archives-ouvertes.fr/external-id-partial' } - expected_metadata = { + + metadata = { '@xmlns': 'http://www.w3.org/2005/Atom', - '@xmlns:codemeta': - 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', - 'author': { - 'email': 'hal@ccsd.cnrs.fr', - 'name': 'HAL' - }, + '@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', + 'author': {'email': 'hal@ccsd.cnrs.fr', + 'name': 'HAL'}, 'client': 'hal', 'codemeta:applicationCategory': 'test', - 'codemeta:author': { - 'codemeta:name': 'Morane Gruenpeter' - }, - 'codemeta:dateCreated': [ - '2015-04-06T17:08:47+02:00', - '2016-04-06T17:08:47+02:00', - ], - 'codemeta:datePublished': [ - '2017-05-03T16:08:47+02:00', - '2018-05-03T16:08:47+02:00', - ], + 'codemeta:author': {'codemeta:name': 'Morane ' + 'Gruenpeter'}, + 
'codemeta:dateCreated': ['2015-04-06T17:08:47+02:00', + '2016-04-06T17:08:47+02:00'], + 'codemeta:datePublished': ['2017-05-03T16:08:47+02:00', + '2018-05-03T16:08:47+02:00'], 'codemeta:description': 'this is the description', 'codemeta:developmentStatus': 'stable', 'codemeta:keywords': 'DSP programming', 'codemeta:license': [ { - 'codemeta:name': 'GNU General Public License v3.0 only' - }, + 'codemeta:name': 'GNU ' + 'General ' + 'Public ' + 'License ' + 'v3.0 ' + 'only'}, { - 'codemeta:name': - 'CeCILL Free Software License Agreement v1.1' + 'codemeta:name': 'CeCILL ' + 'Free ' + 'Software ' + 'License ' + 'Agreement ' + 'v1.1' } ], - 'codemeta:programmingLanguage': [ - 'php', 'python', 'C' - ], + 'codemeta:programmingLanguage': ['php', + 'python', + 'C'], 'codemeta:runtimePlatform': 'phpstorm', 'codemeta:url': 'https://hal-test.archives-ouvertes.fr/hal-01243065', # noqa 'codemeta:version': '1', 'external_identifier': 'hal-01243065', 'id': 'hal-01243065', - 'title': 'Composing a Web of Audio Applications' + 'title': 'Composing a Web of Audio ' + 'Applications' } expected_origin_metadata = { - 'metadata': expected_metadata, + 'metadata': metadata, 'provider': { 'metadata': {}, - 'provider_name': 'hal', + 'provider_name': '', 'provider_type': 'deposit_client', - 'provider_url': 'https://hal-test.archives-ouvertes.fr/' - }, + 'provider_url': 'https://hal-test.archives-ouvertes.fr/'}, 'tool': { - 'configuration': { - 'sword_version': '2' - }, + 'configuration': {'sword_version': '2'}, 'name': 'swh-deposit', 'version': '0.0.1' } } expected_revision = { - 'author': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer': { - 'email': 'robot@softwareheritage.org', - 'fullname': 'Software Heritage', - 'name': 'Software Heritage' - }, - 'committer_date': { - 'negative_utc': False, - 'offset': 120, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1493820527 - } - }, - 'date': { - 'negative_utc': 
False, - 'offset': 120, - 'timestamp': { - 'microseconds': 0, - 'seconds': 1428332927 - } - }, - 'message': 'hal: Deposit %s in collection hal' % deposit_id, - 'metadata': expected_metadata, + 'author': {'email': 'robot@softwareheritage.org', + 'fullname': 'Software Heritage', + 'name': 'Software Heritage'}, + 'committer': {'email': 'robot@softwareheritage.org', + 'fullname': 'Software Heritage', + 'name': 'Software Heritage'}, + 'committer_date': {'negative_utc': False, + 'offset': 120, + 'timestamp': {'microseconds': 0, + 'seconds': 1493820527}}, + 'date': {'negative_utc': False, + 'offset': 120, + 'timestamp': {'microseconds': 0, 'seconds': 1428332927}}, + 'message': '%s: Deposit %s in collection %s' % ( + deposit_collection.name, deposit.id, deposit_collection.name + ), + 'metadata': metadata, 'synthetic': True, 'type': 'tar' } @@ -620,25 +577,25 @@ 'branch_name': 'master', 'origin': expected_origin, 'origin_metadata': expected_origin_metadata, - 'revision': expected_revision, + 'revision': expected_revision } - self.assertEqual(data, expected_meta) + assert data == expected_meta - def test_access_to_nonexisting_deposit_returns_404_response(self): - """Read unknown collection should return a 404 response - """ - unknown_id = '999' - url = self.private_deposit_url(unknown_id) - response = self.client.get(url) - self.assertEqual(response.status_code, - status.HTTP_404_NOT_FOUND) - self.assertIn('Deposit with id %s does not exist' % unknown_id, - response.content.decode('utf-8')) +def test_access_to_nonexisting_deposit_returns_404_response( + authenticated_client, deposit_collection, ): + """Read unknown collection should return a 404 response - -class DepositReadMetadataTest2(DepositReadMetadataTest): - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_DEPOSIT_METADATA+'-nc', - args=[deposit_id]) + """ + unknown_id = 999 + try: + Deposit.objects.get(pk=unknown_id) + except Deposit.DoesNotExist: + assert True + + for url in 
private_get_raw_url_endpoints(deposit_collection, unknown_id): + response = authenticated_client.get(url) + assert response.status_code == status.HTTP_404_NOT_FOUND + msg = 'Deposit with id %s does not exist' % unknown_id + assert msg in response.content.decode('utf-8') diff --git a/swh/deposit/tests/api/test_deposit_private_update_status.py b/swh/deposit/tests/api/test_deposit_private_update_status.py --- a/swh/deposit/tests/api/test_deposit_private_update_status.py +++ b/swh/deposit/tests/api/test_deposit_private_update_status.py @@ -7,57 +7,56 @@ from django.urls import reverse from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL -from swh.deposit.config import PRIVATE_PUT_DEPOSIT, DEPOSIT_STATUS_VERIFIED -from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS -from ..common import BasicTestCase +from swh.deposit.config import ( + PRIVATE_PUT_DEPOSIT, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_LOAD_SUCCESS +) -class UpdateDepositStatusTest(APITestCase, BasicTestCase): - """Update the deposit's status scenario +PRIVATE_PUT_DEPOSIT_NC = PRIVATE_PUT_DEPOSIT + '-nc' - """ - def setUp(self): - super().setUp() - deposit = Deposit(status=DEPOSIT_STATUS_VERIFIED, - collection=self.collection, - client=self.user) - deposit.save() - self.deposit = Deposit.objects.get(pk=deposit.id) - assert self.deposit.status == DEPOSIT_STATUS_VERIFIED - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_PUT_DEPOSIT, - args=[self.collection.name, deposit_id]) +def private_check_url_endpoints(collection, deposit): + """There are 2 endpoints to check (one with collection, one without)""" + return [ + reverse(PRIVATE_PUT_DEPOSIT, args=[collection.name, deposit.id]), + reverse(PRIVATE_PUT_DEPOSIT_NC, args=[deposit.id]) + ] - def test_update_deposit_status(self): - """Existing status for update should return a 204 response - """ - url = self.private_deposit_url(self.deposit.id) +def 
test_update_deposit_status( + authenticated_client, deposit_collection, ready_deposit_verified): + """Existing status for update should return a 204 response + """ + deposit = ready_deposit_verified + for url in private_check_url_endpoints(deposit_collection, deposit): possible_status = set(DEPOSIT_STATUS_DETAIL.keys()) - set( [DEPOSIT_STATUS_LOAD_SUCCESS]) for _status in possible_status: - response = self.client.put( + response = authenticated_client.put( url, content_type='application/json', data=json.dumps({'status': _status})) - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + assert response.status_code == status.HTTP_204_NO_CONTENT - deposit = Deposit.objects.get(pk=self.deposit.id) - self.assertEqual(deposit.status, _status) + deposit = Deposit.objects.get(pk=deposit.id) + assert deposit.status == _status - def test_update_deposit_status_with_info(self): - """Existing status for update with info should return a 204 response + deposit.status = DEPOSIT_STATUS_VERIFIED + deposit.save() # hack the same deposit - """ - url = self.private_deposit_url(self.deposit.id) +def test_update_deposit_status_with_info( + authenticated_client, deposit_collection, ready_deposit_verified): + """Existing status for update with info should return a 204 response + + """ + deposit = ready_deposit_verified + for url in private_check_url_endpoints(deposit_collection, deposit): expected_status = DEPOSIT_STATUS_LOAD_SUCCESS origin_url = 'something' directory_id = '42a13fc721c8716ff695d0d62fc851d641f3a12b' @@ -69,7 +68,7 @@ expected_swh_anchor_id_context = 'swh:1:rev:%s;origin=%s' % ( revision_id, origin_url) - response = self.client.put( + response = authenticated_client.put( url, content_type='application/json', data=json.dumps({ @@ -79,56 +78,63 @@ 'origin_url': origin_url, })) - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + assert response.status_code == status.HTTP_204_NO_CONTENT + + deposit = Deposit.objects.get(pk=deposit.id) + assert 
deposit.status == expected_status + assert deposit.swh_id == expected_swh_id + assert deposit.swh_id_context == expected_swh_id_context + assert deposit.swh_anchor_id == expected_swh_anchor_id + assert deposit.swh_anchor_id_context == expected_swh_anchor_id_context - deposit = Deposit.objects.get(pk=self.deposit.id) - self.assertEqual(deposit.status, expected_status) - self.assertEqual(deposit.swh_id, expected_swh_id) - self.assertEqual(deposit.swh_id_context, expected_swh_id_context) - self.assertEqual(deposit.swh_anchor_id, expected_swh_anchor_id) - self.assertEqual(deposit.swh_anchor_id_context, - expected_swh_anchor_id_context) + deposit.swh_id = None + deposit.swh_id_context = None + deposit.swh_anchor_id = None + deposit.swh_anchor_id_context = None + deposit.status = DEPOSIT_STATUS_VERIFIED + deposit.save() - def test_update_deposit_status_will_fail_with_unknown_status(self): - """Unknown status for update should return a 400 response - """ - url = self.private_deposit_url(self.deposit.id) +def test_update_deposit_status_will_fail_with_unknown_status( + authenticated_client, deposit_collection, ready_deposit_verified): + """Unknown status for update should return a 400 response - response = self.client.put( + """ + deposit = ready_deposit_verified + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.put( url, content_type='application/json', data=json.dumps({'status': 'unknown'})) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + assert response.status_code == status.HTTP_400_BAD_REQUEST - def test_update_deposit_status_will_fail_with_no_status_key(self): - """No status provided for update should return a 400 response - """ - url = self.private_deposit_url(self.deposit.id) +def test_update_deposit_status_will_fail_with_no_status_key( + authenticated_client, deposit_collection, ready_deposit_verified): + """No status provided for update should return a 400 response - response = 
self.client.put( + """ + deposit = ready_deposit_verified + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.put( url, content_type='application/json', data=json.dumps({'something': 'something'})) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + assert response.status_code == status.HTTP_400_BAD_REQUEST - def test_update_deposit_status_success_without_swh_id_fail(self): - """Providing successful status without swh_id should return a 400 - """ - url = self.private_deposit_url(self.deposit.id) +def test_update_deposit_status_success_without_swh_id_fail( + authenticated_client, deposit_collection, ready_deposit_verified): + """Providing successful status without swh_id should return a 400 - response = self.client.put( + """ + deposit = ready_deposit_verified + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.put( url, content_type='application/json', data=json.dumps({'status': DEPOSIT_STATUS_LOAD_SUCCESS})) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - - -class UpdateDepositStatusTest2(UpdateDepositStatusTest): - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_PUT_DEPOSIT+'-nc', args=[deposit_id]) + assert response.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_status.py b/swh/deposit/tests/api/test_deposit_status.py --- a/swh/deposit/tests/api/test_deposit_status.py +++ b/swh/deposit/tests/api/test_deposit_status.py @@ -7,139 +7,124 @@ from django.urls import reverse from io import BytesIO from rest_framework import status -from rest_framework.test import APITestCase -from swh.deposit.config import (COL_IRI, STATE_IRI, DEPOSIT_STATUS_DEPOSITED, +from swh.deposit.config import (STATE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED) -from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL -from swh.deposit.models import 
DEPOSIT_STATUS_LOAD_SUCCESS +from swh.deposit.models import ( + DEPOSIT_STATUS_DETAIL, DEPOSIT_STATUS_LOAD_SUCCESS +) from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase, FileSystemCreationRoutine -from ..common import CommonCreationRoutine +def test_post_deposit_with_status_check( + authenticated_client, deposited_deposit): + """Successful but not loaded deposit should have a status 'deposited' -class DepositStatusTestCase(APITestCase, WithAuthTestCase, BasicTestCase, - FileSystemCreationRoutine, CommonCreationRoutine): - """Status on deposit + """ + deposit = deposited_deposit + status_url = reverse(STATE_IRI, + args=[deposit.collection.name, deposit.id]) + + # check status + status_response = authenticated_client.get(status_url) + + assert status_response.status_code == status.HTTP_200_OK + r = parse_xml(BytesIO(status_response.content)) + + assert int(r['deposit_id']) == deposit.id + assert r['deposit_status'] == DEPOSIT_STATUS_DEPOSITED + assert r['deposit_status_detail'] == \ + DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_DEPOSITED] + assert r['deposit_external_id'] == deposit.external_id + + +def test_status_unknown_deposit(authenticated_client, deposit_collection): + """Unknown deposit status should return 404 response + + """ + unknown_deposit_id = 999 + status_url = reverse(STATE_IRI, + args=[deposit_collection.name, unknown_deposit_id]) + status_response = authenticated_client.get(status_url) + assert status_response.status_code == status.HTTP_404_NOT_FOUND + + +def test_status_unknown_collection( + authenticated_client, deposited_deposit): + """Unknown collection status should return 404 response""" + deposit = deposited_deposit + unknown_collection = 'something-unknown' + status_url = reverse(STATE_IRI, + args=[unknown_collection, deposit.id]) + status_response = authenticated_client.get(status_url) + assert status_response.status_code == status.HTTP_404_NOT_FOUND + + +def 
test_status_deposit_rejected(authenticated_client, rejected_deposit): + """Rejected deposit status should be 'rejected' with detailed summary + + """ + deposit = rejected_deposit + # _status_detail = {'url': {'summary': 'Wrong url'}} + + url = reverse(STATE_IRI, + args=[deposit.collection.name, deposit.id]) + + # when + status_response = authenticated_client.get(url) + + # then + assert status_response.status_code == status.HTTP_200_OK + r = parse_xml(BytesIO(status_response.content)) + assert int(r['deposit_id']) == deposit.id + assert r['deposit_status'] == DEPOSIT_STATUS_REJECTED + assert r['deposit_status_detail'] == 'Deposit failed the checks' + if deposit.swh_id: + assert r['deposit_swh_id'] == deposit.swh_id + + +def test_status_with_http_accept_header_should_not_break( + authenticated_client, partial_deposit): + """Asking deposit status with Accept header should return 200 + + """ + deposit = partial_deposit + + status_url = reverse(STATE_IRI, args=[ + deposit.collection.name, deposit.id]) + + response = authenticated_client.get(status_url) + assert response.status_code == status.HTTP_200_OK + + response = authenticated_client.get( + status_url, + HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') + assert response.status_code == status.HTTP_200_OK + + +def test_status_complete_deposit( + authenticated_client, complete_deposit): + """Successful and loaded deposit should be 'done' and have detailed swh ids """ - def test_post_deposit_with_status_check(self): - """Binary upload should be accepted - - """ - # given - url = reverse(COL_IRI, args=[self.collection.name]) - - external_id = 'some-external-id-1' - - # when - response = self.client.post( - url, - content_type='application/zip', # as zip - data=self.archive['data'], - # + headers - CONTENT_LENGTH=self.archive['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - 
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') - - # then - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - - deposit = Deposit.objects.get(external_id=external_id) - - status_url = reverse(STATE_IRI, - args=[self.collection.name, deposit.id]) - - # check status - status_response = self.client.get(status_url) - - self.assertEqual(status_response.status_code, status.HTTP_200_OK) - r = parse_xml(BytesIO(status_response.content)) - - self.assertEqual(int(r['deposit_id']), deposit.id) - self.assertEqual(r['deposit_status'], DEPOSIT_STATUS_DEPOSITED) - self.assertEqual(r['deposit_status_detail'], - DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_DEPOSITED]) - self.assertEqual(r['deposit_external_id'], external_id) - - def test_status_with_swh_information(self): - _status = DEPOSIT_STATUS_LOAD_SUCCESS - _context = 'https://hal.archives-ouvertes.fr/hal-01727745' - _swh_id = 'swh:1:dir:42a13fc721c8716ff695d0d62fc851d641f3a12b' - _swh_id_context = '%s;%s' % (_swh_id, _context) - _swh_anchor_id = 'swh:rev:1:548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' - _swh_anchor_id_context = '%s;%s' % (_swh_anchor_id, _context) - - # given - deposit_id = self.create_deposit_with_status( - status=_status, - swh_id=_swh_id, - swh_id_context=_swh_id_context, - swh_anchor_id=_swh_anchor_id, - swh_anchor_id_context=_swh_anchor_id_context - ) - - url = reverse(STATE_IRI, args=[self.collection.name, deposit_id]) - - # when - status_response = self.client.get(url) - - # then - self.assertEqual(status_response.status_code, status.HTTP_200_OK) - r = parse_xml(BytesIO(status_response.content)) - self.assertEqual(int(r['deposit_id']), deposit_id) - self.assertEqual(r['deposit_status'], _status) - self.assertEqual(r['deposit_status_detail'], - DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_LOAD_SUCCESS]) - self.assertEqual(r['deposit_swh_id'], _swh_id) - self.assertEqual(r['deposit_swh_id_context'], _swh_id_context) - self.assertEqual(r['deposit_swh_anchor_id'], _swh_anchor_id) - 
self.assertEqual(r['deposit_swh_anchor_id_context'], - _swh_anchor_id_context) - - def test_status_on_unknown_deposit(self): - """Asking for the status of unknown deposit returns 404 response""" - status_url = reverse(STATE_IRI, args=[self.collection.name, 999]) - status_response = self.client.get(status_url) - self.assertEqual(status_response.status_code, - status.HTTP_404_NOT_FOUND) - - def test_status_with_http_accept_header_should_not_break(self): - """Asking deposit status with Accept header should return 200 - - """ - deposit_id = self.create_deposit_partial() - - status_url = reverse(STATE_IRI, args=[ - self.collection.name, deposit_id]) - response = self.client.get( - status_url, - HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') - - self.assertEqual(response.status_code, status.HTTP_200_OK) - - def test_status_on_deposit_rejected(self): - _status = DEPOSIT_STATUS_REJECTED - _swh_id = '548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' - _status_detail = {'url': {'summary': 'Wrong url'}} - - # given - deposit_id = self.create_deposit_with_status( - status=_status, swh_id=_swh_id, status_detail=_status_detail) - - url = reverse(STATE_IRI, args=[self.collection.name, deposit_id]) - - # when - status_response = self.client.get(url) - - # then - self.assertEqual(status_response.status_code, status.HTTP_200_OK) - r = parse_xml(BytesIO(status_response.content)) - self.assertEqual(int(r['deposit_id']), deposit_id) - self.assertEqual(r['deposit_status'], _status) - self.assertEqual(r['deposit_status_detail'], '- Wrong url') - self.assertEqual(r['deposit_swh_id'], _swh_id) + deposit = complete_deposit + url = reverse(STATE_IRI, args=[deposit.collection.name, deposit.id]) + + # when + status_response = authenticated_client.get(url) + + # then + assert status_response.status_code == status.HTTP_200_OK + r = parse_xml(BytesIO(status_response.content)) + assert int(r['deposit_id']) == deposit.id + assert r['deposit_status'] == DEPOSIT_STATUS_LOAD_SUCCESS + assert 
r['deposit_status_detail'] == \ + DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_LOAD_SUCCESS] + assert deposit.swh_id is not None + assert r['deposit_swh_id'] == deposit.swh_id + assert deposit.swh_id_context is not None + assert r['deposit_swh_id_context'] == deposit.swh_id_context + assert deposit.swh_anchor_id is not None + assert r['deposit_swh_anchor_id'] == deposit.swh_anchor_id + assert deposit.swh_anchor_id_context is not None + assert r['deposit_swh_anchor_id_context'] == deposit.swh_anchor_id_context diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -5,329 +5,379 @@ from django.urls import reverse from rest_framework import status -from rest_framework.test import APITestCase -from swh.deposit.models import Deposit, DepositRequest +from swh.deposit.models import Deposit, DepositRequest, DepositCollection from swh.deposit.config import EDIT_SE_IRI, EM_IRI +from swh.deposit.parsers import parse_xml -from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine -from ..common import FileSystemCreationRoutine, create_arborescence_archive +from swh.deposit.tests.common import create_arborescence_archive, check_archive -class DepositUpdateOrReplaceExistingDataTest( - APITestCase, WithAuthTestCase, BasicTestCase, - FileSystemCreationRoutine, CommonCreationRoutine): - """Try put/post (update/replace) query on EM_IRI +def test_replace_archive_to_deposit_is_possible( + tmp_path, partial_deposit, deposit_collection, authenticated_client, + sample_archive, atom_dataset): + """Replace all archive with another one should return a 204 response """ - def setUp(self): - super().setUp() + tmp_path = str(tmp_path) + # given + deposit = partial_deposit + requests = DepositRequest.objects.filter( + deposit=deposit, + type='archive') + + assert len(list(requests)) == 1 + check_archive(sample_archive['name'], 
requests[0].archive.name) + + # we have no metadata for that deposit + requests = list(DepositRequest.objects.filter( + deposit=deposit, type='metadata')) + assert len(requests) == 0 + + response = authenticated_client.post( + reverse(EDIT_SE_IRI, args=[deposit_collection.name, deposit.id]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1'], + HTTP_SLUG=deposit.external_id, + HTTP_IN_PROGRESS=True) + + requests = list(DepositRequest.objects.filter( + deposit=deposit, type='metadata')) + assert len(requests) == 1 + + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + external_id = 'some-external-id-1' + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some other content in file') + + response = authenticated_client.put( + update_uri, + content_type='application/zip', # as zip + data=archive2['data'], + # + headers + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + archive2['name'], )) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + requests = DepositRequest.objects.filter( + deposit=deposit, + type='archive') + + assert len(list(requests)) == 1 + check_archive(archive2['name'], requests[0].archive.name) + + # check we did not touch the other parts + requests = list(DepositRequest.objects.filter( + deposit=deposit, type='metadata')) + assert len(requests) == 1 + + +def test_replace_metadata_to_deposit_is_possible( + tmp_path, authenticated_client, partial_deposit_with_metadata, + deposit_collection, atom_dataset): + """Replace all metadata with another one should return a 204 response - self.atom_entry_data1 = b""" - - bar -""" - - self.atom_entry_data1 = b""" - - bar -""" - - self.archive2 = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some 
other content in file') - - def test_replace_archive_to_deposit_is_possible(self): - """Replace all archive with another one should return a 204 response - - """ - # given - deposit_id = self.create_simple_binary_deposit(status_partial=True) - - deposit = Deposit.objects.get(pk=deposit_id) - requests = DepositRequest.objects.filter( - deposit=deposit, - type='archive') - - assert len(list(requests)) == 1 - assert self.archive['name'] in requests[0].archive.name - - # we have no metadata for that deposit - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='metadata')) - assert len(requests) == 0 - - deposit_id = self._update_deposit_with_status(deposit_id, - status_partial=True) - - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='metadata')) - assert len(requests) == 1 - - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - - external_id = 'some-external-id-1' - - response = self.client.put( - update_uri, - content_type='application/zip', # as zip - data=self.archive2['data'], - # + headers - CONTENT_LENGTH=self.archive2['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive2['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( - self.archive2['name'], )) - - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - - requests = DepositRequest.objects.filter( - deposit=deposit, - type='archive') - - self.assertEqual(len(list(requests)), 1) - self.assertRegex(requests[0].archive.name, self.archive2['name']) - - # check we did not touch the other parts - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='metadata')) - self.assertEqual(len(requests), 1) - - def test_replace_metadata_to_deposit_is_possible(self): - """Replace all metadata with another one should return a 204 response - - """ - # given - deposit_id = 
self.create_simple_binary_deposit(status_partial=True) - - deposit = Deposit.objects.get(pk=deposit_id) - requests = DepositRequest.objects.filter( - deposit=deposit, - type='metadata') - assert len(list(requests)) == 0 - - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='archive')) - assert len(requests) == 1 - - update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, - deposit_id]) - - response = self.client.put( - update_uri, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data1) - - self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) - - requests = DepositRequest.objects.filter( - deposit=deposit, - type='metadata') - - self.assertEqual(len(list(requests)), 1) - metadata = requests[0].metadata - self.assertEqual(metadata['foobar'], 'bar') - - # check we did not touch the other parts - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='archive')) - self.assertEqual(len(requests), 1) - - def test_add_archive_to_deposit_is_possible(self): - """Add another archive to a deposit return a 201 response - - """ - # given - deposit_id = self.create_simple_binary_deposit(status_partial=True) - - deposit = Deposit.objects.get(pk=deposit_id) - requests = DepositRequest.objects.filter( - deposit=deposit, - type='archive') - - assert len(list(requests)) == 1 - assert self.archive['name'] in requests[0].archive.name - - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='metadata')) - assert len(requests) == 0 - - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) + """ + # given + deposit = partial_deposit_with_metadata + raw_metadata0 = atom_dataset['entry-data0'] % deposit.external_id.encode( + 'utf-8') + + requests_meta = DepositRequest.objects.filter( + deposit=deposit, + type='metadata') + assert len(requests_meta) == 1 + request_meta0 = requests_meta[0] + assert request_meta0.raw_metadata == raw_metadata0.decode('utf-8') + + requests_archive0 
= DepositRequest.objects.filter( + deposit=deposit, type='archive') + assert len(requests_archive0) == 1 + + update_uri = reverse(EDIT_SE_IRI, args=[ + deposit_collection.name, deposit.id]) + + response = authenticated_client.put( + update_uri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1']) + + assert response.status_code == status.HTTP_204_NO_CONTENT + + requests_meta = DepositRequest.objects.filter( + deposit=deposit, + type='metadata') + + assert len(requests_meta) == 1 + request_meta1 = requests_meta[0] + raw_metadata1 = request_meta1.raw_metadata + assert raw_metadata1 == atom_dataset['entry-data1'].decode('utf-8') + assert raw_metadata0 != raw_metadata1 + assert request_meta0 != request_meta1 + + # check we did not touch the other parts + requests_archive1 = DepositRequest.objects.filter( + deposit=deposit, type='archive') + assert len(requests_archive1) == 1 + assert set(requests_archive0) == set(requests_archive1) + + +def test_add_archive_to_deposit_is_possible( + tmp_path, authenticated_client, deposit_collection, + partial_deposit_with_metadata, sample_archive): + """Add another archive to a deposit return a 201 response - external_id = 'some-external-id-1' + """ + tmp_path = str(tmp_path) + deposit = partial_deposit_with_metadata + + requests = DepositRequest.objects.filter( + deposit=deposit, + type='archive') + + assert len(requests) == 1 + check_archive(sample_archive['name'], requests[0].archive.name) + + requests_meta0 = DepositRequest.objects.filter( + deposit=deposit, type='metadata') + assert len(requests_meta0) == 1 + + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + + external_id = 'some-external-id-1' + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some other content in file') + + response = authenticated_client.post( + update_uri, + content_type='application/zip', # as zip + data=archive2['data'], + # + headers + CONTENT_LENGTH=archive2['length'], 
+ HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + archive2['name'],)) + + assert response.status_code == status.HTTP_201_CREATED + + requests = DepositRequest.objects.filter( + deposit=deposit, + type='archive').order_by('id') + + assert len(requests) == 2 + # first archive still exists + check_archive(sample_archive['name'], requests[0].archive.name) + # a new one was added + check_archive(archive2['name'], requests[1].archive.name) + + # check we did not touch the other parts + requests_meta1 = DepositRequest.objects.filter( + deposit=deposit, type='metadata') + assert len(requests_meta1) == 1 + assert set(requests_meta0) == set(requests_meta1) + + +def test_add_metadata_to_deposit_is_possible( + authenticated_client, deposit_collection, + partial_deposit_with_metadata, atom_dataset): + """Add metadata with another one should return a 204 response - response = self.client.post( - update_uri, - content_type='application/zip', # as zip - data=self.archive2['data'], - # + headers - CONTENT_LENGTH=self.archive2['length'], - HTTP_SLUG=external_id, - HTTP_CONTENT_MD5=self.archive2['md5sum'], - HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', - HTTP_IN_PROGRESS='false', - HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( - self.archive2['name'],)) - - self.assertEqual(response.status_code, status.HTTP_201_CREATED) + """ + deposit = partial_deposit_with_metadata + requests = DepositRequest.objects.filter( + deposit=deposit, + type='metadata') - requests = list(DepositRequest.objects.filter( - deposit=deposit, - type='archive').order_by('id')) + assert len(requests) == 1 - self.assertEqual(len(requests), 2) - # first archive still exists - self.assertRegex(requests[0].archive.name, self.archive['name']) - # a new one was added - self.assertRegex(requests[1].archive.name, 
self.archive2['name']) + requests_archive0 = DepositRequest.objects.filter( + deposit=deposit, type='archive') + assert len(requests_archive0) == 1 - # check we did not touch the other parts - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='metadata')) - self.assertEqual(len(requests), 0) + update_uri = reverse(EDIT_SE_IRI, args=[deposit_collection.name, + deposit.id]) - def test_add_metadata_to_deposit_is_possible(self): - """Add metadata with another one should return a 204 response + atom_entry = atom_dataset['entry-data1'] + response = authenticated_client.post( + update_uri, + content_type='application/atom+xml;type=entry', + data=atom_entry) - """ - # given - deposit_id = self.create_deposit_partial() + assert response.status_code == status.HTTP_201_CREATED - deposit = Deposit.objects.get(pk=deposit_id) - requests = DepositRequest.objects.filter( - deposit=deposit, - type='metadata') + requests = DepositRequest.objects.filter( + deposit=deposit, + type='metadata').order_by('id') - assert len(list(requests)) == 2 + assert len(requests) == 2 + expected_raw_meta0 = atom_dataset['entry-data0'] % ( + deposit.external_id.encode('utf-8')) + # a new one was added + assert requests[0].raw_metadata == expected_raw_meta0.decode('utf-8') + assert requests[1].raw_metadata == atom_entry.decode('utf-8') - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='archive')) - assert len(requests) == 0 + # check we did not touch the other parts + requests_archive1 = DepositRequest.objects.filter( + deposit=deposit, type='archive') + assert len(requests_archive1) == 1 + assert set(requests_archive0) == set(requests_archive1) - update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, - deposit_id]) - response = self.client.post( - update_uri, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data1) +def test_add_metadata_to_unknown_deposit( + deposit_collection, authenticated_client, atom_dataset): + """Replacing 
metadata to unknown deposit should return a 404 response - self.assertEqual(response.status_code, status.HTTP_201_CREATED) + """ + unknown_deposit_id = 1000 + try: + Deposit.objects.get(pk=unknown_deposit_id) + except Deposit.DoesNotExist: + assert True + + url = reverse(EDIT_SE_IRI, args=[deposit_collection, unknown_deposit_id]) + response = authenticated_client.post( + url, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_404_NOT_FOUND + response_content = parse_xml(response.content) + assert 'Unknown collection name' in \ + response_content['sword:error']['summary'] + + +def test_add_metadata_to_unknown_collection( + partial_deposit, authenticated_client, atom_dataset): + """Replacing metadata to unknown deposit should return a 404 response - requests = DepositRequest.objects.filter( - deposit=deposit, - type='metadata').order_by('id') + """ + deposit = partial_deposit + unknown_collection_name = 'unknown-collection' + try: + DepositCollection.objects.get(name=unknown_collection_name) + except DepositCollection.DoesNotExist: + assert True + + url = reverse(EDIT_SE_IRI, args=[unknown_collection_name, deposit.id]) + response = authenticated_client.post( + url, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_404_NOT_FOUND + response_content = parse_xml(response.content) + assert 'Unknown collection name' in \ + response_content['sword:error']['summary'] + + +def test_replace_metadata_to_unknown_deposit( + authenticated_client, deposit_collection, atom_dataset): + """Adding metadata to unknown deposit should return a 404 response - self.assertEqual(len(list(requests)), 3) - # a new one was added - self.assertEqual(requests[1].metadata['foobar'], 'bar') + """ + unknown_deposit_id = 998 + try: + Deposit.objects.get(pk=unknown_deposit_id) + except Deposit.DoesNotExist: + assert True + url = 
reverse(EDIT_SE_IRI, args=[ + deposit_collection.name, unknown_deposit_id]) + response = authenticated_client.put( + url, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_404_NOT_FOUND + response_content = parse_xml(response.content) + assert 'Deposit with id %s does not exist' % unknown_deposit_id == \ + response_content['sword:error']['summary'] + + +def test_add_archive_to_unknown_deposit( + authenticated_client, deposit_collection, atom_dataset): + """Adding metadata to unknown deposit should return a 404 response - # check we did not touch the other parts - requests = list(DepositRequest.objects.filter( - deposit=deposit, type='archive')) - self.assertEqual(len(requests), 0) + """ + unknown_deposit_id = 997 + try: + Deposit.objects.get(pk=unknown_deposit_id) + except Deposit.DoesNotExist: + assert True + + url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) + response = authenticated_client.post(url, + content_type='application/zip', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_404_NOT_FOUND + response_content = parse_xml(response.content) + assert 'Deposit with id %s does not exist' % unknown_deposit_id == \ + response_content['sword:error']['summary'] + + +def test_replace_archive_to_unknown_deposit( + authenticated_client, deposit_collection, atom_dataset): + """Replacing archive to unknown deposit should return a 404 response + """ + unknown_deposit_id = 996 + try: + Deposit.objects.get(pk=unknown_deposit_id) + except Deposit.DoesNotExist: + assert True + + url = reverse(EM_IRI, args=[deposit_collection.name, unknown_deposit_id]) + response = authenticated_client.put( + url, + content_type='application/zip', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_404_NOT_FOUND + response_content = parse_xml(response.content) + assert 'Deposit with id %s does not exist' % unknown_deposit_id == \ 
+ response_content['sword:error']['summary'] + + +def test_post_metadata_to_em_iri_failure( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """Update (POST) archive with wrong content type should return 400 -class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase, - CommonCreationRoutine): - """Failure scenario about add/replace (post/put) query on deposit. + """ + deposit = partial_deposit + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.post( + update_uri, + content_type='application/x-gtar-compressed', + data=atom_dataset['entry-data1']) + assert response.status_code == status.HTTP_400_BAD_REQUEST + response_content = parse_xml(response.content) + msg = 'Packaging format supported is restricted to ' + \ + 'application/zip, application/x-tar' + assert msg == response_content['sword:error']['summary'] + + +def test_put_metadata_to_em_iri_failure( + authenticated_client, deposit_collection, partial_deposit, + atom_dataset): + """Update (PUT) archive with wrong content type should return 400 """ - def test_add_metadata_to_unknown_collection(self): - """Replacing metadata to unknown deposit should return a 404 response - - """ - url = reverse(EDIT_SE_IRI, args=['test', 1000]) - response = self.client.post( - url, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - self.assertRegex(response.content.decode('utf-8'), - 'Unknown collection name test') - - def test_add_metadata_to_unknown_deposit(self): - """Replacing metadata to unknown deposit should return a 404 response - - """ - url = reverse(EDIT_SE_IRI, args=[self.collection.name, 999]) - response = self.client.post( - url, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - 
self.assertRegex(response.content.decode('utf-8'), - 'Deposit with id 999 does not exist') - - def test_replace_metadata_to_unknown_deposit(self): - """Adding metadata to unknown deposit should return a 404 response - - """ - url = reverse(EDIT_SE_IRI, args=[self.collection.name, 998]) - response = self.client.put( - url, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - self.assertRegex(response.content.decode('utf-8'), - 'Deposit with id 998 does not exist') - - def test_add_archive_to_unknown_deposit(self): - """Adding metadata to unknown deposit should return a 404 response - - """ - url = reverse(EM_IRI, args=[self.collection.name, 997]) - response = self.client.post( - url, - content_type='application/zip', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - self.assertRegex(response.content.decode('utf-8'), - 'Deposit with id 997 does not exist') - - def test_replace_archive_to_unknown_deposit(self): - """Replacing archive to unknown deposit should return a 404 response - - """ - url = reverse(EM_IRI, args=[self.collection.name, 996]) - response = self.client.put( - url, - content_type='application/zip', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - self.assertRegex(response.content.decode('utf-8'), - 'Deposit with id 996 does not exist') - - def test_post_metadata_to_em_iri_failure(self): - """Update (POST) archive with wrong content type should return 400 - - """ - deposit_id = self.create_deposit_partial() # only update on partial - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - response = self.client.post( - update_uri, - content_type='application/x-gtar-compressed', - data=self.atom_entry_data0) - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertRegex(response.content.decode('utf-8'), - 'Packaging 
format supported is restricted to ' - 'application/zip, application/x-tar') - - def test_put_metadata_to_em_iri_failure(self): - """Update (PUT) archive with wrong content type should return 400 - - """ - # given - deposit_id = self.create_deposit_partial() # only update on partial - # when - update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) - response = self.client.put( - update_uri, - content_type='application/atom+xml;type=entry', - data=self.atom_entry_data0) - # then - self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - self.assertRegex(response.content.decode('utf-8'), - 'Packaging format supported is restricted to ' - 'application/zip, application/x-tar') + # given + deposit = partial_deposit + # when + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.put( + update_uri, + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1']) + # then + assert response.status_code == status.HTTP_400_BAD_REQUEST + response_content = parse_xml(response.content) + msg = 'Packaging format supported is restricted to ' + \ + 'application/zip, application/x-tar' + assert msg == response_content['sword:error']['summary'] diff --git a/swh/deposit/tests/api/test_parser.py b/swh/deposit/tests/api/test_parser.py --- a/swh/deposit/tests/api/test_parser.py +++ b/swh/deposit/tests/api/test_parser.py @@ -1,4 +1,4 @@ -# Copyright (C) 2018 The Software Heritage developers +# Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information @@ -6,96 +6,92 @@ import io from collections import OrderedDict -from rest_framework.test import APITestCase from swh.deposit.parsers import SWHXMLParser -class ParsingTest(APITestCase): - """Access to main entry point is ok without authentication +def 
test_parsing_without_duplicates(): + xml_no_duplicate = io.BytesIO(b''' + + Awesome Compiler + + GPL3.0 + https://opensource.org/licenses/GPL-3.0 + + Python3 + + author1 + Inria + + ocaml + http://issuetracker.com +''') - """ - def test_parsing_without_duplicates(self): - xml_no_duplicate = io.BytesIO(b''' - - Awesome Compiler - - GPL3.0 - https://opensource.org/licenses/GPL-3.0 - - Python3 - - author1 - Inria - - ocaml - http://issuetracker.com - ''') + actual_result = SWHXMLParser().parse(xml_no_duplicate) + expected_dict = OrderedDict( + [('@xmlns', 'http://www.w3.org/2005/Atom'), + ('@xmlns:codemeta', + 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), + ('title', 'Awesome Compiler'), + ('codemeta:license', + OrderedDict([('codemeta:name', 'GPL3.0'), + ('codemeta:url', + 'https://opensource.org/licenses/GPL-3.0')])), + ('codemeta:runtimePlatform', 'Python3'), + ('codemeta:author', + OrderedDict([('codemeta:name', 'author1'), + ('codemeta:affiliation', 'Inria')])), + ('codemeta:programmingLanguage', 'ocaml'), + ('codemeta:issueTracker', 'http://issuetracker.com')]) + assert expected_dict == actual_result - actual_result = SWHXMLParser().parse(xml_no_duplicate) - expected_dict = OrderedDict( - [('@xmlns', 'http://www.w3.org/2005/Atom'), - ('@xmlns:codemeta', - 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), - ('title', 'Awesome Compiler'), - ('codemeta:license', - OrderedDict([('codemeta:name', 'GPL3.0'), - ('codemeta:url', - 'https://opensource.org/licenses/GPL-3.0')])), - ('codemeta:runtimePlatform', 'Python3'), - ('codemeta:author', - OrderedDict([('codemeta:name', 'author1'), - ('codemeta:affiliation', 'Inria')])), - ('codemeta:programmingLanguage', 'ocaml'), - ('codemeta:issueTracker', 'http://issuetracker.com')]) - self.assertEqual(expected_dict, actual_result) - def test_parsing_with_duplicates(self): - xml_with_duplicates = io.BytesIO(b''' - - Another Compiler - GNU/Linux - - GPL3.0 - https://opensource.org/licenses/GPL-3.0 - - Un*x - - author1 - Inria - - 
- author2 - Inria - - ocaml - haskell - - spdx - http://spdx.org - - python3 - ''') +def test_parsing_with_duplicates(): + xml_with_duplicates = io.BytesIO(b''' + + Another Compiler + GNU/Linux + + GPL3.0 + https://opensource.org/licenses/GPL-3.0 + + Un*x + + author1 + Inria + + + author2 + Inria + + ocaml + haskell + + spdx + http://spdx.org + + python3 +''') - actual_result = SWHXMLParser().parse(xml_with_duplicates) + actual_result = SWHXMLParser().parse(xml_with_duplicates) - expected_dict = OrderedDict([ - ('@xmlns', 'http://www.w3.org/2005/Atom'), - ('@xmlns:codemeta', 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), - ('title', 'Another Compiler'), - ('codemeta:runtimePlatform', ['GNU/Linux', 'Un*x']), - ('codemeta:license', - [OrderedDict([('codemeta:name', 'GPL3.0'), - ('codemeta:url', - 'https://opensource.org/licenses/GPL-3.0')]), - OrderedDict([('codemeta:name', 'spdx'), - ('codemeta:url', 'http://spdx.org')])]), - ('codemeta:author', - [OrderedDict([('codemeta:name', 'author1'), - ('codemeta:affiliation', 'Inria')]), - OrderedDict([('codemeta:name', 'author2'), - ('codemeta:affiliation', 'Inria')])]), - ('codemeta:programmingLanguage', ['ocaml', 'haskell', 'python3'])]) - self.assertEqual(expected_dict, actual_result) + expected_dict = OrderedDict([ + ('@xmlns', 'http://www.w3.org/2005/Atom'), + ('@xmlns:codemeta', 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), + ('title', 'Another Compiler'), + ('codemeta:runtimePlatform', ['GNU/Linux', 'Un*x']), + ('codemeta:license', + [OrderedDict([('codemeta:name', 'GPL3.0'), + ('codemeta:url', + 'https://opensource.org/licenses/GPL-3.0')]), + OrderedDict([('codemeta:name', 'spdx'), + ('codemeta:url', 'http://spdx.org')])]), + ('codemeta:author', + [OrderedDict([('codemeta:name', 'author1'), + ('codemeta:affiliation', 'Inria')]), + OrderedDict([('codemeta:name', 'author2'), + ('codemeta:affiliation', 'Inria')])]), + ('codemeta:programmingLanguage', ['ocaml', 'haskell', 'python3'])]) + assert expected_dict == 
actual_result diff --git a/swh/deposit/tests/api/test_service_document.py b/swh/deposit/tests/api/test_service_document.py --- a/swh/deposit/tests/api/test_service_document.py +++ b/swh/deposit/tests/api/test_service_document.py @@ -5,45 +5,55 @@ from django.urls import reverse from rest_framework import status -from rest_framework.test import APITestCase from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import SD_IRI -from ..common import BasicTestCase, WithAuthTestCase -class ServiceDocumentNoAuthCase(APITestCase, BasicTestCase): - """Service document endpoints are protected with basic authentication. +def test_service_document_no_auth_fails(client): + """Without authentication, service document endpoint should return 401 """ - def test_service_document_no_authentication_fails(self): - """Without authentication, service document endpoint should return 401 + url = reverse(SD_IRI) + response = client.get(url) + assert response.status_code == status.HTTP_401_UNAUTHORIZED - """ - url = reverse(SD_IRI) - response = self.client.get(url) +def test_service_document_no_auth_with_http_auth_should_not_break(client): + """Without auth, sd endpoint through browser should return 401 + + """ + url = reverse(SD_IRI) + response = client.get( + url, + HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') + assert response.status_code == status.HTTP_401_UNAUTHORIZED - self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) - def test_service_document_with_http_accept_should_not_break(self): - """Without auth, sd endpoint through browser should return 401 +def test_service_document(authenticated_client, deposit_user): + """With authentication, service document list user's collection - """ - url = reverse(SD_IRI) + """ + url = reverse(SD_IRI) + response = authenticated_client.get(url) + check_response(response, deposit_user.username) - # when - response = self.client.get( - url, - HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') - 
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) +def test_service_document_with_http_accept_header( + authenticated_client, deposit_user): + """With authentication, with browser, sd list user's collection + """ + url = reverse(SD_IRI) + response = authenticated_client.get( + url, + HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') + check_response(response, deposit_user.username) -class ServiceDocumentCase(APITestCase, WithAuthTestCase, BasicTestCase): - def assertResponseOk(self, response): # noqa: N802 - self.assertEqual(response.status_code, status.HTTP_200_OK) - self.assertEqual(response.content.decode('utf-8'), + +def check_response(response, username): + assert response.status_code == status.HTTP_200_OK + assert response.content.decode('utf-8') == \ ''' ''' % (TEST_CONFIG['max_upload_size'], - self.username, - self.username, - self.username, - self.username)) # noqa - - def test_service_document(self): - """With authentication, service document list user's collection - - """ - url = reverse(SD_IRI) - - # when - response = self.client.get(url) - - # then - self.assertResponseOk(response) - - def test_service_document_with_http_accept_header(self): - """With authentication, with browser, sd list user's collection - - """ - url = reverse(SD_IRI) - - # when - response = self.client.get( - url, - HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') - - self.assertResponseOk(response) + username, + username, + username, + username) # noqa diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -6,6 +6,7 @@ import base64 import hashlib import os +import re import shutil import tarfile import tempfile @@ -566,3 +567,20 @@ if dr.type == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id + + +def check_archive(archive_name: str, archive_name_to_check: str): + """Helper function to ensure archive_name is present within the + 
archive_name_to_check. + + Raises: + AssertionError if archive_name is not present within + archive_name_to_check + + """ + if '.' in archive_name: + filename, extension = archive_name.split('.') + pattern = re.compile('.*/%s.*\\.%s' % (filename, extension)) + else: + pattern = re.compile('.*/%s' % archive_name) + assert pattern.match(archive_name_to_check) is not None diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/conftest.py @@ -0,0 +1,306 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import base64 +import pytest +import psycopg2 + +from django.urls import reverse +from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT +from rest_framework import status +from rest_framework.test import APIClient +from typing import Mapping + +from swh.scheduler.tests.conftest import * # noqa +from swh.deposit.parsers import parse_xml +from swh.deposit.config import ( + COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, + DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS, + DEPOSIT_STATUS_LOAD_FAILURE +) +from swh.deposit.tests.common import create_arborescence_archive + + +TEST_USER = { + 'username': 'test', + 'password': 'password', + 'email': 'test@example.org', + 'provider_url': 'https://hal-test.archives-ouvertes.fr/', + 'domain': 'archives-ouvertes.fr/', + 'collection': { + 'name': 'test' + }, +} + + +def execute_sql(sql): + """Execute sql to postgres db""" + with psycopg2.connect(database='postgres') as conn: + conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) + cur = conn.cursor() + cur.execute(sql) + + +@pytest.hookimpl(tryfirst=True) +def pytest_load_initial_conftests(early_config, parser, args): + """This hook is done prior to django loading. 
+ Used to initialize the deposit's server db. + + """ + import project.app.signals + + def prepare_db(*args, **kwargs): + from django.conf import settings + db_name = 'tests' + print('before: %s' % settings.DATABASES) + # work around db settings for django + for k, v in [ + ('ENGINE', 'django.db.backends.postgresql'), + ('NAME', 'tests'), + ('USER', postgresql_proc.user), # noqa + ('HOST', postgresql_proc.host), # noqa + ('PORT', postgresql_proc.port), # noqa + ]: + settings.DATABASES['default'][k] = v + + print('after: %s' % settings.DATABASES) + execute_sql('DROP DATABASE IF EXISTS %s' % db_name) + execute_sql('CREATE DATABASE %s TEMPLATE template0' % db_name) + + project.app.signals.something = prepare_db + + +def create_deposit_collection(collection_name: str): + """Create a deposit collection with name collection_name + + """ + from swh.deposit.models import DepositCollection + try: + collection = DepositCollection._default_manager.get( + name=collection_name) + except DepositCollection.DoesNotExist: + collection = DepositCollection(name=collection_name) + collection.save() + return collection + + +def deposit_collection_factory( + collection_name=TEST_USER['collection']['name']): + @pytest.fixture + def _deposit_collection(db, collection_name=collection_name): + return create_deposit_collection(collection_name) + + return _deposit_collection + + +deposit_collection = deposit_collection_factory() +deposit_another_collection = deposit_collection_factory('another-collection') + + +@pytest.fixture +def deposit_user(db, deposit_collection): + """Create/Return the test_user "test" + + """ + from swh.deposit.models import DepositClient + try: + user = DepositClient._default_manager.get( + username=TEST_USER['username']) + except DepositClient.DoesNotExist: + user = DepositClient._default_manager.create_user( + username=TEST_USER['username'], + email=TEST_USER['email'], + password=TEST_USER['password'], + provider_url=TEST_USER['provider_url'], + 
domain=TEST_USER['domain'], + ) + user.collections = [deposit_collection.id] + user.save() + return user + + +@pytest.fixture +def client(): + """Override pytest-django one which does not work for djangorestframework. + + """ + return APIClient() # <- drf's client + + +@pytest.yield_fixture +def authenticated_client(client, deposit_user): + """Returned a logged client + + """ + _token = '%s:%s' % (deposit_user.username, TEST_USER['password']) + token = base64.b64encode(_token.encode('utf-8')) + authorization = 'Basic %s' % token.decode('utf-8') + client.credentials(HTTP_AUTHORIZATION=authorization) + yield client + client.logout() + + +@pytest.fixture +def sample_archive(tmp_path): + """Returns a sample archive + + """ + tmp_path = str(tmp_path) # pytest version limitation in previous version + archive = create_arborescence_archive( + tmp_path, 'archive1', 'file1', b'some content in file') + + return archive + + +def create_deposit( + authenticated_client, collection_name: str, sample_archive, + external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED): + """Create a skeleton shell deposit + + """ + url = reverse(COL_IRI, args=[collection_name]) + # when + response = authenticated_client.post( + url, + content_type='application/zip', # as zip + data=sample_archive['data'], + # + headers + CONTENT_LENGTH=sample_archive['length'], + HTTP_SLUG=external_id, + HTTP_CONTENT_MD5=sample_archive['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + sample_archive['name'])) + + # then + assert response.status_code == status.HTTP_201_CREATED + from swh.deposit.models import Deposit + deposit = Deposit._default_manager.get(external_id=external_id) + + if deposit.status != deposit_status: + deposit.status = deposit_status + deposit.save() + assert deposit.status == deposit_status + return deposit + + +def create_binary_deposit( + authenticated_client, collection_name: 
str, sample_archive, + external_id: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, + atom_dataset: Mapping[str, bytes] = {}): + """Create a deposit with both metadata and archive set. Then alters its status + to `deposit_status`. + + """ + deposit = create_deposit( + authenticated_client, collection_name, sample_archive, + external_id=external_id, deposit_status=DEPOSIT_STATUS_PARTIAL) + + response = authenticated_client.post( + reverse(EDIT_SE_IRI, args=[collection_name, deposit.id]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'), + HTTP_SLUG=deposit.external_id, + HTTP_IN_PROGRESS='true') + + assert response.status_code == status.HTTP_201_CREATED + assert deposit.status == DEPOSIT_STATUS_PARTIAL + + from swh.deposit.models import Deposit + deposit = Deposit._default_manager.get(pk=deposit.id) + if deposit.status != deposit_status: + deposit.status = deposit_status + deposit.save() + + assert deposit.status == deposit_status + return deposit + + +def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED): + """Build deposit with a specific status + + """ + @pytest.fixture() + def _deposit(sample_archive, deposit_collection, authenticated_client, + deposit_status=deposit_status): + external_id = 'external-id-%s' % deposit_status + return create_deposit( + authenticated_client, deposit_collection.name, sample_archive, + external_id=external_id, deposit_status=deposit_status + ) + + return _deposit + + +deposited_deposit = deposit_factory() +rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) +partial_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_PARTIAL) +completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) +failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) + + +@pytest.fixture +def partial_deposit_with_metadata( + sample_archive, deposit_collection, authenticated_client, + atom_dataset): + 
"""Returns deposit with archive and metadata provided, status 'partial' + + """ + return create_binary_deposit( + authenticated_client, deposit_collection.name, sample_archive, + external_id='external-id-partial', + deposit_status=DEPOSIT_STATUS_PARTIAL, + atom_dataset=atom_dataset + ) + + +@pytest.fixture +def partial_deposit_only_metadata( + deposit_collection, authenticated_client, + atom_dataset): + + response = authenticated_client.post( + reverse(COL_IRI, args=[deposit_collection.name]), + content_type='application/atom+xml;type=entry', + data=atom_dataset['entry-data1'], + HTTP_SLUG='external-id-partial', + HTTP_IN_PROGRESS=True) + + assert response.status_code == status.HTTP_201_CREATED + + response_content = parse_xml(response.content) + deposit_id = response_content['deposit_id'] + from swh.deposit.models import Deposit + deposit = Deposit._default_manager.get(pk=deposit_id) + assert deposit.status == DEPOSIT_STATUS_PARTIAL + return deposit + + +@pytest.fixture +def complete_deposit(sample_archive, deposit_collection, authenticated_client): + """Returns a completed deposit (load success) + + """ + deposit = create_deposit( + authenticated_client, deposit_collection.name, sample_archive, + external_id='external-id-complete', + deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS + ) + _swh_id_context = 'https://hal.archives-ouvertes.fr/hal-01727745' + deposit.swh_id = 'swh:1:dir:42a13fc721c8716ff695d0d62fc851d641f3a12b' + deposit.swh_id_context = '%s;%s' % ( + deposit.swh_id, _swh_id_context) + deposit.swh_anchor_id = \ + 'swh:rev:1:548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' + deposit.swh_anchor_id_context = '%s;%s' % ( + deposit.swh_anchor_id, _swh_id_context) + deposit.save() + return deposit + + +@pytest.fixture() +def tmp_path(tmp_path): + return str(tmp_path) # issue with oldstable's pytest version diff --git a/swh/deposit/tests/loader/conftest.py b/swh/deposit/tests/loader/conftest.py --- a/swh/deposit/tests/loader/conftest.py +++ 
b/swh/deposit/tests/loader/conftest.py @@ -1,3 +1,8 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + import pytest from swh.scheduler.tests.conftest import * # noqa diff --git a/swh/deposit/tests/test_common.py b/swh/deposit/tests/test_common.py new file mode 100644 --- /dev/null +++ b/swh/deposit/tests/test_common.py @@ -0,0 +1,26 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.deposit.tests.common import check_archive + + +def test_check_archive_helper(): + # success + for archive_name, archive_name_to_check in [ + ('filename0', 'something/filename0'), + ('archive.zip', 'client_1/archive_noisynoise.zip'), + ]: + check_archive(archive_name, archive_name_to_check) + + # failures + for archive_name, archive_name_to_check in [ + ('filename0', 'something-filename0'), + ('archive.zip', 'client_1_archive_noisynoise.zip'), + ('reference', 'irrelevant'), + ]: + with pytest.raises(AssertionError): + check_archive(archive_name, archive_name_to_check) diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py --- a/swh/deposit/tests/test_utils.py +++ b/swh/deposit/tests/test_utils.py @@ -3,7 +3,6 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import unittest import pytest from unittest.mock import patch @@ -48,127 +47,127 @@ utils.origin_url_from(deposit) -class UtilsTestCase(unittest.TestCase): - """Utils library +def test_merge(): + """Calling utils.merge on dicts should merge without losing information """ - def test_merge(self): - """Calling 
utils.merge on dicts should merge without losing information - - """ - d0 = { - 'author': 'someone', - 'license': [['gpl2']], - 'a': 1 + d0 = { + 'author': 'someone', + 'license': [['gpl2']], + 'a': 1 + } + + d1 = { + 'author': ['author0', {'name': 'author1'}], + 'license': [['gpl3']], + 'b': { + '1': '2' } + } - d1 = { - 'author': ['author0', {'name': 'author1'}], - 'license': [['gpl3']], - 'b': { - '1': '2' - } + d2 = { + 'author': map(lambda x: x, ['else']), + 'license': 'mit', + 'b': { + '2': '3', } - - d2 = { - 'author': map(lambda x: x, ['else']), - 'license': 'mit', - 'b': { - '2': '3', - } + } + + d3 = { + 'author': (v for v in ['no one']), + } + + actual_merge = utils.merge(d0, d1, d2, d3) + + expected_merge = { + 'a': 1, + 'license': [['gpl2'], ['gpl3'], 'mit'], + 'author': [ + 'someone', 'author0', {'name': 'author1'}, 'else', 'no one'], + 'b': { + '1': '2', + '2': '3', } + } + assert actual_merge == expected_merge - d3 = { - 'author': (v for v in ['no one']), - } - actual_merge = utils.merge(d0, d1, d2, d3) - - expected_merge = { - 'a': 1, - 'license': [['gpl2'], ['gpl3'], 'mit'], - 'author': [ - 'someone', 'author0', {'name': 'author1'}, 'else', 'no one'], - 'b': { - '1': '2', - '2': '3', - } +def test_merge_2(): + d0 = { + 'license': 'gpl2', + 'runtime': { + 'os': 'unix derivative' } - self.assertEqual(actual_merge, expected_merge) + } - def test_merge_2(self): - d0 = { - 'license': 'gpl2', - 'runtime': { + d1 = { + 'license': 'gpl3', + 'runtime': 'GNU/Linux' + } + + expected = { + 'license': ['gpl2', 'gpl3'], + 'runtime': [ + { 'os': 'unix derivative' - } - } + }, + 'GNU/Linux' + ], + } - d1 = { - 'license': 'gpl3', - 'runtime': 'GNU/Linux' - } + actual = utils.merge(d0, d1) + assert actual == expected - expected = { - 'license': ['gpl2', 'gpl3'], - 'runtime': [ - { - 'os': 'unix derivative' - }, - 'GNU/Linux' - ], - } - actual = utils.merge(d0, d1) - self.assertEqual(actual, expected) - - def test_merge_edge_cases(self): - input_dict = { - 
'license': ['gpl2', 'gpl3'], - 'runtime': [ - { - 'os': 'unix derivative' - }, - 'GNU/Linux' - ], - } - # against empty dict - actual = utils.merge(input_dict, {}) - self.assertEqual(actual, input_dict) +def test_merge_edge_cases(): + input_dict = { + 'license': ['gpl2', 'gpl3'], + 'runtime': [ + { + 'os': 'unix derivative' + }, + 'GNU/Linux' + ], + } + # against empty dict + actual = utils.merge(input_dict, {}) + assert actual == input_dict - # against oneself - actual = utils.merge(input_dict, input_dict, input_dict) - self.assertEqual(input_dict, input_dict) + # against oneself + actual = utils.merge(input_dict, input_dict, input_dict) + assert actual == input_dict - def test_merge_one_dict(self): - """Merge one dict should result in the same dict value - """ - input_and_expected = {'anything': 'really'} - actual = utils.merge(input_and_expected) - self.assertEqual(actual, input_and_expected) +def test_merge_one_dict(): + """Merge one dict should result in the same dict value - def test_merge_raise(self): - """Calling utils.merge with any no dict argument should raise + """ + input_and_expected = {'anything': 'really'} + actual = utils.merge(input_and_expected) + assert actual == input_and_expected - """ - d0 = { - 'author': 'someone', - 'a': 1 - } - d1 = ['not a dict'] +def test_merge_raise(): + """Calling utils.merge with any no dict argument should raise + + """ + d0 = { + 'author': 'someone', + 'a': 1 + } + + d1 = ['not a dict'] - with self.assertRaises(ValueError): - utils.merge(d0, d1) + with pytest.raises(ValueError): + utils.merge(d0, d1) - with self.assertRaises(ValueError): - utils.merge(d1, d0) + with pytest.raises(ValueError): + utils.merge(d1, d0) - with self.assertRaises(ValueError): - utils.merge(d1) + with pytest.raises(ValueError): + utils.merge(d1) - self.assertEqual(utils.merge(d0), d0) + assert utils.merge(d0) == d0 @patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x) diff --git a/tox.ini b/tox.ini --- a/tox.ini +++ 
b/tox.ini @@ -13,6 +13,16 @@ commands = pifpaf run postgresql -- pytest --cov {envsitepackagesdir}/swh/deposit --cov-branch {posargs} {envsitepackagesdir}/swh/deposit + +[testenv:py3-dev] +deps = + .[testing] + pytest-cov + ipdb +commands = + pytest {envsitepackagesdir}/swh/deposit/ {posargs} + + [testenv:flake8] skip_install = true deps =