Extract the permission fix-up from uncompress() into normalize_permissions().

Non-symlink files with the owner-executable bit set are now forced to mode
0o755 (they were previously left untouched); all other non-symlink files
still become 0o644 and every walked directory 0o755. A test covering every
mode from 0o000 to 0o777 is added.

diff --git a/swh/core/tarball.py b/swh/core/tarball.py
index 3f3c29a..6b7229c 100644
--- a/swh/core/tarball.py
+++ b/swh/core/tarball.py
@@ -1,147 +1,158 @@
 # Copyright (C) 2015-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import shutil
 import stat
 from subprocess import run
 import tarfile
 import zipfile
 
 from . import utils
 
 
 def _unpack_tar(tarpath: str, extract_dir: str) -> str:
     """Unpack tarballs unsupported by the standard python library. Examples
     include tar.Z, tar.lz, tar.x, etc....
 
     As this implementation relies on the `tar` command, this function supports
     the same compression the tar command supports.
 
     This expects the `extract_dir` to exist.
 
     Raises
         shutil.ReadError in case of issue uncompressing the archive (tarpath
         does not exist, extract_dir does not exist, etc...)
 
     Returns
         full path to the uncompressed directory.
 
     """
     try:
         run(["tar", "xf", tarpath, "-C", extract_dir], check=True)
         return extract_dir
     except Exception as e:
         raise shutil.ReadError(
             f"Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}"
         )
 
 
 def register_new_archive_formats():
     """Register new archive formats to uncompress
 
     """
     registered_formats = [f[0] for f in shutil.get_unpack_formats()]
     for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS:
         if name in registered_formats:
             continue
         shutil.register_unpack_format(name, extensions, function)
 
 
 def uncompress(tarpath: str, dest: str):
     """Uncompress tarpath to dest folder if tarball is supported.
 
     Note that this fixes permissions after successfully
     uncompressing the archive.
 
     Args:
         tarpath: path to tarball to uncompress
         dest: the destination folder where to uncompress the tarball
 
     Returns:
         The nature of the tarball, zip or tar.
 
     Raises:
         ValueError when a problem occurs during unpacking
 
     """
     try:
         shutil.unpack_archive(tarpath, extract_dir=dest)
     except shutil.ReadError as e:
         raise ValueError(f"Problem during unpacking {tarpath}. Reason: {e}")
 
-    # Fix permissions
-    for dirpath, _, fnames in os.walk(dest):
-        os.chmod(dirpath, 0o755)
+    normalize_permissions(dest)
+
+
+def normalize_permissions(path: str):
+    """Normalize the permissions of all files and directories under `path`.
+
+    This makes all subdirectories and files with the user executable bit set mode
+    0o0755, and all other files mode 0o0644.
+
+    Args:
+        path: the path under which permissions should be normalized
+    """
+    for dirpath, _, fnames in os.walk(path):
+        os.chmod(dirpath, 0o0755)
         for fname in fnames:
             fpath = os.path.join(dirpath, fname)
             if not os.path.islink(fpath):
