Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show First 20 Lines • Show All 355 Lines • ▼ Show 20 Lines | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
entries=[DirectoryEntry.from_dict(entry) | entries=[DirectoryEntry.from_dict(entry) | ||||
for entry in d.pop('entries')], | for entry in d.pop('entries')], | ||||
**d) | **d) | ||||
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Content(BaseModel): | class BaseContent(BaseModel): | ||||
def to_dict(self): | |||||
content = super().to_dict() | |||||
if content['ctime'] is None: | |||||
del content['ctime'] | |||||
return content | |||||
@classmethod | |||||
def from_dict(cls, d, use_subclass=True): | |||||
if use_subclass: | |||||
# Chooses a subclass to instantiate instead. | |||||
if d['status'] == 'absent': | |||||
return SkippedContent.from_dict(d) | |||||
else: | |||||
return Content.from_dict(d) | |||||
else: | |||||
return super().from_dict(d) | |||||
def get_hash(self, hash_name): | |||||
if hash_name not in DEFAULT_ALGORITHMS: | |||||
raise ValueError('{} is not a valid hash name.'.format(hash_name)) | |||||
return getattr(self, hash_name) | |||||
@attr.s(frozen=True) | |||||
class Content(BaseContent): | |||||
sha1 = attr.ib(type=bytes) | sha1 = attr.ib(type=bytes) | ||||
sha1_git = attr.ib(type=Sha1Git) | sha1_git = attr.ib(type=Sha1Git) | ||||
sha256 = attr.ib(type=bytes) | sha256 = attr.ib(type=bytes) | ||||
blake2s256 = attr.ib(type=bytes) | blake2s256 = attr.ib(type=bytes) | ||||
length = attr.ib(type=int) | length = attr.ib(type=int) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['visible', 'absent', 'hidden'])) | validator=attr.validators.in_(['visible', 'hidden'])) | ||||
reason = attr.ib(type=Optional[str], | |||||
default=None) | |||||
data = attr.ib(type=Optional[bytes], | data = attr.ib(type=Optional[bytes], | ||||
default=None) | default=None) | ||||
ctime = attr.ib(type=Optional[datetime.datetime], | ctime = attr.ib(type=Optional[datetime.datetime], | ||||
default=None) | default=None) | ||||
@length.validator | @length.validator | ||||
def check_length(self, attribute, value): | def check_length(self, attribute, value): | ||||
"""Checks the length is positive.""" | """Checks the length is positive.""" | ||||
if self.status == 'absent' and value < -1: | if self.status != 'absent' and value < 0: | ||||
raise ValueError('Length must be positive or -1.') | raise ValueError('Length must be positive.') | ||||
elif self.status != 'absent' and value < 0: | |||||
raise ValueError('Length must be positive, unless status=absent.') | def to_dict(self): | ||||
content = super().to_dict() | |||||
if content['data'] is None: | |||||
del content['data'] | |||||
return content | |||||
@classmethod | |||||
def from_dict(cls, d): | |||||
return super().from_dict(d, use_subclass=False) | |||||
@attr.s(frozen=True) | |||||
class SkippedContent(BaseContent): | |||||
sha1 = attr.ib(type=Optional[bytes]) | |||||
sha1_git = attr.ib(type=Optional[Sha1Git]) | |||||
sha256 = attr.ib(type=Optional[bytes]) | |||||
blake2s256 = attr.ib(type=Optional[bytes]) | |||||
length = attr.ib(type=Optional[int]) | |||||
status = attr.ib( | |||||
type=str, | |||||
validator=attr.validators.in_(['absent'])) | |||||
reason = attr.ib(type=Optional[str], | |||||
default=None) | |||||
origin = attr.ib(type=Optional[Origin], | |||||
default=None) | |||||
ctime = attr.ib(type=Optional[datetime.datetime], | |||||
default=None) | |||||
@reason.validator | @reason.validator | ||||
def check_reason(self, attribute, value): | def check_reason(self, attribute, value): | ||||
"""Checks the reason is full if status != absent.""" | """Checks the reason is full if status != absent.""" | ||||
assert self.reason == value | assert self.reason == value | ||||
if self.status == 'absent' and value is None: | if value is None: | ||||
raise ValueError('Must provide a reason if content is absent.') | raise ValueError('Must provide a reason if content is absent.') | ||||
elif self.status != 'absent' and value is not None: | |||||
raise ValueError( | @length.validator | ||||
'Must not provide a reason if content is not absent.') | def check_length(self, attribute, value): | ||||
"""Checks the length is positive or -1.""" | |||||
if value is not None and value < -1: | |||||
raise ValueError('Length must be positive or -1.') | |||||
def to_dict(self): | def to_dict(self): | ||||
content = super().to_dict() | content = super().to_dict() | ||||
for field in ('data', 'reason', 'ctime'): | if content['origin'] is None: | ||||
if content[field] is None: | del content['origin'] | ||||
del content[field] | |||||
return content | return content | ||||
def get_hash(self, hash_name): | @classmethod | ||||
if hash_name not in DEFAULT_ALGORITHMS: | def from_dict(cls, d): | ||||
raise ValueError('{} is not a valid hash name.'.format(hash_name)) | d2 = d | ||||
return getattr(self, hash_name) | d = d.copy() | ||||
if d.pop('data', None) is not None: | |||||
raise ValueError('SkippedContent has no "data" attribute %r' % d2) | |||||
return super().from_dict(d, use_subclass=False) |