diff --git a/swh/core/tarball.py b/swh/core/tarball.py index 219a3df..446328f 100644 --- a/swh/core/tarball.py +++ b/swh/core/tarball.py @@ -1,142 +1,142 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import stat import tarfile import zipfile from . import utils -def unpack_tar_Z(tarpath: str, extract_dir: str) -> str: +def unpack_specific_tar(tarpath: str, extract_dir: str) -> str: """Unpack .tar.Z file and returns the full path to the uncompressed directory. Raises ReadError in case of issue uncompressing the archive. """ try: if not os.path.exists(tarpath): raise ValueError(f'{tarpath} not found') filename = os.path.basename(tarpath) output_directory = os.path.join(extract_dir, filename) os.makedirs(output_directory, exist_ok=True) from subprocess import run run(['tar', 'xf', tarpath, '-C', output_directory]) # data = os.listdir(output_directory) # assert len(data) > 0 return output_directory except Exception as e: raise shutil.ReadError( f'Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}') def register_new_archive_formats(): """Register new archive formats to uncompress """ registered_formats = [f[0] for f in shutil.get_unpack_formats()] for format_id in ADDITIONAL_ARCHIVE_FORMATS: name = format_id[0] if name in registered_formats: continue shutil.register_unpack_format( name=format_id[0], extensions=format_id[1], function=format_id[2]) def uncompress(tarpath: str, dest: str): """Uncompress tarpath to dest folder if tarball is supported and safe. Safe means, no file will be uncompressed outside of dirpath. Note that this fixes permissions after successfully uncompressing the archive. Args: tarpath: path to tarball to uncompress dest: the destination folder where to uncompress the tarball Returns: The nature of the tarball, zip or tar. Raises: ValueError when the archive is not supported """ try: shutil.unpack_archive(tarpath, extract_dir=dest) except shutil.ReadError: raise ValueError(f'File {tarpath} is not a supported archive.') # Fix permissions for dirpath, _, fnames in os.walk(dest): os.chmod(dirpath, 0o755) for fname in fnames: fpath = os.path.join(dirpath, fname) if not os.path.islink(fpath): fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR if not fpath_exec: os.chmod(fpath, 0o644) def _ls(rootdir): """Generator of filepath, filename from rootdir. """ for dirpath, dirnames, fnames in os.walk(rootdir): for fname in (dirnames+fnames): fpath = os.path.join(dirpath, fname) fname = utils.commonname(rootdir, fpath) yield fpath, fname def _compress_zip(tarpath, files): """Compress dirpath's content as tarpath. """ with zipfile.ZipFile(tarpath, 'w') as z: for fpath, fname in files: z.write(fpath, arcname=fname) def _compress_tar(tarpath, files): """Compress dirpath's content as tarpath. """ with tarfile.open(tarpath, 'w:bz2') as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) def compress(tarpath, nature, dirpath_or_files): """Create a tarball tarpath with nature nature. The content of the tarball is either dirpath's content (if representing a directory path) or dirpath's iterable contents. Compress the directory dirpath's content to a tarball. The tarball being dumped at tarpath. The nature of the tarball is determined by the nature argument. """ if isinstance(dirpath_or_files, str): files = _ls(dirpath_or_files) else: # iterable of 'filepath, filename' files = dirpath_or_files if nature == 'zip': _compress_zip(tarpath, files) else: _compress_tar(tarpath, files) return tarpath # Additional uncompression archive format support ADDITIONAL_ARCHIVE_FORMATS = [ # name , extensions, function - ('tar.Z', ['.tar.Z'], unpack_tar_Z), + ('tar.Z|x', ['.tar.Z', '.tar.x'], unpack_specific_tar), ] diff --git a/swh/core/tests/data/archives/hello.tar.x b/swh/core/tests/data/archives/hello.tar.x new file mode 100644 index 0000000..09298d3 Binary files /dev/null and b/swh/core/tests/data/archives/hello.tar.x differ diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py index 725d9a6..c09d202 100644 --- a/swh/core/tests/test_tarball.py +++ b/swh/core/tests/test_tarball.py @@ -1,121 +1,121 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import shutil from swh.core import tarball @pytest.fixture def prepare_shutil_state(): """Reset any shutil modification in its current state """ import shutil registered_formats = [f[0] for f in shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: name = format_id[0] if name in registered_formats: shutil.unregister_unpack_format(name) return shutil def test_compress_uncompress_zip(tmp_path): tocompress = tmp_path / 'compressme' tocompress.mkdir() for i in range(10): fpath = tocompress / ('file%s.txt' % i) fpath.write_text('content of file %s' % i) zipfile = tmp_path / 'archive.zip' tarball.compress(str(zipfile), 'zip', str(tocompress)) destdir = tmp_path / 'destdir' tarball.uncompress(str(zipfile), str(destdir)) lsdir = sorted(x.name for x in destdir.iterdir()) assert ['file%s.txt' % i for i in range(10)] == lsdir def test_compress_uncompress_tar(tmp_path): tocompress = tmp_path / 'compressme' tocompress.mkdir() for i in range(10): fpath = tocompress / ('file%s.txt' % i) fpath.write_text('content of file %s' % i) tarfile = tmp_path / 'archive.tar' tarball.compress(str(tarfile), 'tar', str(tocompress)) destdir = tmp_path / 'destdir' tarball.uncompress(str(tarfile), str(destdir)) lsdir = sorted(x.name for x in destdir.iterdir()) assert ['file%s.txt' % i for i in range(10)] == lsdir -def test_uncompress_tar_Z_failure(tmp_path, datadir): +def test_unpack_specific_tar_failure(tmp_path, datadir): tarpath = os.path.join(datadir, 'archives', 'inexistent-archive.tar.Z') assert not os.path.exists(tarpath) with pytest.raises(shutil.ReadError, match=f'Unable to uncompress {tarpath} to {tmp_path}'): - tarball.unpack_tar_Z(tarpath, tmp_path) + tarball.unpack_specific_tar(tarpath, tmp_path) -def test_uncompress_tar_Z(tmp_path, datadir): +def test_unpack_specific_tar(tmp_path, datadir): filename = 'groff-1.02.tar.Z' tarpath = os.path.join(datadir, 'archives', filename) assert os.path.exists(tarpath) - output_directory = tarball.unpack_tar_Z(tarpath, tmp_path) + output_directory = tarball.unpack_specific_tar(tarpath, tmp_path) expected_path = os.path.join(tmp_path, filename) assert os.path.exists(expected_path) assert expected_path == output_directory assert len(os.listdir(expected_path)) > 0 def test_register_new_archive_formats(prepare_shutil_state): unpack_formats_v1 = [f[0] for f in shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: assert format_id[0] not in unpack_formats_v1 # when tarball.register_new_archive_formats() # then unpack_formats_v2 = [f[0] for f in shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: assert format_id[0] in unpack_formats_v2 def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state): archive_dir = os.path.join(datadir, 'archives') tarfiles = os.listdir(archive_dir) tarpaths = [os.path.join(archive_dir, tarfile) for tarfile in tarfiles] unregistered_yet_tarpaths = list( filter(lambda t: t.endswith('.Z'), tarpaths)) for tarpath in unregistered_yet_tarpaths: with pytest.raises(ValueError, match=f'File {tarpath} is not a supported archive'): tarball.uncompress(tarpath, dest=tmp_path) tarball.register_new_archive_formats() for n, tarpath in enumerate(tarpaths, start=1): tarball.uncompress(tarpath, dest=tmp_path) assert n == len(tarpaths)