-                fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR
-                if not fpath_exec:
-                    os.chmod(fpath, 0o644)
+                is_executable = os.stat(fpath).st_mode & stat.S_IXUSR
+                forced_mode = 0o0755 if is_executable else 0o0644
+                os.chmod(fpath, forced_mode)
 
 
 def _ls(rootdir):
     """Generator of filepath, filename from rootdir.
 
     """
     for dirpath, dirnames, fnames in os.walk(rootdir):
         for fname in dirnames + fnames:
             fpath = os.path.join(dirpath, fname)
             fname = utils.commonname(rootdir, fpath)
             yield fpath, fname
 
 
 def _compress_zip(tarpath, files):
     """Compress dirpath's content as tarpath.
 
     """
     with zipfile.ZipFile(tarpath, "w") as z:
         for fpath, fname in files:
             z.write(fpath, arcname=fname)
 
 
 def _compress_tar(tarpath, files):
     """Compress dirpath's content as tarpath.
 
     """
     with tarfile.open(tarpath, "w:bz2") as t:
         for fpath, fname in files:
             t.add(fpath, arcname=fname, recursive=False)
 
 
 def compress(tarpath, nature, dirpath_or_files):
     """Create a tarball tarpath with nature nature.
     The content of the tarball is either dirpath's content (if representing
     a directory path) or dirpath's iterable contents.
 
     Compress the directory dirpath's content to a tarball.
     The tarball being dumped at tarpath.
     The nature of the tarball is determined by the nature argument.
 
     """
     if isinstance(dirpath_or_files, str):
         files = _ls(dirpath_or_files)
     else:  # iterable of 'filepath, filename'
         files = dirpath_or_files
 
     if nature == "zip":
         _compress_zip(tarpath, files)
     else:
         _compress_tar(tarpath, files)
 
     return tarpath
 
 
 # Additional uncompression archive format support
 ADDITIONAL_ARCHIVE_FORMATS = [
     # name , extensions, function
     ("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar),
     # FIXME: make this optional depending on the runtime lzip package install
     ("tar.lz", [".tar.lz"], _unpack_tar),
 ]
 
 register_new_archive_formats()
diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py
index add1bec..a7cf261 100644
--- a/swh/core/tests/test_tarball.py
+++ b/swh/core/tests/test_tarball.py
@@ -1,224 +1,243 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import os
 import shutil
 
 import pytest
 
 from swh.core import tarball
 
 
 @pytest.fixture
 def prepare_shutil_state():
     """Reset any shutil modification in its current state
 
     """
     import shutil
 
     registered_formats = [f[0] for f in shutil.get_unpack_formats()]
     for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS:
         name = format_id[0]
         if name in registered_formats:
             shutil.unregister_unpack_format(name)
 
     return shutil
 
 
 def test_compress_uncompress_zip(tmp_path):
     tocompress = tmp_path / "compressme"
     tocompress.mkdir()
 
     for i in range(10):
         fpath = tocompress / ("file%s.txt" % i)
         fpath.write_text("content of file %s" % i)
 
     zipfile = tmp_path / "archive.zip"
     tarball.compress(str(zipfile), "zip", str(tocompress))
 
     destdir = tmp_path / "destdir"
     tarball.uncompress(str(zipfile), str(destdir))
 
     lsdir = sorted(x.name for x in destdir.iterdir())
     assert ["file%s.txt" % i for i in range(10)] == lsdir
 
 
 @pytest.mark.xfail(
     reason=(
         "Python's zipfile library doesn't support Info-ZIP's "
         "extension for file permissions."
     )
 )
 def test_compress_uncompress_zip_modes(tmp_path):
     tocompress = tmp_path / "compressme"
     tocompress.mkdir()
 
     fpath = tocompress / "text.txt"
     fpath.write_text("echo foo")
     fpath.chmod(0o644)
 
     fpath = tocompress / "executable.sh"
     fpath.write_text("echo foo")
     fpath.chmod(0o755)
 
     zipfile = tmp_path / "archive.zip"
     tarball.compress(str(zipfile), "zip", str(tocompress))
 
     destdir = tmp_path / "destdir"
     tarball.uncompress(str(zipfile), str(destdir))
 
     (executable_path, text_path) = sorted(destdir.iterdir())
     assert text_path.stat().st_mode == 0o100644  # succeeds, it's the default
     assert executable_path.stat().st_mode == 0o100755  # fails
 
 
 def test_compress_uncompress_tar(tmp_path):
     tocompress = tmp_path / "compressme"
     tocompress.mkdir()
 
     for i in range(10):
         fpath = tocompress / ("file%s.txt" % i)
         fpath.write_text("content of file %s" % i)
 
     tarfile = tmp_path / "archive.tar"
     tarball.compress(str(tarfile), "tar", str(tocompress))
 
     destdir = tmp_path / "destdir"
     tarball.uncompress(str(tarfile), str(destdir))
 
     lsdir = sorted(x.name for x in destdir.iterdir())
     assert ["file%s.txt" % i for i in range(10)] == lsdir
 
 
 def test_compress_uncompress_tar_modes(tmp_path):
     tocompress = tmp_path / "compressme"
     tocompress.mkdir()
 
     fpath = tocompress / "text.txt"
     fpath.write_text("echo foo")
     fpath.chmod(0o644)
 
     fpath = tocompress / "executable.sh"
     fpath.write_text("echo foo")
     fpath.chmod(0o755)
 
     tarfile = tmp_path / "archive.tar"
     tarball.compress(str(tarfile), "tar", str(tocompress))
 
     destdir = tmp_path / "destdir"
     tarball.uncompress(str(tarfile), str(destdir))
 
     (executable_path, text_path) = sorted(destdir.iterdir())
     assert text_path.stat().st_mode == 0o100644
     assert executable_path.stat().st_mode == 0o100755
 
 
 def test__unpack_tar_failure(tmp_path, datadir):
     """Unpack inexistent tarball should fail
 
     """
     tarpath = os.path.join(datadir, "archives", "inexistent-archive.tar.Z")
 
     assert not os.path.exists(tarpath)
 
     with pytest.raises(
         shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}"
     ):
         tarball._unpack_tar(tarpath, tmp_path)
 
 
 def test__unpack_tar_failure2(tmp_path, datadir):
     """Unpack Existent tarball into an inexistent folder should fail
 
     """
     filename = "groff-1.02.tar.Z"
     tarpath = os.path.join(datadir, "archives", filename)
 
     assert os.path.exists(tarpath)
 
     extract_dir = os.path.join(tmp_path, "dir", "inexistent")
 
     with pytest.raises(
         shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}"
     ):
         tarball._unpack_tar(tarpath, extract_dir)
 
 
 def test__unpack_tar_failure3(tmp_path, datadir):
     """Unpack unsupported tarball should fail
 
     """
     filename = "hello.zip"
     tarpath = os.path.join(datadir, "archives", filename)
 
     assert os.path.exists(tarpath)
 
     with pytest.raises(
         shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}"
     ):
         tarball._unpack_tar(tarpath, tmp_path)
 
 
 def test__unpack_tar(tmp_path, datadir):
     """Unpack supported tarball into an existent folder should be ok
 
     """
     filename = "groff-1.02.tar.Z"
     tarpath = os.path.join(datadir, "archives", filename)
 
     assert os.path.exists(tarpath)
 
     extract_dir = os.path.join(tmp_path, filename)
     os.makedirs(extract_dir, exist_ok=True)
 
     output_directory = tarball._unpack_tar(tarpath, extract_dir)
 
     assert extract_dir == output_directory
     assert len(os.listdir(extract_dir)) > 0
 
 
 def test_register_new_archive_formats(prepare_shutil_state):
     """Registering new archive formats should be fine
 
     """
     unpack_formats_v1 = [f[0] for f in shutil.get_unpack_formats()]
     for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS:
         assert format_id[0] not in unpack_formats_v1
 
     # when
     tarball.register_new_archive_formats()
 
     # then
     unpack_formats_v2 = [f[0] for f in shutil.get_unpack_formats()]
     for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS:
         assert format_id[0] in unpack_formats_v2
 
 
 def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state):
     """High level call uncompression on un/supported tarballs
 
     """
     archive_dir = os.path.join(datadir, "archives")
     tarfiles = os.listdir(archive_dir)
     tarpaths = [os.path.join(archive_dir, tarfile) for tarfile in tarfiles]
 
     unsupported_tarpaths = []
     for t in tarpaths:
         if t.endswith(".Z") or t.endswith(".x") or t.endswith(".lz"):
             unsupported_tarpaths.append(t)
 
     # not supported yet
     for tarpath in unsupported_tarpaths:
         with pytest.raises(ValueError, match=f"Problem during unpacking {tarpath}."):
             tarball.uncompress(tarpath, dest=tmp_path)
 
     # register those unsupported formats
     tarball.register_new_archive_formats()
 
     # unsupported formats are now supported
     for n, tarpath in enumerate(tarpaths, start=1):
         tarball.uncompress(tarpath, dest=tmp_path)
 
     assert n == len(tarpaths)
+
+
+def test_normalize_permissions(tmp_path):
+    for perms in range(0o1000):
+        filename = str(perms)
+        file_path = tmp_path / filename
+        file_path.touch()
+        file_path.chmod(perms)
+
+    for file in tmp_path.iterdir():
+        assert file.stat().st_mode == 0o100000 | int(file.name)
+
+    tarball.normalize_permissions(str(tmp_path))
+
+    for file in tmp_path.iterdir():
+        if int(file.name) & 0o100:  # original file was executable for its owner
+            assert file.stat().st_mode == 0o100755
+        else:
+            assert file.stat().st_mode == 0o100644