Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/replayer/cli.py
Show All 30 Lines | @click.option( | ||||
type=click.File("rb"), | type=click.File("rb"), | ||||
help="File containing a sorted array of hashes to be excluded.", | help="File containing a sorted array of hashes to be excluded.", | ||||
) | ) | ||||
@click.option( | @click.option( | ||||
"--check-dst/--no-check-dst", | "--check-dst/--no-check-dst", | ||||
default=True, | default=True, | ||||
help="Check whether the destination contains the object before copying.", | help="Check whether the destination contains the object before copying.", | ||||
) | ) | ||||
@click.option( | |||||
"--concurrency", | |||||
default=4, | |||||
help=( | |||||
"Number of concurrent threads doing the actual copy of blobs between " | |||||
"the source and destination objstorages." | |||||
), | |||||
) | |||||
@click.pass_context | @click.pass_context | ||||
def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst): | def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency): | ||||
"""Fill a destination Object Storage using a journal stream. | """Fill a destination Object Storage using a journal stream. | ||||
This is typically used for a mirror configuration, by reading a Journal | This is typically used for a mirror configuration, by reading a Journal | ||||
and retrieving objects from an existing source ObjStorage. | and retrieving objects from an existing source ObjStorage. | ||||
There can be several 'replayers' filling a given ObjStorage as long as they | There can be several 'replayers' filling a given ObjStorage as long as they | ||||
use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID`` | use the same ``group-id``. You can use the ``KAFKA_GROUP_INSTANCE_ID`` | ||||
environment variable to use KIP-345 static group membership. | environment variable to use KIP-345 static group membership. | ||||
This service retrieves object ids to copy from the 'content' topic. It will | This service retrieves object ids to copy from the 'content' topic. It will | ||||
only copy object's content if the object's description in the kafka | only copy object's content if the object's description in the kafka | ||||
message has the status:visible set. | message has the status:visible set. | ||||
``--exclude-sha1-file`` may be used to exclude some hashes to speed up the | ``--exclude-sha1-file`` may be used to exclude some hashes to speed up the | ||||
replay in case many of the contents are already in the destination | replay in case many of the contents are already in the destination | ||||
objstorage. It must contain a concatenation of all (sha1) hashes, | objstorage. It must contain a concatenation of all (sha1) hashes, | ||||
and it must be sorted. | and it must be sorted. | ||||
This file will not be fully loaded into memory at any given time, | This file will not be fully loaded into memory at any given time, | ||||
so it can be arbitrarily large. | so it can be arbitrarily large. | ||||
``--check-dst`` sets whether the replayer should check in the destination | ``--check-dst`` sets whether the replayer should check in the destination | ||||
ObjStorage before copying an object. You can turn that off if you know | ObjStorage before copying an object. You can turn that off if you know | ||||
you're copying to an empty ObjStorage. | you're copying to an empty ObjStorage. | ||||
``--concurrency N`` sets the number of threads in charge of copying blob objects | |||||
from the source objstorage to the destination one. Using a large concurrency | |||||
value makes sense if both the source and destination objstorages support highly | |||||
parallel workloads. Make sure not to set the ``batch_size`` configuration option too | |||||
low for the concurrency to be actually useful (each batch of kafka messages is | |||||
dispatched among the threads). | |||||
The expected configuration file should have 3 sections: | The expected configuration file should have 3 sections: | ||||
- objstorage: the source object storage from which to retrieve objects to | - objstorage: the source object storage from which to retrieve objects to | ||||
copy; this objstorage can (and should) be a read-only objstorage, | copy; this objstorage can (and should) be a read-only objstorage, | ||||
https://docs.softwareheritage.org/devel/apidoc/swh.objstorage.html | https://docs.softwareheritage.org/devel/apidoc/swh.objstorage.html | ||||
- objstorage_dst: the destination objstorage in which objects will be | - objstorage_dst: the destination objstorage in which objects will be | ||||
▲ Show 20 Lines • Show All 74 Lines • ▼ Show 20 Lines | client = get_journal_client( | ||||
**journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",), | **journal_cfg, stop_after_objects=stop_after_objects, object_types=("content",), | ||||
) | ) | ||||
worker_fn = functools.partial( | worker_fn = functools.partial( | ||||
process_replay_objects_content, | process_replay_objects_content, | ||||
src=objstorage_src, | src=objstorage_src, | ||||
dst=objstorage_dst, | dst=objstorage_dst, | ||||
exclude_fn=exclude_fn, | exclude_fn=exclude_fn, | ||||
check_dst=check_dst, | check_dst=check_dst, | ||||
concurrency=concurrency, | |||||
) | ) | ||||
if notify: | if notify: | ||||
notify("READY=1") | notify("READY=1") | ||||
try: | try: | ||||
client.process(worker_fn) | client.process(worker_fn) | ||||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||||
Show All 16 Lines |