Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9311913
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
17 KB
Subscribers
None
View Options
diff --git a/swh/model/model.py b/swh/model/model.py
index ec644fa..4a308a4 100644
--- a/swh/model/model.py
+++ b/swh/model/model.py
@@ -1,402 +1,410 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from abc import ABCMeta, abstractmethod
from enum import Enum
from typing import List, Optional, Dict
import attr
import dateutil.parser
from .identifiers import (
normalize_timestamp, directory_identifier, revision_identifier,
release_identifier, snapshot_identifier
)
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes
# Length, in bytes, of a SHA1 digest.
SHA1_SIZE = 20
# TODO: Limit this to 20 bytes
# Alias used in the type annotations below for sha1_git identifiers; for now
# it is plain `bytes`, so no length is enforced by the type itself.
Sha1Git = bytes
class BaseModel:
    """Common ancestor of the SWH model classes.

    Offers conversion to and from plain Python dictionaries, which are
    suitable for JSON/msgpack-like formats."""

    def to_dict(self):
        """Returns this object as a dictionary, built on top of
        `attr.asdict`.

        Subclasses that need special handling of some of their fields
        may override this method."""
        def dictify(value):
            # Nested models serialize themselves, enums collapse to their
            # value, dicts and lists are converted element by element, and
            # anything else is passed through untouched.
            if isinstance(value, BaseModel):
                return value.to_dict()
            if isinstance(value, Enum):
                return value.value
            if isinstance(value, dict):
                return {key: dictify(item) for key, item in value.items()}
            if isinstance(value, list):
                return [dictify(item) for item in value]
            return value

        # recurse=False: nested objects are handled by dictify() instead,
        # so that overridden to_dict() methods are honoured.
        shallow = attr.asdict(self, recurse=False)
        return dictify(shallow)

    @classmethod
    def from_dict(cls, d):
        """Builds an instance of `cls` from a dictionary, by passing
        the dictionary items as keyword arguments to the constructor.

        Subclasses override this to rebuild nested objects."""
        return cls(**d)
class HashableObject(metaclass=ABCMeta):
    """Mixin that fills in the `id` of a model object with its computed
    identifier hash at instantiation time, when no id was provided."""

    @staticmethod
    @abstractmethod
    def compute_hash(object_dict):
        """Computes the object hash from the dict representation of the
        object. Must be implemented by derived model classes."""

    def __attrs_post_init__(self):
        # An explicitly provided (truthy) id is trusted as is.
        if self.id:
            return
        obj_id = hash_to_bytes(self.compute_hash(self.to_dict()))
        # Goes through object.__setattr__ because the model classes
        # using this mixin are declared frozen.
        object.__setattr__(self, 'id', obj_id)
@attr.s(frozen=True)
class Person(BaseModel):
    """The author or committer of a revision or release."""

    name = attr.ib(type=bytes)
    email = attr.ib(type=bytes)
    fullname = attr.ib(type=bytes)
@attr.s(frozen=True)
class Timestamp(BaseModel):
    """A naive timestamp coming from a VCS, split into seconds and
    microseconds."""

    seconds = attr.ib(type=int)
    microseconds = attr.ib(type=int)

    @seconds.validator
    def check_seconds(self, attribute, value):
        """Rejects seconds that do not fit in a signed 64-bits integer."""
        lowest, beyond = -2**63, 2**63
        if not (lowest <= value < beyond):
            raise ValueError('Seconds must be a signed 64-bits integer.')

    @microseconds.validator
    def check_microseconds(self, attribute, value):
        """Rejects microseconds outside of the [0, 1000000[ range."""
        beyond = 10**6
        if not (0 <= value < beyond):
            raise ValueError('Microseconds must be in [0, 1000000[.')
