diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py
index d193852..448487a 100644
--- a/swh/model/hypothesis_strategies.py
+++ b/swh/model/hypothesis_strategies.py
@@ -1,450 +1,460 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 from hypothesis import assume
 from hypothesis.extra.dateutil import timezones
 from hypothesis.strategies import (
     binary,
     booleans,
     builds,
     characters,
     composite,
     datetimes,
     dictionaries,
     from_regex,
     integers,
     just,
     lists,
     none,
     one_of,
     sampled_from,
     sets,
     text,
 )
 
 from .from_disk import DentryPerms
 from .model import (
     Person,
     Timestamp,
     TimestampWithTimezone,
     Origin,
     OriginVisit,
     OriginVisitStatus,
     Snapshot,
     SnapshotBranch,
     ObjectType,
     TargetType,
     Release,
     Revision,
     RevisionType,
     BaseContent,
     Directory,
     DirectoryEntry,
     Content,
     SkippedContent,
 )
 from .identifiers import snapshot_identifier, identifier_to_bytes
 
 pgsql_alphabet = characters(
     blacklist_categories=("Cs",), blacklist_characters=["\u0000"]
 )  # postgresql does not like these
 
 
 def optional(strategy):
     return one_of(none(), strategy)
 
 
 def pgsql_text():
     return text(alphabet=pgsql_alphabet)
 
 
 def sha1_git():
     return binary(min_size=20, max_size=20)
 
 
 def sha1():
     return binary(min_size=20, max_size=20)
 
 
 def aware_datetimes():
-    return datetimes(timezones=timezones())
+    # datetimes in Software Heritage are not used for software artifacts
+    # (which may be much older than 2000), but only for objects like scheduler
+    # task runs, and origin visits, which were created by Software Heritage,
+    # so at least in 2015.
+    # We're forbidding old datetimes, because until 1956, many timezones had seconds
+    # in their "UTC offsets" (see
+    # <https://en.wikipedia.org/wiki/Time_zone>), which is not
+    # encodable in ISO8601; and we need our datetimes to be ISO8601-encodable in the
+    # RPC protocol
+    min_value = datetime.datetime(2000, 1, 1, 0, 0, 0)
+    return datetimes(min_value=min_value, timezones=timezones())
 
 
 @composite
 def urls(draw):
     protocol = draw(sampled_from(["git", "http", "https", "deb"]))
     domain = draw(from_regex(r"\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z"))
 
     return "%s://%s" % (protocol, domain)
 
 
 def persons_d():
     return builds(
         dict,
         fullname=binary(),
         email=optional(binary()),
         name=optional(binary()),
     )
 
 
 def persons():
     return persons_d().map(Person.from_dict)
 
 
 def timestamps_d():
     max_seconds = datetime.datetime.max.replace(
         tzinfo=datetime.timezone.utc
     ).timestamp()
     min_seconds = datetime.datetime.min.replace(
         tzinfo=datetime.timezone.utc
     ).timestamp()
     return builds(
         dict,
         seconds=integers(min_seconds, max_seconds),
         microseconds=integers(0, 1000000),
     )
 
 
 def timestamps():
     return timestamps_d().map(Timestamp.from_dict)
 
 
 @composite
 def timestamps_with_timezone_d(
     draw,
     timestamp=timestamps_d(),
     offset=integers(min_value=-14 * 60, max_value=14 * 60),
     negative_utc=booleans(),
 ):
     timestamp = draw(timestamp)
     offset = draw(offset)
     negative_utc = draw(negative_utc)
     assume(not (negative_utc and offset))
     return dict(timestamp=timestamp, offset=offset, negative_utc=negative_utc)
 
 
 timestamps_with_timezone = timestamps_with_timezone_d().map(
     TimestampWithTimezone.from_dict
 )
 
 
 def origins_d():
     return builds(dict, url=urls())
 
 
 def origins():
     return origins_d().map(Origin.from_dict)
 
 
 def origin_visits_d():
     return builds(
         dict,
         visit=integers(0, 1000),
         origin=urls(),
         date=aware_datetimes(),
         status=sampled_from(["ongoing", "full", "partial"]),
         type=pgsql_text(),
         snapshot=optional(sha1_git()),
     )
 
 
 def origin_visits():
     return origin_visits_d().map(OriginVisit.from_dict)
 
 
 def metadata_dicts():
     return dictionaries(pgsql_text(), pgsql_text())
 
 
 def origin_visit_statuses_d():
     return builds(
         dict,
         visit=integers(0, 1000),
         origin=urls(),
         status=sampled_from(["ongoing", "full", "partial"]),
         date=aware_datetimes(),
         snapshot=optional(sha1_git()),
         metadata=one_of(none(), metadata_dicts()),
     )
 
 
 def origin_visit_statuses():
     return origin_visit_statuses_d().map(OriginVisitStatus.from_dict)
 
 
 @composite
 def releases_d(draw):
     target_type = sampled_from([x.value for x in ObjectType])
     name = binary()
     message = binary()
     synthetic = booleans()
     target = sha1_git()
     metadata = one_of(none(), revision_metadata())
 
     return draw(
         one_of(
             builds(
                 dict,
                 name=name,
                 message=message,
                 synthetic=synthetic,
                 author=none(),
                 date=none(),
                 target=target,
                 target_type=target_type,
                 metadata=metadata,
             ),
             builds(
                 dict,
                 name=name,
                 message=message,
                 synthetic=synthetic,
                 date=timestamps_with_timezone_d(),
                 author=persons_d(),
                 target=target,
                 target_type=target_type,
                 metadata=metadata,
             ),
         )
     )
 
 
 def releases():
     return releases_d().map(Release.from_dict)
 
 
 revision_metadata = metadata_dicts
 
 
 def revisions_d():
     return builds(
         dict,
         message=binary(),
         synthetic=booleans(),
         author=persons_d(),
         committer=persons_d(),
         date=timestamps_with_timezone_d(),
         committer_date=timestamps_with_timezone_d(),
         parents=lists(sha1_git()),
         directory=sha1_git(),
         type=sampled_from([x.value for x in RevisionType]),
         metadata=one_of(none(), revision_metadata()),
     )
     # TODO: metadata['extra_headers'] can have binary keys and values
 
 
 def revisions():
     return revisions_d().map(Revision.from_dict)
 
 
 def directory_entries_d():
     return builds(
         dict,
         name=binary(),
         target=sha1_git(),
         type=sampled_from(["file", "dir", "rev"]),
         perms=sampled_from([perm.value for perm in DentryPerms]),
     )
 
 
 def directory_entries():
     return directory_entries_d().map(DirectoryEntry)
 
 
 def directories_d():
     return builds(dict, entries=lists(directory_entries_d()))
 
 
 def directories():
     return directories_d().map(Directory.from_dict)
 
 
 def contents_d():
     return one_of(present_contents_d(), skipped_contents_d())
 
 
 def contents():
     return one_of(present_contents(), skipped_contents())
 
 
 def present_contents_d():
     return builds(
         dict,
         data=binary(max_size=4096),
         ctime=optional(aware_datetimes()),
         status=one_of(just("visible"), just("hidden")),
     )
 
 
 def present_contents():
     return present_contents_d().map(lambda d: Content.from_data(**d))
 
 
 @composite
 def skipped_contents_d(draw):
     result = BaseContent._hash_data(draw(binary(max_size=4096)))
     result.pop("data")
     nullify_attrs = draw(
         sets(sampled_from(["sha1", "sha1_git", "sha256", "blake2s256"]))
     )
     for k in nullify_attrs:
         result[k] = None
     result["reason"] = draw(pgsql_text())
     result["status"] = "absent"
     result["ctime"] = draw(optional(aware_datetimes()))
     return result
 
 
 def skipped_contents():
     return skipped_contents_d().map(SkippedContent.from_dict)
 
 
 def branch_names():
     return binary(min_size=1)
 
 
 def branch_targets_object_d():
     return builds(
         dict,
         target=sha1_git(),
         target_type=sampled_from(
             [x.value for x in TargetType if x.value not in ("alias",)]
         ),
     )
 
 
 def branch_targets_alias_d():
     return builds(
         dict, target=sha1_git(), target_type=just("alias")
     )  # TargetType.ALIAS.value))
 
 
 def branch_targets_d(*, only_objects=False):
     if only_objects:
         return branch_targets_object_d()
     else:
         return one_of(branch_targets_alias_d(), branch_targets_object_d())
 
 
 def branch_targets(*, only_objects=False):
     return builds(
         SnapshotBranch.from_dict, branch_targets_d(only_objects=only_objects)
     )
 
 
 @composite
 def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False):
     branches = draw(
         dictionaries(
             keys=branch_names(),
             values=one_of(none(), branch_targets_d(only_objects=only_objects)),
             min_size=min_size,
             max_size=max_size,
         )
     )
 
     if not only_objects:
         # Make sure aliases point to actual branches
         unresolved_aliases = {
             branch: target["target"]
             for branch, target in branches.items()
             if (
                 target
                 and target["target_type"] == "alias"
                 and target["target"] not in branches
             )
         }
         for alias_name, alias_target in unresolved_aliases.items():
             # Override alias branch with one pointing to a real object
             # if max_size constraint is reached
             alias = alias_target if len(branches) < max_size else alias_name
             branches[alias] = draw(branch_targets_d(only_objects=True))
 
     # Ensure no cycles between aliases
     while True:
         try:
             id_ = snapshot_identifier(
                 {
                     "branches": {
                         name: branch or None for (name, branch) in branches.items()
                     }
                 }
             )
         except ValueError as e:
             for (source, target) in e.args[1]:
                 branches[source] = draw(branch_targets_d(only_objects=True))
         else:
             break
 
     return dict(id=identifier_to_bytes(id_), branches=branches)
 
 
 def snapshots(*, min_size=0, max_size=100, only_objects=False):
     return snapshots_d(
         min_size=min_size, max_size=max_size, only_objects=only_objects
     ).map(Snapshot.from_dict)
 
 
 def objects(blacklist_types=("origin_visit_status",), split_content=False):
     """generates a random couple (type, obj)
 
     which obj is an instance of the Model class corresponding to obj_type.
 
     `blacklist_types` is a list of obj_type to exclude from the strategy.
 
     If `split_content` is True, generates Content and SkippedContent under different
     obj_type, resp. "content" and "skipped_content".
 
     """
     strategies = [
         ("origin", origins),
         ("origin_visit", origin_visits),
         ("origin_visit_status", origin_visit_statuses),
         ("snapshot", snapshots),
         ("release", releases),
         ("revision", revisions),
         ("directory", directories),
     ]
     if split_content:
         strategies.append(("content", present_contents))
         strategies.append(("skipped_content", skipped_contents))
     else:
         strategies.append(("content", contents))
 
     args = [
         obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x))
         for (obj_type, obj_gen) in strategies
         if obj_type not in blacklist_types
     ]
     return one_of(*args)
 
 
 def object_dicts(blacklist_types=("origin_visit_status",), split_content=False):
     """generates a random couple (type, dict)
 
     which dict is suitable for .from_dict() factory methods.
 
     `blacklist_types` is a list of obj_type to exclude from the strategy.
 
     If `split_content` is True, generates Content and SkippedContent under different
     obj_type, resp. "content" and "skipped_content".
 
     """
     strategies = [
         ("origin", origins_d),
         ("origin_visit", origin_visits_d),
         ("origin_visit_status", origin_visit_statuses_d),
         ("snapshot", snapshots_d),
         ("release", releases_d),
         ("revision", revisions_d),
         ("directory", directories_d),
     ]
     if split_content:
         strategies.append(("content", present_contents_d))
         strategies.append(("skipped_content", skipped_contents_d))
     else:
         strategies.append(("content", contents_d))
 
     args = [
         obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x))
         for (obj_type, obj_gen) in strategies
         if obj_type not in blacklist_types
     ]
     return one_of(*args)
diff --git a/swh/model/tests/test_hypothesis_strategies.py b/swh/model/tests/test_hypothesis_strategies.py
index 2df2d7a..1622b3c 100644
--- a/swh/model/tests/test_hypothesis_strategies.py
+++ b/swh/model/tests/test_hypothesis_strategies.py
@@ -1,188 +1,198 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import datetime
 
 import attr
+import iso8601
 from hypothesis import given, settings
 
 from swh.model.hashutil import DEFAULT_ALGORITHMS
 from swh.model.hypothesis_strategies import (
+    aware_datetimes,
     objects,
     object_dicts,
     contents,
     skipped_contents,
     snapshots,
     origin_visits,
 )
 from swh.model.model import TargetType
 
 
 target_types = ("content", "directory", "revision", "release", "snapshot", "alias")
 
 all_but_skipped_content = (
     "origin",
     "origin_visit",
     "origin_visit_status",
     "snapshot",
     "release",
     "revision",
     "directory",
     "content",
 )
 
 
 @given(objects(blacklist_types=()))
 def test_generation(obj_type_and_obj):
     (obj_type, object_) = obj_type_and_obj
     attr.validate(object_)
 
 
 @given(objects(split_content=False))
 def test_generation_merged_content(obj_type_and_obj):
     # we should never generate a "skipped_content" here
     assert obj_type_and_obj[0] != "skipped_content"
 
 
 @given(objects(split_content=True, blacklist_types=all_but_skipped_content))
 def test_generation_split_content(obj_type_and_obj):
     # we should only generate "skipped_content"
     assert obj_type_and_obj[0] == "skipped_content"
 
 
 @given(objects(blacklist_types=("origin_visit", "directory")))
 def test_generation_blacklist(obj_type_and_obj):
     assert obj_type_and_obj[0] not in ("origin_visit", "directory")
 
 
 def assert_nested_dict(obj):
     """Tests the object is a nested dict and contains no more class
     from swh.model.model."""
     if isinstance(obj, dict):
         for (key, value) in obj.items():
             assert isinstance(key, (str, bytes)), key
             assert_nested_dict(value)
     elif isinstance(obj, list):
         for value in obj:
             assert_nested_dict(value)
     elif isinstance(
         obj, (int, float, str, bytes, bool, type(None), datetime.datetime)
     ):
         pass
     else:
         assert False, obj
 
 
 @given(object_dicts(blacklist_types=()))
 def test_dicts_generation(obj_type_and_obj):
     (obj_type, object_) = obj_type_and_obj
     assert_nested_dict(object_)
     if obj_type == "content":
         COMMON_KEYS = set(DEFAULT_ALGORITHMS) | {"length", "status", "ctime"}
         if object_["status"] == "visible":
             assert set(object_) <= COMMON_KEYS | {"data"}
         elif object_["status"] == "absent":
             assert set(object_) == COMMON_KEYS | {"reason"}
         elif object_["status"] == "hidden":
             assert set(object_) <= COMMON_KEYS | {"data"}
         else:
             assert False, object_
     elif obj_type == "release":
         assert object_["target_type"] in target_types
     elif obj_type == "snapshot":
         for branch in object_["branches"].values():
             assert branch is None or branch["target_type"] in target_types
 
 
+@given(aware_datetimes())
+def test_datetimes(dt):
+    # Checks this doesn't raise an error, eg. about seconds in the TZ offset
+    iso8601.parse_date(dt.isoformat())
+
+    assert dt.tzinfo is not None
+
+
 @given(object_dicts(split_content=False))
 def test_dicts_generation_merged_content(obj_type_and_obj):
     # we should never generate a "skipped_content" here
     assert obj_type_and_obj[0] != "skipped_content"
 
 
 @given(object_dicts(split_content=True, blacklist_types=all_but_skipped_content))
 def test_dicts_generation_split_content(obj_type_and_obj):
     # we should only generate "skipped_content"
     assert obj_type_and_obj[0] == "skipped_content"
 
 
 @given(object_dicts(blacklist_types=("release", "content")))
 def test_dicts_generation_blacklist(obj_type_and_obj):
     assert obj_type_and_obj[0] not in ("release", "content")
 
 
 @given(objects())
 def test_model_to_dicts(obj_type_and_obj):
     (obj_type, object_) = obj_type_and_obj
     obj_dict = object_.to_dict()
     assert_nested_dict(obj_dict)
     if obj_type == "content":
         COMMON_KEYS = set(DEFAULT_ALGORITHMS) | {"length", "status", "ctime"}
         if obj_dict["status"] == "visible":
             assert set(obj_dict) == COMMON_KEYS | {"data"}
         elif obj_dict["status"] == "absent":
             assert set(obj_dict) == COMMON_KEYS | {"reason"}
         elif obj_dict["status"] == "hidden":
             assert set(obj_dict) == COMMON_KEYS | {"data"}
         else:
             assert False, obj_dict
     elif obj_type == "release":
         assert obj_dict["target_type"] in target_types
     elif obj_type == "snapshot":
         for branch in obj_dict["branches"].values():
             assert branch is None or branch["target_type"] in target_types
 
 
 @given(contents())
 def test_content_aware_datetime(cont):
     assert cont.ctime is None or cont.ctime.tzinfo is not None
 
 
 @given(skipped_contents())
 def test_skipped_content_aware_datetime(cont):
     assert cont.ctime is None or cont.ctime.tzinfo is not None
 
 
 _min_snp_size = 10
 _max_snp_size = 100
 
 
 @given(snapshots(min_size=_min_snp_size, max_size=_max_snp_size))
 @settings(max_examples=1)
 def test_snapshots_strategy(snapshot):
 
     branches = snapshot.branches
 
     assert len(branches) >= _min_snp_size
     assert len(branches) <= _max_snp_size
 
     aliases = []
 
     # check snapshot integrity
     for name, branch in branches.items():
         assert branch is None or branch.target_type.value in target_types
         if branch is not None and branch.target_type == TargetType.ALIAS:
             aliases.append(name)
             assert branch.target in branches
 
     # check no cycles between aliases
     for alias in aliases:
         processed_alias = set()
         current_alias = alias
         while (
             branches[current_alias] is not None
             and branches[current_alias].target_type == TargetType.ALIAS
         ):
             assert branches[current_alias].target not in processed_alias
             processed_alias.add(current_alias)
             current_alias = branches[current_alias].target
 
 
 @given(snapshots(min_size=_min_snp_size, max_size=_min_snp_size))
 @settings(max_examples=1)
 def test_snapshots_strategy_fixed_size(snapshot):
     assert len(snapshot.branches) == _min_snp_size
 
 
 @given(origin_visits())
 def test_origin_visit_aware_datetime(visit):
     assert visit.date.tzinfo is not None
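A note on the motivation for the min_value bound in aware_datetimes(): before 1956, many zones used local-mean-time UTC offsets with a seconds component; Python's datetime.isoformat() emits such offsets (e.g. "+00:09:21"), and strict ISO8601 parsers reject them. A sketch, not part of this patch, assuming the python-dateutil and iso8601 packages are installed (the exact historical offset depends on the system tz database):

    import datetime

    import iso8601
    from dateutil import tz

    paris = tz.gettz("Europe/Paris")

    # Until 1911, Paris used local mean time; most tz databases record its
    # UTC offset as +00:09:21, i.e. with a seconds component.
    old = datetime.datetime(1900, 1, 1, tzinfo=paris)
    print(old.isoformat())  # e.g. 1900-01-01T00:00:00+00:09:21

    try:
        iso8601.parse_date(old.isoformat())
    except iso8601.ParseError:
        print("seconds in the UTC offset are not valid ISO8601")

    # Post-2000 datetimes round-trip cleanly, which is what test_datetimes
    # above checks for every datetime the strategy can produce.
    recent = datetime.datetime(2020, 1, 1, tzinfo=paris)
    assert iso8601.parse_date(recent.isoformat()) == recent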