Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from enum import Enum | from enum import Enum | ||||
from typing import List, Optional, Dict | from typing import List, Optional, Dict | ||||
import attr | import attr | ||||
import dateutil.parser | |||||
from .identifiers import normalize_timestamp | |||||
# TODO: Limit this to 20 bytes | # TODO: Limit this to 20 bytes | ||||
Sha1Git = bytes | Sha1Git = bytes | ||||
def contains_optional_validator(validator):
    """Tell whether an attrs validator is, or wraps, an "optional" validator.

    Inspired by `hypothesis/searchstrategy/attrs.py`.

    Args:
        validator: the `validator` of an attrs attribute; may be `None`
            when the attribute has no validator.

    Returns:
        bool: `True` if `validator` is an optional validator, or an "and"
        validator with an optional validator somewhere among its
        (possibly nested) members; `False` otherwise.
    """
    # NOTE: these are private attrs classes; attrs exposes no public way
    # to introspect a validator.
    if isinstance(validator, attr.validators._OptionalValidator):
        return True
    if isinstance(validator, attr.validators._AndValidator):
        # Always return a real boolean, including when no member matches.
        return any(contains_optional_validator(member)
                   for member in validator._validators)
    return False
class BaseModel:
    """Base class for SWH model classes.

    Provides serialization/deserialization to/from Python dictionaries,
    that are suitable for JSON/msgpack-like formats."""

    def to_dict(self):
        """Wrapper of `attr.asdict` that can be overridden by subclasses
        that have special handling of some of the fields."""
        return attr.asdict(self)

    @classmethod
    def from_dict(cls, d):
        """Takes a dictionary representing a tree of SWH objects, and
        recursively builds the corresponding objects.

        Args:
            d (dict): maps attribute names of `cls` to their serialized
                values. It is not modified.

        Returns:
            An instance of `cls`.

        Raises:
            TypeError: if `d` is not a dict.
        """
        if not isinstance(d, dict):
            raise TypeError(
                '%s.from_dict expects a dict, not %r' % (cls.__name__, d))

        # Work on a shallow copy so the caller's dictionary is left intact.
        kwargs = dict(d)

        for (name, attribute) in attr.fields_dict(cls).items():
            type_ = attribute.type

            # Heuristic to detect `Optional[X]` and unwrap it to `X`.
            if contains_optional_validator(attribute.validator):
                if kwargs.get(name) is None:
                    # Missing key: let attrs apply the default (if any).
                    # Explicit None: pass it through as-is, so optional
                    # attributes without a default can still be built.
                    continue
                type_args = getattr(type_, '__args__', None)
                if type_args:
                    type_ = type_args[0]

            # `typing` generics (List[X], Dict[K, V], ...) and untyped
            # attributes are not classes; issubclass() would raise on them.
            if not isinstance(type_, type):
                continue

            # Construct an object of the expected type
            if issubclass(type_, BaseModel):
                kwargs[name] = type_.from_dict(kwargs[name])
            elif issubclass(type_, Enum):
                kwargs[name] = type_(kwargs[name])

        return cls(**kwargs)
@attr.s
class Person(BaseModel):
    """Represents the author/committer of a revision or release."""
    name = attr.ib(type=bytes)
    email = attr.ib(type=bytes)
    fullname = attr.ib(type=bytes)
@attr.s
class Timestamp(BaseModel):
    """Represents a naive timestamp from a VCS."""
    seconds = attr.ib(type=int)
    microseconds = attr.ib(type=int)

    @seconds.validator
    def check_seconds(self, attribute, value):
        """Checks that seconds fit in a 64-bits signed integer.

        Raises:
            ValueError: if `value` is outside [-2**63, 2**63[.
        """
        if not (-2**63 <= value < 2**63):
            raise ValueError('Seconds must be a signed 64-bits integer.')

    @microseconds.validator
    def check_microseconds(self, attribute, value):
        """Checks that microseconds are non-negative and < 1000000.

        Raises:
            ValueError: if `value` is outside [0, 1000000[.
        """
        if not (0 <= value < 10**6):
            raise ValueError('Microseconds must be in [0, 1000000[.')
@attr.s
class TimestampWithTimezone(BaseModel):
    """Represents a TZ-aware timestamp from a VCS."""
    timestamp = attr.ib(type=Timestamp)
    offset = attr.ib(type=int)
    negative_utc = attr.ib(type=bool)

    @offset.validator
    def check_offset(self, attribute, value):
        """Checks the offset is a 16-bits signed integer (in theory, it
        should always be between -14 and +14 hours).

        Raises:
            ValueError: if `value` is outside [-2**15, 2**15[.
        """
        if not (-2**15 <= value < 2**15):
            # max 14 hours offset in theory, but you never know what
            # you'll find in the wild...
            raise ValueError('offset too large: %d minutes' % value)

    @classmethod
    def from_dict(cls, d):
        """Builds a TimestampWithTimezone from any of the formats
        accepted by :py:`swh.model.normalize_timestamp`."""
        # The input is normalized first, then handed to the generic
        # `BaseModel.from_dict`.
        return super().from_dict(normalize_timestamp(d))
