diff --git a/swh/web/common/archive.py b/swh/web/common/archive.py
--- a/swh/web/common/archive.py
+++ b/swh/web/common/archive.py
@@ -1014,31 +1014,63 @@
 
 
 def lookup_snapshot(
-    snapshot_id, branches_from="", branches_count=1000, target_types=None
-):
+    snapshot_id: str,
+    branches_from: str = "",
+    branches_count: int = 1000,
+    target_types: Optional[List[str]] = None,
+    branch_name_includes: Optional[List[str]] = None,
+    branch_name_excludes: Optional[List[str]] = None,
+) -> Dict[str, Any]:
     """Return information about a snapshot, aka the list of named
     branches found during a specific visit of an origin.
 
     Args:
-        snapshot_id (str): sha1 identifier of the snapshot
-        branches_from (str): optional parameter used to skip branches
+        snapshot_id: sha1 identifier of the snapshot
+        branches_from: optional parameter used to skip branches
             whose name is lesser than it before returning them
-        branches_count (int): optional parameter used to restrain
+        branches_count: optional parameter used to restrain
             the amount of returned branches
-        target_types (list): optional parameter used to filter the
+        target_types: optional parameter used to filter the
             target types of branch to return (possible values that can be
             contained in that list are `'content', 'directory',
             'revision', 'release', 'snapshot', 'alias'`)
+        branch_name_includes: optional parameter used to filter branches
+            according to their names, only those whose name contains one
+            of the substrings in the provided list will be returned
+        branch_name_excludes: optional parameter used to filter out branches
+            according to their names, those whose name contains one of
+            the substrings in the provided list will not be returned
+
     Returns:
         A dict filled with the snapshot content.
     """
     snapshot_id_bin = _to_sha1_bin(snapshot_id)
 
+    branches_name_pattern = None
+    if branch_name_includes and branch_name_excludes:
+        includes = "|".join([re.escape(include) for include in branch_name_includes])
+        excludes = "|".join([re.escape(exclude) for exclude in branch_name_excludes])
+        # Reject names containing an excluded substring anywhere (whole-string
+        # negative lookahead), then require one of the included substrings.
+        branches_name_pattern = f"^(?!.*(?:{excludes})).*(?:{includes}).*$"
+    elif branch_name_includes:
+        branches_name_pattern = "|".join(
+            [re.escape(include) for include in branch_name_includes]
+        )
+    elif branch_name_excludes:
+        excludes = "|".join([re.escape(exclude) for exclude in branch_name_excludes])
+        branches_name_pattern = f"^((?!{excludes}).)*$"
+
     partial_branches = storage.snapshot_get_branches(
-        snapshot_id_bin, branches_from.encode(), branches_count, target_types
+        snapshot_id_bin,
+        branches_from.encode(),
+        branches_count,
+        target_types,
+        branches_name_pattern,
     )
     if not partial_branches:
         raise NotFoundExc(f"Snapshot with id {snapshot_id} not found!")
+
     return converters.from_partial_branches(partial_branches)
 
 
diff --git a/swh/web/tests/common/test_archive.py b/swh/web/tests/common/test_archive.py
--- a/swh/web/tests/common/test_archive.py
+++ b/swh/web/tests/common/test_archive.py
@@ -14,7 +14,16 @@
 from swh.model.from_disk import DentryPerms
 from swh.model.hashutil import hash_to_bytes, hash_to_hex
 from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
-from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision
+from swh.model.model import (
+    Directory,
+    DirectoryEntry,
+    Origin,
+    OriginVisit,
+    Revision,
+    Snapshot,
+    SnapshotBranch,
+    TargetType,
+)
 from swh.web.common import archive
 from swh.web.common.exc import BadInputExc, NotFoundExc
 from swh.web.common.typing import OriginInfo
@@ -1038,3 +1047,112 @@
     assert resolved_alias is not None
     assert resolved_alias["target_type"] == "revision"
     assert resolved_alias["target"] is not None
+
+
+@given(revision())
+def test_lookup_snapshot_branch_names_filtering(archive_data, revision):
+    rev_id = hash_to_bytes(revision)
+    snapshot = Snapshot(
+        branches={
+            b"refs/heads/master": SnapshotBranch(
+                target=rev_id, target_type=TargetType.REVISION,
+            ),
+            b"refs/heads/incoming": SnapshotBranch(
+                target=rev_id, target_type=TargetType.REVISION,
+            ),
+            b"refs/pull/1": SnapshotBranch(
+                target=rev_id, target_type=TargetType.REVISION,
+            ),
+            b"refs/pull/2": SnapshotBranch(
+                target=rev_id, target_type=TargetType.REVISION,
+            ),
+        },
+    )
+    archive_data.snapshot_add([snapshot])
+
+    for includes, excludes, nb_results in (
+        (["pull"], None, 2),
+        (["incoming"], None, 1),
+        (None, ["heads"], 2),
+        (["refs"], ["master"], 3),
+        (["heads"], ["s/m"], 1),
+        (["heads", "pull"], None, 4),
+    ):
+
+        branches = archive.lookup_snapshot(
+            hash_to_hex(snapshot.id),
+            branch_name_includes=includes,
+            branch_name_excludes=excludes,
+        )["branches"]
+        assert len(branches) == nb_results
+        for branch_name in branches:
+            if includes:
+                assert any([include in branch_name for include in includes])
+            if excludes:
+                assert all([exclude not in branch_name for exclude in excludes])
+
+
+@given(directory(), revision())
+def test_lookup_snapshot_branch_names_filtering_paginated(
+    archive_data, directory, revision
+):
+    pattern = "foo"
+    nb_branches_by_target_type = 10
+    branches = {}
+    for i in range(nb_branches_by_target_type):
+        branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch(
+            target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+        )
+        branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch(
+            target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+        )
+        branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch(
+            target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY,
+        )
+        branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch(
+            target=hash_to_bytes(revision), target_type=TargetType.REVISION,
+        )
+
+    snapshot = Snapshot(branches=branches)
+    archive_data.snapshot_add([snapshot])
+
+    branches_count = nb_branches_by_target_type // 2
+
+    for target_type in (
+        DIRECTORY,
+        REVISION,
+    ):
+        partial_branches = archive.lookup_snapshot(
+            hash_to_hex(snapshot.id),
+            target_types=[target_type],
+            branches_count=branches_count,
+            branch_name_includes=[pattern],
+        )
+        branches = partial_branches["branches"]
+
+        assert len(branches) == branches_count
+        for branch_name, branch_data in branches.items():
+            assert pattern in branch_name
+            assert branch_data["target_type"] == target_type
+        for i in range(branches_count):
+            assert f"branch/{target_type}/{pattern}{i}" in branches
+        assert (
+            partial_branches["next_branch"]
+            == f"branch/{target_type}/{pattern}{branches_count}"
+        )
+
+        partial_branches = archive.lookup_snapshot(
+            hash_to_hex(snapshot.id),
+            branch_name_includes=[pattern],
+            target_types=[target_type],
+            branches_from=partial_branches["next_branch"],
+        )
+        branches = partial_branches["branches"]
+
+        assert len(branches) == branches_count
+        for branch_name, branch_data in branches.items():
+            assert pattern in branch_name
+            assert branch_data["target_type"] == target_type
+        for i in range(branches_count, 2 * branches_count):
+            assert f"branch/{target_type}/{pattern}{i}" in branches
+        assert partial_branches["next_branch"] is None