Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9344791
tarball.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
6 KB
Subscribers
None
tarball.py
View Options
# Copyright (C) 2015-2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
os
import
shutil
import
stat
from
subprocess
import
run
import
tarfile
import
zipfile
import
magic
from
.
import
utils
def _unpack_tar(tarpath: str, extract_dir: str) -> str:
    """Unpack tarballs unsupported by the standard python library. Examples
    include tar.Z, tar.lz, tar.x, etc....

    As this implementation relies on the `tar` command, this function supports
    the same compression the tar command supports.

    This expects the `extract_dir` to exist.

    Args:
        tarpath: path to the tarball to extract
        extract_dir: existing directory the tarball is extracted into

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (tarpath
        does not exist, extract_dir does not exist, etc...). The original
        failure is attached as the exception's cause.

    Returns:
        `extract_dir`, as it was passed in.

    """
    try:
        # check=True turns a non-zero exit status of `tar` into an exception.
        run(["tar", "xf", tarpath, "-C", extract_dir], check=True)
    except Exception as e:
        # Any failure (missing binary, bad archive, ...) is reported with the
        # exception type shutil.unpack_archive callers expect; chain the cause
        # so the underlying error is not lost.
        raise shutil.ReadError(
            f"Unable to uncompress {tarpath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def _unpack_zip(zippath: str, extract_dir: str) -> str:
    """Unpack zip files unsupported by the standard python library, for instance
    those with legacy compression type 6 (implode).

    This relies on the `unzip` command and expects the `extract_dir` to exist.

    Args:
        zippath: path to the zip file to extract
        extract_dir: existing directory the zip file is extracted into

    Raises:
        shutil.ReadError in case of issue uncompressing the archive (zippath
        does not exist, extract_dir does not exist, etc...). The original
        failure is attached as the exception's cause.

    Returns:
        `extract_dir`, as it was passed in.

    """
    try:
        # check=True turns a non-zero exit status of `unzip` into an exception.
        run(["unzip", "-q", "-d", extract_dir, zippath], check=True)
    except Exception as e:
        # Report every failure with the exception type shutil.unpack_archive
        # callers expect; chain the cause so the underlying error is not lost.
        raise shutil.ReadError(
            f"Unable to uncompress {zippath} to {extract_dir}. Reason: {e}"
        ) from e
    return extract_dir
def register_new_archive_formats():
    """Make shutil aware of the extra unpack formats declared in this module.

    Each (name, extensions, function) entry of ADDITIONAL_ARCHIVE_FORMATS is
    registered with shutil unless a format of that name was already known
    when this function was called.

    """
    # Snapshot of the known format names, taken once before registering.
    known_names = {entry[0] for entry in shutil.get_unpack_formats()}
    for name, extensions, function in ADDITIONAL_ARCHIVE_FORMATS:
        if name not in known_names:
            shutil.register_unpack_format(name, extensions, function)
# Maps a file mimetype to the name of the unpack format to use for it.
# `uncompress` consults this table only when the archive format could not be
# determined from the file extension; the resulting name is passed as the
# `format` argument of shutil.unpack_archive.
_mime_to_archive_format = {
    "application/x-compress": "tar.Z|x",
    "application/x-tar": "tar",
    "application/x-bzip2": "bztar",
    "application/gzip": "gztar",
    "application/x-lzip": "tar.lz",
    "application/zip": "zip",
}
def uncompress(tarpath: str, dest: str):
    """Uncompress tarpath to dest folder if tarball is supported.

    The archive format is looked up from the file extension first and, when
    no registered extension matches, from the file mimetype.

    Note that this fixes permissions after successfully
    uncompressing the archive.

    Args:
        tarpath: path to tarball to uncompress
        dest: the destination folder where to uncompress the tarball,
            it will be created if it does not exist

    Raises:
        ValueError when a problem occurs during unpacking
        NotImplementedError when shutil.unpack_archive raised it and the
            archive is not a zip file

    """
    # Named `archive_format` to avoid shadowing the `format` builtin; bound
    # before the `try` so the NotImplementedError handler can always read it.
    archive_format = None
    try:
        os.makedirs(dest, exist_ok=True)
        lowered_path = tarpath.lower()
        # try to get archive format from extension
        for format_, exts, _ in shutil.get_unpack_formats():
            if any(lowered_path.endswith(ext.lower()) for ext in exts):
                archive_format = format_
                break
        # try to get archive format from file mimetype
        if archive_format is None:
            mime = magic.Magic(mime=True).from_file(tarpath)
            archive_format = _mime_to_archive_format.get(mime)
        shutil.unpack_archive(tarpath, extract_dir=dest, format=archive_format)
    except shutil.ReadError as e:
        raise ValueError(f"Problem during unpacking {tarpath}. Reason: {e}") from e
    except NotImplementedError:
        # The standard library could not handle the archive (e.g. a zip file
        # using a compression method it does not implement): retry zip files
        # with the external unzip-based unpacker, re-raise for anything else.
        if not (tarpath.lower().endswith(".zip") or archive_format == "zip"):
            raise
        try:
            _unpack_zip(tarpath, dest)
        except shutil.ReadError as e:
            # Keep the documented contract: unpacking problems surface as
            # ValueError on the fallback path too.
            raise ValueError(
                f"Problem during unpacking {tarpath}. Reason: {e}"
            ) from e

    normalize_permissions(dest)
def normalize_permissions(path: str):
    """Normalize the permissions of all files and directories under `path`.

    This makes all subdirectories and files with the user executable bit set mode
    0o0755, and all other files mode 0o0644. Symbolic links are left untouched.

    Args:
        path: the path under which permissions should be normalized

    """
    for dirpath, dnames, fnames in os.walk(path):
        os.chmod(dirpath, 0o0755)
        # Fix subdirectories before os.walk descends into them: a directory
        # that cannot be listed (e.g. extracted with mode 0o000) is silently
        # skipped by os.walk, so it and its content would never be normalized.
        for dname in dnames:
            dpath = os.path.join(dirpath, dname)
            # os.chmod follows symlinks; never touch what a link points to.
            if not os.path.islink(dpath):
                os.chmod(dpath, 0o0755)
        for fname in fnames:
            fpath = os.path.join(dirpath, fname)
            if not os.path.islink(fpath):
                is_executable = os.stat(fpath).st_mode & stat.S_IXUSR
                forced_mode = 0o0755 if is_executable else 0o0644
                os.chmod(fpath, forced_mode)
def _ls(rootdir):
    """Generator of (filepath, filename) pairs for the entries below rootdir.

    Walks rootdir and, for each directory visited, yields its subdirectories
    first and then its files. `filepath` is the entry's path joined onto the
    visited directory; `filename` is whatever utils.commonname(rootdir,
    filepath) returns for it.

    """
    for parent, subdirs, filenames in os.walk(rootdir):
        for entry in subdirs + filenames:
            entry_path = os.path.join(parent, entry)
            yield entry_path, utils.commonname(rootdir, entry_path)
def _compress_zip(tarpath, files):
    """Write the given files into a new zip archive created at tarpath.

    `files` is an iterable of (filepath, filename) pairs: the file at
    `filepath` is stored in the archive under the name `filename`.

    """
    with zipfile.ZipFile(tarpath, "w") as archive:
        for source_path, member_name in files:
            archive.write(source_path, arcname=member_name)
def _compress_tar(tarpath, files):
    """Write the given files into a new bzip2-compressed tarball at tarpath.

    `files` is an iterable of (filepath, filename) pairs: the entry at
    `filepath` is stored in the archive under the name `filename`.
    Directories are added on their own, without their content.

    """
    with tarfile.open(tarpath, "w:bz2") as archive:
        for source_path, member_name in files:
            archive.add(source_path, arcname=member_name, recursive=False)
def compress(tarpath, nature, dirpath_or_files):
    """Create the archive tarpath, of kind `nature`, and return its path.

    The content of the archive is either the content of a directory (when
    `dirpath_or_files` is a string, taken as a directory path) or the given
    iterable of 'filepath, filename' pairs.

    A `nature` of "zip" produces a zip archive; any other value produces a
    tarball.

    """
    if isinstance(dirpath_or_files, str):
        entries = _ls(dirpath_or_files)
    else:
        # iterable of 'filepath, filename'
        entries = dirpath_or_files

    write_archive = _compress_zip if nature == "zip" else _compress_tar
    write_archive(tarpath, entries)
    return tarpath
# Additional uncompression archive format support
#
# Each entry is handed to shutil.register_unpack_format by
# register_new_archive_formats, unless a format of that name is already
# registered. The unpacking functions shell out to the `tar` / `unzip`
# commands.
ADDITIONAL_ARCHIVE_FORMATS = [
    # name, extensions, function
    ("tar.Z|x", [".tar.Z", ".tar.x"], _unpack_tar),
    ("jar", [".jar"], _unpack_zip),
    # NOTE(review): "tbz2" has no leading dot, unlike every other extension
    # here -- confirm whether that is intentional (shutil may already claim
    # ".tbz2" for its own "bztar" format).
    ("tbz2", [".tbz", "tbz2"], _unpack_tar),
    # FIXME: make this optional depending on the runtime lzip package install
    ("tar.lz", [".tar.lz"], _unpack_tar),
    ("crate", [".crate"], _unpack_tar),
]

# Executed at import time, so the formats above are registered as soon as
# this module is loaded.
register_new_archive_formats()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Fri, Jul 4, 2:48 PM (3 d, 14 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295789
Attached To
rDCORE Foundations and core functionalities
Event Timeline
Log In to Comment