diff --git a/pytest.ini b/pytest.ini index afa4cf37..dc2e5dc8 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,3 @@ [pytest] norecursedirs = docs +DJANGO_SETTINGS_MODULE = swh.deposit.settings.testing diff --git a/requirements-test.txt b/requirements-test.txt index 2a6f75e5..125b4602 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,2 @@ -nose -django_nose +pytest +pytest-django diff --git a/swh/deposit/settings/testing.py b/swh/deposit/settings/testing.py index 45ee6873..c35631b3 100644 --- a/swh/deposit/settings/testing.py +++ b/swh/deposit/settings/testing.py @@ -1,50 +1,47 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from .common import * # noqa from .common import ALLOWED_HOSTS from .development import * # noqa from .development import INSTALLED_APPS -# django-nose setup +# django setup ALLOWED_HOSTS += ['testserver'] -INSTALLED_APPS += ['django_nose'] - -TEST_RUNNER = 'django_nose.NoseTestSuiteRunner' -NOSE_ARGS = ['--verbosity=3', '-s'] # to see test pass +INSTALLED_APPS += ['pytest_django'] # https://docs.djangoproject.com/en/1.10/ref/settings/#logging LOGGING = { 'version': 1, 'disable_existing_loggers': True, 'formatters': { 'standard': { 'format': "[%(asctime)s] %(levelname)s [%(name)s:%(lineno)s] %(message)s", # noqa 'datefmt': "%d/%b/%Y %H:%M:%S" }, }, 'handlers': { 'console': { 'level': 'ERROR', 'class': 'logging.StreamHandler', 'formatter': 'standard' }, }, 'loggers': { 'swh.deposit': { 'handlers': ['console'], 'level': 'ERROR', }, } } # https://docs.djangoproject.com/en/1.11/ref/settings/#std:setting-MEDIA_ROOT # SECURITY WARNING: Override this in the production.py module MEDIA_ROOT = '/tmp/swh-deposit/test/uploads/' FILE_UPLOAD_HANDLERS = [ "django.core.files.uploadhandler.MemoryFileUploadHandler", ] diff --git a/swh/deposit/tests/api/test_deposit_check.py b/swh/deposit/tests/api/test_deposit_check.py index d3cb3528..019ded0c 100644 --- a/swh/deposit/tests/api/test_deposit_check.py +++ b/swh/deposit/tests/api/test_deposit_check.py @@ -1,236 +1,236 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from django.core.urlresolvers import reverse -from nose.plugins.attrib import attr +import pytest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED ) from swh.deposit.api.private.deposit_check import ( SWHChecksDeposit, MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, INCOMPATIBLE_URL_FIELDS, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING, MANDATORY_ARCHIVE_MISSING ) from swh.deposit.models import Deposit from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine -@attr('fs') +@pytest.mark.fs class CheckDepositTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): """Check deposit endpoints. """ def setUp(self): super().setUp() def test_deposit_ok(self): """Proper deposit should succeed the checks (-> status ready) """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id = self.update_binary_deposit(deposit_id, status_partial=False) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) def test_deposit_invalid_tarball(self): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: deposit_id = self.create_deposit_archive_with_archive( archive_extension) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_INVALID) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) def test_deposit_ko_missing_tarball(self): """Deposit without archive should fail the checks: rejected """ deposit_id = self.create_deposit_ready() # no archive, only atom deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_MISSING) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) def test_deposit_ko_unsupported_tarball(self): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit_id = self.create_deposit_with_invalid_archive() deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(DEPOSIT_STATUS_DEPOSITED, deposit.status) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_REJECTED) details = data['details'] # archive checks failure self.assertEqual(len(details['archive']), 1) self.assertEqual(details['archive'][0]['summary'], MANDATORY_ARCHIVE_UNSUPPORTED) # metadata check failure self.assertEqual(len(details['metadata']), 2) mandatory = details['metadata'][0] self.assertEqual(mandatory['summary'], MANDATORY_FIELDS_MISSING) self.assertEqual(set(mandatory['fields']), set(['url', 'external_identifier', 'author'])) alternate = details['metadata'][1] self.assertEqual(alternate['summary'], ALTERNATE_FIELDS_MISSING) self.assertEqual(alternate['fields'], ['name or title']) # url check failure self.assertEqual(details['url']['summary'], INCOMPATIBLE_URL_FIELDS) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_REJECTED) def test_check_deposit_metadata_ok(self): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit_id = self.create_simple_binary_deposit(status_partial=True) deposit_id_metadata = self.add_metadata_to_deposit(deposit_id) self.assertEquals(deposit_id, deposit_id_metadata) deposit = Deposit.objects.get(pk=deposit_id) self.assertEquals(deposit.status, DEPOSIT_STATUS_DEPOSITED) url = reverse(PRIVATE_CHECK_DEPOSIT, args=[self.collection.name, deposit.id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['status'], DEPOSIT_STATUS_VERIFIED) deposit = Deposit.objects.get(pk=deposit.id) self.assertEquals(deposit.status, DEPOSIT_STATUS_VERIFIED) class CheckMetadata(unittest.TestCase, SWHChecksDeposit): def test_check_metadata_ok(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) def test_check_metadata_ok2(self): actual_check, detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) self.assertTrue(actual_check) self.assertIsNone(detail) def test_check_metadata_ko(self): """Missing optional field should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory alternate fields are missing', 'fields': ['name or title'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) def test_check_metadata_ko2(self): """Missing mandatory fields should be caught """ actual_check, error_detail = self._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory fields are missing', 'fields': ['author'], }] } self.assertFalse(actual_check) self.assertEqual(error_detail, expected_error) diff --git a/swh/deposit/tests/api/test_deposit_list.py b/swh/deposit/tests/api/test_deposit_list.py index e22cd739..9e7fc587 100644 --- a/swh/deposit/tests/api/test_deposit_list.py +++ b/swh/deposit/tests/api/test_deposit_list.py @@ -1,94 +1,94 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.core.urlresolvers import reverse -from nose.plugins.attrib import attr +import pytest from rest_framework import status from rest_framework.test import APITestCase from swh.deposit.api.converters import convert_status_detail from ...config import DEPOSIT_STATUS_PARTIAL, PRIVATE_LIST_DEPOSITS from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ...models import Deposit -@attr('fs') +@pytest.mark.fs class CheckDepositListTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): """Check deposit list endpoints. """ def setUp(self): super().setUp() def test_deposit_list(self): """Deposit list api should return the deposits """ deposit_id = self.create_deposit_partial() # amend the deposit with a status_detail deposit = Deposit.objects.get(pk=deposit_id) status_detail = { 'url': { 'summary': 'At least one compatible url field. Failed', 'fields': ['testurl'], }, 'metadata': [ { 'summary': 'Mandatory fields missing', 'fields': ['9', 10, 1.212], }, ], 'archive': [ { 'summary': 'Invalid archive', 'fields': ['3'], }, { 'summary': 'Unsupported archive', 'fields': [2], } ], } deposit.status_detail = status_detail deposit.save() deposit_id2 = self.create_deposit_partial() # NOTE: does not work as documented # https://docs.djangoproject.com/en/1.11/ref/urlresolvers/#django.core.urlresolvers.reverse # noqa # url = reverse(PRIVATE_LIST_DEPOSITS, kwargs={'page_size': 1}) main_url = reverse(PRIVATE_LIST_DEPOSITS) url = '%s?page_size=1' % main_url response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_200_OK) data = response.json() self.assertEqual(data['count'], 2) # 2 deposits expected_next = '%s?page=2&page_size=1' % main_url self.assertTrue(data['next'].endswith(expected_next)) self.assertIsNone(data['previous']) self.assertEqual(len(data['results']), 1) # page of size 1 deposit = data['results'][0] self.assertEquals(deposit['id'], deposit_id) self.assertEquals(deposit['status'], DEPOSIT_STATUS_PARTIAL) expected_status_detail = convert_status_detail(status_detail) self.assertEquals(deposit['status_detail'], expected_status_detail) # then 2nd page response2 = self.client.get(expected_next) self.assertEqual(response2.status_code, status.HTTP_200_OK) data2 = response2.json() self.assertEqual(data2['count'], 2) # still 2 deposits self.assertIsNone(data2['next']) expected_previous = '%s?page_size=1' % main_url self.assertTrue(data2['previous'].endswith(expected_previous)) self.assertEqual(len(data2['results']), 1) # page of size 1 deposit2 = data2['results'][0] self.assertEquals(deposit2['id'], deposit_id2) self.assertEquals(deposit2['status'], DEPOSIT_STATUS_PARTIAL) diff --git a/swh/deposit/tests/api/test_deposit_read_archive.py b/swh/deposit/tests/api/test_deposit_read_archive.py index 7ed3b85a..086ec6e9 100644 --- a/swh/deposit/tests/api/test_deposit_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_read_archive.py @@ -1,125 +1,125 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib import os from django.core.urlresolvers import reverse -from nose.plugins.attrib import attr +import pytest from rest_framework import status from rest_framework.test import APITestCase from swh.core import tarball from swh.deposit.config import PRIVATE_GET_RAW_CONTENT from swh.deposit.tests import TEST_CONFIG from ..common import BasicTestCase, WithAuthTestCase, CommonCreationRoutine from ..common import FileSystemCreationRoutine, create_arborescence_archive -@attr('fs') +@pytest.mark.fs class DepositReadArchivesTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine): def setUp(self): super().setUp() self.archive2 = create_arborescence_archive( self.root_path, 'archive2', 'file2', b'some other content in file') self.workdir = os.path.join(self.root_path, 'workdir') def test_access_to_existing_deposit_with_one_archive(self): """Access to deposit should stream a 200 response with its raw content """ deposit_id = self.create_simple_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self.assertEquals(actual_sha1, self.archive['sha1sum']) # this does not touch the extraction dir so this should stay empty self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) def _check_tarball_consistency(self, actual_sha1): tarball.uncompress(self.archive['path'], self.workdir) self.assertEquals(os.listdir(self.workdir), ['file1']) tarball.uncompress(self.archive2['path'], self.workdir) lst = set(os.listdir(self.workdir)) self.assertEquals(lst, {'file1', 'file2'}) new_path = self.workdir + '.zip' tarball.compress(new_path, 'zip', self.workdir) with open(new_path, 'rb') as f: h = hashlib.sha1(f.read()).hexdigest() self.assertEqual(actual_sha1, h) self.assertNotEqual(actual_sha1, self.archive['sha1sum']) self.assertNotEqual(actual_sha1, self.archive2['sha1sum']) def test_access_to_existing_deposit_with_multiple_archives(self): """Access to deposit should stream a 200 response with its raw contents """ deposit_id = self.create_complex_binary_deposit() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, deposit_id]) r = self.client.get(url) self.assertEquals(r.status_code, status.HTTP_200_OK) self.assertEquals(r._headers['content-type'][1], 'application/octet-stream') # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() self._check_tarball_consistency(actual_sha1) # this touches the extraction directory but should clean up # after itself self.assertEquals(os.listdir(TEST_CONFIG['extraction_dir']), []) class DepositReadArchivesFailureTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine): def test_access_to_nonexisting_deposit_returns_404_response(self): """Read unknown collection should return a 404 response """ unknown_id = '999' url = reverse(PRIVATE_GET_RAW_CONTENT, args=[self.collection.name, unknown_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Deposit with id %s does not exist' % unknown_id, response.content.decode('utf-8')) def test_access_to_nonexisting_collection_returns_404_response(self): """Read unknown deposit should return a 404 response """ collection_name = 'non-existing' deposit_id = self.create_deposit_partial() url = reverse(PRIVATE_GET_RAW_CONTENT, args=[collection_name, deposit_id]) response = self.client.get(url) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) self.assertIn('Unknown collection name %s' % collection_name, response.content.decode('utf-8')) diff --git a/swh/deposit/tests/common.py b/swh/deposit/tests/common.py index c2799717..d7d9de20 100644 --- a/swh/deposit/tests/common.py +++ b/swh/deposit/tests/common.py @@ -1,573 +1,573 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import base64 import hashlib import os import shutil import tarfile import tempfile from django.core.urlresolvers import reverse from django.test import TestCase from io import BytesIO -from nose.plugins.attrib import attr +import pytest from rest_framework import status from swh.deposit.config import (COL_IRI, EM_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_DEPOSITED) from swh.deposit.models import DepositClient, DepositCollection, Deposit from swh.deposit.models import DepositRequest from swh.deposit.models import DepositRequestType from swh.deposit.parsers import parse_xml from swh.deposit.settings.testing import MEDIA_ROOT from swh.core import tarball def compute_info(archive_path): """Given a path, compute information on path. """ with open(archive_path, 'rb') as f: length = 0 sha1sum = hashlib.sha1() md5sum = hashlib.md5() data = b'' for chunk in f: sha1sum.update(chunk) md5sum.update(chunk) length += len(chunk) data += chunk return { 'dir': os.path.dirname(archive_path), 'name': os.path.basename(archive_path), 'path': archive_path, 'length': length, 'sha1sum': sha1sum.hexdigest(), 'md5sum': md5sum.hexdigest(), 'data': data } def _compress(path, extension, dir_path): """Compress path according to extension """ if extension == 'zip' or extension == 'tar': return tarball.compress(path, extension, dir_path) elif '.' in extension: split_ext = extension.split('.') if split_ext[0] != 'tar': raise ValueError( 'Development error, only zip or tar archive supported, ' '%s not supported' % extension) # deal with specific tar mode = split_ext[1] supported_mode = ['xz', 'gz', 'bz2'] if mode not in supported_mode: raise ValueError( 'Development error, only %s supported, %s not supported' % ( supported_mode, mode)) files = tarball._ls(dir_path) with tarfile.open(path, 'w:%s' % mode) as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) return path def create_arborescence_archive(root_path, archive_name, filename, content, up_to_size=None, extension='zip'): """Build an archive named archive_name in the root_path. This archive contains one file named filename with the content content. Args: root_path (str): Location path of the archive to create archive_name (str): Archive's name (without extension) filename (str): Archive's content is only one filename content (bytes): Content of the filename up_to_size (int | None): Fill in the blanks size to oversize or complete an archive's size extension (str): Extension of the archive to write (default is zip) Returns: dict with the keys: - dir: the directory of that archive - path: full path to the archive - sha1sum: archive's sha1sum - length: archive's length """ os.makedirs(root_path, exist_ok=True) archive_path_dir = tempfile.mkdtemp(dir=root_path) dir_path = os.path.join(archive_path_dir, archive_name) os.mkdir(dir_path) filepath = os.path.join(dir_path, filename) _length = len(content) count = 0 batch_size = 128 with open(filepath, 'wb') as f: f.write(content) if up_to_size: # fill with blank content up to a given size count += _length while count < up_to_size: f.write(b'0'*batch_size) count += batch_size _path = '%s.%s' % (dir_path, extension) _path = _compress(_path, extension, dir_path) return compute_info(_path) def create_archive_with_archive(root_path, name, archive): """Create an archive holding another. """ invalid_archive_path = os.path.join(root_path, name) with tarfile.open(invalid_archive_path, 'w:gz') as _archive: _archive.add(archive['path'], arcname=archive['name']) return compute_info(invalid_archive_path) -@attr('fs') +@pytest.mark.fs class FileSystemCreationRoutine(TestCase): """Mixin intended for tests needed to tamper with archives. """ def setUp(self): """Define the test client and other test variables.""" super().setUp() self.root_path = '/tmp/swh-deposit/test/build-zip/' os.makedirs(self.root_path, exist_ok=True) self.archive = create_arborescence_archive( self.root_path, 'archive1', 'file1', b'some content in file') self.atom_entry = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr """ def tearDown(self): super().tearDown() shutil.rmtree(self.root_path) def create_simple_binary_deposit(self, status_partial=True): response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/zip', data=self.archive['data'], CONTENT_LENGTH=self.archive['length'], HTTP_MD5SUM=self.archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( self.archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_VERIFIED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id def create_complex_binary_deposit(self, status_partial=False): deposit_id = self.create_simple_binary_deposit( status_partial=True) # Add a second archive to the deposit # update its status to DEPOSIT_STATUS_VERIFIED response = self.client.post( reverse(EM_IRI, args=[self.collection.name, deposit_id]), content_type='application/zip', data=self.archive2['data'], CONTENT_LENGTH=self.archive2['length'], HTTP_MD5SUM=self.archive2['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial, HTTP_CONTENT_DISPOSITION='attachment; filename=filename1.zip') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_archive_with_archive(self, archive_extension): # we create the holding archive to a given extension archive = create_arborescence_archive( self.root_path, 'archive1', 'file1', b'some content in file', extension=archive_extension) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive( self.root_path, 'invalid.tar.gz', archive) # we deposit it response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/x-tar', data=invalid_archive['data'], CONTENT_LENGTH=invalid_archive['length'], HTTP_MD5SUM=invalid_archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( invalid_archive['name'], )) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] self.assertEqual(_status, DEPOSIT_STATUS_DEPOSITED) deposit_id = int(response_content['deposit_id']) return deposit_id def update_binary_deposit(self, deposit_id, status_partial=False): # update existing deposit with atom entry metadata response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then self.assertEqual(response.status_code, status.HTTP_201_CREATED) response_content = parse_xml(BytesIO(response.content)) _status = response_content['deposit_status'] if status_partial: expected_status = DEPOSIT_STATUS_PARTIAL else: expected_status = DEPOSIT_STATUS_DEPOSITED self.assertEqual(_status, expected_status) deposit_id = int(response_content['deposit_id']) return deposit_id -@attr('fs') +@pytest.mark.fs class BasicTestCase(TestCase): """Mixin intended for data setup purposes (user, collection, etc...) """ def setUp(self): """Define the test client and other test variables.""" super().setUp() # expanding diffs in tests self.maxDiff = None # basic minimum test data deposit_request_types = {} # Add deposit request types for deposit_request_type in ['archive', 'metadata']: drt = DepositRequestType(name=deposit_request_type) drt.save() deposit_request_types[deposit_request_type] = drt _name = 'hal' _provider_url = 'https://hal-test.archives-ouvertes.fr/' _domain = 'archives-ouvertes.fr/' # set collection up _collection = DepositCollection(name=_name) _collection.save() # set user/client up _client = DepositClient.objects.create_user(username=_name, password=_name, provider_url=_provider_url, domain=_domain) _client.collections = [_collection.id] _client.last_name = _name _client.save() self.collection = _collection self.user = _client self.username = _name self.userpass = _name self.deposit_request_types = deposit_request_types def tearDown(self): super().tearDown() # Clean up uploaded files in temporary directory (tests have # their own media root folder) if os.path.exists(MEDIA_ROOT): for d in os.listdir(MEDIA_ROOT): shutil.rmtree(os.path.join(MEDIA_ROOT, d)) class WithAuthTestCase(TestCase): """Mixin intended for testing the api with basic authentication. """ def setUp(self): super().setUp() _token = '%s:%s' % (self.username, self.userpass) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') self.client.credentials(HTTP_AUTHORIZATION=authorization) def tearDown(self): super().tearDown() self.client.credentials() class CommonCreationRoutine(TestCase): """Mixin class to share initialization routine. cf: `class`:test_deposit_update.DepositReplaceExistingDataTest `class`:test_deposit_update.DepositUpdateDepositWithNewDataTest `class`:test_deposit_update.DepositUpdateFailuresTest `class`:test_deposit_delete.DepositDeleteTest """ def setUp(self): super().setUp() self.atom_entry_data0 = b""" some-external-id https://hal-test.archives-ouvertes.fr/some-external-id some awesome author """ self.atom_entry_data1 = b""" another one no one """ self.atom_entry_data2 = b""" Awesome Compiler urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author https://hal-test.archives-ouvertes.fr/id """ self.codemeta_entry_data0 = b""" Awesome Compiler https://hal-test.archives-ouvertes.fr/1785io25c695 urn:uuid:1225c695-cfb8-4ebb-aaaa-80da344efa6a 1785io25c695 2017-10-07T15:17:08Z some awesome author description key-word 1 """ self.codemeta_entry_data1 = b""" Composing a Web of Audio Applications hal hal-01243065 hal-01243065 https://hal-test.archives-ouvertes.fr/hal-01243065 test DSP programming,Web 2017-05-03T16:08:47+02:00 this is the description 1 phpstorm stable php python C GNU General Public License v3.0 only CeCILL Free Software License Agreement v1.1 HAL hal@ccsd.cnrs.fr Morane Gruenpeter """ def create_deposit_with_invalid_archive(self, external_id='some-external-id-1'): url = reverse(COL_IRI, args=[self.collection.name]) data = b'some data which is clearly not a zip file' md5sum = hashlib.md5(data).hexdigest() # when response = self.client.post( url, content_type='application/zip', # as zip data=data, # + headers CONTENT_LENGTH=len(data), # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=md5sum, HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_with_status( self, status, external_id='some-external-id-1', swh_id=None, swh_id_context=None, swh_anchor_id=None, swh_anchor_id_context=None, status_detail=None): # create an invalid deposit which we will update further down the line deposit_id = self.create_deposit_with_invalid_archive(external_id) # We cannot create some form of deposit with a given status in # test context ('rejected' for example). Update in place the # deposit with such status to permit some further tests. deposit = Deposit.objects.get(pk=deposit_id) if status == DEPOSIT_STATUS_REJECTED: deposit.status_detail = status_detail deposit.status = status if swh_id: deposit.swh_id = swh_id if swh_id_context: deposit.swh_id_context = swh_id_context if swh_anchor_id: deposit.swh_anchor_id = swh_anchor_id if swh_anchor_id_context: deposit.swh_anchor_id_context = swh_anchor_id_context deposit.save() return deposit_id def create_simple_deposit_partial(self, external_id='some-external-id'): """Create a simple deposit (1 request) in `partial` state and returns its new identifier. Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data0, HTTP_SLUG=external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def create_deposit_partial_with_data_in_args(self, data): """Create a simple deposit (1 request) in `partial` state with the data or metadata as an argument and returns its new identifier. Args: data: atom entry Returns: deposit id """ response = self.client.post( reverse(COL_IRI, args=[self.collection.name]), content_type='application/atom+xml;type=entry', data=data, HTTP_SLUG='external-id', HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = int(response_content['deposit_id']) return deposit_id def _update_deposit_with_status(self, deposit_id, status_partial=False): """Add to a given deposit another archive and update its current status to `deposited` (by default). Returns: deposit id """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.atom_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) # then assert response.status_code == status.HTTP_201_CREATED return deposit_id def create_deposit_ready(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `deposited`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status(deposit_id) return deposit_id def create_deposit_partial(self, external_id='some-external-id'): """Create a complex deposit (2 requests) in status `partial`. """ deposit_id = self.create_simple_deposit_partial( external_id=external_id) deposit_id = self._update_deposit_with_status( deposit_id, status_partial=True) return deposit_id def add_metadata_to_deposit(self, deposit_id, status_partial=False): """Add metadata to deposit. """ # when response = self.client.post( reverse(EDIT_SE_IRI, args=[self.collection.name, deposit_id]), content_type='application/atom+xml;type=entry', data=self.codemeta_entry_data1, HTTP_SLUG='external-id', HTTP_IN_PROGRESS=status_partial) assert response.status_code == status.HTTP_201_CREATED # then deposit = Deposit.objects.get(pk=deposit_id) assert deposit is not None deposit_requests = DepositRequest.objects.filter(deposit=deposit) assert deposit_requests is not [] for dr in deposit_requests: if dr.type.name == 'metadata': assert deposit_requests[0].metadata is not {} return deposit_id diff --git a/swh/deposit/tests/loader/test_client.py b/swh/deposit/tests/loader/test_client.py index cbdb703f..8241967a 100644 --- a/swh/deposit/tests/loader/test_client.py +++ b/swh/deposit/tests/loader/test_client.py @@ -1,258 +1,258 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest -from nose.plugins.attrib import attr +import pytest from swh.deposit.client import PrivateApiDepositClient from swh.deposit.config import DEPOSIT_STATUS_LOAD_SUCCESS from swh.deposit.config import DEPOSIT_STATUS_LOAD_FAILURE from .common import CLIENT_TEST_CONFIG class StreamedResponse: """Streamed response facsimile """ def __init__(self, ok, stream): self.ok = ok self.stream = stream def iter_content(self): yield from self.stream class FakeRequestClientGet: """Fake request client dedicated to get method calls. """ def __init__(self, response): self.response = response def get(self, *args, **kwargs): self.args = args self.kwargs = kwargs return self.response -@attr('fs') +@pytest.mark.fs class PrivateApiDepositClientReadArchiveTest(unittest.TestCase): def setUp(self): super().setUp() self.temporary_directory = tempfile.mkdtemp(dir='/tmp') def tearDown(self): super().setUp() shutil.rmtree(self.temporary_directory) def test_archive_get(self): """Reading archive should write data in temporary directory """ stream_content = [b"some", b"streamed", b"response"] response = StreamedResponse( ok=True, stream=(s for s in stream_content)) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) archive_path = os.path.join(self.temporary_directory, 'test.archive') archive_path = deposit_client.archive_get('/some/url', archive_path) self.assertTrue(os.path.exists(archive_path)) with open(archive_path, 'rb') as f: actual_content = f.read() self.assertEquals(actual_content, b''.join(stream_content)) self.assertEquals(_client.args, ('http://nowhere:9000/some/url', )) self.assertEquals(_client.kwargs, { 'stream': True }) def test_archive_get_with_authentication(self): """Reading archive should write data in temporary directory """ stream_content = [b"some", b"streamed", b"response", b"for", b"auth"] response = StreamedResponse( ok=True, stream=(s for s in stream_content)) _client = FakeRequestClientGet(response) _config = CLIENT_TEST_CONFIG.copy() _config['auth'] = { # add authentication setup 'username': 'user', 'password': 'pass' } deposit_client = PrivateApiDepositClient(_config, _client=_client) archive_path = os.path.join(self.temporary_directory, 'test.archive') archive_path = deposit_client.archive_get('/some/url', archive_path) self.assertTrue(os.path.exists(archive_path)) with open(archive_path, 'rb') as f: actual_content = f.read() self.assertEquals(actual_content, b''.join(stream_content)) self.assertEquals(_client.args, ('http://nowhere:9000/some/url', )) self.assertEquals(_client.kwargs, { 'stream': True, 'auth': ('user', 'pass') }) def test_archive_get_can_fail(self): """Reading archive can fail for some reasons """ response = StreamedResponse(ok=False, stream=None) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when retrieving deposit archive'): deposit_client.archive_get('/some/url', 'some/path') class JsonResponse: """Json response facsimile """ def __init__(self, ok, response): self.ok = ok self.response = response def json(self): return self.response class PrivateApiDepositClientReadMetadataTest(unittest.TestCase): def test_metadata_get(self): """Reading archive should write data in temporary directory """ expected_response = {"some": "dict"} response = JsonResponse( ok=True, response=expected_response) _client = FakeRequestClientGet(response) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) actual_metadata = deposit_client.metadata_get('/metadata') self.assertEquals(actual_metadata, expected_response) def test_metadata_get_can_fail(self): """Reading metadata can fail for some reasons """ _client = FakeRequestClientGet(JsonResponse(ok=False, response=None)) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when retrieving metadata at'): deposit_client.metadata_get('/some/metadata/url') class FakeRequestClientPut: """Fake Request client dedicated to put request method calls. """ args = None kwargs = None def put(self, *args, **kwargs): self.args = args self.kwargs = kwargs class PrivateApiDepositClientStatusUpdateTest(unittest.TestCase): def test_status_update(self): """Update status """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) deposit_client.status_update('/update/status', DEPOSIT_STATUS_LOAD_SUCCESS, revision_id='some-revision-id') self.assertEquals(_client.args, ('http://nowhere:9000/update/status', )) self.assertEquals(_client.kwargs, { 'json': { 'status': DEPOSIT_STATUS_LOAD_SUCCESS, 'revision_id': 'some-revision-id', } }) def test_status_update_with_no_revision_id(self): """Reading metadata can fail for some reasons """ _client = FakeRequestClientPut() deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) deposit_client.status_update('/update/status/fail', DEPOSIT_STATUS_LOAD_FAILURE) self.assertEquals(_client.args, ('http://nowhere:9000/update/status/fail', )) self.assertEquals(_client.kwargs, { 'json': { 'status': DEPOSIT_STATUS_LOAD_FAILURE, } }) class PrivateApiDepositClientCheckTest(unittest.TestCase): def test_check(self): """When check ok, this should return the deposit's status """ _client = FakeRequestClientGet( JsonResponse(ok=True, response={'status': 'something'})) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) r = deposit_client.check('/check') self.assertEquals(_client.args, ('http://nowhere:9000/check', )) self.assertEquals(_client.kwargs, {}) self.assertEquals(r, 'something') def test_check_fails(self): """Checking deposit can fail for some reason """ _client = FakeRequestClientGet( JsonResponse(ok=False, response=None)) deposit_client = PrivateApiDepositClient(config=CLIENT_TEST_CONFIG, _client=_client) with self.assertRaisesRegex( ValueError, 'Problem when checking deposit'): deposit_client.check('/check/fails') self.assertEquals(_client.args, ('http://nowhere:9000/check/fails', )) self.assertEquals(_client.kwargs, {}) diff --git a/swh/deposit/tests/loader/test_loader.py b/swh/deposit/tests/loader/test_loader.py index efdc30c1..2103d4ed 100644 --- a/swh/deposit/tests/loader/test_loader.py +++ b/swh/deposit/tests/loader/test_loader.py @@ -1,305 +1,305 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest import shutil -from nose.plugins.attrib import attr +import pytest from rest_framework.test import APITestCase from swh.model import hashutil from swh.deposit.models import Deposit from swh.deposit.loader import loader from swh.deposit.config import ( PRIVATE_GET_RAW_CONTENT, PRIVATE_GET_DEPOSIT_METADATA, PRIVATE_PUT_DEPOSIT ) from django.core.urlresolvers import reverse from .common import SWHDepositTestClient, CLIENT_TEST_CONFIG from .. import TEST_LOADER_CONFIG from ..common import (BasicTestCase, WithAuthTestCase, CommonCreationRoutine, FileSystemCreationRoutine) TOOL_ID = 99 PROVIDER_ID = 12 class DepositLoaderInhibitsStorage: """Mixin class to inhibit the persistence and keep in memory the data sent for storage. cf. SWHDepositLoaderNoStorage """ def __init__(self, client=None): # client is not used here, transit it nonetheless to other mixins super().__init__(client=client) # typed data self.state = { 'origin': [], 'origin_visit': [], 'origin_metadata': [], 'content': [], 'directory': [], 'revision': [], 'release': [], 'snapshot': [], 'tool': [], 'provider': [] } def _add(self, type, l): """Add without duplicates and keeping the insertion order. Args: type (str): Type of objects concerned by the action l ([object]): List of 'type' object """ col = self.state[type] for o in l: if o in col: continue col.extend([o]) def send_origin(self, origin): origin.update({'id': 1}) self._add('origin', [origin]) return origin['id'] def send_origin_visit(self, origin_id, visit_date): origin_visit = { 'origin': origin_id, 'visit_date': visit_date, 'visit': 1, } self._add('origin_visit', [origin_visit]) return origin_visit def send_origin_metadata(self, origin_id, visit_date, provider_id, tool_id, metadata): origin_metadata = { 'origin_id': origin_id, 'visit_date': visit_date, 'provider_id': provider_id, 'tool_id': tool_id, 'metadata': metadata } self._add('origin_metadata', [origin_metadata]) return origin_metadata def send_tool(self, tool): tool = { 'tool_name': tool['tool_name'], 'tool_version': tool['tool_version'], 'tool_configuration': tool['tool_configuration'] } self._add('tool', [tool]) tool_id = TOOL_ID return tool_id def send_provider(self, provider): provider = { 'provider_name': provider['provider_name'], 'provider_type': provider['provider_type'], 'provider_url': provider['provider_url'], 'metadata': provider['metadata'] } self._add('provider', [provider]) provider_id = PROVIDER_ID return provider_id def maybe_load_contents(self, contents): self._add('content', contents) def maybe_load_directories(self, directories): self._add('directory', directories) def maybe_load_revisions(self, revisions): self._add('revision', revisions) def maybe_load_releases(self, releases): self._add('release', releases) def maybe_load_snapshot(self, snapshot): self._add('snapshot', [snapshot]) def open_fetch_history(self): pass def close_fetch_history_failure(self, fetch_history_id): pass def close_fetch_history_success(self, fetch_history_id): pass def update_origin_visit(self, origin_id, visit, status): self.status = status # Override to do nothing at the end def close_failure(self): pass def close_success(self): pass class TestLoaderUtils(unittest.TestCase): def assertRevisionsOk(self, expected_revisions): # noqa: N802 """Check the loader's revisions match the expected revisions. Expects self.loader to be instantiated and ready to be inspected (meaning the loading took place). Args: expected_revisions (dict): Dict with key revision id, value the targeted directory id. """ # The last revision being the one used later to start back from for rev in self.loader.state['revision']: rev_id = hashutil.hash_to_hex(rev['id']) directory_id = hashutil.hash_to_hex(rev['directory']) self.assertEquals(expected_revisions[rev_id], directory_id) class SWHDepositLoaderNoStorage(DepositLoaderInhibitsStorage, loader.DepositLoader): """Loader to test. It inherits from the actual deposit loader to actually test its correct behavior. It also inherits from DepositLoaderInhibitsStorage so that no persistence takes place. """ pass -@attr('fs') +@pytest.mark.fs class DepositLoaderScenarioTest(APITestCase, WithAuthTestCase, BasicTestCase, CommonCreationRoutine, FileSystemCreationRoutine, TestLoaderUtils): def setUp(self): super().setUp() # create the extraction dir used by the loader os.makedirs(TEST_LOADER_CONFIG['extraction_dir'], exist_ok=True) # 1. create a deposit with archive and metadata self.deposit_id = self.create_simple_binary_deposit() # 2. Sets a basic client which accesses the test data loader_client = SWHDepositTestClient(self.client, config=CLIENT_TEST_CONFIG) # 3. setup loader with no persistence and that client self.loader = SWHDepositLoaderNoStorage(client=loader_client) def tearDown(self): super().tearDown() shutil.rmtree(TEST_LOADER_CONFIG['extraction_dir']) def test_inject_deposit_ready(self): """Load a deposit which is ready """ args = [self.collection.name, self.deposit_id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) # when self.loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) # then self.assertEquals(len(self.loader.state['content']), 1) self.assertEquals(len(self.loader.state['directory']), 1) self.assertEquals(len(self.loader.state['revision']), 1) self.assertEquals(len(self.loader.state['release']), 0) self.assertEquals(len(self.loader.state['snapshot']), 1) def test_inject_deposit_verify_metadata(self): """Load a deposit with metadata, test metadata integrity """ self.deposit_metadata_id = self.add_metadata_to_deposit( self.deposit_id) args = [self.collection.name, self.deposit_metadata_id] archive_url = reverse(PRIVATE_GET_RAW_CONTENT, args=args) deposit_meta_url = reverse(PRIVATE_GET_DEPOSIT_METADATA, args=args) deposit_update_url = reverse(PRIVATE_PUT_DEPOSIT, args=args) # when self.loader.load(archive_url=archive_url, deposit_meta_url=deposit_meta_url, deposit_update_url=deposit_update_url) # then self.assertEquals(len(self.loader.state['content']), 1) self.assertEquals(len(self.loader.state['directory']), 1) self.assertEquals(len(self.loader.state['revision']), 1) self.assertEquals(len(self.loader.state['release']), 0) self.assertEquals(len(self.loader.state['snapshot']), 1) self.assertEquals(len(self.loader.state['origin_metadata']), 1) self.assertEquals(len(self.loader.state['tool']), 1) self.assertEquals(len(self.loader.state['provider']), 1) codemeta = 'codemeta:' origin_url = 'https://hal-test.archives-ouvertes.fr/hal-01243065' expected_origin_metadata = { '@xmlns': 'http://www.w3.org/2005/Atom', '@xmlns:codemeta': 'https://doi.org/10.5063/SCHEMA/CODEMETA-2.0', 'author': { 'email': 'hal@ccsd.cnrs.fr', 'name': 'HAL' }, codemeta + 'url': origin_url, codemeta + 'runtimePlatform': 'phpstorm', codemeta + 'license': [ { codemeta + 'name': 'GNU General Public License v3.0 only' }, { codemeta + 'name': 'CeCILL Free Software License Agreement v1.1' # noqa } ], codemeta + 'author': { codemeta + 'name': 'Morane Gruenpeter' }, codemeta + 'programmingLanguage': ['php', 'python', 'C'], codemeta + 'applicationCategory': 'test', codemeta + 'dateCreated': '2017-05-03T16:08:47+02:00', codemeta + 'version': '1', 'external_identifier': 'hal-01243065', 'title': 'Composing a Web of Audio Applications', codemeta + 'description': 'this is the description', 'id': 'hal-01243065', 'client': 'hal', codemeta + 'keywords': 'DSP programming,Web', codemeta + 'developmentStatus': 'stable' } result = self.loader.state['origin_metadata'][0] self.assertEquals(result['metadata'], expected_origin_metadata) self.assertEquals(result['tool_id'], TOOL_ID) self.assertEquals(result['provider_id'], PROVIDER_ID) deposit = Deposit.objects.get(pk=self.deposit_id) self.assertRegex(deposit.swh_id, r'^swh:1:dir:.*') self.assertEquals(deposit.swh_id_context, '%s;origin=%s' % ( deposit.swh_id, origin_url )) self.assertRegex(deposit.swh_anchor_id, r'^swh:1:rev:.*') self.assertEquals(deposit.swh_anchor_id_context, '%s;origin=%s' % ( deposit.swh_anchor_id, origin_url ))