Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/core/tarball.py b/swh/core/tarball.py
index d8e08b8..232b195 100644
--- a/swh/core/tarball.py
+++ b/swh/core/tarball.py
@@ -1,186 +1,191 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import stat
from subprocess import run
import tarfile
import zipfile
from . import utils
def _unpack_tar(tarpath: str, extract_dir: str) -> str:
    """Unpack tarballs unsupported by the standard python library. Examples
    include tar.Z, tar.lz, tar.x, etc....

    The extraction is delegated to the `tar` command (run as
    ``tar xf <tarpath> -C <extract_dir>``), so this function supports the
    same compressions the installed `tar` command supports.

    This expects the `extract_dir` to exist.

    Args:
        tarpath: path to the archive to unpack
        extract_dir: existing directory receiving the archive content

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (tarpath
        does not exist, extract_dir does not exist, etc...). The underlying
        error is attached as the exception's cause.

    Returns:
        full path to the uncompressed directory.

    """
    try:
        run(["tar", "xf", tarpath, "-C", extract_dir], check=True)
    except Exception as e:
        # Any failure (non-zero exit status, command that cannot be started,
        # ...) is reported through the exception type shutil unpackers use.
        # `from e` keeps the root cause available in the traceback.
        raise shutil.ReadError(
            f"Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def _unpack_zip(zippath: str, extract_dir: str) -> str:
    """Unpack zip files unsupported by the standard python library, for instance
    those with legacy compression type 6 (implode).

    The extraction is delegated to the `unzip` command (run as
    ``unzip -q -d <extract_dir> <zippath>``).

    This expects the `extract_dir` to exist.

    Args:
        zippath: path to the zip archive to unpack
        extract_dir: existing directory receiving the archive content

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (zippath
        does not exist, extract_dir does not exist, etc...). The underlying
        error is attached as the exception's cause.

    Returns:
        full path to the uncompressed directory.

    """
    try:
        run(["unzip", "-q", "-d", extract_dir, zippath], check=True)
    except Exception as e:
        # Any failure (non-zero exit status, command that cannot be started,
        # ...) is reported through the exception type shutil unpackers use.
        # `from e` keeps the root cause available in the traceback.
        raise shutil.ReadError(
            f"Unable to uncompress {zippath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def register_new_archive_formats():
    """Declare to shutil every unpack format of ADDITIONAL_ARCHIVE_FORMATS.

    A format whose name shutil already lists (as of the start of the call)
    is skipped rather than registered again.

    """
    known_names = {entry[0] for entry in shutil.get_unpack_formats()}
    for fmt_name, fmt_extensions, unpacker in ADDITIONAL_ARCHIVE_FORMATS:
        if fmt_name not in known_names:
            shutil.register_unpack_format(fmt_name, fmt_extensions, unpacker)
def uncompress(tarpath: str, dest: str):
    """Uncompress tarpath to dest folder if tarball is supported.

    The archive format is selected by matching the end of `tarpath`, ignoring
    case, against the extensions of the unpack formats shutil currently
    knows. When no extension matches, the choice is left to shutil.

    Zip archives that raise NotImplementedError while being unpacked are
    extracted through `_unpack_zip` instead.

    Note that this fixes permissions after successfully
    uncompressing the archive.

    Args:
        tarpath: path to tarball to uncompress
        dest: the destination folder where to uncompress the tarball,
            it will be created if it does not exist

    Raises:
        ValueError when a problem occurs during unpacking, including when
            the zip fallback fails
        NotImplementedError when unpacking raises it for an archive whose
            name does not end with ".zip"

    """
    lowered_path = tarpath.lower()
    try:
        os.makedirs(dest, exist_ok=True)
        archive_format = None
        for format_name, extensions, _ in shutil.get_unpack_formats():
            # str.endswith accepts a tuple of candidate suffixes
            if lowered_path.endswith(tuple(ext.lower() for ext in extensions)):
                archive_format = format_name
                break
        shutil.unpack_archive(tarpath, extract_dir=dest, format=archive_format)
    except shutil.ReadError as e:
        raise ValueError(f"Problem during unpacking {tarpath}. Reason: {e}") from e
    except NotImplementedError:
        if not lowered_path.endswith(".zip"):
            raise
        try:
            _unpack_zip(tarpath, dest)
        except shutil.ReadError as e:
            # Report fallback failures the same way as any other unpacking
            # problem, as the Raises section documents.
            raise ValueError(
                f"Problem during unpacking {tarpath}. Reason: {e}"
            ) from e

    normalize_permissions(dest)
def normalize_permissions(path: str):
    """Force uniform permissions on everything found below `path`.

    Every directory walked (`path` included) gets mode 0o0755. A regular
    entry gets 0o0755 when its user executable bit is set, 0o0644 otherwise.
    Symbolic links listed among the files are left alone.

    Args:
        path: the path under which permissions should be normalized

    """
    for current_dir, _, filenames in os.walk(path):
        os.chmod(current_dir, 0o0755)
        for filename in filenames:
            target = os.path.join(current_dir, filename)
            if os.path.islink(target):
                continue
            if os.stat(target).st_mode & stat.S_IXUSR:
                os.chmod(target, 0o0755)
            else:
                os.chmod(target, 0o0644)
def _ls(rootdir):
    """Walk rootdir and yield a pair for each directory and file found.

    The first item is the entry's path as joined from the walk, the second
    is whatever `utils.commonname(rootdir, path)` returns for it.

    """
    for parent, subdirs, filenames in os.walk(rootdir):
        for entry in subdirs + filenames:
            entry_path = os.path.join(parent, entry)
            yield entry_path, utils.commonname(rootdir, entry_path)
def _compress_zip(tarpath, files):
    """Create the zip archive tarpath out of `files`.

    `files` is an iterable of (path on disk, name inside the archive) pairs.

    """
    with zipfile.ZipFile(tarpath, "w") as archive:
        for source_path, member_name in files:
            archive.write(source_path, arcname=member_name)
def _compress_tar(tarpath, files):
    """Create the bz2-compressed tar archive tarpath out of `files`.

    `files` is an iterable of (path on disk, name inside the archive) pairs.
    Each entry is added on its own, without recursing into directories.

    """
    with tarfile.open(tarpath, "w:bz2") as archive:
        for source_path, member_name in files:
            archive.add(source_path, arcname=member_name, recursive=False)
def compress(tarpath, nature, dirpath_or_files):
    """Create the archive tarpath, of kind `nature`.

    `dirpath_or_files` is either a directory path given as a string, whose
    content is listed with `_ls`, or directly an iterable of
    (filepath, filename) pairs.

    A zip archive is written when nature is "zip"; any other value produces
    a tar archive.

    Returns:
        tarpath, unchanged.

    """
    entries = (
        _ls(dirpath_or_files)
        if isinstance(dirpath_or_files, str)
        else dirpath_or_files  # iterable of 'filepath, filename'
    )

    archiver = _compress_zip if nature == "zip" else _compress_tar
    archiver(tarpath, entries)

    return tarpath
# Additional uncompression archive format support
# Each entry is consumed by register_new_archive_formats(), which hands the
# three fields to shutil.register_unpack_format(name, extensions, function).
ADDITIONAL_ARCHIVE_FORMATS = [
# name, extensions, function
("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar),
("jar", [".jar"], _unpack_zip),
# NOTE(review): "tbz2" has no leading dot, unlike every other extension in
# this table. Presumably deliberate (shutil may already have ".tbz2"
# registered for one of its built-in formats) - confirm before changing it.
("tbz2", [".tbz", "tbz2"], _unpack_tar),
# FIXME: make this optional depending on the runtime lzip package install
("tar.lz", [".tar.lz"], _unpack_tar),
]
# Executed when the module is imported, so the extra formats are available
# without an explicit call.
register_new_archive_formats()
diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py
index f8680f2..a4fa829 100644
--- a/swh/core/tests/test_tarball.py
+++ b/swh/core/tests/test_tarball.py
@@ -1,224 +1,242 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import pytest
from swh.core import tarball
@pytest.fixture
def prepare_shutil_state():
    """Unregister from shutil the additional formats declared by tarball.

    Only the formats of tarball.ADDITIONAL_ARCHIVE_FORMATS that shutil
    currently lists are removed. The shutil module is returned.

    """
    import shutil

    currently_known = {entry[0] for entry in shutil.get_unpack_formats()}
    for extra_name, *_ in tarball.ADDITIONAL_ARCHIVE_FORMATS:
        if extra_name in currently_known:
            shutil.unregister_unpack_format(extra_name)

    return shutil
def test_compress_uncompress_zip(tmp_path):
    """Ten text files survive a zip compress/uncompress round trip"""
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    expected_names = []
    for index in range(10):
        name = "file%s.txt" % index
        (source_dir / name).write_text("content of file %s" % index)
        expected_names.append(name)

    archive_path = tmp_path / "archive.zip"
    tarball.compress(str(archive_path), "zip", str(source_dir))

    destdir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(destdir))

    extracted_names = sorted(entry.name for entry in destdir.iterdir())
    assert expected_names == extracted_names
@pytest.mark.xfail(
    reason=(
        "Python's zipfile library doesn't support Info-ZIP's "
        "extension for file permissions."
    )
)
def test_compress_uncompress_zip_modes(tmp_path):
    """File modes after a zip round trip (expected to fail, see the marker)"""
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    for name, mode in (("text.txt", 0o644), ("executable.sh", 0o755)):
        member = source_dir / name
        member.write_text("echo foo")
        member.chmod(mode)

    archive_path = tmp_path / "archive.zip"
    tarball.compress(str(archive_path), "zip", str(source_dir))

    destdir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(destdir))

    # sorted by path: "executable.sh" comes before "text.txt"
    executable_path, text_path = sorted(destdir.iterdir())
    assert text_path.stat().st_mode == 0o100644  # succeeds, it's the default
    assert executable_path.stat().st_mode == 0o100755  # fails
def test_compress_uncompress_tar(tmp_path):
    """Ten text files survive a tar compress/uncompress round trip"""
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    expected_names = []
    for index in range(10):
        name = "file%s.txt" % index
        (source_dir / name).write_text("content of file %s" % index)
        expected_names.append(name)

    archive_path = tmp_path / "archive.tar"
    tarball.compress(str(archive_path), "tar", str(source_dir))

    destdir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(destdir))

    extracted_names = sorted(entry.name for entry in destdir.iterdir())
    assert expected_names == extracted_names
