Changeset View
Changeset View
Standalone View
Standalone View
swh/core/tests/test_tarball.py
Show All 21 Lines | for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: | ||||
name = format_id[0] | name = format_id[0] | ||||
if name in registered_formats: | if name in registered_formats: | ||||
shutil.unregister_unpack_format(name) | shutil.unregister_unpack_format(name) | ||||
return shutil | return shutil | ||||
def test_compress_uncompress_zip(tmp_path): | def test_compress_uncompress_zip(tmp_path): | ||||
tocompress = tmp_path / 'compressme' | tocompress = tmp_path / "compressme" | ||||
tocompress.mkdir() | tocompress.mkdir() | ||||
for i in range(10): | for i in range(10): | ||||
fpath = tocompress / ('file%s.txt' % i) | fpath = tocompress / ("file%s.txt" % i) | ||||
fpath.write_text('content of file %s' % i) | fpath.write_text("content of file %s" % i) | ||||
zipfile = tmp_path / 'archive.zip' | zipfile = tmp_path / "archive.zip" | ||||
tarball.compress(str(zipfile), 'zip', str(tocompress)) | tarball.compress(str(zipfile), "zip", str(tocompress)) | ||||
destdir = tmp_path / 'destdir' | destdir = tmp_path / "destdir" | ||||
tarball.uncompress(str(zipfile), str(destdir)) | tarball.uncompress(str(zipfile), str(destdir)) | ||||
lsdir = sorted(x.name for x in destdir.iterdir()) | lsdir = sorted(x.name for x in destdir.iterdir()) | ||||
assert ['file%s.txt' % i for i in range(10)] == lsdir | assert ["file%s.txt" % i for i in range(10)] == lsdir | ||||
def test_compress_uncompress_tar(tmp_path): | def test_compress_uncompress_tar(tmp_path): | ||||
tocompress = tmp_path / 'compressme' | tocompress = tmp_path / "compressme" | ||||
tocompress.mkdir() | tocompress.mkdir() | ||||
for i in range(10): | for i in range(10): | ||||
fpath = tocompress / ('file%s.txt' % i) | fpath = tocompress / ("file%s.txt" % i) | ||||
fpath.write_text('content of file %s' % i) | fpath.write_text("content of file %s" % i) | ||||
tarfile = tmp_path / 'archive.tar' | tarfile = tmp_path / "archive.tar" | ||||
tarball.compress(str(tarfile), 'tar', str(tocompress)) | tarball.compress(str(tarfile), "tar", str(tocompress)) | ||||
destdir = tmp_path / 'destdir' | destdir = tmp_path / "destdir" | ||||
tarball.uncompress(str(tarfile), str(destdir)) | tarball.uncompress(str(tarfile), str(destdir)) | ||||
lsdir = sorted(x.name for x in destdir.iterdir()) | lsdir = sorted(x.name for x in destdir.iterdir()) | ||||
assert ['file%s.txt' % i for i in range(10)] == lsdir | assert ["file%s.txt" % i for i in range(10)] == lsdir | ||||
def test__unpack_tar_failure(tmp_path, datadir): | def test__unpack_tar_failure(tmp_path, datadir): | ||||
"""Unpack inexistent tarball should fail | """Unpack inexistent tarball should fail | ||||
""" | """ | ||||
tarpath = os.path.join(datadir, 'archives', 'inexistent-archive.tar.Z') | tarpath = os.path.join(datadir, "archives", "inexistent-archive.tar.Z") | ||||
assert not os.path.exists(tarpath) | assert not os.path.exists(tarpath) | ||||
with pytest.raises(shutil.ReadError, | with pytest.raises( | ||||
match=f'Unable to uncompress {tarpath} to {tmp_path}'): | shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}" | ||||
): | |||||
tarball._unpack_tar(tarpath, tmp_path) | tarball._unpack_tar(tarpath, tmp_path) | ||||
def test__unpack_tar_failure2(tmp_path, datadir): | def test__unpack_tar_failure2(tmp_path, datadir): | ||||
"""Unpack Existent tarball into an inexistent folder should fail | """Unpack Existent tarball into an inexistent folder should fail | ||||
""" | """ | ||||
filename = 'groff-1.02.tar.Z' | filename = "groff-1.02.tar.Z" | ||||
tarpath = os.path.join(datadir, 'archives', filename) | tarpath = os.path.join(datadir, "archives", filename) | ||||
assert os.path.exists(tarpath) | assert os.path.exists(tarpath) | ||||
extract_dir = os.path.join(tmp_path, 'dir', 'inexistent') | extract_dir = os.path.join(tmp_path, "dir", "inexistent") | ||||
with pytest.raises(shutil.ReadError, | with pytest.raises( | ||||
match=f'Unable to uncompress {tarpath} to {tmp_path}'): | shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}" | ||||
): | |||||
tarball._unpack_tar(tarpath, extract_dir) | tarball._unpack_tar(tarpath, extract_dir) | ||||
def test__unpack_tar_failure3(tmp_path, datadir): | def test__unpack_tar_failure3(tmp_path, datadir): | ||||
"""Unpack unsupported tarball should fail | """Unpack unsupported tarball should fail | ||||
""" | """ | ||||
filename = 'hello.zip' | filename = "hello.zip" | ||||
tarpath = os.path.join(datadir, 'archives', filename) | tarpath = os.path.join(datadir, "archives", filename) | ||||
assert os.path.exists(tarpath) | assert os.path.exists(tarpath) | ||||
with pytest.raises(shutil.ReadError, | with pytest.raises( | ||||
match=f'Unable to uncompress {tarpath} to {tmp_path}'): | shutil.ReadError, match=f"Unable to uncompress {tarpath} to {tmp_path}" | ||||
): | |||||
tarball._unpack_tar(tarpath, tmp_path) | tarball._unpack_tar(tarpath, tmp_path) | ||||
def test__unpack_tar(tmp_path, datadir): | def test__unpack_tar(tmp_path, datadir): | ||||
"""Unpack supported tarball into an existent folder should be ok | """Unpack supported tarball into an existent folder should be ok | ||||
""" | """ | ||||
filename = 'groff-1.02.tar.Z' | filename = "groff-1.02.tar.Z" | ||||
tarpath = os.path.join(datadir, 'archives', filename) | tarpath = os.path.join(datadir, "archives", filename) | ||||
assert os.path.exists(tarpath) | assert os.path.exists(tarpath) | ||||
extract_dir = os.path.join(tmp_path, filename) | extract_dir = os.path.join(tmp_path, filename) | ||||
os.makedirs(extract_dir, exist_ok=True) | os.makedirs(extract_dir, exist_ok=True) | ||||
output_directory = tarball._unpack_tar(tarpath, extract_dir) | output_directory = tarball._unpack_tar(tarpath, extract_dir) | ||||
Show All 17 Lines | def test_register_new_archive_formats(prepare_shutil_state): | ||||
for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: | for format_id in tarball.ADDITIONAL_ARCHIVE_FORMATS: | ||||
assert format_id[0] in unpack_formats_v2 | assert format_id[0] in unpack_formats_v2 | ||||
def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state): | def test_uncompress_tarpaths(tmp_path, datadir, prepare_shutil_state): | ||||
"""High level call uncompression on un/supported tarballs | """High level call uncompression on un/supported tarballs | ||||
""" | """ | ||||
archive_dir = os.path.join(datadir, 'archives') | archive_dir = os.path.join(datadir, "archives") | ||||
tarfiles = os.listdir(archive_dir) | tarfiles = os.listdir(archive_dir) | ||||
tarpaths = [os.path.join(archive_dir, tarfile) for tarfile in tarfiles] | tarpaths = [os.path.join(archive_dir, tarfile) for tarfile in tarfiles] | ||||
unsupported_tarpaths = [] | unsupported_tarpaths = [] | ||||
for t in tarpaths: | for t in tarpaths: | ||||
if t.endswith('.Z') or t.endswith('.x') or t.endswith('.lz'): | if t.endswith(".Z") or t.endswith(".x") or t.endswith(".lz"): | ||||
unsupported_tarpaths.append(t) | unsupported_tarpaths.append(t) | ||||
# not supported yet | # not supported yet | ||||
for tarpath in unsupported_tarpaths: | for tarpath in unsupported_tarpaths: | ||||
with pytest.raises(ValueError, | with pytest.raises(ValueError, match=f"Problem during unpacking {tarpath}."): | ||||
match=f'Problem during unpacking {tarpath}.'): | |||||
tarball.uncompress(tarpath, dest=tmp_path) | tarball.uncompress(tarpath, dest=tmp_path) | ||||
# register those unsupported formats | # register those unsupported formats | ||||
tarball.register_new_archive_formats() | tarball.register_new_archive_formats() | ||||
# unsupported formats are now supported | # unsupported formats are now supported | ||||
for n, tarpath in enumerate(tarpaths, start=1): | for n, tarpath in enumerate(tarpaths, start=1): | ||||
tarball.uncompress(tarpath, dest=tmp_path) | tarball.uncompress(tarpath, dest=tmp_path) | ||||
assert n == len(tarpaths) | assert n == len(tarpaths) |