Page MenuHomeSoftware Heritage

D4616.id.diff
No OneTemporary

D4616.id.diff

diff --git a/swh/web/common/archive.py b/swh/web/common/archive.py
--- a/swh/web/common/archive.py
+++ b/swh/web/common/archive.py
@@ -1023,7 +1023,9 @@
return converters.from_origin_visit({**visit_status.to_dict(), "type": visit.type})
-def lookup_snapshot_sizes(snapshot_id: str) -> Dict[str, int]:
+def lookup_snapshot_sizes(
+ snapshot_id: str, branch_name_exclude_prefix: Optional[str] = None
+) -> Dict[str, int]:
"""Count the number of branches in the snapshot with the given id
Args:
@@ -1035,7 +1037,10 @@
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot_sizes = dict.fromkeys(("alias", "release", "revision"), 0)
- branch_counts = storage.snapshot_count_branches(snapshot_id_bin)
+ branch_counts = storage.snapshot_count_branches(
+ snapshot_id_bin,
+ branch_name_exclude_prefix.encode() if branch_name_exclude_prefix else None,
+ )
# remove possible None key returned by snapshot_count_branches
# when null branches are present in the snapshot
branch_counts.pop(None, None)
@@ -1044,31 +1049,49 @@
def lookup_snapshot(
- snapshot_id, branches_from="", branches_count=1000, target_types=None
-):
+ snapshot_id: str,
+ branches_from: str = "",
+ branches_count: int = 1000,
+ target_types: Optional[List[str]] = None,
+ branch_name_include_substring: Optional[str] = None,
+ branch_name_exclude_prefix: Optional[str] = None,
+) -> Dict[str, Any]:
"""Return information about a snapshot, aka the list of named
branches found during a specific visit of an origin.
Args:
- snapshot_id (str): sha1 identifier of the snapshot
- branches_from (str): optional parameter used to skip branches
+ snapshot_id: sha1 identifier of the snapshot
+ branches_from: optional parameter used to skip branches
whose name is lesser than it before returning them
- branches_count (int): optional parameter used to restrain
+ branches_count: optional parameter used to restrain
the amount of returned branches
- target_types (list): optional parameter used to filter the
+ target_types: optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
+ branch_name_include_substring: if provided, only return branches whose name
+ contains the given substring
+ branch_name_exclude_prefix: if provided, do not return branches whose name
+ starts with the given prefix
+
Returns:
A dict filled with the snapshot content.
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
partial_branches = storage.snapshot_get_branches(
- snapshot_id_bin, branches_from.encode(), branches_count, target_types
+ snapshot_id_bin,
+ branches_from.encode(),
+ branches_count,
+ target_types,
+ branch_name_include_substring.encode()
+ if branch_name_include_substring
+ else None,
+ branch_name_exclude_prefix.encode() if branch_name_exclude_prefix else None,
)
if not partial_branches:
raise NotFoundExc(f"Snapshot with id {snapshot_id} not found!")
+
return converters.from_partial_branches(partial_branches)
diff --git a/swh/web/tests/common/test_archive.py b/swh/web/tests/common/test_archive.py
--- a/swh/web/tests/common/test_archive.py
+++ b/swh/web/tests/common/test_archive.py
@@ -14,7 +14,16 @@
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
-from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
+from swh.model.model import (
+ Directory,
+ DirectoryEntry,
+ Origin,
+ OriginVisit,
+ Revision,
+ Snapshot,
+ SnapshotBranch,
+ TargetType,
+)
from swh.web.common import archive
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.common.typing import OriginInfo
@@ -1042,9 +1051,151 @@
assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes
+@given(revision())
+def test_lookup_snapshot_sizes_with_filtering(archive_data, revision):
+ rev_id = hash_to_bytes(revision)
+ snapshot = Snapshot(
+ branches={
+ b"refs/heads/master": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/heads/incoming": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/1": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/2": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ },
+ )
+ archive_data.snapshot_add([snapshot])
+
+ expected_sizes = {"alias": 0, "release": 0, "revision": 2}
+
+ assert (
+ archive.lookup_snapshot_sizes(
+ snapshot.id.hex(), branch_name_exclude_prefix="refs/pull/"
+ )
+ == expected_sizes
+ )
+
+
@given(snapshot())
def test_lookup_snapshot_alias(snapshot):
resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD")
assert resolved_alias is not None
assert resolved_alias["target_type"] == "revision"
assert resolved_alias["target"] is not None
+
+
+@given(revision())
+def test_lookup_snapshot_branch_names_filtering(archive_data, revision):
+ rev_id = hash_to_bytes(revision)
+ snapshot = Snapshot(
+ branches={
+ b"refs/heads/master": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/heads/incoming": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/1": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/2": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ "non_ascii_name_é".encode(): SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ },
+ )
+ archive_data.snapshot_add([snapshot])
+
+ for include_pattern, exclude_prefix, nb_results in (
+ ("pull", None, 2),
+ ("incoming", None, 1),
+ ("é", None, 1),
+ (None, "refs/heads/", 3),
+ ("refs", "refs/heads/master", 3),
+ ):
+
+ branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ branch_name_include_substring=include_pattern,
+ branch_name_exclude_prefix=exclude_prefix,
+ )["branches"]
+ assert len(branches) == nb_results
+ for branch_name in branches:
+ if include_pattern:
+ assert include_pattern in branch_name
+ if exclude_prefix:
+ assert not branch_name.startswith(exclude_prefix)
+
+
+@given(directory(), revision())
+def test_lookup_snapshot_branch_names_filtering_paginated(
+ archive_data, directory, revision
+):
+ pattern = "foo"
+ nb_branches_by_target_type = 10
+ branches = {}
+ for i in range(nb_branches_by_target_type):
+ branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+ )
+ branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+ )
+ branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+ )
+ branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+ )
+
+ snapshot = Snapshot(branches=branches)
+ archive_data.snapshot_add([snapshot])
+
+ branches_count = nb_branches_by_target_type // 2
+
+ for target_type in (
+ DIRECTORY,
+ REVISION,
+ ):
+ partial_branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ target_types=[target_type],
+ branches_count=branches_count,
+ branch_name_include_substring=pattern,
+ )
+ branches = partial_branches["branches"]
+
+ assert len(branches) == branches_count
+ for branch_name, branch_data in branches.items():
+ assert pattern in branch_name
+ assert branch_data["target_type"] == target_type
+ for i in range(branches_count):
+ assert f"branch/{target_type}/{pattern}{i}" in branches
+ assert (
+ partial_branches["next_branch"]
+ == f"branch/{target_type}/{pattern}{branches_count}"
+ )
+
+ partial_branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ target_types=[target_type],
+ branches_from=partial_branches["next_branch"],
+ branch_name_include_substring=pattern,
+ )
+ branches = partial_branches["branches"]
+
+ assert len(branches) == branches_count
+ for branch_name, branch_data in branches.items():
+ assert pattern in branch_name
+ assert branch_data["target_type"] == target_type
+ for i in range(branches_count, 2 * branches_count):
+ assert f"branch/{target_type}/{pattern}{i}" in branches
+ assert partial_branches["next_branch"] is None

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 3:35 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224026

Event Timeline