@attr.s(frozen=True)
class TimestampWithTimezone(BaseModel):
    """A TZ-aware timestamp coming from a VCS."""

    timestamp = attr.ib(type=Timestamp)
    offset = attr.ib(type=int)
    negative_utc = attr.ib(type=bool)

    @offset.validator
    def check_offset(self, attribute, value):
        """Rejects offsets (in minutes) that do not fit in a signed
        16-bits integer."""
        # In theory offsets stay within -14/+14 hours, but you never know
        # what you'll find in the wild, hence the much wider bound.
        lowest, beyond = -2**15, 2**15
        if not (lowest <= value < beyond):
            raise ValueError('offset too large: %d minutes' % value)

    @classmethod
    def from_dict(cls, d):
        """Builds a TimestampWithTimezone from any of the formats
        accepted by :func:`swh.model.normalize_timestamp`."""
        normalized = normalize_timestamp(d)
        timestamp = Timestamp.from_dict(normalized['timestamp'])
        return cls(
            timestamp=timestamp,
            offset=normalized['offset'],
            negative_utc=normalized['negative_utc'],
        )
@attr.s(frozen=True)
class Origin(BaseModel):
    """A software source: a VCS and an URL."""

    url = attr.ib(type=str)
    type = attr.ib(type=Optional[str], default=None)

    def to_dict(self):
        """Serializes the origin, always leaving the `type` field out."""
        serialized = super().to_dict()
        serialized.pop('type', None)
        return serialized
@attr.s(frozen=True)
class OriginVisit(BaseModel):
    """A visit of an origin at a given point in time, by a SWH loader."""

    origin = attr.ib(type=str)
    date = attr.ib(type=datetime.datetime)
    status = attr.ib(
        type=str,
        validator=attr.validators.in_(['ongoing', 'full', 'partial']))
    type = attr.ib(type=str)
    snapshot = attr.ib(type=Sha1Git)
    metadata = attr.ib(type=Optional[Dict[str, object]], default=None)
    # The visit id should not be set before calling 'origin_visit_add()'.
    visit = attr.ib(type=Optional[int], default=None)

    def to_dict(self):
        """Serializes the visit, leaving the visit id out when it is
        `None`."""
        serialized = super().to_dict()
        if serialized['visit'] is None:
            del serialized['visit']
        return serialized

    @classmethod
    def from_dict(cls, d):
        """Builds a visit from a dictionary; the date may be given
        either as a datetime or as a string to parse, and the visit id
        may be missing."""
        fields = d.copy()
        raw_date = fields.pop('date')
        if isinstance(raw_date, datetime.datetime):
            parsed_date = raw_date
        else:
            parsed_date = dateutil.parser.parse(raw_date)
        return cls(date=parsed_date, **fields)
class TargetType(Enum):
    """Kind of object a snapshot branch points to; usually a revision
    or an alias."""

    CONTENT = 'content'
    DIRECTORY = 'directory'
    REVISION = 'revision'
    RELEASE = 'release'
    SNAPSHOT = 'snapshot'
    ALIAS = 'alias'
class ObjectType(Enum):
    """Kind of object a release points to; usually a revision."""

    CONTENT = 'content'
    DIRECTORY = 'directory'
    REVISION = 'revision'
    RELEASE = 'release'
    SNAPSHOT = 'snapshot'
@attr.s(frozen=True)
class SnapshotBranch(BaseModel):
    """One of the branches of a snapshot."""

    target = attr.ib(type=bytes)
    target_type = attr.ib(type=TargetType)

    @target.validator
    def check_target(self, attribute, value):
        """Checks the target is 20 bytes long, unless the branch is an
        alias or has no target."""
        is_alias = self.target_type == TargetType.ALIAS
        if is_alias or self.target is None:
            return
        if len(value) != 20:
            raise ValueError('Wrong length for bytes identifier: %d' %
                             len(value))

    @classmethod
    def from_dict(cls, d):
        """Builds a branch from a dictionary, turning the `target_type`
        value into a TargetType member."""
        kind = TargetType(d['target_type'])
        return cls(target=d['target'], target_type=kind)
