Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F11023568
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
View Options
diff --git a/swh/core/tarball.py b/swh/core/tarball.py
index d8e08b8..232b195 100644
--- a/swh/core/tarball.py
+++ b/swh/core/tarball.py
@@ -1,186 +1,191 @@
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import stat
from subprocess import run
import tarfile
import zipfile
from . import utils
def _unpack_tar(tarpath: str, extract_dir: str) -> str:
    """Unpack tarballs unsupported by the standard python library. Examples
    include tar.Z, tar.lz, tar.x, etc....

    As this implementation relies on the `tar` command, this function supports
    the same compression the tar command supports.

    This expects the `extract_dir` to exist.

    Args:
        tarpath: path to the tarball to unpack
        extract_dir: existing directory the tarball is extracted into

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (tarpath
        does not exist, extract_dir does not exist, etc...). The underlying
        error is kept as the exception's cause.

    Returns:
        full path to the uncompressed directory.

    """
    try:
        run(["tar", "xf", tarpath, "-C", extract_dir], check=True)
    except Exception as e:
        # Any failure (non-zero exit status, `tar` not runnable, ...) is
        # reported as a ReadError; chain it so the root cause stays visible.
        raise shutil.ReadError(
            f"Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def _unpack_zip(zippath: str, extract_dir: str) -> str:
    """Unpack zip files unsupported by the standard python library, for instance
    those with legacy compression type 6 (implode).

    The extraction is delegated to the `unzip` command.

    This expects the `extract_dir` to exist.

    Args:
        zippath: path to the zip archive to unpack
        extract_dir: existing directory the archive is extracted into

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (zippath
        does not exist, extract_dir does not exist, etc...). The underlying
        error is kept as the exception's cause.

    Returns:
        full path to the uncompressed directory.

    """
    try:
        run(["unzip", "-q", "-d", extract_dir, zippath], check=True)
    except Exception as e:
        # Any failure (non-zero exit status, `unzip` not runnable, ...) is
        # reported as a ReadError; chain it so the root cause stays visible.
        raise shutil.ReadError(
            f"Unable to uncompress {zippath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def register_new_archive_formats():
    """Make shutil aware of every format in ADDITIONAL_ARCHIVE_FORMATS.

    Formats whose name was already registered when the function was called
    are left alone.

    """
    already_known = {entry[0] for entry in shutil.get_unpack_formats()}
    for format_name, format_extensions, unpacker in ADDITIONAL_ARCHIVE_FORMATS:
        if format_name not in already_known:
            shutil.register_unpack_format(format_name, format_extensions, unpacker)
def uncompress(tarpath: str, dest: str):
    """Uncompress tarpath to dest folder if tarball is supported.

    The archive format is looked up from the file extension, ignoring case,
    among the formats currently registered in shutil. When no extension
    matches, shutil is left to determine the format on its own.

    Note that this fixes permissions after successfully
    uncompressing the archive.

    Args:
        tarpath: path to tarball to uncompress
        dest: the destination folder where to uncompress the tarball,
            it will be created if it does not exist

    Raises:
        ValueError when a problem occurs during unpacking
        NotImplementedError when shutil raises it for an archive that is not
            a zip file
        shutil.ReadError when the `unzip` fallback (see `_unpack_zip`) fails

    """
    lowered_path = tarpath.lower()
    try:
        os.makedirs(dest, exist_ok=True)
        archive_format = None
        for format_name, extensions, _ in shutil.get_unpack_formats():
            # str.endswith accepts a tuple: one call checks all extensions.
            if lowered_path.endswith(tuple(ext.lower() for ext in extensions)):
                archive_format = format_name
                break
        shutil.unpack_archive(tarpath, extract_dir=dest, format=archive_format)
    except shutil.ReadError as e:
        raise ValueError(f"Problem during unpacking {tarpath}. Reason: {e}") from e
    except NotImplementedError:
        # Fall back on the `unzip` command for zip files the python zipfile
        # module cannot handle.
        if lowered_path.endswith(".zip"):
            _unpack_zip(tarpath, dest)
        else:
            raise

    normalize_permissions(dest)
def normalize_permissions(path: str):
    """Force standard permissions on everything found under `path`.

    Directories (including `path` itself) end up with mode 0o0755. Regular
    files keep 0o0755 when their owner could execute them, and get 0o0644
    otherwise. Symbolic links to files are left untouched.

    Args:
        path: the path under which permissions should be normalized

    """
    for current_dir, _subdirs, filenames in os.walk(path):
        os.chmod(current_dir, 0o0755)
        for entry in (os.path.join(current_dir, name) for name in filenames):
            if os.path.islink(entry):
                continue
            if os.stat(entry).st_mode & stat.S_IXUSR:
                os.chmod(entry, 0o0755)
            else:
                os.chmod(entry, 0o0644)
def _ls(rootdir):
    """Walk rootdir and yield a (filepath, filename) pair for every directory
    and file found below it.

    filepath is the entry's path joined onto the directory being walked,
    filename is whatever `utils.commonname(rootdir, filepath)` returns.

    """
    for parent, subdirs, files in os.walk(rootdir):
        for entry in subdirs + files:
            full_path = os.path.join(parent, entry)
            yield full_path, utils.commonname(rootdir, full_path)
def _compress_zip(tarpath, files):
    """Write a zip archive at tarpath holding the given files.

    files is an iterable of (filepath, filename) pairs: filepath is read from
    disk and stored in the archive under the name filename.

    """
    with zipfile.ZipFile(tarpath, "w") as archive:
        for source_path, member_name in files:
            archive.write(source_path, arcname=member_name)
def _compress_tar(tarpath, files):
    """Write a bzip2-compressed tar archive at tarpath holding the given files.

    files is an iterable of (filepath, filename) pairs: filepath is read from
    disk and stored in the archive under the name filename. Directories are
    added on their own, without their content.

    """
    with tarfile.open(tarpath, "w:bz2") as archive:
        for source_path, member_name in files:
            archive.add(source_path, arcname=member_name, recursive=False)
def compress(tarpath, nature, dirpath_or_files):
    """Create the archive tarpath out of a directory or a list of files.

    When dirpath_or_files is a string, it is taken as a directory path and
    its whole content is archived. Otherwise it must be an iterable of
    'filepath, filename' pairs to archive.

    A zip archive is written when nature is "zip"; any other nature results
    in a tar archive.

    Returns:
        tarpath, the path to the created archive.

    """
    entries = (
        _ls(dirpath_or_files)
        if isinstance(dirpath_or_files, str)
        else dirpath_or_files  # iterable of 'filepath, filename'
    )
    archiver = _compress_zip if nature == "zip" else _compress_tar
    archiver(tarpath, entries)
    return tarpath
# Additional uncompression archive format support.
# Each entry is a (name, extensions, function) triple which
# register_new_archive_formats() passes to shutil.register_unpack_format().
ADDITIONAL_ARCHIVE_FORMATS = [
    # name, extensions, function
    ("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar),
    ("jar", [".jar"], _unpack_zip),
    # NOTE(review): "tbz2" has no leading dot, unlike every other extension
    # listed here. Presumably ".tbz2" is already claimed by one of shutil's
    # built-in formats, in which case registering it again would be rejected
    # -- confirm before changing it.
    ("tbz2", [".tbz", "tbz2"], _unpack_tar),
    # FIXME: make this optional depending on the runtime lzip package install
    ("tar.lz", [".tar.lz"], _unpack_tar),
]

# Register the formats above as soon as this module is imported.
register_new_archive_formats()
diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py
index f8680f2..a4fa829 100644
--- a/swh/core/tests/test_tarball.py
+++ b/swh/core/tests/test_tarball.py
@@ -1,224 +1,242 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
import shutil
import pytest
from swh.core import tarball
@pytest.fixture
def prepare_shutil_state():
    """Remove the additional archive formats from shutil for the duration of
    a test, then register them again.

    The formats are restored on teardown so that a test failing before it
    re-registers them does not leave shutil in a state breaking later tests.

    Yields:
        the shutil module, stripped of the additional archive formats.

    """
    registered_formats = {f[0] for f in shutil.get_unpack_formats()}
    for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS:
        name = format_id[0]
        if name in registered_formats:
            shutil.unregister_unpack_format(name)

    yield shutil

    # Registration skips the formats already known, so this is harmless when
    # the test registered them itself.
    tarball.register_new_archive_formats()
def test_compress_uncompress_zip(tmp_path):
    """A directory compressed as zip then uncompressed gives its files back

    """
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    expected_names = []
    for index in range(10):
        name = "file%s.txt" % index
        (source_dir / name).write_text("content of file %s" % index)
        expected_names.append(name)

    archive_path = tmp_path / "archive.zip"
    tarball.compress(str(archive_path), "zip", str(source_dir))

    target_dir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(target_dir))

    extracted_names = sorted(entry.name for entry in target_dir.iterdir())
    assert expected_names == extracted_names
@pytest.mark.xfail(
    reason=(
        "Python's zipfile library doesn't support Info-ZIP's "
        "extension for file permissions."
    )
)
def test_compress_uncompress_zip_modes(tmp_path):
    """File modes should survive a zip compress/uncompress round trip

    """
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    for name, mode in (("text.txt", 0o644), ("executable.sh", 0o755)):
        member = source_dir / name
        member.write_text("echo foo")
        member.chmod(mode)

    archive_path = tmp_path / "archive.zip"
    tarball.compress(str(archive_path), "zip", str(source_dir))

    target_dir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(target_dir))

    # sorted by name: "executable.sh" comes before "text.txt"
    executable_path, text_path = sorted(target_dir.iterdir())
    assert text_path.stat().st_mode == 0o100644  # succeeds, it's the default
    assert executable_path.stat().st_mode == 0o100755  # fails
def test_compress_uncompress_tar(tmp_path):
    """A directory compressed as tar then uncompressed gives its files back

    """
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    expected_names = []
    for index in range(10):
        name = "file%s.txt" % index
        (source_dir / name).write_text("content of file %s" % index)
        expected_names.append(name)

    archive_path = tmp_path / "archive.tar"
    tarball.compress(str(archive_path), "tar", str(source_dir))

    target_dir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(target_dir))

    extracted_names = sorted(entry.name for entry in target_dir.iterdir())
    assert expected_names == extracted_names
