diff --git a/pristine_zip/__init__.py b/pristine_zip/__init__.py index 0ccb1c7..d5b0b09 100644 --- a/pristine_zip/__init__.py +++ b/pristine_zip/__init__.py @@ -1,10 +1,10 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -__all__ = ["Executables", "gendelta", "genzip"] +__all__ = ["Executables", "PristineZipException", "gendelta", "genzip"] -from .common import Executables +from .common import Executables, PristineZipException from .delta_to_zipball import genzip from .zipball_to_delta import gendelta diff --git a/pristine_zip/delta_to_zipball.py b/pristine_zip/delta_to_zipball.py index 561df85..838dbc3 100644 --- a/pristine_zip/delta_to_zipball.py +++ b/pristine_zip/delta_to_zipball.py @@ -1,86 +1,93 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile from . import common from . import parameters from . import reference def genzip( executables: common.Executables, checkout_dir: str, delta_path: str, zipball_path: str, ): with tempfile.TemporaryDirectory(prefix="pristine-zip-genzip") as work_dir: reference_zipball_path = os.path.join(work_dir, "reference.zip") encoding_software = _extract_delta(work_dir, delta_path) # generate reference zipball reference.compress( executables, encoding_software, checkout_dir, reference_zipball_path ) _apply_delta(reference_zipball_path, zipball_path, work_dir) def _extract_delta(work_dir: str, delta_path: str) -> parameters.EncodingSoftware: proc = subprocess.run(["tar", "--extract", "-f", delta_path,], cwd=work_dir) proc.check_returncode() with open(os.path.join(work_dir, "type"), "rb") as fd: type_ = fd.read().decode().strip() assert type_ == "zip", ( f"Unknown zipball type {type}. Are you" f"extracting a delta from pristine-tar instead of pristine-zip?" ) with open(os.path.join(work_dir, "encoding_software"), "rb") as fd: encoding_software_str = fd.read().decode().strip() - encoding_software = parameters.EncodingSoftware(encoding_software_str) + try: + encoding_software = parameters.EncodingSoftware(encoding_software_str) + except ValueError: + raise common.PristineZipException( + f"Delta file refers to unknown encoding software: " + f"{encoding_software_str}. It may have been created by a newer version " + f"of pristine-zip than the one you are currently running." + ) return encoding_software def _apply_delta( reference_zipball_path: str, zipball_path: str, work_dir: str, ): with open(os.path.join(work_dir, "reference_md5sum"), "rb") as fd: expected_md5sum = fd.read().decode().strip() proc = subprocess.run(["md5sum", reference_zipball_path], capture_output=True) proc.check_returncode() actual_md5sum = proc.stdout.decode().split(" ", 1)[0].strip() if actual_md5sum != expected_md5sum: print( f"md5sum mismatch between reference zipballs " f"(expected '{expected_md5sum}', got '{actual_md5sum}').\n" f"This is a bug, please report it along with the original zipball " f"and the version number of pristine-zip." ) exit(1) xdelta3_path = os.path.join(work_dir, "delta") assert os.path.isfile(xdelta3_path), "Missing 'delta' file in delta archive." _apply_xdelta3(reference_zipball_path, zipball_path, xdelta3_path) def _apply_xdelta3(reference_zipball_path: str, zipball_path: str, xdelta3_path: str): try: os.remove(zipball_path) except FileNotFoundError: pass proc = subprocess.run( ["xdelta3", "-d", "-s", reference_zipball_path, xdelta3_path, zipball_path] ) proc.check_returncode() diff --git a/pristine_zip/main.py b/pristine_zip/main.py index 1ab6769..cc65e67 100644 --- a/pristine_zip/main.py +++ b/pristine_zip/main.py @@ -1,108 +1,127 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import functools import os.path import tempfile import click from . import common from . import delta_to_zipball from . import parameters from . import reference from . import zipball_to_delta +def print_pristinezip_exceptions(f): + """Decorator for CLI functions to display PristineZipException exception + nicely instead of showing a traceback.""" + + @functools.wraps(f) + def newf(*args, **kwargs): + try: + return f(*args, **kwargs) + except common.PristineZipException as e: + raise click.ClickException(*e.args) + + return newf + + @click.group() @click.option( "--infozip30", default="zip", type=str, help=( "Path to an InfoZip 3.0 executable (usually the default 'zip' on " "Unix-like distributions)." ), ) @click.option( "--7zip63", "sevenzip63", default="7z", type=str, help=("Path to a 7zip executable. Usually '7z'."), ) @click.pass_context def cli(ctx, infozip30, sevenzip63): ctx.ensure_object(dict) ctx.obj["executables"] = common.Executables( infozip_3_0=infozip30, sevenzip_6_3=sevenzip63, ) @cli.command() @click.argument("zipball", type=click.Path(exists=True, readable=True)) @click.argument("delta", type=click.Path()) @click.pass_context +@print_pristinezip_exceptions def gendelta(ctx, zipball: str, delta: str): """Takes an upstream zipball and generates a small binary delta that can be used to re-generate the zipball.""" zipball_to_delta.gendelta( ctx.obj["executables"], os.path.abspath(zipball), os.path.abspath(delta) ) @cli.command() @click.argument("delta", type=click.Path(exists=True, readable=True)) @click.argument("zipball", type=click.Path()) @click.pass_context +@print_pristinezip_exceptions def genzip(ctx, delta: str, zipball: str): """Takes a delta generated by 'pristine-zip gendelta' and reads files from the CWD to generate the exact same zipball as was given to 'gendelta'.""" checkout_dir = os.getcwd() delta_to_zipball.genzip( ctx.obj["executables"], checkout_dir, os.path.abspath(delta), os.path.abspath(zipball), ) @cli.command() @click.argument("source_zipball", type=click.Path(exists=True, readable=True)) @click.argument("reference_zipball", type=click.Path()) @click.pass_context +@print_pristinezip_exceptions def regenzip(ctx, source_zipball: str, reference_zipball: str): """Takes a zipball and generates a reference zipball from it.""" with tempfile.TemporaryDirectory() as work_dir: encoding_software = parameters.guess_encoding_software(source_zipball) zipball_to_delta.generate_reference_zipball_from_zipball( ctx.obj["executables"], encoding_software, os.path.abspath(source_zipball), os.path.abspath(reference_zipball), work_dir, ) @cli.command() @click.argument("reference_zipball", type=click.Path()) @click.argument( "encoding_software", type=click.Choice([es.value for es in parameters.EncodingSoftware]), ) @click.pass_context +@print_pristinezip_exceptions def cwdgenzip(ctx, reference_zipball: str, encoding_software): """Reads the CWD and generates a reference zipball from it.""" checkout_dir = os.getcwd() reference.compress( ctx.obj["executables"], parameters.EncodingSoftware(encoding_software), checkout_dir, reference_zipball, ) def main(): return cli()