diff --git a/pristine_zip/__init__.py b/pristine_zip/__init__.py index 1ffbf5d..0ccb1c7 100644 --- a/pristine_zip/__init__.py +++ b/pristine_zip/__init__.py @@ -1,9 +1,10 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -__all__ = ["gendelta", "genzip"] +__all__ = ["Executables", "gendelta", "genzip"] +from .common import Executables from .delta_to_zipball import genzip from .zipball_to_delta import gendelta diff --git a/pristine_zip/delta_to_zipball.py b/pristine_zip/delta_to_zipball.py index ca8f6fd..561df85 100644 --- a/pristine_zip/delta_to_zipball.py +++ b/pristine_zip/delta_to_zipball.py @@ -1,66 +1,86 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile +from . import common +from . import parameters from . import reference -def genzip(checkout_dir: str, delta_path: str, zipball_path: str): +def genzip( + executables: common.Executables, + checkout_dir: str, + delta_path: str, + zipball_path: str, +): with tempfile.TemporaryDirectory(prefix="pristine-zip-genzip") as work_dir: reference_zipball_path = os.path.join(work_dir, "reference.zip") + encoding_software = _extract_delta(work_dir, delta_path) + # generate reference zipball - reference.compress(checkout_dir, reference_zipball_path) + reference.compress( + executables, encoding_software, checkout_dir, reference_zipball_path + ) - _apply_delta(reference_zipball_path, zipball_path, work_dir, delta_path) + _apply_delta(reference_zipball_path, zipball_path, work_dir) -def _apply_delta( - reference_zipball_path: str, zipball_path: str, work_dir: str, delta_path: str, -): +def _extract_delta(work_dir: str, delta_path: str) -> parameters.EncodingSoftware: proc = subprocess.run(["tar", "--extract", "-f", delta_path,], cwd=work_dir) proc.check_returncode() with open(os.path.join(work_dir, "type"), "rb") as fd: type_ = fd.read().decode().strip() assert type_ == "zip", ( f"Unknown zipball type {type}. Are you" f"extracting a delta from pristine-tar instead of pristine-zip?" ) + with open(os.path.join(work_dir, "encoding_software"), "rb") as fd: + encoding_software_str = fd.read().decode().strip() + + encoding_software = parameters.EncodingSoftware(encoding_software_str) + + return encoding_software + + +def _apply_delta( + reference_zipball_path: str, zipball_path: str, work_dir: str, +): with open(os.path.join(work_dir, "reference_md5sum"), "rb") as fd: expected_md5sum = fd.read().decode().strip() proc = subprocess.run(["md5sum", reference_zipball_path], capture_output=True) proc.check_returncode() actual_md5sum = proc.stdout.decode().split(" ", 1)[0].strip() if actual_md5sum != expected_md5sum: print( f"md5sum mismatch between reference zipballs " f"(expected '{expected_md5sum}', got '{actual_md5sum}').\n" f"This is a bug, please report it along with the original zipball " f"and the version number of pristine-zip." ) exit(1) xdelta3_path = os.path.join(work_dir, "delta") assert os.path.isfile(xdelta3_path), "Missing 'delta' file in delta archive." _apply_xdelta3(reference_zipball_path, zipball_path, xdelta3_path) def _apply_xdelta3(reference_zipball_path: str, zipball_path: str, xdelta3_path: str): try: os.remove(zipball_path) except FileNotFoundError: pass proc = subprocess.run( ["xdelta3", "-d", "-s", reference_zipball_path, xdelta3_path, zipball_path] ) proc.check_returncode() diff --git a/pristine_zip/main.py b/pristine_zip/main.py index 47c2677..1ab6769 100644 --- a/pristine_zip/main.py +++ b/pristine_zip/main.py @@ -1,67 +1,108 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path import tempfile import click -from . import reference +from . import common from . import delta_to_zipball +from . import parameters +from . import reference from . import zipball_to_delta @click.group() -def cli(): - pass +@click.option( + "--infozip30", + default="zip", + type=str, + help=( + "Path to an InfoZip 3.0 executable (usually the default 'zip' on " + "Unix-like distributions)." + ), +) +@click.option( + "--7zip63", + "sevenzip63", + default="7z", + type=str, + help=("Path to a 7zip executable. Usually '7z'."), +) +@click.pass_context +def cli(ctx, infozip30, sevenzip63): + ctx.ensure_object(dict) + ctx.obj["executables"] = common.Executables( + infozip_3_0=infozip30, sevenzip_6_3=sevenzip63, + ) @cli.command() @click.argument("zipball", type=click.Path(exists=True, readable=True)) @click.argument("delta", type=click.Path()) -def gendelta(zipball: str, delta: str): +@click.pass_context +def gendelta(ctx, zipball: str, delta: str): """Takes an upstream zipball and generates a small binary delta that can be used to re-generate the zipball.""" - zipball_to_delta.gendelta(os.path.abspath(zipball), os.path.abspath(delta)) + zipball_to_delta.gendelta( + ctx.obj["executables"], os.path.abspath(zipball), os.path.abspath(delta) + ) @cli.command() @click.argument("delta", type=click.Path(exists=True, readable=True)) @click.argument("zipball", type=click.Path()) -def genzip(delta: str, zipball: str): +@click.pass_context +def genzip(ctx, delta: str, zipball: str): """Takes a delta generated by 'pristine-zip gendelta' and reads files from the CWD to generate the exact same zipball as was given to 'gendelta'.""" checkout_dir = os.getcwd() delta_to_zipball.genzip( - checkout_dir, os.path.abspath(delta), os.path.abspath(zipball) + ctx.obj["executables"], + checkout_dir, + os.path.abspath(delta), + os.path.abspath(zipball), ) @cli.command() @click.argument("source_zipball", type=click.Path(exists=True, readable=True)) @click.argument("reference_zipball", type=click.Path()) -def regenzip(source_zipball: str, reference_zipball: str): +@click.pass_context +def regenzip(ctx, source_zipball: str, reference_zipball: str): """Takes a zipball and generates a reference zipball from it.""" with tempfile.TemporaryDirectory() as work_dir: + encoding_software = parameters.guess_encoding_software(source_zipball) zipball_to_delta.generate_reference_zipball_from_zipball( + ctx.obj["executables"], + encoding_software, os.path.abspath(source_zipball), os.path.abspath(reference_zipball), work_dir, ) @cli.command() @click.argument("reference_zipball", type=click.Path()) -def cwdgenzip(reference_zipball: str): +@click.argument( + "encoding_software", + type=click.Choice([es.value for es in parameters.EncodingSoftware]), +) +@click.pass_context +def cwdgenzip(ctx, reference_zipball: str, encoding_software): """Reads the CWD and generates a reference zipball from it.""" checkout_dir = os.getcwd() reference.compress( - checkout_dir, reference_zipball, + ctx.obj["executables"], + parameters.EncodingSoftware(encoding_software), + checkout_dir, + reference_zipball, ) def main(): return cli() diff --git a/pristine_zip/reference.py b/pristine_zip/reference.py index 8b648a8..12352f2 100644 --- a/pristine_zip/reference.py +++ b/pristine_zip/reference.py @@ -1,73 +1,90 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Creates a ZIP file deterministically. This can be used a base reference for delta files.""" import os import subprocess +from . import common +from . import parameters + def normalize_timestamps(entries): for entry in entries: os.utime(entry, (0, 0)) def normalize_perms(checkout_dir): # Info-ZIP has an extension that allows storing permissions; but not all # implementations support it (eg. not Python). So let's normalize by # wiping all permissions. for dir_entry in os.scandir(checkout_dir): if dir_entry.is_dir(follow_symlinks=False): os.chmod(dir_entry.path, 0o100755) normalize_perms(dir_entry.path) elif dir_entry.is_file(follow_symlinks=False): os.chmod(dir_entry.path, 0o100644) elif dir_entry.is_symlink(): os.chmod(dir_entry.path, 0o120000) else: assert False, f"Unknown DirEntry type: {dir_entry}" def walk(checkout_dir): entries = [] for (dirpath, dirnames, filenames) in os.walk(checkout_dir): assert dirpath.startswith(checkout_dir) dirpath = dirpath[len(checkout_dir) :].lstrip("/") entries.append(dirpath) paths = [os.path.join(dirpath, filename) for filename in filenames] entries.extend(paths) return entries -def compress(checkout_dir: str, target: str): +def compress( + executables: common.Executables, + encoding_software: parameters.EncodingSoftware, + checkout_dir: str, + target: str, +): """Generates a reference zipball for the given checked out directory.""" try: os.remove(target) except FileNotFoundError: pass assert os.path.isdir(checkout_dir), checkout_dir entries_str = walk(checkout_dir) normalize_perms(checkout_dir) normalize_timestamps(os.path.join(checkout_dir, entry) for entry in entries_str) # Encode *before* sorting; sorting on unicode changes across configurations. entries = [entry.encode() for entry in entries_str] # Sort entries ourselves; InfoZIP's zip does not guarantee order entries.sort() - # -X = --no-extra, which prevents inclusion of extra non-deterministic - # and implementation-dependant data - # -o = --latest-time, which sets the modification time of the zip to that - # of the most recent file - proc = subprocess.run( - ["zip", "-X", "-o", target, "--names-stdin"], - cwd=checkout_dir, - input=b"\n".join(entries), - ) - proc.check_returncode() + if encoding_software == parameters.EncodingSoftware.INFOZIP_3_0: + # -X = --no-extra, which prevents inclusion of extra non-deterministic + # and implementation-dependant data + # -o = --latest-time, which sets the modification time of the zip to that + # of the most recent file + assert executables.infozip_3_0 + proc = subprocess.run( + [executables.infozip_3_0, "-X", "-o", target, "--names-stdin"], + cwd=checkout_dir, + input=b"\n".join(entries), + ) + proc.check_returncode() + elif encoding_software == parameters.EncodingSoftware.SEVENZIP_6_3: + assert executables.sevenzip_6_3 + assert target.endswith(".zip") + proc = subprocess.run( + [executables.sevenzip_6_3, "a", target, *entries], cwd=checkout_dir, + ) + proc.check_returncode() diff --git a/pristine_zip/zipball_to_delta.py b/pristine_zip/zipball_to_delta.py index 6d8463b..016d2fe 100644 --- a/pristine_zip/zipball_to_delta.py +++ b/pristine_zip/zipball_to_delta.py @@ -1,97 +1,123 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile +from . import common +from . import parameters from . import reference -def gendelta(zipball_path: str, delta_path: str): +def gendelta(executables: common.Executables, zipball_path: str, delta_path: str): + encoding_software = parameters.guess_encoding_software(zipball_path) + with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as work_dir: reference_zipball_path = os.path.join(work_dir, "reference.zip") generate_reference_zipball_from_zipball( - zipball_path, reference_zipball_path, work_dir + executables, + encoding_software, + zipball_path, + reference_zipball_path, + work_dir, + ) + _generate_delta( + executables, + encoding_software, + zipball_path, + reference_zipball_path, + work_dir, + delta_path, ) - _generate_delta(zipball_path, reference_zipball_path, work_dir, delta_path) def _generate_delta( + executables: common.Executables, + encoding_software: parameters.EncodingSoftware, upstream_zipball_path: str, reference_zipball_path: str, work_dir: str, delta_path: str, ): _generate_xdelta3( upstream_zipball_path, reference_zipball_path, os.path.join(work_dir, "delta") ) with open(os.path.join(work_dir, "type"), "wb") as fd: fd.write(b"zip\n") proc = subprocess.run(["md5sum", reference_zipball_path], capture_output=True) proc.check_returncode() md5sum = proc.stdout.split(b" ", 1)[0] with open(os.path.join(work_dir, "reference_md5sum"), "wb") as fd: fd.write(md5sum + b"\n") - files = ["delta", "reference_md5sum", "type"] + with open(os.path.join(work_dir, "encoding_software"), "wb") as fd: + fd.write(encoding_software.value.encode() + b"\n") + + files = ["delta", "encoding_software", "reference_md5sum", "type"] # Make the timestamps in the delta tarball deterministic for file in files: os.utime(os.path.join(work_dir, file), times=(0, 0)) proc = subprocess.run( [ "tar", # make entries in the delta tarball deterministic: "--owner", "0", "--group", "0", "--numeric-owner", "--mode", "644", # generic options: "--create", "--gzip", "-f", delta_path, *files, ], cwd=work_dir, ) proc.check_returncode() def _generate_xdelta3( upstream_zipball_path: str, reference_zipball_path: str, delta_path: str ): """Generates the xdelta3 difference between a reference zipball and the original one.""" proc = subprocess.run( [ "xdelta3", "-e", "-s", reference_zipball_path, upstream_zipball_path, delta_path, ] ) proc.check_returncode() def generate_reference_zipball_from_zipball( - upstream_zipball_path: str, reference_zipball_path: str, work_dir: str + executables: common.Executables, + encoding_software: parameters.EncodingSoftware, + upstream_zipball_path: str, + reference_zipball_path: str, + work_dir: str, ): """Unzips an upstream zipball and rezips it in a reference zipball.""" checkout_dir = os.path.join(work_dir, "checkout") os.mkdir(checkout_dir) proc = subprocess.run(["unzip", upstream_zipball_path], cwd=checkout_dir) proc.check_returncode() - reference.compress(checkout_dir, reference_zipball_path) + reference.compress( + executables, encoding_software, checkout_dir, reference_zipball_path + )