Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7124105
D4616.id.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
9 KB
Subscribers
None
D4616.id.diff
View Options
diff --git a/swh/web/common/archive.py b/swh/web/common/archive.py
--- a/swh/web/common/archive.py
+++ b/swh/web/common/archive.py
@@ -1023,7 +1023,9 @@
return converters.from_origin_visit({**visit_status.to_dict(), "type": visit.type})
-def lookup_snapshot_sizes(snapshot_id: str) -> Dict[str, int]:
+def lookup_snapshot_sizes(
+ snapshot_id: str, branch_name_exclude_prefix: Optional[str] = None
+) -> Dict[str, int]:
"""Count the number of branches in the snapshot with the given id
Args:
@@ -1035,7 +1037,10 @@
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
snapshot_sizes = dict.fromkeys(("alias", "release", "revision"), 0)
- branch_counts = storage.snapshot_count_branches(snapshot_id_bin)
+ branch_counts = storage.snapshot_count_branches(
+ snapshot_id_bin,
+ branch_name_exclude_prefix.encode() if branch_name_exclude_prefix else None,
+ )
# remove possible None key returned by snapshot_count_branches
# when null branches are present in the snapshot
branch_counts.pop(None, None)
@@ -1044,31 +1049,49 @@
def lookup_snapshot(
- snapshot_id, branches_from="", branches_count=1000, target_types=None
-):
+ snapshot_id: str,
+ branches_from: str = "",
+ branches_count: int = 1000,
+ target_types: Optional[List[str]] = None,
+ branch_name_include_substring: Optional[str] = None,
+ branch_name_exclude_prefix: Optional[str] = None,
+) -> Dict[str, Any]:
"""Return information about a snapshot, aka the list of named
branches found during a specific visit of an origin.
Args:
- snapshot_id (str): sha1 identifier of the snapshot
- branches_from (str): optional parameter used to skip branches
+ snapshot_id: sha1 identifier of the snapshot
+ branches_from: optional parameter used to skip branches
whose name is lesser than it before returning them
- branches_count (int): optional parameter used to restrain
+ branches_count: optional parameter used to restrain
the amount of returned branches
- target_types (list): optional parameter used to filter the
+ target_types: optional parameter used to filter the
target types of branch to return (possible values that can be
contained in that list are `'content', 'directory',
'revision', 'release', 'snapshot', 'alias'`)
+ branch_name_include_substring: if provided, only return branches whose name
+ contains the given substring
+ branch_name_exclude_prefix: if provided, do not return branches whose name
+ starts with the given prefix
+
Returns:
A dict filled with the snapshot content.
"""
snapshot_id_bin = _to_sha1_bin(snapshot_id)
partial_branches = storage.snapshot_get_branches(
- snapshot_id_bin, branches_from.encode(), branches_count, target_types
+ snapshot_id_bin,
+ branches_from.encode(),
+ branches_count,
+ target_types,
+ branch_name_include_substring.encode()
+ if branch_name_include_substring
+ else None,
+ branch_name_exclude_prefix.encode() if branch_name_exclude_prefix else None,
)
if not partial_branches:
raise NotFoundExc(f"Snapshot with id {snapshot_id} not found!")
+
return converters.from_partial_branches(partial_branches)
diff --git a/swh/web/tests/common/test_archive.py b/swh/web/tests/common/test_archive.py
--- a/swh/web/tests/common/test_archive.py
+++ b/swh/web/tests/common/test_archive.py
@@ -14,7 +14,16 @@
from swh.model.from_disk import DentryPerms
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
-from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
+from swh.model.model import (
+ Directory,
+ DirectoryEntry,
+ Origin,
+ OriginVisit,
+ Revision,
+ Snapshot,
+ SnapshotBranch,
+ TargetType,
+)
from swh.web.common import archive
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.common.typing import OriginInfo
@@ -1042,9 +1051,151 @@
assert archive.lookup_snapshot_sizes(snapshot) == expected_sizes
+@given(revision())
+def test_lookup_snapshot_sizes_with_filtering(archive_data, revision):
+ rev_id = hash_to_bytes(revision)
+ snapshot = Snapshot(
+ branches={
+ b"refs/heads/master": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/heads/incoming": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/1": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/2": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ },
+ )
+ archive_data.snapshot_add([snapshot])
+
+ expected_sizes = {"alias": 0, "release": 0, "revision": 2}
+
+ assert (
+ archive.lookup_snapshot_sizes(
+ snapshot.id.hex(), branch_name_exclude_prefix="refs/pull/"
+ )
+ == expected_sizes
+ )
+
+
@given(snapshot())
def test_lookup_snapshot_alias(snapshot):
resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD")
assert resolved_alias is not None
assert resolved_alias["target_type"] == "revision"
assert resolved_alias["target"] is not None
+
+
+@given(revision())
+def test_lookup_snapshot_branch_names_filtering(archive_data, revision):
+ rev_id = hash_to_bytes(revision)
+ snapshot = Snapshot(
+ branches={
+ b"refs/heads/master": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/heads/incoming": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/1": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ b"refs/pull/2": SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ "non_ascii_name_é".encode(): SnapshotBranch(
+ target=rev_id, target_type=TargetType.REVISION,
+ ),
+ },
+ )
+ archive_data.snapshot_add([snapshot])
+
+ for include_pattern, exclude_prefix, nb_results in (
+ ("pull", None, 2),
+ ("incoming", None, 1),
+ ("é", None, 1),
+ (None, "refs/heads/", 3),
+ ("refs", "refs/heads/master", 3),
+ ):
+
+ branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ branch_name_include_substring=include_pattern,
+ branch_name_exclude_prefix=exclude_prefix,
+ )["branches"]
+ assert len(branches) == nb_results
+ for branch_name in branches:
+ if include_pattern:
+ assert include_pattern in branch_name
+ if exclude_prefix:
+ assert not branch_name.startswith(exclude_prefix)
+
+
+@given(directory(), revision())
+def test_lookup_snapshot_branch_names_filtering_paginated(
+ archive_data, directory, revision
+):
+ pattern = "foo"
+ nb_branches_by_target_type = 10
+ branches = {}
+ for i in range(nb_branches_by_target_type):
+ branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+ )
+ branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+ )
+ branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+ )
+ branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch(
+ target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+ )
+
+ snapshot = Snapshot(branches=branches)
+ archive_data.snapshot_add([snapshot])
+
+ branches_count = nb_branches_by_target_type // 2
+
+ for target_type in (
+ DIRECTORY,
+ REVISION,
+ ):
+ partial_branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ target_types=[target_type],
+ branches_count=branches_count,
+ branch_name_include_substring=pattern,
+ )
+ branches = partial_branches["branches"]
+
+ assert len(branches) == branches_count
+ for branch_name, branch_data in branches.items():
+ assert pattern in branch_name
+ assert branch_data["target_type"] == target_type
+ for i in range(branches_count):
+ assert f"branch/{target_type}/{pattern}{i}" in branches
+ assert (
+ partial_branches["next_branch"]
+ == f"branch/{target_type}/{pattern}{branches_count}"
+ )
+
+ partial_branches = archive.lookup_snapshot(
+ hash_to_hex(snapshot.id),
+ target_types=[target_type],
+ branches_from=partial_branches["next_branch"],
+ branch_name_include_substring=pattern,
+ )
+ branches = partial_branches["branches"]
+
+ assert len(branches) == branches_count
+ for branch_name, branch_data in branches.items():
+ assert pattern in branch_name
+ assert branch_data["target_type"] == target_type
+ for i in range(branches_count, 2 * branches_count):
+ assert f"branch/{target_type}/{pattern}{i}" in branches
+ assert partial_branches["next_branch"] is None
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Dec 20 2024, 3:35 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3224026
Attached To
D4616: common/archive: Add branch names filtering support in lookup_snapshot
Event Timeline
Log In to Comment