@attr.s(frozen=True)
class Snapshot(BaseModel, HashableObject):
    """The full state of an origin at a given point in time."""

    branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]])
    id = attr.ib(type=Sha1Git, default=b'')

    @staticmethod
    def compute_hash(object_dict):
        """Returns the snapshot identifier of the given snapshot dict."""
        return snapshot_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a snapshot from a dictionary, converting every
        non-empty branch into a SnapshotBranch and mapping the others
        to `None`."""
        remaining = d.copy()
        raw_branches = remaining.pop('branches')
        branches = {}
        for name, branch in raw_branches.items():
            branches[name] = (
                SnapshotBranch.from_dict(branch) if branch else None)
        return cls(branches=branches, **remaining)
@attr.s(frozen=True)
class Release(BaseModel, HashableObject):
    """A release: a named, optionally authored and dated, pointer to a
    target object."""

    name = attr.ib(type=bytes)
    message = attr.ib(type=bytes)
    target = attr.ib(type=Optional[Sha1Git])
    target_type = attr.ib(type=ObjectType)
    synthetic = attr.ib(type=bool)
    author = attr.ib(type=Optional[Person], default=None)
    date = attr.ib(type=Optional[TimestampWithTimezone], default=None)
    metadata = attr.ib(type=Optional[Dict[str, object]], default=None)
    id = attr.ib(type=Sha1Git, default=b'')

    @staticmethod
    def compute_hash(object_dict):
        """Returns the release identifier of the given release dict."""
        return release_identifier(object_dict)

    @author.validator
    def check_author(self, attribute, value):
        """If the author is `None`, checks the date is `None` too."""
        has_author = self.author is not None
        has_date = self.date is not None
        if has_date and not has_author:
            raise ValueError('release date must be None if author is None.')

    def to_dict(self):
        """Serializes the release, leaving `metadata` out when it is
        `None`."""
        serialized = super().to_dict()
        if serialized['metadata'] is None:
            del serialized['metadata']
        return serialized

    @classmethod
    def from_dict(cls, d):
        """Builds a release from a dictionary, rebuilding the author
        and date objects when they are present and non-empty."""
        fields = d.copy()
        nested_models = (('author', Person), ('date', TimestampWithTimezone))
        for key, model in nested_models:
            raw = fields.get(key)
            if raw:
                fields[key] = model.from_dict(raw)
        target_type = ObjectType(fields.pop('target_type'))
        return cls(target_type=target_type, **fields)
class RevisionType(Enum):
    """Possible values of the `type` field of a revision."""

    GIT = 'git'
    TAR = 'tar'
    DSC = 'dsc'
    SUBVERSION = 'svn'
    MERCURIAL = 'hg'
# Revision model. As a HashableObject, its `id` is computed from the dict
# representation (through compute_hash) when left at its empty default.
@attr.s(frozen=True)
class Revision(BaseModel, HashableObject):
message = attr.ib(type=bytes)
author = attr.ib(type=Person)
committer = attr.ib(type=Person)
# The two dates go from mandatory to Optional in this change.
- date = attr.ib(type=TimestampWithTimezone)
- committer_date = attr.ib(type=TimestampWithTimezone)
+ date = attr.ib(type=Optional[TimestampWithTimezone])
+ committer_date = attr.ib(type=Optional[TimestampWithTimezone])
type = attr.ib(type=RevisionType)
directory = attr.ib(type=Sha1Git)
synthetic = attr.ib(type=bool)
metadata = attr.ib(type=Optional[Dict[str, object]],
default=None)
parents = attr.ib(type=List[Sha1Git],
default=attr.Factory(list))
id = attr.ib(type=Sha1Git, default=b'')
@staticmethod
def compute_hash(object_dict):
return revision_identifier(object_dict)
@classmethod
def from_dict(cls, d):
# Work on a copy so that the pops below leave the caller's dict untouched.
d = d.copy()
# Each date is only converted when truthy; a falsy value (such as None) is
# handed to the constructor unchanged.
+ date = d.pop('date')
+ if date:
+ date = TimestampWithTimezone.from_dict(date)
+
+ committer_date = d.pop('committer_date')
+ if committer_date:
+ committer_date = TimestampWithTimezone.from_dict(
+ committer_date)
+
return cls(
author=Person.from_dict(d.pop('author')),
committer=Person.from_dict(d.pop('committer')),
- date=TimestampWithTimezone.from_dict(d.pop('date')),
- committer_date=TimestampWithTimezone.from_dict(
- d.pop('committer_date')),
+ date=date,
+ committer_date=committer_date,
type=RevisionType(d.pop('type')),
**d)
@attr.s(frozen=True)
class DirectoryEntry(BaseModel):
    """One entry of a directory: a name, a type ('file', 'dir' or
    'rev'), a target and permissions."""

    name = attr.ib(type=bytes)
    type = attr.ib(
        type=str,
        validator=attr.validators.in_(['file', 'dir', 'rev']))
    target = attr.ib(type=Sha1Git)
    # Usually one of the values of `swh.model.from_disk.DentryPerms`.
    perms = attr.ib(type=int)
@attr.s(frozen=True)
class Directory(BaseModel, HashableObject):
    """A directory, as a list of entries."""

    entries = attr.ib(type=List[DirectoryEntry])
    id = attr.ib(type=Sha1Git, default=b'')

    @staticmethod
    def compute_hash(object_dict):
        """Returns the directory identifier of the given directory dict."""
        return directory_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Builds a directory from a dictionary, converting each of its
        entries into a DirectoryEntry."""
        remaining = d.copy()
        raw_entries = remaining.pop('entries')
        entries = []
        for raw_entry in raw_entries:
            entries.append(DirectoryEntry.from_dict(raw_entry))
        return cls(entries=entries, **remaining)
