diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py index 3b046ca..26d4a81 100644 --- a/swh/model/hypothesis_strategies.py +++ b/swh/model/hypothesis_strategies.py @@ -1,190 +1,220 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information +import datetime + from hypothesis.strategies import ( - lists, one_of, composite, builds, integers, sampled_from, binary, - dictionaries, none, from_regex, just + binary, builds, characters, composite, dictionaries, from_regex, + integers, just, lists, none, one_of, sampled_from, text, tuples, ) from .from_disk import DentryPerms from .model import ( Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit, Snapshot, SnapshotBranch, TargetType, Release, Revision, Directory, DirectoryEntry, Content ) from .identifiers import snapshot_identifier, identifier_to_bytes def sha1_git(): return binary(min_size=20, max_size=20) +def sha1(): + return binary(min_size=20, max_size=20) + + @composite def urls(draw): protocol = draw(sampled_from(['git', 'http', 'https', 'deb'])) domain = draw(from_regex(r'\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z')) return '%s://%s' % (protocol, domain) def persons(): return builds(Person) def timestamps(): + max_seconds = datetime.datetime.max.timestamp() + min_seconds = datetime.datetime.min.timestamp() return builds( Timestamp, - seconds=integers(-2**63, 2**63-1), + seconds=integers(min_seconds, max_seconds), microseconds=integers(0, 1000000)) def timestamps_with_timezone(): return builds( TimestampWithTimezone, timestamp=timestamps(), - offset=integers(-2**16, 2**16-1)) + offset=integers(min_value=-14*60, max_value=14*60)) def origins(): return builds( Origin, type=sampled_from(['git', 'hg', 'svn', 'pypi', 'deb']), url=urls()) def origin_visits(): return builds( OriginVisit, 
visit=integers(0, 1000), origin=origins()) -def releases(): - return builds( +@composite +def releases(draw): + (date, author) = draw(one_of( + tuples(none(), none()), + tuples(timestamps_with_timezone(), persons()))) + rel = draw(builds( Release, id=sha1_git(), - date=timestamps_with_timezone(), - author=one_of(none(), persons()), - target=one_of(none(), sha1_git())) + author=none(), + date=none(), + target=sha1_git())) + rel.date = date + rel.author = author + return rel + + +def revision_metadata(): + alphabet = characters( + blacklist_categories=('Cs', ), + blacklist_characters=['\u0000']) # postgresql does not like these + return dictionaries(text(alphabet=alphabet), text(alphabet=alphabet)) def revisions(): return builds( Revision, id=sha1_git(), date=timestamps_with_timezone(), committer_date=timestamps_with_timezone(), - parents=lists(binary()), - directory=binary(), - metadata=one_of(none(), dictionaries(binary(), binary()))) + parents=lists(sha1_git()), + directory=sha1_git(), + metadata=one_of(none(), revision_metadata())) + # TODO: metadata['extra_headers'] can have binary keys and values def directory_entries(): return builds( DirectoryEntry, target=sha1_git(), perms=sampled_from([perm.value for perm in DentryPerms])) def directories(): return builds( Directory, id=sha1_git(), entries=lists(directory_entries())) -def contents(): - def filter_data(content): - if content.status != 'visible': - content.data = None - return content +@composite +def contents(draw): + (status, data, reason) = draw(one_of( + tuples(just('visible'), binary(), none()), + tuples(just('absent'), none(), text()), + tuples(just('hidden'), none(), none()), + )) - return builds( + return draw(builds( Content, length=integers(0), - data=binary(), + sha1=sha1(), sha1_git=sha1_git(), - ).map(filter_data) + sha256=binary(min_size=32, max_size=32), + blake2s256=binary(min_size=32, max_size=32), + status=just(status), + data=just(data), + reason=just(reason), + )) def branch_names(): 
return binary() def branch_targets_object(): return builds( SnapshotBranch, target=sha1_git(), target_type=sampled_from([ TargetType.CONTENT, TargetType.DIRECTORY, TargetType.REVISION, TargetType.RELEASE, TargetType.SNAPSHOT])) def branch_targets_alias(): return builds( SnapshotBranch, target_type=just(TargetType.ALIAS)) def branch_targets(*, only_objects=False): if only_objects: return branch_targets_object() else: return one_of(branch_targets_alias(), branch_targets_object()) @composite def snapshots(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw(dictionaries( keys=branch_names(), values=branch_targets(only_objects=only_objects), min_size=min_size, max_size=max_size, )) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { target.target for target in branches.values() if (target and target.target_type == 'alias' and target.target not in branches) } for alias in unresolved_aliases: branches[alias] = draw(branch_targets(only_objects=True)) while True: try: id_ = snapshot_identifier({ 'branches': { name: branch.to_dict() for (name, branch) in branches.items()}}) except ValueError as e: for (source, target) in e.args[1]: branches[source] = draw(branch_targets(only_objects=True)) else: break return Snapshot( id=identifier_to_bytes(id_), branches=branches) def objects(): return one_of( origins().map(lambda x: ('origin', x)), origin_visits().map(lambda x: ('origin_visit', x)), snapshots().map(lambda x: ('snapshot', x)), releases().map(lambda x: ('release', x)), revisions().map(lambda x: ('revision', x)), directories().map(lambda x: ('directory', x)), contents().map(lambda x: ('content', x)), ) def object_dicts(): return objects().map(lambda x: (x[0], x[1].to_dict())) diff --git a/swh/model/model.py b/swh/model/model.py index 890d133..036879d 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,205 +1,247 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the 
top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from enum import Enum from typing import List, Optional, Dict import attr # TODO: Limit this to 20 bytes Sha1Git = bytes @attr.s class Person: name = attr.ib(type=bytes) email = attr.ib(type=bytes) fullname = attr.ib(type=bytes) @attr.s class Timestamp: seconds = attr.ib(type=int) microseconds = attr.ib(type=int) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-2**63 <= value < 2**63): raise ValueError('Seconds must be a signed 64-bits integer.') @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10**6): raise ValueError('Microseconds must be in [0, 1000000[.') @attr.s class TimestampWithTimezone: timestamp = attr.ib(type=Timestamp) offset = attr.ib(type=int) negative_utc = attr.ib(type=bool) def to_dict(self): return attr.asdict(self) + @offset.validator + def check_offset(self, attribute, value): + if not (-2**15 <= value < 2**15): + # max 14 hours offset in theory, but you never know what + # you'll find in the wild... 
+ raise ValueError('offset too large: %d minutes' % value) + @attr.s class Origin: type = attr.ib(type=str) url = attr.ib(type=str) def to_dict(self): return attr.asdict(self) @attr.s class OriginVisit: origin = attr.ib(type=Origin) date = attr.ib(type=datetime.datetime) visit = attr.ib(type=Optional[int]) """Should not be set before calling 'origin_visit_add()'.""" def to_dict(self): ov = attr.asdict(self) ov['origin'] = self.origin.to_dict() ov['date'] = str(self.date) if not ov['visit']: del ov['visit'] return ov class TargetType(Enum): CONTENT = 'content' DIRECTORY = 'directory' REVISION = 'revision' RELEASE = 'release' SNAPSHOT = 'snapshot' ALIAS = 'alias' +class ObjectType(Enum): + CONTENT = 'content' + DIRECTORY = 'directory' + REVISION = 'revision' + RELEASE = 'release' + SNAPSHOT = 'snapshot' + + @attr.s class SnapshotBranch: target = attr.ib(type=bytes) target_type = attr.ib(type=TargetType) @target.validator def check_target(self, attribute, value): if self.target_type != TargetType.ALIAS: if len(value) != 20: raise ValueError('Wrong length for bytes identifier: %d' % len(value)) def to_dict(self): branch = attr.asdict(self) branch['target_type'] = branch['target_type'].value return branch @attr.s class Snapshot: id = attr.ib(type=Sha1Git) branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) def to_dict(self): return { 'id': self.id, 'branches': { name: branch.to_dict() for (name, branch) in self.branches.items() } } @attr.s class Release: id = attr.ib(type=Sha1Git) name = attr.ib(type=bytes) message = attr.ib(type=bytes) - date = attr.ib(type=TimestampWithTimezone) + date = attr.ib(type=Optional[TimestampWithTimezone]) author = attr.ib(type=Optional[Person]) target = attr.ib(type=Optional[Sha1Git]) - target_type = attr.ib(type=TargetType) + target_type = attr.ib(type=ObjectType) synthetic = attr.ib(type=bool) def to_dict(self): rel = attr.asdict(self) - rel['date'] = self.date.to_dict() + rel['date'] = self.date.to_dict() if self.date is not 
None else None rel['target_type'] = rel['target_type'].value return rel + @author.validator + def check_author(self, attribute, value): + if self.author is None and self.date is not None: + raise ValueError('release date must be None if author is None.') + + +class RevisionType(Enum): + GIT = 'git' + TAR = 'tar' + DSC = 'dsc' + SUBVERSION = 'svn' + MERCURIAL = 'hg' + @attr.s class Revision: id = attr.ib(type=Sha1Git) message = attr.ib(type=bytes) author = attr.ib(type=Person) committer = attr.ib(type=Person) date = attr.ib(type=TimestampWithTimezone) committer_date = attr.ib(type=TimestampWithTimezone) parents = attr.ib(type=List[Sha1Git]) - type = attr.ib(type=str) + type = attr.ib(type=RevisionType) directory = attr.ib(type=Sha1Git) - metadata = attr.ib(type=Optional[dict]) + metadata = attr.ib(type=Optional[Dict[str, object]]) synthetic = attr.ib(type=bool) def to_dict(self): rev = attr.asdict(self) rev['date'] = self.date.to_dict() rev['committer_date'] = self.committer_date.to_dict() + rev['type'] = rev['type'].value return rev @attr.s class DirectoryEntry: name = attr.ib(type=bytes) type = attr.ib(type=str, validator=attr.validators.in_(['file', 'dir', 'rev'])) target = attr.ib(type=Sha1Git) perms = attr.ib(type=int) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" def to_dict(self): return attr.asdict(self) @attr.s class Directory: id = attr.ib(type=Sha1Git) entries = attr.ib(type=List[DirectoryEntry]) def to_dict(self): dir_ = attr.asdict(self) dir_['entries'] = [entry.to_dict() for entry in self.entries] return dir_ @attr.s class Content: sha1 = attr.ib(type=bytes) sha1_git = attr.ib(type=Sha1Git) sha256 = attr.ib(type=bytes) blake2s256 = attr.ib(type=bytes) data = attr.ib(type=bytes) length = attr.ib(type=int) status = attr.ib( type=str, validator=attr.validators.in_(['visible', 'absent', 'hidden'])) + reason = attr.ib(type=Optional[str]) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if 
value < 0: raise ValueError('Length must be positive.') + @reason.validator + def check_reason(self, attribute, value): + """Checks the reason is set iff status == absent.""" + assert self.reason == value + if self.status == 'absent' and value is None: + raise ValueError('Must provide a reason if content is absent.') + elif self.status != 'absent' and value is not None: + raise ValueError( + 'Must not provide a reason if content is not absent.') + def to_dict(self): content = attr.asdict(self) if content['data'] is None: del content['data'] + if content['reason'] is None: + del content['reason'] return content diff --git a/swh/model/tests/test_hypothesis_strategies.py b/swh/model/tests/test_hypothesis_strategies.py index 7e63d84..3e69ab9 100644 --- a/swh/model/tests/test_hypothesis_strategies.py +++ b/swh/model/tests/test_hypothesis_strategies.py @@ -1,54 +1,59 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import attr from hypothesis import given from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.hypothesis_strategies import objects, object_dicts target_types = ( 'content', 'directory', 'revision', 'release', 'snapshot', 'alias') @given(objects()) def test_generation(obj_type_and_obj): (obj_type, object_) = obj_type_and_obj attr.validate(object_) def assert_nested_dict(obj): """Tests the object is a nested dict and contains no more class from swh.model.model.""" if isinstance(obj, dict): for (key, value) in obj.items(): assert isinstance(key, (str, bytes)), key assert_nested_dict(value) elif isinstance(obj, list): for value in obj: assert_nested_dict(value) elif isinstance(obj, (int, float, str, bytes, bool, type(None))): pass else: assert False, obj @given(object_dicts()) def test_dicts_generation(obj_type_and_obj): (obj_type, object_) = 
obj_type_and_obj assert_nested_dict(object_) if obj_type == 'content': if object_['status'] == 'visible': assert set(object_) == \ set(DEFAULT_ALGORITHMS) | {'length', 'status', 'data'} - else: + elif object_['status'] == 'absent': + assert set(object_) == \ + set(DEFAULT_ALGORITHMS) | {'length', 'status', 'reason'} + elif object_['status'] == 'hidden': assert set(object_) == \ set(DEFAULT_ALGORITHMS) | {'length', 'status'} + else: + assert False, object_ elif obj_type == 'release': assert object_['target_type'] in target_types elif obj_type == 'snapshot': for branch in object_['branches'].values(): assert branch['target_type'] in target_types