diff --git a/swh/deposit/tests/api/test_deposit.py b/swh/deposit/tests/api/test_deposit.py
index 1fe03228..fcfac4e9 100644
--- a/swh/deposit/tests/api/test_deposit.py
+++ b/swh/deposit/tests/api/test_deposit.py
@@ -1,160 +1,160 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
from django.core.urlresolvers import reverse
from io import BytesIO
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_REJECTED
from swh.deposit.config import DEPOSIT_STATUS_PARTIAL
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE
from swh.deposit.models import Deposit, DepositClient, DepositCollection
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositNoAuthCase(APITestCase, BasicTestCase):
"""Deposit access are protected with basic authentication.
"""
def test_post_will_fail_with_401(self):
"""Without authentication, endpoint refuses access with 401 response
"""
url = reverse(COL_IRI, args=[self.collection.name])
# when
response = self.client.post(url)
# then
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class DepositFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Deposit access are protected with basic authentication.
"""
def setUp(self):
super().setUp()
# Add another user
_collection2 = DepositCollection(name='some')
_collection2.save()
_user = DepositClient.objects.create_user(username='user',
password='user')
_user.collections = [_collection2.id]
self.collection2 = _collection2
def test_access_to_another_user_collection_is_forbidden(self):
"""Access to another user collection should return a 403
"""
url = reverse(COL_IRI, args=[self.collection2.name])
response = self.client.post(url)
self.assertEqual(response.status_code,
status.HTTP_403_FORBIDDEN)
self.assertRegex(response.content.decode('utf-8'),
'Client hal cannot access collection %s' % (
self.collection2.name, ))
def test_delete_on_col_iri_not_supported(self):
"""Delete on col iri should return a 405 response
"""
url = reverse(COL_IRI, args=[self.collection.name])
response = self.client.delete(url)
self.assertEqual(response.status_code,
status.HTTP_405_METHOD_NOT_ALLOWED)
self.assertRegex(response.content.decode('utf-8'),
'DELETE method is not supported on this endpoint')
def create_deposit_with_rejection_status(self):
url = reverse(COL_IRI, args=[self.collection.name])
data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=data,
# + headers
CONTENT_LENGTH=len(data),
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=md5sum,
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
- self.assertEquals(response.status_code, status.HTTP_201_CREATED)
+ self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
actual_state = response_content['deposit_status']
- self.assertEquals(actual_state, DEPOSIT_STATUS_REJECTED)
+ self.assertEqual(actual_state, DEPOSIT_STATUS_REJECTED)
def test_act_on_deposit_rejected_is_not_permitted(self):
deposit_id = self.create_deposit_with_status(DEPOSIT_STATUS_REJECTED)
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.status == DEPOSIT_STATUS_REJECTED
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_SLUG='external-id')
- self.assertEquals(response.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(
response.content.decode('utf-8'),
"You can only act on deposit with status '%s'" % (
DEPOSIT_STATUS_PARTIAL, ))
def test_add_deposit_with_parent(self):
# given multiple deposit already loaded
deposit_id = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id')
deposit1 = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit1)
- self.assertEquals(deposit1.external_id, 'some-external-id')
- self.assertEquals(deposit1.status, DEPOSIT_STATUS_LOAD_SUCCESS)
+ self.assertEqual(deposit1.external_id, 'some-external-id')
+ self.assertEqual(deposit1.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id2 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id')
deposit2 = Deposit.objects.get(pk=deposit_id2)
self.assertIsNotNone(deposit2)
- self.assertEquals(deposit2.external_id, 'some-external-id')
- self.assertEquals(deposit2.status, DEPOSIT_STATUS_LOAD_SUCCESS)
+ self.assertEqual(deposit2.external_id, 'some-external-id')
+ self.assertEqual(deposit2.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id3 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_FAILURE,
external_id='some-external-id')
deposit3 = Deposit.objects.get(pk=deposit_id3)
self.assertIsNotNone(deposit3)
- self.assertEquals(deposit3.external_id, 'some-external-id')
- self.assertEquals(deposit3.status, DEPOSIT_STATUS_LOAD_FAILURE)
+ self.assertEqual(deposit3.external_id, 'some-external-id')
+ self.assertEqual(deposit3.status, DEPOSIT_STATUS_LOAD_FAILURE)
# when
deposit_id3 = self.create_simple_deposit_partial(
external_id='some-external-id')
# then
deposit4 = Deposit.objects.get(pk=deposit_id3)
self.assertIsNotNone(deposit4)
- self.assertEquals(deposit4.external_id, 'some-external-id')
- self.assertEquals(deposit4.status, DEPOSIT_STATUS_PARTIAL)
- self.assertEquals(deposit4.parent, deposit2)
+ self.assertEqual(deposit4.external_id, 'some-external-id')
+ self.assertEqual(deposit4.status, DEPOSIT_STATUS_PARTIAL)
+ self.assertEqual(deposit4.parent, deposit2)
diff --git a/swh/deposit/tests/api/test_deposit_atom.py b/swh/deposit/tests/api/test_deposit_atom.py
index 84b5f742..2f50f050 100644
--- a/swh/deposit/tests/api/test_deposit_atom.py
+++ b/swh/deposit/tests/api/test_deposit_atom.py
@@ -1,528 +1,528 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from io import BytesIO
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI, DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase
class DepositAtomEntryTestCase(APITestCase, WithAuthTestCase, BasicTestCase):
"""Try and post atom entry deposit.
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
Awesome Compiler
hal
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
%s
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data1 = b"""
hal
urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data2 = b"""
%s
"""
self.atom_entry_data_empty_body = b"""
"""
self.atom_entry_data3 = b"""
something
"""
self.atom_entry_data_atom_only = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
"""
self.atom_entry_data_codemeta = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
1785io25c695
origin url
other identifier, DOI, ARK
Domain
description
key-word 1
key-word 2
creation date
publication date
comment
article name
article id
Collaboration/Projet
project name
id
see also
Sponsor A
Sponsor B
Platform/OS
dependencies
Version
active
license
url spdx
.Net Framework 3.0
Python2.3
author1
Inria
UPMC
author2
Inria
UPMC
http://code.com
language 1
language 2
http://issuetracker.com
""" # noqa
self.atom_entry_data_dc_codemeta = b"""
%s
hal-01587361
https://hal.inria.fr/hal-01587361
https://hal.inria.fr/hal-01587361/document
https://hal.inria.fr/hal-01587361/file/AffectationRO-v1.0.0.zip
doi:10.5281/zenodo.438684
The assignment problem
AffectationRO
Gruenpeter, Morane
[INFO] Computer Science [cs]
[INFO.INFO-RO] Computer Science [cs]/Operations Research [cs.RO]
SOFTWARE
Project in OR: The assignment problemA java implementation for the assignment problem first release
description fr
2015-06-01
2017-10-19
en
url stable
Version sur hal
Version entre par lutilisateur
Mots-cls
Commentaire
Rfrence interne
Collaboration/Projet
nom du projet
id
Voir aussi
Financement
Projet ANR
Projet Europen
Platform/OS
Dpendances
Etat du dveloppement
license
url spdx
Outils de dveloppement- outil no1
Outils de dveloppement- outil no2
http://code.com
language 1
language 2
""" # noqa
self.atom_entry_tei = b"""HAL TEI export of hal-01587083CCSDDistributed under a Creative Commons Attribution 4.0 International LicenseHAL API platform
questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733MoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.com2017-09-29 11:21:322017-10-03 17:20:132017-10-03 17:20:132017-09-292017-09-29contributorMoraneGruenpeterf85a43a5fb4a2e0778a77e017f28c8fdgmail.comCCSDhal-01587083https://hal.inria.fr/hal-01587083gruenpeter:hal-0158708320172017questionnaire software metadataMoraneGruenpeter7de56c632362954fa84172cad80afe4einria.fr1556733EnglishComputer Science [cs]SoftwareIRILLInitiative pour la Recherche et l'Innovation sur le Logiciel Libre[https://www.irill.org/]Universite Pierre et Marie Curie - Paris 6UPMC4 place Jussieu - 75005 Paris[http://www.upmc.fr/]Institut National de Recherche en Informatique et en AutomatiqueInriaDomaine de VoluceauRocquencourt - BP 10578153 Le Chesnay Cedex[http://www.inria.fr/en/]Universite Paris Diderot - Paris 7UPD75 rue Thomas-Mann - 75205 Paris cedex 13[http://www.univ-paris-diderot.fr]""" # noqa
self.atom_entry_data_badly_formatted = b"""
"""
self.atom_error_with_decimal = b"""
Composing a Web of Audio Applications
hal
hal-01243065
hal-01243065
https://hal-test.archives-ouvertes.fr/hal-01243065
test
DSP programming,Web,Composability,Faust
2017-05-03T16:08:47+02:00
The Web offers a great opportunity to share, deploy and use programs without installation difficulties. In this article we explore the idea of freely combining/composing real-time audio applications deployed on the Web using Faust audio DSP language.
1
10.4
phpstorm
stable
linux
php
python
C
GNU General Public License v3.0 only
CeCILL Free Software License Agreement v1.1
HAL
hal@ccsd.cnrs.fr
Someone Nice
someone@nice.fr
FFJ
""" # noqa
def test_post_deposit_atom_entry_serialization_error(self):
"""Posting an initial atom entry should return 201 with deposit receipt
"""
# given
# when
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_error_with_decimal,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
dr = DepositRequest.objects.get(deposit=deposit)
self.assertIsNotNone(dr.metadata)
sw_version = dr.metadata.get('codemeta:softwareVersion')
- self.assertEquals(sw_version, '10.4')
+ self.assertEqual(sw_version, '10.4')
def test_post_deposit_atom_empty_body_request(self):
"""Posting empty body request should return a 400 response
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data_empty_body)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_post_deposit_atom_badly_formatted_is_a_bad_request(self):
"""Posting a badly formatted atom should return a 400 response
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data_badly_formatted)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_post_deposit_atom_without_slug_header_is_bad_request(self):
"""Posting an atom entry without a slug header should return a 400
"""
url = reverse(COL_IRI, args=[self.collection.name])
# when
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0,
# + headers
HTTP_IN_PROGRESS='false')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
def test_post_deposit_atom_unknown_collection(self):
"""Posting an atom entry to an unknown collection should return a 404
"""
response = self.client.post(
reverse(COL_IRI, args=['unknown-one']),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data3,
HTTP_SLUG='something')
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_post_deposit_atom_entry_initial(self):
"""Posting an initial atom entry should return 201 with deposit receipt
"""
# given
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
atom_entry_data = self.atom_entry_data0 % external_id.encode('utf-8')
# when
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.client, self.user)
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertIsNotNone(deposit_request.metadata)
- self.assertEquals(
+ self.assertEqual(
deposit_request.raw_metadata, atom_entry_data.decode('utf-8'))
self.assertFalse(bool(deposit_request.archive))
def test_post_deposit_atom_entry_with_codemeta(self):
"""Posting an initial atom entry should return 201 with deposit receipt
"""
# given
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
atom_entry_data = self.atom_entry_data_dc_codemeta % (
external_id.encode('utf-8'), )
# when
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.client, self.user)
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertIsNotNone(deposit_request.metadata)
- self.assertEquals(
+ self.assertEqual(
deposit_request.raw_metadata, atom_entry_data.decode('utf-8'))
self.assertFalse(bool(deposit_request.archive))
def test_post_deposit_atom_entry_tei(self):
"""Posting initial atom entry as TEI should return 201 with receipt
"""
# given
external_id = 'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a'
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
atom_entry_data = self.atom_entry_tei
# when
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.client, self.user)
# one associated request to a deposit
deposit_request = DepositRequest.objects.get(deposit=deposit)
self.assertIsNotNone(deposit_request.metadata)
- self.assertEquals(
+ self.assertEqual(
deposit_request.raw_metadata, atom_entry_data.decode('utf-8'))
self.assertFalse(bool(deposit_request.archive))
def test_post_deposit_atom_entry_multiple_steps(self):
"""After initial deposit, updating a deposit should return a 201
"""
# given
external_id = 'urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a'
with self.assertRaises(Deposit.DoesNotExist):
deposit = Deposit.objects.get(external_id=external_id)
# when
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_IN_PROGRESS='True',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.client, self.user)
# one associated request to a deposit
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
self.assertEqual(len(deposit_requests), 1)
atom_entry_data = self.atom_entry_data2 % external_id.encode('utf-8')
update_uri = response._headers['location'][1]
# when updating the first deposit post
response = self.client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=atom_entry_data,
HTTP_IN_PROGRESS='False')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.client, self.user)
self.assertEqual(len(Deposit.objects.all()), 1)
# now 2 associated requests to a same deposit
deposit_requests = DepositRequest.objects.filter(
deposit=deposit).order_by('id')
self.assertEqual(len(deposit_requests), 2)
expected_meta = [
{
'metadata': parse_xml(self.atom_entry_data1),
'raw_metadata': self.atom_entry_data1.decode('utf-8'),
},
{
'metadata': parse_xml(atom_entry_data),
'raw_metadata': atom_entry_data.decode('utf-8'),
}
]
for i, deposit_request in enumerate(deposit_requests):
actual_metadata = deposit_request.metadata
- self.assertEquals(actual_metadata,
- expected_meta[i]['metadata'])
- self.assertEquals(deposit_request.raw_metadata,
- expected_meta[i]['raw_metadata'])
+ self.assertEqual(actual_metadata,
+ expected_meta[i]['metadata'])
+ self.assertEqual(deposit_request.raw_metadata,
+ expected_meta[i]['raw_metadata'])
self.assertFalse(bool(deposit_request.archive))
diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py
index 6bf7f0ff..f2549e7a 100644
--- a/swh/deposit/tests/api/test_deposit_binary.py
+++ b/swh/deposit/tests/api/test_deposit_binary.py
@@ -1,645 +1,645 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.urlresolvers import reverse
from io import BytesIO
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.tests import TEST_CONFIG
from swh.deposit.config import COL_IRI, EM_IRI
from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from ..common import (
BasicTestCase, WithAuthTestCase, create_arborescence_archive,
FileSystemCreationRoutine
)
class DepositTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
FileSystemCreationRoutine):
"""Try and upload one single deposit
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
Awesome Compiler
hal
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
%s
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data1 = b"""
hal
urn:uuid:2225c695-cfb8-4ebb-aaaa-80da344efa6a
2017-10-07T15:17:08Z
some awesome author
something
awesome-compiler
This is an awesome compiler destined to
awesomely compile stuff
and other stuff
compiler,programming,language
2005-10-07T17:17:08Z
2005-10-07T17:17:08Z
release note
related link
Awesome
https://hoster.org/awesome-compiler
GNU/Linux
0.0.1
running
all
"""
self.atom_entry_data2 = b"""
%s
"""
self.atom_entry_data_empty_body = b"""
"""
self.atom_entry_data3 = b"""
something
"""
self.data_atom_entry_ok = b"""
Title
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
2005-10-07T17:17:08Z
Contributor
The abstract
The abstract
Access Rights
Alternative Title
Date Available
Bibliographic Citation # noqa
Contributor
Description
Has Part
Has Version
Identifier
Is Part Of
Publisher
References
Rights Holder
Source
Title
Type
"""
def test_post_deposit_binary_without_slug_header_is_bad_request(self):
"""Posting a binary deposit without slug header should return 400
"""
url = reverse(COL_IRI, args=[self.collection.name])
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
def test_post_deposit_binary_upload_final_and_status_check(self):
"""Binary upload with correct headers should return 201 with receipt
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
response_content = parse_xml(BytesIO(response.content))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
self.assertRegex(deposit_request.archive.name, self.archive['name'])
self.assertIsNone(deposit_request.metadata)
self.assertIsNone(deposit_request.raw_metadata)
response_content = parse_xml(BytesIO(response.content))
self.assertEqual(response_content['deposit_archive'],
self.archive['name'])
self.assertEqual(int(response_content['deposit_id']),
deposit.id)
self.assertEqual(response_content['deposit_status'],
deposit.status)
edit_se_iri = reverse('edit_se_iri',
args=[self.collection.name, deposit.id])
self.assertEqual(response._headers['location'],
('Location', 'http://testserver' + edit_se_iri))
def test_post_deposit_binary_upload_supports_zip_or_tar(self):
"""Binary upload with content-type not in [zip,x-tar] should return 415
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/octet-stream',
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_fails_if_unsupported_packaging_header(
self):
"""Bin deposit without supported content_disposition header returns 400
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='something-unsupported',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_fail_if_no_content_disposition_header(
self):
"""Binary upload without content_disposition header should return 400
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false')
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
def test_post_deposit_mediation_not_supported(self):
"""Binary upload with mediation should return a 412 response
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip',
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_ON_BEHALF_OF='someone',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_412_PRECONDITION_FAILED)
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded(
self):
"""Binary upload must not exceed the limit set up...
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
archive = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some content in file',
up_to_size=TEST_CONFIG['max_upload_size'])
external_id = 'some-external-id'
# when
response = self.client.post(
url,
content_type='application/zip',
data=archive['data'],
# + headers
CONTENT_LENGTH=archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code,
status.HTTP_413_REQUEST_ENTITY_TOO_LARGE)
self.assertRegex(response.content, b'Upload size limit exceeded')
with self.assertRaises(Deposit.DoesNotExist):
Deposit.objects.get(external_id=external_id)
def test_post_deposit_2_post_2_different_deposits(self):
"""2 posting deposits should return 2 different 201 with receipt
"""
url = reverse(COL_IRI, args=[self.collection.name])
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG='some-external-id-1',
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
deposits = Deposit.objects.all()
self.assertEqual(len(deposits), 1)
self.assertEqual(deposits[0], deposit)
# second post
response = self.client.post(
url,
content_type='application/x-tar', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG='another-external-id',
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename1')
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id2 = response_content['deposit_id']
deposit2 = Deposit.objects.get(pk=deposit_id2)
self.assertNotEqual(deposit, deposit2)
deposits = Deposit.objects.all().order_by('id')
self.assertEqual(len(deposits), 2)
self.assertEqual(list(deposits), [deposit, deposit2])
def test_post_deposit_binary_and_post_to_add_another_archive(self):
"""Updating a deposit should return a 201 with receipt
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='true',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
- self.assertEquals(deposit_request.deposit, deposit)
- self.assertEquals(deposit_request.type.name, 'archive')
+ self.assertEqual(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.type.name, 'archive')
self.assertRegex(deposit_request.archive.name, self.archive['name'])
# 2nd archive to upload
archive2 = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some other content in file')
# uri to update the content
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
# adding another archive for the deposit and finalizing it
response = self.client.post(
update_uri,
content_type='application/zip', # as zip
data=archive2['data'],
# + headers
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
archive2['name']))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = list(DepositRequest.objects.filter(deposit=deposit).
order_by('id'))
# 2 deposit requests for the same deposit
- self.assertEquals(len(deposit_requests), 2)
- self.assertEquals(deposit_requests[0].deposit, deposit)
- self.assertEquals(deposit_requests[0].type.name, 'archive')
+ self.assertEqual(len(deposit_requests), 2)
+ self.assertEqual(deposit_requests[0].deposit, deposit)
+ self.assertEqual(deposit_requests[0].type.name, 'archive')
self.assertRegex(deposit_requests[0].archive.name,
self.archive['name'])
- self.assertEquals(deposit_requests[1].deposit, deposit)
- self.assertEquals(deposit_requests[1].type.name, 'archive')
+ self.assertEqual(deposit_requests[1].deposit, deposit)
+ self.assertEqual(deposit_requests[1].type.name, 'archive')
self.assertRegex(deposit_requests[1].archive.name,
archive2['name'])
# only 1 deposit in db
deposits = Deposit.objects.all()
self.assertEqual(len(deposits), 1)
def test_post_deposit_then_post_or_put_is_refused_when_status_ready(self):
"""Updating a deposit with status 'ready' should return a 400
"""
url = reverse(COL_IRI, args=[self.collection.name])
external_id = 'some-external-id-1'
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=self.archive['data'],
# + headers
CONTENT_LENGTH=self.archive['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_request = DepositRequest.objects.get(deposit=deposit)
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
self.assertRegex(deposit_request.archive.name, 'filename0')
# updating/adding is forbidden
# uri to update the content
edit_se_iri = reverse(
'edit_se_iri', args=[self.collection.name, deposit_id])
em_iri = reverse(
'em_iri', args=[self.collection.name, deposit_id])
# Testing all update/add endpoint should fail
# since the status is ready
archive2 = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some content in file 2')
# replacing file is no longer possible since the deposit's
# status is ready
r = self.client.put(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding file is no longer possible since the deposit's status
# is ready
r = self.client.post(
em_iri,
content_type='application/zip',
data=archive2['data'],
CONTENT_LENGTH=archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
# replacing metadata is no longer possible since the deposit's
# status is ready
r = self.client.put(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_ok,
CONTENT_LENGTH=len(self.data_atom_entry_ok),
HTTP_SLUG=external_id)
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding new metadata is no longer possible since the
# deposit's status is ready
r = self.client.post(
edit_se_iri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_ok,
CONTENT_LENGTH=len(self.data_atom_entry_ok),
HTTP_SLUG=external_id)
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(self.data_atom_entry_ok),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(self.data_atom_entry_ok),
charset='utf-8')
# replacing multipart metadata is no longer possible since the
# deposit's status is ready
r = self.client.put(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
# adding new metadata is no longer possible since the
# deposit's status is ready
r = self.client.post(
edit_se_iri,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
})
- self.assertEquals(r.status_code, status.HTTP_400_BAD_REQUEST)
+ self.assertEqual(r.status_code, status.HTTP_400_BAD_REQUEST)
diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py
index 019ded0c..1b9c5d2d 100644
--- a/swh/deposit/tests/api/test_deposit_check.py
+++ b/swh/deposit/tests/api/test_deposit_check.py
@@ -1,236 +1,236 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from django.core.urlresolvers import reverse
import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import (
DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT,
DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED
)
from swh.deposit.api.private.deposit_check import (
SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID,
MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS,
MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING,
MANDATORY_ARCHIVE_MISSING
)
from swh.deposit.models import Deposit
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine
@pytest.mark.fs
class CheckDepositTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine):
"""Check deposit endpoints.
"""
def setUp(self):
super().setUp()
def test_deposit_ok(self):
"""Proper deposit should succeed the checks (-> status ready)
"""
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit_id = self.update_binary_deposit(deposit_id,
status_partial=False)
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED)
deposit = Deposit.objects.get(pk=deposit.id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_VERIFIED)
def test_deposit_invalid_tarball(self):
"""Deposit with tarball (of 1 tarball) should fail the checks: rejected
"""
for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']:
deposit_id = self.create_deposit_archive_with_archive(
archive_extension)
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
+ self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_INVALID)
deposit = Deposit.objects.get(pk=deposit.id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_deposit_ko_missing_tarball(self):
"""Deposit without archive should fail the checks: rejected
"""
deposit_id = self.create_deposit_ready() # no archive, only atom
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
+ self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_MISSING)
deposit = Deposit.objects.get(pk=deposit.id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_deposit_ko_unsupported_tarball(self):
"""Deposit with an unsupported tarball should fail the checks: rejected
"""
deposit_id = self.create_deposit_with_invalid_archive()
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
+ self.assertEqual(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_UNSUPPORTED)
# metadata check failure
self.assertEqual(len(details['metadata']), 2)
mandatory = details['metadata'][0]
self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING)
self.assertEqual(set(mandatory['fields']),
set(['url', 'external_identifier', 'author']))
alternate = details['metadata'][1]
self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING)
self.assertEqual(alternate['fields'], ['name or title'])
# url check failure
self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS)
deposit = Deposit.objects.get(pk=deposit.id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_check_deposit_metadata_ok(self):
"""Proper deposit should succeed the checks (-> status ready)
with all **MUST** metadata
using the codemeta metadata test set
"""
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit_id_metadata = self.add_metadata_to_deposit(deposit_id)
- self.assertEquals(deposit_id, deposit_id_metadata)
+ self.assertEqual(deposit_id, deposit_id_metadata)
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED)
deposit = Deposit.objects.get(pk=deposit.id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_VERIFIED)
class CheckMetadata(unittest.TestCase, SWHChecksDeposit):
def test_check_metadata_ok(self):
actual_check, detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'name': 'foo',
'author': 'someone',
})
self.assertTrue(actual_check)
self.assertIsNone(detail)
def test_check_metadata_ok2(self):
actual_check, detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'title': 'bar',
'author': 'someone',
})
self.assertTrue(actual_check)
self.assertIsNone(detail)
def test_check_metadata_ko(self):
"""Missing optional field should be caught
"""
actual_check, error_detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'author': 'someone',
})
expected_error = {
'metadata': [{
'summary': 'Mandatory alternate fields are missing',
'fields': ['name or title'],
}]
}
self.assertFalse(actual_check)
self.assertEqual(error_detail, expected_error)
def test_check_metadata_ko2(self):
"""Missing mandatory fields should be caught
"""
actual_check, error_detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'title': 'foobar',
})
expected_error = {
'metadata': [{
'summary': 'Mandatory fields are missing',
'fields': ['author'],
}]
}
self.assertFalse(actual_check)
self.assertEqual(error_detail, expected_error)
diff --git a/swh/deposit/tests/api/test_deposit_delete.py b/swh/deposit/tests/api/test_deposit_delete.py
index 409d76bf..9bf963cc 100644
--- a/swh/deposit/tests/api/test_deposit_delete.py
+++ b/swh/deposit/tests/api/test_deposit_delete.py
@@ -1,113 +1,113 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import EDIT_SE_IRI, EM_IRI, ARCHIVE_KEY, METADATA_KEY
from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositDeleteTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
def test_delete_archive_on_partial_deposit_works(self):
"""Removing partial deposit's archive should return a 204 response
"""
# given
deposit_id = self.create_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEquals(len(deposit_requests), 2)
+ self.assertEqual(len(deposit_requests), 2)
for dr in deposit_requests:
if dr.type.name == ARCHIVE_KEY:
continue
elif dr.type.name == METADATA_KEY:
continue
else:
self.fail('only archive and metadata type should exist '
'in this test context')
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
deposit = Deposit.objects.get(pk=deposit_id)
requests = list(DepositRequest.objects.filter(deposit=deposit))
- self.assertEquals(len(requests), 2)
- self.assertEquals(requests[0].type.name, 'metadata')
- self.assertEquals(requests[1].type.name, 'metadata')
+ self.assertEqual(len(requests), 2)
+ self.assertEqual(requests[0].type.name, 'metadata')
+ self.assertEqual(requests[1].type.name, 'metadata')
def test_delete_archive_on_undefined_deposit_fails(self):
"""Delete undefined deposit returns a 404 response
"""
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, 999])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_delete_archive_on_non_partial_deposit_fails(self):
"""Delete !partial status deposit should return a 400 response"""
deposit_id = self.create_deposit_ready()
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(update_uri)
# then
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit)
def test_delete_partial_deposit_works(self):
"""Delete deposit should return a 204 response
"""
# given
deposit_id = self.create_simple_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.id == deposit_id
# when
url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(url)
# then
self.assertEqual(response.status_code,
status.HTTP_204_NO_CONTENT)
deposit_requests = list(DepositRequest.objects.filter(deposit=deposit))
- self.assertEquals(deposit_requests, [])
+ self.assertEqual(deposit_requests, [])
deposits = list(Deposit.objects.filter(pk=deposit_id))
- self.assertEquals(deposits, [])
+ self.assertEqual(deposits, [])
def test_delete_on_edit_se_iri_cannot_delete_non_partial_deposit(self):
"""Delete !partial deposit should return a 400 response
"""
# given
deposit_id = self.create_deposit_ready()
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit.id == deposit_id
# when
url = reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id])
response = self.client.delete(url)
# then
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertIsNotNone(deposit)
diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py
index 9e7fc587..6fb84349 100644
--- a/swh/deposit/tests/api/test_deposit_list.py
+++ b/swh/deposit/tests/api/test_deposit_list.py
@@ -1,94 +1,94 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.api.converters import convert_status_detail
from ...config import DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ...models import Deposit
@pytest.mark.fs
class CheckDepositListTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine):
"""Check deposit list endpoints.
"""
def setUp(self):
super().setUp()
def test_deposit_list(self):
"""Deposit list api should return the deposits
"""
deposit_id = self.create_deposit_partial()
# amend the deposit with a status_detail
deposit = Deposit.objects.get(pk=deposit_id)
status_detail = {
'url': {
'summary': 'At least one compatible url field. Failed',
'fields': ['testurl'],
},
'metadata': [
{
'summary': 'Mandatory fields missing',
'fields': ['9', 10, 1.212],
},
],
'archive': [
{
'summary': 'Invalid archive',
'fields': ['3'],
},
{
'summary': 'Unsupported archive',
'fields': [2],
}
],
}
deposit.status_detail = status_detail
deposit.save()
deposit_id2 = self.create_deposit_partial()
# NOTE: does not work as documented
# https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa
# url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1})
main_url = reverse(PRIVATE_LIST_DEPOSITS)
url = '%s?page_size=1' % main_url
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['count'], 2) # 2 deposits
expected_next = '%s?page=2&page_size=1' % main_url
self.assertTrue(data['next'].endswith(expected_next))
self.assertIsNone(data['previous'])
self.assertEqual(len(data['results']), 1) # page of size 1
deposit = data['results'][0]
- self.assertEquals(deposit['id'], deposit_id)
- self.assertEquals(deposit['status'], DEPOSIT_STATUS_PARTIAL)
+ self.assertEqual(deposit['id'], deposit_id)
+ self.assertEqual(deposit['status'], DEPOSIT_STATUS_PARTIAL)
expected_status_detail = convert_status_detail(status_detail)
- self.assertEquals(deposit['status_detail'], expected_status_detail)
+ self.assertEqual(deposit['status_detail'], expected_status_detail)
# then 2nd page
response2 = self.client.get(expected_next)
self.assertEqual(response2.status_code, status.HTTP_200_OK)
data2 = response2.json()
self.assertEqual(data2['count'], 2) # still 2 deposits
self.assertIsNone(data2['next'])
expected_previous = '%s?page_size=1' % main_url
self.assertTrue(data2['previous'].endswith(expected_previous))
self.assertEqual(len(data2['results']), 1) # page of size 1
deposit2 = data2['results'][0]
- self.assertEquals(deposit2['id'], deposit_id2)
- self.assertEquals(deposit2['status'], DEPOSIT_STATUS_PARTIAL)
+ self.assertEqual(deposit2['id'], deposit_id2)
+ self.assertEqual(deposit2['status'], DEPOSIT_STATUS_PARTIAL)
diff --git a/swh/deposit/tests/api/test_deposit_multipart.py b/swh/deposit/tests/api/test_deposit_multipart.py
index b7339b4a..4bd37b6a 100644
--- a/swh/deposit/tests/api/test_deposit_multipart.py
+++ b/swh/deposit/tests/api/test_deposit_multipart.py
@@ -1,402 +1,402 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.files.uploadedfile import InMemoryUploadedFile
from django.core.urlresolvers import reverse
from io import BytesIO
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import COL_IRI
from swh.deposit.config import DEPOSIT_STATUS_DEPOSITED
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.parsers import parse_xml
from ..common import BasicTestCase, WithAuthTestCase
from ..common import FileSystemCreationRoutine
class DepositMultipartTestCase(APITestCase, WithAuthTestCase, BasicTestCase,
FileSystemCreationRoutine):
"""Post multipart deposit scenario
"""
def setUp(self):
super().setUp()
self.data_atom_entry_ok = b"""
Title
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
2005-10-07T17:17:08Z
Contributor
The abstract
The abstract
Access Rights
Alternative Title
Date Available
Bibliographic Citation # noqa
Contributor
Description
Has Part
Has Version
Identifier
Is Part Of
Publisher
References
Rights Holder
Source
Title
Type
"""
self.data_atom_entry_update_in_place = """
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b
Title
Type
"""
def test_post_deposit_multipart_without_slug_header_is_bad_request(self):
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(
BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/zip',
size=len(archive_content),
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false')
self.assertIn(b'Missing SLUG header', response.content)
self.assertEqual(response.status_code,
status.HTTP_400_BAD_REQUEST)
def test_post_deposit_multipart_zip(self):
"""one multipart deposit (zip+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/zip',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEquals(len(deposit_requests), 2)
+ self.assertEqual(len(deposit_requests), 2)
for deposit_request in deposit_requests:
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
self.assertIsNone(deposit_request.metadata)
self.assertIsNone(deposit_request.raw_metadata)
else:
- self.assertEquals(
+ self.assertEqual(
deposit_request.metadata['id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEquals(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
+ self.assertEqual(deposit_request.raw_metadata,
+ data_atom_entry.decode('utf-8'))
def test_post_deposit_multipart_tar(self):
"""one multipart deposit (tar+xml) should be accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
# from django.core.files import uploadedfile
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/x-tar',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEquals(len(deposit_requests), 2)
+ self.assertEqual(len(deposit_requests), 2)
for deposit_request in deposit_requests:
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
self.assertIsNone(deposit_request.metadata)
self.assertIsNone(deposit_request.raw_metadata)
else:
- self.assertEquals(
+ self.assertEqual(
deposit_request.metadata['id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEquals(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
+ self.assertEqual(deposit_request.raw_metadata,
+ data_atom_entry.decode('utf-8'))
def test_post_deposit_multipart_put_to_replace_metadata(self):
"""One multipart deposit followed by a metadata update should be
accepted
"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
data_atom_entry = self.data_atom_entry_ok
archive = InMemoryUploadedFile(
BytesIO(self.archive['data']),
field_name=self.archive['name'],
name=self.archive['name'],
content_type='application/zip',
size=self.archive['length'],
charset=None)
atom_entry = InMemoryUploadedFile(
BytesIO(data_atom_entry),
field_name='atom0',
name='atom0',
content_type='application/atom+xml; charset="utf-8"',
size=len(data_atom_entry),
charset='utf-8')
external_id = 'external-id'
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': atom_entry,
},
# + headers
HTTP_IN_PROGRESS='true',
HTTP_SLUG=external_id)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
deposit_id = response_content['deposit_id']
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, 'partial')
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEquals(len(deposit_requests), 2)
+ self.assertEqual(len(deposit_requests), 2)
for deposit_request in deposit_requests:
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
- self.assertEquals(
+ self.assertEqual(
deposit_request.metadata['id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a')
- self.assertEquals(deposit_request.raw_metadata,
- data_atom_entry.decode('utf-8'))
+ self.assertEqual(deposit_request.raw_metadata,
+ data_atom_entry.decode('utf-8'))
replace_metadata_uri = response._headers['location'][1]
response = self.client.put(
replace_metadata_uri,
content_type='application/atom+xml;type=entry',
data=self.data_atom_entry_update_in_place,
HTTP_IN_PROGRESS='false')
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
# deposit_id did not change
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEqual(deposit.status, DEPOSIT_STATUS_DEPOSITED)
self.assertEqual(deposit.external_id, external_id)
self.assertEqual(deposit.collection, self.collection)
self.assertEqual(deposit.client, self.user)
self.assertIsNone(deposit.swh_id)
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
- self.assertEquals(len(deposit_requests), 2)
+ self.assertEqual(len(deposit_requests), 2)
for deposit_request in deposit_requests:
- self.assertEquals(deposit_request.deposit, deposit)
+ self.assertEqual(deposit_request.deposit, deposit)
if deposit_request.type.name == 'archive':
self.assertRegex(deposit_request.archive.name,
self.archive['name'])
else:
- self.assertEquals(
+ self.assertEqual(
deposit_request.metadata['id'],
'urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa7b')
- self.assertEquals(
+ self.assertEqual(
deposit_request.raw_metadata,
self.data_atom_entry_update_in_place)
# FAILURE scenarios
def test_post_deposit_multipart_only_archive_and_atom_entry(self):
"""Multipart deposit only accepts one archive and one atom+xml"""
# given
url = reverse(COL_IRI, args=[self.collection.name])
archive_content = b'some content representing archive'
archive = InMemoryUploadedFile(BytesIO(archive_content),
field_name='archive0',
name='archive0',
content_type='application/x-tar',
size=len(archive_content),
charset=None)
other_archive_content = b"some-other-content"
other_archive = InMemoryUploadedFile(BytesIO(other_archive_content),
field_name='atom0',
name='atom0',
content_type='application/x-tar',
size=len(other_archive_content),
charset='utf-8')
# when
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
'atom_entry': other_archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
self.assertTrue(
'Only 1 application/zip (or application/x-tar) archive' in
response.content.decode('utf-8'))
# when
archive.seek(0)
response = self.client.post(
url,
format='multipart',
data={
'archive': archive,
},
# + headers
HTTP_IN_PROGRESS='false',
HTTP_SLUG='external-id')
# then
self.assertEqual(response.status_code,
status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)
self.assertTrue(
'You must provide both 1 application/zip (or '
'application/x-tar) and 1 atom+xml entry for '
'multipart deposit' in response.content.decode('utf-8')
)
diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py
index 086ec6e9..4c82ad2b 100644
--- a/swh/deposit/tests/api/test_deposit_read_archive.py
+++ b/swh/deposit/tests/api/test_deposit_read_archive.py
@@ -1,125 +1,125 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
import os
from django.core.urlresolvers import reverse
import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.core import tarball
from swh.deposit.config import PRIVATE_GET_RAW_CONTENT
from swh.deposit.tests import TEST_CONFIG
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine, create_arborescence_archive
@pytest.mark.fs
class DepositReadArchivesTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine):
def setUp(self):
super().setUp()
self.archive2 = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some other content in file')
self.workdir = os.path.join(self.root_path, 'workdir')
def test_access_to_existing_deposit_with_one_archive(self):
"""Access to deposit should stream a 200 response with its raw content
"""
deposit_id = self.create_simple_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
- self.assertEquals(r.status_code, status.HTTP_200_OK)
- self.assertEquals(r._headers['content-type'][1],
- 'application/octet-stream')
+ self.assertEqual(r.status_code, status.HTTP_200_OK)
+ self.assertEqual(r._headers['content-type'][1],
+ 'application/octet-stream')
# read the stream
data = b''.join(r.streaming_content)
actual_sha1 = hashlib.sha1(data).hexdigest()
- self.assertEquals(actual_sha1, self.archive['sha1sum'])
+ self.assertEqual(actual_sha1, self.archive['sha1sum'])
# this does not touch the extraction dir so this should stay empty
- self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
+ self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), [])
def _check_tarball_consistency(self, actual_sha1):
tarball.uncompress(self.archive['path'], self.workdir)
- self.assertEquals(os.listdir(self.workdir), ['file1'])
+ self.assertEqual(os.listdir(self.workdir), ['file1'])
tarball.uncompress(self.archive2['path'], self.workdir)
lst = set(os.listdir(self.workdir))
- self.assertEquals(lst, {'file1', 'file2'})
+ self.assertEqual(lst, {'file1', 'file2'})
new_path = self.workdir + '.zip'
tarball.compress(new_path, 'zip', self.workdir)
with open(new_path, 'rb') as f:
h = hashlib.sha1(f.read()).hexdigest()
self.assertEqual(actual_sha1, h)
self.assertNotEqual(actual_sha1, self.archive['sha1sum'])
self.assertNotEqual(actual_sha1, self.archive2['sha1sum'])
def test_access_to_existing_deposit_with_multiple_archives(self):
"""Access to deposit should stream a 200 response with its raw contents
"""
deposit_id = self.create_complex_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
- self.assertEquals(r.status_code, status.HTTP_200_OK)
- self.assertEquals(r._headers['content-type'][1],
- 'application/octet-stream')
+ self.assertEqual(r.status_code, status.HTTP_200_OK)
+ self.assertEqual(r._headers['content-type'][1],
+ 'application/octet-stream')
# read the stream
data = b''.join(r.streaming_content)
actual_sha1 = hashlib.sha1(data).hexdigest()
self._check_tarball_consistency(actual_sha1)
# this touches the extraction directory but should clean up
# after itself
- self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
+ self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), [])
class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine):
def test_access_to_nonexisting_deposit_returns_404_response(self):
"""Read unknown collection should return a 404 response
"""
unknown_id = '999'
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, unknown_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Deposit with id %s does not exist' % unknown_id,
response.content.decode('utf-8'))
def test_access_to_nonexisting_collection_returns_404_response(self):
"""Read unknown deposit should return a 404 response
"""
collection_name = 'non-existing'
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[collection_name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Unknown collection name %s' % collection_name,
response.content.decode('utf-8'))
diff --git a/swh/deposit/tests/api/test_deposit_read_metadata.py b/swh/deposit/tests/api/test_deposit_read_metadata.py
index 08b211f5..2d7ec66b 100644
--- a/swh/deposit/tests/api/test_deposit_read_metadata.py
+++ b/swh/deposit/tests/api/test_deposit_read_metadata.py
@@ -1,205 +1,205 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit
from swh.deposit.config import PRIVATE_GET_DEPOSIT_METADATA
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_PARTIAL
from ...config import SWH_PERSON
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
class DepositReadMetadataTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Deposit access to read metadata information on deposit.
"""
def test_read_metadata(self):
"""Private metadata read api to existing deposit should return metadata
"""
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_200_OK)
- self.assertEquals(response._headers['content-type'][1],
- 'application/json')
+ self.assertEqual(response._headers['content-type'][1],
+ 'application/json')
data = response.json()
expected_meta = {
'origin': {
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id',
'type': 'deposit'
},
'origin_metadata': {
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author', 'another one', 'no one'],
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'provider': {
'provider_name': 'hal',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/',
'metadata': {}
},
'tool': {
'tool_name': 'swh-deposit',
'tool_version': '0.0.1',
'tool_configuration': {
'sword_version': '2'
}
}
},
'revision': {
'synthetic': True,
'committer_date': None,
'message': 'hal: Deposit %s in collection hal' % deposit_id,
'author': SWH_PERSON,
'committer': SWH_PERSON,
'date': None,
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author', 'another one', 'no one'],
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'type': 'tar'
},
'branch_name': 'master',
}
- self.assertEquals(data, expected_meta)
+ self.assertEqual(data, expected_meta)
def test_read_metadata_revision_with_parent(self):
"""Private read metadata to a deposit (with parent) returns metadata
"""
swh_id = 'da78a9d4cf1d5d29873693fd496142e3a18c20fa'
swh_persistent_id = 'swh:1:rev:%s' % swh_id
deposit_id1 = self.create_deposit_with_status(
status=DEPOSIT_STATUS_LOAD_SUCCESS,
external_id='some-external-id',
swh_id=swh_persistent_id)
deposit_parent = Deposit.objects.get(pk=deposit_id1)
- self.assertEquals(deposit_parent.swh_id, swh_persistent_id)
- self.assertEquals(deposit_parent.external_id, 'some-external-id')
- self.assertEquals(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS)
+ self.assertEqual(deposit_parent.swh_id, swh_persistent_id)
+ self.assertEqual(deposit_parent.external_id, 'some-external-id')
+ self.assertEqual(deposit_parent.status, DEPOSIT_STATUS_LOAD_SUCCESS)
deposit_id = self.create_deposit_partial(
external_id='some-external-id')
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.external_id, 'some-external-id')
- self.assertEquals(deposit.swh_id, None)
- self.assertEquals(deposit.parent, deposit_parent)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_PARTIAL)
+ self.assertEqual(deposit.external_id, 'some-external-id')
+ self.assertEqual(deposit.swh_id, None)
+ self.assertEqual(deposit.parent, deposit_parent)
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_PARTIAL)
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_200_OK)
- self.assertEquals(response._headers['content-type'][1],
- 'application/json')
+ self.assertEqual(response._headers['content-type'][1],
+ 'application/json')
data = response.json()
expected_meta = {
'origin': {
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id',
'type': 'deposit'
},
'origin_metadata': {
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author', 'another one', 'no one'],
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'provider': {
'provider_name': 'hal',
'provider_type': 'deposit_client',
'provider_url': 'https://hal-test.archives-ouvertes.fr/',
'metadata': {}
},
'tool': {
'tool_name': 'swh-deposit',
'tool_version': '0.0.1',
'tool_configuration': {
'sword_version': '2'
}
}
},
'revision': {
'synthetic': True,
'date': None,
'committer_date': None,
'author': SWH_PERSON,
'committer': SWH_PERSON,
'type': 'tar',
'message': 'hal: Deposit %s in collection hal' % deposit_id,
'metadata': {
'@xmlns': ['http://www.w3.org/2005/Atom'],
'author': ['some awesome author', 'another one', 'no one'],
'external_identifier': 'some-external-id',
'url': 'https://hal-test.archives-ouvertes.fr/' +
'some-external-id'
},
'parents': [swh_id]
},
'branch_name': 'master',
}
self.assertEqual(data, expected_meta)
def test_access_to_nonexisting_deposit_returns_404_response(self):
"""Read unknown collection should return a 404 response
"""
unknown_id = '999'
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[self.collection.name, unknown_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Deposit with id %s does not exist' % unknown_id,
response.content.decode('utf-8'))
def test_access_to_nonexisting_collection_returns_404_response(self):
"""Read unknown deposit should return a 404 response
"""
collection_name = 'non-existing'
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_DEPOSIT_METADATA,
args=[collection_name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Unknown collection name %s' % collection_name,
response.content.decode('utf-8'),)
diff --git a/swh/deposit/tests/api/test_deposit_update.py b/swh/deposit/tests/api/test_deposit_update.py
index 286d2f09..45935564 100644
--- a/swh/deposit/tests/api/test_deposit_update.py
+++ b/swh/deposit/tests/api/test_deposit_update.py
@@ -1,333 +1,333 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit, DepositRequest
from swh.deposit.config import EDIT_SE_IRI, EM_IRI
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine, create_arborescence_archive
class DepositUpdateOrReplaceExistingDataTest(
APITestCase, WithAuthTestCase, BasicTestCase,
FileSystemCreationRoutine, CommonCreationRoutine):
"""Try put/post (update/replace) query on EM_IRI
"""
def setUp(self):
super().setUp()
self.atom_entry_data1 = b"""
bar
"""
self.atom_entry_data1 = b"""
bar
"""
self.archive2 = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some other content in file')
def test_replace_archive_to_deposit_is_possible(self):
"""Replace all archive with another one should return a 204 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
assert self.archive['name'] in requests[0].archive.name
# we have no metadata for that deposit
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 0
deposit_id = self._update_deposit_with_status(deposit_id,
status_partial=True)
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 1
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
external_id = 'some-external-id-1'
response = self.client.put(
update_uri,
content_type='application/zip', # as zip
data=self.archive2['data'],
# + headers
CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive2['name'], ))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
- self.assertEquals(len(list(requests)), 1)
+ self.assertEqual(len(list(requests)), 1)
self.assertRegex(requests[0].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
- self.assertEquals(len(requests), 1)
+ self.assertEqual(len(requests), 1)
def test_replace_metadata_to_deposit_is_possible(self):
"""Replace all metadata with another one should return a 204 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
assert len(list(requests)) == 0
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
assert len(requests) == 1
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
- self.assertEquals(len(list(requests)), 1)
+ self.assertEqual(len(list(requests)), 1)
metadata = requests[0].metadata
- self.assertEquals(metadata['foobar'], 'bar')
+ self.assertEqual(metadata['foobar'], 'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
- self.assertEquals(len(requests), 1)
+ self.assertEqual(len(requests), 1)
def test_add_archive_to_deposit_is_possible(self):
"""Add another archive to a deposit return a 201 response
"""
# given
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive'])
assert len(list(requests)) == 1
assert self.archive['name'] in requests[0].archive.name
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
assert len(requests) == 0
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
external_id = 'some-external-id-1'
response = self.client.post(
update_uri,
content_type='application/zip', # as zip
data=self.archive2['data'],
# + headers
CONTENT_LENGTH=self.archive2['length'],
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=self.archive2['md5sum'],
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_IN_PROGRESS='false',
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive2['name'],))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = list(DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['archive']).order_by('id'))
- self.assertEquals(len(requests), 2)
+ self.assertEqual(len(requests), 2)
# first archive still exists
self.assertRegex(requests[0].archive.name, self.archive['name'])
# a new one was added
self.assertRegex(requests[1].archive.name, self.archive2['name'])
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['metadata']))
- self.assertEquals(len(requests), 0)
+ self.assertEqual(len(requests), 0)
def test_add_metadata_to_deposit_is_possible(self):
"""Add metadata with another one should return a 204 response
"""
# given
deposit_id = self.create_deposit_partial()
deposit = Deposit.objects.get(pk=deposit_id)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata'])
assert len(list(requests)) == 2
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
assert len(requests) == 0
update_uri = reverse(EDIT_SE_IRI, args=[self.collection.name,
deposit_id])
response = self.client.post(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
requests = DepositRequest.objects.filter(
deposit=deposit,
type=self.deposit_request_types['metadata']).order_by('id')
- self.assertEquals(len(list(requests)), 3)
+ self.assertEqual(len(list(requests)), 3)
# a new one was added
- self.assertEquals(requests[1].metadata['foobar'], 'bar')
+ self.assertEqual(requests[1].metadata['foobar'], 'bar')
# check we did not touch the other parts
requests = list(DepositRequest.objects.filter(
deposit=deposit, type=self.deposit_request_types['archive']))
- self.assertEquals(len(requests), 0)
+ self.assertEqual(len(requests), 0)
class DepositUpdateFailuresTest(APITestCase, WithAuthTestCase, BasicTestCase,
CommonCreationRoutine):
"""Failure scenario about add/replace (post/put) query on deposit.
"""
def test_add_metadata_to_unknown_collection(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI, args=['test', 1000])
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertRegex(response.content.decode('utf-8'),
'Unknown collection name test')
def test_add_metadata_to_unknown_deposit(self):
"""Replacing metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI, args=[self.collection.name, 999])
response = self.client.post(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertRegex(response.content.decode('utf-8'),
'Deposit with id 999 does not exist')
def test_replace_metadata_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
url = reverse(EDIT_SE_IRI, args=[self.collection.name, 998])
response = self.client.put(
url,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertRegex(response.content.decode('utf-8'),
'Deposit with id 998 does not exist')
def test_add_archive_to_unknown_deposit(self):
"""Adding metadata to unknown deposit should return a 404 response
"""
url = reverse(EM_IRI, args=[self.collection.name, 997])
response = self.client.post(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertRegex(response.content.decode('utf-8'),
'Deposit with id 997 does not exist')
def test_replace_archive_to_unknown_deposit(self):
"""Replacing archive to unknown deposit should return a 404 response
"""
url = reverse(EM_IRI, args=[self.collection.name, 996])
response = self.client.put(
url,
content_type='application/zip',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertRegex(response.content.decode('utf-8'),
'Deposit with id 996 does not exist')
def test_post_metadata_to_em_iri_failure(self):
"""Update (POST) archive with wrong content type should return 400
"""
deposit_id = self.create_deposit_partial() # only update on partial
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.post(
update_uri,
content_type='application/x-gtar-compressed',
data=self.atom_entry_data0)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(response.content.decode('utf-8'),
'Packaging format supported is restricted to '
'application/zip, application/x-tar')
def test_put_metadata_to_em_iri_failure(self):
"""Update (PUT) archive with wrong content type should return 400
"""
# given
deposit_id = self.create_deposit_partial() # only update on partial
# when
update_uri = reverse(EM_IRI, args=[self.collection.name, deposit_id])
response = self.client.put(
update_uri,
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0)
# then
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertRegex(response.content.decode('utf-8'),
'Packaging format supported is restricted to '
'application/zip, application/x-tar')
diff --git a/swh/deposit/tests/api/test_deposit_update_status.py b/swh/deposit/tests/api/test_deposit_update_status.py
index 0b5f21ec..1ec4dbdb 100644
--- a/swh/deposit/tests/api/test_deposit_update_status.py
+++ b/swh/deposit/tests/api/test_deposit_update_status.py
@@ -1,130 +1,130 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit, DEPOSIT_STATUS_DETAIL
from swh.deposit.config import PRIVATE_PUT_DEPOSIT, DEPOSIT_STATUS_VERIFIED
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from ..common import BasicTestCase
class UpdateDepositStatusTest(APITestCase, BasicTestCase):
"""Update the deposit's status scenario
"""
def setUp(self):
super().setUp()
deposit = Deposit(status=DEPOSIT_STATUS_VERIFIED,
collection=self.collection,
client=self.user)
deposit.save()
self.deposit = Deposit.objects.get(pk=deposit.id)
assert self.deposit.status == DEPOSIT_STATUS_VERIFIED
def test_update_deposit_status(self):
"""Existing status for update should return a 204 response
"""
url = reverse(PRIVATE_PUT_DEPOSIT,
args=[self.collection.name, self.deposit.id])
possible_status = set(DEPOSIT_STATUS_DETAIL.keys()) - set(
[DEPOSIT_STATUS_LOAD_SUCCESS])
for _status in possible_status:
response = self.client.put(
url,
content_type='application/json',
data=json.dumps({'status': _status}))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
deposit = Deposit.objects.get(pk=self.deposit.id)
- self.assertEquals(deposit.status, _status)
+ self.assertEqual(deposit.status, _status)
def test_update_deposit_status_with_info(self):
"""Existing status for update with info should return a 204 response
"""
url = reverse(PRIVATE_PUT_DEPOSIT,
args=[self.collection.name, self.deposit.id])
expected_status = DEPOSIT_STATUS_LOAD_SUCCESS
origin_url = 'something'
directory_id = '42a13fc721c8716ff695d0d62fc851d641f3a12b'
revision_id = '47dc6b4636c7f6cba0df83e3d5490bf4334d987e'
expected_swh_id = 'swh:1:dir:%s' % directory_id
expected_swh_id_context = 'swh:1:dir:%s;origin=%s' % (
directory_id, origin_url)
expected_swh_anchor_id = 'swh:1:rev:%s' % revision_id
expected_swh_anchor_id_context = 'swh:1:rev:%s;origin=%s' % (
revision_id, origin_url)
response = self.client.put(
url,
content_type='application/json',
data=json.dumps({
'status': expected_status,
'revision_id': revision_id,
'directory_id': directory_id,
'origin_url': origin_url,
}))
self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)
deposit = Deposit.objects.get(pk=self.deposit.id)
- self.assertEquals(deposit.status, expected_status)
- self.assertEquals(deposit.swh_id, expected_swh_id)
- self.assertEquals(deposit.swh_id_context, expected_swh_id_context)
- self.assertEquals(deposit.swh_anchor_id, expected_swh_anchor_id)
- self.assertEquals(deposit.swh_anchor_id_context,
- expected_swh_anchor_id_context)
+ self.assertEqual(deposit.status, expected_status)
+ self.assertEqual(deposit.swh_id, expected_swh_id)
+ self.assertEqual(deposit.swh_id_context, expected_swh_id_context)
+ self.assertEqual(deposit.swh_anchor_id, expected_swh_anchor_id)
+ self.assertEqual(deposit.swh_anchor_id_context,
+ expected_swh_anchor_id_context)
def test_update_deposit_status_will_fail_with_unknown_status(self):
"""Unknown status for update should return a 400 response
"""
url = reverse(PRIVATE_PUT_DEPOSIT,
args=[self.collection.name, self.deposit.id])
response = self.client.put(
url,
content_type='application/json',
data=json.dumps({'status': 'unknown'}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_update_deposit_status_will_fail_with_no_status_key(self):
"""No status provided for update should return a 400 response
"""
url = reverse(PRIVATE_PUT_DEPOSIT,
args=[self.collection.name, self.deposit.id])
response = self.client.put(
url,
content_type='application/json',
data=json.dumps({'something': 'something'}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
def test_update_deposit_status_success_without_swh_id_fail(self):
"""Providing successful status without swh_id should return a 400
"""
url = reverse(PRIVATE_PUT_DEPOSIT,
args=[self.collection.name, self.deposit.id])
response = self.client.put(
url,
content_type='application/json',
data=json.dumps({'status': DEPOSIT_STATUS_LOAD_SUCCESS}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
diff --git a/swh/deposit/tests/api/test_service_document.py b/swh/deposit/tests/api/test_service_document.py
index 402e7557..3afa1ca5 100644
--- a/swh/deposit/tests/api/test_service_document.py
+++ b/swh/deposit/tests/api/test_service_document.py
@@ -1,102 +1,102 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.tests import TEST_CONFIG
from swh.deposit.config import SD_IRI
from ..common import BasicTestCase, WithAuthTestCase
class ServiceDocumentNoAuthCase(APITestCase, BasicTestCase):
"""Service document endpoints are protected with basic authentication.
"""
def test_service_document_no_authentication_fails(self):
"""Without authentication, service document endpoint should return 401
"""
url = reverse(SD_IRI)
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
def test_service_document_with_http_accept_should_not_break(self):
"""Without auth, sd endpoint through browser should return 401
"""
url = reverse(SD_IRI)
# when
response = self.client.get(
url,
HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8')
self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)
class ServiceDocumentCase(APITestCase, WithAuthTestCase, BasicTestCase):
def assertResponseOk(self, response): # noqa: N802
self.assertEqual(response.status_code, status.HTTP_200_OK)
- self.assertEquals(response.content.decode('utf-8'),
+ self.assertEqual(response.content.decode('utf-8'),
'''
2.0
%s
The Software Heritage (SWH) Archive
%s Software Collection
application/zip
application/x-tar
Collection Policy
Software Heritage Archive
Collect, Preserve, Share
false
false
http://purl.org/net/sword/package/SimpleZip
http://testserver/1/%s/
%s
''' % (TEST_CONFIG['max_upload_size'],
self.username,
self.username,
self.username,
self.username)) # noqa
def test_service_document(self):
"""With authentication, service document list user's collection
"""
url = reverse(SD_IRI)
# when
response = self.client.get(url)
# then
self.assertResponseOk(response)
def test_service_document_with_http_accept_header(self):
"""With authentication, with browser, sd list user's collection
"""
url = reverse(SD_IRI)
# when
response = self.client.get(
url,
HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8')
self.assertResponseOk(response)
diff --git a/swh/deposit/tests/loader/test_checker.py b/swh/deposit/tests/loader/test_checker.py
index c0d6ddad..d1721d64 100644
--- a/swh/deposit/tests/loader/test_checker.py
+++ b/swh/deposit/tests/loader/test_checker.py
@@ -1,68 +1,68 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from rest_framework.test import APITestCase
from swh.deposit.models import Deposit
from swh.deposit.config import PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_VERIFIED
from swh.deposit.config import DEPOSIT_STATUS_REJECTED
from swh.deposit.loader.checker import DepositChecker
from django.core.urlresolvers import reverse
from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine
class DepositCheckerScenarioTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine):
def setUp(self):
super().setUp()
# 2. Sets a basic client which accesses the test data
checker_client = SWHDepositTestClient(client=self.client,
config=CLIENT_TEST_CONFIG)
# 3. setup loader with no persistence and that client
self.checker = DepositChecker(client=checker_client)
def test_check_deposit_ready(self):
"""Check on a valid 'deposited' deposit should result in 'verified'
"""
# 1. create a deposit with archive and metadata
deposit_id = self.create_simple_binary_deposit()
deposit_id = self.update_binary_deposit(deposit_id,
status_partial=False)
args = [self.collection.name, deposit_id]
deposit_check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args)
# when
actual_result = self.checker.check(deposit_check_url=deposit_check_url)
# then
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED)
- self.assertEquals(actual_result, {'status': 'eventful'})
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_VERIFIED)
+ self.assertEqual(actual_result, {'status': 'eventful'})
def test_check_deposit_rejected(self):
"""Check on invalid 'deposited' deposit should result in 'rejected'
"""
# 1. create a deposit with archive and metadata
deposit_id = self.create_deposit_with_invalid_archive()
args = [self.collection.name, deposit_id]
deposit_check_url = reverse(PRIVATE_CHECK_DEPOSIT, args=args)
# when
actual_result = self.checker.check(deposit_check_url=deposit_check_url)
# then
deposit = Deposit.objects.get(pk=deposit_id)
- self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
- self.assertEquals(actual_result, {'status': 'eventful'})
+ self.assertEqual(deposit.status, DEPOSIT_STATUS_REJECTED)
+ self.assertEqual(actual_result, {'status': 'eventful'})
diff --git a/swh/deposit/tests/loader/test_client.py b/swh/deposit/tests/loader/test_client.py
index 8241967a..8d19497e 100644
--- a/swh/deposit/tests/loader/test_client.py
+++ b/swh/deposit/tests/loader/test_client.py
@@ -1,258 +1,258 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
import pytest
from swh.deposit.client import PrivateApiDepositClient
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE
from .common import CLIENT_TEST_CONFIG
class StreamedResponse:
"""Streamed response facsimile
"""
def __init__(self, ok, stream):
self.ok = ok
self.stream = stream
def iter_content(self):
yield from self.stream
class FakeRequestClientGet:
"""Fake request client dedicated to get method calls.
"""
def __init__(self, response):
self.response = response
def get(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
return self.response
@pytest.mark.fs
class PrivateApiDepositClientReadArchiveTest(unittest.TestCase):
def setUp(self):
super().setUp()
self.temporary_directory = tempfile.mkdtemp(dir='/tmp')
def tearDown(self):
super().setUp()
shutil.rmtree(self.temporary_directory)
def test_archive_get(self):
"""Reading archive should write data in temporary directory
"""
stream_content = [b"some", b"streamed", b"response"]
response = StreamedResponse(
ok=True,
stream=(s for s in stream_content))
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
archive_path = os.path.join(self.temporary_directory, 'test.archive')
archive_path = deposit_client.archive_get('/some/url', archive_path)
self.assertTrue(os.path.exists(archive_path))
with open(archive_path, 'rb') as f:
actual_content = f.read()
- self.assertEquals(actual_content, b''.join(stream_content))
- self.assertEquals(_client.args, ('http://nowhere:9000/some/url', ))
- self.assertEquals(_client.kwargs, {
+ self.assertEqual(actual_content, b''.join(stream_content))
+ self.assertEqual(_client.args, ('http://nowhere:9000/some/url', ))
+ self.assertEqual(_client.kwargs, {
'stream': True
})
def test_archive_get_with_authentication(self):
"""Reading archive should write data in temporary directory
"""
stream_content = [b"some", b"streamed", b"response", b"for", b"auth"]
response = StreamedResponse(
ok=True,
stream=(s for s in stream_content))
_client = FakeRequestClientGet(response)
_config = CLIENT_TEST_CONFIG.copy()
_config['auth'] = { # add authentication setup
'username': 'user',
'password': 'pass'
}
deposit_client = PrivateApiDepositClient(_config, _client=_client)
archive_path = os.path.join(self.temporary_directory, 'test.archive')
archive_path = deposit_client.archive_get('/some/url', archive_path)
self.assertTrue(os.path.exists(archive_path))
with open(archive_path, 'rb') as f:
actual_content = f.read()
- self.assertEquals(actual_content, b''.join(stream_content))
- self.assertEquals(_client.args, ('http://nowhere:9000/some/url', ))
- self.assertEquals(_client.kwargs, {
+ self.assertEqual(actual_content, b''.join(stream_content))
+ self.assertEqual(_client.args, ('http://nowhere:9000/some/url', ))
+ self.assertEqual(_client.kwargs, {
'stream': True,
'auth': ('user', 'pass')
})
def test_archive_get_can_fail(self):
"""Reading archive can fail for some reasons
"""
response = StreamedResponse(ok=False, stream=None)
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when retrieving deposit archive'):
deposit_client.archive_get('/some/url', 'some/path')
class JsonResponse:
"""Json response facsimile
"""
def __init__(self, ok, response):
self.ok = ok
self.response = response
def json(self):
return self.response
class PrivateApiDepositClientReadMetadataTest(unittest.TestCase):
def test_metadata_get(self):
"""Reading archive should write data in temporary directory
"""
expected_response = {"some": "dict"}
response = JsonResponse(
ok=True,
response=expected_response)
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
actual_metadata = deposit_client.metadata_get('/metadata')
- self.assertEquals(actual_metadata, expected_response)
+ self.assertEqual(actual_metadata, expected_response)
def test_metadata_get_can_fail(self):
"""Reading metadata can fail for some reasons
"""
_client = FakeRequestClientGet(JsonResponse(ok=False, response=None))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when retrieving metadata at'):
deposit_client.metadata_get('/some/metadata/url')
class FakeRequestClientPut:
"""Fake Request client dedicated to put request method calls.
"""
args = None
kwargs = None
def put(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class PrivateApiDepositClientStatusUpdateTest(unittest.TestCase):
def test_status_update(self):
"""Update status
"""
_client = FakeRequestClientPut()
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
deposit_client.status_update('/update/status',
DEPOSIT_STATUS_LOAD_SUCCESS,
revision_id='some-revision-id')
- self.assertEquals(_client.args,
- ('http://nowhere:9000/update/status', ))
- self.assertEquals(_client.kwargs, {
+ self.assertEqual(_client.args,
+ ('http://nowhere:9000/update/status', ))
+ self.assertEqual(_client.kwargs, {
'json': {
'status': DEPOSIT_STATUS_LOAD_SUCCESS,
'revision_id': 'some-revision-id',
}
})
def test_status_update_with_no_revision_id(self):
"""Reading metadata can fail for some reasons
"""
_client = FakeRequestClientPut()
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
deposit_client.status_update('/update/status/fail',
DEPOSIT_STATUS_LOAD_FAILURE)
- self.assertEquals(_client.args,
- ('http://nowhere:9000/update/status/fail', ))
- self.assertEquals(_client.kwargs, {
+ self.assertEqual(_client.args,
+ ('http://nowhere:9000/update/status/fail', ))
+ self.assertEqual(_client.kwargs, {
'json': {
'status': DEPOSIT_STATUS_LOAD_FAILURE,
}
})
class PrivateApiDepositClientCheckTest(unittest.TestCase):
def test_check(self):
"""When check ok, this should return the deposit's status
"""
_client = FakeRequestClientGet(
JsonResponse(ok=True, response={'status': 'something'}))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
r = deposit_client.check('/check')
- self.assertEquals(_client.args,
- ('http://nowhere:9000/check', ))
- self.assertEquals(_client.kwargs, {})
- self.assertEquals(r, 'something')
+ self.assertEqual(_client.args,
+ ('http://nowhere:9000/check', ))
+ self.assertEqual(_client.kwargs, {})
+ self.assertEqual(r, 'something')
def test_check_fails(self):
"""Checking deposit can fail for some reason
"""
_client = FakeRequestClientGet(
JsonResponse(ok=False, response=None))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when checking deposit'):
deposit_client.check('/check/fails')
- self.assertEquals(_client.args,
- ('http://nowhere:9000/check/fails', ))
- self.assertEquals(_client.kwargs, {})
+ self.assertEqual(_client.args,
+ ('http://nowhere:9000/check/fails', ))
+ self.assertEqual(_client.kwargs, {})
diff --git a/swh/deposit/tests/loader/test_loader.py b/swh/deposit/tests/loader/test_loader.py
index 2103d4ed..cb6e81d8 100644
--- a/swh/deposit/tests/loader/test_loader.py
+++ b/swh/deposit/tests/loader/test_loader.py
@@ -1,305 +1,305 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
import shutil
import pytest
from rest_framework.test import APITestCase
from swh.model import hashutil
from swh.deposit.models import Deposit
from swh.deposit.loader import loader
from swh.deposit.config import (
PRIVATE_GET_RAW_CONTENT, PRIVATE_GET_DEPOSIT_METADATA, PRIVATE_PUT_DEPOSIT
)
from django.core.urlresolvers import reverse
from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG
from .. import TEST_LOADER_CONFIG
from ..common import (BasicTestCase, WithAuthTestCase,
CommonCreationRoutine,
FileSystemCreationRoutine)
TOOL_ID = 99
PROVIDER_ID = 12
class DepositLoaderInhibitsStorage:
"""Mixin class to inhibit the persistence and keep in memory the data
sent for storage.
cf. SWHDepositLoaderNoStorage
"""
def __init__(self, client=None):
# client is not used here, transit it nonetheless to other mixins
super().__init__(client=client)
# typed data
self.state = {
'origin': [],
'origin_visit': [],
'origin_metadata': [],
'content': [],
'directory': [],
'revision': [],
'release': [],
'snapshot': [],
'tool': [],
'provider': []
}
def _add(self, type, l):
"""Add without duplicates and keeping the insertion order.
Args:
type (str): Type of objects concerned by the action
l ([object]): List of 'type' object
"""
col = self.state[type]
for o in l:
if o in col:
continue
col.extend([o])
def send_origin(self, origin):
origin.update({'id': 1})
self._add('origin', [origin])
return origin['id']
def send_origin_visit(self, origin_id, visit_date):
origin_visit = {
'origin': origin_id,
'visit_date': visit_date,
'visit': 1,
}
self._add('origin_visit', [origin_visit])
return origin_visit
def send_origin_metadata(self, origin_id, visit_date, provider_id, tool_id,
metadata):
origin_metadata = {
'origin_id': origin_id,
'visit_date': visit_date,
'provider_id': provider_id,
'tool_id': tool_id,
'metadata': metadata
}
self._add('origin_metadata', [origin_metadata])
return origin_metadata
def send_tool(self, tool):
tool = {
'tool_name': tool['tool_name'],
'tool_version': tool['tool_version'],
'tool_configuration': tool['tool_configuration']
}
self._add('tool', [tool])
tool_id = TOOL_ID
return tool_id
def send_provider(self, provider):
provider = {
'provider_name': provider['provider_name'],
'provider_type': provider['provider_type'],
'provider_url': provider['provider_url'],
'metadata': provider['metadata']
}
self._add('provider', [provider])
provider_id = PROVIDER_ID
return provider_id
def maybe_load_contents(self, contents):
self._add('content', contents)
def maybe_load_directories(self, directories):
self._add('directory', directories)
def maybe_load_revisions(self, revisions):
self._add('revision', revisions)
def maybe_load_releases(self, releases):
self._add('release', releases)
def maybe_load_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def open_fetch_history(self):
pass
def close_fetch_history_failure(self, fetch_history_id):
pass
def close_fetch_history_success(self, fetch_history_id):
pass
def update_origin_visit(self, origin_id, visit, status):
self.status = status
# Override to do nothing at the end
def close_failure(self):
pass
def close_success(self):
pass
class TestLoaderUtils(unittest.TestCase):
def assertRevisionsOk(self, expected_revisions): # noqa: N802
"""Check the loader's revisions match the expected revisions.
Expects self.loader to be instantiated and ready to be
inspected (meaning the loading took place).
Args:
expected_revisions (dict): Dict with key revision id,
value the targeted directory id.
"""
# The last revision being the one used later to start back from
for rev in self.loader.state['revision']:
rev_id = hashutil.hash_to_hex(rev['id'])
directory_id = hashutil.hash_to_hex(rev['directory'])
- self.assertEquals(expected_revisions[rev_id], directory_id)
+ self.assertEqual(expected_revisions[rev_id], directory_id)
class SWHDepositLoaderNoStorage(DepositLoaderInhibitsStorage,
loader.DepositLoader):
"""Loader to test.
It inherits from the actual deposit loader to actually test its
correct behavior. It also inherits from
DepositLoaderInhibitsStorage so that no persistence takes place.
"""
pass
@pytest.mark.fs
class DepositLoaderScenarioTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine, TestLoaderUtils):
def setUp(self):
super().setUp()
# create the extraction dir used by the loader
os.makedirs(TEST_LOADER_CONFIG['extraction_dir'], exist_ok=True)
# 1. create a deposit with archive and metadata
self.deposit_id = self.create_simple_binary_deposit()
# 2. Sets a basic client which accesses the test data
loader_client = SWHDepositTestClient(self.client,
config=CLIENT_TEST_CONFIG)
# 3. setup loader with no persistence and that client
self.loader = SWHDepositLoaderNoStorage(client=loader_client)
def tearDown(self):
super().tearDown()
shutil.rmtree(TEST_LOADER_CONFIG['extraction_dir'])
def test_inject_deposit_ready(self):
"""Load a deposit which is ready
"""
args = [self.collection.name, self.deposit_id]
archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args)
deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args)
deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args)
# when
self.loader.load(archive_url=archive_url,
deposit_meta_url=deposit_meta_url,
deposit_update_url=deposit_update_url)
# then
- self.assertEquals(len(self.loader.state['content']), 1)
- self.assertEquals(len(self.loader.state['directory']), 1)
- self.assertEquals(len(self.loader.state['revision']), 1)
- self.assertEquals(len(self.loader.state['release']), 0)
- self.assertEquals(len(self.loader.state['snapshot']), 1)
+ self.assertEqual(len(self.loader.state['content']), 1)
+ self.assertEqual(len(self.loader.state['directory']), 1)
+ self.assertEqual(len(self.loader.state['revision']), 1)
+ self.assertEqual(len(self.loader.state['release']), 0)
+ self.assertEqual(len(self.loader.state['snapshot']), 1)
def test_inject_deposit_verify_metadata(self):
"""Load a deposit with metadata, test metadata integrity
"""
self.deposit_metadata_id = self.add_metadata_to_deposit(
self.deposit_id)
args = [self.collection.name, self.deposit_metadata_id]
archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args)
deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args)
deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args)
# when
self.loader.load(archive_url=archive_url,
deposit_meta_url=deposit_meta_url,
deposit_update_url=deposit_update_url)
# then
- self.assertEquals(len(self.loader.state['content']), 1)
- self.assertEquals(len(self.loader.state['directory']), 1)
- self.assertEquals(len(self.loader.state['revision']), 1)
- self.assertEquals(len(self.loader.state['release']), 0)
- self.assertEquals(len(self.loader.state['snapshot']), 1)
- self.assertEquals(len(self.loader.state['origin_metadata']), 1)
- self.assertEquals(len(self.loader.state['tool']), 1)
- self.assertEquals(len(self.loader.state['provider']), 1)
+ self.assertEqual(len(self.loader.state['content']), 1)
+ self.assertEqual(len(self.loader.state['directory']), 1)
+ self.assertEqual(len(self.loader.state['revision']), 1)
+ self.assertEqual(len(self.loader.state['release']), 0)
+ self.assertEqual(len(self.loader.state['snapshot']), 1)
+ self.assertEqual(len(self.loader.state['origin_metadata']), 1)
+ self.assertEqual(len(self.loader.state['tool']), 1)
+ self.assertEqual(len(self.loader.state['provider']), 1)
codemeta = 'codemeta:'
origin_url = 'https://hal-test.archives-ouvertes.fr/hal-01243065'
expected_origin_metadata = {
'@xmlns': 'http://www.w3.org/2005/Atom',
'@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0',
'author': {
'email': 'hal@ccsd.cnrs.fr',
'name': 'HAL'
},
codemeta + 'url': origin_url,
codemeta + 'runtimePlatform': 'phpstorm',
codemeta + 'license': [
{
codemeta + 'name': 'GNU General Public License v3.0 only'
},
{
codemeta + 'name': 'CeCILL Free Software License Agreement v1.1' # noqa
}
],
codemeta + 'author': {
codemeta + 'name': 'Morane Gruenpeter'
},
codemeta + 'programmingLanguage': ['php', 'python', 'C'],
codemeta + 'applicationCategory': 'test',
codemeta + 'dateCreated': '2017-05-03T16:08:47+02:00',
codemeta + 'version': '1',
'external_identifier': 'hal-01243065',
'title': 'Composing a Web of Audio Applications',
codemeta + 'description': 'this is the description',
'id': 'hal-01243065',
'client': 'hal',
codemeta + 'keywords': 'DSP programming,Web',
codemeta + 'developmentStatus': 'stable'
}
result = self.loader.state['origin_metadata'][0]
- self.assertEquals(result['metadata'], expected_origin_metadata)
- self.assertEquals(result['tool_id'], TOOL_ID)
- self.assertEquals(result['provider_id'], PROVIDER_ID)
+ self.assertEqual(result['metadata'], expected_origin_metadata)
+ self.assertEqual(result['tool_id'], TOOL_ID)
+ self.assertEqual(result['provider_id'], PROVIDER_ID)
deposit = Deposit.objects.get(pk=self.deposit_id)
self.assertRegex(deposit.swh_id, r'^swh:1:dir:.*')
- self.assertEquals(deposit.swh_id_context, '%s;origin=%s' % (
+ self.assertEqual(deposit.swh_id_context, '%s;origin=%s' % (
deposit.swh_id, origin_url
))
self.assertRegex(deposit.swh_anchor_id, r'^swh:1:rev:.*')
- self.assertEquals(deposit.swh_anchor_id_context, '%s;origin=%s' % (
+ self.assertEqual(deposit.swh_anchor_id_context, '%s;origin=%s' % (
deposit.swh_anchor_id, origin_url
))
diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py
index d6e03835..aa3ecfa7 100644
--- a/swh/deposit/tests/test_utils.py
+++ b/swh/deposit/tests/test_utils.py
@@ -1,132 +1,132 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from swh.deposit import utils
class UtilsTestCase(unittest.TestCase):
"""Utils library
"""
def test_merge(self):
"""Calling utils.merge on dicts should merge without losing information
"""
d0 = {
'author': 'someone',
'license': [['gpl2']],
'a': 1
}
d1 = {
'author': ['author0', {'name': 'author1'}],
'license': [['gpl3']],
'b': {
'1': '2'
}
}
d2 = {
'author': map(lambda x: x, ['else']),
'license': 'mit',
'b': {
'2': '3',
}
}
d3 = {
'author': (v for v in ['no one']),
}
actual_merge = utils.merge(d0, d1, d2, d3)
expected_merge = {
'a': 1,
'license': [['gpl2'], ['gpl3'], 'mit'],
'author': [
'someone', 'author0', {'name': 'author1'}, 'else', 'no one'],
'b': {
'1': '2',
'2': '3',
}
}
- self.assertEquals(actual_merge, expected_merge)
+ self.assertEqual(actual_merge, expected_merge)
def test_merge_2(self):
d0 = {
'license': 'gpl2',
'runtime': {
'os': 'unix derivative'
}
}
d1 = {
'license': 'gpl3',
'runtime': 'GNU/Linux'
}
expected = {
'license': ['gpl2', 'gpl3'],
'runtime': [
{
'os': 'unix derivative'
},
'GNU/Linux'
],
}
actual = utils.merge(d0, d1)
self.assertEqual(actual, expected)
def test_merge_edge_cases(self):
input_dict = {
'license': ['gpl2', 'gpl3'],
'runtime': [
{
'os': 'unix derivative'
},
'GNU/Linux'
],
}
# against empty dict
actual = utils.merge(input_dict, {})
self.assertEqual(actual, input_dict)
# against oneself
actual = utils.merge(input_dict, input_dict, input_dict)
self.assertEqual(input_dict, input_dict)
def test_merge_one_dict(self):
"""Merge one dict should result in the same dict value
"""
input_and_expected = {'anything': 'really'}
actual = utils.merge(input_and_expected)
self.assertEqual(actual, input_and_expected)
def test_merge_raise(self):
"""Calling utils.merge with any no dict argument should raise
"""
d0 = {
'author': 'someone',
'a': 1
}
d1 = ['not a dict']
with self.assertRaises(ValueError):
utils.merge(d0, d1)
with self.assertRaises(ValueError):
utils.merge(d1, d0)
with self.assertRaises(ValueError):
utils.merge(d1)
- self.assertEquals(utils.merge(d0), d0)
+ self.assertEqual(utils.merge(d0), d0)