@attr.s(frozen=True)
class Content(BaseModel):
    """A content object: its hashes, length and status, plus an optional
    reason, data and ctime."""

    sha1 = attr.ib(type=bytes)
    sha1_git = attr.ib(type=Sha1Git)
    sha256 = attr.ib(type=bytes)
    blake2s256 = attr.ib(type=bytes)
    length = attr.ib(type=int)
    status = attr.ib(
        type=str,
        validator=attr.validators.in_(['visible', 'absent', 'hidden']))
    reason = attr.ib(type=Optional[str],
                     default=None)
    data = attr.ib(type=Optional[bytes],
                   default=None)
    ctime = attr.ib(type=Optional[datetime.datetime],
                    default=None)

    @length.validator
    def check_length(self, attribute, value):
        """Checks the length is not negative; absent contents may also
        use -1.

        Raises:
            ValueError: if the length is below the bound allowed for the
                status of the content.
        """
        if self.status == 'absent' and value < -1:
            raise ValueError('Length must be positive or -1.')
        elif self.status != 'absent' and value < 0:
            raise ValueError('Length must be positive, unless status=absent.')

    @reason.validator
    def check_reason(self, attribute, value):
        """Checks a reason is provided if and only if the content is
        absent.

        Raises:
            ValueError: if an absent content has no reason, or if a
                non-absent content has one.
        """
        # attrs runs validators once all attributes are assigned, so
        # `value` is the very same object as `self.reason`; no need to
        # assert it.
        if self.status == 'absent' and value is None:
            raise ValueError('Must provide a reason if content is absent.')
        elif self.status != 'absent' and value is not None:
            raise ValueError(
                'Must not provide a reason if content is not absent.')

    def to_dict(self):
        """Serializes the content, leaving out `data`, `reason` and
        `ctime` when they are `None`."""
        content = super().to_dict()
        for field in ('data', 'reason', 'ctime'):
            if content[field] is None:
                del content[field]
        return content

    def get_hash(self, hash_name):
        """Returns the value of the hash named `hash_name`.

        Raises:
            ValueError: if `hash_name` is not one of DEFAULT_ALGORITHMS.
        """
        if hash_name not in DEFAULT_ALGORITHMS:
            raise ValueError('{} is not a valid hash name.'.format(hash_name))
        return getattr(self, hash_name)
