diff --git a/requirements-test.txt b/requirements-test.txt index 2e64c384..cda155f8 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,7 +1,8 @@ pytest pytest-django pytest-mock swh.scheduler[testing] +swh.loader.core[testing] pytest-postgresql >= 2.1.0 requests_mock -django-stubs < 1.3.0 +django-stubs diff --git a/swh/deposit/api/private/deposit_check.py b/swh/deposit/api/private/deposit_check.py index 7f0387fa..e17d4eab 100644 --- a/swh/deposit/api/private/deposit_check.py +++ b/swh/deposit/api/private/deposit_check.py @@ -1,228 +1,229 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json import re import tarfile import zipfile from itertools import chain from shutil import get_unpack_formats from rest_framework import status from swh.scheduler.utils import create_oneshot_task_dict from . import DepositReadMixin, SWHPrivateAPIView from ..common import SWHGetDepositAPI from ...config import DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_REJECTED from ...config import ARCHIVE_TYPE from ...models import Deposit MANDATORY_FIELDS_MISSING = 'Mandatory fields are missing' ALTERNATE_FIELDS_MISSING = 'Mandatory alternate fields are missing' MANDATORY_ARCHIVE_UNREADABLE = 'At least one of its associated archives is not readable' # noqa MANDATORY_ARCHIVE_INVALID = 'Mandatory archive is invalid (i.e contains only one archive)' # noqa MANDATORY_ARCHIVE_UNSUPPORTED = 'Mandatory archive type is not supported' MANDATORY_ARCHIVE_MISSING = 'Deposit without archive is rejected' ARCHIVE_EXTENSIONS = [ 'zip', 'tar', 'tar.gz', 'xz', 'tar.xz', 'bz2', 'tar.bz2', 'Z', 'tar.Z', 'tgz', '7z' ] PATTERN_ARCHIVE_EXTENSION = re.compile( r'.*\.(%s)$' % '|'.join(ARCHIVE_EXTENSIONS)) def known_archive_format(filename): return any(filename.endswith(t) for t in chain(*(x[1] for x in get_unpack_formats()))) class SWHChecksDeposit(SWHPrivateAPIView, SWHGetDepositAPI, DepositReadMixin): """Dedicated class to read a deposit's raw archives content. Only GET is supported. """ def _check_deposit_archives(self, deposit): """Given a deposit, check each deposit request of type archive. Args: The deposit to check archives for Returns tuple (status, error_detail): True, None if all archives are ok, (False, ) otherwise. """ requests = list(self._deposit_requests( deposit, request_type=ARCHIVE_TYPE)) if len(requests) == 0: # no associated archive is refused return False, { 'archive': [{ 'summary': MANDATORY_ARCHIVE_MISSING, }] } errors = [] for archive_request in requests: check, error_message = self._check_archive(archive_request) if not check: errors.append({ 'summary': error_message, 'fields': [archive_request.id] }) if not errors: return True, None return False, { 'archive': errors } def _check_archive(self, archive_request): """Check that a deposit associated archive is ok: - readable - supported archive format - valid content: the archive does not contain a single archive file If any of those checks are not ok, return the corresponding failing check. Args: archive_path (DepositRequest): Archive to check Returns: (True, None) if archive is check compliant, (False, ) otherwise. """ archive_path = archive_request.archive.path if not known_archive_format(archive_path): return False, MANDATORY_ARCHIVE_UNSUPPORTED try: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path) as f: files = f.namelist() elif tarfile.is_tarfile(archive_path): with tarfile.open(archive_path) as f: files = f.getnames() else: return False, MANDATORY_ARCHIVE_UNSUPPORTED except Exception: return False, MANDATORY_ARCHIVE_UNREADABLE if len(files) > 1: return True, None element = files[0] if PATTERN_ARCHIVE_EXTENSION.match(element): # archive in archive! return False, MANDATORY_ARCHIVE_INVALID return True, None def _check_metadata(self, metadata): """Check to execute on all metadata for mandatory field presence. Args: metadata (dict): Metadata dictionary to check for mandatory fields Returns: tuple (status, error_detail): True, None if metadata are ok (False, ) otherwise. """ required_fields = { 'author': False, } alternate_fields = { ('name', 'title'): False, # alternate field, at least one # of them must be present } for field, value in metadata.items(): for name in required_fields: if name in field: required_fields[name] = True for possible_names in alternate_fields: for possible_name in possible_names: if possible_name in field: alternate_fields[possible_names] = True continue mandatory_result = [k for k, v in required_fields.items() if not v] optional_result = [ ' or '.join(k) for k, v in alternate_fields.items() if not v] if mandatory_result == [] and optional_result == []: return True, None detail = [] if mandatory_result != []: detail.append({ 'summary': MANDATORY_FIELDS_MISSING, 'fields': mandatory_result }) if optional_result != []: detail.append({ 'summary': ALTERNATE_FIELDS_MISSING, 'fields': optional_result, }) return False, { 'metadata': detail } def process_get(self, req, collection_name, deposit_id): """Build a unique tarball from the multiple received and stream that content to the client. Args: req (Request): collection_name (str): Collection owning the deposit deposit_id (id): Deposit concerned by the reading Returns: Tuple status, stream of content, content-type """ deposit = Deposit.objects.get(pk=deposit_id) metadata = self._metadata_get(deposit) problems = {} # will check each deposit's associated request (both of type # archive and metadata) for errors archives_status, error_detail = self._check_deposit_archives(deposit) if not archives_status: problems.update(error_detail) metadata_status, error_detail = self._check_metadata(metadata) if not metadata_status: problems.update(error_detail) deposit_status = archives_status and metadata_status # if any problems arose, the deposit is rejected if not deposit_status: deposit.status = DEPOSIT_STATUS_REJECTED deposit.status_detail = problems response = { 'status': deposit.status, 'details': deposit.status_detail, } else: deposit.status = DEPOSIT_STATUS_VERIFIED response = { 'status': deposit.status, } if not deposit.load_task_id and self.config['checks']: url = deposit.origin_url task = create_oneshot_task_dict( - 'load-deposit', url=url, deposit_id=deposit.id) + 'load-deposit', url=url, deposit_id=deposit.id, + retries_left=3) load_task_id = self.scheduler.create_tasks([task])[0]['id'] deposit.load_task_id = load_task_id deposit.save() return status.HTTP_200_OK, json.dumps(response), 'application/json' diff --git a/swh/deposit/config.py b/swh/deposit/config.py index b7a1b6a0..912fb602 100644 --- a/swh/deposit/config.py +++ b/swh/deposit/config.py @@ -1,111 +1,111 @@ # Copyright (C) 2017-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import logging from typing import Any, Dict, Tuple from swh.core.config import SWHConfig from swh.scheduler import get_scheduler # IRIs (Internationalized Resource identifier) sword 2.0 specified EDIT_SE_IRI = 'edit_se_iri' EM_IRI = 'em_iri' CONT_FILE_IRI = 'cont_file_iri' SD_IRI = 'servicedocument' COL_IRI = 'upload' STATE_IRI = 'state_iri' PRIVATE_GET_RAW_CONTENT = 'private-download' PRIVATE_CHECK_DEPOSIT = 'check-deposit' PRIVATE_PUT_DEPOSIT = 'private-update' PRIVATE_GET_DEPOSIT_METADATA = 'private-read' PRIVATE_LIST_DEPOSITS = 'private-deposit-list' ARCHIVE_KEY = 'archive' METADATA_KEY = 'metadata' RAW_METADATA_KEY = 'raw-metadata' ARCHIVE_TYPE = 'archive' METADATA_TYPE = 'metadata' AUTHORIZED_PLATFORMS = ['development', 'production', 'testing'] DEPOSIT_STATUS_REJECTED = 'rejected' DEPOSIT_STATUS_PARTIAL = 'partial' DEPOSIT_STATUS_DEPOSITED = 'deposited' DEPOSIT_STATUS_VERIFIED = 'verified' DEPOSIT_STATUS_LOAD_SUCCESS = 'done' DEPOSIT_STATUS_LOAD_FAILURE = 'failed' # Revision author for deposit SWH_PERSON = { 'name': 'Software Heritage', 'fullname': 'Software Heritage', 'email': 'robot@softwareheritage.org' } def setup_django_for(platform=None, config_file=None): """Setup function for command line tools (swh.deposit.create_user) to initialize the needed db access. Note: Do not import any django related module prior to this function call. Otherwise, this will raise an django.core.exceptions.ImproperlyConfigured error message. Args: platform (str): the platform the scheduling is running config_file (str): Extra configuration file (typically for the production platform) Raises: ValueError in case of wrong platform inputs. """ if platform is not None: if platform not in AUTHORIZED_PLATFORMS: raise ValueError('Platform should be one of %s' % AUTHORIZED_PLATFORMS) if 'DJANGO_SETTINGS_MODULE' not in os.environ: os.environ['DJANGO_SETTINGS_MODULE'] = ( 'swh.deposit.settings.%s' % platform) if config_file: os.environ.setdefault('SWH_CONFIG_FILENAME', config_file) import django django.setup() class SWHDefaultConfig(SWHConfig): """Mixin intended to enrich views with SWH configuration. """ CONFIG_BASE_FILENAME = 'deposit/server' DEFAULT_CONFIG = { 'max_upload_size': ('int', 209715200), 'checks': ('bool', True), 'scheduler': ('dict', { 'cls': 'remote', 'args': { 'url': 'http://localhost:5008/' } }) } ADDITIONAL_CONFIG = {} # type: Dict[str, Tuple[str, Any]] def __init__(self, **config): super().__init__() self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) self.config.update(config) self.log = logging.getLogger('swh.deposit') - if self.config['checks']: + if self.config.get('scheduler'): self.scheduler = get_scheduler(**self.config['scheduler']) diff --git a/swh/deposit/tests/__init__.py b/swh/deposit/tests/__init__.py index 2b34b26e..e69de29b 100644 --- a/swh/deposit/tests/__init__.py +++ b/swh/deposit/tests/__init__.py @@ -1,42 +0,0 @@ -# Copyright (C) 2017-2019 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -from swh.deposit.config import setup_django_for -from swh.deposit.config import SWHDefaultConfig # noqa - - -TEST_CONFIG = { - 'max_upload_size': 500, - 'extraction_dir': '/tmp/swh-deposit/test/extraction-dir', - 'checks': False, - 'provider': { - 'provider_name': '', - 'provider_type': 'deposit_client', - 'provider_url': '', - 'metadata': { - } - }, - 'tool': { - 'name': 'swh-deposit', - 'version': '0.0.1', - 'configuration': { - 'sword_version': '2' - } - } -} - - -def parse_deposit_config_file(base_filename=None, config_filename=None, - additional_configs=None, global_config=True): - return TEST_CONFIG - - -# monkey patch classes method permits to override, for tests purposes, -# the default configuration without side-effect, i.e do not load the -# configuration from disk -SWHDefaultConfig.parse_config_file = parse_deposit_config_file # type: ignore - - -setup_django_for('testing') diff --git a/swh/deposit/tests/api/test_deposit_binary.py b/swh/deposit/tests/api/test_deposit_binary.py index 7d3eac5d..8f1cc763 100644 --- a/swh/deposit/tests/api/test_deposit_binary.py +++ b/swh/deposit/tests/api/test_deposit_binary.py @@ -1,544 +1,543 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from django.core.files.uploadedfile import InMemoryUploadedFile from django.urls import reverse from io import BytesIO from rest_framework import status -from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import ( COL_IRI, EM_IRI, DEPOSIT_STATUS_DEPOSITED, ) from swh.deposit.models import Deposit, DepositRequest from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import create_arborescence_archive, check_archive def test_post_deposit_binary_no_slug( authenticated_client, deposit_collection, sample_archive): """Posting a binary deposit without slug header should return 400 """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert b'Missing SLUG header' in response.content assert response.status_code == status.HTTP_400_BAD_REQUEST def test_post_deposit_binary_support( authenticated_client, deposit_collection, sample_archive): """Binary upload with content-type not in [zip,x-tar] should return 415 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/octet-stream', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_ok( authenticated_client, deposit_collection, sample_archive): """Binary upload with correct headers should return 201 with receipt """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], # other headers needs HTTP_ prefix to be taken into account HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( sample_archive['name'], )) # then response_content = parse_xml(BytesIO(response.content)) assert response.status_code == status.HTTP_201_CREATED deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) check_archive(sample_archive['name'], deposit_request.archive.name) assert deposit_request.metadata is None assert deposit_request.raw_metadata is None response_content = parse_xml(BytesIO(response.content)) assert response_content['deposit_archive'] == sample_archive['name'] assert int(response_content['deposit_id']) == deposit.id assert response_content['deposit_status'] == deposit.status edit_se_iri = reverse('edit_se_iri', args=[deposit_collection.name, deposit.id]) assert response._headers['location'] == ( 'Location', 'http://testserver' + edit_se_iri) def test_post_deposit_binary_failure_unsupported_packaging_header( authenticated_client, deposit_collection, sample_archive): """Bin deposit without supported content_disposition header returns 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='something-unsupported', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_400_BAD_REQUEST with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_no_content_disposition_header( authenticated_client, deposit_collection, sample_archive): """Binary upload without content_disposition header should return 400 """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false') # then assert response.status_code == status.HTTP_400_BAD_REQUEST with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_mediation_not_supported( authenticated_client, deposit_collection, sample_archive): """Binary upload with mediation should return a 412 response """ # given url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_ON_BEHALF_OF='someone', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_412_PRECONDITION_FAILED with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_binary_upload_fail_if_upload_size_limit_exceeded( authenticated_client, deposit_collection, sample_archive, tmp_path): """Binary upload must not exceed the limit set up... """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) archive = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some content in file', - up_to_size=TEST_CONFIG['max_upload_size']) + up_to_size=500) external_id = 'some-external-id' # when response = authenticated_client.post( url, content_type='application/zip', data=archive['data'], # + headers CONTENT_LENGTH=archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_413_REQUEST_ENTITY_TOO_LARGE assert b'Upload size limit exceeded' in response.content with pytest.raises(Deposit.DoesNotExist): Deposit.objects.get(external_id=external_id) def test_post_deposit_2_post_2_different_deposits( authenticated_client, deposit_collection, sample_archive): """2 posting deposits should return 2 different 201 with receipt """ url = reverse(COL_IRI, args=[deposit_collection.name]) # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG='some-external-id-1', HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) deposits = Deposit.objects.all() assert len(deposits) == 1 assert deposits[0] == deposit # second post response = authenticated_client.post( url, content_type='application/x-tar', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG='another-external-id', HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename1') assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id2 = response_content['deposit_id'] deposit2 = Deposit.objects.get(pk=deposit_id2) assert deposit != deposit2 deposits = Deposit.objects.all().order_by('id') assert len(deposits) == 2 assert list(deposits), [deposit == deposit2] def test_post_deposit_binary_and_post_to_add_another_archive( authenticated_client, deposit_collection, sample_archive, tmp_path): """Updating a deposit should return a 201 with receipt """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='true', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( sample_archive['name'], )) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == 'partial' assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit assert deposit_request.type == 'archive' check_archive(sample_archive['name'], deposit_request.archive.name) # 2nd archive to upload archive2 = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some other content in file') # uri to update the content update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit_id]) # adding another archive for the deposit and finalizing it response = authenticated_client.post( update_uri, content_type='application/zip', # as zip data=archive2['data'], # + headers CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( archive2['name'])) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_requests = list(DepositRequest.objects.filter(deposit=deposit). order_by('id')) # 2 deposit requests for the same deposit assert len(deposit_requests) == 2 assert deposit_requests[0].deposit == deposit assert deposit_requests[0].type == 'archive' check_archive(sample_archive['name'], deposit_requests[0].archive.name) assert deposit_requests[1].deposit == deposit assert deposit_requests[1].type == 'archive' check_archive(archive2['name'], deposit_requests[1].archive.name) # only 1 deposit in db deposits = Deposit.objects.all() assert len(deposits) == 1 def test_post_deposit_then_update_refused( authenticated_client, deposit_collection, sample_archive, atom_dataset, tmp_path): """Updating a deposit with status 'ready' should return a 400 """ tmp_path = str(tmp_path) url = reverse(COL_IRI, args=[deposit_collection.name]) external_id = 'some-external-id-1' # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(BytesIO(response.content)) deposit_id = response_content['deposit_id'] deposit = Deposit.objects.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_DEPOSITED assert deposit.external_id == external_id assert deposit.collection == deposit_collection assert deposit.swh_id is None deposit_request = DepositRequest.objects.get(deposit=deposit) assert deposit_request.deposit == deposit check_archive('filename0', deposit_request.archive.name) # updating/adding is forbidden # uri to update the content edit_se_iri = reverse( 'edit_se_iri', args=[deposit_collection.name, deposit_id]) em_iri = reverse( 'em_iri', args=[deposit_collection.name, deposit_id]) # Testing all update/add endpoint should fail # since the status is ready archive2 = create_arborescence_archive( tmp_path, 'archive2', 'file2', b'some content in file 2') # replacing file is no longer possible since the deposit's # status is ready r = authenticated_client.put( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert r.status_code == status.HTTP_400_BAD_REQUEST # adding file is no longer possible since the deposit's status # is ready r = authenticated_client.post( em_iri, content_type='application/zip', data=archive2['data'], CONTENT_LENGTH=archive2['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=archive2['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=filename0') assert r.status_code == status.HTTP_400_BAD_REQUEST # replacing metadata is no longer possible since the deposit's # status is ready r = authenticated_client.put( edit_se_iri, content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data-deposit-binary'], CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), HTTP_SLUG=external_id) assert r.status_code == status.HTTP_400_BAD_REQUEST # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data-deposit-binary'], CONTENT_LENGTH=len(atom_dataset['entry-data-deposit-binary']), HTTP_SLUG=external_id) assert r.status_code == status.HTTP_400_BAD_REQUEST archive_content = b'some content representing archive' archive = InMemoryUploadedFile( BytesIO(archive_content), field_name='archive0', name='archive0', content_type='application/zip', size=len(archive_content), charset=None) atom_entry = InMemoryUploadedFile( BytesIO(atom_dataset['entry-data-deposit-binary']), field_name='atom0', name='atom0', content_type='application/atom+xml; charset="utf-8"', size=len(atom_dataset['entry-data-deposit-binary']), charset='utf-8') # replacing multipart metadata is no longer possible since the # deposit's status is ready r = authenticated_client.put( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) assert r.status_code == status.HTTP_400_BAD_REQUEST # adding new metadata is no longer possible since the # deposit's status is ready r = authenticated_client.post( edit_se_iri, format='multipart', data={ 'archive': archive, 'atom_entry': atom_entry, }) assert r.status_code == status.HTTP_400_BAD_REQUEST diff --git a/swh/deposit/tests/api/test_deposit_private_check.py b/swh/deposit/tests/api/test_deposit_private_check.py index 50f0b1d0..a6bab004 100644 --- a/swh/deposit/tests/api/test_deposit_private_check.py +++ b/swh/deposit/tests/api/test_deposit_private_check.py @@ -1,263 +1,266 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.urls import reverse +import pytest from rest_framework import status from swh.deposit.config import ( DEPOSIT_STATUS_VERIFIED, PRIVATE_CHECK_DEPOSIT, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, COL_IRI ) from swh.deposit.api.private.deposit_check import ( MANDATORY_ARCHIVE_INVALID, MANDATORY_FIELDS_MISSING, MANDATORY_ARCHIVE_UNSUPPORTED, ALTERNATE_FIELDS_MISSING, MANDATORY_ARCHIVE_MISSING ) from swh.deposit.models import Deposit from swh.deposit.parsers import parse_xml from swh.deposit.tests.common import ( create_arborescence_archive, create_archive_with_archive ) PRIVATE_CHECK_DEPOSIT_NC = PRIVATE_CHECK_DEPOSIT + '-nc' def private_check_url_endpoints(collection, deposit): """There are 2 endpoints to check (one with collection, one without)""" return [ reverse(PRIVATE_CHECK_DEPOSIT, args=[collection.name, deposit.id]), reverse(PRIVATE_CHECK_DEPOSIT_NC, args=[deposit.id]) ] +@pytest.mark.parametrize( + "extension", ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']) def test_deposit_ok( - authenticated_client, deposit_collection, ready_deposit_ok): + authenticated_client, deposit_collection, ready_deposit_ok, extension): """Proper deposit should succeed the checks (-> status ready) """ deposit = ready_deposit_ok for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data['status'] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() - +@pytest.mark.parametrize( + "extension", ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']) def test_deposit_invalid_tarball( - tmp_path, authenticated_client, deposit_collection): + tmp_path, authenticated_client, deposit_collection, extension): """Deposit with tarball (of 1 tarball) should fail the checks: rejected """ - for archive_extension in ['zip', 'tar', 'tar.gz', 'tar.bz2', 'tar.xz']: - deposit = create_deposit_archive_with_archive( - tmp_path, archive_extension, - authenticated_client, - deposit_collection.name) - for url in private_check_url_endpoints(deposit_collection, deposit): - response = authenticated_client.get(url) - assert response.status_code == status.HTTP_200_OK - data = response.json() - assert data['status'] == DEPOSIT_STATUS_REJECTED - details = data['details'] - # archive checks failure - assert len(details['archive']) == 1 - assert details['archive'][0]['summary'] == \ - MANDATORY_ARCHIVE_INVALID - - deposit = Deposit.objects.get(pk=deposit.id) - assert deposit.status == DEPOSIT_STATUS_REJECTED + deposit = create_deposit_archive_with_archive( + tmp_path, extension, + authenticated_client, + deposit_collection.name) + for url in private_check_url_endpoints(deposit_collection, deposit): + response = authenticated_client.get(url) + assert response.status_code == status.HTTP_200_OK + data = response.json() + assert data['status'] == DEPOSIT_STATUS_REJECTED + details = data['details'] + # archive checks failure + assert len(details['archive']) == 1 + assert details['archive'][0]['summary'] == \ + MANDATORY_ARCHIVE_INVALID + + deposit = Deposit.objects.get(pk=deposit.id) + assert deposit.status == DEPOSIT_STATUS_REJECTED def test_deposit_ko_missing_tarball( authenticated_client, deposit_collection, ready_deposit_only_metadata): """Deposit without archive should fail the checks: rejected """ deposit = ready_deposit_only_metadata assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data['status'] == DEPOSIT_STATUS_REJECTED details = data['details'] # archive checks failure assert len(details['archive']) == 1 assert details['archive'][0]['summary'] == MANDATORY_ARCHIVE_MISSING deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_deposit_ko_unsupported_tarball( tmp_path, authenticated_client, deposit_collection, ready_deposit_invalid_archive): """Deposit with an unsupported tarball should fail the checks: rejected """ deposit = ready_deposit_invalid_archive assert DEPOSIT_STATUS_DEPOSITED == deposit.status for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data['status'] == DEPOSIT_STATUS_REJECTED details = data['details'] # archive checks failure assert len(details['archive']) == 1 assert details['archive'][0]['summary'] == \ MANDATORY_ARCHIVE_UNSUPPORTED # metadata check failure assert len(details['metadata']) == 2 mandatory = details['metadata'][0] assert mandatory['summary'] == MANDATORY_FIELDS_MISSING assert set(mandatory['fields']) == set(['author']) alternate = details['metadata'][1] assert alternate['summary'] == ALTERNATE_FIELDS_MISSING assert alternate['fields'] == ['name or title'] deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_REJECTED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_check_deposit_metadata_ok( authenticated_client, deposit_collection, ready_deposit_ok): """Proper deposit should succeed the checks (-> status ready) with all **MUST** metadata using the codemeta metadata test set """ deposit = ready_deposit_ok assert deposit.status == DEPOSIT_STATUS_DEPOSITED for url in private_check_url_endpoints(deposit_collection, deposit): response = authenticated_client.get(url) assert response.status_code == status.HTTP_200_OK data = response.json() assert data['status'] == DEPOSIT_STATUS_VERIFIED deposit = Deposit.objects.get(pk=deposit.id) assert deposit.status == DEPOSIT_STATUS_VERIFIED deposit.status = DEPOSIT_STATUS_DEPOSITED deposit.save() def test_check_metadata_ok(swh_checks_deposit): actual_check, detail = swh_checks_deposit._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'name': 'foo', 'author': 'someone', }) assert actual_check is True assert detail is None def test_check_metadata_ok2(swh_checks_deposit): actual_check, detail = swh_checks_deposit._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'bar', 'author': 'someone', }) assert actual_check is True assert detail is None def test_check_metadata_ko(swh_checks_deposit): """Missing optional field should be caught """ actual_check, error_detail = swh_checks_deposit._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'author': 'someone', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory alternate fields are missing', 'fields': ['name or title'], }] } assert actual_check is False assert error_detail == expected_error def test_check_metadata_ko2(swh_checks_deposit): """Missing mandatory fields should be caught """ actual_check, error_detail = swh_checks_deposit._check_metadata({ 'url': 'something', 'external_identifier': 'something-else', 'title': 'foobar', }) expected_error = { 'metadata': [{ 'summary': 'Mandatory fields are missing', 'fields': ['author'], }] } assert actual_check is False assert error_detail == expected_error def create_deposit_archive_with_archive( root_path, archive_extension, client, collection_name): # we create the holding archive to a given extension archive = create_arborescence_archive( root_path, 'archive1', 'file1', b'some content in file', extension=archive_extension) # now we create an archive holding the first created archive invalid_archive = create_archive_with_archive( root_path, 'invalid.tgz', archive) # we deposit it response = client.post( reverse(COL_IRI, args=[collection_name]), content_type='application/x-tar', data=invalid_archive['data'], CONTENT_LENGTH=invalid_archive['length'], HTTP_MD5SUM=invalid_archive['md5sum'], HTTP_SLUG='external-id', HTTP_IN_PROGRESS=False, HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( invalid_archive['name'], )) # then assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_status = response_content['deposit_status'] assert deposit_status == DEPOSIT_STATUS_DEPOSITED deposit_id = int(response_content['deposit_id']) deposit = Deposit.objects.get(pk=deposit_id) assert DEPOSIT_STATUS_DEPOSITED == deposit.status return deposit diff --git a/swh/deposit/tests/api/test_service_document.py b/swh/deposit/tests/api/test_service_document.py index 558c7598..dda59a88 100644 --- a/swh/deposit/tests/api/test_service_document.py +++ b/swh/deposit/tests/api/test_service_document.py @@ -1,87 +1,86 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from django.urls import reverse from rest_framework import status -from swh.deposit.tests import TEST_CONFIG from swh.deposit.config import SD_IRI def test_service_document_no_auth_fails(client): """Without authentication, service document endpoint should return 401 """ url = reverse(SD_IRI) response = client.get(url) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_service_document_no_auth_with_http_auth_should_not_break(client): """Without auth, sd endpoint through browser should return 401 """ url = reverse(SD_IRI) response = client.get( url, HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_service_document(authenticated_client, deposit_user): """With authentication, service document list user's collection """ url = reverse(SD_IRI) response = authenticated_client.get(url) check_response(response, deposit_user.username) def test_service_document_with_http_accept_header( authenticated_client, deposit_user): """With authentication, with browser, sd list user's collection """ url = reverse(SD_IRI) response = authenticated_client.get( url, HTTP_ACCEPT='text/html,application/xml;q=9,*/*,q=8') check_response(response, deposit_user.username) def check_response(response, username): assert response.status_code == status.HTTP_200_OK assert response.content.decode('utf-8') == \ ''' 2.0 %s The Software Heritage (SWH) Archive %s Software Collection application/zip application/x-tar Collection Policy Software Heritage Archive Collect, Preserve, Share false false http://purl.org/net/sword/package/SimpleZip http://testserver/1/%s/ %s -''' % (TEST_CONFIG['max_upload_size'], +''' % (500, username, username, username, username) # noqa diff --git a/swh/deposit/tests/conftest.py b/swh/deposit/tests/conftest.py index 79ac88ad..6346896d 100644 --- a/swh/deposit/tests/conftest.py +++ b/swh/deposit/tests/conftest.py @@ -1,335 +1,390 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import base64 import pytest import psycopg2 from django.urls import reverse from django.test.utils import setup_databases # type: ignore # mypy is asked to ignore the import statement above because setup_databases # is not part of the d.t.utils.__all__ variable. from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from rest_framework import status from rest_framework.test import APIClient from typing import Mapping +from swh.scheduler import get_scheduler from swh.scheduler.tests.conftest import * # noqa +from swh.deposit.config import setup_django_for from swh.deposit.parsers import parse_xml +from swh.deposit.config import SWHDefaultConfig from swh.deposit.config import ( COL_IRI, EDIT_SE_IRI, DEPOSIT_STATUS_DEPOSITED, DEPOSIT_STATUS_REJECTED, DEPOSIT_STATUS_PARTIAL, DEPOSIT_STATUS_LOAD_SUCCESS, DEPOSIT_STATUS_VERIFIED, DEPOSIT_STATUS_LOAD_FAILURE ) from swh.deposit.tests.common import create_arborescence_archive TEST_USER = { 'username': 'test', 'password': 'password', 'email': 'test@example.org', 'provider_url': 'https://hal-test.archives-ouvertes.fr/', 'domain': 'archives-ouvertes.fr/', 'collection': { 'name': 'test' }, } +TEST_CONFIG = { + 'max_upload_size': 500, + 'extraction_dir': '/tmp/swh-deposit/test/extraction-dir', + 'checks': False, + 'provider': { + 'provider_name': '', + 'provider_type': 'deposit_client', + 'provider_url': '', + 'metadata': { + } + }, + 'tool': { + 'name': 'swh-deposit', + 'version': '0.0.1', + 'configuration': { + 'sword_version': '2' + } + }, +} + + +def pytest_configure(): + setup_django_for('testing') + + +@pytest.fixture() +def deposit_config(): + return TEST_CONFIG + + +@pytest.fixture(autouse=True) +def deposit_autoconfig(monkeypatch, deposit_config, swh_scheduler_config): + """Enforce config for deposit classes inherited from SWHDefaultConfig.""" + def mock_parse_config(*args, **kw): + config = deposit_config.copy() + config['scheduler'] = { + 'cls': 'local', + 'args': swh_scheduler_config, + } + return config + monkeypatch.setattr( + SWHDefaultConfig, "parse_config_file", + mock_parse_config) + + scheduler = get_scheduler('local', swh_scheduler_config) + task_type = { + 'type': 'load-deposit', + 'backend_name': 'swh.loader.packages.deposit.tasks.LoadDeposit', + 'description': 'why does this have not-null constraint?'} + scheduler.create_task_type(task_type) + + @pytest.fixture(scope='session') def django_db_setup( request, django_db_blocker, postgresql_proc): from django.conf import settings settings.DATABASES['default'].update({ ('ENGINE', 'django.db.backends.postgresql'), ('NAME', 'tests'), ('USER', postgresql_proc.user), # noqa ('HOST', postgresql_proc.host), # noqa ('PORT', postgresql_proc.port), # noqa }) with django_db_blocker.unblock(): setup_databases( verbosity=request.config.option.verbose, interactive=False, keepdb=False) def execute_sql(sql): """Execute sql to postgres db""" with psycopg2.connect(database='postgres') as conn: conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT) cur = conn.cursor() cur.execute(sql) @pytest.fixture(autouse=True, scope='session') def swh_proxy(): """Automatically inject this fixture in all tests to ensure no outside connection takes place. """ os.environ['http_proxy'] = 'http://localhost:999' os.environ['https_proxy'] = 'http://localhost:999' def create_deposit_collection(collection_name: str): """Create a deposit collection with name collection_name """ from swh.deposit.models import DepositCollection try: collection = DepositCollection._default_manager.get( name=collection_name) except DepositCollection.DoesNotExist: collection = DepositCollection(name=collection_name) collection.save() return collection def deposit_collection_factory( collection_name=TEST_USER['collection']['name']): @pytest.fixture def _deposit_collection(db, collection_name=collection_name): return create_deposit_collection(collection_name) return _deposit_collection deposit_collection = deposit_collection_factory() deposit_another_collection = deposit_collection_factory('another-collection') @pytest.fixture def deposit_user(db, deposit_collection): """Create/Return the test_user "test" """ from swh.deposit.models import DepositClient try: user = DepositClient._default_manager.get( username=TEST_USER['username']) except DepositClient.DoesNotExist: user = DepositClient._default_manager.create_user( username=TEST_USER['username'], email=TEST_USER['email'], password=TEST_USER['password'], provider_url=TEST_USER['provider_url'], domain=TEST_USER['domain'], ) user.collections = [deposit_collection.id] user.save() return user @pytest.fixture def client(): """Override pytest-django one which does not work for djangorestframework. """ return APIClient() # <- drf's client @pytest.yield_fixture def authenticated_client(client, deposit_user): """Returned a logged client """ _token = '%s:%s' % (deposit_user.username, TEST_USER['password']) token = base64.b64encode(_token.encode('utf-8')) authorization = 'Basic %s' % token.decode('utf-8') client.credentials(HTTP_AUTHORIZATION=authorization) yield client client.logout() @pytest.fixture def sample_archive(tmp_path): """Returns a sample archive """ tmp_path = str(tmp_path) # pytest version limitation in previous version archive = create_arborescence_archive( tmp_path, 'archive1', 'file1', b'some content in file') return archive @pytest.fixture def atom_dataset(datadir) -> Mapping[str, bytes]: """Compute the paths to atom files. Returns: Dict of atom name per content (bytes) """ atom_path = os.path.join(datadir, 'atom') data = {} for filename in os.listdir(atom_path): filepath = os.path.join(atom_path, filename) with open(filepath, 'rb') as f: raw_content = f.read() # Keep the filename without extension atom_name = filename.split('.')[0] data[atom_name] = raw_content return data def create_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status=DEPOSIT_STATUS_DEPOSITED): """Create a skeleton shell deposit """ url = reverse(COL_IRI, args=[collection_name]) # when response = authenticated_client.post( url, content_type='application/zip', # as zip data=sample_archive['data'], # + headers CONTENT_LENGTH=sample_archive['length'], HTTP_SLUG=external_id, HTTP_CONTENT_MD5=sample_archive['md5sum'], HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', HTTP_IN_PROGRESS='false', HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( sample_archive['name'])) # then assert response.status_code == status.HTTP_201_CREATED from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(external_id=external_id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def create_binary_deposit( authenticated_client, collection_name: str, sample_archive, external_id: str, deposit_status: str = DEPOSIT_STATUS_DEPOSITED, atom_dataset: Mapping[str, bytes] = {}): """Create a deposit with both metadata and archive set. Then alters its status to `deposit_status`. """ deposit = create_deposit( authenticated_client, collection_name, sample_archive, external_id=external_id, deposit_status=DEPOSIT_STATUS_PARTIAL) response = authenticated_client.post( reverse(EDIT_SE_IRI, args=[collection_name, deposit.id]), content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data0'] % deposit.external_id.encode('utf-8'), HTTP_SLUG=deposit.external_id, HTTP_IN_PROGRESS='true') assert response.status_code == status.HTTP_201_CREATED assert deposit.status == DEPOSIT_STATUS_PARTIAL from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit.id) if deposit.status != deposit_status: deposit.status = deposit_status deposit.save() assert deposit.status == deposit_status return deposit def deposit_factory(deposit_status=DEPOSIT_STATUS_DEPOSITED): """Build deposit with a specific status """ @pytest.fixture() def _deposit(sample_archive, deposit_collection, authenticated_client, deposit_status=deposit_status): external_id = 'external-id-%s' % deposit_status return create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id=external_id, deposit_status=deposit_status ) return _deposit deposited_deposit = deposit_factory() rejected_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_REJECTED) partial_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_PARTIAL) verified_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_VERIFIED) completed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS) failed_deposit = deposit_factory(deposit_status=DEPOSIT_STATUS_LOAD_FAILURE) @pytest.fixture def partial_deposit_with_metadata( sample_archive, deposit_collection, authenticated_client, atom_dataset): """Returns deposit with archive and metadata provided, status 'partial' """ return create_binary_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id='external-id-partial', deposit_status=DEPOSIT_STATUS_PARTIAL, atom_dataset=atom_dataset ) @pytest.fixture def partial_deposit_only_metadata( deposit_collection, authenticated_client, atom_dataset): response = authenticated_client.post( reverse(COL_IRI, args=[deposit_collection.name]), content_type='application/atom+xml;type=entry', data=atom_dataset['entry-data1'], HTTP_SLUG='external-id-partial', HTTP_IN_PROGRESS=True) assert response.status_code == status.HTTP_201_CREATED response_content = parse_xml(response.content) deposit_id = response_content['deposit_id'] from swh.deposit.models import Deposit deposit = Deposit._default_manager.get(pk=deposit_id) assert deposit.status == DEPOSIT_STATUS_PARTIAL return deposit @pytest.fixture def complete_deposit(sample_archive, deposit_collection, authenticated_client): """Returns a completed deposit (load success) """ deposit = create_deposit( authenticated_client, deposit_collection.name, sample_archive, external_id='external-id-complete', deposit_status=DEPOSIT_STATUS_LOAD_SUCCESS ) _swh_id_context = 'https://hal.archives-ouvertes.fr/hal-01727745' deposit.swh_id = 'swh:1:dir:42a13fc721c8716ff695d0d62fc851d641f3a12b' deposit.swh_id_context = '%s;%s' % ( deposit.swh_id, _swh_id_context) deposit.swh_anchor_id = \ 'swh:rev:1:548b3c0a2bb43e1fca191e24b5803ff6b3bc7c10' deposit.swh_anchor_id_context = '%s;%s' % ( deposit.swh_anchor_id, _swh_id_context) deposit.save() return deposit @pytest.fixture() def tmp_path(tmp_path): return str(tmp_path) # issue with oldstable's pytest version diff --git a/tox.ini b/tox.ini index 74556aad..aa8038a0 100644 --- a/tox.ini +++ b/tox.ini @@ -1,36 +1,35 @@ [tox] envlist=flake8,mypy,py3 [testenv] extras = testing deps = # the dependency below is needed for now as a workaround for # https://github.com/pypa/pip/issues/6239 swh.core[http] >= 0.0.75 dev: ipdb pytest-cov - pifpaf commands = - pifpaf run postgresql -- \ - pytest \ + pytest \ !dev: --cov {envsitepackagesdir}/swh/deposit --cov-branch \ {envsitepackagesdir}/swh/deposit \ {posargs} [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 \ --exclude=.tox,.git,__pycache__,.tox,.eggs,*.egg,swh/deposit/migrations [testenv:mypy] -setenv = DJANGO_SETTINGS_MODULE = swh.deposit.settings.testing +setenv = DJANGO_SETTINGS_MODULE=swh.deposit.settings.testing extras = testing deps = - mypy < 0.750 + mypy + django-stubs commands = mypy swh