diff --git a/pytest.ini b/pytest.ini
index afa4cf37..dc2e5dc8 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,2 +1,3 @@
[pytest]
norecursedirs = docs
+DJANGO_SETTINGS_MODULE = swh.deposit.settings.testing
diff --git a/requirements-test.txt b/requirements-test.txt
index 2a6f75e5..125b4602 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,2 +1,2 @@
-nose
-django_nose
+pytest
+pytest-django
diff --git a/swh/deposit/settings/testing.py b/swh/deposit/settings/testing.py
index 45ee6873..c35631b3 100644
--- a/swh/deposit/settings/testing.py
+++ b/swh/deposit/settings/testing.py
@@ -1,50 +1,47 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from .common import * # noqa
from .common import ALLOWED_HOSTS
from .development import * # noqa
from .development import INSTALLED_APPS
-# django-nose setup
+# django setup
ALLOWED_HOSTS += ['testserver']
-INSTALLED_APPS += ['django_nose']
-
-TEST_RUNNER = 'django_nose.NoseTestSuiteRunner'
-NOSE_ARGS = ['--verbosity=3', '-s'] # to see test pass
+INSTALLED_APPS += ['pytest_django']
# https://docs.djangoproject.com/en/1.10/ref/settings/#logging
LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': {
'standard': {
'format': "[%(asctime)s] %(levelname)s [%(name)s:%(lineno)s] %(message)s", # noqa
'datefmt': "%d/%b/%Y %H:%M:%S"
},
},
'handlers': {
'console': {
'level': 'ERROR',
'class': 'logging.StreamHandler',
'formatter': 'standard'
},
},
'loggers': {
'swh.deposit': {
'handlers': ['console'],
'level': 'ERROR',
},
}
}
# https://docs.djangoproject.com/en/1.11/ref/settings/#std:setting-MEDIA_ROOT
# SECURITY WARNING: Override this in the production.py module
MEDIA_ROOT = '/tmp/swh-deposit/test/uploads/'
FILE_UPLOAD_HANDLERS = [
"django.core.files.uploadhandler.MemoryFileUploadHandler",
]
diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py
index d3cb3528..019ded0c 100644
--- a/swh/deposit/tests/api/test_deposit_check.py
+++ b/swh/deposit/tests/api/test_deposit_check.py
@@ -1,236 +1,236 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from django.core.urlresolvers import reverse
-from nose.plugins.attrib import attr
+import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.config import (
DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT,
DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED
)
from swh.deposit.api.private.deposit_check import (
SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID,
MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS,
MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING,
MANDATORY_ARCHIVE_MISSING
)
from swh.deposit.models import Deposit
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine
-@attr('fs')
+@pytest.mark.fs
class CheckDepositTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine):
"""Check deposit endpoints.
"""
def setUp(self):
super().setUp()
def test_deposit_ok(self):
"""Proper deposit should succeed the checks (-> status ready)
"""
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit_id = self.update_binary_deposit(deposit_id,
status_partial=False)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED)
deposit = Deposit.objects.get(pk=deposit.id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED)
def test_deposit_invalid_tarball(self):
"""Deposit with tarball (of 1 tarball) should fail the checks: rejected
"""
for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']:
deposit_id = self.create_deposit_archive_with_archive(
archive_extension)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_INVALID)
deposit = Deposit.objects.get(pk=deposit.id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_deposit_ko_missing_tarball(self):
"""Deposit without archive should fail the checks: rejected
"""
deposit_id = self.create_deposit_ready() # no archive, only atom
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_MISSING)
deposit = Deposit.objects.get(pk=deposit.id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_deposit_ko_unsupported_tarball(self):
"""Deposit with an unsupported tarball should fail the checks: rejected
"""
deposit_id = self.create_deposit_with_invalid_archive()
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED)
details = data['details']
# archive checks failure
self.assertEqual(len(details['archive']), 1)
self.assertEqual(details['archive'][0]['summary'],
MANDATORY_ARCHIVE_UNSUPPORTED)
# metadata check failure
self.assertEqual(len(details['metadata']), 2)
mandatory = details['metadata'][0]
self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING)
self.assertEqual(set(mandatory['fields']),
set(['url', 'external_identifier', 'author']))
alternate = details['metadata'][1]
self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING)
self.assertEqual(alternate['fields'], ['name or title'])
# url check failure
self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS)
deposit = Deposit.objects.get(pk=deposit.id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED)
def test_check_deposit_metadata_ok(self):
"""Proper deposit should succeed the checks (-> status ready)
with all **MUST** metadata
using the codemeta metadata test set
"""
deposit_id = self.create_simple_binary_deposit(status_partial=True)
deposit_id_metadata = self.add_metadata_to_deposit(deposit_id)
self.assertEquals(deposit_id, deposit_id_metadata)
deposit = Deposit.objects.get(pk=deposit_id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED)
url = reverse(PRIVATE_CHECK_DEPOSIT,
args=[self.collection.name, deposit.id])
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED)
deposit = Deposit.objects.get(pk=deposit.id)
self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED)
class CheckMetadata(unittest.TestCase, SWHChecksDeposit):
def test_check_metadata_ok(self):
actual_check, detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'name': 'foo',
'author': 'someone',
})
self.assertTrue(actual_check)
self.assertIsNone(detail)
def test_check_metadata_ok2(self):
actual_check, detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'title': 'bar',
'author': 'someone',
})
self.assertTrue(actual_check)
self.assertIsNone(detail)
def test_check_metadata_ko(self):
"""Missing optional field should be caught
"""
actual_check, error_detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'author': 'someone',
})
expected_error = {
'metadata': [{
'summary': 'Mandatory alternate fields are missing',
'fields': ['name or title'],
}]
}
self.assertFalse(actual_check)
self.assertEqual(error_detail, expected_error)
def test_check_metadata_ko2(self):
"""Missing mandatory fields should be caught
"""
actual_check, error_detail = self._check_metadata({
'url': 'something',
'external_identifier': 'something-else',
'title': 'foobar',
})
expected_error = {
'metadata': [{
'summary': 'Mandatory fields are missing',
'fields': ['author'],
}]
}
self.assertFalse(actual_check)
self.assertEqual(error_detail, expected_error)
diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py
index e22cd739..9e7fc587 100644
--- a/swh/deposit/tests/api/test_deposit_list.py
+++ b/swh/deposit/tests/api/test_deposit_list.py
@@ -1,94 +1,94 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from django.core.urlresolvers import reverse
-from nose.plugins.attrib import attr
+import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.deposit.api.converters import convert_status_detail
from ...config import DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ...models import Deposit
-@attr('fs')
+@pytest.mark.fs
class CheckDepositListTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine):
"""Check deposit list endpoints.
"""
def setUp(self):
super().setUp()
def test_deposit_list(self):
"""Deposit list api should return the deposits
"""
deposit_id = self.create_deposit_partial()
# amend the deposit with a status_detail
deposit = Deposit.objects.get(pk=deposit_id)
status_detail = {
'url': {
'summary': 'At least one compatible url field. Failed',
'fields': ['testurl'],
},
'metadata': [
{
'summary': 'Mandatory fields missing',
'fields': ['9', 10, 1.212],
},
],
'archive': [
{
'summary': 'Invalid archive',
'fields': ['3'],
},
{
'summary': 'Unsupported archive',
'fields': [2],
}
],
}
deposit.status_detail = status_detail
deposit.save()
deposit_id2 = self.create_deposit_partial()
# NOTE: does not work as documented
# https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa
# url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1})
main_url = reverse(PRIVATE_LIST_DEPOSITS)
url = '%s?page_size=1' % main_url
response = self.client.get(url)
self.assertEqual(response.status_code, status.HTTP_200_OK)
data = response.json()
self.assertEqual(data['count'], 2) # 2 deposits
expected_next = '%s?page=2&page_size=1' % main_url
self.assertTrue(data['next'].endswith(expected_next))
self.assertIsNone(data['previous'])
self.assertEqual(len(data['results']), 1) # page of size 1
deposit = data['results'][0]
self.assertEquals(deposit['id'], deposit_id)
self.assertEquals(deposit['status'], DEPOSIT_STATUS_PARTIAL)
expected_status_detail = convert_status_detail(status_detail)
self.assertEquals(deposit['status_detail'], expected_status_detail)
# then 2nd page
response2 = self.client.get(expected_next)
self.assertEqual(response2.status_code, status.HTTP_200_OK)
data2 = response2.json()
self.assertEqual(data2['count'], 2) # still 2 deposits
self.assertIsNone(data2['next'])
expected_previous = '%s?page_size=1' % main_url
self.assertTrue(data2['previous'].endswith(expected_previous))
self.assertEqual(len(data2['results']), 1) # page of size 1
deposit2 = data2['results'][0]
self.assertEquals(deposit2['id'], deposit_id2)
self.assertEquals(deposit2['status'], DEPOSIT_STATUS_PARTIAL)
diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py
index 7ed3b85a..086ec6e9 100644
--- a/swh/deposit/tests/api/test_deposit_read_archive.py
+++ b/swh/deposit/tests/api/test_deposit_read_archive.py
@@ -1,125 +1,125 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib
import os
from django.core.urlresolvers import reverse
-from nose.plugins.attrib import attr
+import pytest
from rest_framework import status
from rest_framework.test import APITestCase
from swh.core import tarball
from swh.deposit.config import PRIVATE_GET_RAW_CONTENT
from swh.deposit.tests import TEST_CONFIG
from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine
from ..common import FileSystemCreationRoutine, create_arborescence_archive
-@attr('fs')
+@pytest.mark.fs
class DepositReadArchivesTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine):
def setUp(self):
super().setUp()
self.archive2 = create_arborescence_archive(
self.root_path, 'archive2', 'file2', b'some other content in file')
self.workdir = os.path.join(self.root_path, 'workdir')
def test_access_to_existing_deposit_with_one_archive(self):
"""Access to deposit should stream a 200 response with its raw content
"""
deposit_id = self.create_simple_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
self.assertEquals(r.status_code, status.HTTP_200_OK)
self.assertEquals(r._headers['content-type'][1],
'application/octet-stream')
# read the stream
data = b''.join(r.streaming_content)
actual_sha1 = hashlib.sha1(data).hexdigest()
self.assertEquals(actual_sha1, self.archive['sha1sum'])
# this does not touch the extraction dir so this should stay empty
self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
def _check_tarball_consistency(self, actual_sha1):
tarball.uncompress(self.archive['path'], self.workdir)
self.assertEquals(os.listdir(self.workdir), ['file1'])
tarball.uncompress(self.archive2['path'], self.workdir)
lst = set(os.listdir(self.workdir))
self.assertEquals(lst, {'file1', 'file2'})
new_path = self.workdir + '.zip'
tarball.compress(new_path, 'zip', self.workdir)
with open(new_path, 'rb') as f:
h = hashlib.sha1(f.read()).hexdigest()
self.assertEqual(actual_sha1, h)
self.assertNotEqual(actual_sha1, self.archive['sha1sum'])
self.assertNotEqual(actual_sha1, self.archive2['sha1sum'])
def test_access_to_existing_deposit_with_multiple_archives(self):
"""Access to deposit should stream a 200 response with its raw contents
"""
deposit_id = self.create_complex_binary_deposit()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, deposit_id])
r = self.client.get(url)
self.assertEquals(r.status_code, status.HTTP_200_OK)
self.assertEquals(r._headers['content-type'][1],
'application/octet-stream')
# read the stream
data = b''.join(r.streaming_content)
actual_sha1 = hashlib.sha1(data).hexdigest()
self._check_tarball_consistency(actual_sha1)
# this touches the extraction directory but should clean up
# after itself
self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), [])
class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine):
def test_access_to_nonexisting_deposit_returns_404_response(self):
"""Read unknown collection should return a 404 response
"""
unknown_id = '999'
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[self.collection.name, unknown_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Deposit with id %s does not exist' % unknown_id,
response.content.decode('utf-8'))
def test_access_to_nonexisting_collection_returns_404_response(self):
"""Read unknown deposit should return a 404 response
"""
collection_name = 'non-existing'
deposit_id = self.create_deposit_partial()
url = reverse(PRIVATE_GET_RAW_CONTENT,
args=[collection_name, deposit_id])
response = self.client.get(url)
self.assertEqual(response.status_code,
status.HTTP_404_NOT_FOUND)
self.assertIn('Unknown collection name %s' % collection_name,
response.content.decode('utf-8'))
diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py
index c2799717..d7d9de20 100644
--- a/swh/deposit/tests/common.py
+++ b/swh/deposit/tests/common.py
@@ -1,573 +1,573 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
import hashlib
import os
import shutil
import tarfile
import tempfile
from django.core.urlresolvers import reverse
from django.test import TestCase
from io import BytesIO
-from nose.plugins.attrib import attr
+import pytest
from rest_framework import status
from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI,
DEPOSIT_STATUS_PARTIAL,
DEPOSIT_STATUS_VERIFIED,
DEPOSIT_STATUS_REJECTED,
DEPOSIT_STATUS_DEPOSITED)
from swh.deposit.models import DepositClient, DepositCollection, Deposit
from swh.deposit.models import DepositRequest
from swh.deposit.models import DepositRequestType
from swh.deposit.parsers import parse_xml
from swh.deposit.settings.testing import MEDIA_ROOT
from swh.core import tarball
def compute_info(archive_path):
"""Given a path, compute information on path.
"""
with open(archive_path, 'rb') as f:
length = 0
sha1sum = hashlib.sha1()
md5sum = hashlib.md5()
data = b''
for chunk in f:
sha1sum.update(chunk)
md5sum.update(chunk)
length += len(chunk)
data += chunk
return {
'dir': os.path.dirname(archive_path),
'name': os.path.basename(archive_path),
'path': archive_path,
'length': length,
'sha1sum': sha1sum.hexdigest(),
'md5sum': md5sum.hexdigest(),
'data': data
}
def _compress(path, extension, dir_path):
"""Compress path according to extension
"""
if extension == 'zip' or extension == 'tar':
return tarball.compress(path, extension, dir_path)
elif '.' in extension:
split_ext = extension.split('.')
if split_ext[0] != 'tar':
raise ValueError(
'Development error, only zip or tar archive supported, '
'%s not supported' % extension)
# deal with specific tar
mode = split_ext[1]
supported_mode = ['xz', 'gz', 'bz2']
if mode not in supported_mode:
raise ValueError(
'Development error, only %s supported, %s not supported' % (
supported_mode, mode))
files = tarball._ls(dir_path)
with tarfile.open(path, 'w:%s' % mode) as t:
for fpath, fname in files:
t.add(fpath, arcname=fname, recursive=False)
return path
def create_arborescence_archive(root_path, archive_name, filename, content,
up_to_size=None, extension='zip'):
"""Build an archive named archive_name in the root_path.
This archive contains one file named filename with the content content.
Args:
root_path (str): Location path of the archive to create
archive_name (str): Archive's name (without extension)
filename (str): Archive's content is only one filename
content (bytes): Content of the filename
up_to_size (int | None): Fill in the blanks size to oversize
or complete an archive's size
extension (str): Extension of the archive to write (default is zip)
Returns:
dict with the keys:
- dir: the directory of that archive
- path: full path to the archive
- sha1sum: archive's sha1sum
- length: archive's length
"""
os.makedirs(root_path, exist_ok=True)
archive_path_dir = tempfile.mkdtemp(dir=root_path)
dir_path = os.path.join(archive_path_dir, archive_name)
os.mkdir(dir_path)
filepath = os.path.join(dir_path, filename)
_length = len(content)
count = 0
batch_size = 128
with open(filepath, 'wb') as f:
f.write(content)
if up_to_size: # fill with blank content up to a given size
count += _length
while count < up_to_size:
f.write(b'0'*batch_size)
count += batch_size
_path = '%s.%s' % (dir_path, extension)
_path = _compress(_path, extension, dir_path)
return compute_info(_path)
def create_archive_with_archive(root_path, name, archive):
"""Create an archive holding another.
"""
invalid_archive_path = os.path.join(root_path, name)
with tarfile.open(invalid_archive_path, 'w:gz') as _archive:
_archive.add(archive['path'], arcname=archive['name'])
return compute_info(invalid_archive_path)
-@attr('fs')
+@pytest.mark.fs
class FileSystemCreationRoutine(TestCase):
"""Mixin intended for tests needed to tamper with archives.
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
self.root_path = '/tmp/swh-deposit/test/build-zip/'
os.makedirs(self.root_path, exist_ok=True)
self.archive = create_arborescence_archive(
self.root_path, 'archive1', 'file1', b'some content in file')
self.atom_entry = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr
"""
def tearDown(self):
super().tearDown()
shutil.rmtree(self.root_path)
def create_simple_binary_deposit(self, status_partial=True):
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/zip',
data=self.archive['data'],
CONTENT_LENGTH=self.archive['length'],
HTTP_MD5SUM=self.archive['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
self.archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
if status_partial:
expected_status = DEPOSIT_STATUS_PARTIAL
else:
expected_status = DEPOSIT_STATUS_VERIFIED
self.assertEqual(_status, expected_status)
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_complex_binary_deposit(self, status_partial=False):
deposit_id = self.create_simple_binary_deposit(
status_partial=True)
# Add a second archive to the deposit
# update its status to DEPOSIT_STATUS_VERIFIED
response = self.client.post(
reverse(EM_IRI, args=[self.collection.name, deposit_id]),
content_type='application/zip',
data=self.archive2['data'],
CONTENT_LENGTH=self.archive2['length'],
HTTP_MD5SUM=self.archive2['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial,
HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip')
# then
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_archive_with_archive(self, archive_extension):
# we create the holding archive to a given extension
archive = create_arborescence_archive(
self.root_path, 'archive1', 'file1', b'some content in file',
extension=archive_extension)
# now we create an archive holding the first created archive
invalid_archive = create_archive_with_archive(
self.root_path, 'invalid.tar.gz', archive)
# we deposit it
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/x-tar',
data=invalid_archive['data'],
CONTENT_LENGTH=invalid_archive['length'],
HTTP_MD5SUM=invalid_archive['md5sum'],
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=False,
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % (
invalid_archive['name'], ))
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED)
deposit_id = int(response_content['deposit_id'])
return deposit_id
def update_binary_deposit(self, deposit_id, status_partial=False):
# update existing deposit with atom entry metadata
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
response_content = parse_xml(BytesIO(response.content))
_status = response_content['deposit_status']
if status_partial:
expected_status = DEPOSIT_STATUS_PARTIAL
else:
expected_status = DEPOSIT_STATUS_DEPOSITED
self.assertEqual(_status, expected_status)
deposit_id = int(response_content['deposit_id'])
return deposit_id
-@attr('fs')
+@pytest.mark.fs
class BasicTestCase(TestCase):
"""Mixin intended for data setup purposes (user, collection, etc...)
"""
def setUp(self):
"""Define the test client and other test variables."""
super().setUp()
# expanding diffs in tests
self.maxDiff = None
# basic minimum test data
deposit_request_types = {}
# Add deposit request types
for deposit_request_type in ['archive', 'metadata']:
drt = DepositRequestType(name=deposit_request_type)
drt.save()
deposit_request_types[deposit_request_type] = drt
_name = 'hal'
_provider_url = 'https://hal-test.archives-ouvertes.fr/'
_domain = 'archives-ouvertes.fr/'
# set collection up
_collection = DepositCollection(name=_name)
_collection.save()
# set user/client up
_client = DepositClient.objects.create_user(username=_name,
password=_name,
provider_url=_provider_url,
domain=_domain)
_client.collections = [_collection.id]
_client.last_name = _name
_client.save()
self.collection = _collection
self.user = _client
self.username = _name
self.userpass = _name
self.deposit_request_types = deposit_request_types
def tearDown(self):
super().tearDown()
# Clean up uploaded files in temporary directory (tests have
# their own media root folder)
if os.path.exists(MEDIA_ROOT):
for d in os.listdir(MEDIA_ROOT):
shutil.rmtree(os.path.join(MEDIA_ROOT, d))
class WithAuthTestCase(TestCase):
"""Mixin intended for testing the api with basic authentication.
"""
def setUp(self):
super().setUp()
_token = '%s:%s' % (self.username, self.userpass)
token = base64.b64encode(_token.encode('utf-8'))
authorization = 'Basic %s' % token.decode('utf-8')
self.client.credentials(HTTP_AUTHORIZATION=authorization)
def tearDown(self):
super().tearDown()
self.client.credentials()
class CommonCreationRoutine(TestCase):
"""Mixin class to share initialization routine.
cf:
`class`:test_deposit_update.DepositReplaceExistingDataTest
`class`:test_deposit_update.DepositUpdateDepositWithNewDataTest
`class`:test_deposit_update.DepositUpdateFailuresTest
`class`:test_deposit_delete.DepositDeleteTest
"""
def setUp(self):
super().setUp()
self.atom_entry_data0 = b"""
some-external-id
https://hal-test.archives-ouvertes.fr/some-external-id
some awesome author
"""
self.atom_entry_data1 = b"""
another one
no one
"""
self.atom_entry_data2 = b"""
Awesome Compiler
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
https://hal-test.archives-ouvertes.fr/id
"""
self.codemeta_entry_data0 = b"""
Awesome Compiler
https://hal-test.archives-ouvertes.fr/1785io25c695
urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a
1785io25c695
2017-10-07T15:17:08Z
some awesome author
description
key-word 1
"""
self.codemeta_entry_data1 = b"""
Composing a Web of Audio Applications
hal
hal-01243065
hal-01243065
https://hal-test.archives-ouvertes.fr/hal-01243065
test
DSP programming,Web
2017-05-03T16:08:47+02:00
this is the description
1
phpstorm
stable
php
python
C
GNU General Public License v3.0 only
CeCILL Free Software License Agreement v1.1
HAL
hal@ccsd.cnrs.fr
Morane Gruenpeter
"""
def create_deposit_with_invalid_archive(self,
external_id='some-external-id-1'):
url = reverse(COL_IRI, args=[self.collection.name])
data = b'some data which is clearly not a zip file'
md5sum = hashlib.md5(data).hexdigest()
# when
response = self.client.post(
url,
content_type='application/zip', # as zip
data=data,
# + headers
CONTENT_LENGTH=len(data),
# other headers needs HTTP_ prefix to be taken into account
HTTP_SLUG=external_id,
HTTP_CONTENT_MD5=md5sum,
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip',
HTTP_CONTENT_DISPOSITION='attachment; filename=filename0')
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_with_status(
self, status,
external_id='some-external-id-1',
swh_id=None,
swh_id_context=None,
swh_anchor_id=None,
swh_anchor_id_context=None,
status_detail=None):
# create an invalid deposit which we will update further down the line
deposit_id = self.create_deposit_with_invalid_archive(external_id)
# We cannot create some form of deposit with a given status in
# test context ('rejected' for example). Update in place the
# deposit with such status to permit some further tests.
deposit = Deposit.objects.get(pk=deposit_id)
if status == DEPOSIT_STATUS_REJECTED:
deposit.status_detail = status_detail
deposit.status = status
if swh_id:
deposit.swh_id = swh_id
if swh_id_context:
deposit.swh_id_context = swh_id_context
if swh_anchor_id:
deposit.swh_anchor_id = swh_anchor_id
if swh_anchor_id_context:
deposit.swh_anchor_id_context = swh_anchor_id_context
deposit.save()
return deposit_id
def create_simple_deposit_partial(self, external_id='some-external-id'):
"""Create a simple deposit (1 request) in `partial` state and returns
its new identifier.
Returns:
deposit id
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data0,
HTTP_SLUG=external_id,
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def create_deposit_partial_with_data_in_args(self, data):
"""Create a simple deposit (1 request) in `partial` state with the data
or metadata as an argument and returns its new identifier.
Args:
data: atom entry
Returns:
deposit id
"""
response = self.client.post(
reverse(COL_IRI, args=[self.collection.name]),
content_type='application/atom+xml;type=entry',
data=data,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS='true')
assert response.status_code == status.HTTP_201_CREATED
response_content = parse_xml(BytesIO(response.content))
deposit_id = int(response_content['deposit_id'])
return deposit_id
def _update_deposit_with_status(self, deposit_id, status_partial=False):
"""Add to a given deposit another archive and update its current
status to `deposited` (by default).
Returns:
deposit id
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.atom_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
# then
assert response.status_code == status.HTTP_201_CREATED
return deposit_id
def create_deposit_ready(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `deposited`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(deposit_id)
return deposit_id
def create_deposit_partial(self, external_id='some-external-id'):
"""Create a complex deposit (2 requests) in status `partial`.
"""
deposit_id = self.create_simple_deposit_partial(
external_id=external_id)
deposit_id = self._update_deposit_with_status(
deposit_id, status_partial=True)
return deposit_id
def add_metadata_to_deposit(self, deposit_id, status_partial=False):
"""Add metadata to deposit.
"""
# when
response = self.client.post(
reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]),
content_type='application/atom+xml;type=entry',
data=self.codemeta_entry_data1,
HTTP_SLUG='external-id',
HTTP_IN_PROGRESS=status_partial)
assert response.status_code == status.HTTP_201_CREATED
# then
deposit = Deposit.objects.get(pk=deposit_id)
assert deposit is not None
deposit_requests = DepositRequest.objects.filter(deposit=deposit)
assert deposit_requests is not []
for dr in deposit_requests:
if dr.type.name == 'metadata':
assert deposit_requests[0].metadata is not {}
return deposit_id
diff --git a/swh/deposit/tests/loader/test_client.py b/swh/deposit/tests/loader/test_client.py
index cbdb703f..8241967a 100644
--- a/swh/deposit/tests/loader/test_client.py
+++ b/swh/deposit/tests/loader/test_client.py
@@ -1,258 +1,258 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import tempfile
import unittest
-from nose.plugins.attrib import attr
+import pytest
from swh.deposit.client import PrivateApiDepositClient
from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS
from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE
from .common import CLIENT_TEST_CONFIG
class StreamedResponse:
"""Streamed response facsimile
"""
def __init__(self, ok, stream):
self.ok = ok
self.stream = stream
def iter_content(self):
yield from self.stream
class FakeRequestClientGet:
"""Fake request client dedicated to get method calls.
"""
def __init__(self, response):
self.response = response
def get(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
return self.response
-@attr('fs')
+@pytest.mark.fs
class PrivateApiDepositClientReadArchiveTest(unittest.TestCase):
def setUp(self):
super().setUp()
self.temporary_directory = tempfile.mkdtemp(dir='/tmp')
def tearDown(self):
super().setUp()
shutil.rmtree(self.temporary_directory)
def test_archive_get(self):
"""Reading archive should write data in temporary directory
"""
stream_content = [b"some", b"streamed", b"response"]
response = StreamedResponse(
ok=True,
stream=(s for s in stream_content))
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
archive_path = os.path.join(self.temporary_directory, 'test.archive')
archive_path = deposit_client.archive_get('/some/url', archive_path)
self.assertTrue(os.path.exists(archive_path))
with open(archive_path, 'rb') as f:
actual_content = f.read()
self.assertEquals(actual_content, b''.join(stream_content))
self.assertEquals(_client.args, ('http://nowhere:9000/some/url', ))
self.assertEquals(_client.kwargs, {
'stream': True
})
def test_archive_get_with_authentication(self):
"""Reading archive should write data in temporary directory
"""
stream_content = [b"some", b"streamed", b"response", b"for", b"auth"]
response = StreamedResponse(
ok=True,
stream=(s for s in stream_content))
_client = FakeRequestClientGet(response)
_config = CLIENT_TEST_CONFIG.copy()
_config['auth'] = { # add authentication setup
'username': 'user',
'password': 'pass'
}
deposit_client = PrivateApiDepositClient(_config, _client=_client)
archive_path = os.path.join(self.temporary_directory, 'test.archive')
archive_path = deposit_client.archive_get('/some/url', archive_path)
self.assertTrue(os.path.exists(archive_path))
with open(archive_path, 'rb') as f:
actual_content = f.read()
self.assertEquals(actual_content, b''.join(stream_content))
self.assertEquals(_client.args, ('http://nowhere:9000/some/url', ))
self.assertEquals(_client.kwargs, {
'stream': True,
'auth': ('user', 'pass')
})
def test_archive_get_can_fail(self):
"""Reading archive can fail for some reasons
"""
response = StreamedResponse(ok=False, stream=None)
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when retrieving deposit archive'):
deposit_client.archive_get('/some/url', 'some/path')
class JsonResponse:
"""Json response facsimile
"""
def __init__(self, ok, response):
self.ok = ok
self.response = response
def json(self):
return self.response
class PrivateApiDepositClientReadMetadataTest(unittest.TestCase):
def test_metadata_get(self):
"""Reading archive should write data in temporary directory
"""
expected_response = {"some": "dict"}
response = JsonResponse(
ok=True,
response=expected_response)
_client = FakeRequestClientGet(response)
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
actual_metadata = deposit_client.metadata_get('/metadata')
self.assertEquals(actual_metadata, expected_response)
def test_metadata_get_can_fail(self):
"""Reading metadata can fail for some reasons
"""
_client = FakeRequestClientGet(JsonResponse(ok=False, response=None))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when retrieving metadata at'):
deposit_client.metadata_get('/some/metadata/url')
class FakeRequestClientPut:
"""Fake Request client dedicated to put request method calls.
"""
args = None
kwargs = None
def put(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class PrivateApiDepositClientStatusUpdateTest(unittest.TestCase):
def test_status_update(self):
"""Update status
"""
_client = FakeRequestClientPut()
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
deposit_client.status_update('/update/status',
DEPOSIT_STATUS_LOAD_SUCCESS,
revision_id='some-revision-id')
self.assertEquals(_client.args,
('http://nowhere:9000/update/status', ))
self.assertEquals(_client.kwargs, {
'json': {
'status': DEPOSIT_STATUS_LOAD_SUCCESS,
'revision_id': 'some-revision-id',
}
})
def test_status_update_with_no_revision_id(self):
"""Reading metadata can fail for some reasons
"""
_client = FakeRequestClientPut()
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
deposit_client.status_update('/update/status/fail',
DEPOSIT_STATUS_LOAD_FAILURE)
self.assertEquals(_client.args,
('http://nowhere:9000/update/status/fail', ))
self.assertEquals(_client.kwargs, {
'json': {
'status': DEPOSIT_STATUS_LOAD_FAILURE,
}
})
class PrivateApiDepositClientCheckTest(unittest.TestCase):
def test_check(self):
"""When check ok, this should return the deposit's status
"""
_client = FakeRequestClientGet(
JsonResponse(ok=True, response={'status': 'something'}))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
r = deposit_client.check('/check')
self.assertEquals(_client.args,
('http://nowhere:9000/check', ))
self.assertEquals(_client.kwargs, {})
self.assertEquals(r, 'something')
def test_check_fails(self):
"""Checking deposit can fail for some reason
"""
_client = FakeRequestClientGet(
JsonResponse(ok=False, response=None))
deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG,
_client=_client)
with self.assertRaisesRegex(
ValueError,
'Problem when checking deposit'):
deposit_client.check('/check/fails')
self.assertEquals(_client.args,
('http://nowhere:9000/check/fails', ))
self.assertEquals(_client.kwargs, {})
diff --git a/swh/deposit/tests/loader/test_loader.py b/swh/deposit/tests/loader/test_loader.py
index efdc30c1..2103d4ed 100644
--- a/swh/deposit/tests/loader/test_loader.py
+++ b/swh/deposit/tests/loader/test_loader.py
@@ -1,305 +1,305 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import unittest
import shutil
-from nose.plugins.attrib import attr
+import pytest
from rest_framework.test import APITestCase
from swh.model import hashutil
from swh.deposit.models import Deposit
from swh.deposit.loader import loader
from swh.deposit.config import (
PRIVATE_GET_RAW_CONTENT, PRIVATE_GET_DEPOSIT_METADATA, PRIVATE_PUT_DEPOSIT
)
from django.core.urlresolvers import reverse
from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG
from .. import TEST_LOADER_CONFIG
from ..common import (BasicTestCase, WithAuthTestCase,
CommonCreationRoutine,
FileSystemCreationRoutine)
TOOL_ID = 99
PROVIDER_ID = 12
class DepositLoaderInhibitsStorage:
"""Mixin class to inhibit the persistence and keep in memory the data
sent for storage.
cf. SWHDepositLoaderNoStorage
"""
def __init__(self, client=None):
# client is not used here, transit it nonetheless to other mixins
super().__init__(client=client)
# typed data
self.state = {
'origin': [],
'origin_visit': [],
'origin_metadata': [],
'content': [],
'directory': [],
'revision': [],
'release': [],
'snapshot': [],
'tool': [],
'provider': []
}
def _add(self, type, l):
"""Add without duplicates and keeping the insertion order.
Args:
type (str): Type of objects concerned by the action
l ([object]): List of 'type' object
"""
col = self.state[type]
for o in l:
if o in col:
continue
col.extend([o])
def send_origin(self, origin):
origin.update({'id': 1})
self._add('origin', [origin])
return origin['id']
def send_origin_visit(self, origin_id, visit_date):
origin_visit = {
'origin': origin_id,
'visit_date': visit_date,
'visit': 1,
}
self._add('origin_visit', [origin_visit])
return origin_visit
def send_origin_metadata(self, origin_id, visit_date, provider_id, tool_id,
metadata):
origin_metadata = {
'origin_id': origin_id,
'visit_date': visit_date,
'provider_id': provider_id,
'tool_id': tool_id,
'metadata': metadata
}
self._add('origin_metadata', [origin_metadata])
return origin_metadata
def send_tool(self, tool):
tool = {
'tool_name': tool['tool_name'],
'tool_version': tool['tool_version'],
'tool_configuration': tool['tool_configuration']
}
self._add('tool', [tool])
tool_id = TOOL_ID
return tool_id
def send_provider(self, provider):
provider = {
'provider_name': provider['provider_name'],
'provider_type': provider['provider_type'],
'provider_url': provider['provider_url'],
'metadata': provider['metadata']
}
self._add('provider', [provider])
provider_id = PROVIDER_ID
return provider_id
def maybe_load_contents(self, contents):
self._add('content', contents)
def maybe_load_directories(self, directories):
self._add('directory', directories)
def maybe_load_revisions(self, revisions):
self._add('revision', revisions)
def maybe_load_releases(self, releases):
self._add('release', releases)
def maybe_load_snapshot(self, snapshot):
self._add('snapshot', [snapshot])
def open_fetch_history(self):
pass
def close_fetch_history_failure(self, fetch_history_id):
pass
def close_fetch_history_success(self, fetch_history_id):
pass
def update_origin_visit(self, origin_id, visit, status):
self.status = status
# Override to do nothing at the end
def close_failure(self):
pass
def close_success(self):
pass
class TestLoaderUtils(unittest.TestCase):
def assertRevisionsOk(self, expected_revisions): # noqa: N802
"""Check the loader's revisions match the expected revisions.
Expects self.loader to be instantiated and ready to be
inspected (meaning the loading took place).
Args:
expected_revisions (dict): Dict with key revision id,
value the targeted directory id.
"""
# The last revision being the one used later to start back from
for rev in self.loader.state['revision']:
rev_id = hashutil.hash_to_hex(rev['id'])
directory_id = hashutil.hash_to_hex(rev['directory'])
self.assertEquals(expected_revisions[rev_id], directory_id)
class SWHDepositLoaderNoStorage(DepositLoaderInhibitsStorage,
loader.DepositLoader):
"""Loader to test.
It inherits from the actual deposit loader to actually test its
correct behavior. It also inherits from
DepositLoaderInhibitsStorage so that no persistence takes place.
"""
pass
-@attr('fs')
+@pytest.mark.fs
class DepositLoaderScenarioTest(APITestCase, WithAuthTestCase,
BasicTestCase, CommonCreationRoutine,
FileSystemCreationRoutine, TestLoaderUtils):
def setUp(self):
super().setUp()
# create the extraction dir used by the loader
os.makedirs(TEST_LOADER_CONFIG['extraction_dir'], exist_ok=True)
# 1. create a deposit with archive and metadata
self.deposit_id = self.create_simple_binary_deposit()
# 2. Sets a basic client which accesses the test data
loader_client = SWHDepositTestClient(self.client,
config=CLIENT_TEST_CONFIG)
# 3. setup loader with no persistence and that client
self.loader = SWHDepositLoaderNoStorage(client=loader_client)
def tearDown(self):
super().tearDown()
shutil.rmtree(TEST_LOADER_CONFIG['extraction_dir'])
def test_inject_deposit_ready(self):
"""Load a deposit which is ready
"""
args = [self.collection.name, self.deposit_id]
archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args)
deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args)
deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args)
# when
self.loader.load(archive_url=archive_url,
deposit_meta_url=deposit_meta_url,
deposit_update_url=deposit_update_url)
# then
self.assertEquals(len(self.loader.state['content']), 1)
self.assertEquals(len(self.loader.state['directory']), 1)
self.assertEquals(len(self.loader.state['revision']), 1)
self.assertEquals(len(self.loader.state['release']), 0)
self.assertEquals(len(self.loader.state['snapshot']), 1)
def test_inject_deposit_verify_metadata(self):
"""Load a deposit with metadata, test metadata integrity
"""
self.deposit_metadata_id = self.add_metadata_to_deposit(
self.deposit_id)
args = [self.collection.name, self.deposit_metadata_id]
archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args)
deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args)
deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args)
# when
self.loader.load(archive_url=archive_url,
deposit_meta_url=deposit_meta_url,
deposit_update_url=deposit_update_url)
# then
self.assertEquals(len(self.loader.state['content']), 1)
self.assertEquals(len(self.loader.state['directory']), 1)
self.assertEquals(len(self.loader.state['revision']), 1)
self.assertEquals(len(self.loader.state['release']), 0)
self.assertEquals(len(self.loader.state['snapshot']), 1)
self.assertEquals(len(self.loader.state['origin_metadata']), 1)
self.assertEquals(len(self.loader.state['tool']), 1)
self.assertEquals(len(self.loader.state['provider']), 1)
codemeta = 'codemeta:'
origin_url = 'https://hal-test.archives-ouvertes.fr/hal-01243065'
expected_origin_metadata = {
'@xmlns': 'http://www.w3.org/2005/Atom',
'@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0',
'author': {
'email': 'hal@ccsd.cnrs.fr',
'name': 'HAL'
},
codemeta + 'url': origin_url,
codemeta + 'runtimePlatform': 'phpstorm',
codemeta + 'license': [
{
codemeta + 'name': 'GNU General Public License v3.0 only'
},
{
codemeta + 'name': 'CeCILL Free Software License Agreement v1.1' # noqa
}
],
codemeta + 'author': {
codemeta + 'name': 'Morane Gruenpeter'
},
codemeta + 'programmingLanguage': ['php', 'python', 'C'],
codemeta + 'applicationCategory': 'test',
codemeta + 'dateCreated': '2017-05-03T16:08:47+02:00',
codemeta + 'version': '1',
'external_identifier': 'hal-01243065',
'title': 'Composing a Web of Audio Applications',
codemeta + 'description': 'this is the description',
'id': 'hal-01243065',
'client': 'hal',
codemeta + 'keywords': 'DSP programming,Web',
codemeta + 'developmentStatus': 'stable'
}
result = self.loader.state['origin_metadata'][0]
self.assertEquals(result['metadata'], expected_origin_metadata)
self.assertEquals(result['tool_id'], TOOL_ID)
self.assertEquals(result['provider_id'], PROVIDER_ID)
deposit = Deposit.objects.get(pk=self.deposit_id)
self.assertRegex(deposit.swh_id, r'^swh:1:dir:.*')
self.assertEquals(deposit.swh_id_context, '%s;origin=%s' % (
deposit.swh_id, origin_url
))
self.assertRegex(deposit.swh_anchor_id, r'^swh:1:rev:.*')
self.assertEquals(deposit.swh_anchor_id_context, '%s;origin=%s' % (
deposit.swh_anchor_id, origin_url
))