diff --git a/swh/model/tests/test_model.py b/swh/model/tests/test_model.py
index a5719ca..4acc3be 100644
--- a/swh/model/tests/test_model.py
+++ b/swh/model/tests/test_model.py
@@ -1,114 +1,130 @@
# Copyright (C) 2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import copy
from hypothesis import given
from swh.model.model import Content, Directory, Revision, Release, Snapshot
from swh.model.hashutil import hash_to_bytes
from swh.model.hypothesis_strategies import objects, origins, origin_visits
from swh.model.identifiers import (
directory_identifier, revision_identifier, release_identifier,
snapshot_identifier
)
from swh.model.tests.test_identifiers import (
directory_example, revision_example, release_example, snapshot_example
)
@given(objects())
def test_todict_inverse_fromdict(objtype_and_obj):
    """to_dict() and from_dict() must be the inverse of each other, and
    from_dict() must leave its argument untouched."""
    obj_type, obj = objtype_and_obj

    # Origins and origin visits are left out of this generic check.
    if obj_type in ('origin', 'origin_visit'):
        return

    model_cls = type(obj)
    serialized = obj.to_dict()
    pristine = copy.deepcopy(serialized)

    # from_dict(to_dict(x)) gives x back
    assert obj == model_cls.from_dict(serialized)

    # from_dict() did not alter the dict it was given
    assert serialized == pristine

    # to_dict(from_dict(x)) gives x back
    assert serialized == model_cls.from_dict(serialized).to_dict()
@given(origins())
def test_todict_origins(origin):
    """The serialized form of an origin has no 'type', and loads back as
    an origin holding only the url."""
    origin_cls = type(origin)
    serialized = origin.to_dict()

    assert 'type' not in serialized
    assert origin_cls(url=origin.url) == origin_cls.from_dict(serialized)
@given(origin_visits())
def test_todict_origin_visits(origin_visit):
    """An origin visit survives a to_dict()/from_dict() round trip."""
    visit_cls = type(origin_visit)
    serialized = origin_visit.to_dict()

    assert origin_visit == visit_cls.from_dict(serialized)
def test_content_get_hash():
    """get_hash() returns the value each hash was constructed with."""
    hashes = {
        'sha1': b'foo',
        'sha1_git': b'bar',
        'sha256': b'baz',
        'blake2s256': b'qux',
    }
    content = Content(length=42, status='visible', **hashes)

    for hash_name in hashes:
        assert content.get_hash(hash_name) == hashes[hash_name]
def test_directory_model_id_computation():
    """A directory built without id gets the one computed by
    directory_identifier, through the constructor and from_dict alike."""
    payload = dict(directory_example)
    del payload['id']

    for build in (lambda d: Directory(**d), Directory.from_dict):
        model = build(payload)
        assert model.id
        assert model.id == hash_to_bytes(directory_identifier(payload))
def test_revision_model_id_computation():
    """A revision built without id gets the one computed by
    revision_identifier, through the constructor and from_dict alike."""
    payload = dict(revision_example)
    del payload['id']

    for build in (lambda d: Revision(**d), Revision.from_dict):
        model = build(payload)
        assert model.id
        assert model.id == hash_to_bytes(revision_identifier(payload))
+def test_revision_model_id_computation_with_no_date():
+ """Check that a revision whose date and committer_date are both None
+ keeps them as None and still gets the expected id.
+
+ """
+ rev_dict = dict(revision_example)
+ rev_dict['date'] = None
+ rev_dict['committer_date'] = None
+ del rev_dict['id']
+
+ rev_id = hash_to_bytes(revision_identifier(rev_dict))
+ for rev_model in [Revision(**rev_dict), Revision.from_dict(rev_dict)]:
+ assert rev_model.date is None
+ assert rev_model.committer_date is None
+ assert rev_model.id == rev_id
+
+
def test_release_model_id_computation():
    """A release built without id gets the one computed by
    release_identifier, through the constructor and from_dict alike."""
    payload = dict(release_example)
    del payload['id']

    for build in (lambda d: Release(**d), Release.from_dict):
        model = build(payload)
        assert model.id
        assert model.id == hash_to_bytes(release_identifier(payload))
def test_snapshot_model_id_computation():
    """A snapshot built without id gets the one computed by
    snapshot_identifier, through the constructor and from_dict alike."""
    payload = dict(snapshot_example)
    del payload['id']

    for build in (lambda d: Snapshot(**d), Snapshot.from_dict):
        model = build(payload)
        assert model.id
        assert model.id == hash_to_bytes(snapshot_identifier(payload))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:36 AM (2 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3250897
Attached To
rDMOD Data model
Event Timeline
Log In to Comment