Changeset View
Changeset View
Standalone View
Standalone View
swh/core/tarball.py
Show All 27 Lines | Raises | ||||
shutil.ReadError in case of issue uncompressing the archive (tarpath | shutil.ReadError in case of issue uncompressing the archive (tarpath | ||||
does not exist, extract_dir does not exist, etc...) | does not exist, extract_dir does not exist, etc...) | ||||
Returns | Returns | ||||
full path to the uncompressed directory. | full path to the uncompressed directory. | ||||
""" | """ | ||||
try: | try: | ||||
run(['tar', 'xf', tarpath, '-C', extract_dir], check=True) | run(["tar", "xf", tarpath, "-C", extract_dir], check=True) | ||||
return extract_dir | return extract_dir | ||||
except Exception as e: | except Exception as e: | ||||
raise shutil.ReadError( | raise shutil.ReadError( | ||||
f'Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}') | f"Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}" | ||||
) | |||||
def register_new_archive_formats(): | def register_new_archive_formats(): | ||||
"""Register new archive formats to uncompress | """Register new archive formats to uncompress | ||||
""" | """ | ||||
registered_formats = [f[0] for f in shutil.get_unpack_formats()] | registered_formats = [f[0] for f in shutil.get_unpack_formats()] | ||||
for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS: | for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS: | ||||
Show All 17 Lines | def uncompress(tarpath: str, dest: str): | ||||
Raises: | Raises: | ||||
ValueError when a problem occurs during unpacking | ValueError when a problem occurs during unpacking | ||||
""" | """ | ||||
try: | try: | ||||
shutil.unpack_archive(tarpath, extract_dir=dest) | shutil.unpack_archive(tarpath, extract_dir=dest) | ||||
except shutil.ReadError as e: | except shutil.ReadError as e: | ||||
raise ValueError(f'Problem during unpacking {tarpath}. Reason: {e}') | raise ValueError(f"Problem during unpacking {tarpath}. Reason: {e}") | ||||
# Fix permissions | # Fix permissions | ||||
for dirpath, _, fnames in os.walk(dest): | for dirpath, _, fnames in os.walk(dest): | ||||
os.chmod(dirpath, 0o755) | os.chmod(dirpath, 0o755) | ||||
for fname in fnames: | for fname in fnames: | ||||
fpath = os.path.join(dirpath, fname) | fpath = os.path.join(dirpath, fname) | ||||
if not os.path.islink(fpath): | if not os.path.islink(fpath): | ||||
fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR | fpath_exec = os.stat(fpath).st_mode & stat.S_IXUSR | ||||
if not fpath_exec: | if not fpath_exec: | ||||
os.chmod(fpath, 0o644) | os.chmod(fpath, 0o644) | ||||
def _ls(rootdir): | def _ls(rootdir): | ||||
"""Generator of filepath, filename from rootdir. | """Generator of filepath, filename from rootdir. | ||||
""" | """ | ||||
for dirpath, dirnames, fnames in os.walk(rootdir): | for dirpath, dirnames, fnames in os.walk(rootdir): | ||||
for fname in (dirnames+fnames): | for fname in dirnames + fnames: | ||||
fpath = os.path.join(dirpath, fname) | fpath = os.path.join(dirpath, fname) | ||||
fname = utils.commonname(rootdir, fpath) | fname = utils.commonname(rootdir, fpath) | ||||
yield fpath, fname | yield fpath, fname | ||||
def _compress_zip(tarpath, files): | def _compress_zip(tarpath, files): | ||||
"""Compress dirpath's content as tarpath. | """Compress dirpath's content as tarpath. | ||||
""" | """ | ||||
with zipfile.ZipFile(tarpath, 'w') as z: | with zipfile.ZipFile(tarpath, "w") as z: | ||||
for fpath, fname in files: | for fpath, fname in files: | ||||
z.write(fpath, arcname=fname) | z.write(fpath, arcname=fname) | ||||
def _compress_tar(tarpath, files): | def _compress_tar(tarpath, files): | ||||
"""Compress dirpath's content as tarpath. | """Compress dirpath's content as tarpath. | ||||
""" | """ | ||||
with tarfile.open(tarpath, 'w:bz2') as t: | with tarfile.open(tarpath, "w:bz2") as t: | ||||
for fpath, fname in files: | for fpath, fname in files: | ||||
t.add(fpath, arcname=fname, recursive=False) | t.add(fpath, arcname=fname, recursive=False) | ||||
def compress(tarpath, nature, dirpath_or_files): | def compress(tarpath, nature, dirpath_or_files): | ||||
"""Create a tarball tarpath with nature nature. | """Create a tarball tarpath with nature nature. | ||||
The content of the tarball is either dirpath's content (if representing | The content of the tarball is either dirpath's content (if representing | ||||
a directory path) or dirpath's iterable contents. | a directory path) or dirpath's iterable contents. | ||||
Compress the directory dirpath's content to a tarball. | Compress the directory dirpath's content to a tarball. | ||||
The tarball being dumped at tarpath. | The tarball being dumped at tarpath. | ||||
The nature of the tarball is determined by the nature argument. | The nature of the tarball is determined by the nature argument. | ||||
""" | """ | ||||
if isinstance(dirpath_or_files, str): | if isinstance(dirpath_or_files, str): | ||||
files = _ls(dirpath_or_files) | files = _ls(dirpath_or_files) | ||||
else: # iterable of 'filepath, filename' | else: # iterable of 'filepath, filename' | ||||
files = dirpath_or_files | files = dirpath_or_files | ||||
if nature == 'zip': | if nature == "zip": | ||||
_compress_zip(tarpath, files) | _compress_zip(tarpath, files) | ||||
else: | else: | ||||
_compress_tar(tarpath, files) | _compress_tar(tarpath, files) | ||||
return tarpath | return tarpath | ||||
# Additional uncompression archive format support | # Additional uncompression archive format support | ||||
ADDITIONAL_ARCHIVE_FORMATS = [ | ADDITIONAL_ARCHIVE_FORMATS = [ | ||||
# name , extensions, function | # name , extensions, function | ||||
('tar.Z|x', ['.tar.Z', '.tar.x'], _unpack_tar), | ("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar), | ||||
# FIXME: make this optional depending on the runtime lzip package install | # FIXME: make this optional depending on the runtime lzip package install | ||||
('tar.lz', ['.tar.lz'], _unpack_tar), | ("tar.lz", [".tar.lz"], _unpack_tar), | ||||
] | ] | ||||
register_new_archive_formats() | register_new_archive_formats() |