Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2019 The Software Heritage developers | # Copyright (C) 2018-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from enum import Enum | from enum import Enum | ||||
from typing import List, Optional, Dict, Union | from typing import Dict, List, Optional, Union | ||||
import attr | import attr | ||||
from attrs_strict import type_validator | |||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from .identifiers import ( | from .identifiers import ( | ||||
normalize_timestamp, directory_identifier, revision_identifier, | normalize_timestamp, directory_identifier, revision_identifier, | ||||
release_identifier, snapshot_identifier | release_identifier, snapshot_identifier | ||||
) | ) | ||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | ||||
class MissingData(Exception): | class MissingData(Exception): | ||||
"""Raised by `Content.with_data` when it has no way of fetching the | """Raised by `Content.with_data` when it has no way of fetching the | ||||
data (but not when fetching the data fails).""" | data (but not when fetching the data fails).""" | ||||
pass | pass | ||||
SHA1_SIZE = 20 | SHA1_SIZE = 20 | ||||
# TODO: Limit this to 20 bytes | # TODO: Limit this to 20 bytes | ||||
Sha1Git = bytes | Sha1Git = bytes | ||||
def dictify(value): | def dictify(value): | ||||
"Helper function used by BaseModel.to_dict()" | "Helper function used by BaseModel.to_dict()" | ||||
olasd: Can we get a proper name for this function? something like `attrib_typecheck` would be way more… | |||||
if isinstance(value, BaseModel): | if isinstance(value, BaseModel): | ||||
return value.to_dict() | return value.to_dict() | ||||
elif isinstance(value, Enum): | elif isinstance(value, Enum): | ||||
return value.value | return value.value | ||||
elif isinstance(value, dict): | elif isinstance(value, dict): | ||||
return {k: dictify(v) for k, v in value.items()} | return {k: dictify(v) for k, v in value.items()} | ||||
elif isinstance(value, list): | elif isinstance(value, list): | ||||
return [dictify(v) for v in value] | return [dictify(v) for v in value] | ||||
Show All 34 Lines | def __attrs_post_init__(self): | ||||
if not self.id: | if not self.id: | ||||
obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) | obj_id = hash_to_bytes(self.compute_hash(self.to_dict())) | ||||
object.__setattr__(self, 'id', obj_id) | object.__setattr__(self, 'id', obj_id) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
fullname = attr.ib(type=bytes) | fullname = attr.ib( | ||||
name = attr.ib(type=Optional[bytes]) | type=bytes, | ||||
email = attr.ib(type=Optional[bytes]) | validator=type_validator()) | ||||
name = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=type_validator()) | |||||
email = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=type_validator()) | |||||
@classmethod | @classmethod | ||||
def from_fullname(cls, fullname: bytes): | def from_fullname(cls, fullname: bytes): | ||||
"""Returns a Person object, by guessing the name and email from the | """Returns a Person object, by guessing the name and email from the | ||||
fullname, in the `name <email>` format. | fullname, in the `name <email>` format. | ||||
The fullname is left unchanged.""" | The fullname is left unchanged.""" | ||||
if fullname is None: | if fullname is None: | ||||
Show All 28 Lines | def from_fullname(cls, fullname: bytes): | ||||
email=email or None, | email=email or None, | ||||
fullname=fullname, | fullname=fullname, | ||||
) | ) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Timestamp(BaseModel): | class Timestamp(BaseModel): | ||||
"""Represents a naive timestamp from a VCS.""" | """Represents a naive timestamp from a VCS.""" | ||||
seconds = attr.ib(type=int) | seconds = attr.ib( | ||||
microseconds = attr.ib(type=int) | type=int, | ||||
validator=type_validator()) | |||||
microseconds = attr.ib( | |||||
type=int, | |||||
validator=type_validator()) | |||||
@seconds.validator | @seconds.validator | ||||
def check_seconds(self, attribute, value): | def check_seconds(self, attribute, value): | ||||
"""Check that seconds fit in a 64-bits signed integer.""" | """Check that seconds fit in a 64-bits signed integer.""" | ||||
if not (-2**63 <= value < 2**63): | if not (-2**63 <= value < 2**63): | ||||
raise ValueError('Seconds must be a signed 64-bits integer.') | raise ValueError('Seconds must be a signed 64-bits integer.') | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if not (0 <= value < 10**6): | if not (0 <= value < 10**6): | ||||
raise ValueError('Microseconds must be in [0, 1000000[.') | raise ValueError('Microseconds must be in [0, 1000000[.') | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
timestamp = attr.ib(type=Timestamp) | timestamp = attr.ib( | ||||
offset = attr.ib(type=int) | type=Timestamp, | ||||
negative_utc = attr.ib(type=bool) | validator=type_validator()) | ||||
offset = attr.ib( | |||||
type=int, | |||||
validator=type_validator()) | |||||
negative_utc = attr.ib( | |||||
type=bool, | |||||
validator=type_validator()) | |||||
@offset.validator | @offset.validator | ||||
def check_offset(self, attribute, value): | def check_offset(self, attribute, value): | ||||
"""Checks the offset is a 16-bits signed integer (in theory, it | """Checks the offset is a 16-bits signed integer (in theory, it | ||||
should always be between -14 and +14 hours).""" | should always be between -14 and +14 hours).""" | ||||
if not (-2**15 <= value < 2**15): | if not (-2**15 <= value < 2**15): | ||||
# max 14 hours offset in theory, but you never know what | # max 14 hours offset in theory, but you never know what | ||||
# you'll find in the wild... | # you'll find in the wild... | ||||
Show All 24 Lines | def from_iso8601(cls, s): | ||||
if dt.tzname() == '-00:00': | if dt.tzname() == '-00:00': | ||||
tstz = attr.evolve(tstz, negative_utc=True) | tstz = attr.evolve(tstz, negative_utc=True) | ||||
return tstz | return tstz | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Origin(BaseModel): | class Origin(BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
url = attr.ib(type=str) | url = attr.ib( | ||||
type=str, | |||||
validator=type_validator()) | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents a visit of an origin at a given point in time, by a | """Represents a visit of an origin at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
origin = attr.ib(type=str) | origin = attr.ib( | ||||
date = attr.ib(type=datetime.datetime) | type=str, | ||||
validator=type_validator()) | |||||
date = attr.ib( | |||||
type=datetime.datetime, | |||||
validator=type_validator()) | |||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | ||||
type = attr.ib(type=str) | type = attr.ib( | ||||
snapshot = attr.ib(type=Optional[Sha1Git]) | type=str, | ||||
metadata = attr.ib(type=Optional[Dict[str, object]], | validator=type_validator()) | ||||
default=None) | snapshot = attr.ib( | ||||
type=Optional[Sha1Git], | |||||
visit = attr.ib(type=Optional[int], | validator=type_validator()) | ||||
metadata = attr.ib( | |||||
type=Optional[Dict[str, object]], | |||||
validator=type_validator(), | |||||
default=None) | |||||
visit = attr.ib( | |||||
type=Optional[int], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
"""Should not be set before calling 'origin_visit_add()'.""" | """Should not be set before calling 'origin_visit_add()'.""" | ||||
def to_dict(self): | def to_dict(self): | ||||
"""Serializes the date as a string and omits the visit id if it is | """Serializes the date as a string and omits the visit id if it is | ||||
`None`.""" | `None`.""" | ||||
ov = super().to_dict() | ov = super().to_dict() | ||||
if ov['visit'] is None: | if ov['visit'] is None: | ||||
del ov['visit'] | del ov['visit'] | ||||
return ov | return ov | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
"""Parses the date from a string, and accepts missing visit ids.""" | """Parses the date from a string, and accepts missing visit ids.""" | ||||
if isinstance(d['date'], str): | |||||
d = d.copy() | d = d.copy() | ||||
date = d.pop('date') | d['date'] = dateutil.parser.parse(d['date']) | ||||
return cls( | return super().from_dict(d) | ||||
date=(date | |||||
if isinstance(date, datetime.datetime) | |||||
else dateutil.parser.parse(date)), | |||||
**d) | |||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class OriginVisitUpdate(BaseModel): | class OriginVisitUpdate(BaseModel): | ||||
"""Represents a visit update of an origin at a given point in time. | """Represents a visit update of an origin at a given point in time. | ||||
""" | """ | ||||
origin = attr.ib(type=str) | origin = attr.ib( | ||||
visit = attr.ib(type=int) | type=str, | ||||
validator=type_validator()) | |||||
date = attr.ib(type=datetime.datetime) | visit = attr.ib( | ||||
type=int, | |||||
validator=type_validator()) | |||||
date = attr.ib( | |||||
type=datetime.datetime, | |||||
validator=type_validator()) | |||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | ||||
snapshot = attr.ib(type=Optional[Sha1Git]) | snapshot = attr.ib( | ||||
metadata = attr.ib(type=Optional[Dict[str, object]], | type=Optional[Sha1Git], | ||||
validator=type_validator()) | |||||
metadata = attr.ib( | |||||
type=Optional[Dict[str, object]], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
class TargetType(Enum): | class TargetType(Enum): | ||||
"""The type of content pointed to by a snapshot branch. Usually a | """The type of content pointed to by a snapshot branch. Usually a | ||||
revision or an alias.""" | revision or an alias.""" | ||||
CONTENT = 'content' | CONTENT = 'content' | ||||
DIRECTORY = 'directory' | DIRECTORY = 'directory' | ||||
REVISION = 'revision' | REVISION = 'revision' | ||||
Show All 9 Lines | class ObjectType(Enum): | ||||
REVISION = 'revision' | REVISION = 'revision' | ||||
RELEASE = 'release' | RELEASE = 'release' | ||||
SNAPSHOT = 'snapshot' | SNAPSHOT = 'snapshot' | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
target = attr.ib(type=bytes) | target = attr.ib( | ||||
target_type = attr.ib(type=TargetType) | type=bytes, | ||||
validator=type_validator()) | |||||
target_type = attr.ib( | |||||
type=TargetType, | |||||
validator=type_validator()) | |||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS and self.target is not None: | if self.target_type != TargetType.ALIAS and self.target is not None: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError('Wrong length for bytes identifier: %d' % | raise ValueError('Wrong length for bytes identifier: %d' % | ||||
len(value)) | len(value)) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
target=d['target'], | target=d['target'], | ||||
target_type=TargetType(d['target_type'])) | target_type=TargetType(d['target_type'])) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Snapshot(BaseModel, HashableObject): | class Snapshot(BaseModel, HashableObject): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) | branches = attr.ib( | ||||
id = attr.ib(type=Sha1Git, default=b'') | type=Dict[bytes, Optional[SnapshotBranch]], | ||||
validator=type_validator()) | |||||
id = attr.ib( | |||||
type=Sha1Git, | |||||
validator=type_validator(), | |||||
default=b'') | |||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return snapshot_identifier(object_dict) | return snapshot_identifier(object_dict) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
branches={ | branches={ | ||||
name: SnapshotBranch.from_dict(branch) if branch else None | name: SnapshotBranch.from_dict(branch) if branch else None | ||||
for (name, branch) in d.pop('branches').items() | for (name, branch) in d.pop('branches').items() | ||||
}, | }, | ||||
**d) | **d) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Release(BaseModel, HashableObject): | class Release(BaseModel, HashableObject): | ||||
name = attr.ib(type=bytes) | name = attr.ib( | ||||
message = attr.ib(type=bytes) | type=bytes, | ||||
target = attr.ib(type=Optional[Sha1Git]) | validator=type_validator()) | ||||
target_type = attr.ib(type=ObjectType) | message = attr.ib( | ||||
synthetic = attr.ib(type=bool) | type=bytes, | ||||
author = attr.ib(type=Optional[Person], | validator=type_validator()) | ||||
default=None) | target = attr.ib( | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], | type=Optional[Sha1Git], | ||||
default=None) | validator=type_validator()) | ||||
metadata = attr.ib(type=Optional[Dict[str, object]], | target_type = attr.ib( | ||||
default=None) | type=ObjectType, | ||||
id = attr.ib(type=Sha1Git, default=b'') | validator=type_validator()) | ||||
synthetic = attr.ib( | |||||
type=bool, | |||||
validator=type_validator()) | |||||
author = attr.ib( | |||||
type=Optional[Person], | |||||
validator=type_validator(), | |||||
default=None) | |||||
date = attr.ib( | |||||
type=Optional[TimestampWithTimezone], | |||||
validator=type_validator(), | |||||
default=None) | |||||
metadata = attr.ib( | |||||
type=Optional[Dict[str, object]], | |||||
validator=type_validator(), | |||||
default=None) | |||||
id = attr.ib( | |||||
type=Sha1Git, | |||||
validator=type_validator(), | |||||
default=b'') | |||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return release_identifier(object_dict) | return release_identifier(object_dict) | ||||
@author.validator | @author.validator | ||||
def check_author(self, attribute, value): | def check_author(self, attribute, value): | ||||
"""If the author is `None`, checks the date is `None` too.""" | """If the author is `None`, checks the date is `None` too.""" | ||||
Show All 23 Lines | class RevisionType(Enum): | ||||
TAR = 'tar' | TAR = 'tar' | ||||
DSC = 'dsc' | DSC = 'dsc' | ||||
SUBVERSION = 'svn' | SUBVERSION = 'svn' | ||||
MERCURIAL = 'hg' | MERCURIAL = 'hg' | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Revision(BaseModel, HashableObject): | class Revision(BaseModel, HashableObject): | ||||
message = attr.ib(type=bytes) | message = attr.ib( | ||||
author = attr.ib(type=Person) | type=bytes, | ||||
committer = attr.ib(type=Person) | validator=type_validator()) | ||||
date = attr.ib(type=Optional[TimestampWithTimezone]) | author = attr.ib( | ||||
committer_date = attr.ib(type=Optional[TimestampWithTimezone]) | type=Person, | ||||
type = attr.ib(type=RevisionType) | validator=type_validator()) | ||||
directory = attr.ib(type=Sha1Git) | committer = attr.ib( | ||||
synthetic = attr.ib(type=bool) | type=Person, | ||||
metadata = attr.ib(type=Optional[Dict[str, object]], | validator=type_validator()) | ||||
default=None) | date = attr.ib( | ||||
parents = attr.ib(type=List[Sha1Git], | type=Optional[TimestampWithTimezone], | ||||
validator=type_validator()) | |||||
committer_date = attr.ib( | |||||
type=Optional[TimestampWithTimezone], | |||||
validator=type_validator()) | |||||
type = attr.ib( | |||||
type=RevisionType, | |||||
validator=type_validator()) | |||||
directory = attr.ib( | |||||
type=Sha1Git, | |||||
validator=type_validator()) | |||||
synthetic = attr.ib( | |||||
type=bool, | |||||
validator=type_validator()) | |||||
metadata = attr.ib( | |||||
type=Optional[Dict[str, object]], | |||||
validator=type_validator(), | |||||
default=None) | |||||
parents = attr.ib( | |||||
type=List[Sha1Git], | |||||
validator=type_validator(), | |||||
default=attr.Factory(list)) | default=attr.Factory(list)) | ||||
id = attr.ib(type=Sha1Git, default=b'') | id = attr.ib( | ||||
type=Sha1Git, | |||||
validator=type_validator(), | |||||
default=b'') | |||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return revision_identifier(object_dict) | return revision_identifier(object_dict) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
Show All 12 Lines | def from_dict(cls, d): | ||||
date=date, | date=date, | ||||
committer_date=committer_date, | committer_date=committer_date, | ||||
type=RevisionType(d.pop('type')), | type=RevisionType(d.pop('type')), | ||||
**d) | **d) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class DirectoryEntry(BaseModel): | class DirectoryEntry(BaseModel): | ||||
name = attr.ib(type=bytes) | name = attr.ib( | ||||
type = attr.ib(type=str, | type=bytes, | ||||
validator=type_validator()) | |||||
type = attr.ib( | |||||
type=str, | |||||
validator=attr.validators.in_(['file', 'dir', 'rev'])) | validator=attr.validators.in_(['file', 'dir', 'rev'])) | ||||
target = attr.ib(type=Sha1Git) | target = attr.ib( | ||||
perms = attr.ib(type=int) | type=Sha1Git, | ||||
validator=type_validator()) | |||||
perms = attr.ib( | |||||
type=int, | |||||
validator=type_validator()) | |||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Directory(BaseModel, HashableObject): | class Directory(BaseModel, HashableObject): | ||||
entries = attr.ib(type=List[DirectoryEntry]) | entries = attr.ib( | ||||
id = attr.ib(type=Sha1Git, default=b'') | type=List[DirectoryEntry], | ||||
validator=type_validator()) | |||||
id = attr.ib( | |||||
type=Sha1Git, | |||||
validator=type_validator(), | |||||
default=b'') | |||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return directory_identifier(object_dict) | return directory_identifier(object_dict) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | class BaseContent(BaseModel): | ||||
def hashes(self) -> Dict[str, bytes]: | def hashes(self) -> Dict[str, bytes]: | ||||
"""Returns a dictionary {hash_name: hash_value}""" | """Returns a dictionary {hash_name: hash_value}""" | ||||
return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | return {algo: getattr(self, algo) for algo in DEFAULT_ALGORITHMS} | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Content(BaseContent): | class Content(BaseContent): | ||||
sha1 = attr.ib(type=bytes) | sha1 = attr.ib( | ||||
sha1_git = attr.ib(type=Sha1Git) | type=bytes, | ||||
sha256 = attr.ib(type=bytes) | validator=type_validator()) | ||||
blake2s256 = attr.ib(type=bytes) | sha1_git = attr.ib( | ||||
type=Sha1Git, | |||||
length = attr.ib(type=int) | validator=type_validator()) | ||||
sha256 = attr.ib( | |||||
type=bytes, | |||||
validator=type_validator()) | |||||
blake2s256 = attr.ib( | |||||
type=bytes, | |||||
validator=type_validator()) | |||||
length = attr.ib( | |||||
type=int, | |||||
validator=type_validator()) | |||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
default='visible', | validator=attr.validators.in_(['visible', 'hidden']), | ||||
validator=attr.validators.in_(['visible', 'hidden'])) | default='visible') | ||||
Not Done Inline Actionscan't you adapt ibv to allow the validator override? ardumont: can't you adapt ibv to allow the validator override? | |||||
data = attr.ib(type=Optional[bytes], default=None) | data = attr.ib( | ||||
type=Optional[bytes], | |||||
validator=type_validator(), | |||||
default=None) | |||||
ctime = attr.ib(type=Optional[datetime.datetime], | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
@length.validator | @length.validator | ||||
def check_length(self, attribute, value): | def check_length(self, attribute, value): | ||||
"""Checks the length is positive.""" | """Checks the length is positive.""" | ||||
if value < 0: | if value < 0: | ||||
raise ValueError('Length must be positive.') | raise ValueError('Length must be positive.') | ||||
def to_dict(self): | def to_dict(self): | ||||
Show All 25 Lines | def with_data(self) -> 'Content': | ||||
to lazy-load data (eg. from disk or objstorage).""" | to lazy-load data (eg. from disk or objstorage).""" | ||||
if self.data is None: | if self.data is None: | ||||
raise MissingData('Content data is None.') | raise MissingData('Content data is None.') | ||||
return self | return self | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class SkippedContent(BaseContent): | class SkippedContent(BaseContent): | ||||
sha1 = attr.ib(type=Optional[bytes]) | sha1 = attr.ib( | ||||
sha1_git = attr.ib(type=Optional[Sha1Git]) | type=Optional[bytes], | ||||
sha256 = attr.ib(type=Optional[bytes]) | validator=type_validator()) | ||||
blake2s256 = attr.ib(type=Optional[bytes]) | sha1_git = attr.ib( | ||||
type=Optional[Sha1Git], | |||||
length = attr.ib(type=Optional[int]) | validator=type_validator()) | ||||
sha256 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=type_validator()) | |||||
blake2s256 = attr.ib( | |||||
type=Optional[bytes], | |||||
validator=type_validator()) | |||||
length = attr.ib( | |||||
type=Optional[int], | |||||
validator=type_validator()) | |||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['absent'])) | validator=attr.validators.in_(['absent'])) | ||||
reason = attr.ib(type=Optional[str], | reason = attr.ib( | ||||
type=Optional[str], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
origin = attr.ib(type=Optional[Origin], | origin = attr.ib( | ||||
type=Optional[Origin], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
ctime = attr.ib(type=Optional[datetime.datetime], | ctime = attr.ib( | ||||
type=Optional[datetime.datetime], | |||||
validator=type_validator(), | |||||
default=None) | default=None) | ||||
@reason.validator | @reason.validator | ||||
def check_reason(self, attribute, value): | def check_reason(self, attribute, value): | ||||
"""Checks the reason is full if status != absent.""" | """Checks the reason is full if status != absent.""" | ||||
assert self.reason == value | assert self.reason == value | ||||
if value is None: | if value is None: | ||||
raise ValueError('Must provide a reason if content is absent.') | raise ValueError('Must provide a reason if content is absent.') | ||||
Show All 35 Lines |
Can we get a proper name for this function? something like attrib_typecheck would be way more sensible.
It'd also be nice to support extra validators to be passed as arguments (though I guess our current attributes with validators are already stricter than a plain typecheck). If we do that then we can have all our attributes use this consistently, I guess.