diff --git a/swh/deposit/tests/api/test_common.py b/swh/deposit/tests/api/test_common.py index 12585229..74bc0b8b 100644 --- a/swh/deposit/tests/api/test_common.py +++ b/swh/deposit/tests/api/test_common.py @@ -1,42 +1,39 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from ..common import BasicTestCase, WithAuthTestCase class IndexNoAuthCase(APITestCase, BasicTestCase): """Access to main entry point is ok without authentication """ - @istest - def get_home_is_ok(self): + def test_get_home_is_ok(self): """Without authentication, endpoint refuses access with 401 response """ url = reverse('home') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn(b'The Software Heritage Deposit', response.content) class IndexWithAuthCase(WithAuthTestCase, APITestCase, BasicTestCase): """Access to main entry point is ok with authentication as well """ - @istest - def get_home_is_ok_2(self): + def test_get_home_is_ok_2(self): """Without authentication, endpoint refuses access with 401 response """ url = reverse('home') response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertIn(b'The Software Heritage Deposit', response.content) diff --git a/swh/deposit/tests/api/test_converters.py b/swh/deposit/tests/api/test_converters.py index f02b43a2..3d2a4888 100644 --- a/swh/deposit/tests/api/test_converters.py +++ b/swh/deposit/tests/api/test_converters.py @@ -1,138 +1,132 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from nose.tools import istest from rest_framework.test import APITestCase from swh.deposit.api.converters import convert_status_detail class ConvertersTestCase(APITestCase): - @istest - def convert_status_detail_empty(self): + def test_convert_status_detail_empty(self): actual_status_detail = convert_status_detail({}) self.assertIsNone(actual_status_detail) actual_status_detail = convert_status_detail({'dummy-keys': []}) self.assertIsNone(actual_status_detail) actual_status_detail = convert_status_detail(None) self.assertIsNone(actual_status_detail) - @istest - def convert_status_detail(self): + def test_convert_status_detail(self): status_detail = { 'url': { 'summary': "At least one url field must be compatible with the client\'s domain name. The following url fields failed the check", # noqa 'fields': ['blahurl', 'testurl'], }, 'metadata': [ { 'summary': 'Mandatory fields missing', 'fields': ['url', 'title'], }, { 'summary': 'Alternate fields missing', 'fields': ['name or title', 'url or badurl'] } ], 'archive': [{ 'summary': 'Unreadable archive', 'fields': ['1'], }], } expected_status_detail = '''- Mandatory fields missing (url, title) - Alternate fields missing (name or title, url or badurl) - Unreadable archive (1) - At least one url field must be compatible with the client's domain name. The following url fields failed the check (blahurl, testurl) ''' # noqa actual_status_detail = convert_status_detail(status_detail) self.assertEqual(actual_status_detail, expected_status_detail) - @istest - def convert_status_detail_2(self): + def test_convert_status_detail_2(self): status_detail = { 'url': { 'summary': 'At least one compatible url field. Failed', 'fields': ['testurl'], }, 'metadata': [ { 'summary': 'Mandatory fields missing', 'fields': ['name'], }, ], 'archive': [ { 'summary': 'Invalid archive', 'fields': ['2'], }, { 'summary': 'Unsupported archive', 'fields': ['1'], } ], } expected_status_detail = '''- Mandatory fields missing (name) - Invalid archive (2) - Unsupported archive (1) - At least one compatible url field. Failed (testurl) ''' actual_status_detail = convert_status_detail(status_detail) self.assertEqual(actual_status_detail, expected_status_detail) - @istest - def convert_status_detail_3(self): + def test_convert_status_detail_3(self): status_detail = { 'url': { 'summary': 'At least one compatible url field', }, } expected_status_detail = '- At least one compatible url field\n' actual_status_detail = convert_status_detail(status_detail) self.assertEqual(actual_status_detail, expected_status_detail) - @istest - def convert_status_detail_edge_case(self): + def test_convert_status_detail_edge_case(self): status_detail = { 'url': { 'summary': 'At least one compatible url field. Failed', 'fields': ['testurl'], }, 'metadata': [ { 'summary': 'Mandatory fields missing', 'fields': ['9', 10, 1.212], }, ], 'archive': [ { 'summary': 'Invalid archive', 'fields': ['3'], }, { 'summary': 'Unsupported archive', 'fields': [2], } ], } expected_status_detail = '''- Mandatory fields missing (9, 10, 1.212) - Invalid archive (3) - Unsupported archive (2) - At least one compatible url field. Failed (testurl) ''' actual_status_detail = convert_status_detail(status_detail) self.assertEqual(actual_status_detail, expected_status_detail) diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py index 507bafdd..1fe03228 100644 --- a/swh/deposit/tests/api/test_deposit.py +++ b/swh/deposit/tests/api/test_deposit.py @@ -1,167 +1,160 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib from django.core.urlresolvers import reverse from io import BytesIO -from nose.tools import istest, nottest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED from swh.deposit.config import DEPOSIT_STATUS_PARTIAL from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE from swh.deposit.models import Deposit, DepositClient, DepositCollection from swh.deposit.parsers import parse_xml from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine class DepositNoAuthCase(APITestCase, BasicTestCase): """Deposit access are protected with basic authentication. """ - @istest - def post_will_fail_with_401(self): + def test_post_will_fail_with_401(self): """Without authentication, endpoint refuses access with 401 response """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post(url) # then self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Deposit access are protected with basic authentication. """ def setUp(self): super().setUp() # Add another user _collection2 = DepositCollection(name='some') _collection2.save() _user = DepositClient.objects.create_user(username='user', password='user') _user.collections = [_collection2.id] self.collection2 = _collection2 - @istest - def access_to_another_user_collection_is_forbidden(self): + def test_access_to_another_user_collection_is_forbidden(self): """Access to another user collection should return a 403 """ url = reverse(COL_IRI, args=[self.collection2.name]) response = self.client.post(url) self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) self.assertRegex(response.content.decode('utf-8'), 'Client hal cannot access collection %s' % ( self.collection2.name, )) - @istest - def delete_on_col_iri_not_supported(self): + def test_delete_on_col_iri_not_supported(self): """Delete on col iri should return a 405 response """ url = reverse(COL_IRI, args=[self.collection.name]) response = self.client.delete(url) self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED) self.assertRegex(response.content.decode('utf-8'), 'DELETE method is not supported on this endpoint') - @nottest def create_deposit_with_rejection_status(self): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertEquals(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) actual_state = response_content['deposit_status'] self.assertEquals(actual_state, DEPOSIT_STATUS_REJECTED) - @istest - def act_on_deposit_rejected_is_not_permitted(self): + def test_act_on_deposit_rejected_is_not_permitted(self): deposit_id = self.create_deposit_with_status(DEPOSIT_STATUS_REJECTED) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_REJECTED response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id') self.assertEquals(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertRegex( response.content.decode('utf-8'), "You can only act on deposit with status '%s'" % ( DEPOSIT_STATUS_PARTIAL, )) - @istest - def add_deposit_with_parent(self): + def test_add_deposit_with_parent(self): # given multiple deposit already loaded deposit_id = self.create_deposit_with_status( status=DEPOSIT_STATUS_LOAD_SUCCESS, external_id='some-external-id') deposit1 = Deposit.objects.get(pk=deposit_id) self.assertIsNotNone(deposit1) self.assertEquals(deposit1.external_id, 'some-external-id') self.assertEquals(deposit1.status, DEPOSIT_STATUS_LOAD_SUCCESS) deposit_id2 = self.create_deposit_with_status( status=DEPOSIT_STATUS_LOAD_SUCCESS, external_id='some-external-id') deposit2 = Deposit.objects.get(pk=deposit_id2) self.assertIsNotNone(deposit2) self.assertEquals(deposit2.external_id, 'some-external-id') self.assertEquals(deposit2.status, DEPOSIT_STATUS_LOAD_SUCCESS) deposit_id3 = self.create_deposit_with_status( status=DEPOSIT_STATUS_LOAD_FAILURE, external_id='some-external-id') deposit3 = Deposit.objects.get(pk=deposit_id3) self.assertIsNotNone(deposit3) self.assertEquals(deposit3.external_id, 'some-external-id') self.assertEquals(deposit3.status, DEPOSIT_STATUS_LOAD_FAILURE) # when deposit_id3 = self.create_simple_deposit_partial( external_id='some-external-id') # then deposit4 = Deposit.objects.get(pk=deposit_id3) self.assertIsNotNone(deposit4) self.assertEquals(deposit4.external_id, 'some-external-id') self.assertEquals(deposit4.status, DEPOSIT_STATUS_PARTIAL) self.assertEquals(deposit4.parent, deposit2) diff --git a/swh/deposit/tests/api/test_deposit_atom.py b/swh/deposit/tests/api/test_deposit_atom.py index b4096b80..84b5f742 100644 --- a/swh/deposit/tests/api/test_deposit_atom.py +++ b/swh/deposit/tests/api/test_deposit_atom.py @@ -1,538 +1,528 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse from io import BytesIO -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from ..common import BasicTestCase, WithAuthTestCase class DepositAtomEntryTestCase(APITestCase, WithAuthTestCase, BasicTestCase): """Try and post atom entry deposit. """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" Awesome Compiler hal urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a %s 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data1 = b""" hal urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data2 = b""" %s """ self.atom_entry_data_empty_body = b""" """ self.atom_entry_data3 = b""" something """ self.atom_entry_data_atom_only = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author """ self.atom_entry_data_codemeta = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 1785io25c695 origin url other identifier, DOI, ARK Domain description key-word 1 key-word 2 creation date publication date comment article name article id Collaboration/Projet project name id see also Sponsor A Sponsor B Platform/OS dependencies Version active license url spdx .Net Framework 3.0 Python2.3 author1 Inria UPMC author2 Inria UPMC http://code.com language 1 language 2 http://issuetracker.com """ # noqa self.atom_entry_data_dc_codemeta = b""" %s hal-01587361 https://hal.inria.fr/hal-01587361 https://hal.inria.fr/hal-01587361/document https://hal.inria.fr/hal-01587361/file/AffectationRO-v1.0.0.zip doi:10.5281/zenodo.438684 The assignment problem AffectationRO Gruenpeter, Morane [INFO] Computer Science [cs] [INFO.INFO-RO] Computer Science [cs]/Operations Research [cs.RO] SOFTWARE Project in OR: The assignment problemA java implementation for the assignment problem first release description fr 2015-06-01 2017-10-19 en url stable Version sur hal Version entre par lutilisateur Mots-cls Commentaire Rfrence interne Collaboration/Projet nom du projet id Voir aussi Financement Projet ANR Projet Europen Platform/OS Dpendances Etat du dveloppement license url spdx Outils de dveloppement- outil no1 Outils de dveloppement- outil no2 http://code.com language 1 language 2 """ # noqa self.atom_entry_tei = b"""HAL TEI export of hal-01587083CCSDDistributed under a Creative Commons Attribution 4.0 International License

HAL API platform

questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733MoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.com2017-09-29 11:21:322017-10-03 17:20:132017-10-03 17:20:132017-09-292017-09-29contributorMoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.comCCSDhal-01587083https://hal.inria.fr/hal-01587083gruenpeter:hal-0158708320172017questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733EnglishComputer Science [cs]SoftwareIRILLInitiative pour la Recherche et l'Innovation sur le Logiciel Libre
https://www.irill.org/
Universite Pierre et Marie Curie - Paris 6UPMC
4 place Jussieu - 75005 Paris
http://www.upmc.fr/
Institut National de Recherche en Informatique et en AutomatiqueInria
Domaine de VoluceauRocquencourt - BP 10578153 Le Chesnay Cedex
http://www.inria.fr/en/
Universite Paris Diderot - Paris 7UPD7
5 rue Thomas-Mann - 75205 Paris cedex 13
http://www.univ-paris-diderot.fr
""" # noqa self.atom_entry_data_badly_formatted = b""" """ self.atom_error_with_decimal = b""" Composing a Web of Audio Applications hal hal-01243065 hal-01243065 https://hal-test.archives-ouvertes.fr/hal-01243065 test DSP programming,Web,Composability,Faust 2017-05-03T16:08:47+02:00 The Web offers a great opportunity to share, deploy and use programs without installation difficulties. In this article we explore the idea of freely combining/composing real-time audio applications deployed on the Web using Faust audio DSP language. 1 10.4 phpstorm stable linux php python C GNU General Public License v3.0 only CeCILL Free Software License Agreement v1.1 HAL hal@ccsd.cnrs.fr Someone Nice someone@nice.fr FFJ """ # noqa - @istest - def post_deposit_atom_entry_serialization_error(self): + def test_post_deposit_atom_entry_serialization_error(self): """Posting an initial atom entry should return 201 with deposit receipt """ # given # when response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_error_with_decimal, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) dr = DepositRequest.objects.get(deposit=deposit) self.assertIsNotNone(dr.metadata) sw_version = dr.metadata.get('codemeta:softwareVersion') self.assertEquals(sw_version, '10.4') - @istest - def post_deposit_atom_empty_body_request(self): + def test_post_deposit_atom_empty_body_request(self): """Posting empty body request should return a 400 response """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data_empty_body) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def post_deposit_atom_badly_formatted_is_a_bad_request(self): + def test_post_deposit_atom_badly_formatted_is_a_bad_request(self): """Posting a badly formatted atom should return a 400 response """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data_badly_formatted) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def post_deposit_atom_without_slug_header_is_bad_request(self): + def test_post_deposit_atom_without_slug_header_is_bad_request(self): """Posting an atom entry without a slug header should return a 400 """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, # + headers HTTP_IN_PROGRESS='false') self.assertIn(b'Missing SLUG header', response.content) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def post_deposit_atom_unknown_collection(self): + def test_post_deposit_atom_unknown_collection(self): """Posting an atom entry to an unknown collection should return a 404 """ response = self.client.post( reverse(COL_IRI, args=['unknown-one']), content_type='application/atom+xml;type=entry', data=self.atom_entry_data3, HTTP_SLUG='something') self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - @istest - def post_deposit_atom_entry_initial(self): + def test_post_deposit_atom_entry_initial(self): """Posting an initial atom entry should return 201 with deposit receipt """ # given external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) atom_entry_data = self.atom_entry_data0 % external_id.encode('utf-8') # when response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=atom_entry_data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.client, self.user) # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertIsNotNone(deposit_request.metadata) self.assertEquals( deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) self.assertFalse(bool(deposit_request.archive)) - @istest - def post_deposit_atom_entry_with_codemeta(self): + def test_post_deposit_atom_entry_with_codemeta(self): """Posting an initial atom entry should return 201 with deposit receipt """ # given external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) atom_entry_data = self.atom_entry_data_dc_codemeta % ( external_id.encode('utf-8'), ) # when response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=atom_entry_data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.client, self.user) # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertIsNotNone(deposit_request.metadata) self.assertEquals( deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) self.assertFalse(bool(deposit_request.archive)) - @istest def test_post_deposit_atom_entry_tei(self): """Posting initial atom entry as TEI should return 201 with receipt """ # given external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a' with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) atom_entry_data = self.atom_entry_tei # when response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=atom_entry_data, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.client, self.user) # one associated request to a deposit deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertIsNotNone(deposit_request.metadata) self.assertEquals( deposit_request.raw_metadata, atom_entry_data.decode('utf-8')) self.assertFalse(bool(deposit_request.archive)) - @istest - def post_deposit_atom_entry_multiple_steps(self): + def test_post_deposit_atom_entry_multiple_steps(self): """After initial deposit, updating a deposit should return a 201 """ # given external_id = 'urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a' with self.assertRaises(Deposit.DoesNotExist): deposit = Deposit.objects.get(external_id=external_id) # when response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_IN_PROGRESS='True', HTTP_SLUG=external_id) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.status, 'partial') self.assertEqual(deposit.client, self.user) # one associated request to a deposit deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEqual(len(deposit_requests), 1) atom_entry_data = self.atom_entry_data2 % external_id.encode('utf-8') update_uri = response._headers['location'][1] # when updating the first deposit post response = self.client.post( update_uri, content_type='application/atom+xml;type=entry', data=atom_entry_data, HTTP_IN_PROGRESS='False') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.client, self.user) self.assertEqual(len(Deposit.objects.all()), 1) # now 2 associated requests to a same deposit deposit_requests = DepositRequest.objects.filter( deposit=deposit).order_by('id') self.assertEqual(len(deposit_requests), 2) expected_meta = [ { 'metadata': parse_xml(self.atom_entry_data1), 'raw_metadata': self.atom_entry_data1.decode('utf-8'), }, { 'metadata': parse_xml(atom_entry_data), 'raw_metadata': atom_entry_data.decode('utf-8'), } ] for i, deposit_request in enumerate(deposit_requests): actual_metadata = deposit_request.metadata self.assertEquals(actual_metadata, expected_meta[i]['metadata']) self.assertEquals(deposit_request.raw_metadata, expected_meta[i]['raw_metadata']) self.assertFalse(bool(deposit_request.archive)) diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py index 73734396..6bf7f0ff 100644 --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -1,656 +1,645 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.files.uploadedfile import InMemoryUploadedFile from django.core.urlresolvers import reverse from io import BytesIO -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import COL_IRI, EM_IRI from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from ..common import ( BasicTestCase, WithAuthTestCase, create_arborescence_archive, FileSystemCreationRoutine ) class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine): """Try and upload one single deposit """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" Awesome Compiler hal urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a %s 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data1 = b""" hal urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a 2017-10-07T15:17:08Z some awesome author something awesome-compiler This is an awesome compiler destined to awesomely compile stuff and other stuff compiler,programming,language 2005-10-07T17:17:08Z 2005-10-07T17:17:08Z release note related link Awesome https://hoster.org/awesome-compiler GNU/Linux 0.0.1 running all """ self.atom_entry_data2 = b""" %s """ self.atom_entry_data_empty_body = b""" """ self.atom_entry_data3 = b""" something """ self.data_atom_entry_ok = b""" Title urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 2005-10-07T17:17:08Z Contributor The abstract The abstract Access Rights Alternative Title Date Available Bibliographic Citation # noqa Contributor Description Has Part Has Version Identifier Is Part Of Publisher References Rights Holder Source Title Type """ - @istest - def post_deposit_binary_without_slug_header_is_bad_request(self): + def test_post_deposit_binary_without_slug_header_is_bad_request(self): """Posting a binary deposit without slug header should return 400 """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertIn(b'Missing SLUG header', response.content) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def post_deposit_binary_upload_final_and_status_check(self): + def test_post_deposit_binary_upload_final_and_status_check(self): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then response_content = parse_xml(BytesIO(response.content)) self.assertEqual(response.status_code, status.HTTP_201_CREATED) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertRegex(deposit_request.archive.name, self.archive['name']) self.assertIsNone(deposit_request.metadata) self.assertIsNone(deposit_request.raw_metadata) response_content = parse_xml(BytesIO(response.content)) self.assertEqual(response_content['deposit_archive'], self.archive['name']) self.assertEqual(int(response_content['deposit_id']), deposit.id) self.assertEqual(response_content['deposit_status'], deposit.status) edit_se_iri = reverse('edit_se_iri', args=[self.collection.name, deposit.id]) self.assertEqual(response._headers['location'], ('Location', 'http://testserver' + edit_se_iri)) - @istest - def post_deposit_binary_upload_supports_zip_or_tar(self): + def test_post_deposit_binary_upload_supports_zip_or_tar(self): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/octet-stream', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) - @istest - def post_deposit_binary_fails_if_unsupported_packaging_header( + def test_post_deposit_binary_fails_if_unsupported_packaging_header( self): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='something-unsupported', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) - @istest - def post_deposit_binary_upload_fail_if_no_content_disposition_header( + def test_post_deposit_binary_upload_fail_if_no_content_disposition_header( self): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false') # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) - @istest - def post_deposit_mediation_not_supported(self): + def test_post_deposit_mediation_not_supported(self): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_ON_BEHALF_OF='someone', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_412_PRECONDITION_FAILED) with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) - @istest - def post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( + def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( self): """Binary upload must not exceed the limit set up... """ # given url = reverse(COL_IRI, args=[self.collection.name]) archive = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some content in file', up_to_size=TEST_CONFIG['max_upload_size']) external_id = 'some-external-id' # when response = self.client.post( url, content_type='application/zip', data=archive['data'], # + headers CONTENT_LENGTH=archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_413_REQUEST_ENTITY_TOO_LARGE) self.assertRegex(response.content, b'Upload size limit exceeded') with self.assertRaises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) - @istest - def post_deposit_2_post_2_different_deposits(self): + def test_post_deposit_2_post_2_different_deposits(self): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[self.collection.name]) # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG='some-external-id-1', HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() self.assertEqual(len(deposits), 1) self.assertEqual(deposits[0], deposit) # second post response = self.client.post( url, content_type='application/x-tar', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG='another-external-id', HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content['deposit_id'] deposit2 = Deposit.objects.get(pk=deposit_id2) self.assertNotEqual(deposit, deposit2) deposits = Deposit.objects.all().order_by('id') self.assertEqual(len(deposits), 2) self.assertEqual(list(deposits), [deposit, deposit2]) - @istest - def post_deposit_binary_and_post_to_add_another_archive(self): + def test_post_deposit_binary_and_post_to_add_another_archive(self): """Updating a deposit should return a 201 with receipt """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='true', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, 'partial') self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertEquals(deposit_request.type.name, 'archive') self.assertRegex(deposit_request.archive.name, self.archive['name']) # 2nd archive to upload archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') # uri to update the content update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = self.client.post( update_uri, content_type='application/zip', # as zip data=archive2['data'], # + headers CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( archive2['name'])) self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). order_by('id')) # 2 deposit requests for the same deposit self.assertEquals(len(deposit_requests), 2) self.assertEquals(deposit_requests[0].deposit, deposit) self.assertEquals(deposit_requests[0].type.name, 'archive') self.assertRegex(deposit_requests[0].archive.name, self.archive['name']) self.assertEquals(deposit_requests[1].deposit, deposit) self.assertEquals(deposit_requests[1].type.name, 'archive') self.assertRegex(deposit_requests[1].archive.name, archive2['name']) # only 1 deposit in db deposits = Deposit.objects.all() self.assertEqual(len(deposits), 1) - @istest - def post_deposit_then_post_or_put_is_refused_when_status_ready(self): + def test_post_deposit_then_post_or_put_is_refused_when_status_ready(self): """Updating a deposit with status 'ready' should return a 400 """ url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_request = DepositRequest.objects.get(deposit=deposit) self.assertEquals(deposit_request.deposit, deposit) self.assertRegex(deposit_request.archive.name, 'filename0') # updating/adding is forbidden # uri to update the content edit_se_iri = reverse( 'edit_se_iri', args=[self.collection.name, deposit_id]) em_iri = reverse( 'em_iri', args=[self.collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some content in file 2') # replacing file is no longer possible since the deposit's # status is ready r = self.client.put( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding file is no longer possible since the deposit's status # is ready r = self.client.post( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # replacing metadata is no longer possible since the deposit's # status is ready r = self.client.put( edit_se_iri, content_type='application/atom+xml;type=entry', data=self.data_atom_entry_ok, CONTENT_LENGTH=len(self.data_atom_entry_ok), HTTP_SLUG=external_id) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding new metadata is no longer possible since the # deposit's status is ready r = self.client.post( edit_se_iri, content_type='application/atom+xml;type=entry', data=self.data_atom_entry_ok, CONTENT_LENGTH=len(self.data_atom_entry_ok), HTTP_SLUG=external_id) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) archive_content = b'some content representing archive' archive = InMemoryUploadedFile( BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/zip', size=len(archive_content), charset=None) atom_entry = InMemoryUploadedFile( BytesIO(self.data_atom_entry_ok), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(self.data_atom_entry_ok), charset='utf-8') # replacing multipart metadata is no longer possible since the # deposit's status is ready r = self.client.put( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) # adding new metadata is no longer possible since the # deposit's status is ready r = self.client.post( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST) diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py index 9ce5d1c5..d3cb3528 100644 --- a/swh/deposit/tests/api/test_deposit_check.py +++ b/swh/deposit/tests/api/test_deposit_check.py @@ -1,246 +1,236 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from django.core.urlresolvers import reverse -from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED ) from swh.deposit.api.private.deposit_check import ( SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING, MANDATORY_ARCHIVE_MISSING ) from swh.deposit.models import Deposit from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine @attr('fs') class CheckDepositTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): """Check deposit endpoints. """ def setUp(self): super().setUp() - @istest - def deposit_ok(self): + def test_deposit_ok(self): """Proper deposit should succeed the checks (-> status ready) """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) - @istest - def deposit_invalid_tarball(self): + def test_deposit_invalid_tarball(self): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: deposit_id = self.create_deposit_archive_with_archive( archive_extension) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_INVALID) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) - @istest - def deposit_ko_missing_tarball(self): + def test_deposit_ko_missing_tarball(self): """Deposit without archive should fail the checks: rejected """ deposit_id = self.create_deposit_ready() # no archive, only atom deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_MISSING) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) - @istest - def deposit_ko_unsupported_tarball(self): + def test_deposit_ko_unsupported_tarball(self): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit_id = self.create_deposit_with_invalid_archive() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_UNSUPPORTED) # metadata check failure self.assertEqual(len(details['metadata']), 2) mandatory = details['metadata'][0] self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) self.assertEqual(set(mandatory['fields']), set(['url', 'external_identifier', 'author'])) alternate = details['metadata'][1] self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) self.assertEqual(alternate['fields'], ['name or title']) # url check failure self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) - @istest - def check_deposit_metadata_ok(self): + def test_check_deposit_metadata_ok(self): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) self.assertEquals(deposit_id, deposit_id_metadata) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) class CheckMetadata(unittest.TestCase, SWHChecksDeposit): - @istest - def check_metadata_ok(self): + def test_check_metadata_ok(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) - @istest - def check_metadata_ok2(self): + def test_check_metadata_ok2(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) - @istest - def check_metadata_ko(self): + def test_check_metadata_ko(self): """Missing optional field should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory alternate fields are missing', 'fields': ['name or title'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) - @istest - def check_metadata_ko2(self): + def test_check_metadata_ko2(self): """Missing mandatory fields should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory fields are missing', 'fields': ['author'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) diff --git a/swh/deposit/tests/api/test_deposit_delete.py b/swh/deposit/tests/api/test_deposit_delete.py index 721d3128..409d76bf 100644 --- a/swh/deposit/tests/api/test_deposit_delete.py +++ b/swh/deposit/tests/api/test_deposit_delete.py @@ -1,119 +1,113 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import EDIT_SE_IRI, EM_IRI, ARCHIVE_KEY, METADATA_KEY from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine class DepositDeleteTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): - @istest - def delete_archive_on_partial_deposit_works(self): + def test_delete_archive_on_partial_deposit_works(self): """Removing partial deposit's archive should return a 204 response """ # given deposit_id = self.create_deposit_partial() deposit = Deposit.objects.get(pk=deposit_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEquals(len(deposit_requests), 2) for dr in deposit_requests: if dr.type.name == ARCHIVE_KEY: continue elif dr.type.name == METADATA_KEY: continue else: self.fail('only archive and metadata type should exist ' 'in this test context') # when update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.delete(update_uri) # then self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit = Deposit.objects.get(pk=deposit_id) requests = list(DepositRequest.objects.filter(deposit=deposit)) self.assertEquals(len(requests), 2) self.assertEquals(requests[0].type.name, 'metadata') self.assertEquals(requests[1].type.name, 'metadata') - @istest - def delete_archive_on_undefined_deposit_fails(self): + def test_delete_archive_on_undefined_deposit_fails(self): """Delete undefined deposit returns a 404 response """ # when update_uri = reverse(EM_IRI, args=[self.collection.name, 999]) response = self.client.delete(update_uri) # then self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) - @istest - def delete_archive_on_non_partial_deposit_fails(self): + def test_delete_archive_on_non_partial_deposit_fails(self): """Delete !partial status deposit should return a 400 response""" deposit_id = self.create_deposit_ready() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) # when update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.delete(update_uri) # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) deposit = Deposit.objects.get(pk=deposit_id) self.assertIsNotNone(deposit) - @istest - def delete_partial_deposit_works(self): + def test_delete_partial_deposit_works(self): """Delete deposit should return a 204 response """ # given deposit_id = self.create_simple_deposit_partial() deposit = Deposit.objects.get(pk=deposit_id) assert deposit.id == deposit_id # when url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.delete(url) # then self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit_requests = list(DepositRequest.objects.filter(deposit=deposit)) self.assertEquals(deposit_requests, []) deposits = list(Deposit.objects.filter(pk=deposit_id)) self.assertEquals(deposits, []) - @istest - def delete_on_edit_se_iri_cannot_delete_non_partial_deposit(self): + def test_delete_on_edit_se_iri_cannot_delete_non_partial_deposit(self): """Delete !partial deposit should return a 400 response """ # given deposit_id = self.create_deposit_ready() deposit = Deposit.objects.get(pk=deposit_id) assert deposit.id == deposit_id # when url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.delete(url) # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) deposit = Deposit.objects.get(pk=deposit_id) self.assertIsNotNone(deposit) diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py index fe05ff46..e22cd739 100644 --- a/swh/deposit/tests/api/test_deposit_list.py +++ b/swh/deposit/tests/api/test_deposit_list.py @@ -1,96 +1,94 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.api.converters import convert_status_detail from ...config import DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ...models import Deposit @attr('fs') class CheckDepositListTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Check deposit list endpoints. """ def setUp(self): super().setUp() - @istest - def deposit_list(self): + def test_deposit_list(self): """Deposit list api should return the deposits """ deposit_id = self.create_deposit_partial() # amend the deposit with a status_detail deposit = Deposit.objects.get(pk=deposit_id) status_detail = { 'url': { 'summary': 'At least one compatible url field. Failed', 'fields': ['testurl'], }, 'metadata': [ { 'summary': 'Mandatory fields missing', 'fields': ['9', 10, 1.212], }, ], 'archive': [ { 'summary': 'Invalid archive', 'fields': ['3'], }, { 'summary': 'Unsupported archive', 'fields': [2], } ], } deposit.status_detail = status_detail deposit.save() deposit_id2 = self.create_deposit_partial() # NOTE: does not work as documented # https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa # url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1}) main_url = reverse(PRIVATE_LIST_DEPOSITS) url = '%s?page_size=1' % main_url response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['count'], 2) # 2 deposits expected_next = '%s?page=2&page_size=1' % main_url self.assertTrue(data['next'].endswith(expected_next)) self.assertIsNone(data['previous']) self.assertEqual(len(data['results']), 1) # page of size 1 deposit = data['results'][0] self.assertEquals(deposit['id'], deposit_id) self.assertEquals(deposit['status'], DEPOSIT_STATUS_PARTIAL) expected_status_detail = convert_status_detail(status_detail) self.assertEquals(deposit['status_detail'], expected_status_detail) # then 2nd page response2 = self.client.get(expected_next) self.assertEqual(response2.status_code, status.HTTP_200_OK) data2 = response2.json() self.assertEqual(data2['count'], 2) # still 2 deposits self.assertIsNone(data2['next']) expected_previous = '%s?page_size=1' % main_url self.assertTrue(data2['previous'].endswith(expected_previous)) self.assertEqual(len(data2['results']), 1) # page of size 1 deposit2 = data2['results'][0] self.assertEquals(deposit2['id'], deposit_id2) self.assertEquals(deposit2['status'], DEPOSIT_STATUS_PARTIAL) diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py index 8388d2ec..b7339b4a 100644 --- a/swh/deposit/tests/api/test_deposit_multipart.py +++ b/swh/deposit/tests/api/test_deposit_multipart.py @@ -1,408 +1,402 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.files.uploadedfile import InMemoryUploadedFile from django.core.urlresolvers import reverse from io import BytesIO -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import COL_IRI from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from ..common import BasicTestCase, WithAuthTestCase from ..common import FileSystemCreationRoutine class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine): """Post multipart deposit scenario """ def setUp(self): super().setUp() self.data_atom_entry_ok = b""" Title urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 2005-10-07T17:17:08Z Contributor The abstract The abstract Access Rights Alternative Title Date Available Bibliographic Citation # noqa Contributor Description Has Part Has Version Identifier Is Part Of Publisher References Rights Holder Source Title Type """ self.data_atom_entry_update_in_place = """ urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b Title Type """ - @istest - def post_deposit_multipart_without_slug_header_is_bad_request(self): + def test_post_deposit_multipart_without_slug_header_is_bad_request(self): # given url = reverse(COL_IRI, args=[self.collection.name]) data_atom_entry = self.data_atom_entry_ok archive_content = b'some content representing archive' archive = InMemoryUploadedFile( BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/zip', size=len(archive_content), charset=None) atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset='utf-8') # when response = self.client.post( url, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }, # + headers HTTP_IN_PROGRESS='false') self.assertIn(b'Missing SLUG header', response.content) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def post_deposit_multipart_zip(self): + def test_post_deposit_multipart_zip(self): """one multipart deposit (zip+xml) should be accepted """ # given url = reverse(COL_IRI, args=[self.collection.name]) # from django.core.files import uploadedfile data_atom_entry = self.data_atom_entry_ok archive = InMemoryUploadedFile( BytesIO(self.archive['data']), field_name=self.archive['name'], name=self.archive['name'], content_type='application/zip', size=self.archive['length'], charset=None) atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset='utf-8') external_id = 'external-id' # when response = self.client.post( url, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }, # + headers HTTP_IN_PROGRESS='false', HTTP_SLUG=external_id) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEquals(len(deposit_requests), 2) for deposit_request in deposit_requests: self.assertEquals(deposit_request.deposit, deposit) if deposit_request.type.name == 'archive': self.assertRegex(deposit_request.archive.name, self.archive['name']) self.assertIsNone(deposit_request.metadata) self.assertIsNone(deposit_request.raw_metadata) else: self.assertEquals( deposit_request.metadata['id'], 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') self.assertEquals(deposit_request.raw_metadata, data_atom_entry.decode('utf-8')) - @istest - def post_deposit_multipart_tar(self): + def test_post_deposit_multipart_tar(self): """one multipart deposit (tar+xml) should be accepted """ # given url = reverse(COL_IRI, args=[self.collection.name]) # from django.core.files import uploadedfile data_atom_entry = self.data_atom_entry_ok archive = InMemoryUploadedFile( BytesIO(self.archive['data']), field_name=self.archive['name'], name=self.archive['name'], content_type='application/x-tar', size=self.archive['length'], charset=None) atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset='utf-8') external_id = 'external-id' # when response = self.client.post( url, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }, # + headers HTTP_IN_PROGRESS='false', HTTP_SLUG=external_id) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEquals(len(deposit_requests), 2) for deposit_request in deposit_requests: self.assertEquals(deposit_request.deposit, deposit) if deposit_request.type.name == 'archive': self.assertRegex(deposit_request.archive.name, self.archive['name']) self.assertIsNone(deposit_request.metadata) self.assertIsNone(deposit_request.raw_metadata) else: self.assertEquals( deposit_request.metadata['id'], 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') self.assertEquals(deposit_request.raw_metadata, data_atom_entry.decode('utf-8')) - @istest - def post_deposit_multipart_put_to_replace_metadata(self): + def test_post_deposit_multipart_put_to_replace_metadata(self): """One multipart deposit followed by a metadata update should be accepted """ # given url = reverse(COL_IRI, args=[self.collection.name]) data_atom_entry = self.data_atom_entry_ok archive = InMemoryUploadedFile( BytesIO(self.archive['data']), field_name=self.archive['name'], name=self.archive['name'], content_type='application/zip', size=self.archive['length'], charset=None) atom_entry = InMemoryUploadedFile( BytesIO(data_atom_entry), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(data_atom_entry), charset='utf-8') external_id = 'external-id' # when response = self.client.post( url, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }, # + headers HTTP_IN_PROGRESS='true', HTTP_SLUG=external_id) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, 'partial') self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEquals(len(deposit_requests), 2) for deposit_request in deposit_requests: self.assertEquals(deposit_request.deposit, deposit) if deposit_request.type.name == 'archive': self.assertRegex(deposit_request.archive.name, self.archive['name']) else: self.assertEquals( deposit_request.metadata['id'], 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a') self.assertEquals(deposit_request.raw_metadata, data_atom_entry.decode('utf-8')) replace_metadata_uri = response._headers['location'][1] response = self.client.put( replace_metadata_uri, content_type='application/atom+xml;type=entry', data=self.data_atom_entry_update_in_place, HTTP_IN_PROGRESS='false') self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) # deposit_id did not change deposit = Deposit.objects.get(pk=deposit_id) self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED) self.assertEqual(deposit.external_id, external_id) self.assertEqual(deposit.collection, self.collection) self.assertEqual(deposit.client, self.user) self.assertIsNone(deposit.swh_id) deposit_requests = DepositRequest.objects.filter(deposit=deposit) self.assertEquals(len(deposit_requests), 2) for deposit_request in deposit_requests: self.assertEquals(deposit_request.deposit, deposit) if deposit_request.type.name == 'archive': self.assertRegex(deposit_request.archive.name, self.archive['name']) else: self.assertEquals( deposit_request.metadata['id'], 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b') self.assertEquals( deposit_request.raw_metadata, self.data_atom_entry_update_in_place) # FAILURE scenarios - @istest - def post_deposit_multipart_only_archive_and_atom_entry(self): + def test_post_deposit_multipart_only_archive_and_atom_entry(self): """Multipart deposit only accepts one archive and one atom+xml""" # given url = reverse(COL_IRI, args=[self.collection.name]) archive_content = b'some content representing archive' archive = InMemoryUploadedFile(BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/x-tar', size=len(archive_content), charset=None) other_archive_content = b"some-other-content" other_archive = InMemoryUploadedFile(BytesIO(other_archive_content), field_name='atom0', name='atom0', content_type='application/x-tar', size=len(other_archive_content), charset='utf-8') # when response = self.client.post( url, format='multipart', data={ 'archive': archive, 'atom_entry': other_archive, }, # + headers HTTP_IN_PROGRESS='false', HTTP_SLUG='external-id') # then self.assertEqual(response.status_code, status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) self.assertTrue( 'Only 1 application/zip (or application/x-tar) archive' in response.content.decode('utf-8')) # when archive.seek(0) response = self.client.post( url, format='multipart', data={ 'archive': archive, }, # + headers HTTP_IN_PROGRESS='false', HTTP_SLUG='external-id') # then self.assertEqual(response.status_code, status.HTTP_415_UNSUPPORTED_MEDIA_TYPE) self.assertTrue( 'You must provide both 1 application/zip (or ' 'application/x-tar) and 1 atom+xml entry for ' 'multipart deposit' in response.content.decode('utf-8') ) diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py index 21c4bf90..7ed3b85a 100644 --- a/swh/deposit/tests/api/test_deposit_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_read_archive.py @@ -1,130 +1,125 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os from django.core.urlresolvers import reverse -from nose.tools import istest from nose.plugins.attrib import attr from rest_framework import status from rest_framework.test import APITestCase from swh.core import tarball from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.tests import TEST_CONFIG from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine, create_arborescence_archive @attr('fs') class DepositReadArchivesTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): def setUp(self): super().setUp() self.archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') self.workdir = os.path.join(self.root_path, 'workdir') - @istest - def access_to_existing_deposit_with_one_archive(self): + def test_access_to_existing_deposit_with_one_archive(self): """Access to deposit should stream a 200 response with its raw content """ deposit_id = self.create_simple_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self.assertEquals(actual_sha1, self.archive['sha1sum']) # this does not touch the extraction dir so this should stay empty self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) def _check_tarball_consistency(self, actual_sha1): tarball.uncompress(self.archive['path'], self.workdir) self.assertEquals(os.listdir(self.workdir), ['file1']) tarball.uncompress(self.archive2['path'], self.workdir) lst = set(os.listdir(self.workdir)) self.assertEquals(lst, {'file1', 'file2'}) new_path = self.workdir + '.zip' tarball.compress(new_path, 'zip', self.workdir) with open(new_path, 'rb') as f: h = hashlib.sha1(f.read()).hexdigest() self.assertEqual(actual_sha1, h) self.assertNotEqual(actual_sha1, self.archive['sha1sum']) self.assertNotEqual(actual_sha1, self.archive2['sha1sum']) - @istest - def access_to_existing_deposit_with_multiple_archives(self): + def test_access_to_existing_deposit_with_multiple_archives(self): """Access to deposit should stream a 200 response with its raw contents """ deposit_id = self.create_complex_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self._check_tarball_consistency(actual_sha1) # this touches the extraction directory but should clean up # after itself self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): - @istest - def access_to_nonexisting_deposit_returns_404_response(self): + def test_access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) - @istest - def access_to_nonexisting_collection_returns_404_response(self): + def test_access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8')) diff --git a/swh/deposit/tests/api/test_deposit_read_metadata.py b/swh/deposit/tests/api/test_deposit_read_metadata.py index 15653fd7..08b211f5 100644 --- a/swh/deposit/tests/api/test_deposit_read_metadata.py +++ b/swh/deposit/tests/api/test_deposit_read_metadata.py @@ -1,210 +1,205 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.config import DEPOSIT_STATUS_PARTIAL from ...config import SWH_PERSON from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Deposit access to read metadata information on deposit. """ - @istest - def read_metadata(self): + def test_read_metadata(self): """Private metadata read api to existing deposit should return metadata """ deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response._headers['content-type'][1], 'application/json') data = response.json() expected_meta = { 'origin': { 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id', 'type': 'deposit' }, 'origin_metadata': { 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], 'author': ['some awesome author', 'another one', 'no one'], 'external_identifier': 'some-external-id', 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'provider': { 'provider_name': 'hal', 'provider_type': 'deposit_client', 'provider_url': 'https://hal-test.archives-ouvertes.fr/', 'metadata': {} }, 'tool': { 'tool_name': 'swh-deposit', 'tool_version': '0.0.1', 'tool_configuration': { 'sword_version': '2' } } }, 'revision': { 'synthetic': True, 'committer_date': None, 'message': 'hal: Deposit %s in collection hal' % deposit_id, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'date': None, 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], 'author': ['some awesome author', 'another one', 'no one'], 'external_identifier': 'some-external-id', 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'type': 'tar' }, 'branch_name': 'master', } self.assertEquals(data, expected_meta) - @istest - def read_metadata_revision_with_parent(self): + def test_read_metadata_revision_with_parent(self): """Private read metadata to a deposit (with parent) returns metadata """ swh_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa' swh_persistent_id = 'swh:1:rev:%s' % swh_id deposit_id1 = self.create_deposit_with_status( status=DEPOSIT_STATUS_LOAD_SUCCESS, external_id='some-external-id', swh_id=swh_persistent_id) deposit_parent = Deposit.objects.get(pk=deposit_id1) self.assertEquals(deposit_parent.swh_id, swh_persistent_id) self.assertEquals(deposit_parent.external_id, 'some-external-id') self.assertEquals(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS) deposit_id = self.create_deposit_partial( external_id='some-external-id') deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.external_id, 'some-external-id') self.assertEquals(deposit.swh_id, None) self.assertEquals(deposit.parent, deposit_parent) self.assertEquals(deposit.status, DEPOSIT_STATUS_PARTIAL) url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response._headers['content-type'][1], 'application/json') data = response.json() expected_meta = { 'origin': { 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id', 'type': 'deposit' }, 'origin_metadata': { 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], 'author': ['some awesome author', 'another one', 'no one'], 'external_identifier': 'some-external-id', 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'provider': { 'provider_name': 'hal', 'provider_type': 'deposit_client', 'provider_url': 'https://hal-test.archives-ouvertes.fr/', 'metadata': {} }, 'tool': { 'tool_name': 'swh-deposit', 'tool_version': '0.0.1', 'tool_configuration': { 'sword_version': '2' } } }, 'revision': { 'synthetic': True, 'date': None, 'committer_date': None, 'author': SWH_PERSON, 'committer': SWH_PERSON, 'type': 'tar', 'message': 'hal: Deposit %s in collection hal' % deposit_id, 'metadata': { '@xmlns': ['http://www.w3.org/2005/Atom'], 'author': ['some awesome author', 'another one', 'no one'], 'external_identifier': 'some-external-id', 'url': 'https://hal-test.archives-ouvertes.fr/' + 'some-external-id' }, 'parents': [swh_id] }, 'branch_name': 'master', } self.assertEqual(data, expected_meta) - @istest - def access_to_nonexisting_deposit_returns_404_response(self): + def test_access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) - @istest - def access_to_nonexisting_collection_returns_404_response(self): + def test_access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8'),) diff --git a/swh/deposit/tests/api/test_deposit_status.py b/swh/deposit/tests/api/test_deposit_status.py index 5c2fe066..256f2cf6 100644 --- a/swh/deposit/tests/api/test_deposit_status.py +++ b/swh/deposit/tests/api/test_deposit_status.py @@ -1,150 +1,144 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse from io import BytesIO -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import (COL_IRI, STATE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED) from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL from swh.deposit.models import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.parsers import parse_xml from ..common import BasicTestCase, WithAuthTestCase, FileSystemCreationRoutine from ..common import CommonCreationRoutine class DepositStatusTestCase(APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine, CommonCreationRoutine): """Status on deposit """ - @istest - def post_deposit_with_status_check(self): + def test_post_deposit_with_status_check(self): """Binary upload should be accepted """ # given url = reverse(COL_IRI, args=[self.collection.name]) external_id = 'some-external-id-1' # when response = self.client.post( url, content_type='application/zip', # as zip data=self.archive['data'], # + headers CONTENT_LENGTH=self.archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) deposit = Deposit.objects.get(external_id=external_id) status_url = reverse(STATE_IRI, args=[self.collection.name, deposit.id]) # check status status_response = self.client.get(status_url) self.assertEqual(status_response.status_code, status.HTTP_200_OK) r = parse_xml(BytesIO(status_response.content)) self.assertEqual(int(r['deposit_id']), deposit.id) self.assertEqual(r['deposit_status'], DEPOSIT_STATUS_DEPOSITED) self.assertEqual(r['deposit_status_detail'], DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_DEPOSITED]) - @istest - def status_with_swh_information(self): + def test_status_with_swh_information(self): _status = DEPOSIT_STATUS_LOAD_SUCCESS _context = 'https://hal.archives-ouvertes.fr/hal-01727745' _swh_id = 'swh:1:dir:42a13fc721c8716ff695d0d62fc851d641f3a12b' _swh_id_context = '%s;%s' % (_swh_id, _context) _swh_anchor_id = 'swh:rev:1:548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' _swh_anchor_id_context = '%s;%s' % (_swh_anchor_id, _context) # given deposit_id = self.create_deposit_with_status( status=_status, swh_id=_swh_id, swh_id_context=_swh_id_context, swh_anchor_id=_swh_anchor_id, swh_anchor_id_context=_swh_anchor_id_context ) url = reverse(STATE_IRI, args=[self.collection.name, deposit_id]) # when status_response = self.client.get(url) # then self.assertEqual(status_response.status_code, status.HTTP_200_OK) r = parse_xml(BytesIO(status_response.content)) self.assertEqual(int(r['deposit_id']), deposit_id) self.assertEqual(r['deposit_status'], _status) self.assertEqual(r['deposit_status_detail'], DEPOSIT_STATUS_DETAIL[DEPOSIT_STATUS_LOAD_SUCCESS]) self.assertEqual(r['deposit_swh_id'], _swh_id) self.assertEqual(r['deposit_swh_id_context'], _swh_id_context) self.assertEqual(r['deposit_swh_anchor_id'], _swh_anchor_id) self.assertEqual(r['deposit_swh_anchor_id_context'], _swh_anchor_id_context) - @istest - def status_on_unknown_deposit(self): + def test_status_on_unknown_deposit(self): """Asking for the status of unknown deposit returns 404 response""" status_url = reverse(STATE_IRI, args=[self.collection.name, 999]) status_response = self.client.get(status_url) self.assertEqual(status_response.status_code, status.HTTP_404_NOT_FOUND) - @istest - def status_with_http_accept_header_should_not_break(self): + def test_status_with_http_accept_header_should_not_break(self): """Asking deposit status with Accept header should return 200 """ deposit_id = self.create_deposit_partial() status_url = reverse(STATE_IRI, args=[ self.collection.name, deposit_id]) response = self.client.get( status_url, HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') self.assertEqual(response.status_code, status.HTTP_200_OK) - @istest - def status_on_deposit_rejected(self): + def test_status_on_deposit_rejected(self): _status = DEPOSIT_STATUS_REJECTED _swh_id = '548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' _status_detail = {'url': {'summary': 'Wrong url'}} # given deposit_id = self.create_deposit_with_status( status=_status, swh_id=_swh_id, status_detail=_status_detail) url = reverse(STATE_IRI, args=[self.collection.name, deposit_id]) # when status_response = self.client.get(url) # then self.assertEqual(status_response.status_code, status.HTTP_200_OK) r = parse_xml(BytesIO(status_response.content)) self.assertEqual(int(r['deposit_id']), deposit_id) self.assertEqual(r['deposit_status'], _status) self.assertEqual(r['deposit_status_detail'], '- Wrong url') self.assertEqual(r['deposit_swh_id'], _swh_id) diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py index 22478011..286d2f09 100644 --- a/swh/deposit/tests/api/test_deposit_update.py +++ b/swh/deposit/tests/api/test_deposit_update.py @@ -1,345 +1,333 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit, DepositRequest from swh.deposit.config import EDIT_SE_IRI, EM_IRI from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine, create_arborescence_archive class DepositUpdateOrReplaceExistingDataTest( APITestCase, WithAuthTestCase, BasicTestCase, FileSystemCreationRoutine, CommonCreationRoutine): """Try put/post (update/replace) query on EM_IRI """ def setUp(self): super().setUp() self.atom_entry_data1 = b""" bar """ self.atom_entry_data1 = b""" bar """ self.archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') - @istest - def replace_archive_to_deposit_is_possible(self): + def test_replace_archive_to_deposit_is_possible(self): """Replace all archive with another one should return a 204 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) assert len(list(requests)) == 1 assert self.archive['name'] in requests[0].archive.name # we have no metadata for that deposit requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 0 deposit_id = self._update_deposit_with_status(deposit_id, status_partial=True) requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 1 update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) external_id = 'some-external-id-1' response = self.client.put( update_uri, content_type='application/zip', # as zip data=self.archive2['data'], # + headers CONTENT_LENGTH=self.archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive2['name'], )) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) self.assertEquals(len(list(requests)), 1) self.assertRegex(requests[0].archive.name, self.archive2['name']) # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) self.assertEquals(len(requests), 1) - @istest - def replace_metadata_to_deposit_is_possible(self): + def test_replace_metadata_to_deposit_is_possible(self): """Replace all metadata with another one should return a 204 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) assert len(list(requests)) == 0 requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) assert len(requests) == 1 update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.put( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data1) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) self.assertEquals(len(list(requests)), 1) metadata = requests[0].metadata self.assertEquals(metadata['foobar'], 'bar') # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) self.assertEquals(len(requests), 1) - @istest - def add_archive_to_deposit_is_possible(self): + def test_add_archive_to_deposit_is_possible(self): """Add another archive to a deposit return a 201 response """ # given deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']) assert len(list(requests)) == 1 assert self.archive['name'] in requests[0].archive.name requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) assert len(requests) == 0 update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) external_id = 'some-external-id-1' response = self.client.post( update_uri, content_type='application/zip', # as zip data=self.archive2['data'], # + headers CONTENT_LENGTH=self.archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=self.archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive2['name'],)) self.assertEqual(response.status_code, status.HTTP_201_CREATED) requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive']).order_by('id')) self.assertEquals(len(requests), 2) # first archive still exists self.assertRegex(requests[0].archive.name, self.archive['name']) # a new one was added self.assertRegex(requests[1].archive.name, self.archive2['name']) # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata'])) self.assertEquals(len(requests), 0) - @istest - def add_metadata_to_deposit_is_possible(self): + def test_add_metadata_to_deposit_is_possible(self): """Add metadata with another one should return a 204 response """ # given deposit_id = self.create_deposit_partial() deposit = Deposit.objects.get(pk=deposit_id) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']) assert len(list(requests)) == 2 requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) assert len(requests) == 0 update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]) response = self.client.post( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data1) self.assertEqual(response.status_code, status.HTTP_201_CREATED) requests = DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['metadata']).order_by('id') self.assertEquals(len(list(requests)), 3) # a new one was added self.assertEquals(requests[1].metadata['foobar'], 'bar') # check we did not touch the other parts requests = list(DepositRequest.objects.filter( deposit=deposit, type=self.deposit_request_types['archive'])) self.assertEquals(len(requests), 0) class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Failure scenario about add/replace (post/put) query on deposit. """ - @istest - def add_metadata_to_unknown_collection(self): + def test_add_metadata_to_unknown_collection(self): """Replacing metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=['test', 1000]) response = self.client.post( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Unknown collection name test') - @istest - def add_metadata_to_unknown_deposit(self): + def test_add_metadata_to_unknown_deposit(self): """Replacing metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 999]) response = self.client.post( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 999 does not exist') - @istest - def replace_metadata_to_unknown_deposit(self): + def test_replace_metadata_to_unknown_deposit(self): """Adding metadata to unknown deposit should return a 404 response """ url = reverse(EDIT_SE_IRI, args=[self.collection.name, 998]) response = self.client.put( url, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 998 does not exist') - @istest - def add_archive_to_unknown_deposit(self): + def test_add_archive_to_unknown_deposit(self): """Adding metadata to unknown deposit should return a 404 response """ url = reverse(EM_IRI, args=[self.collection.name, 997]) response = self.client.post( url, content_type='application/zip', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 997 does not exist') - @istest - def replace_archive_to_unknown_deposit(self): + def test_replace_archive_to_unknown_deposit(self): """Replacing archive to unknown deposit should return a 404 response """ url = reverse(EM_IRI, args=[self.collection.name, 996]) response = self.client.put( url, content_type='application/zip', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertRegex(response.content.decode('utf-8'), 'Deposit with id 996 does not exist') - @istest - def post_metadata_to_em_iri_failure(self): + def test_post_metadata_to_em_iri_failure(self): """Update (POST) archive with wrong content type should return 400 """ deposit_id = self.create_deposit_partial() # only update on partial update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.post( update_uri, content_type='application/x-gtar-compressed', data=self.atom_entry_data0) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertRegex(response.content.decode('utf-8'), 'Packaging format supported is restricted to ' 'application/zip, application/x-tar') - @istest - def put_metadata_to_em_iri_failure(self): + def test_put_metadata_to_em_iri_failure(self): """Update (PUT) archive with wrong content type should return 400 """ # given deposit_id = self.create_deposit_partial() # only update on partial # when update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id]) response = self.client.put( update_uri, content_type='application/atom+xml;type=entry', data=self.atom_entry_data0) # then self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) self.assertRegex(response.content.decode('utf-8'), 'Packaging format supported is restricted to ' 'application/zip, application/x-tar') diff --git a/swh/deposit/tests/api/test_deposit_update_status.py b/swh/deposit/tests/api/test_deposit_update_status.py index 23f35052..0b5f21ec 100644 --- a/swh/deposit/tests/api/test_deposit_update_status.py +++ b/swh/deposit/tests/api/test_deposit_update_status.py @@ -1,136 +1,130 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL from swh.deposit.config import PRIVATE_PUT_DEPOSIT, DEPOSIT_STATUS_VERIFIED from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from ..common import BasicTestCase class UpdateDepositStatusTest(APITestCase, BasicTestCase): """Update the deposit's status scenario """ def setUp(self): super().setUp() deposit = Deposit(status=DEPOSIT_STATUS_VERIFIED, collection=self.collection, client=self.user) deposit.save() self.deposit = Deposit.objects.get(pk=deposit.id) assert self.deposit.status == DEPOSIT_STATUS_VERIFIED - @istest - def update_deposit_status(self): + def test_update_deposit_status(self): """Existing status for update should return a 204 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) possible_status = set(DEPOSIT_STATUS_DETAIL.keys()) - set( [DEPOSIT_STATUS_LOAD_SUCCESS]) for _status in possible_status: response = self.client.put( url, content_type='application/json', data=json.dumps({'status': _status})) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit = Deposit.objects.get(pk=self.deposit.id) self.assertEquals(deposit.status, _status) - @istest - def update_deposit_status_with_info(self): + def test_update_deposit_status_with_info(self): """Existing status for update with info should return a 204 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) expected_status = DEPOSIT_STATUS_LOAD_SUCCESS origin_url = 'something' directory_id = '42a13fc721c8716ff695d0d62fc851d641f3a12b' revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e' expected_swh_id = 'swh:1:dir:%s' % directory_id expected_swh_id_context = 'swh:1:dir:%s;origin=%s' % ( directory_id, origin_url) expected_swh_anchor_id = 'swh:1:rev:%s' % revision_id expected_swh_anchor_id_context = 'swh:1:rev:%s;origin=%s' % ( revision_id, origin_url) response = self.client.put( url, content_type='application/json', data=json.dumps({ 'status': expected_status, 'revision_id': revision_id, 'directory_id': directory_id, 'origin_url': origin_url, })) self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) deposit = Deposit.objects.get(pk=self.deposit.id) self.assertEquals(deposit.status, expected_status) self.assertEquals(deposit.swh_id, expected_swh_id) self.assertEquals(deposit.swh_id_context, expected_swh_id_context) self.assertEquals(deposit.swh_anchor_id, expected_swh_anchor_id) self.assertEquals(deposit.swh_anchor_id_context, expected_swh_anchor_id_context) - @istest - def update_deposit_status_will_fail_with_unknown_status(self): + def test_update_deposit_status_will_fail_with_unknown_status(self): """Unknown status for update should return a 400 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'status': 'unknown'})) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def update_deposit_status_will_fail_with_no_status_key(self): + def test_update_deposit_status_will_fail_with_no_status_key(self): """No status provided for update should return a 400 response """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'something': 'something'})) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) - @istest - def update_deposit_status_success_without_swh_id_fail(self): + def test_update_deposit_status_success_without_swh_id_fail(self): """Providing successful status without swh_id should return a 400 """ url = reverse(PRIVATE_PUT_DEPOSIT, args=[self.collection.name, self.deposit.id]) response = self.client.put( url, content_type='application/json', data=json.dumps({'status': DEPOSIT_STATUS_LOAD_SUCCESS})) self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) diff --git a/swh/deposit/tests/api/test_parser.py b/swh/deposit/tests/api/test_parser.py index d874867f..8e9cc917 100644 --- a/swh/deposit/tests/api/test_parser.py +++ b/swh/deposit/tests/api/test_parser.py @@ -1,104 +1,101 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import io from collections import OrderedDict -from nose.tools import istest from rest_framework.test import APITestCase from swh.deposit.parsers import SWHXMLParser class ParsingTest(APITestCase): """Access to main entry point is ok without authentication """ - @istest - def parsing_without_duplicates(self): + def test_parsing_without_duplicates(self): xml_no_duplicate = io.BytesIO(b''' Awesome Compiler GPL3.0 https://opensource.org/licenses/GPL-3.0 Python3 author1 Inria ocaml http://issuetracker.com ''') actual_result = SWHXMLParser().parse(xml_no_duplicate) expected_dict = OrderedDict( [('@xmlns', 'http://www.w3.org/2005/Atom'), ('@xmlns:codemeta', 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), ('title', 'Awesome Compiler'), ('codemeta:license', OrderedDict([('codemeta:name', 'GPL3.0'), ('codemeta:url', 'https://opensource.org/licenses/GPL-3.0')])), ('codemeta:runtimePlatform', 'Python3'), ('codemeta:author', OrderedDict([('codemeta:name', 'author1'), ('codemeta:affiliation', 'Inria')])), ('codemeta:programmingLanguage', 'ocaml'), ('codemeta:issueTracker', 'http://issuetracker.com')]) self.assertEqual(expected_dict, actual_result) - @istest - def parsing_with_duplicates(self): + def test_parsing_with_duplicates(self): xml_with_duplicates = io.BytesIO(b''' Another Compiler GNU/Linux GPL3.0 https://opensource.org/licenses/GPL-3.0 Un*x author1 Inria author2 Inria ocaml haskell spdx http://spdx.org python3 ''') actual_result = SWHXMLParser().parse(xml_with_duplicates) expected_dict = OrderedDict([ ('@xmlns', 'http://www.w3.org/2005/Atom'), ('@xmlns:codemeta', 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0'), ('title', 'Another Compiler'), ('codemeta:runtimePlatform', ['GNU/Linux', 'Un*x']), ('codemeta:license', [OrderedDict([('codemeta:name', 'GPL3.0'), ('codemeta:url', 'https://opensource.org/licenses/GPL-3.0')]), OrderedDict([('codemeta:name', 'spdx'), ('codemeta:url', 'http://spdx.org')])]), ('codemeta:author', [OrderedDict([('codemeta:name', 'author1'), ('codemeta:affiliation', 'Inria')]), OrderedDict([('codemeta:name', 'author2'), ('codemeta:affiliation', 'Inria')])]), ('codemeta:programmingLanguage', ['ocaml', 'haskell', 'python3'])]) self.assertEqual(expected_dict, actual_result) diff --git a/swh/deposit/tests/api/test_service_document.py b/swh/deposit/tests/api/test_service_document.py index f61f86dd..402e7557 100644 --- a/swh/deposit/tests/api/test_service_document.py +++ b/swh/deposit/tests/api/test_service_document.py @@ -1,107 +1,102 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.tools import istest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import SD_IRI from ..common import BasicTestCase, WithAuthTestCase class ServiceDocumentNoAuthCase(APITestCase, BasicTestCase): """Service document endpoints are protected with basic authentication. """ - @istest - def service_document_no_authentication_fails(self): + def test_service_document_no_authentication_fails(self): """Without authentication, service document endpoint should return 401 """ url = reverse(SD_IRI) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) - @istest - def service_document_with_http_accept_should_not_break(self): + def test_service_document_with_http_accept_should_not_break(self): """Without auth, sd endpoint through browser should return 401 """ url = reverse(SD_IRI) # when response = self.client.get( url, HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED) class ServiceDocumentCase(APITestCase, WithAuthTestCase, BasicTestCase): def assertResponseOk(self, response): # noqa: N802 self.assertEqual(response.status_code, status.HTTP_200_OK) self.assertEquals(response.content.decode('utf-8'), ''' 2.0 %s The Software Heritage (SWH) Archive %s Software Collection application/zip application/x-tar Collection Policy Software Heritage Archive Collect, Preserve, Share false false http://purl.org/net/sword/package/SimpleZip http://testserver/1/%s/ %s ''' % (TEST_CONFIG['max_upload_size'], self.username, self.username, self.username, self.username)) # noqa - @istest - def service_document(self): + def test_service_document(self): """With authentication, service document list user's collection """ url = reverse(SD_IRI) # when response = self.client.get(url) # then self.assertResponseOk(response) - @istest - def service_document_with_http_accept_header(self): + def test_service_document_with_http_accept_header(self): """With authentication, with browser, sd list user's collection """ url = reverse(SD_IRI) # when response = self.client.get( url, HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') self.assertResponseOk(response) diff --git a/swh/deposit/tests/loader/test_checker.py b/swh/deposit/tests/loader/test_checker.py index b48afc92..c0d6ddad 100644 --- a/swh/deposit/tests/loader/test_checker.py +++ b/swh/deposit/tests/loader/test_checker.py @@ -1,71 +1,68 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from nose.tools import istest from rest_framework.test import APITestCase from swh.deposit.models import Deposit from swh.deposit.config import PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_VERIFIED from swh.deposit.config import DEPOSIT_STATUS_REJECTED from swh.deposit.loader.checker import DepositChecker from django.core.urlresolvers import reverse from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine class DepositCheckerScenarioTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): def setUp(self): super().setUp() # 2. Sets a basic client which accesses the test data checker_client = SWHDepositTestClient(client=self.client, config=CLIENT_TEST_CONFIG) # 3. setup loader with no persistence and that client self.checker = DepositChecker(client=checker_client) - @istest - def check_deposit_ready(self): + def test_check_deposit_ready(self): """Check on a valid 'deposited' deposit should result in 'verified' """ # 1. create a deposit with archive and metadata deposit_id = self.create_simple_binary_deposit() deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) args = [self.collection.name, deposit_id] deposit_check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args) # when actual_result = self.checker.check(deposit_check_url=deposit_check_url) # then deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) self.assertEquals(actual_result, {'status': 'eventful'}) - @istest - def check_deposit_rejected(self): + def test_check_deposit_rejected(self): """Check on invalid 'deposited' deposit should result in 'rejected' """ # 1. create a deposit with archive and metadata deposit_id = self.create_deposit_with_invalid_archive() args = [self.collection.name, deposit_id] deposit_check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args) # when actual_result = self.checker.check(deposit_check_url=deposit_check_url) # then deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) self.assertEquals(actual_result, {'status': 'eventful'}) diff --git a/swh/deposit/tests/loader/test_client.py b/swh/deposit/tests/loader/test_client.py index 55b857d9..cbdb703f 100644 --- a/swh/deposit/tests/loader/test_client.py +++ b/swh/deposit/tests/loader/test_client.py @@ -1,268 +1,258 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest from nose.plugins.attrib import attr -from nose.tools import istest from swh.deposit.client import PrivateApiDepositClient from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE from .common import CLIENT_TEST_CONFIG class StreamedResponse: """Streamed response facsimile """ def __init__(self, ok, stream): self.ok = ok self.stream = stream def iter_content(self): yield from self.stream class FakeRequestClientGet: """Fake request client dedicated to get method calls. """ def __init__(self, response): self.response = response def get(self, *args, **kwargs): self.args = args self.kwargs = kwargs return self.response @attr('fs') class PrivateApiDepositClientReadArchiveTest(unittest.TestCase): def setUp(self): super().setUp() self.temporary_directory = tempfile.mkdtemp(dir='/tmp') def tearDown(self): super().setUp() shutil.rmtree(self.temporary_directory) - @istest - def archive_get(self): + def test_archive_get(self): """Reading archive should write data in temporary directory """ stream_content = [b"some", b"streamed", b"response"] response = StreamedResponse( ok=True, stream=(s for s in stream_content)) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) archive_path = os.path.join(self.temporary_directory, 'test.archive') archive_path = deposit_client.archive_get('/some/url', archive_path) self.assertTrue(os.path.exists(archive_path)) with open(archive_path, 'rb') as f: actual_content = f.read() self.assertEquals(actual_content, b''.join(stream_content)) self.assertEquals(_client.args, ('http://nowhere:9000/some/url', )) self.assertEquals(_client.kwargs, { 'stream': True }) - @istest - def archive_get_with_authentication(self): + def test_archive_get_with_authentication(self): """Reading archive should write data in temporary directory """ stream_content = [b"some", b"streamed", b"response", b"for", b"auth"] response = StreamedResponse( ok=True, stream=(s for s in stream_content)) _client = FakeRequestClientGet(response) _config = CLIENT_TEST_CONFIG.copy() _config['auth'] = { # add authentication setup 'username': 'user', 'password': 'pass' } deposit_client = PrivateApiDepositClient(_config, _client=_client) archive_path = os.path.join(self.temporary_directory, 'test.archive') archive_path = deposit_client.archive_get('/some/url', archive_path) self.assertTrue(os.path.exists(archive_path)) with open(archive_path, 'rb') as f: actual_content = f.read() self.assertEquals(actual_content, b''.join(stream_content)) self.assertEquals(_client.args, ('http://nowhere:9000/some/url', )) self.assertEquals(_client.kwargs, { 'stream': True, 'auth': ('user', 'pass') }) - @istest - def archive_get_can_fail(self): + def test_archive_get_can_fail(self): """Reading archive can fail for some reasons """ response = StreamedResponse(ok=False, stream=None) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when retrieving deposit archive'): deposit_client.archive_get('/some/url', 'some/path') class JsonResponse: """Json response facsimile """ def __init__(self, ok, response): self.ok = ok self.response = response def json(self): return self.response class PrivateApiDepositClientReadMetadataTest(unittest.TestCase): - @istest - def metadata_get(self): + def test_metadata_get(self): """Reading archive should write data in temporary directory """ expected_response = {"some": "dict"} response = JsonResponse( ok=True, response=expected_response) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) actual_metadata = deposit_client.metadata_get('/metadata') self.assertEquals(actual_metadata, expected_response) - @istest - def metadata_get_can_fail(self): + def test_metadata_get_can_fail(self): """Reading metadata can fail for some reasons """ _client = FakeRequestClientGet(JsonResponse(ok=False, response=None)) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when retrieving metadata at'): deposit_client.metadata_get('/some/metadata/url') class FakeRequestClientPut: """Fake Request client dedicated to put request method calls. """ args = None kwargs = None def put(self, *args, **kwargs): self.args = args self.kwargs = kwargs class PrivateApiDepositClientStatusUpdateTest(unittest.TestCase): - @istest - def status_update(self): + def test_status_update(self): """Update status """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) deposit_client.status_update('/update/status', DEPOSIT_STATUS_LOAD_SUCCESS, revision_id='some-revision-id') self.assertEquals(_client.args, ('http://nowhere:9000/update/status', )) self.assertEquals(_client.kwargs, { 'json': { 'status': DEPOSIT_STATUS_LOAD_SUCCESS, 'revision_id': 'some-revision-id', } }) - @istest - def status_update_with_no_revision_id(self): + def test_status_update_with_no_revision_id(self): """Reading metadata can fail for some reasons """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) deposit_client.status_update('/update/status/fail', DEPOSIT_STATUS_LOAD_FAILURE) self.assertEquals(_client.args, ('http://nowhere:9000/update/status/fail', )) self.assertEquals(_client.kwargs, { 'json': { 'status': DEPOSIT_STATUS_LOAD_FAILURE, } }) class PrivateApiDepositClientCheckTest(unittest.TestCase): - @istest - def check(self): + def test_check(self): """When check ok, this should return the deposit's status """ _client = FakeRequestClientGet( JsonResponse(ok=True, response={'status': 'something'})) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) r = deposit_client.check('/check') self.assertEquals(_client.args, ('http://nowhere:9000/check', )) self.assertEquals(_client.kwargs, {}) self.assertEquals(r, 'something') - @istest - def check_fails(self): + def test_check_fails(self): """Checking deposit can fail for some reason """ _client = FakeRequestClientGet( JsonResponse(ok=False, response=None)) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when checking deposit'): deposit_client.check('/check/fails') self.assertEquals(_client.args, ('http://nowhere:9000/check/fails', )) self.assertEquals(_client.kwargs, {}) diff --git a/swh/deposit/tests/loader/test_loader.py b/swh/deposit/tests/loader/test_loader.py index 8b7e646c..efdc30c1 100644 --- a/swh/deposit/tests/loader/test_loader.py +++ b/swh/deposit/tests/loader/test_loader.py @@ -1,308 +1,305 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest import shutil -from nose.tools import istest from nose.plugins.attrib import attr from rest_framework.test import APITestCase from swh.model import hashutil from swh.deposit.models import Deposit from swh.deposit.loader import loader from swh.deposit.config import ( PRIVATE_GET_RAW_CONTENT, PRIVATE_GET_DEPOSIT_METADATA, PRIVATE_PUT_DEPOSIT ) from django.core.urlresolvers import reverse from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG from .. import TEST_LOADER_CONFIG from ..common import (BasicTestCase, WithAuthTestCase, CommonCreationRoutine, FileSystemCreationRoutine) TOOL_ID = 99 PROVIDER_ID = 12 class DepositLoaderInhibitsStorage: """Mixin class to inhibit the persistence and keep in memory the data sent for storage. cf. SWHDepositLoaderNoStorage """ def __init__(self, client=None): # client is not used here, transit it nonetheless to other mixins super().__init__(client=client) # typed data self.state = { 'origin': [], 'origin_visit': [], 'origin_metadata': [], 'content': [], 'directory': [], 'revision': [], 'release': [], 'snapshot': [], 'tool': [], 'provider': [] } def _add(self, type, l): """Add without duplicates and keeping the insertion order. Args: type (str): Type of objects concerned by the action l ([object]): List of 'type' object """ col = self.state[type] for o in l: if o in col: continue col.extend([o]) def send_origin(self, origin): origin.update({'id': 1}) self._add('origin', [origin]) return origin['id'] def send_origin_visit(self, origin_id, visit_date): origin_visit = { 'origin': origin_id, 'visit_date': visit_date, 'visit': 1, } self._add('origin_visit', [origin_visit]) return origin_visit def send_origin_metadata(self, origin_id, visit_date, provider_id, tool_id, metadata): origin_metadata = { 'origin_id': origin_id, 'visit_date': visit_date, 'provider_id': provider_id, 'tool_id': tool_id, 'metadata': metadata } self._add('origin_metadata', [origin_metadata]) return origin_metadata def send_tool(self, tool): tool = { 'tool_name': tool['tool_name'], 'tool_version': tool['tool_version'], 'tool_configuration': tool['tool_configuration'] } self._add('tool', [tool]) tool_id = TOOL_ID return tool_id def send_provider(self, provider): provider = { 'provider_name': provider['provider_name'], 'provider_type': provider['provider_type'], 'provider_url': provider['provider_url'], 'metadata': provider['metadata'] } self._add('provider', [provider]) provider_id = PROVIDER_ID return provider_id def maybe_load_contents(self, contents): self._add('content', contents) def maybe_load_directories(self, directories): self._add('directory', directories) def maybe_load_revisions(self, revisions): self._add('revision', revisions) def maybe_load_releases(self, releases): self._add('release', releases) def maybe_load_snapshot(self, snapshot): self._add('snapshot', [snapshot]) def open_fetch_history(self): pass def close_fetch_history_failure(self, fetch_history_id): pass def close_fetch_history_success(self, fetch_history_id): pass def update_origin_visit(self, origin_id, visit, status): self.status = status # Override to do nothing at the end def close_failure(self): pass def close_success(self): pass class TestLoaderUtils(unittest.TestCase): def assertRevisionsOk(self, expected_revisions): # noqa: N802 """Check the loader's revisions match the expected revisions. Expects self.loader to be instantiated and ready to be inspected (meaning the loading took place). Args: expected_revisions (dict): Dict with key revision id, value the targeted directory id. """ # The last revision being the one used later to start back from for rev in self.loader.state['revision']: rev_id = hashutil.hash_to_hex(rev['id']) directory_id = hashutil.hash_to_hex(rev['directory']) self.assertEquals(expected_revisions[rev_id], directory_id) class SWHDepositLoaderNoStorage(DepositLoaderInhibitsStorage, loader.DepositLoader): """Loader to test. It inherits from the actual deposit loader to actually test its correct behavior. It also inherits from DepositLoaderInhibitsStorage so that no persistence takes place. """ pass @attr('fs') class DepositLoaderScenarioTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine, TestLoaderUtils): def setUp(self): super().setUp() # create the extraction dir used by the loader os.makedirs(TEST_LOADER_CONFIG['extraction_dir'], exist_ok=True) # 1. create a deposit with archive and metadata self.deposit_id = self.create_simple_binary_deposit() # 2. Sets a basic client which accesses the test data loader_client = SWHDepositTestClient(self.client, config=CLIENT_TEST_CONFIG) # 3. setup loader with no persistence and that client self.loader = SWHDepositLoaderNoStorage(client=loader_client) def tearDown(self): super().tearDown() shutil.rmtree(TEST_LOADER_CONFIG['extraction_dir']) - @istest - def inject_deposit_ready(self): + def test_inject_deposit_ready(self): """Load a deposit which is ready """ args = [self.collection.name, self.deposit_id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) # when self.loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) # then self.assertEquals(len(self.loader.state['content']), 1) self.assertEquals(len(self.loader.state['directory']), 1) self.assertEquals(len(self.loader.state['revision']), 1) self.assertEquals(len(self.loader.state['release']), 0) self.assertEquals(len(self.loader.state['snapshot']), 1) - @istest - def inject_deposit_verify_metadata(self): + def test_inject_deposit_verify_metadata(self): """Load a deposit with metadata, test metadata integrity """ self.deposit_metadata_id = self.add_metadata_to_deposit( self.deposit_id) args = [self.collection.name, self.deposit_metadata_id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) # when self.loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) # then self.assertEquals(len(self.loader.state['content']), 1) self.assertEquals(len(self.loader.state['directory']), 1) self.assertEquals(len(self.loader.state['revision']), 1) self.assertEquals(len(self.loader.state['release']), 0) self.assertEquals(len(self.loader.state['snapshot']), 1) self.assertEquals(len(self.loader.state['origin_metadata']), 1) self.assertEquals(len(self.loader.state['tool']), 1) self.assertEquals(len(self.loader.state['provider']), 1) codemeta = 'codemeta:' origin_url = 'https://hal-test.archives-ouvertes.fr/hal-01243065' expected_origin_metadata = { '@xmlns': 'http://www.w3.org/2005/Atom', '@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', 'author': { 'email': 'hal@ccsd.cnrs.fr', 'name': 'HAL' }, codemeta + 'url': origin_url, codemeta + 'runtimePlatform': 'phpstorm', codemeta + 'license': [ { codemeta + 'name': 'GNU General Public License v3.0 only' }, { codemeta + 'name': 'CeCILL Free Software License Agreement v1.1' # noqa } ], codemeta + 'author': { codemeta + 'name': 'Morane Gruenpeter' }, codemeta + 'programmingLanguage': ['php', 'python', 'C'], codemeta + 'applicationCategory': 'test', codemeta + 'dateCreated': '2017-05-03T16:08:47+02:00', codemeta + 'version': '1', 'external_identifier': 'hal-01243065', 'title': 'Composing a Web of Audio Applications', codemeta + 'description': 'this is the description', 'id': 'hal-01243065', 'client': 'hal', codemeta + 'keywords': 'DSP programming,Web', codemeta + 'developmentStatus': 'stable' } result = self.loader.state['origin_metadata'][0] self.assertEquals(result['metadata'], expected_origin_metadata) self.assertEquals(result['tool_id'], TOOL_ID) self.assertEquals(result['provider_id'], PROVIDER_ID) deposit = Deposit.objects.get(pk=self.deposit_id) self.assertRegex(deposit.swh_id, r'^swh:1:dir:.*') self.assertEquals(deposit.swh_id_context, '%s;origin=%s' % ( deposit.swh_id, origin_url )) self.assertRegex(deposit.swh_anchor_id, r'^swh:1:rev:.*') self.assertEquals(deposit.swh_anchor_id_context, '%s;origin=%s' % ( deposit.swh_anchor_id, origin_url )) diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py index 1dfe46e6..d6e03835 100644 --- a/swh/deposit/tests/test_utils.py +++ b/swh/deposit/tests/test_utils.py @@ -1,138 +1,132 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest from swh.deposit import utils class UtilsTestCase(unittest.TestCase): """Utils library """ - @istest - def merge(self): + def test_merge(self): """Calling utils.merge on dicts should merge without losing information """ d0 = { 'author': 'someone', 'license': [['gpl2']], 'a': 1 } d1 = { 'author': ['author0', {'name': 'author1'}], 'license': [['gpl3']], 'b': { '1': '2' } } d2 = { 'author': map(lambda x: x, ['else']), 'license': 'mit', 'b': { '2': '3', } } d3 = { 'author': (v for v in ['no one']), } actual_merge = utils.merge(d0, d1, d2, d3) expected_merge = { 'a': 1, 'license': [['gpl2'], ['gpl3'], 'mit'], 'author': [ 'someone', 'author0', {'name': 'author1'}, 'else', 'no one'], 'b': { '1': '2', '2': '3', } } self.assertEquals(actual_merge, expected_merge) - @istest - def merge_2(self): + def test_merge_2(self): d0 = { 'license': 'gpl2', 'runtime': { 'os': 'unix derivative' } } d1 = { 'license': 'gpl3', 'runtime': 'GNU/Linux' } expected = { 'license': ['gpl2', 'gpl3'], 'runtime': [ { 'os': 'unix derivative' }, 'GNU/Linux' ], } actual = utils.merge(d0, d1) self.assertEqual(actual, expected) - @istest - def merge_edge_cases(self): + def test_merge_edge_cases(self): input_dict = { 'license': ['gpl2', 'gpl3'], 'runtime': [ { 'os': 'unix derivative' }, 'GNU/Linux' ], } # against empty dict actual = utils.merge(input_dict, {}) self.assertEqual(actual, input_dict) # against oneself actual = utils.merge(input_dict, input_dict, input_dict) self.assertEqual(input_dict, input_dict) - @istest - def merge_one_dict(self): + def test_merge_one_dict(self): """Merge one dict should result in the same dict value """ input_and_expected = {'anything': 'really'} actual = utils.merge(input_and_expected) self.assertEqual(actual, input_and_expected) - @istest - def merge_raise(self): + def test_merge_raise(self): """Calling utils.merge with any no dict argument should raise """ d0 = { 'author': 'someone', 'a': 1 } d1 = ['not a dict'] with self.assertRaises(ValueError): utils.merge(d0, d1) with self.assertRaises(ValueError): utils.merge(d1, d0) with self.assertRaises(ValueError): utils.merge(d1) self.assertEquals(utils.merge(d0), d0)