Changeset View
Changeset View
Standalone View
Standalone View
swh/core/tarball.py
Show First 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | try: | ||||
run(["unzip", "-q", "-d", extract_dir, zippath], check=True) | run(["unzip", "-q", "-d", extract_dir, zippath], check=True) | ||||
return extract_dir | return extract_dir | ||||
except Exception as e: | except Exception as e: | ||||
raise shutil.ReadError( | raise shutil.ReadError( | ||||
f"Unable to uncompress {zippath} to {extract_dir}. Reason: {e}" | f"Unable to uncompress {zippath} to {extract_dir}. Reason: {e}" | ||||
) | ) | ||||
def _unpack_jar(jarpath: str, extract_dir: str) -> str: | |||||
"""Unpack jar files using standard Python module zipfile. | |||||
This expects the `extract_dir` to exist. | |||||
Raises: | |||||
shutil.ReadError in case of issue uncompressing the archive (jarpath | |||||
does not exist, extract_dir does not exist, etc...) | |||||
Returns: | |||||
full path to the uncompressed directory. | |||||
""" | |||||
try: | |||||
with zipfile.ZipFile(jarpath) as jar: | |||||
jar.extractall(path=extract_dir) | |||||
return extract_dir | |||||
except Exception as e: | |||||
raise shutil.ReadError( | |||||
f"Unable to uncompress {jarpath} to {extract_dir}. Reason: {e}" | |||||
) | |||||
def register_new_archive_formats(): | def register_new_archive_formats(): | ||||
"""Register new archive formats to uncompress""" | """Register new archive formats to uncompress""" | ||||
registered_formats = [f[0] for f in shutil.get_unpack_formats()] | registered_formats = [f[0] for f in shutil.get_unpack_formats()] | ||||
for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS: | for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS: | ||||
if name in registered_formats: | if name in registered_formats: | ||||
continue | continue | ||||
shutil.register_unpack_format(name, extensions, function) | shutil.register_unpack_format(name, extensions, function) | ||||
_mime_to_archive_format = { | _mime_to_archive_format = { | ||||
"application/x-compress": "tar.Z|x", | "application/x-compress": "tar.Z|x", | ||||
"application/x-tar": "tar", | "application/x-tar": "tar", | ||||
"application/x-bzip2": "bztar", | "application/x-bzip2": "bztar", | ||||
"application/gzip": "gztar", | "application/gzip": "gztar", | ||||
"application/x-lzip": "tar.lz", | "application/x-lzip": "tar.lz", | ||||
"application/zip": "zip", | "application/zip": "zip", | ||||
"application/java-archive": "jar", | |||||
} | } | ||||
def uncompress(tarpath: str, dest: str): | def uncompress(tarpath: str, dest: str): | ||||
"""Uncompress tarpath to dest folder if tarball is supported. | """Uncompress tarpath to dest folder if tarball is supported. | ||||
Note that this fixes permissions after successfully | Note that this fixes permissions after successfully | ||||
uncompressing the archive. | uncompressing the archive. | ||||
▲ Show 20 Lines • Show All 96 Lines • ▼ Show 20 Lines | def compress(tarpath, nature, dirpath_or_files): | ||||
return tarpath | return tarpath | ||||
# Additional uncompression archive format support | # Additional uncompression archive format support | ||||
ADDITIONAL_ARCHIVE_FORMATS = [ | ADDITIONAL_ARCHIVE_FORMATS = [ | ||||
# name, extensions, function | # name, extensions, function | ||||
("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar), | ("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar), | ||||
("jar", [".jar"], _unpack_zip), | ("jar", [".jar"], _unpack_jar), | ||||
("tbz2", [".tbz", "tbz2"], _unpack_tar), | ("tbz2", [".tbz", "tbz2"], _unpack_tar), | ||||
# FIXME: make this optional depending on the runtime lzip package install | # FIXME: make this optional depending on the runtime lzip package install | ||||
("tar.lz", [".tar.lz"], _unpack_tar), | ("tar.lz", [".tar.lz"], _unpack_tar), | ||||
("crate", [".crate"], _unpack_tar), | ("crate", [".crate"], _unpack_tar), | ||||
] | ] | ||||
register_new_archive_formats() | register_new_archive_formats() |