D8962.id32298.diff

diff --git a/swh/objstorage/replayer/cli.py b/swh/objstorage/replayer/cli.py
--- a/swh/objstorage/replayer/cli.py
+++ b/swh/objstorage/replayer/cli.py
@@ -31,6 +31,12 @@
     type=click.File("rb"),
     help="File containing a sorted array of hashes to be excluded.",
 )
+@click.option(
+    "--size-limit",
+    default=0,
+    type=int,
+    help="Exclude files whose size is over this limit. 0 (default) means no size limit.",
+)
 @click.option(
     "--check-dst/--no-check-dst",
     default=True,
@@ -45,7 +51,9 @@
     ),
 )
 @click.pass_context
-def content_replay(ctx, stop_after_objects, exclude_sha1_file, check_dst, concurrency):
+def content_replay(
+    ctx, stop_after_objects, exclude_sha1_file, size_limit, check_dst, concurrency
+):
     """Fill a destination Object Storage using a journal stream.

     This is typically used for a mirror configuration, by reading a Journal
@@ -66,6 +74,9 @@
     This file will not be fully loaded into memory at any given time,
     so it can be arbitrarily large.

+    ``--size-limit`` excludes file contents whose size is (strictly) above
+    the given size limit. If 0, there is no size limit.
+
     ``--check-dst`` sets whether the replayer should check in the destination
     ObjStorage before copying an object. You can turn that off if you know
     you're copying to an empty ObjStorage.
@@ -134,6 +145,7 @@
             "You must have a destination objstorage configured " "in your config file."
         )

+    exclude_fns = []
     if exclude_sha1_file:
         map_ = mmap.mmap(exclude_sha1_file.fileno(), 0, prot=mmap.PROT_READ)
         if map_.size() % SHA1_SIZE != 0:
@@ -143,9 +155,26 @@
             )
         nb_excluded_hashes = int(map_.size() / SHA1_SIZE)

-        def exclude_fn(obj):
+        def exclude_by_hash(obj):
             return is_hash_in_bytearray(obj["sha1"], map_, nb_excluded_hashes)

+        exclude_fns.append(exclude_by_hash)
+
+    if size_limit:
+
+        def exclude_by_size(obj):
+            return obj["length"] > size_limit
+
+        exclude_fns.append(exclude_by_size)
+
+    if exclude_fns:
+
+        def exclude_fn(obj):
+            for fn in exclude_fns:
+                if fn(obj):
+                    return True
+            return False
+
     else:
         exclude_fn = None
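
Note on the hunk above: the replayer accepts a single exclude_fn predicate, so the
patch collects the active predicates in exclude_fns and folds them into one function
that returns True as soon as any predicate matches. A minimal standalone sketch of
that composition (the size_limit value and the sample objects are illustrative, not
taken from the patch):

    # Sketch only, not part of the diff: compose exclusion predicates the way
    # the hunk above does, here with any() instead of an explicit loop.
    from typing import Callable, Dict, List

    Predicate = Callable[[Dict], bool]

    def combine_excludes(fns: List[Predicate]) -> Predicate:
        def exclude_fn(obj: Dict) -> bool:
            # Excluded as soon as any predicate matches.
            return any(fn(obj) for fn in fns)
        return exclude_fn

    size_limit = 5  # stand-in for the --size-limit option value
    exclude_fn = combine_excludes([lambda obj: obj["length"] > size_limit])
    assert exclude_fn({"sha1": b"\x00" * 20, "length": 10})     # excluded
    assert not exclude_fn({"sha1": b"\x00" * 20, "length": 3})  # replayed

An object is excluded as soon as any predicate matches, so the order of the
predicates only affects how early the loop returns, not the result.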
diff --git a/swh/objstorage/replayer/tests/test_cli.py b/swh/objstorage/replayer/tests/test_cli.py
--- a/swh/objstorage/replayer/tests/test_cli.py
+++ b/swh/objstorage/replayer/tests/test_cli.py
@@ -112,7 +112,7 @@

     contents = {}
     for i in range(NUM_CONTENTS):
-        content = b"\x00" * 19 + bytes([i])
+        content = b"\x00" * i + bytes([i])
         sha1 = MultiHash(["sha1"]).from_data(content).digest()["sha1"]
         objstorage.add(content=content, obj_id=sha1)
         contents[sha1] = content
@@ -122,6 +122,7 @@
             value=key_to_kafka(
                 {
                     "sha1": sha1,
+                    "length": len(content),
                     "status": "visible",
                 }
             ),
@@ -269,7 +270,7 @@


 @_patch_objstorages(["src", "dst"])
-def test_replay_content_exclude(
+def test_replay_content_exclude_by_hash(
     objstorages,
     kafka_prefix: str,
     kafka_consumer_group: str,
@@ -277,7 +278,7 @@
 ):
     """Check the content replayer in normal conditions

-    with a exclusion file (--exclude-sha1-file)
+    with an exclusion file (--exclude-sha1-file)
     """

     contents = _fill_objstorage_and_kafka(
@@ -315,6 +316,98 @@
assert objstorages["dst"].get(sha1) == content
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_exclude_by_size(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+ """Check the content replayer in normal conditions
+
+ with a size limit (--size-limit)
+ """
+
+ contents = _fill_objstorage_and_kafka(
+ kafka_server, kafka_prefix, objstorages["src"]
+ )
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--size-limit",
+ 5,
+ journal_client={
+ "cls": "kafka",
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+ assert [c for c in contents.values() if len(c) > 5]
+ assert [c for c in contents.values() if len(c) <= 5]
+ for (sha1, content) in contents.items():
+ if len(content) > 5:
+ assert sha1 not in objstorages["dst"], sha1
+ else:
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
+@_patch_objstorages(["src", "dst"])
+def test_replay_content_exclude_by_both(
+ objstorages,
+ kafka_prefix: str,
+ kafka_consumer_group: str,
+ kafka_server: Tuple[Popen, int],
+):
+ """Check the content replayer in normal conditions
+
+ with both an exclusion file (--exclude-sha1-file) and a size limit (--size-limit)
+ """
+
+ contents = _fill_objstorage_and_kafka(
+ kafka_server, kafka_prefix, objstorages["src"]
+ )
+ excluded_contents = list(contents)[0::2] # picking half of them
+ with tempfile.NamedTemporaryFile(mode="w+b") as fd:
+ fd.write(b"".join(sorted(excluded_contents)))
+
+ fd.seek(0)
+
+ result = invoke(
+ "replay",
+ "--stop-after-objects",
+ str(NUM_CONTENTS),
+ "--size-limit",
+ 5,
+ "--exclude-sha1-file",
+ fd.name,
+ journal_client={
+ "cls": "kafka",
+ "brokers": kafka_server,
+ "group_id": kafka_consumer_group,
+ "prefix": kafka_prefix,
+ },
+ )
+ expected = r"Done.\n"
+ assert result.exit_code == 0, result.output
+ assert re.fullmatch(expected, result.output, re.MULTILINE), result.output
+
+ for (sha1, content) in contents.items():
+ if len(content) > 5:
+ assert sha1 not in objstorages["dst"], sha1
+ elif sha1 in excluded_contents:
+ assert sha1 not in objstorages["dst"], sha1
+ else:
+ assert sha1 in objstorages["dst"], sha1
+ assert objstorages["dst"].get(sha1) == content
+
+
NUM_CONTENTS_DST = 5
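
Usage note: assuming the command is exposed as "swh objstorage replay" (as in the
swh-objstorage-replayer documentation) and a mirror configuration file is already
in place, the new option combines with the existing exclusion file like this (the
file name and the 100 MiB figure are illustrative, not from the patch):

    # Skip contents listed in excluded.sha1, plus anything over 100 MiB
    swh objstorage replay \
        --exclude-sha1-file excluded.sha1 \
        --size-limit 104857600

Per the help text added above, 0 (the default) disables the size check, and the
comparison is strict, so a content whose length equals the limit is still copied.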