def test_compress_uncompress_tar_modes(tmp_path):
    """File modes should survive a tar compress/uncompress round trip

    """
    source_dir = tmp_path / "compressme"
    source_dir.mkdir()

    for name, mode in (("text.txt", 0o644), ("executable.sh", 0o755)):
        member = source_dir / name
        member.write_text("echo foo")
        member.chmod(mode)

    archive_path = tmp_path / "archive.tar"
    tarball.compress(str(archive_path), "tar", str(source_dir))

    target_dir = tmp_path / "destdir"
    tarball.uncompress(str(archive_path), str(target_dir))

    # sorted by name: "executable.sh" comes before "text.txt"
    executable_path, text_path = sorted(target_dir.iterdir())
    assert text_path.stat().st_mode == 0o100644
    assert executable_path.stat().st_mode == 0o100755
def test_uncompress_tar_failure(tmp_path, datadir):
    """Unpack inexistent tarball should fail

    """
    import re

    tarpath = os.path.join(datadir, "archives", "inexistent-archive.tar.Z")

    assert not os.path.exists(tarpath)

    # `match` is a regular expression: escape the path so that any regex
    # metacharacter it contains is matched literally.
    expected_message = re.escape(f"Problem during unpacking {tarpath}")
    with pytest.raises(ValueError, match=expected_message):
        tarball.uncompress(tarpath, tmp_path)
