diff --git a/pristine_zip/__init__.py b/pristine_zip/__init__.py index 045a05f..1ffbf5d 100644 --- a/pristine_zip/__init__.py +++ b/pristine_zip/__init__.py @@ -1,10 +1,9 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -__all__ = ["MultipleRootDirectoriesError", "gendelta", "genzip"] +__all__ = ["gendelta", "genzip"] -from .common import MultipleRootDirectoriesError from .delta_to_zipball import genzip from .zipball_to_delta import gendelta diff --git a/pristine_zip/common.py b/pristine_zip/common.py deleted file mode 100644 index cfa220b..0000000 --- a/pristine_zip/common.py +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright (C) 2020 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -class MultipleRootDirectoriesError(Exception): - pass diff --git a/pristine_zip/compress.py b/pristine_zip/compress.py index 524468a..0acd1a0 100644 --- a/pristine_zip/compress.py +++ b/pristine_zip/compress.py @@ -1,41 +1,41 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Creates a ZIP file deterministically. This can be used a base reference for delta files.""" import os import subprocess def walk(checkout_dir): entries = [] for (dirpath, dirnames, filenames) in os.walk(checkout_dir): assert dirpath.startswith(checkout_dir) dirpath = dirpath[len(checkout_dir) :].lstrip("/") entries.append(dirpath) entries.extend(os.path.join(dirpath, filename) for filename in filenames) return entries -def compress(checkout_dir: str, dir_name: str, target: str): +def compress(checkout_dir: str, target: str): """Generates a reference zipball for the given checked out directory.""" assert not os.path.isfile(target), target assert os.path.isdir(checkout_dir), checkout_dir # Encode *before* sorting; sorting on unicode changes across configurations. entries = [entry.encode() for entry in walk(checkout_dir)] # Sort entries ourselves; InfoZIP's zip does not guarantee order entries.sort() # -X = --no-extra, which prevents inclusion of extra non-deterministic # and implementation-dependant data proc = subprocess.run( ["zip", "-X", target, "--names-stdin"], cwd=checkout_dir, input=b"\n".join(entries), ) proc.check_returncode() diff --git a/pristine_zip/delta_to_zipball.py b/pristine_zip/delta_to_zipball.py index 2fb0e39..5255dcc 100644 --- a/pristine_zip/delta_to_zipball.py +++ b/pristine_zip/delta_to_zipball.py @@ -1,60 +1,46 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile from . import compress -from . import common - - -def get_checkout_root(checkout_dir: str) -> str: - root_dirs = list(os.listdir(checkout_dir)) - try: - (root_dir,) = root_dirs - except ValueError: - raise common.MultipleRootDirectoriesError( - f"CWD has {len(root_dirs)} root directories, expected 1.", - ) from None - - return root_dir def genzip(checkout_dir: str, delta_path: str, zipball_path: str): with tempfile.TemporaryDirectory(prefix="pristine-zip-genzip") as work_dir: reference_zipball_path = os.path.join(work_dir, "reference.zip") # generate reference zipball - root_dir = get_checkout_root(checkout_dir) - compress.compress(checkout_dir, root_dir, reference_zipball_path) + compress.compress(checkout_dir, reference_zipball_path) _apply_delta(reference_zipball_path, zipball_path, work_dir, delta_path) def _apply_delta( reference_zipball_path: str, zipball_path: str, work_dir: str, delta_path: str, ): proc = subprocess.run(["tar", "--extract", "-f", delta_path,], cwd=work_dir) proc.check_returncode() with open(os.path.join(work_dir, "type"), "rb") as fd: type_ = fd.read().decode().strip() assert type_ == "zip", ( f"Unknown zipball type {type}. Are you" f"extracting a delta from pristine-tar instead of pristine-zip?" ) xdelta3_path = os.path.join(work_dir, "delta") assert os.path.isfile(xdelta3_path), "Missing 'delta' file in delta archive." _apply_xdelta3(reference_zipball_path, zipball_path, xdelta3_path) def _apply_xdelta3(reference_zipball_path: str, zipball_path: str, xdelta3_path: str): proc = subprocess.run( ["xdelta3", "-d", "-s", reference_zipball_path, xdelta3_path, zipball_path] ) proc.check_returncode() diff --git a/pristine_zip/main.py b/pristine_zip/main.py index bf23e8d..a80e9cc 100644 --- a/pristine_zip/main.py +++ b/pristine_zip/main.py @@ -1,69 +1,67 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os.path import tempfile import click from . import compress from . import delta_to_zipball from . import zipball_to_delta @click.group() def cli(): pass @cli.command() @click.argument("zipball", type=click.Path(exists=True, readable=True)) @click.argument("delta", type=click.Path(exists=False, writable=True)) def gendelta(zipball: str, delta: str): """Takes an upstream zipball and generates a small binary delta that can be used to re-generate the zipball.""" zipball_to_delta.gendelta(os.path.abspath(zipball), os.path.abspath(delta)) @cli.command() @click.argument("delta", type=click.Path(exists=True, readable=True)) @click.argument("zipball", type=click.Path(exists=False, writable=True)) def genzip(delta: str, zipball: str): """Takes a delta generated by 'pristine-zip gendelta' and reads files from the CWD to generate the exact same zipball as was given to 'gendelta'.""" checkout_dir = os.getcwd() delta_to_zipball.genzip( checkout_dir, os.path.abspath(delta), os.path.abspath(zipball) ) @cli.command() @click.argument("source_zipball", type=click.Path(exists=True, readable=True)) @click.argument("reference_zipball", type=click.Path(exists=False, writable=True)) def regenzip(source_zipball: str, reference_zipball: str): """Takes a zipball and generates a reference zipball from it.""" with tempfile.TemporaryDirectory() as work_dir: zipball_to_delta.generate_reference_zipball_from_zipball( os.path.abspath(source_zipball), os.path.abspath(reference_zipball), work_dir, ) @cli.command() @click.argument("reference_zipball", type=click.Path(exists=False, writable=True)) def cwdgenzip(reference_zipball: str): """Reads the CWD and generates a reference zipball from it.""" checkout_dir = os.getcwd() compress.compress( - checkout_dir, - delta_to_zipball.get_checkout_root(checkout_dir), - reference_zipball, + checkout_dir, reference_zipball, ) def main(): return cli() diff --git a/pristine_zip/zipball_to_delta.py b/pristine_zip/zipball_to_delta.py index 10a4885..3bdfd07 100644 --- a/pristine_zip/zipball_to_delta.py +++ b/pristine_zip/zipball_to_delta.py @@ -1,102 +1,90 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import subprocess import tempfile from . import compress -from . import common def gendelta(zipball_path: str, delta_path: str): with tempfile.TemporaryDirectory(prefix="pristine-zip-gendelta") as work_dir: reference_zipball_path = os.path.join(work_dir, "reference.zip") generate_reference_zipball_from_zipball( zipball_path, reference_zipball_path, work_dir ) _generate_delta(zipball_path, reference_zipball_path, work_dir, delta_path) def _generate_delta( upstream_zipball_path: str, reference_zipball_path: str, work_dir: str, delta_path: str, ): _generate_xdelta3( upstream_zipball_path, reference_zipball_path, os.path.join(work_dir, "delta") ) with open(os.path.join(work_dir, "type"), "wb") as fd: fd.write(b"zip\n") files = ["delta", "type"] # Make the timestamps in the delta tarball deterministic for file in files: os.utime(os.path.join(work_dir, file), times=(0, 0)) proc = subprocess.run( [ "tar", # make entries in the delta tarball deterministic: "--owner", "0", "--group", "0", "--numeric-owner", "--mode", "644", # generic options: "--create", "--compress", "-f", delta_path, *files, ], cwd=work_dir, ) proc.check_returncode() def _generate_xdelta3( upstream_zipball_path: str, reference_zipball_path: str, delta_path: str ): """Generates the xdelta3 difference between a reference zipball and the original one.""" proc = subprocess.run( [ "xdelta3", "-e", "-s", reference_zipball_path, upstream_zipball_path, delta_path, ] ) proc.check_returncode() def generate_reference_zipball_from_zipball( upstream_zipball_path: str, reference_zipball_path: str, work_dir: str ): """Unzips an upstream zipball and rezips it in a reference zipball.""" checkout_dir = os.path.join(work_dir, "checkout") os.mkdir(checkout_dir) proc = subprocess.run(["unzip", upstream_zipball_path], cwd=checkout_dir) proc.check_returncode() - extracted_dir_names = list(os.listdir(checkout_dir)) - - try: - (dir_name,) = extracted_dir_names - except ValueError: - raise common.MultipleRootDirectoriesError( - f"Extracted {len(extracted_dir_names)} root files/dirs; expected 1." - ) from None - - dir_name = extracted_dir_names[0] - - compress.compress(checkout_dir, dir_name, reference_zipball_path) + compress.compress(checkout_dir, reference_zipball_path)