Changeset View
Changeset View
Standalone View
Standalone View
swh/core/tests/test_tarball.py
# Copyright (C) 2019-2021  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import hashlib | |||||
import os | import os | ||||
import shutil | import shutil | ||||
import pytest | import pytest | ||||
from swh.core import tarball | from swh.core import tarball | ||||
▲ Show 20 Lines • Show All 141 Lines • ▼ Show 20 Lines | def test_register_new_archive_formats(prepare_shutil_state): | ||||
tarball.register_new_archive_formats() | tarball.register_new_archive_formats() | ||||
# then | # then | ||||
unpack_formats_v2 = [f[0] for f in shutil.get_unpack_formats()] | unpack_formats_v2 = [f[0] for f in shutil.get_unpack_formats()] | ||||
for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: | for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: | ||||
assert format_id[0] in unpack_formats_v2 | assert format_id[0] in unpack_formats_v2 | ||||
def test_uncompress_archives(tmp_path, datadir, prepare_shutil_state): | def test_uncompress_archives(tmp_path, datadir): | ||||
"""High level call uncompression on un/supported archives | """High level call uncompression on un/supported archives | ||||
""" | """ | ||||
archive_dir = os.path.join(datadir, "archives") | archive_dir = os.path.join(datadir, "archives") | ||||
archive_files = os.listdir(archive_dir) | archive_files = os.listdir(archive_dir) | ||||
# not supported yet | |||||
unsupported_archives = [] | |||||
for archive_file in archive_files: | |||||
if archive_file.endswith((".Z", ".x", ".lz", ".crate")): | |||||
unsupported_archives.append(os.path.join(archive_dir, archive_file)) | |||||
for archive_path in unsupported_archives: | |||||
with pytest.raises( | |||||
ValueError, match=f"Problem during unpacking {archive_path}." | |||||
): | |||||
tarball.uncompress(archive_path, dest=tmp_path) | |||||
# register those unsupported formats | |||||
tarball.register_new_archive_formats() | |||||
# unsupported formats are now supported | # unsupported formats are now supported | ||||
for archive_file in archive_files: | for archive_file in archive_files: | ||||
archive_path = os.path.join(archive_dir, archive_file) | archive_path = os.path.join(archive_dir, archive_file) | ||||
extract_dir = os.path.join(tmp_path, archive_file) | extract_dir = os.path.join(tmp_path, archive_file) | ||||
tarball.uncompress(archive_path, dest=extract_dir) | tarball.uncompress(archive_path, dest=extract_dir) | ||||
assert len(os.listdir(extract_dir)) > 0 | assert len(os.listdir(extract_dir)) > 0 | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | archive_files = [ | ||||
if os.path.isfile(os.path.join(archives_path, f)) | if os.path.isfile(os.path.join(archives_path, f)) | ||||
] | ] | ||||
for archive_file in archive_files: | for archive_file in archive_files: | ||||
archive_file_upper = os.path.join(tmp_path, archive_file.upper()) | archive_file_upper = os.path.join(tmp_path, archive_file.upper()) | ||||
extract_dir = os.path.join(tmp_path, archive_file) | extract_dir = os.path.join(tmp_path, archive_file) | ||||
shutil.copy(os.path.join(archives_path, archive_file), archive_file_upper) | shutil.copy(os.path.join(archives_path, archive_file), archive_file_upper) | ||||
tarball.uncompress(archive_file_upper, extract_dir) | tarball.uncompress(archive_file_upper, extract_dir) | ||||
assert len(os.listdir(extract_dir)) > 0 | assert len(os.listdir(extract_dir)) > 0 | ||||
def test_uncompress_archive_no_extension(tmp_path, datadir):
    """Copy test archives in a temporary directory but turn their names
    to their md5 sums, then check they can be successfully extracted.
    """
    source_dir = os.path.join(datadir, "archives")
    samples = [
        entry
        for entry in os.listdir(source_dir)
        if os.path.isfile(os.path.join(source_dir, entry))
    ]
    for sample in samples:
        sample_path = os.path.join(source_dir, sample)
        # hash the raw archive bytes to build an extension-less name
        with open(sample_path, "rb") as stream:
            digest = hashlib.md5(stream.read()).hexdigest()
        renamed = os.path.join(tmp_path, digest)
        shutil.copy(sample_path, renamed)
        # format detection cannot rely on the filename here
        destination = os.path.join(tmp_path, sample)
        tarball.uncompress(renamed, destination)
        assert os.listdir(destination)