@attr.s
class Origin(BaseModel):
    """Represents a software source: a VCS and an URL."""
    type = attr.ib(type=str)
    url = attr.ib(type=str)
@attr.s
class OriginVisit(BaseModel):
    """Represents a visit of an origin at a given point in time, by a
    SWH loader."""
    origin = attr.ib(type=Origin)
    date = attr.ib(type=datetime.datetime)
    visit = attr.ib(type=Optional[int],
                    validator=attr.validators.optional([]))
    """Should not be set before calling 'origin_visit_add()'."""

    def to_dict(self):
        """Serializes the date as a string and omits the visit id if it is
        `None`."""
        ov = super().to_dict()
        ov['date'] = str(self.date)
        # Only drop a missing id; a visit id of 0 must be kept.
        if ov['visit'] is None:
            del ov['visit']
        return ov

    @classmethod
    def from_dict(cls, d):
        """Parses the date from a string, and accepts missing visit ids.

        A `date` that is already a `datetime.datetime` is used as-is.
        """
        date = d['date']
        if not isinstance(date, datetime.datetime):
            date = dateutil.parser.parse(date)
        return cls(
            origin=Origin.from_dict(d['origin']),
            date=date,
            visit=d.get('visit'))
class TargetType(Enum):
    """The type of content pointed to by a snapshot branch. Usually a
    revision or an alias."""
    CONTENT = 'content'
    DIRECTORY = 'directory'
    REVISION = 'revision'
    RELEASE = 'release'
    SNAPSHOT = 'snapshot'
    ALIAS = 'alias'
class ObjectType(Enum):
    """The type of content pointed to by a release. Usually a revision."""
    CONTENT = 'content'
    DIRECTORY = 'directory'
    REVISION = 'revision'
    RELEASE = 'release'
    SNAPSHOT = 'snapshot'
@attr.s
class SnapshotBranch(BaseModel):
    """Represents one of the branches of a snapshot."""
    target = attr.ib(type=bytes)
    target_type = attr.ib(type=TargetType)

    @target.validator
    def check_target(self, attribute, value):
        """Unless the branch is an alias, checks the target has the length
        of a sha1_git (20 bytes).

        Raises:
            ValueError: if a non-alias target is not 20 bytes long.
        """
        if self.target_type != TargetType.ALIAS:
            if len(value) != 20:
                raise ValueError('Wrong length for bytes identifier: %d' %
                                 len(value))

    def to_dict(self):
        """Serializes the branch, replacing the target type enum member
        with its string value."""
        branch = attr.asdict(self)
        branch['target_type'] = branch['target_type'].value
        return branch
@attr.s
class Snapshot(BaseModel):
    """Represents the full state of an origin at a given point in time."""
    id = attr.ib(type=Sha1Git)
    branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]])

    def to_dict(self):
        """Serializes the snapshot and each of its branches.

        Branches whose value is `None` are serialized as `None`.
        """
        return {
            'id': self.id,
            'branches': {
                name: branch.to_dict() if branch is not None else None
                for (name, branch) in self.branches.items()
            }
        }

    @classmethod
    def from_dict(cls, d):
        """Builds a Snapshot from a dictionary, deserializing each branch.

        `None` branches are kept as `None`. `d` is not modified.
        """
        # Shallow copy: the caller's dictionary must not be altered.
        kwargs = dict(d)
        kwargs['branches'] = {
            name: (SnapshotBranch.from_dict(branch)
                   if branch is not None else None)
            for (name, branch) in d['branches'].items()
        }
        return cls(**kwargs)
@attr.s
class Release(BaseModel):
    """Represents a release, pointing to a target object of a given type."""
    id = attr.ib(type=Sha1Git)
    name = attr.ib(type=bytes)
    message = attr.ib(type=bytes)
    target = attr.ib(type=Optional[Sha1Git],
                     validator=attr.validators.optional([]))
    target_type = attr.ib(type=ObjectType)
    synthetic = attr.ib(type=bool)
    author = attr.ib(type=Optional[Person],
                     default=None,
                     validator=attr.validators.optional([]))
    date = attr.ib(type=Optional[TimestampWithTimezone],
                   default=None,
                   validator=attr.validators.optional([]))

    @author.validator
    def check_author(self, attribute, value):
        """If the author is `None`, checks the date is `None` too.

        Raises:
            ValueError: if a date is set while the author is `None`.
        """
        if self.author is None and self.date is not None:
            raise ValueError('release date must be None if author is None.')

    def to_dict(self):
        """Serializes the release; the date goes through its own
        `to_dict()` and the target type becomes its string value."""
        rel = attr.asdict(self)
        rel['date'] = self.date.to_dict() if self.date is not None else None
        rel['target_type'] = rel['target_type'].value
        return rel