def test_compress_uncompress_tar_modes(tmp_path):
    """File modes are kept by a tar compress/uncompress round trip"""
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    for name, mode in (("text.txt", 0o644), ("executable.sh", 0o755)):
        member = source_dir / name
        member.write_text("echo foo")
        member.chmod(mode)

    archive_path = tmp_path / "archive.tar"
    tarball.compress(str(archive_path), "tar", str(source_dir))

    destdir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(destdir))

    # sorted by path: "executable.sh" comes before "text.txt"
    executable_path, text_path = sorted(destdir.iterdir())
    assert text_path.stat().st_mode == 0o100644
    assert executable_path.stat().st_mode == 0o100755
def test_uncompress_tar_failure(tmp_path, datadir):
    """Uncompressing a tarball that does not exist raises a ValueError

    """
    missing_archive = os.path.join(datadir, "archives", "inexistent-archive.tar.Z")
    assert not os.path.exists(missing_archive)

    expected_message = f"Problem during unpacking {missing_archive}"
    with pytest.raises(ValueError, match=expected_message):
        tarball.uncompress(missing_archive, tmp_path)
def test_uncompress_tar(tmp_path, datadir):
    """Uncompressing the .tar.Z sample archive yields a non-empty folder

    """
    archive_name = "groff-1.02.tar.Z"
    archive_path = os.path.join(datadir, "archives", archive_name)
    assert os.path.exists(archive_path)

    target_dir = os.path.join(tmp_path, archive_name)
    tarball.uncompress(archive_path, target_dir)

    extracted_entries = os.listdir(target_dir)
    assert len(extracted_entries) > 0
