# See top-level LICENSE file for more information

import os
import shutil
import stat
import subprocess
import tarfile
import zipfile


def unpack_tar_Z(tarpath: str, extract_dir: str) -> str:
    """Unpack a ``.tar.Z`` archive using the system ``tar`` command.

    The content is extracted into ``<extract_dir>/<basename of tarpath>``,
    which is created when missing (unlike the standard ``shutil`` unpackers,
    which extract directly into ``extract_dir``).

    Args:
        tarpath: path to the ``.tar.Z`` archive
        extract_dir: directory under which the output directory is created

    Returns:
        The full path to the directory holding the uncompressed content.

    Raises:
        shutil.ReadError: when the archive does not exist, the output
            directory cannot be created, ``tar`` cannot be started, or
            ``tar`` exits with a non-zero status.

    """
    if not os.path.exists(tarpath):
        raise shutil.ReadError(
            f'Unable to uncompress {tarpath} to {extract_dir}. '
            f'Reason: {tarpath} not found')

    output_directory = os.path.join(extract_dir, os.path.basename(tarpath))
    try:
        os.makedirs(output_directory, exist_ok=True)
        # check=True: without it a failing `tar` (corrupt or unsupported
        # archive) is silently ignored and an empty directory is returned
        # as if the extraction had succeeded.
        subprocess.run(
            ['tar', 'xf', tarpath, '-C', output_directory], check=True)
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # shutil expects unpack functions to report failures as ReadError
        raise shutil.ReadError(
            f'Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}'
        ) from e
    return output_directory


def register_new_archive_formats():
    """Register the ADDITIONAL_ARCHIVE_FORMATS unpackers within shutil.

    Formats whose name is already registered are skipped, so this function
    can safely be called several times.

    """
    registered_formats = {f[0] for f in shutil.get_unpack_formats()}
    for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS:
        if name in registered_formats:
            continue
        shutil.register_unpack_format(
            name=name, extensions=extensions, function=function)


def uncompress(tarpath: str, dest: str) -> None:
    """Uncompress tarpath to dest folder if the archive format is supported.

    The extraction is delegated to :func:`shutil.unpack_archive`, which
    selects the unpacker from the file extension among the registered
    formats (see :func:`register_new_archive_formats` to add more). Whether
    archive members can escape ``dest`` therefore depends on the selected
    unpacker; this function adds no protection of its own.

    Note that this fixes permissions after successfully uncompressing the
    archive.

    Args:
        tarpath: path to the archive to uncompress
        dest: the destination folder where to uncompress the archive

    Raises:
        ValueError when the archive is not supported

    """
    try:
        shutil.unpack_archive(tarpath, extract_dir=dest)
    except shutil.ReadError as e:
        raise ValueError(
            f'File {tarpath} is not a supported archive.') from e

    # Fix permissions
    # NOTE(review): the directory mode (0o755) and the symlink skip below are
    # assumed from the upstream implementation -- confirm against the full
    # module.
    for dirpath, _, fnames in os.walk(dest):
        os.chmod(dirpath, 0o755)
        for fname in fnames:
            fpath = os.path.join(dirpath, fname)
            if os.path.islink(fpath):
                continue
            fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR
            if not fpath_exec:
                os.chmod(fpath, 0o644)


# Additional uncompression archive format support
ADDITIONAL_ARCHIVE_FORMATS = [
    # name , extensions, function
    ('tar.Z', ['.tar.Z'], unpack_tar_Z),
]