# ---------------------------------------------------------------------------
# pristine_zip/reference.py
# (from: diff --git a/pristine_zip/reference.py b/pristine_zip/reference.py,
#  index 058c26c..b14d8d4 100644 -- post-patch content)
# ---------------------------------------------------------------------------
# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Creates a ZIP file deterministically. This can be used as a base reference
for delta files."""

import os
import subprocess

from . import common
from . import parameters


def normalize_timestamps(entries):
    """Sets the access and modification times of every path in ``entries``
    to 0 (the epoch).

    ``entries`` is any iterable of paths. Where the platform allows it,
    symlinks themselves are re-stamped instead of the file they point to, so
    that a link leading outside of the checkout never modifies a foreign
    file; a target that lives inside the checkout is re-stamped through its
    own entry anyway."""
    # follow_symlinks=False raises NotImplementedError on platforms that do
    # not support it, so fall back to the default (following) behaviour there.
    follow_symlinks = os.utime not in os.supports_follow_symlinks
    for entry in entries:
        os.utime(entry, (0, 0), follow_symlinks=follow_symlinks)


def normalize_perms(checkout_dir):
    """Recursively resets the permissions of everything below
    ``checkout_dir``: 0o755 for directories and 0o644 for regular files.

    Symlinks are skipped. Raises :exc:`AssertionError` on an entry that is
    neither a directory, a regular file, nor a symlink."""
    # Info-ZIP has an extension that allows storing permissions; but not all
    # implementations support it (eg. not Python). So let's normalize by
    # wiping all permissions.
    with os.scandir(checkout_dir) as scan:
        for dir_entry in scan:
            if dir_entry.is_symlink():
                # os.chmod() follows symlinks by default, so calling it here
                # would change the mode of the link's *target* (possibly a
                # file outside of the checkout, or one that was already
                # normalized). Leave links alone; in-tree targets are
                # normalized when their own entry is visited.
                continue
            if dir_entry.is_dir(follow_symlinks=False):
                # Only permission bits are passed: chmod cannot change the
                # file type, so file-type bits have no place in the mode.
                os.chmod(dir_entry.path, 0o755)
                normalize_perms(dir_entry.path)
            elif dir_entry.is_file(follow_symlinks=False):
                os.chmod(dir_entry.path, 0o644)
            else:
                # Explicit raise (rather than `assert False`) so that the
                # check is not stripped when running with -O.
                raise AssertionError(f"Unknown DirEntry type: {dir_entry}")


def walk(checkout_dir):
    """Returns the paths of the directories and files found below
    ``checkout_dir``, relative to it, in :func:`os.walk` order.

    The checkout directory itself is included, as the empty string."""
    # NOTE(review): os.walk() reports symlinks to directories in `dirnames`
    # and does not descend into them by default, so such links do not appear
    # in the result -- confirm this is intended.
    entries = []
    for (dirpath, dirnames, filenames) in os.walk(checkout_dir):
        assert dirpath.startswith(checkout_dir)
        dirpath = dirpath[len(checkout_dir):].lstrip("/")
        entries.append(dirpath)
        entries.extend(os.path.join(dirpath, filename) for filename in filenames)
    return entries


def compress(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    checkout_dir: str,
    target: str,
):
    """Generates a reference zipball for the given checked out directory.

    Any existing file at ``target`` is removed first. The permissions and
    timestamps of the files in ``checkout_dir`` are normalized *in place*
    before archiving, and entries are passed to the archiver in sorted order.

    Raises :exc:`AssertionError` if ``checkout_dir`` is not a directory or if
    the executable needed for ``encoding_software`` is not set, and
    :exc:`subprocess.CalledProcessError` if an external command fails."""
    # The archivers below run with cwd=checkout_dir while os.remove() and
    # strip-nondeterminism run in the current directory; make the path
    # absolute so that a relative `target` designates the same file for all.
    target = os.path.abspath(target)

    try:
        os.remove(target)
    except FileNotFoundError:
        pass

    assert os.path.isdir(checkout_dir), checkout_dir

    entries_str = walk(checkout_dir)
    normalize_perms(checkout_dir)
    normalize_timestamps(os.path.join(checkout_dir, entry) for entry in entries_str)

    # Encode *before* sorting; sorting on unicode changes across configurations.
    # os.fsencode() is the inverse of the decoding os.walk() applied to the
    # names, so names that are not valid in the filesystem encoding are
    # handed back to the archiver byte-for-byte instead of failing to encode.
    entries = [os.fsencode(entry) for entry in entries_str]

    # Sort entries ourselves; InfoZIP's zip does not guarantee order
    entries.sort()

    # NOTE(review): an `encoding_software` matching neither branch below
    # falls through without producing any archive -- confirm whether that
    # should be an error.
    if encoding_software == parameters.EncodingSoftware.INFOZIP_3_0:
        # -X = --no-extra, which prevents inclusion of extra non-deterministic
        #      and implementation-dependant data
        # -o = --latest-time, which sets the modification time of the zip to that
        #      of the most recent file
        assert executables.infozip_3_0
        proc = subprocess.run(
            [executables.infozip_3_0, "-X", "-o", target, "--names-stdin"],
            cwd=checkout_dir,
            input=b"\n".join(entries),
            capture_output=True,
        )
        proc.check_returncode()
    elif encoding_software == parameters.EncodingSoftware.SEVENZIP_6_3:
        assert executables.sevenzip_6_3
        assert target.endswith(".zip")
        proc = subprocess.run(
            [executables.sevenzip_6_3, "a", target, *entries],
            cwd=checkout_dir,
            capture_output=True,
        )
        proc.check_returncode()

        # 7zip doesn't provide a way to disable its nondeterministic fields;
        # let's remove them after the fact.
        proc = subprocess.run(["strip-nondeterminism", "-t", "zip", target])
        # A failure here would silently leave a non-reproducible reference.
        proc.check_returncode()


# ---------------------------------------------------------------------------
# pristine_zip/zipball_to_delta.py
# (from: diff --git a/pristine_zip/zipball_to_delta.py
#  b/pristine_zip/zipball_to_delta.py, index a9b7a21..00b91b3 100644
#  -- post-patch content)
# ---------------------------------------------------------------------------
# Copyright (C) 2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import shutil
import subprocess
import tempfile

from . import common
from . import parameters
from . import reference


def gendelta(
    executables: common.Executables,
    zipball_path: str,
    delta_path: str,
    *,
    strict_guess: bool,
):
    """Generates at ``delta_path`` the delta file for the zipball at
    ``zipball_path``.

    The candidate encoding softwares come from
    ``parameters.guess_encoding_software``. With a single candidate the delta
    is written directly; otherwise one delta per candidate is generated in a
    temporary directory and the smallest one is copied to ``delta_path``
    (the first one wins on ties)."""
    encoding_software_guesses = parameters.guess_encoding_software(
        zipball_path, strict_guess=strict_guess
    )

    if len(encoding_software_guesses) == 1:
        # Only one guess, run it directly
        _try_gendelta(
            executables, encoding_software_guesses[0], zipball_path, delta_path
        )
    else:
        # Multiple guesses, try them in order
        with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as deltas_dir:
            best_delta_path = None
            best_delta_size = None
            for encoding_software_guess in encoding_software_guesses:
                current_delta_path = os.path.join(
                    deltas_dir, encoding_software_guess.value.replace(" ", "_")
                )
                _try_gendelta(
                    executables,
                    encoding_software_guess,
                    zipball_path,
                    current_delta_path,
                )
                current_delta_size = os.stat(current_delta_path).st_size
                if best_delta_size is None or best_delta_size > current_delta_size:
                    if best_delta_path is not None:
                        # The previous best is superseded; drop it right away.
                        os.remove(best_delta_path)
                    best_delta_path = current_delta_path
                    best_delta_size = current_delta_size

            # Must happen before leaving the `with`, which deletes deltas_dir.
            assert best_delta_path, "no encoding software guess to try"
            shutil.copyfile(best_delta_path, delta_path)


def _try_gendelta(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    zipball_path: str,
    delta_path: str,
):
    """Generates at ``delta_path`` the delta of the zipball at
    ``zipball_path`` against a reference zipball built with
    ``encoding_software``. All intermediate files live in a temporary
    directory that is removed on exit."""
    with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as work_dir:
        reference_zipball_path = os.path.join(work_dir, "reference.zip")
        generate_reference_zipball_from_zipball(
            executables,
            encoding_software,
            zipball_path,
            reference_zipball_path,
            work_dir,
        )
        _generate_delta(
            executables,
            encoding_software,
            zipball_path,
            reference_zipball_path,
            work_dir,
            delta_path,
        )


def _generate_delta(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    upstream_zipball_path: str,
    reference_zipball_path: str,
    work_dir: str,
    delta_path: str,
):
    """Writes the delta tarball to ``delta_path``.

    Four files are created in ``work_dir`` and packed into a gzipped tarball:
    ``delta`` (xdelta3 output), ``type`` (``zip``), ``reference_md5sum`` (MD5
    of the reference zipball) and ``encoding_software`` (the value of
    ``encoding_software``).

    Raises :exc:`subprocess.CalledProcessError` if an external command
    fails."""
    # tar runs with cwd=work_dir, which is a temporary directory: a relative
    # delta_path would be created inside it and lost on cleanup.
    delta_path = os.path.abspath(delta_path)

    _generate_xdelta3(
        upstream_zipball_path, reference_zipball_path, os.path.join(work_dir, "delta")
    )

    with open(os.path.join(work_dir, "type"), "wb") as fd:
        fd.write(b"zip\n")

    proc = subprocess.run(["md5sum", reference_zipball_path], capture_output=True)
    proc.check_returncode()
    # md5sum prints "<digest>  <file name>"; keep the digest only.
    md5sum = proc.stdout.split(b" ", 1)[0]
    with open(os.path.join(work_dir, "reference_md5sum"), "wb") as fd:
        fd.write(md5sum + b"\n")

    with open(os.path.join(work_dir, "encoding_software"), "wb") as fd:
        fd.write(encoding_software.value.encode() + b"\n")

    files = ["delta", "encoding_software", "reference_md5sum", "type"]

    # Make the timestamps in the delta tarball deterministic
    for file in files:
        os.utime(os.path.join(work_dir, file), times=(0, 0))

    proc = subprocess.run(
        [
            "tar",
            # make entries in the delta tarball deterministic:
            "--owner",
            "0",
            "--group",
            "0",
            "--numeric-owner",
            "--mode",
            "644",
            # generic options:
            "--create",
            "--gzip",
            "-f",
            delta_path,
            *files,
        ],
        cwd=work_dir,
    )
    proc.check_returncode()


def _generate_xdelta3(
    upstream_zipball_path: str, reference_zipball_path: str, delta_path: str
):
    """Generates the xdelta3 difference between a reference zipball and
    the original one.

    Raises :exc:`subprocess.CalledProcessError` if xdelta3 fails."""
    proc = subprocess.run(
        [
            "xdelta3",
            "-e",
            "-s",
            reference_zipball_path,
            upstream_zipball_path,
            delta_path,
        ]
    )
    proc.check_returncode()


def generate_reference_zipball_from_zipball(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    upstream_zipball_path: str,
    reference_zipball_path: str,
    work_dir: str,
):
    """Unzips an upstream zipball and rezips it in a reference zipball.

    The extraction happens in a ``checkout`` directory created inside
    ``work_dir`` and removed once the reference zipball is written.

    Raises :exc:`subprocess.CalledProcessError` if an external command
    fails."""
    # unzip runs with cwd=checkout_dir, so a relative path to the upstream
    # zipball has to be resolved against the current directory first.
    upstream_zipball_path = os.path.abspath(upstream_zipball_path)

    checkout_dir = os.path.join(work_dir, "checkout")
    os.mkdir(checkout_dir)
    proc = subprocess.run(
        ["unzip", upstream_zipball_path], cwd=checkout_dir, capture_output=True
    )
    proc.check_returncode()
    reference.compress(
        executables, encoding_software, checkout_dir, reference_zipball_path
    )
    shutil.rmtree(checkout_dir)