def test_register_new_archive_formats(prepare_shutil_state):
    """The additional formats become known to shutil once registered

    """
    names_before = [entry[0] for entry in shutil.get_unpack_formats()]
    for extra_format in tarball.ADDITIONAL_ARCHIVE_FORMATS:
        assert extra_format[0] not in names_before

    # when
    tarball.register_new_archive_formats()

    # then
    names_after = [entry[0] for entry in shutil.get_unpack_formats()]
    for extra_format in tarball.ADDITIONAL_ARCHIVE_FORMATS:
        assert extra_format[0] in names_after
def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state):
    """Archives rejected before the additional formats are registered are
    all extracted once they are

    """
    archive_dir = os.path.join(datadir, "archives")
    tarpaths = [os.path.join(archive_dir, name) for name in os.listdir(archive_dir)]

    unsupported_tarpaths = [
        candidate for candidate in tarpaths if candidate.endswith((".Z", ".x", ".lz"))
    ]

    # not supported yet
    for tarpath in unsupported_tarpaths:
        with pytest.raises(ValueError, match=f"Problem during unpacking {tarpath}."):
            tarball.uncompress(tarpath, dest=tmp_path)

    # register those unsupported formats
    tarball.register_new_archive_formats()

    # unsupported formats are now supported
    for n, tarpath in enumerate(tarpaths, start=1):
        tarball.uncompress(tarpath, dest=tmp_path)

    assert n == len(tarpaths)
