# Patch overview and review notes. This text sits before the first
# `diff --git` header, where patch tools conventionally skip leading text.
#
# NOTE(review): the line breaks of the original unified diff appear to have
# been collapsed into single spaces below, so the `+` / `-` / context markers
# are inline. Restore the newlines before trying to apply it.
#
# swh/core/tarball.py
#   - Copyright range bumped from 2015-2017 to 2015-2019.
#   - `from subprocess import run` becomes a module-level import (it used to
#     be imported inside the function body).
#   - `unpack_specific_tar` is renamed `_unpack_tar`. It no longer checks that
#     `tarpath` exists and no longer creates `<extract_dir>/<archive name>`;
#     it runs `tar xf <tarpath> -C <extract_dir>` with `check=True` and
#     returns `extract_dir` itself, so `extract_dir` must already exist.
#     Any exception is still re-raised as `shutil.ReadError`.
#   - `register_new_archive_formats` unpacks each
#     (name, extensions, function) entry directly, and is now also called at
#     the bottom of the module, i.e. when the module is imported.
#   - `uncompress` converts any `shutil.ReadError` into a `ValueError` whose
#     message starts with 'Problem during unpacking' and includes the reason;
#     its docstring no longer claims that the extraction is "safe".
#   - Both `ADDITIONAL_ARCHIVE_FORMATS` entries now point at `_unpack_tar`.
#
# swh/core/tests/test_tarball.py
#   - `test_unpack_specific_tar_failure` is renamed `test__unpack_tar_failure`.
#   - New `test__unpack_tar_failure2` (extraction directory does not exist)
#     and `test__unpack_tar_failure3` (`hello.zip`).
#   - `test__unpack_tar` replaces `test_unpack_specific_tar`; it creates the
#     extraction directory itself before unpacking `groff-1.02.tar.Z`.
#   - `test_uncompress_tarpaths` treats `.Z`, `.x` and `.lz` archives as
#     unsupported until `register_new_archive_formats()` is called, and
#     expects the new `ValueError` message.
#   - Docstrings are added to the tests.
#
# NOTE(review) - points to confirm before merging:
#   - `test__unpack_tar_failure2` passes `extract_dir` to `_unpack_tar`, but
#     its `match=` pattern is built from `tmp_path`. Presumably it passes only
#     because `extract_dir` starts with `tmp_path` and `match=` is a regex
#     search; consider matching on `extract_dir` instead.
#   - The `match=` patterns interpolate file paths without escaping them.
#     Presumably harmless for these paths, but regex metacharacters in a path
#     would change what is matched - confirm.
#   - Because the module now registers the extra formats at import time,
#     `test_register_new_archive_formats` and the first half of
#     `test_uncompress_tarpaths` rely on the `prepare_shutil_state` fixture
#     having unregistered them first.
#   - In `test_uncompress_tarpaths`, `n` is bound only by the final loop; an
#     empty `archives` directory would make the last assert fail on an
#     unbound name rather than on the comparison.
#   - `_unpack_tar` returns `extract_dir`, whereas `unpack_specific_tar`
#     returned a per-archive subdirectory. Callers outside this diff, if any,
#     would see a different return value - TODO confirm there are none.
#
diff --git a/swh/core/tarball.py b/swh/core/tarball.py index 996e82c..1544432 100644 --- a/swh/core/tarball.py +++ b/swh/core/tarball.py @@ -1,146 +1,147 @@ -# Copyright (C) 2015-2017 The Software Heritage developers +# Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import stat import tarfile import zipfile +from subprocess import run + from . import utils -def unpack_specific_tar(tarpath: str, extract_dir: str) -> str: - """Unpack specific tarballs (.tar.Z, .tar.lz, .tar.x) file. Returns the - full path to the uncompressed directory. +def _unpack_tar(tarpath: str, extract_dir: str) -> str: + """Unpack tarballs unsupported by the standard python library. Examples + include tar.Z, tar.lz, tar.x, etc.... + + As this implementation relies on the `tar` command, this function supports + the same compression the tar command supports. - This relies on the `tar` command. + This expects the `extract_dir` to exist. Raises - ReadError in case of issue uncompressing the archive. + + shutil.ReadError in case of issue uncompressing the archive (tarpath + does not exist, extract_dir does not exist, etc...) + + Returns + full path to the uncompressed directory. """ try: - if not os.path.exists(tarpath): - raise ValueError(f'{tarpath} not found') - filename = os.path.basename(tarpath) - output_directory = os.path.join(extract_dir, filename) - os.makedirs(output_directory, exist_ok=True) - from subprocess import run - run(['tar', 'xf', tarpath, '-C', output_directory]) - # data = os.listdir(output_directory) - # assert len(data) > 0 - return output_directory + run(['tar', 'xf', tarpath, '-C', extract_dir], check=True) + return extract_dir except Exception as e: raise shutil.ReadError( f'Unable to uncompress {tarpath} to {extract_dir}. 
Reason: {e}') def register_new_archive_formats(): """Register new archive formats to uncompress """ registered_formats = [f[0] for f in shutil.get_unpack_formats()] - for format_id in ADDITIONAL_ARCHIVE_FORMATS: - name = format_id[0] + for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS: if name in registered_formats: continue - shutil.register_unpack_format( - name=format_id[0], extensions=format_id[1], function=format_id[2]) + shutil.register_unpack_format(name, extensions, function) def uncompress(tarpath: str, dest: str): - """Uncompress tarpath to dest folder if tarball is supported and safe. - Safe means, no file will be uncompressed outside of dirpath. + """Uncompress tarpath to dest folder if tarball is supported. Note that this fixes permissions after successfully uncompressing the archive. Args: tarpath: path to tarball to uncompress dest: the destination folder where to uncompress the tarball Returns: The nature of the tarball, zip or tar. Raises: - ValueError when the archive is not supported + ValueError when a problem occurs during unpacking """ try: shutil.unpack_archive(tarpath, extract_dir=dest) - except shutil.ReadError: - raise ValueError(f'File {tarpath} is not a supported archive.') + except shutil.ReadError as e: + raise ValueError(f'Problem during unpacking {tarpath}. Reason: {e}') # Fix permissions for dirpath, _, fnames in os.walk(dest): os.chmod(dirpath, 0o755) for fname in fnames: fpath = os.path.join(dirpath, fname) if not os.path.islink(fpath): fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR if not fpath_exec: os.chmod(fpath, 0o644) def _ls(rootdir): """Generator of filepath, filename from rootdir. """ for dirpath, dirnames, fnames in os.walk(rootdir): for fname in (dirnames+fnames): fpath = os.path.join(dirpath, fname) fname = utils.commonname(rootdir, fpath) yield fpath, fname def _compress_zip(tarpath, files): """Compress dirpath's content as tarpath. 
""" with zipfile.ZipFile(tarpath, 'w') as z: for fpath, fname in files: z.write(fpath, arcname=fname) def _compress_tar(tarpath, files): """Compress dirpath's content as tarpath. """ with tarfile.open(tarpath, 'w:bz2') as t: for fpath, fname in files: t.add(fpath, arcname=fname, recursive=False) def compress(tarpath, nature, dirpath_or_files): """Create a tarball tarpath with nature nature. The content of the tarball is either dirpath's content (if representing a directory path) or dirpath's iterable contents. Compress the directory dirpath's content to a tarball. The tarball being dumped at tarpath. The nature of the tarball is determined by the nature argument. """ if isinstance(dirpath_or_files, str): files = _ls(dirpath_or_files) else: # iterable of 'filepath, filename' files = dirpath_or_files if nature == 'zip': _compress_zip(tarpath, files) else: _compress_tar(tarpath, files) return tarpath # Additional uncompression archive format support ADDITIONAL_ARCHIVE_FORMATS = [ # name , extensions, function - ('tar.Z|x', ['.tar.Z', '.tar.x'], unpack_specific_tar), + ('tar.Z|x', ['.tar.Z', '.tar.x'], _unpack_tar), # FIXME: make this optional depending on the runtime lzip package install - ('tar.lz', ['.tar.lz'], unpack_specific_tar), + ('tar.lz', ['.tar.lz'], _unpack_tar), ] + +register_new_archive_formats() diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py index c09d202..7c7f189 100644 --- a/swh/core/tests/test_tarball.py +++ b/swh/core/tests/test_tarball.py @@ -1,121 +1,169 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import pytest import shutil from swh.core import tarball @pytest.fixture def prepare_shutil_state(): """Reset any shutil modification in its current state """ import shutil registered_formats = [f[0] for f in 
shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: name = format_id[0] if name in registered_formats: shutil.unregister_unpack_format(name) return shutil def test_compress_uncompress_zip(tmp_path): tocompress = tmp_path / 'compressme' tocompress.mkdir() for i in range(10): fpath = tocompress / ('file%s.txt' % i) fpath.write_text('content of file %s' % i) zipfile = tmp_path / 'archive.zip' tarball.compress(str(zipfile), 'zip', str(tocompress)) destdir = tmp_path / 'destdir' tarball.uncompress(str(zipfile), str(destdir)) lsdir = sorted(x.name for x in destdir.iterdir()) assert ['file%s.txt' % i for i in range(10)] == lsdir def test_compress_uncompress_tar(tmp_path): tocompress = tmp_path / 'compressme' tocompress.mkdir() for i in range(10): fpath = tocompress / ('file%s.txt' % i) fpath.write_text('content of file %s' % i) tarfile = tmp_path / 'archive.tar' tarball.compress(str(tarfile), 'tar', str(tocompress)) destdir = tmp_path / 'destdir' tarball.uncompress(str(tarfile), str(destdir)) lsdir = sorted(x.name for x in destdir.iterdir()) assert ['file%s.txt' % i for i in range(10)] == lsdir -def test_unpack_specific_tar_failure(tmp_path, datadir): +def test__unpack_tar_failure(tmp_path, datadir): + """Unpack inexistent tarball should fail + + """ tarpath = os.path.join(datadir, 'archives', 'inexistent-archive.tar.Z') assert not os.path.exists(tarpath) with pytest.raises(shutil.ReadError, match=f'Unable to uncompress {tarpath} to {tmp_path}'): - tarball.unpack_specific_tar(tarpath, tmp_path) + tarball._unpack_tar(tarpath, tmp_path) -def test_unpack_specific_tar(tmp_path, datadir): +def test__unpack_tar_failure2(tmp_path, datadir): + """Unpack Existent tarball into an inexistent folder should fail + + """ filename = 'groff-1.02.tar.Z' tarpath = os.path.join(datadir, 'archives', filename) assert os.path.exists(tarpath) - output_directory = tarball.unpack_specific_tar(tarpath, tmp_path) + extract_dir = os.path.join(tmp_path, 'dir', 
'inexistent') + + with pytest.raises(shutil.ReadError, + match=f'Unable to uncompress {tarpath} to {tmp_path}'): + tarball._unpack_tar(tarpath, extract_dir) + - expected_path = os.path.join(tmp_path, filename) +def test__unpack_tar_failure3(tmp_path, datadir): + """Unpack unsupported tarball should fail + + """ + filename = 'hello.zip' + tarpath = os.path.join(datadir, 'archives', filename) + + assert os.path.exists(tarpath) + + with pytest.raises(shutil.ReadError, + match=f'Unable to uncompress {tarpath} to {tmp_path}'): + tarball._unpack_tar(tarpath, tmp_path) - assert os.path.exists(expected_path) - assert expected_path == output_directory - assert len(os.listdir(expected_path)) > 0 + +def test__unpack_tar(tmp_path, datadir): + """Unpack supported tarball into an existent folder should be ok + + """ + filename = 'groff-1.02.tar.Z' + tarpath = os.path.join(datadir, 'archives', filename) + + assert os.path.exists(tarpath) + + extract_dir = os.path.join(tmp_path, filename) + os.makedirs(extract_dir, exist_ok=True) + + output_directory = tarball._unpack_tar(tarpath, extract_dir) + + assert extract_dir == output_directory + assert len(os.listdir(extract_dir)) > 0 def test_register_new_archive_formats(prepare_shutil_state): + """Registering new archive formats should be fine + + """ unpack_formats_v1 = [f[0] for f in shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: assert format_id[0] not in unpack_formats_v1 # when tarball.register_new_archive_formats() # then unpack_formats_v2 = [f[0] for f in shutil.get_unpack_formats()] for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: assert format_id[0] in unpack_formats_v2 def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state): + """High level call uncompression on un/supported tarballs + + """ archive_dir = os.path.join(datadir, 'archives') tarfiles = os.listdir(archive_dir) tarpaths = [os.path.join(archive_dir, tarfile) for tarfile in tarfiles] - unregistered_yet_tarpaths = 
list( - filter(lambda t: t.endswith('.Z'), tarpaths)) - for tarpath in unregistered_yet_tarpaths: + unsupported_tarpaths = [] + for t in tarpaths: + if t.endswith('.Z') or t.endswith('.x') or t.endswith('.lz'): + unsupported_tarpaths.append(t) + + # not supported yet + for tarpath in unsupported_tarpaths: with pytest.raises(ValueError, - match=f'File {tarpath} is not a supported archive'): + match=f'Problem during unpacking {tarpath}.'): tarball.uncompress(tarpath, dest=tmp_path) + # register those unsupported formats tarball.register_new_archive_formats() + # unsupported formats are now supported for n, tarpath in enumerate(tarpaths, start=1): tarball.uncompress(tarpath, dest=tmp_path) assert n == len(tarpaths)