diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -31,6 +31,12 @@
     type=click.File("rb"),
     help="File containing a sorted array of hashes to be excluded.",
 )
+@click.option(
+    "--size-limit",
+    default=0,
+    type=int,
+    help="Exclude files whose size is over this limit. 0 (default) means no size limit.",
+)
 @click.option(
     "--check-dst/--no-check-dst",
     default=True,
@@ -45,7 +51,9 @@
     ),
 )
 @click.pass_context
-def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency):
+def content_replay(
+    ctx, stop_after_objects, exclude_sha1_file, size_limit, check_dst, concurrency
+):
     """Fill a destination Object Storage using a journal stream.

     This is typically used for a mirror configuration, by reading a Journal
@@ -66,6 +74,9 @@
     This file will not be fully loaded into memory at any given time,
     so it can be arbitrarily large.

+    ``--size-limit`` excludes contents whose size is (strictly) above
+    the given limit. If 0, there is no size limit.
+
     ``--check-dst`` sets whether the replayer should check in the destination
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.
@@ -134,6 +145,7 @@
             "You must have a destination objstorage configured "
             "in your config file."
         )
+    exclude_fns = []
     if exclude_sha1_file:
         map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
         if map_.size() % SHA1_SIZE != 0:
@@ -143,9 +155,26 @@
             )
         nb_excluded_hashes = int(map_.size() / SHA1_SIZE)

-        def exclude_fn(obj):
+        def exclude_by_hash(obj):
             return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)

+        exclude_fns.append(exclude_by_hash)
+
+    if size_limit:
+
+        def exclude_by_size(obj):
+            return obj["length"] > size_limit
+
+        exclude_fns.append(exclude_by_size)
+
+    if exclude_fns:
+
+        def exclude_fn(obj):
+            for fn in exclude_fns:
+                if fn(obj):
+                    return True
+            return False
+
     else:
         exclude_fn = None

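(Aside, not part of the patch: a minimal standalone sketch of the predicate composition above, using illustrative stand-ins for the mmap'd exclusion file and the journal messages. The loop in the patch is equivalent to the `any()` form shown here.)

```python
from typing import Callable, Dict, List

size_limit = 5
excluded = {b"\x00" * 20}  # hypothetical stand-in for the sorted sha1 file

def exclude_by_hash(obj: Dict) -> bool:
    return obj["sha1"] in excluded

def exclude_by_size(obj: Dict) -> bool:
    return obj["length"] > size_limit

exclude_fns: List[Callable[[Dict], bool]] = [exclude_by_hash, exclude_by_size]

def exclude_fn(obj: Dict) -> bool:
    # An object is skipped as soon as any registered predicate matches.
    return any(fn(obj) for fn in exclude_fns)

assert exclude_fn({"sha1": b"\x00" * 20, "length": 1})      # hash match
assert exclude_fn({"sha1": b"\x01" * 20, "length": 9})      # over the limit
assert not exclude_fn({"sha1": b"\x01" * 20, "length": 3})  # replayed
```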
"cls": "kafka", + "brokers": kafka_server, + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, + }, + ) + expected = r"Done.\n" + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + assert [c for c in contents.values() if len(c) > 5] + assert [c for c in contents.values() if len(c) <= 5] + for (sha1, content) in contents.items(): + if len(content) > 5: + assert sha1 not in objstorages["dst"], sha1 + else: + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content + + +@_patch_objstorages(["src", "dst"]) +def test_replay_content_exclude_by_both( + objstorages, + kafka_prefix: str, + kafka_consumer_group: str, + kafka_server: Tuple[Popen, int], +): + """Check the content replayer in normal conditions + + with both an exclusion file (--exclude-sha1-file) and a size limit (--size-limit) + """ + + contents = _fill_objstorage_and_kafka( + kafka_server, kafka_prefix, objstorages["src"] + ) + excluded_contents = list(contents)[0::2] # picking half of them + with tempfile.NamedTemporaryFile(mode="w+b") as fd: + fd.write(b"".join(sorted(excluded_contents))) + + fd.seek(0) + + result = invoke( + "replay", + "--stop-after-objects", + str(NUM_CONTENTS), + "--size-limit", + 5, + "--exclude-sha1-file", + fd.name, + journal_client={ + "cls": "kafka", + "brokers": kafka_server, + "group_id": kafka_consumer_group, + "prefix": kafka_prefix, + }, + ) + expected = r"Done.\n" + assert result.exit_code == 0, result.output + assert re.fullmatch(expected, result.output, re.MULTILINE), result.output + + for (sha1, content) in contents.items(): + if len(content) > 5: + assert sha1 not in objstorages["dst"], sha1 + elif sha1 in excluded_contents: + assert sha1 not in objstorages["dst"], sha1 + else: + assert sha1 in objstorages["dst"], sha1 + assert objstorages["dst"].get(sha1) == content + + NUM_CONTENTS_DST = 5