Changeset View
Changeset View
Standalone View
Standalone View
swh/deposit/tests/api/test_deposit_private_read_archive.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import hashlib | import io | ||||
import shutil | import zipfile | ||||
from django.urls import reverse | from django.urls import reverse | ||||
from os import listdir, path, mkdir | |||||
from rest_framework import status | from rest_framework import status | ||||
from swh.core import tarball | |||||
from swh.deposit.config import PRIVATE_GET_RAW_CONTENT, EM_IRI | from swh.deposit.config import PRIVATE_GET_RAW_CONTENT, EM_IRI | ||||
from swh.deposit.tests.common import create_arborescence_archive | from swh.deposit.tests.common import create_arborescence_archive | ||||
PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + '-nc' | PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + '-nc' | ||||
def private_get_raw_url_endpoints(collection, deposit): | def private_get_raw_url_endpoints(collection, deposit): | ||||
"""There are 2 endpoints to check (one with collection, one without)""" | """There are 2 endpoints to check (one with collection, one without)""" | ||||
Show All 14 Lines | def test_access_to_existing_deposit_with_one_archive( | ||||
for url in private_get_raw_url_endpoints(deposit_collection, deposit): | for url in private_get_raw_url_endpoints(deposit_collection, deposit): | ||||
r = authenticated_client.get(url) | r = authenticated_client.get(url) | ||||
assert r.status_code == status.HTTP_200_OK | assert r.status_code == status.HTTP_200_OK | ||||
assert r._headers['content-type'][1] == 'application/octet-stream' | assert r._headers['content-type'][1] == 'application/octet-stream' | ||||
# read the stream | # read the stream | ||||
data = b''.join(r.streaming_content) | data = b''.join(r.streaming_content) | ||||
actual_sha1 = hashlib.sha1(data).hexdigest() | # extract the file from the zip | ||||
assert actual_sha1 == sample_archive['sha1sum'] | zfile = zipfile.ZipFile(io.BytesIO(data)) | ||||
assert zfile.namelist() == ['file1'] | |||||
assert zfile.open('file1').read() == b'some content in file' | |||||
def test_access_to_existing_deposit_with_multiple_archives( | def test_access_to_existing_deposit_with_multiple_archives( | ||||
tmp_path, authenticated_client, deposit_collection, partial_deposit, | tmp_path, authenticated_client, deposit_collection, partial_deposit, | ||||
sample_archive): | sample_archive): | ||||
"""Access to deposit should stream a 200 response with its raw contents | """Access to deposit should stream a 200 response with its raw contents | ||||
""" | """ | ||||
deposit = partial_deposit | deposit = partial_deposit | ||||
archive2 = create_arborescence_archive( | archive2 = create_arborescence_archive( | ||||
tmp_path, 'archive2', 'file2', b'some content in file') | tmp_path, 'archive2', 'file2', b'some other content in file') | ||||
# Add a second archive to deposit | # Add a second archive to deposit | ||||
update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) | update_uri = reverse(EM_IRI, args=[deposit_collection.name, deposit.id]) | ||||
response = authenticated_client.post( | response = authenticated_client.post( | ||||
update_uri, | update_uri, | ||||
content_type='application/zip', # as zip | content_type='application/zip', # as zip | ||||
data=archive2['data'], | data=archive2['data'], | ||||
# + headers | # + headers | ||||
CONTENT_LENGTH=archive2['length'], | CONTENT_LENGTH=archive2['length'], | ||||
HTTP_SLUG=deposit.external_id, | HTTP_SLUG=deposit.external_id, | ||||
HTTP_CONTENT_MD5=archive2['md5sum'], | HTTP_CONTENT_MD5=archive2['md5sum'], | ||||
HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', | HTTP_PACKAGING='http://purl.org/net/sword/package/SimpleZip', | ||||
HTTP_IN_PROGRESS='false', | HTTP_IN_PROGRESS='false', | ||||
HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( | HTTP_CONTENT_DISPOSITION='attachment; filename=%s' % ( | ||||
archive2['name'], )) | archive2['name'], )) | ||||
assert response.status_code == status.HTTP_201_CREATED | assert response.status_code == status.HTTP_201_CREATED | ||||
for url in private_get_raw_url_endpoints(deposit_collection, deposit): | for url in private_get_raw_url_endpoints(deposit_collection, deposit): | ||||
r = authenticated_client.get(url) | r = authenticated_client.get(url) | ||||
assert r.status_code == status.HTTP_200_OK | assert r.status_code == status.HTTP_200_OK | ||||
assert r._headers['content-type'][1] == 'application/octet-stream' | assert r._headers['content-type'][1] == 'application/octet-stream' | ||||
# read the stream | # read the stream | ||||
data = b''.join(r.streaming_content) | data = b''.join(r.streaming_content) | ||||
actual_sha1 = hashlib.sha1(data).hexdigest() | # extract the file from the zip | ||||
check_tarball_consistency( | zfile = zipfile.ZipFile(io.BytesIO(data)) | ||||
tmp_path, sample_archive, archive2, actual_sha1) | assert zfile.namelist() == ['file1', 'file2'] | ||||
assert zfile.open('file1').read() == b'some content in file' | |||||
assert zfile.open('file2').read() == b'some other content in file' | |||||
def check_tarball_consistency(tmp_path, archive, archive2, actual_sha1): | |||||
"""Check the tarballs are ok | |||||
""" | |||||
workdir = path.join(tmp_path, 'workdir') | |||||
mkdir(workdir) | |||||
lst = set(listdir(workdir)) | |||||
assert lst == set() | |||||
tarball.uncompress(archive['path'], dest=workdir) | |||||
assert listdir(workdir) == ['file1'] | |||||
tarball.uncompress(archive2['path'], dest=workdir) | |||||
lst = set(listdir(workdir)) | |||||
assert lst == {'file1', 'file2'} | |||||
new_path = workdir + '.zip' | |||||
tarball.compress(new_path, 'zip', workdir) | |||||
with open(new_path, 'rb') as f: | |||||
h = hashlib.sha1(f.read()).hexdigest() | |||||
assert actual_sha1 == h | |||||
assert actual_sha1 != archive['sha1sum'] | |||||
assert actual_sha1 != archive2['sha1sum'] | |||||
shutil.rmtree(workdir) |