def test_uncompress_tar(tmp_path, datadir):
    """Uncompressing a supported tarball should populate the target folder

    """
    archive_name = "groff-1.02.tar.Z"
    archive_path = os.path.join(datadir, "archives", archive_name)
    assert os.path.exists(archive_path)

    target_dir = os.path.join(tmp_path, archive_name)
    tarball.uncompress(archive_path, target_dir)

    extracted = os.listdir(target_dir)
    assert len(extracted) > 0
def test_register_new_archive_formats(prepare_shutil_state):
    """Additional archive formats should be known to shutil once registered

    """
    additional_names = [entry[0] for entry in tarball.ADDITIONAL_ARCHIVE_FORMATS]

    # given: none of the additional formats is registered
    names_before = [entry[0] for entry in shutil.get_unpack_formats()]
    for name in additional_names:
        assert name not in names_before

    # when
    tarball.register_new_archive_formats()

    # then: all of them are
    names_after = [entry[0] for entry in shutil.get_unpack_formats()]
    for name in additional_names:
        assert name in names_after
def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state):
    """High level call uncompression on un/supported tarballs

    """
    import re

    archive_dir = os.path.join(datadir, "archives")
    tarpaths = [os.path.join(archive_dir, name) for name in os.listdir(archive_dir)]

    # Archives relying on the additional formats, which the
    # prepare_shutil_state fixture unregistered.
    unsupported_tarpaths = [t for t in tarpaths if t.endswith((".Z", ".x", ".lz"))]

    # not supported yet
    for tarpath in unsupported_tarpaths:
        # `match` is a regular expression: escape the path so that any regex
        # metacharacter it contains is matched literally.
        expected_message = re.escape(f"Problem during unpacking {tarpath}.")
        with pytest.raises(ValueError, match=expected_message):
            tarball.uncompress(tarpath, dest=tmp_path)

    # register those unsupported formats
    tarball.register_new_archive_formats()

    # unsupported formats are now supported
    uncompressed = 0  # stays defined even when there is no archive at all
    for tarpath in tarpaths:
        tarball.uncompress(tarpath, dest=tmp_path)
        uncompressed += 1

    assert uncompressed == len(tarpaths)
