diff --git a/requirements-swh.txt b/requirements-swh.txt index 1b2e179..b2ea0a2 100644 --- a/requirements-swh.txt +++ b/requirements-swh.txt @@ -1,4 +1,4 @@ # Add here internal Software Heritage dependencies, one per line. -swh.core[http] -swh.objstorage >= 0.0.43 -swh.journal >= 0.0.31 +swh.core[http] >= 0.3 +swh.objstorage >= 0.2.1 +swh.journal >= 0.4.2 diff --git a/setup.py b/setup.py index c7d1b04..73405d8 100755 --- a/setup.py +++ b/setup.py @@ -1,73 +1,73 @@ #!/usr/bin/env python3 # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from io import open from os import path from setuptools import find_packages, setup here = path.abspath(path.dirname(__file__)) # Get the long description from the README file with open(path.join(here, "README.md"), encoding="utf-8") as f: long_description = f.read() def parse_requirements(name=None): if name: reqf = "requirements-%s.txt" % name else: reqf = "requirements.txt" requirements = [] if not path.exists(reqf): return requirements with open(reqf) as f: for line in f.readlines(): line = line.strip() if not line or line.startswith("#"): continue requirements.append(line) return requirements # Edit this part to match your module. # Full sample: # https://forge.softwareheritage.org/diffusion/DCORE/browse/master/setup.py setup( name="swh.objstorage.replayer", # example: swh.loader.pypi description="Software Heritage content replayer", long_description=long_description, long_description_content_type="text/markdown", python_requires=">=3.7", author="Software Heritage developers", author_email="swh-devel@inria.fr", url="https://forge.softwareheritage.org/diffusion/swh-objstorage-replayer", packages=find_packages(), # packages's modules install_requires=parse_requirements() + parse_requirements("swh"), tests_require=parse_requirements("test"), setup_requires=["setuptools_scm"], extras_require={"testing": parse_requirements("test")}, use_scm_version=True, include_package_data=True, entry_points=""" [swh.cli.subcommands] - content-replayer=swh.objstorage.replayer.cli:objstorage_cli_group + content-replayer=swh.objstorage.replayer.cli """, classifiers=[ "Programming Language :: Python :: 3", "Intended Audience :: Developers", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "Operating System :: OS Independent", "Development Status :: 3 - Alpha", ], project_urls={ "Bug Reports": "https://forge.softwareheritage.org/maniphest", "Funding": "https://www.softwareheritage.org/donate", "Source": "https://forge.softwareheritage.org/source/swh-objstorage-replayer", }, ) diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py index 23d641f..e281b0b 100644 --- a/swh/objstorage/replayer/cli.py +++ b/swh/objstorage/replayer/cli.py @@ -1,138 +1,138 @@ # Copyright (C) 2016-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # WARNING: do not import unnecessary things here to keep cli startup time under # control import logging import click try: from systemd.daemon import notify except ImportError: notify = None -from swh.objstorage.cli import cli as objstorage_cli_group +from swh.objstorage.cli import objstorage_cli_group @objstorage_cli_group.command("replay") @click.option( "--stop-after-objects", "-n", default=None, type=int, help="Stop after processing this many objects. Default is to run forever.", ) @click.option( "--exclude-sha1-file", default=None, type=click.File("rb"), help="File containing a sorted array of hashes to be excluded.", ) @click.option( "--check-dst/--no-check-dst", default=True, help="Check whether the destination contains the object before copying.", ) @click.pass_context def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): """Fill a destination Object Storage using a journal stream. This is typically used for a mirror configuration, by reading a Journal and retrieving objects from an existing source ObjStorage. There can be several 'replayers' filling a given ObjStorage as long as they use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID`` environment variable to use KIP-345 static group membership. This service retrieves object ids to copy from the 'content' topic. It will only copy object's content if the object's description in the kafka nmessage has the status:visible set. ``--exclude-sha1-file`` may be used to exclude some hashes to speed-up the replay in case many of the contents are already in the destination objstorage. It must contain a concatenation of all (sha1) hashes, and it must be sorted. This file will not be fully loaded into memory at any given time, so it can be arbitrarily large. ``--check-dst`` sets whether the replayer should check in the destination ObjStorage before copying an object. You can turn that off if you know you're copying to an empty ObjStorage. """ import functools import mmap from swh.journal.client import get_journal_client from swh.model.model import SHA1_SIZE from swh.objstorage.factory import get_objstorage from swh.objstorage.replayer.replay import ( is_hash_in_bytearray, process_replay_objects_content, ) conf = ctx.obj["config"] try: objstorage_src = get_objstorage(**conf.pop("objstorage")) except KeyError: ctx.fail("You must have a source objstorage configured in " "your config file.") try: objstorage_dst = get_objstorage(**conf.pop("objstorage_dst")) except KeyError: ctx.fail( "You must have a destination objstorage configured " "in your config file." ) if exclude_sha1_file: map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ) if map_.size() % SHA1_SIZE != 0: ctx.fail( "--exclude-sha1 must link to a file whose size is an " "exact multiple of %d bytes." % SHA1_SIZE ) nb_excluded_hashes = int(map_.size() / SHA1_SIZE) def exclude_fn(obj): return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes) else: exclude_fn = None client = get_journal_client( "kafka", **conf["journal_client"], stop_after_objects=stop_after_objects, object_types=("content",), ) worker_fn = functools.partial( process_replay_objects_content, src=objstorage_src, dst=objstorage_dst, exclude_fn=exclude_fn, check_dst=check_dst, ) if notify: notify("READY=1") try: client.process(worker_fn) except KeyboardInterrupt: ctx.exit(0) else: print("Done.") finally: if notify: notify("STOPPING=1") client.close() def main(): logging.basicConfig() return objstorage_cli_group(auto_envvar_prefix="SWH_OBJSTORAGE") if __name__ == "__main__": main()