# ---------------------------------------------------------------------------
# File: pristine_zip/main.py
# ---------------------------------------------------------------------------

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import functools
import os.path
import tempfile

import click

from . import common
from . import delta_to_zipball
from . import parameters
from . import reference
from . import zipball_to_delta


def print_pristinezip_exceptions(f):
    """Decorator for CLI functions to display PristineZipException exception
    nicely instead of showing a traceback."""

    @functools.wraps(f)
    def newf(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except common.PristineZipException as e:
            # ClickException takes exactly one message argument; splatting
            # e.args would itself fail with TypeError when the exception
            # carries zero or several arguments.
            raise click.ClickException(str(e)) from e

    return newf


# NOTE: no docstring on purpose -- click would display it as the group's help
# text.  The callback stores the global options in ctx.obj for the subcommands.
@click.group()
@click.option(
    "--infozip30",
    default="zip",
    type=str,
    help=(
        "Path to an InfoZip 3.0 executable (usually the default 'zip' on "
        "Unix-like distributions)."
    ),
)
@click.option(
    "--7zip63",
    "sevenzip63",
    default="7z",
    type=str,
    help="Path to a 7zip executable. Usually '7z'.",
)
@click.option(
    "--strict-guess/--lax-guess",
    default=True,
    help=(
        "Whether pristine-zip should try guess the encoding software even when "
        "it has no idea what it is."
    ),
)
@click.pass_context
def cli(ctx, infozip30, sevenzip63, strict_guess):
    ctx.ensure_object(dict)
    ctx.obj["strict_guess"] = strict_guess
    ctx.obj["executables"] = common.Executables(
        infozip_3_0=infozip30, sevenzip_6_3=sevenzip63,
    )


@cli.command()
@click.argument("zipball", type=click.Path(exists=True, readable=True))
@click.argument("delta", type=click.Path())
@click.pass_context
@print_pristinezip_exceptions
def gendelta(ctx, zipball: str, delta: str):
    """Takes an upstream zipball and generates a small binary delta that
    can be used to re-generate the zipball."""
    zipball_to_delta.gendelta(
        ctx.obj["executables"],
        os.path.abspath(zipball),
        os.path.abspath(delta),
        strict_guess=ctx.obj["strict_guess"],
    )


@cli.command()
@click.argument("delta", type=click.Path(exists=True, readable=True))
@click.argument("zipball", type=click.Path())
@click.pass_context
@print_pristinezip_exceptions
def genzip(ctx, delta: str, zipball: str):
    """Takes a delta generated by 'pristine-zip gendelta' and reads files
    from the CWD to generate the exact same zipball as was given to
    'gendelta'."""
    checkout_dir = os.getcwd()
    delta_to_zipball.genzip(
        ctx.obj["executables"],
        checkout_dir,
        os.path.abspath(delta),
        os.path.abspath(zipball),
    )


@cli.command()
@click.argument("source_zipball", type=click.Path(exists=True, readable=True))
@click.argument("reference_zipballs_dir", type=click.Path())
@click.pass_context
@print_pristinezip_exceptions
def regenzip(ctx, source_zipball: str, reference_zipballs_dir: str):
    """Takes a zipball and generates, in a new directory, one reference
    zipball for each encoding software guessed for it."""
    source_path = os.path.abspath(source_zipball)
    references_dir = os.path.abspath(reference_zipballs_dir)

    with tempfile.TemporaryDirectory() as work_dir:
        encoding_software_guesses = parameters.guess_encoding_software(
            source_zipball, strict_guess=ctx.obj["strict_guess"],
        )

        # The output directory must be new; report a clash as a regular CLI
        # error instead of letting FileExistsError surface as a traceback.
        try:
            os.mkdir(references_dir)
        except FileExistsError as e:
            raise click.ClickException(
                f"Directory {reference_zipballs_dir!r} already exists."
            ) from e

        for encoding_software_guess in encoding_software_guesses:
            # One output file per guess, named after the guess' value.
            reference_path = os.path.join(
                references_dir,
                encoding_software_guess.value.replace(" ", "_") + ".zip",
            )
            # work_dir is shared between iterations; the callee removes its
            # "checkout" subdirectory before returning.
            zipball_to_delta.generate_reference_zipball_from_zipball(
                ctx.obj["executables"],
                encoding_software_guess,
                source_path,
                reference_path,
                work_dir,
            )


@cli.command()
@click.argument("reference_zipball", type=click.Path())
@click.argument(
    "encoding_software",
    type=click.Choice([es.value for es in parameters.EncodingSoftware]),
)
@click.pass_context
@print_pristinezip_exceptions
def cwdgenzip(ctx, reference_zipball: str, encoding_software):
    """Reads the CWD and generates a reference zipball from it."""
    checkout_dir = os.getcwd()
    reference.compress(
        ctx.obj["executables"],
        parameters.EncodingSoftware(encoding_software),
        checkout_dir,
        # Absolute, like the output paths of the sibling commands.
        os.path.abspath(reference_zipball),
    )


def main():
    """Entry point: run the click command group."""
    return cli()


# ---------------------------------------------------------------------------
# File: pristine_zip/zipball_to_delta.py
# ---------------------------------------------------------------------------

# Copyright (C) 2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import os
import shutil
import subprocess
import tempfile

from . import common
from . import parameters
from . import reference


def gendelta(
    executables: common.Executables,
    zipball_path: str,
    delta_path: str,
    *,
    strict_guess: bool,
):
    """Writes to ``delta_path`` a delta for the zipball at ``zipball_path``.

    When several encoding software are guessed, a delta is generated with
    each of them and the smallest one is kept (the first one wins ties).

    Raises PristineZipException if there is no guess at all.
    """
    encoding_software_guesses = parameters.guess_encoding_software(
        zipball_path, strict_guess=strict_guess
    )

    if not encoding_software_guesses:
        # Explicit error rather than an ``assert``, which ``python -O`` strips.
        raise common.PristineZipException(
            f"No encoding software could be guessed for {zipball_path}"
        )

    if len(encoding_software_guesses) == 1:
        # Only one guess, write straight to the destination
        _try_gendelta(
            executables, encoding_software_guesses[0], zipball_path, delta_path
        )
        return

    # Multiple guesses, try them in order and keep the smallest delta
    with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as deltas_dir:
        best_delta_path = None
        best_delta_size = None
        for encoding_software_guess in encoding_software_guesses:
            current_delta_path = os.path.join(
                deltas_dir, encoding_software_guess.value.replace(" ", "_")
            )
            _try_gendelta(
                executables,
                encoding_software_guess,
                zipball_path,
                current_delta_path,
            )
            current_delta_size = os.stat(current_delta_path).st_size

            if best_delta_size is None or current_delta_size < best_delta_size:
                if best_delta_path is not None:
                    os.remove(best_delta_path)
                best_delta_path = current_delta_path
                best_delta_size = current_delta_size
            else:
                # Drop the losing candidate right away so that at most two
                # deltas exist on disk at any time.
                os.remove(current_delta_path)

        shutil.copyfile(best_delta_path, delta_path)


def _try_gendelta(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    zipball_path: str,
    delta_path: str,
):
    """Generates at ``delta_path`` the delta of the zipball for one given
    encoding software, using a private temporary work directory."""
    with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as work_dir:
        reference_zipball_path = os.path.join(work_dir, "reference.zip")
        generate_reference_zipball_from_zipball(
            executables,
            encoding_software,
            zipball_path,
            reference_zipball_path,
            work_dir,
        )
        _generate_delta(
            executables,
            encoding_software,
            zipball_path,
            reference_zipball_path,
            work_dir,
            delta_path,
        )


def _generate_delta(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    upstream_zipball_path: str,
    reference_zipball_path: str,
    work_dir: str,
    delta_path: str,
):
    """Builds the delta tarball at ``delta_path``.

    Four files are written to ``work_dir`` and packed into a gzipped tarball:
    ``delta`` (xdelta3 output), ``type``, ``reference_md5sum`` and
    ``encoding_software``.

    Raises subprocess.CalledProcessError if an external command fails.
    """
    _generate_xdelta3(
        upstream_zipball_path, reference_zipball_path, os.path.join(work_dir, "delta")
    )

    with open(os.path.join(work_dir, "type"), "wb") as fd:
        fd.write(b"zip\n")

    proc = subprocess.run(
        ["md5sum", reference_zipball_path], capture_output=True, check=True
    )
    # Keep only the digest: whatever precedes the first space of the output.
    md5sum = proc.stdout.split(b" ", 1)[0]
    with open(os.path.join(work_dir, "reference_md5sum"), "wb") as fd:
        fd.write(md5sum + b"\n")

    with open(os.path.join(work_dir, "encoding_software"), "wb") as fd:
        fd.write(encoding_software.value.encode() + b"\n")

    files = ["delta", "encoding_software", "reference_md5sum", "type"]

    # Make the timestamps in the delta tarball deterministic
    for name in files:
        os.utime(os.path.join(work_dir, name), times=(0, 0))

    subprocess.run(
        [
            "tar",
            # make entries in the delta tarball deterministic:
            "--owner",
            "0",
            "--group",
            "0",
            "--numeric-owner",
            "--mode",
            "644",
            # generic options:
            "--create",
            "--gzip",
            "-f",
            delta_path,
            *files,
        ],
        cwd=work_dir,
        check=True,
    )


def _generate_xdelta3(
    upstream_zipball_path: str, reference_zipball_path: str, delta_path: str
):
    """Generates the xdelta3 difference between a reference zipball and
    the original one."""
    subprocess.run(
        [
            "xdelta3",
            "-e",
            "-s",
            reference_zipball_path,
            upstream_zipball_path,
            delta_path,
        ],
        check=True,
    )


def generate_reference_zipball_from_zipball(
    executables: common.Executables,
    encoding_software: parameters.EncodingSoftware,
    upstream_zipball_path: str,
    reference_zipball_path: str,
    work_dir: str,
):
    """Unzips an upstream zipball and rezips it in a reference zipball.

    The archive is extracted in a ``checkout`` subdirectory of ``work_dir``,
    which is removed before returning, whether or not an error occurred.
    """
    checkout_dir = os.path.join(work_dir, "checkout")
    os.mkdir(checkout_dir)
    try:
        subprocess.run(["unzip", upstream_zipball_path], cwd=checkout_dir, check=True)
        reference.compress(
            executables, encoding_software, checkout_dir, reference_zipball_path
        )
    finally:
        # Always clean up, so that the same work_dir can be used again.
        shutil.rmtree(checkout_dir)