class RevisionType(Enum):
    """The kind of a revision, as stored in `Revision.type`."""
    GIT = 'git'
    TAR = 'tar'
    DSC = 'dsc'
    SUBVERSION = 'svn'
    MERCURIAL = 'hg'
@attr.s
class Revision(BaseModel):
    """Represents a revision: a directory plus authorship information,
    with zero or more parent revisions."""
    id = attr.ib(type=Sha1Git)
    message = attr.ib(type=bytes)
    author = attr.ib(type=Person)
    committer = attr.ib(type=Person)
    date = attr.ib(type=TimestampWithTimezone)
    committer_date = attr.ib(type=TimestampWithTimezone)
    type = attr.ib(type=RevisionType)
    directory = attr.ib(type=Sha1Git)
    synthetic = attr.ib(type=bool)
    # Attributes with a default value must come after the mandatory ones.
    metadata = attr.ib(type=Optional[Dict[str, object]],
                       default=None,
                       validator=attr.validators.optional([]))
    parents = attr.ib(type=List[Sha1Git],
                      default=attr.Factory(list))

    def to_dict(self):
        """Serializes the revision; both dates go through their own
        `to_dict()` and the revision type becomes its string value."""
        rev = attr.asdict(self)
        rev['date'] = self.date.to_dict()
        rev['committer_date'] = self.committer_date.to_dict()
        rev['type'] = rev['type'].value
        return rev
@attr.s
class DirectoryEntry(BaseModel):
    """Represents one entry of a directory."""
    name = attr.ib(type=bytes)
    type = attr.ib(type=str,
                   validator=attr.validators.in_(['file', 'dir', 'rev']))
    target = attr.ib(type=Sha1Git)
    perms = attr.ib(type=int)
    """Usually one of the values of `swh.model.from_disk.DentryPerms`."""
@attr.s
class Directory(BaseModel):
    """Represents a directory: an id and a list of entries."""
    id = attr.ib(type=Sha1Git)
    entries = attr.ib(type=List[DirectoryEntry])

    def to_dict(self):
        """Serializes the directory, each entry going through its own
        `to_dict()`."""
        dir_ = attr.asdict(self)
        dir_['entries'] = [entry.to_dict() for entry in self.entries]
        return dir_

    @classmethod
    def from_dict(cls, d):
        """Builds a Directory from a dictionary, deserializing each entry.

        `d` is not modified.

        Raises:
            TypeError: if `d` is not a dict.
        """
        if not isinstance(d, dict):
            raise TypeError(
                '%s.from_dict expects a dict, not %r' % (cls.__name__, d))
        # Shallow copy: the caller's dictionary must not be altered.
        kwargs = dict(d)
        kwargs['entries'] = [DirectoryEntry.from_dict(entry)
                             for entry in d['entries']]
        # `id` needs no conversion and `entries` is already built, so the
        # generic per-attribute pass of the base class is not needed here.
        return cls(**kwargs)
@attr.s | @attr.s | ||||
class Content: | class Content(BaseModel): | ||||
sha1 = attr.ib(type=bytes) | sha1 = attr.ib(type=bytes) | ||||
sha1_git = attr.ib(type=Sha1Git) | sha1_git = attr.ib(type=Sha1Git) | ||||
sha256 = attr.ib(type=bytes) | sha256 = attr.ib(type=bytes) | ||||
blake2s256 = attr.ib(type=bytes) | blake2s256 = attr.ib(type=bytes) | ||||
data = attr.ib(type=bytes) | |||||
length = attr.ib(type=int) | length = attr.ib(type=int) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['visible', 'absent', 'hidden'])) | validator=attr.validators.in_(['visible', 'absent', 'hidden'])) | ||||
reason = attr.ib(type=Optional[str]) | reason = attr.ib(type=Optional[str], | ||||
default=None, | |||||
validator=attr.validators.optional([])) | |||||
data = attr.ib(type=Optional[bytes], | |||||
default=None, | |||||
validator=attr.validators.optional([])) | |||||
@length.validator | @length.validator | ||||
def check_length(self, attribute, value): | def check_length(self, attribute, value): | ||||
"""Checks the length is positive.""" | """Checks the length is positive.""" | ||||
if value < 0: | if value < 0: | ||||
raise ValueError('Length must be positive.') | raise ValueError('Length must be positive.') | ||||
@reason.validator | @reason.validator | ||||
Show All 16 Lines |
I'm a bit puzzled by the fact that the commit message says:
> Add a from_dict() method to model…
So I read it as "to_dict already exists", which does not appear to be the case, at least not everywhere. Maybe write a clearer commit message? Also, this needs to be documented, since it is now part of the model's API, so it also needs docstrings.