diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
 swh.core[db,http] >= 0.0.60
+swh-model >= 0.0.40
 swh.storage >= 0.0.147
diff --git a/swh/journal/cli.py b/swh/journal/cli.py
--- a/swh/journal/cli.py
+++ b/swh/journal/cli.py
@@ -3,17 +3,21 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import click
 import functools
 import logging
+import mmap
 import os
 
+import click
+
 from swh.core import config
 from swh.core.cli import CONTEXT_SETTINGS
+from swh.model.model import SHA1_SIZE
 from swh.storage import get_storage
 from swh.objstorage import get_objstorage
 
 from swh.journal.client import JournalClient
+from swh.journal.replay import is_hash_in_bytearray
 from swh.journal.replay import process_replay_objects
 from swh.journal.replay import process_replay_objects_content
 from swh.journal.backfill import JournalBackfiller
@@ -152,8 +156,11 @@
 @click.option('--group-id', type=str,
               help='Name of the group id for reading from Kafka.'
                    '(deprecated, use the config file instead)')
+@click.option('--exclude-sha1-file', default=None, type=click.File('rb'),
+              help='File containing a sorted array of hashes to be excluded.')
 @click.pass_context
-def content_replay(ctx, max_messages, concurrency, brokers, prefix, group_id):
+def content_replay(ctx, max_messages, concurrency,
+                   brokers, prefix, group_id, exclude_sha1_file):
     """Fill a destination Object Storage (typically a mirror) by reading a Journal
     and retrieving objects from an existing source ObjStorage.
 
@@ -163,6 +170,13 @@
     This service retrieves object ids to copy from the 'content' topic. It
     will only copy object's content if the object's description in the kafka
     nmessage has the status:visible set.
+
+    `--exclude-sha1-file` may be used to exclude some hashes to speed up the
+    replay in case many of the contents are already in the destination
+    objstorage. It must contain a concatenation of the (sha1) hashes to
+    exclude, and it must be sorted.
+    This file will not be fully loaded into memory at any given time,
+    so it can be arbitrarily large.
     """
     logger = logging.getLogger(__name__)
     conf = ctx.obj['config']
@@ -177,13 +191,26 @@
         ctx.fail('You must have a destination objstorage configured '
                  'in your config file.')
 
+    if exclude_sha1_file:
+        map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
+        if map_.size() % SHA1_SIZE != 0:
+            ctx.fail('--exclude-sha1-file must link to a file whose size is '
+                     'an exact multiple of %d bytes.' % SHA1_SIZE)
+        nb_excluded_hashes = int(map_.size()/SHA1_SIZE)
+
+        def exclude_fn(obj):
+            return is_hash_in_bytearray(obj['sha1'], map_, nb_excluded_hashes)
+    else:
+        exclude_fn = None
+
     client = get_journal_client(
         ctx, brokers=brokers, prefix=prefix, group_id=group_id,
         object_types=('content',))
     worker_fn = functools.partial(process_replay_objects_content,
                                   src=objstorage_src,
                                   dst=objstorage_dst,
-                                  concurrency=concurrency)
+                                  concurrency=concurrency,
+                                  exclude_fn=exclude_fn)
 
     try:
         nb_messages = 0
diff --git a/swh/journal/replay.py b/swh/journal/replay.py
--- a/swh/journal/replay.py
+++ b/swh/journal/replay.py
@@ -7,17 +7,15 @@
 import logging
 from concurrent.futures import ThreadPoolExecutor
 
-from swh.storage import HashCollision
+from swh.core.statsd import statsd
 from swh.model.hashutil import hash_to_hex
+from swh.model.model import SHA1_SIZE
 from swh.objstorage.objstorage import ID_HASH_ALGO
-from swh.core.statsd import statsd
+from swh.storage import HashCollision
 
 logger = logging.getLogger(__name__)
 
 
-SHA1_SIZE = 20
-
-
 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         _insert_objects(object_type, objects, storage)
@@ -123,8 +121,54 @@
         return len(obj)
 
 
-def process_replay_objects_content(all_objects, *, src, dst, concurrency=8):
+def process_replay_objects_content(all_objects, *, src, dst, concurrency=8,
+                                   exclude_fn=None):
+    """
+    Takes a list of records from Kafka (see
+    :py:func:`swh.journal.client.JournalClient.process`) and copies them
+    from the `src` objstorage to the `dst` objstorage, if:
+
+    * `obj['status']` is `'visible'`
+    * `exclude_fn(obj)` is `False` (if `exclude_fn` is provided)
+
+    Args:
+        all_objects Dict[str, List[dict]]: Objects passed by the Kafka client.
+            Most importantly, `all_objects['content'][*]['sha1']` is the
+            sha1 hash of each content
+        src: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+        dst: An object storage (see :py:func:`swh.objstorage.get_objstorage`)
+        exclude_fn Optional[Callable[dict, bool]]: Determines whether
+            an object should be copied.
+
+    Example:
+
+    >>> from swh.objstorage import get_objstorage
+    >>> src = get_objstorage('memory', {})
+    >>> dst = get_objstorage('memory', {})
+    >>> id1 = src.add(b'foo bar')
+    >>> id2 = src.add(b'baz qux')
+    >>> kafka_partitions = {
+    ...     'content': [
+    ...         {
+    ...             'sha1': id1,
+    ...             'status': 'visible',
+    ...         },
+    ...         {
+    ...             'sha1': id2,
+    ...             'status': 'visible',
+    ...         },
+    ...     ]
+    ... }
+    >>> process_replay_objects_content(
+    ...     kafka_partitions, src=src, dst=dst,
+    ...     exclude_fn=lambda obj: obj['sha1'] == id1)
+    >>> id1 in dst
+    False
+    >>> id2 in dst
+    True
+    """
     vol = []
+    nb_skipped = 0
     t0 = time()
     with ThreadPoolExecutor(max_workers=concurrency) as executor:
         for (object_type, objects) in all_objects.items():
@@ -135,16 +179,23 @@
                 continue
             for obj in objects:
                 obj_id = obj[ID_HASH_ALGO]
-                if obj['status'] == 'visible':
+                if obj['status'] != 'visible':
+                    nb_skipped += 1
+                    logger.debug('skipped %s (status=%s)',
+                                 hash_to_hex(obj_id), obj['status'])
+                elif exclude_fn and exclude_fn(obj):
+                    nb_skipped += 1
+                    logger.debug('skipped %s (manually excluded)',
+                                 hash_to_hex(obj_id))
+                else:
                     fut = executor.submit(copy_object, obj_id, src, dst)
                     fut.add_done_callback(lambda fn: vol.append(fn.result()))
-                else:
-                    logger.debug('skipped %s (%s)',
-                                 hash_to_hex(obj_id), obj['status'])
 
     dt = time() - t0
     logger.info(
         'processed %s content objects in %.1fsec '
-        '(%.1f obj/sec, %.1fMB/sec) - %s failures',
+        '(%.1f obj/sec, %.1fMB/sec) - %d failures - %d skipped',
         len(vol), dt, len(vol)/dt,
-        sum(vol)/1024/1024/dt, len([x for x in vol if not x]))
+        sum(vol)/1024/1024/dt,
+        len([x for x in vol if not x]),
+        nb_skipped)
diff --git a/swh/journal/tests/test_cli.py b/swh/journal/tests/test_cli.py
--- a/swh/journal/tests/test_cli.py
+++ b/swh/journal/tests/test_cli.py
@@ -165,3 +165,41 @@
     for (sha1, content) in contents.items():
         assert sha1 in objstorages['dst'], sha1
         assert objstorages['dst'].get(sha1) == content
+
+
+@_patch_objstorages(['src', 'dst'])
+def test_replay_content_exclude(
+        objstorages,
+        storage: Storage,
+        kafka_prefix: str,
+        kafka_server: Tuple[Popen, int]):
+    (_, kafka_port) = kafka_server
+    kafka_prefix += '.swh.journal.objects'
+
+    contents = _fill_objstorage_and_kafka(
+        kafka_port, kafka_prefix, objstorages)
+
+    excluded_contents = list(contents)[0::2]  # picking half of them
+    with tempfile.NamedTemporaryFile(mode='w+b') as fd:
+        fd.write(b''.join(sorted(excluded_contents)))
+
+        fd.seek(0)
+
+        result = invoke(False, [
+            'content-replay',
+            '--broker', 'localhost:%d' % kafka_port,
+            '--group-id', 'test-cli-consumer',
+            '--prefix', kafka_prefix,
+            '--max-messages', '10',
+            '--exclude-sha1-file', fd.name,
+        ])
+        expected = r'Done.\n'
+        assert result.exit_code == 0, result.output
+        assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+    for (sha1, content) in contents.items():
+        if sha1 in excluded_contents:
+            assert sha1 not in objstorages['dst'], sha1
+        else:
+            assert sha1 in objstorages['dst'], sha1
+            assert objstorages['dst'].get(sha1) == content
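
Note on the `--exclude-sha1-file` format: the option expects the raw binary sha1 digests of the contents to skip, concatenated back to back and sorted, which `is_hash_in_bytearray` then searches through the mmap. A minimal sketch of how such a file could be produced follows; the `write_exclude_file` helper, the input list of hex digests, and the output filename are illustrative and not part of this patch:

    import hashlib

    def write_exclude_file(hex_sha1s, path):
        # Sort the 20-byte binary digests and concatenate them, matching the
        # layout read by --exclude-sha1-file.
        digests = sorted(bytes.fromhex(h) for h in hex_sha1s)
        with open(path, 'wb') as f:
            f.write(b''.join(digests))

    # Example: exclude a single known content by its sha1.
    write_exclude_file(
        [hashlib.sha1(b'already mirrored content').hexdigest()],
        'exclude-sha1s.bin')

The resulting file can then be passed to content-replay via `--exclude-sha1-file exclude-sha1s.bin`.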