diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py index 448487a..bc9d58c 100644 --- a/swh/model/hypothesis_strategies.py +++ b/swh/model/hypothesis_strategies.py @@ -1,460 +1,463 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from hypothesis import assume from hypothesis.extra.dateutil import timezones from hypothesis.strategies import ( binary, booleans, builds, characters, composite, datetimes, dictionaries, from_regex, integers, just, lists, none, one_of, sampled_from, sets, text, ) from .from_disk import DentryPerms from .model import ( Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit, OriginVisitStatus, Snapshot, SnapshotBranch, ObjectType, TargetType, Release, Revision, RevisionType, BaseContent, Directory, DirectoryEntry, Content, SkippedContent, ) from .identifiers import snapshot_identifier, identifier_to_bytes pgsql_alphabet = characters( blacklist_categories=("Cs",), blacklist_characters=["\u0000"] ) # postgresql does not like these def optional(strategy): return one_of(none(), strategy) def pgsql_text(): return text(alphabet=pgsql_alphabet) def sha1_git(): return binary(min_size=20, max_size=20) def sha1(): return binary(min_size=20, max_size=20) def aware_datetimes(): # datetimes in Software Heritage are not used for software artifacts # (which may be much older than 2000), but only for objects like scheduler # task runs, and origin visits, which were created by Software Heritage, # so at least in 2015. # We're forbidding old datetimes, because until 1956, many timezones had seconds # in their "UTC offsets" (see # ), which is not # encodable in ISO8601; and we need our datetimes to be ISO8601-encodable in the # RPC protocol min_value = datetime.datetime(2000, 1, 1, 0, 0, 0) return datetimes(min_value=min_value, timezones=timezones()) @composite def urls(draw): protocol = draw(sampled_from(["git", "http", "https", "deb"])) domain = draw(from_regex(r"\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z")) return "%s://%s" % (protocol, domain) -def persons_d(): - return builds( - dict, fullname=binary(), email=optional(binary()), name=optional(binary()), - ) +@composite +def persons_d(draw): + fullname = draw(binary()) + email = draw(optional(binary())) + name = draw(optional(binary())) + assume(not (len(fullname) == 32 and email is None and name is None)) + return dict(fullname=fullname, name=name, email=email) def persons(): return persons_d().map(Person.from_dict) def timestamps_d(): max_seconds = datetime.datetime.max.replace( tzinfo=datetime.timezone.utc ).timestamp() min_seconds = datetime.datetime.min.replace( tzinfo=datetime.timezone.utc ).timestamp() return builds( dict, seconds=integers(min_seconds, max_seconds), microseconds=integers(0, 1000000), ) def timestamps(): return timestamps_d().map(Timestamp.from_dict) @composite def timestamps_with_timezone_d( draw, timestamp=timestamps_d(), offset=integers(min_value=-14 * 60, max_value=14 * 60), negative_utc=booleans(), ): timestamp = draw(timestamp) offset = draw(offset) negative_utc = draw(negative_utc) assume(not (negative_utc and offset)) return dict(timestamp=timestamp, offset=offset, negative_utc=negative_utc) timestamps_with_timezone = timestamps_with_timezone_d().map( TimestampWithTimezone.from_dict ) def origins_d(): return builds(dict, url=urls()) def origins(): return origins_d().map(Origin.from_dict) def origin_visits_d(): return builds( dict, visit=integers(0, 1000), origin=urls(), date=aware_datetimes(), status=sampled_from(["ongoing", "full", "partial"]), type=pgsql_text(), snapshot=optional(sha1_git()), ) def origin_visits(): return origin_visits_d().map(OriginVisit.from_dict) def metadata_dicts(): return dictionaries(pgsql_text(), pgsql_text()) def origin_visit_statuses_d(): return builds( dict, visit=integers(0, 1000), origin=urls(), status=sampled_from(["ongoing", "full", "partial"]), date=aware_datetimes(), snapshot=optional(sha1_git()), metadata=one_of(none(), metadata_dicts()), ) def origin_visit_statuses(): return origin_visit_statuses_d().map(OriginVisitStatus.from_dict) @composite def releases_d(draw): target_type = sampled_from([x.value for x in ObjectType]) name = binary() message = binary() synthetic = booleans() target = sha1_git() metadata = one_of(none(), revision_metadata()) return draw( one_of( builds( dict, name=name, message=message, synthetic=synthetic, author=none(), date=none(), target=target, target_type=target_type, metadata=metadata, ), builds( dict, name=name, message=message, synthetic=synthetic, date=timestamps_with_timezone_d(), author=persons_d(), target=target, target_type=target_type, metadata=metadata, ), ) ) def releases(): return releases_d().map(Release.from_dict) revision_metadata = metadata_dicts def revisions_d(): return builds( dict, message=binary(), synthetic=booleans(), author=persons_d(), committer=persons_d(), date=timestamps_with_timezone_d(), committer_date=timestamps_with_timezone_d(), parents=lists(sha1_git()), directory=sha1_git(), type=sampled_from([x.value for x in RevisionType]), metadata=one_of(none(), revision_metadata()), ) # TODO: metadata['extra_headers'] can have binary keys and values def revisions(): return revisions_d().map(Revision.from_dict) def directory_entries_d(): return builds( dict, name=binary(), target=sha1_git(), type=sampled_from(["file", "dir", "rev"]), perms=sampled_from([perm.value for perm in DentryPerms]), ) def directory_entries(): return directory_entries_d().map(DirectoryEntry) def directories_d(): return builds(dict, entries=lists(directory_entries_d())) def directories(): return directories_d().map(Directory.from_dict) def contents_d(): return one_of(present_contents_d(), skipped_contents_d()) def contents(): return one_of(present_contents(), skipped_contents()) def present_contents_d(): return builds( dict, data=binary(max_size=4096), ctime=optional(aware_datetimes()), status=one_of(just("visible"), just("hidden")), ) def present_contents(): return present_contents_d().map(lambda d: Content.from_data(**d)) @composite def skipped_contents_d(draw): result = BaseContent._hash_data(draw(binary(max_size=4096))) result.pop("data") nullify_attrs = draw( sets(sampled_from(["sha1", "sha1_git", "sha256", "blake2s256"])) ) for k in nullify_attrs: result[k] = None result["reason"] = draw(pgsql_text()) result["status"] = "absent" result["ctime"] = draw(optional(aware_datetimes())) return result def skipped_contents(): return skipped_contents_d().map(SkippedContent.from_dict) def branch_names(): return binary(min_size=1) def branch_targets_object_d(): return builds( dict, target=sha1_git(), target_type=sampled_from( [x.value for x in TargetType if x.value not in ("alias",)] ), ) def branch_targets_alias_d(): return builds( dict, target=sha1_git(), target_type=just("alias") ) # TargetType.ALIAS.value)) def branch_targets_d(*, only_objects=False): if only_objects: return branch_targets_object_d() else: return one_of(branch_targets_alias_d(), branch_targets_object_d()) def branch_targets(*, only_objects=False): return builds(SnapshotBranch.from_dict, branch_targets_d(only_objects=only_objects)) @composite def snapshots_d(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw( dictionaries( keys=branch_names(), values=one_of(none(), branch_targets_d(only_objects=only_objects)), min_size=min_size, max_size=max_size, ) ) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { branch: target["target"] for branch, target in branches.items() if ( target and target["target_type"] == "alias" and target["target"] not in branches ) } for alias_name, alias_target in unresolved_aliases.items(): # Override alias branch with one pointing to a real object # if max_size constraint is reached alias = alias_target if len(branches) < max_size else alias_name branches[alias] = draw(branch_targets_d(only_objects=True)) # Ensure no cycles between aliases while True: try: id_ = snapshot_identifier( { "branches": { name: branch or None for (name, branch) in branches.items() } } ) except ValueError as e: for (source, target) in e.args[1]: branches[source] = draw(branch_targets_d(only_objects=True)) else: break return dict(id=identifier_to_bytes(id_), branches=branches) def snapshots(*, min_size=0, max_size=100, only_objects=False): return snapshots_d( min_size=min_size, max_size=max_size, only_objects=only_objects ).map(Snapshot.from_dict) def objects(blacklist_types=("origin_visit_status",), split_content=False): """generates a random couple (type, obj) which obj is an instance of the Model class corresponding to obj_type. `blacklist_types` is a list of obj_type to exclude from the strategy. If `split_content` is True, generates Content and SkippedContent under different obj_type, resp. "content" and "skipped_content". """ strategies = [ ("origin", origins), ("origin_visit", origin_visits), ("origin_visit_status", origin_visit_statuses), ("snapshot", snapshots), ("release", releases), ("revision", revisions), ("directory", directories), ] if split_content: strategies.append(("content", present_contents)) strategies.append(("skipped_content", skipped_contents)) else: strategies.append(("content", contents)) args = [ obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x)) for (obj_type, obj_gen) in strategies if obj_type not in blacklist_types ] return one_of(*args) def object_dicts(blacklist_types=("origin_visit_status",), split_content=False): """generates a random couple (type, dict) which dict is suitable for .from_dict() factory methods. `blacklist_types` is a list of obj_type to exclude from the strategy. If `split_content` is True, generates Content and SkippedContent under different obj_type, resp. "content" and "skipped_content". """ strategies = [ ("origin", origins_d), ("origin_visit", origin_visits_d), ("origin_visit_status", origin_visit_statuses_d), ("snapshot", snapshots_d), ("release", releases_d), ("revision", revisions_d), ("directory", directories_d), ] if split_content: strategies.append(("content", present_contents_d)) strategies.append(("skipped_content", skipped_contents_d)) else: strategies.append(("content", contents_d)) args = [ obj_gen().map(lambda x, obj_type=obj_type: (obj_type, x)) for (obj_type, obj_gen) in strategies if obj_type not in blacklist_types ] return one_of(*args) diff --git a/swh/model/model.py b/swh/model/model.py index 74702cf..7db255c 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,612 +1,649 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from abc import ABCMeta, abstractmethod from enum import Enum -from typing import Dict, List, Optional, Union +from hashlib import sha256 +from typing import Dict, List, Optional, TypeVar, Union import attr from attrs_strict import type_validator import dateutil.parser import iso8601 from .identifiers import ( normalize_timestamp, directory_identifier, revision_identifier, release_identifier, snapshot_identifier, ) from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash class MissingData(Exception): """Raised by `Content.with_data` when it has no way of fetching the data (but not when fetching the data fails).""" pass SHA1_SIZE = 20 # TODO: Limit this to 20 bytes Sha1Git = bytes def dictify(value): "Helper function used by BaseModel.to_dict()" if isinstance(value, BaseModel): return value.to_dict() elif isinstance(value, Enum): return value.value elif isinstance(value, dict): return {k: dictify(v) for k, v in value.items()} elif isinstance(value, list): return [dictify(v) for v in value] else: return value +ModelType = TypeVar("ModelType", bound="BaseModel") + + class BaseModel: """Base class for SWH model classes. Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" return dictify(attr.asdict(self, recurse=False)) @classmethod def from_dict(cls, d): """Takes a dictionary representing a tree of SWH objects, and recursively builds the corresponding objects.""" return cls(**d) + def anonymize(self: ModelType) -> Optional[ModelType]: + """Returns an anonymized version of the object, if needed. + + If the object model does not need/support anonymization, returns None. + """ + return None + class HashableObject(metaclass=ABCMeta): """Mixin to automatically compute object identifier hash when the associated model is instantiated.""" @staticmethod @abstractmethod def compute_hash(object_dict): """Derived model classes must implement this to compute the object hash from its dict representation.""" pass def __attrs_post_init__(self): if not self.id: obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) object.__setattr__(self, "id", obj_id) @attr.s(frozen=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" fullname = attr.ib(type=bytes, validator=type_validator()) name = attr.ib(type=Optional[bytes], validator=type_validator()) email = attr.ib(type=Optional[bytes], validator=type_validator()) @classmethod def from_fullname(cls, fullname: bytes): """Returns a Person object, by guessing the name and email from the fullname, in the `name ` format. The fullname is left unchanged.""" if fullname is None: raise TypeError("fullname is None.") name: Optional[bytes] email: Optional[bytes] try: open_bracket = fullname.index(b"<") except ValueError: name = fullname email = None else: raw_name = fullname[:open_bracket] raw_email = fullname[open_bracket + 1 :] if not raw_name: name = None else: name = raw_name.strip() try: close_bracket = raw_email.rindex(b">") except ValueError: email = raw_email else: email = raw_email[:close_bracket] return Person(name=name or None, email=email or None, fullname=fullname,) + def anonymize(self) -> "Person": + """Returns an anonymized version of the Person object. + + Anonymization is simply a Person which fullname is the hashed, with unset name + or email. + """ + return Person(fullname=sha256(self.fullname).digest(), name=None, email=None,) + @attr.s(frozen=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" seconds = attr.ib(type=int, validator=type_validator()) microseconds = attr.ib(type=int, validator=type_validator()) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-(2 ** 63) <= value < 2 ** 63): raise ValueError("Seconds must be a signed 64-bits integer.") @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10 ** 6): raise ValueError("Microseconds must be in [0, 1000000[.") @attr.s(frozen=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" timestamp = attr.ib(type=Timestamp, validator=type_validator()) offset = attr.ib(type=int, validator=type_validator()) negative_utc = attr.ib(type=bool, validator=type_validator()) @offset.validator def check_offset(self, attribute, value): """Checks the offset is a 16-bits signed integer (in theory, it should always be between -14 and +14 hours).""" if not (-(2 ** 15) <= value < 2 ** 15): # max 14 hours offset in theory, but you never know what # you'll find in the wild... raise ValueError("offset too large: %d minutes" % value) @negative_utc.validator def check_negative_utc(self, attribute, value): if self.offset and value: raise ValueError("negative_utc can only be True is offset=0") @classmethod def from_dict(cls, obj: Union[Dict, datetime.datetime, int]): """Builds a TimestampWithTimezone from any of the formats accepted by :func:`swh.model.normalize_timestamp`.""" # TODO: this accept way more types than just dicts; find a better # name d = normalize_timestamp(obj) return cls( timestamp=Timestamp.from_dict(d["timestamp"]), offset=d["offset"], negative_utc=d["negative_utc"], ) @classmethod def from_datetime(cls, dt: datetime.datetime): return cls.from_dict(dt) @classmethod def from_iso8601(cls, s): """Builds a TimestampWithTimezone from an ISO8601-formatted string. """ dt = iso8601.parse_date(s) tstz = cls.from_datetime(dt) if dt.tzname() == "-00:00": tstz = attr.evolve(tstz, negative_utc=True) return tstz @attr.s(frozen=True) class Origin(BaseModel): """Represents a software source: a VCS and an URL.""" url = attr.ib(type=str, validator=type_validator()) @attr.s(frozen=True) class OriginVisit(BaseModel): """Represents a visit of an origin at a given point in time, by a SWH loader.""" origin = attr.ib(type=str, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["ongoing", "full", "partial"]) ) type = attr.ib(type=str, validator=type_validator()) snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None ) visit = attr.ib(type=Optional[int], validator=type_validator(), default=None) """Should not be set before calling 'origin_visit_add()'.""" def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov["visit"] is None: del ov["visit"] return ov @classmethod def from_dict(cls, d): """Parses the date from a string, and accepts missing visit ids.""" if isinstance(d["date"], str): d = d.copy() d["date"] = dateutil.parser.parse(d["date"]) return super().from_dict(d) @attr.s(frozen=True) class OriginVisitStatus(BaseModel): """Represents a visit update of an origin at a given point in time. """ origin = attr.ib(type=str, validator=type_validator()) visit = attr.ib(type=int, validator=type_validator()) date = attr.ib(type=datetime.datetime, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["ongoing", "full", "partial"]) ) snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None ) class TargetType(Enum): """The type of content pointed to by a snapshot branch. Usually a revision or an alias.""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" ALIAS = "alias" class ObjectType(Enum): """The type of content pointed to by a release. Usually a revision""" CONTENT = "content" DIRECTORY = "directory" REVISION = "revision" RELEASE = "release" SNAPSHOT = "snapshot" @attr.s(frozen=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" target = attr.ib(type=bytes, validator=type_validator()) target_type = attr.ib(type=TargetType, validator=type_validator()) @target.validator def check_target(self, attribute, value): """Checks the target type is not an alias, checks the target is a valid sha1_git.""" if self.target_type != TargetType.ALIAS and self.target is not None: if len(value) != 20: raise ValueError("Wrong length for bytes identifier: %d" % len(value)) @classmethod def from_dict(cls, d): return cls(target=d["target"], target_type=TargetType(d["target_type"])) @attr.s(frozen=True) class Snapshot(BaseModel, HashableObject): """Represents the full state of an origin at a given point in time.""" branches = attr.ib( type=Dict[bytes, Optional[SnapshotBranch]], validator=type_validator() ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") @staticmethod def compute_hash(object_dict): return snapshot_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() return cls( branches={ name: SnapshotBranch.from_dict(branch) if branch else None for (name, branch) in d.pop("branches").items() }, **d, ) @attr.s(frozen=True) class Release(BaseModel, HashableObject): name = attr.ib(type=bytes, validator=type_validator()) message = attr.ib(type=Optional[bytes], validator=type_validator()) target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) target_type = attr.ib(type=ObjectType, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator(), default=None ) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") @staticmethod def compute_hash(object_dict): return release_identifier(object_dict) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError("release date must be None if author is None.") def to_dict(self): rel = super().to_dict() if rel["metadata"] is None: del rel["metadata"] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get("author"): d["author"] = Person.from_dict(d["author"]) if d.get("date"): d["date"] = TimestampWithTimezone.from_dict(d["date"]) return cls(target_type=ObjectType(d.pop("target_type")), **d) + def anonymize(self) -> "Release": + """Returns an anonymized version of the Release object. + + Anonymization consists in replacing the author with an anonymized Person object. + """ + author = self.author and self.author.anonymize() + return attr.evolve(self, author=author) + class RevisionType(Enum): GIT = "git" TAR = "tar" DSC = "dsc" SUBVERSION = "svn" MERCURIAL = "hg" @attr.s(frozen=True) class Revision(BaseModel, HashableObject): message = attr.ib(type=bytes, validator=type_validator()) author = attr.ib(type=Person, validator=type_validator()) committer = attr.ib(type=Person, validator=type_validator()) date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) committer_date = attr.ib( type=Optional[TimestampWithTimezone], validator=type_validator() ) type = attr.ib(type=RevisionType, validator=type_validator()) directory = attr.ib(type=Sha1Git, validator=type_validator()) synthetic = attr.ib(type=bool, validator=type_validator()) metadata = attr.ib( type=Optional[Dict[str, object]], validator=type_validator(), default=None ) parents = attr.ib( type=List[Sha1Git], validator=type_validator(), default=attr.Factory(list) ) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") @staticmethod def compute_hash(object_dict): return revision_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() date = d.pop("date") if date: date = TimestampWithTimezone.from_dict(date) committer_date = d.pop("committer_date") if committer_date: committer_date = TimestampWithTimezone.from_dict(committer_date) return cls( author=Person.from_dict(d.pop("author")), committer=Person.from_dict(d.pop("committer")), date=date, committer_date=committer_date, type=RevisionType(d.pop("type")), **d, ) + def anonymize(self) -> "Revision": + """Returns an anonymized version of the Revision object. + + Anonymization consists in replacing the author and committer with an anonymized + Person object. + """ + return attr.evolve( + self, author=self.author.anonymize(), committer=self.committer.anonymize() + ) + @attr.s(frozen=True) class DirectoryEntry(BaseModel): name = attr.ib(type=bytes, validator=type_validator()) type = attr.ib(type=str, validator=attr.validators.in_(["file", "dir", "rev"])) target = attr.ib(type=Sha1Git, validator=type_validator()) perms = attr.ib(type=int, validator=type_validator()) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @attr.s(frozen=True) class Directory(BaseModel, HashableObject): entries = attr.ib(type=List[DirectoryEntry], validator=type_validator()) id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") @staticmethod def compute_hash(object_dict): return directory_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() return cls( entries=[DirectoryEntry.from_dict(entry) for entry in d.pop("entries")], **d ) @attr.s(frozen=True) class BaseContent(BaseModel): status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden", "absent"]) ) @staticmethod def _hash_data(data: bytes): """Hash some data, returning most of the fields of a content object""" d = MultiHash.from_data(data).digest() d["data"] = data d["length"] = len(data) return d @classmethod def from_dict(cls, d, use_subclass=True): if use_subclass: # Chooses a subclass to instantiate instead. if d["status"] == "absent": return SkippedContent.from_dict(d) else: return Content.from_dict(d) else: return super().from_dict(d) def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError("{} is not a valid hash name.".format(hash_name)) return getattr(self, hash_name) def hashes(self) -> Dict[str, bytes]: """Returns a dictionary {hash_name: hash_value}""" return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} @attr.s(frozen=True) class Content(BaseContent): sha1 = attr.ib(type=bytes, validator=type_validator()) sha1_git = attr.ib(type=Sha1Git, validator=type_validator()) sha256 = attr.ib(type=bytes, validator=type_validator()) blake2s256 = attr.ib(type=bytes, validator=type_validator()) length = attr.ib(type=int, validator=type_validator()) status = attr.ib( type=str, validator=attr.validators.in_(["visible", "hidden"]), default="visible", ) data = attr.ib(type=Optional[bytes], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if value < 0: raise ValueError("Length must be positive.") def to_dict(self): content = super().to_dict() if content["data"] is None: del content["data"] return content @classmethod def from_data(cls, data, status="visible", ctime=None) -> "Content": """Generate a Content from a given `data` byte string. This populates the Content with the hashes and length for the data passed as argument, as well as the data itself. """ d = cls._hash_data(data) d["status"] = status d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): if isinstance(d.get("ctime"), str): d = d.copy() d["ctime"] = dateutil.parser.parse(d["ctime"]) return super().from_dict(d, use_subclass=False) def with_data(self) -> "Content": """Loads the `data` attribute; meaning that it is guaranteed not to be None after this call. This call is almost a no-op, but subclasses may overload this method to lazy-load data (eg. from disk or objstorage).""" if self.data is None: raise MissingData("Content data is None.") return self @attr.s(frozen=True) class SkippedContent(BaseContent): sha1 = attr.ib(type=Optional[bytes], validator=type_validator()) sha1_git = attr.ib(type=Optional[Sha1Git], validator=type_validator()) sha256 = attr.ib(type=Optional[bytes], validator=type_validator()) blake2s256 = attr.ib(type=Optional[bytes], validator=type_validator()) length = attr.ib(type=Optional[int], validator=type_validator()) status = attr.ib(type=str, validator=attr.validators.in_(["absent"])) reason = attr.ib(type=Optional[str], validator=type_validator(), default=None) origin = attr.ib(type=Optional[str], validator=type_validator(), default=None) ctime = attr.ib( type=Optional[datetime.datetime], validator=type_validator(), default=None ) @reason.validator def check_reason(self, attribute, value): """Checks the reason is full if status != absent.""" assert self.reason == value if value is None: raise ValueError("Must provide a reason if content is absent.") @length.validator def check_length(self, attribute, value): """Checks the length is positive or -1.""" if value < -1: raise ValueError("Length must be positive or -1.") def to_dict(self): content = super().to_dict() if content["origin"] is None: del content["origin"] return content @classmethod def from_data( cls, data: bytes, reason: str, ctime: Optional[datetime.datetime] = None ) -> "SkippedContent": """Generate a SkippedContent from a given `data` byte string. This populates the SkippedContent with the hashes and length for the data passed as argument. You can use `attr.evolve` on such a generated content to nullify some of its attributes, e.g. for tests. """ d = cls._hash_data(data) del d["data"] d["status"] = "absent" d["reason"] = reason d["ctime"] = ctime return cls(**d) @classmethod def from_dict(cls, d): d2 = d.copy() if d2.pop("data", None) is not None: raise ValueError('SkippedContent has no "data" attribute %r' % d) return super().from_dict(d2, use_subclass=False) diff --git a/swh/model/tests/test_hypothesis_strategies.py b/swh/model/tests/test_hypothesis_strategies.py index 1622b3c..2be35a0 100644 --- a/swh/model/tests/test_hypothesis_strategies.py +++ b/swh/model/tests/test_hypothesis_strategies.py @@ -1,198 +1,206 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import attr import iso8601 from hypothesis import given, settings from swh.model.hashutil import DEFAULT_ALGORITHMS from swh.model.hypothesis_strategies import ( aware_datetimes, objects, object_dicts, contents, skipped_contents, snapshots, origin_visits, + persons, ) from swh.model.model import TargetType target_types = ("content", "directory", "revision", "release", "snapshot", "alias") all_but_skipped_content = ( "origin", "origin_visit", "origin_visit_status", "snapshot", "release", "revision", "directory", "content", ) @given(objects(blacklist_types=())) def test_generation(obj_type_and_obj): (obj_type, object_) = obj_type_and_obj attr.validate(object_) @given(objects(split_content=False)) def test_generation_merged_content(obj_type_and_obj): # we should never generate a "skipped_content" here assert obj_type_and_obj[0] != "skipped_content" @given(objects(split_content=True, blacklist_types=all_but_skipped_content)) def test_generation_split_content(obj_type_and_obj): # we should only generate "skipped_content" assert obj_type_and_obj[0] == "skipped_content" @given(objects(blacklist_types=("origin_visit", "directory"))) def test_generation_blacklist(obj_type_and_obj): assert obj_type_and_obj[0] not in ("origin_visit", "directory") def assert_nested_dict(obj): """Tests the object is a nested dict and contains no more class from swh.model.model.""" if isinstance(obj, dict): for (key, value) in obj.items(): assert isinstance(key, (str, bytes)), key assert_nested_dict(value) elif isinstance(obj, list): for value in obj: assert_nested_dict(value) elif isinstance(obj, (int, float, str, bytes, bool, type(None), datetime.datetime)): pass else: assert False, obj @given(object_dicts(blacklist_types=())) def test_dicts_generation(obj_type_and_obj): (obj_type, object_) = obj_type_and_obj assert_nested_dict(object_) if obj_type == "content": COMMON_KEYS = set(DEFAULT_ALGORITHMS) | {"length", "status", "ctime"} if object_["status"] == "visible": assert set(object_) <= COMMON_KEYS | {"data"} elif object_["status"] == "absent": assert set(object_) == COMMON_KEYS | {"reason"} elif object_["status"] == "hidden": assert set(object_) <= COMMON_KEYS | {"data"} else: assert False, object_ elif obj_type == "release": assert object_["target_type"] in target_types elif obj_type == "snapshot": for branch in object_["branches"].values(): assert branch is None or branch["target_type"] in target_types @given(aware_datetimes()) def test_datetimes(dt): # Checks this doesn't raise an error, eg. about seconds in the TZ offset iso8601.parse_date(dt.isoformat()) assert dt.tzinfo is not None @given(object_dicts(split_content=False)) def test_dicts_generation_merged_content(obj_type_and_obj): # we should never generate a "skipped_content" here assert obj_type_and_obj[0] != "skipped_content" @given(object_dicts(split_content=True, blacklist_types=all_but_skipped_content)) def test_dicts_generation_split_content(obj_type_and_obj): # we should only generate "skipped_content" assert obj_type_and_obj[0] == "skipped_content" @given(object_dicts(blacklist_types=("release", "content"))) def test_dicts_generation_blacklist(obj_type_and_obj): assert obj_type_and_obj[0] not in ("release", "content") @given(objects()) def test_model_to_dicts(obj_type_and_obj): (obj_type, object_) = obj_type_and_obj obj_dict = object_.to_dict() assert_nested_dict(obj_dict) if obj_type == "content": COMMON_KEYS = set(DEFAULT_ALGORITHMS) | {"length", "status", "ctime"} if obj_dict["status"] == "visible": assert set(obj_dict) == COMMON_KEYS | {"data"} elif obj_dict["status"] == "absent": assert set(obj_dict) == COMMON_KEYS | {"reason"} elif obj_dict["status"] == "hidden": assert set(obj_dict) == COMMON_KEYS | {"data"} else: assert False, obj_dict elif obj_type == "release": assert obj_dict["target_type"] in target_types elif obj_type == "snapshot": for branch in obj_dict["branches"].values(): assert branch is None or branch["target_type"] in target_types @given(contents()) def test_content_aware_datetime(cont): assert cont.ctime is None or cont.ctime.tzinfo is not None @given(skipped_contents()) def test_skipped_content_aware_datetime(cont): assert cont.ctime is None or cont.ctime.tzinfo is not None _min_snp_size = 10 _max_snp_size = 100 @given(snapshots(min_size=_min_snp_size, max_size=_max_snp_size)) @settings(max_examples=1) def test_snapshots_strategy(snapshot): branches = snapshot.branches assert len(branches) >= _min_snp_size assert len(branches) <= _max_snp_size aliases = [] # check snapshot integrity for name, branch in branches.items(): assert branch is None or branch.target_type.value in target_types if branch is not None and branch.target_type == TargetType.ALIAS: aliases.append(name) assert branch.target in branches # check no cycles between aliases for alias in aliases: processed_alias = set() current_alias = alias while ( branches[current_alias] is not None and branches[current_alias].target_type == TargetType.ALIAS ): assert branches[current_alias].target not in processed_alias processed_alias.add(current_alias) current_alias = branches[current_alias].target @given(snapshots(min_size=_min_snp_size, max_size=_min_snp_size)) @settings(max_examples=1) def test_snapshots_strategy_fixed_size(snapshot): assert len(snapshot.branches) == _min_snp_size @given(origin_visits()) def test_origin_visit_aware_datetime(visit): assert visit.date.tzinfo is not None + + +@given(persons()) +def test_person_do_not_look_like_anonimized(person): + assert not ( + len(person.fullname) == 32 and person.name is None and person.email is None + ) diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index 6027cc2..e126ca5 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -1,439 +1,470 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import attr from attrs_strict import AttributeTypeError from hypothesis import given from hypothesis.strategies import binary import pytest from swh.model.model import ( Content, SkippedContent, Directory, Revision, Release, Snapshot, Origin, Timestamp, TimestampWithTimezone, MissingData, Person, ) from swh.model.hashutil import hash_to_bytes, MultiHash import swh.model.hypothesis_strategies as strategies from swh.model.identifiers import ( directory_identifier, revision_identifier, release_identifier, snapshot_identifier, ) from swh.model.tests.test_identifiers import ( directory_example, revision_example, release_example, snapshot_example, ) @given(strategies.objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if obj_type in ("origin", "origin_visit"): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() +# Anonymization + + +@given(strategies.objects()) +def test_anonymization(objtype_and_obj): + (obj_type, obj) = objtype_and_obj + + def check_person(p): + if p is not None: + assert p.name is None + assert p.email is None + assert len(p.fullname) == 32 + + anon_obj = obj.anonymize() + if obj_type == "person": + assert anon_obj is not None + check_person(anon_obj) + elif obj_type == "release": + assert anon_obj is not None + check_person(anon_obj.author) + elif obj_type == "revision": + assert anon_obj is not None + check_person(anon_obj.author) + check_person(anon_obj.committer) + else: + assert anon_obj is None + + +# Origin, OriginVisit + + @given(strategies.origins()) def test_todict_origins(origin): obj = origin.to_dict() assert "type" not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(strategies.origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) @given(strategies.origin_visit_statuses()) def test_todict_origin_visit_statuses(origin_visit_status): obj = origin_visit_status.to_dict() assert origin_visit_status == type(origin_visit_status).from_dict(obj) # Timestamp @given(strategies.timestamps()) def test_timestamps_strategy(timestamp): attr.validate(timestamp) def test_timestamp_seconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds="0", microseconds=0) attr.validate(Timestamp(seconds=2 ** 63 - 1, microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=2 ** 63, microseconds=0) attr.validate(Timestamp(seconds=-(2 ** 63), microseconds=0)) with pytest.raises(ValueError): Timestamp(seconds=-(2 ** 63) - 1, microseconds=0) def test_timestamp_microseconds(): attr.validate(Timestamp(seconds=0, microseconds=0)) with pytest.raises(AttributeTypeError): Timestamp(seconds=0, microseconds="0") attr.validate(Timestamp(seconds=0, microseconds=10 ** 6 - 1)) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=10 ** 6) with pytest.raises(ValueError): Timestamp(seconds=0, microseconds=-1) def test_timestamp_from_dict(): assert Timestamp.from_dict({"seconds": 10, "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": "10", "microseconds": 5}) with pytest.raises(AttributeTypeError): Timestamp.from_dict({"seconds": 10, "microseconds": "5"}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": -1}) Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6 - 1}) with pytest.raises(ValueError): Timestamp.from_dict({"seconds": 0, "microseconds": 10 ** 6}) # TimestampWithTimezone def test_timestampwithtimezone(): ts = Timestamp(seconds=0, microseconds=0) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=False) attr.validate(tstz) assert tstz.negative_utc is False attr.validate(TimestampWithTimezone(timestamp=ts, offset=10, negative_utc=False)) attr.validate(TimestampWithTimezone(timestamp=ts, offset=-10, negative_utc=False)) tstz = TimestampWithTimezone(timestamp=ts, offset=0, negative_utc=True) attr.validate(tstz) assert tstz.negative_utc is True with pytest.raises(AttributeTypeError): TimestampWithTimezone( timestamp=datetime.datetime.now(), offset=0, negative_utc=False ) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset="0", negative_utc=False) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset=1.0, negative_utc=False) with pytest.raises(AttributeTypeError): TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=0) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=1, negative_utc=True) with pytest.raises(ValueError): TimestampWithTimezone(timestamp=ts, offset=-1, negative_utc=True) def test_timestampwithtimezone_from_datetime(): tz = datetime.timezone(datetime.timedelta(minutes=+60)) date = datetime.datetime(2020, 2, 27, 14, 39, 19, tzinfo=tz) tstz = TimestampWithTimezone.from_datetime(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=0,), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601(): date = "2020-02-27 14:39:19.123456+0100" tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=123456,), offset=60, negative_utc=False, ) def test_timestampwithtimezone_from_iso8601_negative_utc(): date = "2020-02-27 13:39:19-0000" tstz = TimestampWithTimezone.from_iso8601(date) assert tstz == TimestampWithTimezone( timestamp=Timestamp(seconds=1582810759, microseconds=0,), offset=0, negative_utc=True, ) def test_person_from_fullname(): """The author should have name, email and fullname filled. """ actual_person = Person.from_fullname(b"tony ") assert actual_person == Person( fullname=b"tony ", name=b"tony", email=b"ynot@dagobah", ) def test_person_from_fullname_no_email(): """The author and fullname should be the same as the input (author). """ actual_person = Person.from_fullname(b"tony") assert actual_person == Person(fullname=b"tony", name=b"tony", email=None,) def test_person_from_fullname_empty_person(): """Empty person has only its fullname filled with the empty byte-string. """ actual_person = Person.from_fullname(b"") assert actual_person == Person(fullname=b"", name=None, email=None,) def test_git_author_line_to_author(): # edge case out of the way with pytest.raises(TypeError): Person.from_fullname(None) tests = { b"a ": Person(name=b"a", email=b"b@c.com", fullname=b"a ",), b"": Person( name=None, email=b"foo@bar.com", fullname=b"", ), b"malformed ': Person( name=b"malformed", email=b'"', ), b"trailing ": Person( name=b"trailing", email=b"sp@c.e", fullname=b"trailing ", ), b"no": Person(name=b"no", email=b"sp@c.e", fullname=b"no",), b" more ": Person( name=b"more", email=b"sp@c.es", fullname=b" more ", ), b" <>": Person(name=None, email=None, fullname=b" <>",), } for person in sorted(tests): expected_person = tests[person] assert expected_person == Person.from_fullname(person) # Content def test_content_get_hash(): hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux") c = Content(length=42, status="visible", **hashes) for (hash_name, hash_) in hashes.items(): assert c.get_hash(hash_name) == hash_ def test_content_hashes(): hashes = dict(sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux") c = Content(length=42, status="visible", **hashes) assert c.hashes() == hashes def test_content_data(): c = Content( length=42, status="visible", data=b"foo", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) assert c.with_data() == c def test_content_data_missing(): c = Content( length=42, status="visible", sha1=b"foo", sha1_git=b"bar", sha256=b"baz", blake2s256=b"qux", ) with pytest.raises(MissingData): c.with_data() @given(strategies.present_contents_d()) def test_content_from_dict(content_d): c = Content.from_data(**content_d) assert c assert c.ctime == content_d["ctime"] content_d2 = c.to_dict() c2 = Content.from_dict(content_d2) assert c2.ctime == c.ctime def test_content_from_dict_str_ctime(): # test with ctime as a string n = datetime.datetime(2020, 5, 6, 12, 34) content_d = { "ctime": n.isoformat(), "data": b"", "length": 0, "sha1": b"\x00", "sha256": b"\x00", "sha1_git": b"\x00", "blake2s256": b"\x00", } c = Content.from_dict(content_d) assert c.ctime == n @given(binary(max_size=4096)) def test_content_from_data(data): c = Content.from_data(data) assert c.data == data assert c.length == len(data) assert c.status == "visible" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(binary(max_size=4096)) def test_hidden_content_from_data(data): c = Content.from_data(data, status="hidden") assert c.data == data assert c.length == len(data) assert c.status == "hidden" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value # SkippedContent @given(binary(max_size=4096)) def test_skipped_content_from_data(data): c = SkippedContent.from_data(data, reason="reason") assert c.reason == "reason" assert c.length == len(data) assert c.status == "absent" for key, value in MultiHash.from_data(data).digest().items(): assert getattr(c, key) == value @given(strategies.skipped_contents_d()) def test_skipped_content_origin_is_str(skipped_content_d): assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = "http://path/to/origin" assert SkippedContent.from_dict(skipped_content_d) skipped_content_d["origin"] = Origin(url="http://path/to/origin") with pytest.raises(ValueError, match="origin"): SkippedContent.from_dict(skipped_content_d) # ID computation def test_directory_model_id_computation(): dir_dict = directory_example.copy() del dir_dict["id"] dir_id = hash_to_bytes(directory_identifier(dir_dict)) dir_model = Directory.from_dict(dir_dict) assert dir_model.id == dir_id def test_revision_model_id_computation(): rev_dict = revision_example.copy() del rev_dict["id"] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.id == rev_id def test_revision_model_id_computation_with_no_date(): """We can have revision with date to None """ rev_dict = revision_example.copy() rev_dict["date"] = None rev_dict["committer_date"] = None del rev_dict["id"] rev_id = hash_to_bytes(revision_identifier(rev_dict)) rev_model = Revision.from_dict(rev_dict) assert rev_model.date is None assert rev_model.committer_date is None assert rev_model.id == rev_id def test_release_model_id_computation(): rel_dict = release_example.copy() del rel_dict["id"] rel_id = hash_to_bytes(release_identifier(rel_dict)) rel_model = Release.from_dict(rel_dict) assert isinstance(rel_model.date, TimestampWithTimezone) assert rel_model.id == hash_to_bytes(rel_id) def test_snapshot_model_id_computation(): snp_dict = snapshot_example.copy() del snp_dict["id"] snp_id = hash_to_bytes(snapshot_identifier(snp_dict)) snp_model = Snapshot.from_dict(snp_dict) assert snp_model.id == snp_id