Changeset View
Changeset View
Standalone View
Standalone View
swh/deposit/tests/api/test_deposit_private_read_archive.py
# Copyright (C) 2017-2019 The Software Heritage developers | # Copyright (C) 2017-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import io | from os.path import exists, join | ||||
import zipfile | import tarfile | ||||
from django.urls import reverse | from django.urls import reverse | ||||
from rest_framework import status | from rest_framework import status | ||||
from swh.deposit.api.private.deposit_read import aggregate_tarballs | |||||
from swh.deposit.config import EM_IRI, PRIVATE_GET_RAW_CONTENT | from swh.deposit.config import EM_IRI, PRIVATE_GET_RAW_CONTENT | ||||
from swh.deposit.tests.common import create_arborescence_archive | from swh.deposit.tests.common import create_arborescence_archive | ||||
PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + "-nc" | PRIVATE_GET_RAW_CONTENT_NC = PRIVATE_GET_RAW_CONTENT + "-nc" | ||||
def private_get_raw_url_endpoints(collection, deposit): | def private_get_raw_url_endpoints(collection, deposit): | ||||
"""There are 2 endpoints to check (one with collection, one without)""" | """There are 2 endpoints to check (one with collection, one without)""" | ||||
return [ | return [ | ||||
reverse(PRIVATE_GET_RAW_CONTENT, args=[collection.name, deposit.id]), | reverse(PRIVATE_GET_RAW_CONTENT, args=[collection.name, deposit.id]), | ||||
reverse(PRIVATE_GET_RAW_CONTENT_NC, args=[deposit.id]), | reverse(PRIVATE_GET_RAW_CONTENT_NC, args=[deposit.id]), | ||||
] | ] | ||||
def test_access_to_existing_deposit_with_one_archive( | def test_access_to_existing_deposit_with_one_archive( | ||||
authenticated_client, deposit_collection, complete_deposit, sample_archive | authenticated_client, | ||||
deposit_collection, | |||||
complete_deposit, | |||||
sample_archive, | |||||
tmp_path, | |||||
): | ): | ||||
"""Access to deposit should stream a 200 response with its raw content | """Access to deposit should stream a 200 response with its raw content | ||||
""" | """ | ||||
deposit = complete_deposit | deposit = complete_deposit | ||||
for url in private_get_raw_url_endpoints(deposit_collection, deposit): | for i, url in enumerate(private_get_raw_url_endpoints(deposit_collection, deposit)): | ||||
r = authenticated_client.get(url) | response = authenticated_client.get(url) | ||||
assert r.status_code == status.HTTP_200_OK | assert response.status_code == status.HTTP_200_OK | ||||
assert r._headers["content-type"][1] == "application/zip" | assert response._headers["content-type"][1] == "application/tar" | ||||
# read the stream | # write the response stream in a temporary archive | ||||
data = b"".join(r.streaming_content) | archive_path = join(tmp_path, f"archive_{i}.tar") | ||||
# extract the file from the zip | with open(archive_path, "wb") as f: | ||||
zfile = zipfile.ZipFile(io.BytesIO(data)) | for chunk in response.streaming_content: | ||||
assert zfile.namelist() == ["file1"] | f.write(chunk) | ||||
assert zfile.open("file1").read() == b"some content in file" | |||||
# to check its properties are correct | |||||
tfile = tarfile.open(archive_path) | |||||
assert set(tfile.getnames()) == {".", "./file1"} | |||||
assert tfile.extractfile("./file1").read() == b"some content in file" | |||||
def test_access_to_existing_deposit_with_multiple_archives( | def test_access_to_existing_deposit_with_multiple_archives( | ||||
tmp_path, authenticated_client, deposit_collection, partial_deposit, sample_archive | tmp_path, authenticated_client, deposit_collection, partial_deposit, sample_archive | ||||
): | ): | ||||
"""Access to deposit should stream a 200 response with its raw contents | """Access to deposit should stream a 200 response with its raw contents | ||||
""" | """ | ||||
Show All 13 Lines | response = authenticated_client.post( | ||||
HTTP_SLUG=deposit.external_id, | HTTP_SLUG=deposit.external_id, | ||||
HTTP_CONTENT_MD5=archive2["md5sum"], | HTTP_CONTENT_MD5=archive2["md5sum"], | ||||
HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", | HTTP_PACKAGING="http://purl.org/net/sword/package/SimpleZip", | ||||
HTTP_IN_PROGRESS="false", | HTTP_IN_PROGRESS="false", | ||||
HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), | HTTP_CONTENT_DISPOSITION="attachment; filename=%s" % (archive2["name"],), | ||||
) | ) | ||||
assert response.status_code == status.HTTP_201_CREATED | assert response.status_code == status.HTTP_201_CREATED | ||||
for url in private_get_raw_url_endpoints(deposit_collection, deposit): | for i, url in enumerate(private_get_raw_url_endpoints(deposit_collection, deposit)): | ||||
r = authenticated_client.get(url) | response = authenticated_client.get(url) | ||||
assert r.status_code == status.HTTP_200_OK | assert response.status_code == status.HTTP_200_OK | ||||
assert r._headers["content-type"][1] == "application/zip" | assert response._headers["content-type"][1] == "application/tar" | ||||
# read the stream | # write the response stream in a temporary archive | ||||
data = b"".join(r.streaming_content) | archive_path = join(tmp_path, f"archive_{i}.tar") | ||||
# extract the file from the zip | with open(archive_path, "wb") as f: | ||||
zfile = zipfile.ZipFile(io.BytesIO(data)) | for chunk in response.streaming_content: | ||||
assert set(zfile.namelist()) == {"file1", "file2"} | f.write(chunk) | ||||
assert zfile.open("file1").read() == b"some content in file" | |||||
assert zfile.open("file2").read() == b"some other content in file" | # to check its properties are correct | ||||
tfile = tarfile.open(archive_path) | |||||
assert set(tfile.getnames()) == {".", "./file1", "./file2"} | |||||
assert tfile.extractfile("./file1").read() == b"some content in file" | |||||
assert tfile.extractfile("./file2").read() == b"some other content in file" | |||||
def test_aggregate_tarballs_with_strange_archive(datadir, tmp_path): | |||||
archive = join(datadir, "archives", "single-artifact-package.tar.gz") | |||||
with aggregate_tarballs(tmp_path, [archive]) as tarball_path: | |||||
assert exists(tarball_path) |