Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_archive.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import hashlib | import hashlib | ||||
import itertools | import itertools | ||||
import random | import random | ||||
from hypothesis import given | from hypothesis import given | ||||
import pytest | import pytest | ||||
from swh.model.from_disk import DentryPerms | from swh.model.from_disk import DentryPerms | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT | from swh.model.identifiers import CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT | ||||
from swh.model.model import Directory, DirectoryEntry, Origin, OriginVisit, Revision | from swh.model.model import ( | ||||
Directory, | |||||
DirectoryEntry, | |||||
Origin, | |||||
OriginVisit, | |||||
Revision, | |||||
Snapshot, | |||||
SnapshotBranch, | |||||
TargetType, | |||||
) | |||||
from swh.web.common import archive | from swh.web.common import archive | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.common.typing import OriginInfo | from swh.web.common.typing import OriginInfo | ||||
from swh.web.tests.conftest import ctags_json_missing, fossology_missing | from swh.web.tests.conftest import ctags_json_missing, fossology_missing | ||||
from swh.web.tests.data import random_content, random_sha1 | from swh.web.tests.data import random_content, random_sha1 | ||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
ancestor_revisions, | ancestor_revisions, | ||||
content, | content, | ||||
▲ Show 20 Lines • Show All 1,007 Lines • ▼ Show 20 Lines | |||||
@given(snapshot()) | @given(snapshot()) | ||||
def test_lookup_snapshot_alias(snapshot): | def test_lookup_snapshot_alias(snapshot): | ||||
resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD") | resolved_alias = archive.lookup_snapshot_alias(snapshot, "HEAD") | ||||
assert resolved_alias is not None | assert resolved_alias is not None | ||||
assert resolved_alias["target_type"] == "revision" | assert resolved_alias["target_type"] == "revision" | ||||
assert resolved_alias["target"] is not None | assert resolved_alias["target"] is not None | ||||
@given(revision()) | |||||
def test_lookup_snapshot_branch_names_filtering(archive_data, revision): | |||||
rev_id = hash_to_bytes(revision) | |||||
snapshot = Snapshot( | |||||
branches={ | |||||
b"refs/heads/master": SnapshotBranch( | |||||
target=rev_id, target_type=TargetType.REVISION, | |||||
), | |||||
b"refs/heads/incoming": SnapshotBranch( | |||||
target=rev_id, target_type=TargetType.REVISION, | |||||
), | |||||
b"refs/pull/1": SnapshotBranch( | |||||
target=rev_id, target_type=TargetType.REVISION, | |||||
), | |||||
b"refs/pull/2": SnapshotBranch( | |||||
target=rev_id, target_type=TargetType.REVISION, | |||||
), | |||||
}, | |||||
) | |||||
archive_data.snapshot_add([snapshot]) | |||||
for includes, excludes, nb_results in ( | |||||
(["pull"], None, 2), | |||||
(["incoming"], None, 1), | |||||
(None, ["heads"], 2), | |||||
(["refs"], ["master"], 3), | |||||
(["heads", "pull"], None, 4), | |||||
): | |||||
branches = archive.lookup_snapshot( | |||||
hash_to_hex(snapshot.id), | |||||
branch_name_includes=includes, | |||||
branch_name_excludes=excludes, | |||||
)["branches"] | |||||
assert len(branches) == nb_results | |||||
for branch_name in branches: | |||||
if includes: | |||||
assert any([include in branch_name for include in includes]) | |||||
if excludes: | |||||
assert all([exclude not in branch_name for exclude in excludes]) | |||||
@given(directory(), revision()) | |||||
def test_lookup_snapshot_branch_names_filtering_paginated( | |||||
archive_data, directory, revision | |||||
): | |||||
pattern = "foo" | |||||
nb_branches_by_target_type = 10 | |||||
branches = {} | |||||
for i in range(nb_branches_by_target_type): | |||||
branches[f"branch/directory/bar{i}".encode()] = SnapshotBranch( | |||||
target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY, | |||||
) | |||||
branches[f"branch/revision/bar{i}".encode()] = SnapshotBranch( | |||||
target=hash_to_bytes(revision), target_type=TargetType.REVISION, | |||||
) | |||||
branches[f"branch/directory/{pattern}{i}".encode()] = SnapshotBranch( | |||||
target=hash_to_bytes(directory), target_type=TargetType.DIRECTORY, | |||||
) | |||||
branches[f"branch/revision/{pattern}{i}".encode()] = SnapshotBranch( | |||||
target=hash_to_bytes(revision), target_type=TargetType.REVISION, | |||||
) | |||||
snapshot = Snapshot(branches=branches) | |||||
archive_data.snapshot_add([snapshot]) | |||||
branches_count = nb_branches_by_target_type // 2 | |||||
for target_type in ( | |||||
DIRECTORY, | |||||
REVISION, | |||||
): | |||||
partial_branches = archive.lookup_snapshot( | |||||
hash_to_hex(snapshot.id), | |||||
target_types=[target_type], | |||||
branches_count=branches_count, | |||||
branch_name_includes=[pattern], | |||||
) | |||||
branches = partial_branches["branches"] | |||||
assert len(branches) == branches_count | |||||
for branch_name, branch_data in branches.items(): | |||||
assert pattern in branch_name | |||||
assert branch_data["target_type"] == target_type | |||||
for i in range(branches_count): | |||||
assert f"branch/{target_type}/{pattern}{i}" in branches | |||||
assert ( | |||||
partial_branches["next_branch"] | |||||
== f"branch/{target_type}/{pattern}{branches_count}" | |||||
) | |||||
partial_branches = archive.lookup_snapshot( | |||||
hash_to_hex(snapshot.id), | |||||
branch_name_includes=[pattern], | |||||
target_types=[target_type], | |||||
branches_from=partial_branches["next_branch"], | |||||
) | |||||
branches = partial_branches["branches"] | |||||
assert len(branches) == branches_count | |||||
for branch_name, branch_data in branches.items(): | |||||
assert pattern in branch_name | |||||
assert branch_data["target_type"] == target_type | |||||
for i in range(branches_count, 2 * branches_count): | |||||
assert f"branch/{target_type}/{pattern}{i}" in branches | |||||
assert partial_branches["next_branch"] is None |