diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py index 19c3fd3..542b457 100644 --- a/swh/model/hypothesis_strategies.py +++ b/swh/model/hypothesis_strategies.py @@ -1,265 +1,275 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import attr import datetime +from hypothesis import assume from hypothesis.strategies import ( - binary, builds, characters, composite, dictionaries, + binary, booleans, builds, characters, composite, dictionaries, from_regex, integers, just, lists, none, one_of, sampled_from, sets, text, tuples, ) from .from_disk import DentryPerms from .model import ( Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit, OriginVisitUpdate, Snapshot, SnapshotBranch, TargetType, Release, Revision, Directory, DirectoryEntry, Content, SkippedContent ) from .identifiers import snapshot_identifier, identifier_to_bytes pgsql_alphabet = characters( blacklist_categories=('Cs', ), blacklist_characters=['\u0000']) # postgresql does not like these def optional(strategy): return one_of(none(), strategy) def pgsql_text(): return text(alphabet=pgsql_alphabet) def sha1_git(): return binary(min_size=20, max_size=20) def sha1(): return binary(min_size=20, max_size=20) @composite def urls(draw): protocol = draw(sampled_from(['git', 'http', 'https', 'deb'])) domain = draw(from_regex(r'\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z')) return '%s://%s' % (protocol, domain) def persons(): return builds(Person, email=optional(binary()), name=optional(binary())) def timestamps(): max_seconds = datetime.datetime.max.replace( tzinfo=datetime.timezone.utc).timestamp() min_seconds = datetime.datetime.min.replace( tzinfo=datetime.timezone.utc).timestamp() return builds( Timestamp, seconds=integers(min_seconds, max_seconds), microseconds=integers(0, 1000000)) -def timestamps_with_timezone(): - return builds( - TimestampWithTimezone, +@composite +def timestamps_with_timezone( + draw, timestamp=timestamps(), - offset=integers(min_value=-14*60, max_value=14*60)) + offset=integers(min_value=-14*60, max_value=14*60), + negative_utc=booleans()): + timestamp = draw(timestamp) + offset = draw(offset) + negative_utc = draw(negative_utc) + assume(not (negative_utc and offset)) + return TimestampWithTimezone( + timestamp=timestamp, + offset=offset, + negative_utc=negative_utc) def origins(): return builds( Origin, url=urls()) def origin_visits(): return builds( OriginVisit, visit=integers(0, 1000), origin=urls(), status=sampled_from(['ongoing', 'full', 'partial']), type=pgsql_text(), snapshot=optional(sha1_git()), ) def metadata_dicts(): return dictionaries(pgsql_text(), pgsql_text()) def origin_visit_updates(): return builds( OriginVisitUpdate, visit=integers(0, 1000), origin=urls(), status=sampled_from(['ongoing', 'full', 'partial']), snapshot=optional(sha1_git()), metadata=one_of(none(), metadata_dicts())) @composite def releases(draw): (date, author) = draw(one_of( tuples(none(), none()), tuples(timestamps_with_timezone(), persons()))) rel = draw(builds( Release, author=none(), date=none(), target=sha1_git())) return attr.evolve( rel, date=date, author=author) revision_metadata = metadata_dicts def revisions(): return builds( Revision, author=persons(), committer=persons(), date=timestamps_with_timezone(), committer_date=timestamps_with_timezone(), parents=lists(sha1_git()), directory=sha1_git(), metadata=one_of(none(), revision_metadata())) # TODO: metadata['extra_headers'] can have binary keys and values def directory_entries(): return builds( DirectoryEntry, target=sha1_git(), perms=sampled_from([perm.value for perm in DentryPerms])) def directories(): return builds( Directory, entries=lists(directory_entries())) def contents(): return one_of(present_contents(), skipped_contents()) def present_contents(): return builds( Content.from_data, binary(max_size=4096), status=one_of(just('visible'), just('hidden')), ) @composite def skipped_contents(draw): nullify_attrs = draw( sets(sampled_from(['sha1', 'sha1_git', 'sha256', 'blake2s256'])) ) new_attrs = { k: None for k in nullify_attrs } ret = draw(builds( SkippedContent.from_data, binary(max_size=4096), reason=pgsql_text(), )) return attr.evolve(ret, **new_attrs) def branch_names(): return binary(min_size=1) def branch_targets_object(): return builds( SnapshotBranch, target=sha1_git(), target_type=sampled_from([ TargetType.CONTENT, TargetType.DIRECTORY, TargetType.REVISION, TargetType.RELEASE, TargetType.SNAPSHOT])) def branch_targets_alias(): return builds( SnapshotBranch, target_type=just(TargetType.ALIAS)) def branch_targets(*, only_objects=False): if only_objects: return branch_targets_object() else: return one_of(branch_targets_alias(), branch_targets_object()) @composite def snapshots(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw(dictionaries( keys=branch_names(), values=one_of( none(), branch_targets(only_objects=only_objects) ), min_size=min_size, max_size=max_size, )) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { target.target for target in branches.values() if (target and target.target_type == 'alias' and target.target not in branches) } for alias in unresolved_aliases: branches[alias] = draw(branch_targets(only_objects=True)) # Ensure no cycles between aliases while True: try: id_ = snapshot_identifier({ 'branches': { name: branch.to_dict() if branch else None for (name, branch) in branches.items()}}) except ValueError as e: for (source, target) in e.args[1]: branches[source] = draw(branch_targets(only_objects=True)) else: break return Snapshot( id=identifier_to_bytes(id_), branches=branches) def objects(): return one_of( origins().map(lambda x: ('origin', x)), origin_visits().map(lambda x: ('origin_visit', x)), origin_visit_updates().map(lambda x: ('origin_visit_update', x)), snapshots().map(lambda x: ('snapshot', x)), releases().map(lambda x: ('release', x)), revisions().map(lambda x: ('revision', x)), directories().map(lambda x: ('directory', x)), contents().map(lambda x: ('content', x)), ) def object_dicts(): return objects().map(lambda x: (x[0], x[1].to_dict())) diff --git a/swh/model/model.py b/swh/model/model.py index eb2ec15..6fdfe32 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,729 +1,734 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from abc import ABCMeta, abstractmethod from enum import Enum from typing import Dict, List, Optional, Union import attr from attrs_strict import type_validator import dateutil.parser import iso8601 from .identifiers import ( normalize_timestamp, directory_identifier, revision_identifier, release_identifier, snapshot_identifier ) from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash class MissingData(Exception): """Raised by `Content.with_data` when it has no way of fetching the data (but not when fetching the data fails).""" pass SHA1_SIZE = 20 # TODO: Limit this to 20 bytes Sha1Git = bytes def dictify(value): "Helper function used by BaseModel.to_dict()" if isinstance(value, BaseModel): return value.to_dict() elif isinstance(value, Enum): return value.value elif isinstance(value, dict): return {k: dictify(v) for k, v in value.items()} elif isinstance(value, list): return [dictify(v) for v in value] else: return value class BaseModel: """Base class for SWH model classes. Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" return dictify(attr.asdict(self, recurse=False)) @classmethod def from_dict(cls, d): """Takes a dictionary representing a tree of SWH objects, and recursively builds the corresponding objects.""" return cls(**d) class HashableObject(metaclass=ABCMeta): """Mixin to automatically compute object identifier hash when the associated model is instantiated.""" @staticmethod @abstractmethod def compute_hash(object_dict): """Derived model classes must implement this to compute the object hash from its dict representation.""" pass def __attrs_post_init__(self): if not self.id: obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) object.__setattr__(self, 'id', obj_id) @attr.s(frozen=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" fullname = attr.ib( type=bytes, validator=type_validator()) name = attr.ib( type=Optional[bytes], validator=type_validator()) email = attr.ib( type=Optional[bytes], validator=type_validator()) @classmethod def from_fullname(cls, fullname: bytes): """Returns a Person object, by guessing the name and email from the fullname, in the `name ` format. The fullname is left unchanged.""" if fullname is None: raise TypeError('fullname is None.') name: Optional[bytes] email: Optional[bytes] try: open_bracket = fullname.index(b'<') except ValueError: name = fullname email = None else: raw_name = fullname[:open_bracket] raw_email = fullname[open_bracket+1:] if not raw_name: name = None else: name = raw_name.strip() try: close_bracket = raw_email.rindex(b'>') except ValueError: email = raw_email else: email = raw_email[:close_bracket] return Person( name=name or None, email=email or None, fullname=fullname, ) @attr.s(frozen=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" seconds = attr.ib( type=int, validator=type_validator()) microseconds = attr.ib( type=int, validator=type_validator()) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-2**63 <= value < 2**63): raise ValueError('Seconds must be a signed 64-bits integer.') @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10**6): raise ValueError('Microseconds must be in [0, 1000000[.') @attr.s(frozen=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" timestamp = attr.ib( type=Timestamp, validator=type_validator()) offset = attr.ib( type=int, validator=type_validator()) negative_utc = attr.ib( type=bool, validator=type_validator()) @offset.validator def check_offset(self, attribute, value): """Checks the offset is a 16-bits signed integer (in theory, it should always be between -14 and +14 hours).""" if not (-2**15 <= value < 2**15): # max 14 hours offset in theory, but you never know what # you'll find in the wild... raise ValueError('offset too large: %d minutes' % value) + @negative_utc.validator + def check_negative_utc(self, attribute, value): + if self.offset and value: + raise ValueError("negative_utc can only be True is offset=0") + @classmethod def from_dict(cls, obj: Union[Dict, datetime.datetime, int]): """Builds a TimestampWithTimezone from any of the formats accepted by :func:`swh.model.normalize_timestamp`.""" # TODO: this accept way more types than just dicts; find a better # name d = normalize_timestamp(obj) return cls( timestamp=Timestamp.from_dict(d['timestamp']), offset=d['offset'], negative_utc=d['negative_utc']) @classmethod def from_datetime(cls, dt: datetime.datetime): return cls.from_dict(dt) @classmethod def from_iso8601(cls, s): """Builds a TimestampWithTimezone from an ISO8601-formatted string. """ dt = iso8601.parse_date(s) tstz = cls.from_datetime(dt) if dt.tzname() == '-00:00': tstz = attr.evolve(tstz, negative_utc=True) return tstz @attr.s(frozen=True) class Origin(BaseModel): """Represents a software source: a VCS and an URL.""" url = attr.ib( type=str, validator=type_validator()) @attr.s(frozen=True) class OriginVisit(BaseModel): """Represents a visit of an origin at a given point in time, by a SWH loader.""" origin = attr.ib( type=str, validator=type_validator()) date = attr.ib( type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(['ongoing', 'full', 'partial'])) type = attr.ib( type=str, validator=type_validator()) snapshot = attr.ib( type=Optional[Sha1Git], validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None) visit = attr.ib( type=Optional[int], validator=type_validator(), default=None) """Should not be set before calling 'origin_visit_add()'.""" def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov['visit'] is None: del ov['visit'] return ov @classmethod def from_dict(cls, d): """Parses the date from a string, and accepts missing visit ids.""" if isinstance(d['date'], str): d = d.copy() d['date'] = dateutil.parser.parse(d['date']) return super().from_dict(d) @attr.s(frozen=True) class OriginVisitUpdate(BaseModel): """Represents a visit update of an origin at a given point in time. """ origin = attr.ib( type=str, validator=type_validator()) visit = attr.ib( type=int, validator=type_validator()) date = attr.ib( type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(['ongoing', 'full', 'partial'])) snapshot = attr.ib( type=Optional[Sha1Git], validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None) class TargetType(Enum): """The type of content pointed to by a snapshot branch. Usually a revision or an alias.""" CONTENT = 'content' DIRECTORY = 'directory' REVISION = 'revision' RELEASE = 'release' SNAPSHOT = 'snapshot' ALIAS = 'alias' class ObjectType(Enum): """The type of content pointed to by a release. Usually a revision""" CONTENT = 'content' DIRECTORY = 'directory' REVISION = 'revision' RELEASE = 'release' SNAPSHOT = 'snapshot' @attr.s(frozen=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" target = attr.ib( type=bytes, validator=type_validator()) target_type = attr.ib( type=TargetType, validator=type_validator()) @target.validator def check_target(self, attribute, value): """Checks the target type is not an alias, checks the target is a valid sha1_git.""" if self.target_type != TargetType.ALIAS and self.target is not None: if len(value) != 20: raise ValueError('Wrong length for bytes identifier: %d' % len(value)) @classmethod def from_dict(cls, d): return cls( target=d['target'], target_type=TargetType(d['target_type'])) @attr.s(frozen=True) class Snapshot(BaseModel, HashableObject): """Represents the full state of an origin at a given point in time.""" branches = attr.ib( type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator()) id = attr.ib( type=Sha1Git, validator=type_validator(), default=b'') @staticmethod def compute_hash(object_dict): return snapshot_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() return cls( branches={ name: SnapshotBranch.from_dict(branch) if branch else None for (name, branch) in d.pop('branches').items() }, **d) @attr.s(frozen=True) class Release(BaseModel, HashableObject): name = attr.ib( type=bytes, validator=type_validator()) message = attr.ib( type=bytes, validator=type_validator()) target = attr.ib( type=Optional[Sha1Git], validator=type_validator()) target_type = attr.ib( type=ObjectType, validator=type_validator()) synthetic = attr.ib( type=bool, validator=type_validator()) author = attr.ib( type=Optional[Person], validator=type_validator(), default=None) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator(), default=None) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None) id = attr.ib( type=Sha1Git, validator=type_validator(), default=b'') @staticmethod def compute_hash(object_dict): return release_identifier(object_dict) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError('release date must be None if author is None.') def to_dict(self): rel = super().to_dict() if rel['metadata'] is None: del rel['metadata'] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get('author'): d['author'] = Person.from_dict(d['author']) if d.get('date'): d['date'] = TimestampWithTimezone.from_dict(d['date']) return cls( target_type=ObjectType(d.pop('target_type')), **d) class RevisionType(Enum): GIT = 'git' TAR = 'tar' DSC = 'dsc' SUBVERSION = 'svn' MERCURIAL = 'hg' @attr.s(frozen=True) class Revision(BaseModel, HashableObject): message = attr.ib( type=bytes, validator=type_validator()) author = attr.ib( type=Person, validator=type_validator()) committer = attr.ib( type=Person, validator=type_validator()) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator()) committer_date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator()) type = attr.ib( type=RevisionType, validator=type_validator()) directory = attr.ib( type=Sha1Git, validator=type_validator()) synthetic = attr.ib( type=bool, validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None) parents = attr.ib( type=List[Sha1Git], validator=type_validator(), default=attr.Factory(list)) id = attr.ib( type=Sha1Git, validator=type_validator(), default=b'') @staticmethod def compute_hash(object_dict): return revision_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() date = d.pop('date') if date: date = TimestampWithTimezone.from_dict(date) committer_date = d.pop('committer_date') if committer_date: committer_date = TimestampWithTimezone.from_dict( committer_date) return cls( author=Person.from_dict(d.pop('author')), committer=Person.from_dict(d.pop('committer')), date=date, committer_date=committer_date, type=RevisionType(d.pop('type')), **d) @attr.s(frozen=True) class DirectoryEntry(BaseModel): name = attr.ib( type=bytes, validator=type_validator()) type = attr.ib( type=str, validator=attr.validators.in_(['file', 'dir', 'rev'])) target = attr.ib( type=Sha1Git, validator=type_validator()) perms = attr.ib( type=int, validator=type_validator()) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @attr.s(frozen=True) class Directory(BaseModel, HashableObject): entries = attr.ib( type=List[DirectoryEntry], validator=type_validator()) id = attr.ib( type=Sha1Git, validator=type_validator(), default=b'') @staticmethod def compute_hash(object_dict): return directory_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() return cls( entries=[DirectoryEntry.from_dict(entry) for entry in d.pop('entries')], **d) @attr.s(frozen=True) class BaseContent(BaseModel): status = attr.ib( type=str, validator=attr.validators.in_(['visible', 'hidden', 'absent'])) @staticmethod def _hash_data(data: bytes): """Hash some data, returning most of the fields of a content object""" d = MultiHash.from_data(data).digest() d['data'] = data d['length'] = len(data) return d def to_dict(self): content = super().to_dict() if content['ctime'] is None: del content['ctime'] return content @classmethod def from_dict(cls, d, use_subclass=True): if use_subclass: # Chooses a subclass to instantiate instead. if d['status'] == 'absent': return SkippedContent.from_dict(d) else: return Content.from_dict(d) else: return super().from_dict(d) def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError('{} is not a valid hash name.'.format(hash_name)) return getattr(self, hash_name) def hashes(self) -> Dict[str, bytes]: """Returns a dictionary {hash_name: hash_value}""" return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} @attr.s(frozen=True) class Content(BaseContent): sha1 = attr.ib( type=bytes, validator=type_validator()) sha1_git = attr.ib( type=Sha1Git, validator=type_validator()) sha256 = attr.ib( type=bytes, validator=type_validator()) blake2s256 = attr.ib( type=bytes, validator=type_validator()) length = attr.ib( type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(['visible', 'hidden']), default='visible') data = attr.ib( type=Optional[bytes], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if value < 0: raise ValueError('Length must be positive.') def to_dict(self): content = super().to_dict() if content['data'] is None: del content['data'] return content @classmethod def from_data(cls, data, status='visible') -> 'Content': """Generate a Content from a given `data` byte string. This populates the Content with the hashes and length for the data passed as argument, as well as the data itself. """ d = cls._hash_data(data) d['status'] = status return cls(**d) @classmethod def from_dict(cls, d): return super().from_dict(d, use_subclass=False) def with_data(self) -> 'Content': """Loads the `data` attribute; meaning that it is guaranteed not to be None after this call. This call is almost a no-op, but subclasses may overload this method to lazy-load data (eg. from disk or objstorage).""" if self.data is None: raise MissingData('Content data is None.') return self @attr.s(frozen=True) class SkippedContent(BaseContent): sha1 = attr.ib( type=Optional[bytes], validator=type_validator()) sha1_git = attr.ib( type=Optional[Sha1Git], validator=type_validator()) sha256 = attr.ib( type=Optional[bytes], validator=type_validator()) blake2s256 = attr.ib( type=Optional[bytes], validator=type_validator()) length = attr.ib( type=Optional[int], validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(['absent'])) reason = attr.ib( type=Optional[str], validator=type_validator(), default=None) origin = attr.ib( type=Optional[Origin], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None) @reason.validator def check_reason(self, attribute, value): """Checks the reason is full if status != absent.""" assert self.reason == value if value is None: raise ValueError('Must provide a reason if content is absent.') @length.validator def check_length(self, attribute, value): """Checks the length is positive or -1.""" if value < -1: raise ValueError('Length must be positive or -1.') def to_dict(self): content = super().to_dict() if content['origin'] is None: del content['origin'] return content @classmethod def from_data(cls, data, reason: str) -> 'SkippedContent': """Generate a SkippedContent from a given `data` byte string. This populates the SkippedContent with the hashes and length for the data passed as argument. You can use `attr.evolve` on such a generated content to nullify some of its attributes, e.g. for tests. """ d = cls._hash_data(data) del d['data'] d['status'] = 'absent' d['reason'] = reason return cls(**d) @classmethod def from_dict(cls, d): d2 = d d = d.copy() if d.pop('data', None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d2) return super().from_dict(d, use_subclass=False) diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 35d5695..e167d5a 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -1,349 +1,439 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import attr from attrs_strict import AttributeTypeError from hypothesis import given from hypothesis.strategies import binary import pytest from swh.model.model import ( Content, SkippedContent, Directory, Revision, Release, Snapshot, Timestamp, TimestampWithTimezone, MissingData, Person ) from swh.model.hashutil import hash_to_bytes, MultiHash + from swh.model.hypothesis_strategies import ( - objects, origins, origin_visits, origin_visit_updates + objects, origins, origin_visits, origin_visit_updates, timestamps ) + from swh.model.identifiers import ( directory_identifier, revision_identifier, release_identifier, snapshot_identifier ) from swh.model.tests.test_identifiers import ( directory_example, revision_example, release_example, snapshot_example ) @given(objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if obj_type in ('origin', 'origin_visit'): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() @given(origins()) def test_todict_origins(origin): obj = origin.to_dict() assert 'type' not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) @given(origin_visit_updates()) def test_todict_origin_visit_updates(origin_visit_update): obj = origin_visit_update.to_dict() assert origin_visit_update == type(origin_visit_update).from_dict(obj) +# Timestamp + +@given(timestamps()) +def test_timestamps_strategy(timestamp): + attr.validate(timestamp) + + def test_timestamp_seconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds='0', microseconds=0) attr.validate(Timestamp(seconds=2**63-1, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=2**63, microseconds=0) attr.validate(Timestamp(seconds=-2**63, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=-2**63-1, microseconds=0) def test_timestamp_microseconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds=0, microseconds='0') attr.validate(Timestamp(seconds=0, microseconds=10**6-1)) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=10**6) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=-1) +def test_timestamp_from_dict(): + assert Timestamp.from_dict({'seconds': 10, 'microseconds': 5}) + + with pytest.raises(AttributeTypeError): + Timestamp.from_dict({'seconds': '10', 'microseconds': 5}) + + with pytest.raises(AttributeTypeError): + Timestamp.from_dict({'seconds': 10, 'microseconds': '5'}) + with pytest.raises(ValueError): + Timestamp.from_dict({'seconds': 0, 'microseconds': -1}) + + Timestamp.from_dict({'seconds': 0, 'microseconds': 10**6 - 1}) + with pytest.raises(ValueError): + Timestamp.from_dict({'seconds': 0, 'microseconds': 10**6}) + + +# TimestampWithTimezone + +def test_timestampwithtimezone(): + ts = Timestamp(seconds=0, microseconds=0) + tstz = TimestampWithTimezone( + timestamp=ts, + offset=0, + negative_utc=False) + attr.validate(tstz) + assert tstz.negative_utc is False + + attr.validate(TimestampWithTimezone( + timestamp=ts, + offset=10, + negative_utc=False)) + + attr.validate(TimestampWithTimezone( + timestamp=ts, + offset=-10, + negative_utc=False)) + + tstz = TimestampWithTimezone( + timestamp=ts, + offset=0, + negative_utc=True) + attr.validate(tstz) + assert tstz.negative_utc is True + + with pytest.raises(AttributeTypeError): + TimestampWithTimezone( + timestamp=datetime.datetime.now(), + offset=0, + negative_utc=False) + + with pytest.raises(AttributeTypeError): + TimestampWithTimezone( + timestamp=ts, + offset='0', + negative_utc=False) + + with pytest.raises(AttributeTypeError): + TimestampWithTimezone( + timestamp=ts, + offset=1.0, + negative_utc=False) + + with pytest.raises(AttributeTypeError): + TimestampWithTimezone( + timestamp=ts, + offset=1, + negative_utc=0) + + with pytest.raises(ValueError): + TimestampWithTimezone( + timestamp=ts, + offset=1, + negative_utc=True) + + with pytest.raises(ValueError): + TimestampWithTimezone( + timestamp=ts, + offset=-1, + negative_utc=True) + + def test_timestampwithtimezone_from_datetime(): tz = datetime.timezone(datetime.timedelta(minutes=+60)) date = datetime.datetime( 2020, 2, 27, 14, 39, 19, tzinfo=tz) tstz = TimestampWithTimezone.from_datetime(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp( seconds=1582810759, microseconds=0, ), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601(): date = '2020-02-27 14:39:19.123456+0100' tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp( seconds=1582810759, microseconds=123456, ), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601_negative_utc(): date = '2020-02-27 13:39:19-0000' tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp( seconds=1582810759, microseconds=0, ), offset=0, negative_utc=True, ) def test_person_from_fullname(): """The author should have name, email and fullname filled. """ actual_person = Person.from_fullname(b'tony ') assert actual_person == Person( fullname=b'tony ', name=b'tony', email=b'ynot@dagobah', ) def test_person_from_fullname_no_email(): """The author and fullname should be the same as the input (author). """ actual_person = Person.from_fullname(b'tony') assert actual_person == Person( fullname=b'tony', name=b'tony', email=None, ) def test_person_from_fullname_empty_person(): """Empty person has only its fullname filled with the empty byte-string. """ actual_person = Person.from_fullname(b'') assert actual_person == Person( fullname=b'', name=None, email=None, ) def test_git_author_line_to_author(): # edge case out of the way with pytest.raises(TypeError): Person.from_fullname(None) tests = { b'a ': Person( name=b'a', email=b'b@c.com', fullname=b'a ', ), b'': Person( name=None, email=b'foo@bar.com', fullname=b'', ), b'malformed ': Person( name=b'malformed', email=b'"', ), b'trailing ': Person( name=b'trailing', email=b'sp@c.e', fullname=b'trailing ', ), b'no': Person( name=b'no', email=b'sp@c.e', fullname=b'no', ), b' more ': Person( name=b'more', email=b'sp@c.es', fullname=b' more ', ), b' <>': Person( name=None, email=None, fullname=b' <>', ), } for person in sorted(tests): expected_person = tests[person] assert expected_person == Person.from_fullname(person) def test_content_get_hash(): hashes = dict( sha1=b'foo', sha1_git=b'bar', sha256=b'baz', blake2s256=b'qux') c = Content(length=42, status='visible', **hashes) for (hash_name, hash_) in hashes.items(): assert c.get_hash(hash_name) == hash_ def test_content_hashes(): hashes = dict( sha1=b'foo', sha1_git=b'bar', sha256=b'baz', blake2s256=b'qux') c = Content(length=42, status='visible', **hashes) assert c.hashes() == hashes def test_content_data(): c = Content( length=42, status='visible', data=b'foo', sha1=b'foo', sha1_git=b'bar', sha256=b'baz', blake2s256=b'qux') assert c.with_data() == c def test_content_data_missing(): c = Content( length=42, status='visible', sha1=b'foo', sha1_git=b'bar', sha256=b'baz', blake2s256=b'qux') with pytest.raises(MissingData): c.with_data() @given(binary(max_size=4096)) def test_content_from_data(data): c = Content.from_data(data) assert c.data == data assert c.length == len(data) assert c.status == 'visible' for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(binary(max_size=4096)) def test_hidden_content_from_data(data): c = Content.from_data(data, status='hidden') assert c.data == data assert c.length == len(data) assert c.status == 'hidden' for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(binary(max_size=4096)) def test_skipped_content_from_data(data): c = SkippedContent.from_data(data, reason='reason') assert c.reason == 'reason' assert c.length == len(data) assert c.status == 'absent' for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value def test_directory_model_id_computation(): dir_dict = directory_example.copy() del dir_dict['id'] dir_id = hash_to_bytes(directory_identifier(dir_dict)) dir_model = Directory.from_dict(dir_dict) assert dir_model.id == dir_id def test_revision_model_id_computation(): rev_dict = revision_example.copy() del rev_dict['id'] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.id == rev_id def test_revision_model_id_computation_with_no_date(): """We can have revision with date to None """ rev_dict = revision_example.copy() rev_dict['date'] = None rev_dict['committer_date'] = None del rev_dict['id'] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.date is None assert rev_model.committer_date is None assert rev_model.id == rev_id def test_release_model_id_computation(): rel_dict = release_example.copy() del rel_dict['id'] rel_id = hash_to_bytes(release_identifier(rel_dict)) rel_model = Release.from_dict(rel_dict) assert isinstance(rel_model.date, TimestampWithTimezone) assert rel_model.id == hash_to_bytes(rel_id) def test_snapshot_model_id_computation(): snp_dict = snapshot_example.copy() del snp_dict['id'] snp_id = hash_to_bytes(snapshot_identifier(snp_dict)) snp_model = Snapshot.from_dict(snp_dict) assert snp_model.id == snp_id