def test_normalize_permissions(tmp_path):
    """Every mode from 0o000 to 0o777 is normalized to 0o755 or 0o644"""
    # one file per mode, named after the decimal value of that mode
    for perms in range(0o1000):
        candidate = tmp_path / str(perms)
        candidate.touch()
        candidate.chmod(perms)

    for file in tmp_path.iterdir():
        assert file.stat().st_mode == 0o100000 | int(file.name)

    tarball.normalize_permissions(str(tmp_path))

    for file in tmp_path.iterdir():
        # 0o100 set: original file was executable for its owner
        expected_mode = 0o100755 if int(file.name) & 0o100 else 0o100644
        assert file.stat().st_mode == expected_mode
def test_unpcompress_zip_imploded(tmp_path, datadir):
    """A zip archive using compression type 6 (implode), which the python
    zipfile module does not support, can still be uncompressed.

    """
    archive_name = "msk316src.zip"
    archive_path = os.path.join(datadir, "archives", archive_name)
    assert os.path.exists(archive_path)

    target_dir = os.path.join(tmp_path, archive_name)
    tarball.uncompress(archive_path, target_dir)

    extracted_entries = os.listdir(target_dir)
    assert len(extracted_entries) > 0
+
+
def test_uncompress_upper_archive_extension(tmp_path, datadir):
    """Each test archive, copied to a temporary directory under an
    uppercased file name, can still be extracted.

    """
    archives_path = os.path.join(datadir, "archives")

    for archive_file in os.listdir(archives_path):
        original_path = os.path.join(archives_path, archive_file)
        if not os.path.isfile(original_path):
            continue

        archive_file_upper = os.path.join(tmp_path, archive_file.upper())
        extract_dir = os.path.join(tmp_path, archive_file)
        shutil.copy(original_path, archive_file_upper)

        tarball.uncompress(archive_file_upper, extract_dir)
        assert len(os.listdir(extract_dir)) > 0

File Metadata

Mime Type
text/x-diff
Expires
Thu, Sep 18, 4:48 PM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3252139

Event Timeline