Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/strategies.py
# Copyright (C) 2018-2021 The Software Heritage developers | # Copyright (C) 2018-2021 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | |||||
from datetime import datetime | from datetime import datetime | ||||
import random | import random | ||||
from hypothesis import assume, settings | from hypothesis import assume, settings | ||||
from hypothesis.extra.dateutil import timezones | from hypothesis.extra.dateutil import timezones | ||||
from hypothesis.strategies import ( | from hypothesis.strategies import ( | ||||
binary, | binary, | ||||
characters, | characters, | ||||
composite, | composite, | ||||
datetimes, | datetimes, | ||||
just, | |||||
lists, | lists, | ||||
sampled_from, | sampled_from, | ||||
text, | text, | ||||
) | ) | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.hypothesis_strategies import origins as new_origin_strategy | from swh.model.hypothesis_strategies import origins as new_origin_strategy | ||||
from swh.model.hypothesis_strategies import snapshots as new_snapshot | from swh.model.hypothesis_strategies import snapshots as new_snapshot | ||||
from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone | from swh.model.model import Person, Revision, RevisionType, TimestampWithTimezone | ||||
from swh.model.swhids import ObjectType | from swh.model.swhids import ObjectType | ||||
from swh.storage.algos.revisions_walker import get_revisions_walker | |||||
from swh.storage.algos.snapshot import snapshot_get_latest | |||||
from swh.web.tests.data import get_tests_data | from swh.web.tests.data import get_tests_data | ||||
# Module dedicated to the generation of input data for tests through | # Module dedicated to the generation of input data for tests through | ||||
# the use of hypothesis. | # the use of hypothesis. | ||||
# Some of these data are sampled from a test archive created and populated | # Some of these data are sampled from a test archive created and populated | ||||
# in the swh.web.tests.data module. | # in the swh.web.tests.data module. | ||||
# Set the swh-web hypothesis profile if none has been explicitly set | # Set the swh-web hypothesis profile if none has been explicitly set | ||||
▲ Show 20 Lines • Show All 127 Lines • ▼ Show 20 Lines | def unknown_release(): | ||||
Hypothesis strategy returning a random revision not ingested | Hypothesis strategy returning a random revision not ingested | ||||
into the test archive. | into the test archive. | ||||
""" | """ | ||||
return sha1().filter( | return sha1().filter( | ||||
lambda s: get_tests_data()["storage"].release_get([s])[0] is None | lambda s: get_tests_data()["storage"].release_get([s])[0] is None | ||||
) | ) | ||||
def revision(): | |||||
""" | |||||
Hypothesis strategy returning a random revision ingested | |||||
into the test archive. | |||||
""" | |||||
return _known_swh_object("revisions") | |||||
def unknown_revision(): | def unknown_revision(): | ||||
""" | """ | ||||
Hypothesis strategy returning a random revision not ingested | Hypothesis strategy returning a random revision not ingested | ||||
into the test archive. | into the test archive. | ||||
""" | """ | ||||
return sha1().filter( | return sha1().filter( | ||||
lambda s: get_tests_data()["storage"].revision_get([hash_to_bytes(s)])[0] | lambda s: get_tests_data()["storage"].revision_get([hash_to_bytes(s)])[0] | ||||
is None | is None | ||||
▲ Show 20 Lines • Show All 50 Lines • ▼ Show 20 Lines | return Revision( | ||||
message=draw(text(min_size=20, max_size=100).map(lambda t: t.encode())), | message=draw(text(min_size=20, max_size=100).map(lambda t: t.encode())), | ||||
date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), | date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), | ||||
committer_date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), | committer_date=TimestampWithTimezone.from_datetime(draw(new_swh_date())), | ||||
synthetic=False, | synthetic=False, | ||||
type=RevisionType.GIT, | type=RevisionType.GIT, | ||||
) | ) | ||||
def revisions(min_size=2, max_size=8): | |||||
""" | |||||
Hypothesis strategy returning random revisions ingested | |||||
into the test archive. | |||||
""" | |||||
return lists(revision(), min_size=min_size, max_size=max_size) | |||||
def unknown_revisions(min_size=2, max_size=8): | def unknown_revisions(min_size=2, max_size=8): | ||||
""" | """ | ||||
Hypothesis strategy returning random revisions not ingested | Hypothesis strategy returning random revisions not ingested | ||||
into the test archive. | into the test archive. | ||||
""" | """ | ||||
return lists(unknown_revision(), min_size=min_size, max_size=max_size) | return lists(unknown_revision(), min_size=min_size, max_size=max_size) | ||||
Show All 21 Lines | def unknown_snapshot(): | ||||
into the test archive. | into the test archive. | ||||
""" | """ | ||||
return sha1().filter( | return sha1().filter( | ||||
lambda s: get_tests_data()["storage"].snapshot_get_branches(hash_to_bytes(s)) | lambda s: get_tests_data()["storage"].snapshot_get_branches(hash_to_bytes(s)) | ||||
is None | is None | ||||
) | ) | ||||
def _get_origin_dfs_revisions_walker(): | |||||
tests_data = get_tests_data() | |||||
storage = tests_data["storage"] | |||||
origin = random.choice(tests_data["origins"][:-1]) | |||||
snapshot = snapshot_get_latest(storage, origin["url"]) | |||||
if snapshot.branches[b"HEAD"].target_type.value == "alias": | |||||
target = snapshot.branches[b"HEAD"].target | |||||
head = snapshot.branches[target].target | |||||
else: | |||||
head = snapshot.branches[b"HEAD"].target | |||||
return get_revisions_walker("dfs", storage, head) | |||||
def ancestor_revisions(): | |||||
""" | |||||
Hypothesis strategy returning a pair of revisions ingested into the | |||||
test archive with an ancestor relation. | |||||
""" | |||||
# get a dfs revisions walker for one of the origins | |||||
# loaded into the test archive | |||||
revisions_walker = _get_origin_dfs_revisions_walker() | |||||
master_revisions = [] | |||||
children = defaultdict(list) | |||||
init_rev_found = False | |||||
# get revisions only authored in the master branch | |||||
for rev in revisions_walker: | |||||
for rev_p in rev["parents"]: | |||||
children[rev_p].append(rev["id"]) | |||||
if not init_rev_found: | |||||
master_revisions.append(rev) | |||||
if not rev["parents"]: | |||||
init_rev_found = True | |||||
# head revision | |||||
root_rev = master_revisions[0] | |||||
# pick a random revision, different from head, only authored | |||||
# in the master branch | |||||
ancestor_rev_idx = random.choice(list(range(1, len(master_revisions) - 1))) | |||||
ancestor_rev = master_revisions[ancestor_rev_idx] | |||||
ancestor_child_revs = children[ancestor_rev["id"]] | |||||
return just( | |||||
{ | |||||
"sha1_git_root": hash_to_hex(root_rev["id"]), | |||||
"sha1_git": hash_to_hex(ancestor_rev["id"]), | |||||
"children": [hash_to_hex(r) for r in ancestor_child_revs], | |||||
} | |||||
) | |||||
def non_ancestor_revisions(): | |||||
""" | |||||
Hypothesis strategy returning a pair of revisions ingested into the | |||||
test archive with no ancestor relation. | |||||
""" | |||||
# get a dfs revisions walker for one of the origins | |||||
# loaded into the test archive | |||||
revisions_walker = _get_origin_dfs_revisions_walker() | |||||
merge_revs = [] | |||||
children = defaultdict(list) | |||||
# get all merge revisions | |||||
for rev in revisions_walker: | |||||
if len(rev["parents"]) > 1: | |||||
merge_revs.append(rev) | |||||
for rev_p in rev["parents"]: | |||||
children[rev_p].append(rev["id"]) | |||||
# find a merge revisions whose parents have a unique child revision | |||||
random.shuffle(merge_revs) | |||||
selected_revs = None | |||||
for merge_rev in merge_revs: | |||||
if all(len(children[rev_p]) == 1 for rev_p in merge_rev["parents"]): | |||||
selected_revs = merge_rev["parents"] | |||||
return just( | |||||
{ | |||||
"sha1_git_root": hash_to_hex(selected_revs[0]), | |||||
"sha1_git": hash_to_hex(selected_revs[1]), | |||||
} | |||||
) | |||||
# The following strategies returns data specific to some tests | |||||
# that can not be generated and thus are hardcoded. | |||||
def revision_with_submodules(): | |||||
""" | |||||
Hypothesis strategy returning a revision that is known to | |||||
point to a directory with revision entries (aka git submodule) | |||||
""" | |||||
return just( | |||||
{ | |||||
"rev_sha1_git": "ffcb69001f3f6745dfd5b48f72ab6addb560e234", | |||||
"rev_dir_sha1_git": "d92a21446387fa28410e5a74379c934298f39ae2", | |||||
"rev_dir_rev_path": "libtess2", | |||||
} | |||||
) | |||||
def swhid(): | def swhid(): | ||||
""" | """ | ||||
Hypothesis strategy returning a qualified SWHID for any object | Hypothesis strategy returning a qualified SWHID for any object | ||||
ingested into the test archive. | ingested into the test archive. | ||||
""" | """ | ||||
return _known_swh_object("swhids") | return _known_swh_object("swhids") | ||||
def revision_swhid(): | |||||
""" | |||||
Hypothesis strategy returning a qualified SWHID for a revision object | |||||
ingested into the test archive. | |||||
""" | |||||
return swhid().filter(lambda swhid: swhid.object_type == ObjectType.REVISION) | |||||
def snapshot_swhid(): | def snapshot_swhid(): | ||||
""" | """ | ||||
Hypothesis strategy returning a qualified SWHID for a snapshot object | Hypothesis strategy returning a qualified SWHID for a snapshot object | ||||
ingested into the test archive. | ingested into the test archive. | ||||
""" | """ | ||||
return swhid().filter(lambda swhid: swhid.object_type == ObjectType.SNAPSHOT) | return swhid().filter(lambda swhid: swhid.object_type == ObjectType.SNAPSHOT) |