def test_normalize_permissions(tmp_path):
    """Every file mode should be normalized to either 0o644 or 0o755

    One file is created per permission value, named after that value.

    """
    for mode in range(0o1000):
        entry = tmp_path / str(mode)
        entry.touch()
        entry.chmod(mode)

    # sanity check: each file carries the mode its name says
    for entry in tmp_path.iterdir():
        assert entry.stat().st_mode == 0o100000 | int(entry.name)

    tarball.normalize_permissions(str(tmp_path))

    for entry in tmp_path.iterdir():
        # was the original file executable for its owner?
        owner_could_execute = int(entry.name) & 0o100
        expected_mode = 0o100755 if owner_could_execute else 0o100644
        assert entry.stat().st_mode == expected_mode
def test_unpcompress_zip_imploded(tmp_path, datadir):
    """Uncompress a zip archive using compression type 6 (implode), which
    the python zipfile module does not support.

    """
    archive_name = "msk316src.zip"
    archive_path = os.path.join(datadir, "archives", archive_name)
    assert os.path.exists(archive_path)

    target_dir = os.path.join(tmp_path, archive_name)
    tarball.uncompress(archive_path, target_dir)

    extracted = os.listdir(target_dir)
    assert len(extracted) > 0
+
+
def test_uncompress_upper_archive_extension(tmp_path, datadir):
    """Copy test archives in a temporary directory but turn their names
    to uppercase, then check they can be successfully extracted.

    """
    archives_path = os.path.join(datadir, "archives")
    archive_files = [
        f
        for f in os.listdir(archives_path)
        if os.path.isfile(os.path.join(archives_path, f))
    ]

    # Keep the copied archives and the extraction folders in distinct
    # directories: their names only differ by case, so they would collide on
    # a case-insensitive filesystem (or for a name without lowercase letter).
    upper_dir = os.path.join(tmp_path, "upper")
    extract_root = os.path.join(tmp_path, "extracted")
    os.makedirs(upper_dir)

    for archive_file in archive_files:
        archive_file_upper = os.path.join(upper_dir, archive_file.upper())
        extract_dir = os.path.join(extract_root, archive_file)
        shutil.copy(os.path.join(archives_path, archive_file), archive_file_upper)
        tarball.uncompress(archive_file_upper, extract_dir)
        assert len(os.listdir(extract_dir)) > 0
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Sep 18, 4:48 PM (1 d, 12 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3252139
Attached To
rDCORE Foundations and core functionalities
Event Timeline
Log In to Comment