diff --git a/swh/deposit/tests/api/test_deposit_private_read_archive.py b/swh/deposit/tests/api/test_deposit_private_read_archive.py index e86e980d..d41ebef1 100644 --- a/swh/deposit/tests/api/test_deposit_private_read_archive.py +++ b/swh/deposit/tests/api/test_deposit_private_read_archive.py @@ -1,100 +1,111 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import hashlib -import os +import shutil from django.urls import reverse -import pytest +from os import listdir, path, mkdir from rest_framework import status -from rest_framework.test import APITestCase from swh.core import tarball -from swh.deposit.config import PRIVATE_GET_RAW_CONTENT -from swh.deposit.tests import TEST_CONFIG +from swh.deposit.config import PRIVATE_GET_RAW_CONTENT, EM_IRI -from swh.deposit.tests.common import ( - BasicTestCase, WithAuthTestCase, CommonCreationRoutine, - FileSystemCreationRoutine, create_arborescence_archive -) +from swh.deposit.tests.common import create_arborescence_archive -@pytest.mark.fs -class DepositReadArchivesTest(APITestCase, WithAuthTestCase, - BasicTestCase, CommonCreationRoutine, - FileSystemCreationRoutine): +PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + '-nc' - def setUp(self): - super().setUp() - self.archive2 = create_arborescence_archive( - self.root_path, 'archive2', 'file2', b'some other content in file') - self.workdir = os.path.join(self.root_path, 'workdir') - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_RAW_CONTENT, - args=[self.collection.name, deposit_id]) +def private_get_raw_url_endpoints(collection, deposit): + """There are 2 endpoints to check (one with collection, one without)""" + return [ + reverse(PRIVATE_GET_RAW_CONTENT, args=[collection.name, deposit.id]), + reverse(PRIVATE_GET_RAW_CONTENT_NC, args=[deposit.id]) + ] - def test_access_to_existing_deposit_with_one_archive(self): - """Access to deposit should stream a 200 response with its raw content - """ - deposit_id = self.create_simple_binary_deposit() +def test_access_to_existing_deposit_with_one_archive( + authenticated_client, deposit_collection, complete_deposit, + sample_archive): + """Access to deposit should stream a 200 response with its raw content - url = self.private_deposit_url(deposit_id) - r = self.client.get(url) + """ + deposit = complete_deposit - self.assertEqual(r.status_code, status.HTTP_200_OK) - self.assertEqual(r._headers['content-type'][1], - 'application/octet-stream') + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + r = authenticated_client.get(url) + + assert r.status_code == status.HTTP_200_OK + assert r._headers['content-type'][1] == 'application/octet-stream' # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() - self.assertEqual(actual_sha1, self.archive['sha1sum']) - - # this does not touch the extraction dir so this should stay empty - self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), []) - - def _check_tarball_consistency(self, actual_sha1): - tarball.uncompress(self.archive['path'], self.workdir) - self.assertEqual(os.listdir(self.workdir), ['file1']) - tarball.uncompress(self.archive2['path'], self.workdir) - lst = set(os.listdir(self.workdir)) - self.assertEqual(lst, {'file1', 'file2'}) - - new_path = self.workdir + '.zip' - tarball.compress(new_path, 'zip', self.workdir) - with open(new_path, 'rb') as f: - h = hashlib.sha1(f.read()).hexdigest() - - self.assertEqual(actual_sha1, h) - self.assertNotEqual(actual_sha1, self.archive['sha1sum']) - self.assertNotEqual(actual_sha1, self.archive2['sha1sum']) - - def test_access_to_existing_deposit_with_multiple_archives(self): - """Access to deposit should stream a 200 response with its raw contents - - """ - deposit_id = self.create_complex_binary_deposit() - url = self.private_deposit_url(deposit_id) - r = self.client.get(url) - - self.assertEqual(r.status_code, status.HTTP_200_OK) - self.assertEqual(r._headers['content-type'][1], - 'application/octet-stream') + assert actual_sha1 == sample_archive['sha1sum'] + + +def test_access_to_existing_deposit_with_multiple_archives( + tmp_path, authenticated_client, deposit_collection, partial_deposit, + sample_archive): + """Access to deposit should stream a 200 response with its raw contents + + """ + deposit = partial_deposit + archive2 = create_arborescence_archive( + tmp_path, 'archive2', 'file2', b'some content in file') + + # Add a second archive to deposit + update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) + response = authenticated_client.post( + update_uri, + content_type='application/zip', # as zip + data=archive2['data'], + # + headers + CONTENT_LENGTH=archive2['length'], + HTTP_SLUG=deposit.external_id, + HTTP_CONTENT_MD5=archive2['md5sum'], + HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', + HTTP_IN_PROGRESS='false', + HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( + archive2['name'], )) + assert response.status_code == status.HTTP_201_CREATED + + for url in private_get_raw_url_endpoints(deposit_collection, deposit): + r = authenticated_client.get(url) + + assert r.status_code == status.HTTP_200_OK + assert r._headers['content-type'][1] == 'application/octet-stream' # read the stream data = b''.join(r.streaming_content) actual_sha1 = hashlib.sha1(data).hexdigest() - self._check_tarball_consistency(actual_sha1) - - # this touches the extraction directory but should clean up - # after itself - self.assertEqual(os.listdir(TEST_CONFIG['extraction_dir']), []) - - -@pytest.mark.fs -class DepositReadArchivesTest2(DepositReadArchivesTest): - def private_deposit_url(self, deposit_id): - return reverse(PRIVATE_GET_RAW_CONTENT+'-nc', args=[deposit_id]) + check_tarball_consistency( + tmp_path, sample_archive, archive2, actual_sha1) + + +def check_tarball_consistency(tmp_path, archive, archive2, actual_sha1): + """Check the tarballs are ok + + """ + workdir = path.join(tmp_path, 'workdir') + mkdir(workdir) + lst = set(listdir(workdir)) + assert lst == set() + tarball.uncompress(archive['path'], dest=workdir) + assert listdir(workdir) == ['file1'] + tarball.uncompress(archive2['path'], dest=workdir) + lst = set(listdir(workdir)) + assert lst == {'file1', 'file2'} + + new_path = workdir + '.zip' + tarball.compress(new_path, 'zip', workdir) + with open(new_path, 'rb') as f: + h = hashlib.sha1(f.read()).hexdigest() + + assert actual_sha1 == h + assert actual_sha1 != archive['sha1sum'] + assert actual_sha1 != archive2['sha1sum'] + + shutil.rmtree(workdir)