Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9346266
D2905.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
13 KB
Subscribers
None
D2905.diff
View Options
diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py
--- a/swh/model/hypothesis_strategies.py
+++ b/swh/model/hypothesis_strategies.py
@@ -3,22 +3,20 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import attr
import datetime
from hypothesis import assume
from hypothesis.strategies import (
- binary, booleans, builds, characters, composite, dictionaries,
- from_regex, integers, just, lists, none, one_of,
- sampled_from, sets, text, tuples,
-)
+ binary, booleans, builds, characters,
+ composite, datetimes, dictionaries, from_regex, integers, just, lists,
+ none, one_of, sampled_from, sets, text, )
from .from_disk import DentryPerms
from .model import (
- Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit,
- OriginVisitUpdate, Snapshot, SnapshotBranch, TargetType, Release,
- Revision, Directory, DirectoryEntry, Content, SkippedContent
-)
+ Person, Timestamp, TimestampWithTimezone, Origin,
+ OriginVisit, OriginVisitUpdate, Snapshot, SnapshotBranch, ObjectType,
+ TargetType, Release, Revision, RevisionType, BaseContent, Directory,
+ DirectoryEntry, Content, SkippedContent, )
from .identifiers import snapshot_identifier, identifier_to_bytes
@@ -51,178 +49,262 @@
return '%s://%s' % (protocol, domain)
+def persons_d():
+ return builds(
+ dict,
+ fullname=binary(),
+ email=optional(binary()),
+ name=optional(binary()),
+ )
+
+
def persons():
- return builds(Person, email=optional(binary()), name=optional(binary()))
+ return persons_d().map(Person.from_dict)
-def timestamps():
+def timestamps_d():
max_seconds = datetime.datetime.max.replace(
tzinfo=datetime.timezone.utc).timestamp()
min_seconds = datetime.datetime.min.replace(
tzinfo=datetime.timezone.utc).timestamp()
return builds(
- Timestamp,
+ dict,
seconds=integers(min_seconds, max_seconds),
microseconds=integers(0, 1000000))
+def timestamps():
+ return timestamps_d().map(Timestamp.from_dict)
+
+
@composite
-def timestamps_with_timezone(
+def timestamps_with_timezone_d(
draw,
- timestamp=timestamps(),
+ timestamp=timestamps_d(),
offset=integers(min_value=-14*60, max_value=14*60),
negative_utc=booleans()):
timestamp = draw(timestamp)
offset = draw(offset)
negative_utc = draw(negative_utc)
assume(not (negative_utc and offset))
- return TimestampWithTimezone(
+ return dict(
timestamp=timestamp,
offset=offset,
negative_utc=negative_utc)
-def origins():
+timestamps_with_timezone = timestamps_with_timezone_d().map(
+ TimestampWithTimezone.from_dict)
+
+
+def origins_d():
return builds(
- Origin,
+ dict,
url=urls())
-def origin_visits():
+def origins():
+ return origins_d().map(Origin.from_dict)
+
+
+def origin_visits_d():
return builds(
- OriginVisit,
+ dict,
visit=integers(0, 1000),
origin=urls(),
+ date=datetimes(),
status=sampled_from(['ongoing', 'full', 'partial']),
type=pgsql_text(),
snapshot=optional(sha1_git()),
)
+def origin_visits():
+ return origin_visits_d().map(OriginVisit.from_dict)
+
+
def metadata_dicts():
return dictionaries(pgsql_text(), pgsql_text())
-def origin_visit_updates():
+def origin_visit_updates_d():
return builds(
- OriginVisitUpdate,
+ dict,
visit=integers(0, 1000),
origin=urls(),
status=sampled_from(['ongoing', 'full', 'partial']),
+ date=datetimes(),
snapshot=optional(sha1_git()),
metadata=one_of(none(), metadata_dicts()))
+def origin_visit_updates():
+ return origin_visit_updates_d().map(OriginVisitUpdate.from_dict)
+
+
@composite
-def releases(draw):
- (date, author) = draw(one_of(
- tuples(none(), none()),
- tuples(timestamps_with_timezone(), persons())))
- rel = draw(builds(
- Release,
- author=none(),
- date=none(),
- target=sha1_git()))
- return attr.evolve(
- rel,
- date=date,
- author=author)
+def releases_d(draw):
+ target_type = sampled_from([x.value for x in ObjectType])
+ name = binary()
+ message = binary()
+ synthetic = booleans()
+ target = sha1_git()
+ metadata = one_of(none(), revision_metadata())
+
+ return draw(one_of(
+ builds(
+ dict,
+ name=name,
+ message=message,
+ synthetic=synthetic,
+ author=none(),
+ date=none(),
+ target=target,
+ target_type=target_type,
+ metadata=metadata,
+ ),
+ builds(
+ dict,
+ name=name,
+ message=message,
+ synthetic=synthetic,
+ date=timestamps_with_timezone_d(),
+ author=persons_d(),
+ target=target,
+ target_type=target_type,
+ metadata=metadata,
+ ),
+ ))
+
+
+def releases():
+ return releases_d().map(Release.from_dict)
revision_metadata = metadata_dicts
-def revisions():
+def revisions_d():
return builds(
- Revision,
- author=persons(),
- committer=persons(),
- date=timestamps_with_timezone(),
- committer_date=timestamps_with_timezone(),
+ dict,
+ message=binary(),
+ synthetic=booleans(),
+ author=persons_d(),
+ committer=persons_d(),
+ date=timestamps_with_timezone_d(),
+ committer_date=timestamps_with_timezone_d(),
parents=lists(sha1_git()),
directory=sha1_git(),
+ type=sampled_from([x.value for x in RevisionType]),
metadata=one_of(none(), revision_metadata()))
# TODO: metadata['extra_headers'] can have binary keys and values
-def directory_entries():
+def revisions():
+ return revisions_d().map(Revision.from_dict)
+
+
+def directory_entries_d():
return builds(
- DirectoryEntry,
+ dict,
+ name=binary(),
target=sha1_git(),
+ type=sampled_from(['file', 'dir', 'rev']),
perms=sampled_from([perm.value for perm in DentryPerms]))
-def directories():
+def directory_entries():
+ return directory_entries_d().map(DirectoryEntry)
+
+
+def directories_d():
return builds(
- Directory,
- entries=lists(directory_entries()))
+ dict,
+ entries=lists(directory_entries_d()))
+
+
+def directories():
+ return directories_d().map(Directory.from_dict)
+
+
+def contents_d():
+ return one_of(present_contents_d(), skipped_contents_d())
def contents():
return one_of(present_contents(), skipped_contents())
-def present_contents():
+def present_contents_d():
return builds(
- Content.from_data,
- binary(max_size=4096),
+ dict,
+ data=binary(max_size=4096),
status=one_of(just('visible'), just('hidden')),
)
+def present_contents():
+ return present_contents_d().map(lambda d: Content.from_data(**d))
+
+
@composite
-def skipped_contents(draw):
+def skipped_contents_d(draw):
+ result = BaseContent._hash_data(draw(binary(max_size=4096)))
+ result.pop('data')
nullify_attrs = draw(
sets(sampled_from(['sha1', 'sha1_git', 'sha256', 'blake2s256']))
)
+ for k in nullify_attrs:
+ result[k] = None
+ result['reason'] = draw(pgsql_text())
+ result['status'] = 'absent'
+ return result
- new_attrs = {
- k: None
- for k in nullify_attrs
- }
- ret = draw(builds(
- SkippedContent.from_data,
- binary(max_size=4096),
- reason=pgsql_text(),
- ))
-
- return attr.evolve(ret, **new_attrs)
+def skipped_contents():
+ return skipped_contents_d().map(SkippedContent.from_dict)
def branch_names():
return binary(min_size=1)
-def branch_targets_object():
+def branch_targets_object_d():
return builds(
- SnapshotBranch,
+ dict,
target=sha1_git(),
target_type=sampled_from([
- TargetType.CONTENT, TargetType.DIRECTORY, TargetType.REVISION,
- TargetType.RELEASE, TargetType.SNAPSHOT]))
+ x.value for x in TargetType
+ if x.value not in ('alias', )]))
-def branch_targets_alias():
+def branch_targets_alias_d():
return builds(
- SnapshotBranch,
- target_type=just(TargetType.ALIAS))
+ dict,
+ target=sha1_git(),
+ target_type=just('alias')) # TargetType.ALIAS.value))
-def branch_targets(*, only_objects=False):
+def branch_targets_d(*, only_objects=False):
if only_objects:
- return branch_targets_object()
+ return branch_targets_object_d()
else:
- return one_of(branch_targets_alias(), branch_targets_object())
+ return one_of(branch_targets_alias_d(), branch_targets_object_d())
+
+
+def branch_targets(*, only_objects=False):
+ return builds(
+ SnapshotBranch.from_dict,
+ branch_targets_d(only_objects=only_objects))
@composite
-def snapshots(draw, *, min_size=0, max_size=100, only_objects=False):
+def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False):
branches = draw(dictionaries(
keys=branch_names(),
values=one_of(
none(),
- branch_targets(only_objects=only_objects)
+ branch_targets_d(only_objects=only_objects)
),
min_size=min_size,
max_size=max_size,
@@ -231,33 +313,38 @@
if not only_objects:
# Make sure aliases point to actual branches
unresolved_aliases = {
- target.target
+ target['target']
for target in branches.values()
if (target
- and target.target_type == 'alias'
- and target.target not in branches)
+ and target['target_type'] == 'alias'
+ and target['target'] not in branches)
}
-
for alias in unresolved_aliases:
- branches[alias] = draw(branch_targets(only_objects=True))
+ branches[alias] = draw(branch_targets_d(only_objects=True))
# Ensure no cycles between aliases
while True:
try:
id_ = snapshot_identifier({
'branches': {
- name: branch.to_dict() if branch else None
+ name: branch or None
for (name, branch) in branches.items()}})
except ValueError as e:
for (source, target) in e.args[1]:
- branches[source] = draw(branch_targets(only_objects=True))
+ branches[source] = draw(branch_targets_d(only_objects=True))
else:
break
- return Snapshot(
+
+ return dict(
id=identifier_to_bytes(id_),
branches=branches)
+def snapshots(*, min_size=0, max_size=100, only_objects=False):
+ return snapshots_d(min_size=0, max_size=100, only_objects=False).map(
+ Snapshot.from_dict)
+
+
def objects():
return one_of(
origins().map(lambda x: ('origin', x)),
@@ -272,4 +359,12 @@
def object_dicts():
- return objects().map(lambda x: (x[0], x[1].to_dict()))
+ return one_of(
+ origins_d().map(lambda x: ('origin', x)),
+ origin_visits_d().map(lambda x: ('origin_visit', x)),
+ snapshots_d().map(lambda x: ('snapshot', x)),
+ releases_d().map(lambda x: ('release', x)),
+ revisions_d().map(lambda x: ('revision', x)),
+ directories_d().map(lambda x: ('directory', x)),
+ contents_d().map(lambda x: ('content', x)),
+ )
diff --git a/swh/model/tests/test_hypothesis_strategies.py b/swh/model/tests/test_hypothesis_strategies.py
--- a/swh/model/tests/test_hypothesis_strategies.py
+++ b/swh/model/tests/test_hypothesis_strategies.py
@@ -45,13 +45,13 @@
assert_nested_dict(object_)
if obj_type == 'content':
if object_['status'] == 'visible':
- assert set(object_) == \
+ assert set(object_) <= \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
elif object_['status'] == 'absent':
assert set(object_) == \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'reason'}
elif object_['status'] == 'hidden':
- assert set(object_) == \
+ assert set(object_) <= \
set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
else:
assert False, object_
@@ -60,3 +60,27 @@
elif obj_type == 'snapshot':
for branch in object_['branches'].values():
assert branch is None or branch['target_type'] in target_types
+
+
+@given(objects())
+def test_model_to_dicts(obj_type_and_obj):
+ (obj_type, object_) = obj_type_and_obj
+ obj_dict = object_.to_dict()
+ assert_nested_dict(obj_dict)
+ if obj_type == 'content':
+ if obj_dict['status'] == 'visible':
+ assert set(obj_dict) == \
+ set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
+ elif obj_dict['status'] == 'absent':
+ assert set(obj_dict) == \
+ set(DEFAULT_ALGORITHMS) | {'length', 'status', 'reason'}
+ elif obj_dict['status'] == 'hidden':
+ assert set(obj_dict) == \
+ set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'}
+ else:
+ assert False, obj_dict
+ elif obj_type == 'release':
+ assert obj_dict['target_type'] in target_types
+ elif obj_type == 'snapshot':
+ for branch in obj_dict['branches'].values():
+ assert branch is None or branch['target_type'] in target_types
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Thu, Jul 3, 3:51 PM (2 w, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3220882
Attached To
D2905: hypothesis: split hypothesis strategies as a dict + entity instance
Event Timeline
Log In to Comment