diff --git a/swh/model/hypothesis_strategies.py b/swh/model/hypothesis_strategies.py index e9acfa1..15e3bdd 100644 --- a/swh/model/hypothesis_strategies.py +++ b/swh/model/hypothesis_strategies.py @@ -1,235 +1,232 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import attr import datetime from hypothesis.strategies import ( binary, builds, characters, composite, dictionaries, from_regex, integers, just, lists, none, one_of, sampled_from, text, tuples, ) from .from_disk import DentryPerms from .model import ( Person, Timestamp, TimestampWithTimezone, Origin, OriginVisit, Snapshot, SnapshotBranch, TargetType, Release, Revision, Directory, DirectoryEntry, Content ) from .identifiers import snapshot_identifier, identifier_to_bytes pgsql_alphabet = characters( blacklist_categories=('Cs', ), blacklist_characters=['\u0000']) # postgresql does not like these def pgsql_text(): return text(alphabet=pgsql_alphabet) def sha1_git(): return binary(min_size=20, max_size=20) def sha1(): return binary(min_size=20, max_size=20) @composite def urls(draw): protocol = draw(sampled_from(['git', 'http', 'https', 'deb'])) domain = draw(from_regex(r'\A([a-z]([a-z0-9-]*)\.){1,3}[a-z0-9]+\Z')) return '%s://%s' % (protocol, domain) def persons(): return builds(Person) def timestamps(): max_seconds = datetime.datetime.max.replace( tzinfo=datetime.timezone.utc).timestamp() min_seconds = datetime.datetime.min.replace( tzinfo=datetime.timezone.utc).timestamp() return builds( Timestamp, seconds=integers(min_seconds, max_seconds), microseconds=integers(0, 1000000)) def timestamps_with_timezone(): return builds( TimestampWithTimezone, timestamp=timestamps(), offset=integers(min_value=-14*60, max_value=14*60)) def origins(): return builds( Origin, type=sampled_from(['git', 'hg', 'svn', 'pypi', 'deb']), url=urls()) def origin_visits(): return builds( OriginVisit, visit=integers(0, 1000), origin=urls(), status=sampled_from(['ongoing', 'full', 'partial']), type=pgsql_text()) @composite def releases(draw): (date, author) = draw(one_of( tuples(none(), none()), tuples(timestamps_with_timezone(), persons()))) rel = draw(builds( Release, - id=sha1_git(), author=none(), date=none(), target=sha1_git())) return attr.evolve( rel, date=date, author=author) def revision_metadata(): return dictionaries(pgsql_text(), pgsql_text()) def revisions(): return builds( Revision, - id=sha1_git(), date=timestamps_with_timezone(), committer_date=timestamps_with_timezone(), parents=lists(sha1_git()), directory=sha1_git(), metadata=one_of(none(), revision_metadata())) # TODO: metadata['extra_headers'] can have binary keys and values def directory_entries(): return builds( DirectoryEntry, target=sha1_git(), perms=sampled_from([perm.value for perm in DentryPerms])) def directories(): return builds( Directory, - id=sha1_git(), entries=lists(directory_entries())) @composite def contents(draw): (status, data, reason) = draw(one_of( tuples(just('visible'), binary(), none()), tuples(just('absent'), none(), pgsql_text()), tuples(just('hidden'), binary(), none()), )) return draw(builds( Content, length=integers(min_value=0, max_value=2**63-1), sha1=sha1(), sha1_git=sha1_git(), sha256=binary(min_size=32, max_size=32), blake2s256=binary(min_size=32, max_size=32), status=just(status), data=just(data), reason=just(reason), )) def branch_names(): return binary(min_size=1) def branch_targets_object(): return builds( SnapshotBranch, target=sha1_git(), target_type=sampled_from([ TargetType.CONTENT, TargetType.DIRECTORY, TargetType.REVISION, TargetType.RELEASE, TargetType.SNAPSHOT])) def branch_targets_alias(): return builds( SnapshotBranch, target_type=just(TargetType.ALIAS)) def branch_targets(*, only_objects=False): if only_objects: return branch_targets_object() else: return one_of(branch_targets_alias(), branch_targets_object()) @composite def snapshots(draw, *, min_size=0, max_size=100, only_objects=False): branches = draw(dictionaries( keys=branch_names(), values=one_of( none(), branch_targets(only_objects=only_objects) ), min_size=min_size, max_size=max_size, )) if not only_objects: # Make sure aliases point to actual branches unresolved_aliases = { target.target for target in branches.values() if (target and target.target_type == 'alias' and target.target not in branches) } for alias in unresolved_aliases: branches[alias] = draw(branch_targets(only_objects=True)) while True: try: id_ = snapshot_identifier({ 'branches': { name: branch.to_dict() if branch else None for (name, branch) in branches.items()}}) except ValueError as e: for (source, target) in e.args[1]: branches[source] = draw(branch_targets(only_objects=True)) else: break return Snapshot( id=identifier_to_bytes(id_), branches=branches) def objects(): return one_of( origins().map(lambda x: ('origin', x)), origin_visits().map(lambda x: ('origin_visit', x)), snapshots().map(lambda x: ('snapshot', x)), releases().map(lambda x: ('release', x)), revisions().map(lambda x: ('revision', x)), directories().map(lambda x: ('directory', x)), contents().map(lambda x: ('content', x)), ) def object_dicts(): return objects().map(lambda x: (x[0], x[1].to_dict())) diff --git a/swh/model/model.py b/swh/model/model.py index 8c31f6d..f551706 100644 --- a/swh/model/model.py +++ b/swh/model/model.py @@ -1,363 +1,402 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime + +from abc import ABCMeta, abstractmethod from enum import Enum from typing import List, Optional, Dict import attr import dateutil.parser -from .identifiers import normalize_timestamp -from .hashutil import DEFAULT_ALGORITHMS +from .identifiers import ( + normalize_timestamp, directory_identifier, revision_identifier, + release_identifier, snapshot_identifier +) +from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes SHA1_SIZE = 20 # TODO: Limit this to 20 bytes Sha1Git = bytes class BaseModel: """Base class for SWH model classes. Provides serialization/deserialization to/from Python dictionaries, that are suitable for JSON/msgpack-like formats.""" def to_dict(self): """Wrapper of `attr.asdict` that can be overridden by subclasses that have special handling of some of the fields.""" def dictify(value): if isinstance(value, BaseModel): return value.to_dict() elif isinstance(value, Enum): return value.value elif isinstance(value, dict): return {k: dictify(v) for k, v in value.items()} elif isinstance(value, list): return [dictify(v) for v in value] else: return value ret = attr.asdict(self, recurse=False) return dictify(ret) @classmethod def from_dict(cls, d): """Takes a dictionary representing a tree of SWH objects, and recursively builds the corresponding objects.""" return cls(**d) +class HashableObject(metaclass=ABCMeta): + """Mixin to automatically compute object identifier hash when + the associated model is instantiated.""" + + @staticmethod + @abstractmethod + def compute_hash(object_dict): + """Derived model classes must implement this to compute + the object hash from its dict representation.""" + pass + + def __attrs_post_init__(self): + if not self.id: + obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) + object.__setattr__(self, 'id', obj_id) + + @attr.s(frozen=True) class Person(BaseModel): """Represents the author/committer of a revision or release.""" name = attr.ib(type=bytes) email = attr.ib(type=bytes) fullname = attr.ib(type=bytes) @attr.s(frozen=True) class Timestamp(BaseModel): """Represents a naive timestamp from a VCS.""" seconds = attr.ib(type=int) microseconds = attr.ib(type=int) @seconds.validator def check_seconds(self, attribute, value): """Check that seconds fit in a 64-bits signed integer.""" if not (-2**63 <= value < 2**63): raise ValueError('Seconds must be a signed 64-bits integer.') @microseconds.validator def check_microseconds(self, attribute, value): """Checks that microseconds are positive and < 1000000.""" if not (0 <= value < 10**6): raise ValueError('Microseconds must be in [0, 1000000[.') @attr.s(frozen=True) class TimestampWithTimezone(BaseModel): """Represents a TZ-aware timestamp from a VCS.""" timestamp = attr.ib(type=Timestamp) offset = attr.ib(type=int) negative_utc = attr.ib(type=bool) @offset.validator def check_offset(self, attribute, value): """Checks the offset is a 16-bits signed integer (in theory, it should always be between -14 and +14 hours).""" if not (-2**15 <= value < 2**15): # max 14 hours offset in theory, but you never know what # you'll find in the wild... raise ValueError('offset too large: %d minutes' % value) @classmethod def from_dict(cls, d): """Builds a TimestampWithTimezone from any of the formats accepted by :py:`swh.model.normalize_timestamp`.""" d = normalize_timestamp(d) return cls( timestamp=Timestamp.from_dict(d['timestamp']), offset=d['offset'], negative_utc=d['negative_utc']) @attr.s(frozen=True) class Origin(BaseModel): """Represents a software source: a VCS and an URL.""" url = attr.ib(type=str) type = attr.ib(type=Optional[str], default=None) def to_dict(self): r = super().to_dict() r.pop('type', None) return r @attr.s(frozen=True) class OriginVisit(BaseModel): """Represents a visit of an origin at a given point in time, by a SWH loader.""" origin = attr.ib(type=str) date = attr.ib(type=datetime.datetime) status = attr.ib( type=str, validator=attr.validators.in_(['ongoing', 'full', 'partial'])) type = attr.ib(type=str) snapshot = attr.ib(type=Sha1Git) metadata = attr.ib(type=Optional[Dict[str, object]], default=None) visit = attr.ib(type=Optional[int], default=None) """Should not be set before calling 'origin_visit_add()'.""" def to_dict(self): """Serializes the date as a string and omits the visit id if it is `None`.""" ov = super().to_dict() if ov['visit'] is None: del ov['visit'] return ov @classmethod def from_dict(cls, d): """Parses the date from a string, and accepts missing visit ids.""" d = d.copy() date = d.pop('date') return cls( date=(date if isinstance(date, datetime.datetime) else dateutil.parser.parse(date)), **d) class TargetType(Enum): """The type of content pointed to by a snapshot branch. Usually a revision or an alias.""" CONTENT = 'content' DIRECTORY = 'directory' REVISION = 'revision' RELEASE = 'release' SNAPSHOT = 'snapshot' ALIAS = 'alias' class ObjectType(Enum): """The type of content pointed to by a release. Usually a revision""" CONTENT = 'content' DIRECTORY = 'directory' REVISION = 'revision' RELEASE = 'release' SNAPSHOT = 'snapshot' @attr.s(frozen=True) class SnapshotBranch(BaseModel): """Represents one of the branches of a snapshot.""" target = attr.ib(type=bytes) target_type = attr.ib(type=TargetType) @target.validator def check_target(self, attribute, value): """Checks the target type is not an alias, checks the target is a valid sha1_git.""" - if self.target_type != TargetType.ALIAS: + if self.target_type != TargetType.ALIAS and self.target is not None: if len(value) != 20: raise ValueError('Wrong length for bytes identifier: %d' % len(value)) @classmethod def from_dict(cls, d): return cls( target=d['target'], target_type=TargetType(d['target_type'])) @attr.s(frozen=True) -class Snapshot(BaseModel): +class Snapshot(BaseModel, HashableObject): """Represents the full state of an origin at a given point in time.""" - id = attr.ib(type=Sha1Git) branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) + id = attr.ib(type=Sha1Git, default=b'') + + @staticmethod + def compute_hash(object_dict): + return snapshot_identifier(object_dict) @classmethod def from_dict(cls, d): + d = d.copy() return cls( - id=d['id'], branches={ name: SnapshotBranch.from_dict(branch) if branch else None - for (name, branch) in d['branches'].items() - }) + for (name, branch) in d.pop('branches').items() + }, + **d) @attr.s(frozen=True) -class Release(BaseModel): - id = attr.ib(type=Sha1Git) +class Release(BaseModel, HashableObject): name = attr.ib(type=bytes) message = attr.ib(type=bytes) target = attr.ib(type=Optional[Sha1Git]) target_type = attr.ib(type=ObjectType) synthetic = attr.ib(type=bool) author = attr.ib(type=Optional[Person], default=None) date = attr.ib(type=Optional[TimestampWithTimezone], default=None) metadata = attr.ib(type=Optional[Dict[str, object]], default=None) + id = attr.ib(type=Sha1Git, default=b'') + + @staticmethod + def compute_hash(object_dict): + return release_identifier(object_dict) @author.validator def check_author(self, attribute, value): """If the author is `None`, checks the date is `None` too.""" if self.author is None and self.date is not None: raise ValueError('release date must be None if author is None.') def to_dict(self): rel = super().to_dict() if rel['metadata'] is None: del rel['metadata'] return rel @classmethod def from_dict(cls, d): d = d.copy() if d.get('author'): d['author'] = Person.from_dict(d['author']) if d.get('date'): d['date'] = TimestampWithTimezone.from_dict(d['date']) return cls( target_type=ObjectType(d.pop('target_type')), **d) class RevisionType(Enum): GIT = 'git' TAR = 'tar' DSC = 'dsc' SUBVERSION = 'svn' MERCURIAL = 'hg' @attr.s(frozen=True) -class Revision(BaseModel): - id = attr.ib(type=Sha1Git) +class Revision(BaseModel, HashableObject): message = attr.ib(type=bytes) author = attr.ib(type=Person) committer = attr.ib(type=Person) date = attr.ib(type=TimestampWithTimezone) committer_date = attr.ib(type=TimestampWithTimezone) type = attr.ib(type=RevisionType) directory = attr.ib(type=Sha1Git) synthetic = attr.ib(type=bool) metadata = attr.ib(type=Optional[Dict[str, object]], default=None) parents = attr.ib(type=List[Sha1Git], default=attr.Factory(list)) + id = attr.ib(type=Sha1Git, default=b'') + + @staticmethod + def compute_hash(object_dict): + return revision_identifier(object_dict) @classmethod def from_dict(cls, d): d = d.copy() return cls( - id=d.pop('id'), author=Person.from_dict(d.pop('author')), committer=Person.from_dict(d.pop('committer')), date=TimestampWithTimezone.from_dict(d.pop('date')), committer_date=TimestampWithTimezone.from_dict( d.pop('committer_date')), type=RevisionType(d.pop('type')), **d) @attr.s(frozen=True) class DirectoryEntry(BaseModel): name = attr.ib(type=bytes) type = attr.ib(type=str, validator=attr.validators.in_(['file', 'dir', 'rev'])) target = attr.ib(type=Sha1Git) perms = attr.ib(type=int) """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" @attr.s(frozen=True) -class Directory(BaseModel): - id = attr.ib(type=Sha1Git) +class Directory(BaseModel, HashableObject): entries = attr.ib(type=List[DirectoryEntry]) + id = attr.ib(type=Sha1Git, default=b'') + + @staticmethod + def compute_hash(object_dict): + return directory_identifier(object_dict) @classmethod def from_dict(cls, d): + d = d.copy() return cls( - id=d['id'], entries=[DirectoryEntry.from_dict(entry) - for entry in d['entries']]) + for entry in d.pop('entries')], + **d) @attr.s(frozen=True) class Content(BaseModel): sha1 = attr.ib(type=bytes) sha1_git = attr.ib(type=Sha1Git) sha256 = attr.ib(type=bytes) blake2s256 = attr.ib(type=bytes) length = attr.ib(type=int) status = attr.ib( type=str, validator=attr.validators.in_(['visible', 'absent', 'hidden'])) reason = attr.ib(type=Optional[str], default=None) data = attr.ib(type=Optional[bytes], default=None) ctime = attr.ib(type=Optional[datetime.datetime], default=None) @length.validator def check_length(self, attribute, value): """Checks the length is positive.""" if self.status == 'absent' and value < -1: raise ValueError('Length must be positive or -1.') elif self.status != 'absent' and value < 0: raise ValueError('Length must be positive, unless status=absent.') @reason.validator def check_reason(self, attribute, value): """Checks the reason is full if status != absent.""" assert self.reason == value if self.status == 'absent' and value is None: raise ValueError('Must provide a reason if content is absent.') elif self.status != 'absent' and value is not None: raise ValueError( 'Must not provide a reason if content is not absent.') def to_dict(self): content = super().to_dict() for field in ('data', 'reason', 'ctime'): if content[field] is None: del content[field] return content def get_hash(self, hash_name): if hash_name not in DEFAULT_ALGORITHMS: raise ValueError('{} is not a valid hash name.'.format(hash_name)) return getattr(self, hash_name) diff --git a/swh/model/tests/test_identifiers.py b/swh/model/tests/test_identifiers.py index a315fc5..bddf0bc 100644 --- a/swh/model/tests/test_identifiers.py +++ b/swh/model/tests/test_identifiers.py @@ -1,920 +1,943 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import binascii import datetime import unittest from swh.model import hashutil, identifiers from swh.model.exceptions import ValidationError +from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import (CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT, PersistentId) class UtilityFunctionsIdentifier(unittest.TestCase): def setUp(self): self.str_id = 'c2e41aae41ac17bd4a650770d6ee77f62e52235b' self.bytes_id = binascii.unhexlify(self.str_id) self.bad_type_id = object() def test_identifier_to_bytes(self): for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_bytes(id), self.bytes_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_bytes(self.bad_type_id) self.assertIn('type', str(cm.exception)) def test_identifier_to_str(self): for id in [self.str_id, self.bytes_id]: self.assertEqual(identifiers.identifier_to_str(id), self.str_id) # wrong length with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(id[:-2]) self.assertIn('length', str(cm.exception)) with self.assertRaises(ValueError) as cm: identifiers.identifier_to_str(self.bad_type_id) self.assertIn('type', str(cm.exception)) class UtilityFunctionsDateOffset(unittest.TestCase): def setUp(self): self.dates = { b'1448210036': { 'seconds': 1448210036, 'microseconds': 0, }, b'1448210036.002342': { 'seconds': 1448210036, 'microseconds': 2342, }, b'1448210036.12': { 'seconds': 1448210036, 'microseconds': 120000, } } self.broken_dates = [ 1448210036.12, ] self.offsets = { 0: b'+0000', -630: b'-1030', 800: b'+1320', } def test_format_date(self): for date_repr, date in self.dates.items(): self.assertEqual(identifiers.format_date(date), date_repr) def test_format_date_fail(self): for date in self.broken_dates: with self.assertRaises(ValueError): identifiers.format_date(date) def test_format_offset(self): for offset, res in self.offsets.items(): self.assertEqual(identifiers.format_offset(offset), res) class ContentIdentifier(unittest.TestCase): def setUp(self): self.content = { 'status': 'visible', 'length': 5, 'data': b'1984\n', 'ctime': datetime.datetime(2015, 11, 22, 16, 33, 56, tzinfo=datetime.timezone.utc), } self.content_id = hashutil.MultiHash.from_data( self.content['data']).digest() def test_content_identifier(self): self.assertEqual(identifiers.content_identifier(self.content), self.content_id) +directory_example = { + 'id': 'c2e41aae41ac17bd4a650770d6ee77f62e52235b', + 'entries': [ + { + 'type': 'file', + 'perms': 33188, + 'name': b'README', + 'target': '37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21' + }, + { + 'type': 'file', + 'perms': 33188, + 'name': b'Rakefile', + 'target': '3bb0e8592a41ae3185ee32266c860714980dbed7' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'app', + 'target': '61e6e867f5d7ba3b40540869bc050b0c4fed9e95' + }, + { + 'type': 'file', + 'perms': 33188, + 'name': b'1.megabyte', + 'target': '7c2b2fbdd57d6765cdc9d84c2d7d333f11be7fb3' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'config', + 'target': '591dfe784a2e9ccc63aaba1cb68a765734310d98' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'public', + 'target': '9588bf4522c2b4648bfd1c61d175d1f88c1ad4a5' + }, + { + 'type': 'file', + 'perms': 33188, + 'name': b'development.sqlite3', + 'target': 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'doc', + 'target': '154705c6aa1c8ead8c99c7915373e3c44012057f' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'db', + 'target': '85f157bdc39356b7bc7de9d0099b4ced8b3b382c' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'log', + 'target': '5e3d3941c51cce73352dff89c805a304ba96fffe' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'script', + 'target': '1b278423caf176da3f3533592012502aa10f566c' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'test', + 'target': '035f0437c080bfd8711670b3e8677e686c69c763' + }, + { + 'type': 'dir', + 'perms': 16384, + 'name': b'vendor', + 'target': '7c0dc9ad978c1af3f9a4ce061e50f5918bd27138' + }, + { + 'type': 'rev', + 'perms': 57344, + 'name': b'will_paginate', + 'target': '3d531e169db92a16a9a8974f0ae6edf52e52659e' + } + ], +} + + class DirectoryIdentifier(unittest.TestCase): def setUp(self): - self.directory = { - 'id': 'c2e41aae41ac17bd4a650770d6ee77f62e52235b', - 'entries': [ - { - 'type': 'file', - 'perms': 33188, - 'name': b'README', - 'target': '37ec8ea2110c0b7a32fbb0e872f6e7debbf95e21' - }, - { - 'type': 'file', - 'perms': 33188, - 'name': b'Rakefile', - 'target': '3bb0e8592a41ae3185ee32266c860714980dbed7' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'app', - 'target': '61e6e867f5d7ba3b40540869bc050b0c4fed9e95' - }, - { - 'type': 'file', - 'perms': 33188, - 'name': b'1.megabyte', - 'target': '7c2b2fbdd57d6765cdc9d84c2d7d333f11be7fb3' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'config', - 'target': '591dfe784a2e9ccc63aaba1cb68a765734310d98' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'public', - 'target': '9588bf4522c2b4648bfd1c61d175d1f88c1ad4a5' - }, - { - 'type': 'file', - 'perms': 33188, - 'name': b'development.sqlite3', - 'target': 'e69de29bb2d1d6434b8b29ae775ad8c2e48c5391' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'doc', - 'target': '154705c6aa1c8ead8c99c7915373e3c44012057f' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'db', - 'target': '85f157bdc39356b7bc7de9d0099b4ced8b3b382c' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'log', - 'target': '5e3d3941c51cce73352dff89c805a304ba96fffe' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'script', - 'target': '1b278423caf176da3f3533592012502aa10f566c' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'test', - 'target': '035f0437c080bfd8711670b3e8677e686c69c763' - }, - { - 'type': 'dir', - 'perms': 16384, - 'name': b'vendor', - 'target': '7c0dc9ad978c1af3f9a4ce061e50f5918bd27138' - }, - { - 'type': 'rev', - 'perms': 57344, - 'name': b'will_paginate', - 'target': '3d531e169db92a16a9a8974f0ae6edf52e52659e' - } - ], - } + self.directory = directory_example self.empty_directory = { 'id': '4b825dc642cb6eb9a060e54bf8d69288fbee4904', 'entries': [], } def test_dir_identifier(self): self.assertEqual( identifiers.directory_identifier(self.directory), self.directory['id']) def test_dir_identifier_empty_directory(self): self.assertEqual( identifiers.directory_identifier(self.empty_directory), self.empty_directory['id']) -class RevisionIdentifier(unittest.TestCase): - def setUp(self): +linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) + +revision_example = { + 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', + 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', + 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], + 'author': { + 'name': b'Linus Torvalds', + 'email': b'torvalds@linux-foundation.org', + 'fullname': b'Linus Torvalds ' + }, + 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, + tzinfo=linus_tz), + 'committer': { + 'name': b'Linus Torvalds', + 'email': b'torvalds@linux-foundation.org', + 'fullname': b'Linus Torvalds ' + }, + 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, + tzinfo=linus_tz), + 'message': b'Linux 4.2-rc2\n', + 'type': 'git', + 'synthetic': False +} - linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) +class RevisionIdentifier(unittest.TestCase): + def setUp(self): gpgsig = b'''\ -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.13 (Darwin) iQIcBAABAgAGBQJVJcYsAAoJEBiY3kIkQRNJVAUQAJ8/XQIfMqqC5oYeEFfHOPYZ L7qy46bXHVBa9Qd8zAJ2Dou3IbI2ZoF6/Et89K/UggOycMlt5FKV/9toWyuZv4Po L682wonoxX99qvVTHo6+wtnmYO7+G0f82h+qHMErxjP+I6gzRNBvRr+SfY7VlGdK wikMKOMWC5smrScSHITnOq1Ews5pe3N7qDYMzK0XVZmgDoaem4RSWMJs4My/qVLN e0CqYWq2A22GX7sXl6pjneJYQvcAXUX+CAzp24QnPSb+Q22Guj91TcxLFcHCTDdn qgqMsEyMiisoglwrCbO+D+1xq9mjN9tNFWP66SQ48mrrHYTBV5sz9eJyDfroJaLP CWgbDTgq6GzRMehHT3hXfYS5NNatjnhkNISXR7pnVP/obIi/vpWh5ll6Gd8q26z+ a/O41UzOaLTeNI365MWT4/cnXohVLRG7iVJbAbCxoQmEgsYMRc/pBAzWJtLfcB2G jdTswYL6+MUdL8sB9pZ82D+BP/YAdHe69CyTu1lk9RT2pYtI/kkfjHubXBCYEJSG +VGllBbYG6idQJpyrOYNRJyrDi9yvDJ2W+S0iQrlZrxzGBVGTB/y65S8C+2WTBcE lf1Qb5GDsQrZWgD+jtWTywOYHtCBwyCKSAXxSARMbNPeak9WPlcW/Jmu+fUcMe2x dg1KdHOa34shrKDaOVzW =od6m -----END PGP SIGNATURE-----''' - self.revision = { - 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', - 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', - 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], - 'author': { - 'name': b'Linus Torvalds', - 'email': b'torvalds@linux-foundation.org', - }, - 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, - tzinfo=linus_tz), - 'committer': { - 'name': b'Linus Torvalds', - 'email': b'torvalds@linux-foundation.org', - }, - 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, - tzinfo=linus_tz), - 'message': b'Linux 4.2-rc2\n', - } + self.revision = revision_example self.revision_none_metadata = { 'id': 'bc0195aad0daa2ad5b0d76cce22b167bc3435590', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': None, } self.synthetic_revision = { 'id': b'\xb2\xa7\xe1&\x04\x92\xe3D\xfa\xb3\xcb\xf9\x1b\xc1<\x91' b'\xe0T&\xfd', 'author': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'date': { 'timestamp': {'seconds': 1437047495}, 'offset': 0, 'negative_utc': False, }, 'type': 'tar', 'committer': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'committer_date': 1437047495, 'synthetic': True, 'parents': [None], 'message': b'synthetic revision message\n', 'directory': b'\xd1\x1f\x00\xa6\xa0\xfe\xa6\x05SA\xd2U\x84\xb5\xa9' b'e\x16\xc0\xd2\xb8', 'metadata': {'original_artifact': [ {'archive_type': 'tar', 'name': 'gcc-5.2.0.tar.bz2', 'sha1_git': '39d281aff934d44b439730057e55b055e206a586', 'sha1': 'fe3f5390949d47054b613edc36c557eb1d51c18e', 'sha256': '5f835b04b5f7dd4f4d2dc96190ec1621b8d89f' '2dc6f638f9f8bc1b1014ba8cad'}]}, } # cat commit.txt | git hash-object -t commit --stdin self.revision_with_extra_headers = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'name': b'Linus Torvalds', 'email': b'torvalds@linux-foundation.org', 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ ['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } self.revision_with_gpgsig = { 'id': '44cc742a8ca17b9c279be4cc195a93a6ef7a320e', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'metadata': { 'extra_headers': [ ['gpgsig', gpgsig], ], }, 'message': b'''Merge branch 'master' of git://github.com/alexhenrie/git-po * 'master' of git://github.com/alexhenrie/git-po: l10n: ca.po: update translation ''' } self.revision_no_message = { 'id': '4cfc623c9238fa92c832beed000ce2d003fd8333', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': None, } self.revision_empty_message = { 'id': '7442cd78bd3b4966921d6a7f7447417b7acb15eb', 'directory': 'b134f9b7dc434f593c0bab696345548b37de0558', 'parents': ['689664ae944b4692724f13b709a4e4de28b54e57', 'c888305e1efbaa252d01b4e5e6b778f865a97514'], 'author': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', 'fullname': b'Jiang Xin ', }, 'date': { 'timestamp': 1428538899, 'offset': 480, }, 'committer': { 'name': b'Jiang Xin', 'email': b'worldhello.net@gmail.com', }, 'committer_date': { 'timestamp': 1428538899, 'offset': 480, }, 'message': b'', } self.revision_only_fullname = { 'id': '010d34f384fa99d047cdd5e2f41e56e5c2feee45', 'directory': '85a74718d377195e1efd0843ba4f3260bad4fe07', 'parents': ['01e2d0627a9a6edb24c37db45db5ecb31e9de808'], 'author': { 'fullname': b'Linus Torvalds ', }, 'date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'committer': { 'fullname': b'Linus Torvalds ', }, 'committer_date': datetime.datetime(2015, 7, 12, 15, 10, 30, tzinfo=linus_tz), 'message': b'Linux 4.2-rc2\n', 'metadata': { 'extra_headers': [ ['svn-repo-uuid', '046f1af7-66c2-d61b-5410-ce57b7db7bff'], ['svn-revision', 10], ] } } def test_revision_identifier(self): self.assertEqual( identifiers.revision_identifier(self.revision), identifiers.identifier_to_str(self.revision['id']), ) def test_revision_identifier_none_metadata(self): self.assertEqual( identifiers.revision_identifier(self.revision_none_metadata), identifiers.identifier_to_str(self.revision_none_metadata['id']), ) def test_revision_identifier_synthetic(self): self.assertEqual( identifiers.revision_identifier(self.synthetic_revision), identifiers.identifier_to_str(self.synthetic_revision['id']), ) def test_revision_identifier_with_extra_headers(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_extra_headers), identifiers.identifier_to_str( self.revision_with_extra_headers['id']), ) def test_revision_identifier_with_gpgsig(self): self.assertEqual( identifiers.revision_identifier( self.revision_with_gpgsig), identifiers.identifier_to_str( self.revision_with_gpgsig['id']), ) def test_revision_identifier_no_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_no_message), identifiers.identifier_to_str( self.revision_no_message['id']), ) def test_revision_identifier_empty_message(self): self.assertEqual( identifiers.revision_identifier( self.revision_empty_message), identifiers.identifier_to_str( self.revision_empty_message['id']), ) def test_revision_identifier_only_fullname(self): self.assertEqual( identifiers.revision_identifier( self.revision_only_fullname), identifiers.identifier_to_str( self.revision_only_fullname['id']), ) -class ReleaseIdentifier(unittest.TestCase): - def setUp(self): - linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) - - self.release = { - 'id': '2b10839e32c4c476e9d94492756bb1a3e1ec4aa8', - 'target': b't\x1b"R\xa5\xe1Ml`\xa9\x13\xc7z`\x99\xab\xe7:\x85J', - 'target_type': 'revision', - 'name': b'v2.6.14', - 'author': { - 'name': b'Linus Torvalds', - 'email': b'torvalds@g5.osdl.org', - }, - 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, - tzinfo=linus_tz), - 'message': b'''\ +release_example = { + 'id': '2b10839e32c4c476e9d94492756bb1a3e1ec4aa8', + 'target': b't\x1b"R\xa5\xe1Ml`\xa9\x13\xc7z`\x99\xab\xe7:\x85J', + 'target_type': 'revision', + 'name': b'v2.6.14', + 'author': { + 'name': b'Linus Torvalds', + 'email': b'torvalds@g5.osdl.org', + 'fullname': b'Linus Torvalds ' + }, + 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, + tzinfo=linus_tz), + 'message': b'''\ Linux 2.6.14 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.4.1 (GNU/Linux) iD8DBQBDYWq6F3YsRnbiHLsRAmaeAJ9RCez0y8rOBbhSv344h86l/VVcugCeIhO1 wdLOnvj91G4wxYqrvThthbE= =7VeT -----END PGP SIGNATURE----- ''', - 'synthetic': False, - } + 'synthetic': False, +} + + +class ReleaseIdentifier(unittest.TestCase): + def setUp(self): + linus_tz = datetime.timezone(datetime.timedelta(minutes=-420)) + + self.release = release_example self.release_no_author = { 'id': b'&y\x1a\x8b\xcf\x0em3\xf4:\xefv\x82\xbd\xb5U#mV\xde', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'message': b'''\ This is the final 2.6.12 release -----BEGIN PGP SIGNATURE----- Version: GnuPG v1.2.4 (GNU/Linux) iD8DBQBCsykyF3YsRnbiHLsRAvPNAJ482tCZwuxp/bJRz7Q98MHlN83TpACdHr37 o6X/3T+vm8K3bf3driRr34c= =sBHn -----END PGP SIGNATURE----- ''', 'synthetic': False, } self.release_no_message = { 'id': 'b6f4f446715f7d9543ef54e41b62982f0db40045', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': None, } self.release_empty_message = { 'id': '71a0aea72444d396575dc25ac37fec87ee3c6492', 'target': '9ee1c939d1cb936b1f98e8d81aeffab57bae46ab', 'target_type': 'revision', 'name': b'v2.6.12', 'author': { 'name': b'Linus Torvalds', 'email': b'torvalds@g5.osdl.org', }, 'date': datetime.datetime(2005, 10, 27, 17, 2, 33, tzinfo=linus_tz), 'message': b'', } self.release_negative_utc = { 'id': '97c8d2573a001f88e72d75f596cf86b12b82fd01', 'name': b'20081029', 'target': '54e9abca4c77421e2921f5f156c9fe4a9f7441c7', 'target_type': 'revision', 'date': { 'timestamp': {'seconds': 1225281976}, 'offset': 0, 'negative_utc': True, }, 'author': { 'name': b'Otavio Salvador', 'email': b'otavio@debian.org', 'id': 17640, }, 'synthetic': False, 'message': b'tagging version 20081029\n\nr56558\n', } self.release_newline_in_author = { 'author': { 'email': b'esycat@gmail.com', 'fullname': b'Eugene Janusov\n', 'name': b'Eugene Janusov\n', }, 'date': { 'negative_utc': None, 'offset': 600, 'timestamp': { 'microseconds': 0, 'seconds': 1377480558, }, }, 'id': b'\\\x98\xf5Y\xd04\x16-\xe2->\xbe\xb9T3\xe6\xf8\x88R1', 'message': b'Release of v0.3.2.', 'name': b'0.3.2', 'synthetic': False, 'target': (b'\xc0j\xa3\xd9;x\xa2\x86\\I5\x17' b'\x000\xf8\xc2\xd79o\xd3'), 'target_type': 'revision', } self.release_snapshot_target = dict(self.release) self.release_snapshot_target['target_type'] = 'snapshot' self.release_snapshot_target['id'] = ( 'c29c3ddcc6769a04e54dd69d63a6fdcbc566f850') def test_release_identifier(self): self.assertEqual( identifiers.release_identifier(self.release), identifiers.identifier_to_str(self.release['id']) ) def test_release_identifier_no_author(self): self.assertEqual( identifiers.release_identifier(self.release_no_author), identifiers.identifier_to_str(self.release_no_author['id']) ) def test_release_identifier_no_message(self): self.assertEqual( identifiers.release_identifier(self.release_no_message), identifiers.identifier_to_str(self.release_no_message['id']) ) def test_release_identifier_empty_message(self): self.assertEqual( identifiers.release_identifier(self.release_empty_message), identifiers.identifier_to_str(self.release_empty_message['id']) ) def test_release_identifier_negative_utc(self): self.assertEqual( identifiers.release_identifier(self.release_negative_utc), identifiers.identifier_to_str(self.release_negative_utc['id']) ) def test_release_identifier_newline_in_author(self): self.assertEqual( identifiers.release_identifier(self.release_newline_in_author), identifiers.identifier_to_str(self.release_newline_in_author['id']) ) def test_release_identifier_snapshot_target(self): self.assertEqual( identifiers.release_identifier(self.release_snapshot_target), identifiers.identifier_to_str(self.release_snapshot_target['id']) ) +snapshot_example = { + 'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'), + 'branches': { + b'directory': { + 'target': hash_to_bytes( + '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'), + 'target_type': 'directory', + }, + b'content': { + 'target': hash_to_bytes( + 'fe95a46679d128ff167b7c55df5d02356c5a1ae1'), + 'target_type': 'content', + }, + b'alias': { + 'target': b'revision', + 'target_type': 'alias', + }, + b'revision': { + 'target': hash_to_bytes( + 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'), + 'target_type': 'revision', + }, + b'release': { + 'target': hash_to_bytes( + '7045404f3d1c54e6473c71bbb716529fbad4be24'), + 'target_type': 'release', + }, + b'snapshot': { + 'target': hash_to_bytes( + '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e' + ), + 'target_type': 'snapshot', + }, + b'dangling': None, + } +} + + class SnapshotIdentifier(unittest.TestCase): def setUp(self): super().setUp() self.empty = { 'id': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', 'branches': {}, } self.dangling_branch = { 'id': 'c84502e821eb21ed84e9fd3ec40973abc8b32353', 'branches': { b'HEAD': None, }, } self.unresolved = { 'id': '84b4548ea486e4b0a7933fa541ff1503a0afe1e0', 'branches': { b'foo': { 'target': b'bar', 'target_type': 'alias', }, }, } - self.all_types = { - 'id': '6e65b86363953b780d92b0a928f3e8fcdd10db36', - 'branches': { - b'directory': { - 'target': '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', - 'target_type': 'directory', - }, - b'content': { - 'target': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', - 'target_type': 'content', - }, - b'alias': { - 'target': b'revision', - 'target_type': 'alias', - }, - b'revision': { - 'target': 'aafb16d69fd30ff58afdd69036a26047f3aebdc6', - 'target_type': 'revision', - }, - b'release': { - 'target': '7045404f3d1c54e6473c71bbb716529fbad4be24', - 'target_type': 'release', - }, - b'snapshot': { - 'target': '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e', - 'target_type': 'snapshot', - }, - b'dangling': None, - } - } + self.all_types = snapshot_example def test_empty_snapshot(self): self.assertEqual( identifiers.snapshot_identifier(self.empty), identifiers.identifier_to_str(self.empty['id']), ) def test_dangling_branch(self): self.assertEqual( identifiers.snapshot_identifier(self.dangling_branch), identifiers.identifier_to_str(self.dangling_branch['id']), ) def test_unresolved(self): with self.assertRaisesRegex(ValueError, "b'foo' -> b'bar'"): identifiers.snapshot_identifier(self.unresolved) def test_unresolved_force(self): self.assertEqual( identifiers.snapshot_identifier( self.unresolved, ignore_unresolved=True, ), identifiers.identifier_to_str(self.unresolved['id']), ) def test_all_types(self): self.assertEqual( identifiers.snapshot_identifier(self.all_types), identifiers.identifier_to_str(self.all_types['id']), ) def test_persistent_identifier(self): _snapshot_id = hashutil.hash_to_bytes( 'c7c108084bc0bf3d81436bf980b46e98bd338453') _release_id = '22ece559cc7cc2364edc5e5593d63ae8bd229f9f' _revision_id = '309cf2674ee7a0749978cf8265ab91a60aea0f7d' _directory_id = 'd198bc9d7a6bcf6db04f476d29314f157507d505' _content_id = '94a9ed024d3859793618152ea559a168bbcbb5e2' _snapshot = {'id': _snapshot_id} _release = {'id': _release_id} _revision = {'id': _revision_id} _directory = {'id': _directory_id} _content = {'sha1_git': _content_id} for full_type, _hash, expected_persistent_id, version, _meta in [ (SNAPSHOT, _snapshot_id, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release_id, 'swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 1, {}), (REVISION, _revision_id, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory_id, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content_id, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (SNAPSHOT, _snapshot, 'swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', None, {}), (RELEASE, _release, 'swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', 1, {}), (REVISION, _revision, 'swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', None, {}), (DIRECTORY, _directory, 'swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', None, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', 1, {}), (CONTENT, _content, 'swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2;origin=1', 1, {'origin': '1'}), ]: if version: actual_value = identifiers.persistent_identifier( full_type, _hash, version, metadata=_meta) else: actual_value = identifiers.persistent_identifier( full_type, _hash, metadata=_meta) self.assertEqual(actual_value, expected_persistent_id) def test_persistent_identifier_wrong_input(self): _snapshot_id = 'notahash4bc0bf3d81436bf980b46e98bd338453' _snapshot = {'id': _snapshot_id} for _type, _hash in [ (SNAPSHOT, _snapshot_id), (SNAPSHOT, _snapshot), ('foo', ''), ]: with self.assertRaises(ValidationError): identifiers.persistent_identifier(_type, _hash) def test_parse_persistent_identifier(self): for pid, _type, _version, _hash in [ ('swh:1:cnt:94a9ed024d3859793618152ea559a168bbcbb5e2', CONTENT, 1, '94a9ed024d3859793618152ea559a168bbcbb5e2'), ('swh:1:dir:d198bc9d7a6bcf6db04f476d29314f157507d505', DIRECTORY, 1, 'd198bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:1:rev:309cf2674ee7a0749978cf8265ab91a60aea0f7d', REVISION, 1, '309cf2674ee7a0749978cf8265ab91a60aea0f7d'), ('swh:1:rel:22ece559cc7cc2364edc5e5593d63ae8bd229f9f', RELEASE, 1, '22ece559cc7cc2364edc5e5593d63ae8bd229f9f'), ('swh:1:snp:c7c108084bc0bf3d81436bf980b46e98bd338453', SNAPSHOT, 1, 'c7c108084bc0bf3d81436bf980b46e98bd338453'), ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata={} ) actual_result = identifiers.parse_persistent_identifier(pid) self.assertEqual(actual_result, expected_result) for pid, _type, _version, _hash, _metadata in [ ('swh:1:cnt:9c95815d9e9d91b8dae8e05d8bbc696fe19f796b;lines=1-18;origin=https://github.com/python/cpython', # noqa CONTENT, 1, '9c95815d9e9d91b8dae8e05d8bbc696fe19f796b', { 'lines': '1-18', 'origin': 'https://github.com/python/cpython' }), ('swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;origin=deb://Debian/packages/linuxdoc-tools', # noqa DIRECTORY, 1, '0b6959356d30f1a4e9b7f6bca59b9a336464c03d', { 'origin': 'deb://Debian/packages/linuxdoc-tools' }) ]: expected_result = PersistentId( namespace='swh', scheme_version=_version, object_type=_type, object_id=_hash, metadata=_metadata ) actual_result = identifiers.parse_persistent_identifier(pid) self.assertEqual(actual_result, expected_result) def test_parse_persistent_identifier_parsing_error(self): for pid in [ ('swh:1:cnt'), ('swh:1:'), ('swh:'), ('swh:1:cnt:'), ('foo:1:cnt:abc8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:2:dir:def8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:1:foo:fed8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh:1:dir:0b6959356d30f1a4e9b7f6bca59b9a336464c03d;invalid;' 'malformed'), ('swh:1:snp:gh6959356d30f1a4e9b7f6bca59b9a336464c03d'), ('swh:1:snp:foo'), ]: with self.assertRaises(ValidationError): identifiers.parse_persistent_identifier(pid) def test_persistentid_class_validation_error(self): for _ns, _version, _type, _id in [ ('foo', 1, CONTENT, 'abc8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh', 2, DIRECTORY, 'def8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh', 1, 'foo', 'fed8bc9d7a6bcf6db04f476d29314f157507d505'), ('swh', 1, SNAPSHOT, 'gh6959356d30f1a4e9b7f6bca59b9a336464c03d'), ]: with self.assertRaises(ValidationError): PersistentId( namespace=_ns, scheme_version=_version, object_type=_type, object_id=_id ) class OriginIdentifier(unittest.TestCase): def setUp(self): self.origin = { 'url': 'https://github.com/torvalds/linux', } def test_content_identifier(self): self.assertEqual(identifiers.origin_identifier(self.origin), 'b63a575fe3faab7692c9f38fb09d4bb45651bb0f') diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py index f65dd28..a5719ca 100644 --- a/swh/model/tests/test_model.py +++ b/swh/model/tests/test_model.py @@ -1,54 +1,114 @@ # Copyright (C) 2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import copy from hypothesis import given -from swh.model.model import Content +from swh.model.model import Content, Directory, Revision, Release, Snapshot +from swh.model.hashutil import hash_to_bytes from swh.model.hypothesis_strategies import objects, origins, origin_visits +from swh.model.identifiers import ( + directory_identifier, revision_identifier, release_identifier, + snapshot_identifier +) +from swh.model.tests.test_identifiers import ( + directory_example, revision_example, release_example, snapshot_example +) @given(objects()) def test_todict_inverse_fromdict(objtype_and_obj): (obj_type, obj) = objtype_and_obj if obj_type in ('origin', 'origin_visit'): return obj_as_dict = obj.to_dict() obj_as_dict_copy = copy.deepcopy(obj_as_dict) # Check the composition of to_dict and from_dict is the identity assert obj == type(obj).from_dict(obj_as_dict) # Check from_dict() does not change the input dict assert obj_as_dict == obj_as_dict_copy # Check the composition of from_dict and to_dict is the identity assert obj_as_dict == type(obj).from_dict(obj_as_dict).to_dict() @given(origins()) def test_todict_origins(origin): obj = origin.to_dict() assert 'type' not in obj assert type(origin)(url=origin.url) == type(origin).from_dict(obj) @given(origin_visits()) def test_todict_origin_visits(origin_visit): obj = origin_visit.to_dict() assert origin_visit == type(origin_visit).from_dict(obj) def test_content_get_hash(): hashes = dict( sha1=b'foo', sha1_git=b'bar', sha256=b'baz', blake2s256=b'qux') c = Content(length=42, status='visible', **hashes) for (hash_name, hash_) in hashes.items(): assert c.get_hash(hash_name) == hash_ + + +def test_directory_model_id_computation(): + dir_dict = dict(directory_example) + del dir_dict['id'] + + dir_model = Directory(**dir_dict) + assert dir_model.id + assert dir_model.id == hash_to_bytes(directory_identifier(dir_dict)) + + dir_model = Directory.from_dict(dir_dict) + assert dir_model.id + assert dir_model.id == hash_to_bytes(directory_identifier(dir_dict)) + + +def test_revision_model_id_computation(): + rev_dict = dict(revision_example) + del rev_dict['id'] + + rev_model = Revision(**rev_dict) + assert rev_model.id + assert rev_model.id == hash_to_bytes(revision_identifier(rev_dict)) + + rev_model = Revision.from_dict(rev_dict) + assert rev_model.id + assert rev_model.id == hash_to_bytes(revision_identifier(rev_dict)) + + +def test_release_model_id_computation(): + rel_dict = dict(release_example) + del rel_dict['id'] + + rel_model = Release(**rel_dict) + assert rel_model.id + assert rel_model.id == hash_to_bytes(release_identifier(rel_dict)) + + rel_model = Release.from_dict(rel_dict) + assert rel_model.id + assert rel_model.id == hash_to_bytes(release_identifier(rel_dict)) + + +def test_snapshot_model_id_computation(): + snp_dict = dict(snapshot_example) + del snp_dict['id'] + + snp_model = Snapshot(**snp_dict) + assert snp_model.id + assert snp_model.id == hash_to_bytes(snapshot_identifier(snp_dict)) + + snp_model = Snapshot.from_dict(snp_dict) + assert snp_model.id + assert snp_model.id == hash_to_bytes(snapshot_identifier(snp_dict))