Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show All 22 Lines | class BaseModel: | ||||
"""Base class for SWH model classes. | """Base class for SWH model classes. | ||||
Provides serialization/deserialization to/from Python dictionaries, | Provides serialization/deserialization to/from Python dictionaries, | ||||
that are suitable for JSON/msgpack-like formats.""" | that are suitable for JSON/msgpack-like formats.""" | ||||
def to_dict(self): | def to_dict(self): | ||||
"""Wrapper of `attr.asdict` that can be overridden by subclasses | """Wrapper of `attr.asdict` that can be overridden by subclasses | ||||
that have special handling of some of the fields.""" | that have special handling of some of the fields.""" | ||||
return attr.asdict(self) | |||||
def dictify(value): | |||||
if isinstance(value, BaseModel): | |||||
return value.to_dict() | |||||
elif isinstance(value, Enum): | |||||
return value.value | |||||
elif isinstance(value, dict): | |||||
return {k: dictify(v) for k, v in value.items()} | |||||
elif isinstance(value, list): | |||||
return [dictify(v) for v in value] | |||||
else: | |||||
return value | |||||
ret = attr.asdict(self, recurse=False) | |||||
return dictify(ret) | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
"""Takes a dictionary representing a tree of SWH objects, and | """Takes a dictionary representing a tree of SWH objects, and | ||||
recursively builds the corresponding objects.""" | recursively builds the corresponding objects.""" | ||||
return cls(**d) | return cls(**d) | ||||
▲ Show 20 Lines • Show All 82 Lines • ▼ Show 20 Lines | class OriginVisit(BaseModel): | ||||
"""Should not be set before calling 'origin_visit_add()'.""" | """Should not be set before calling 'origin_visit_add()'.""" | ||||
def to_dict(self): | def to_dict(self): | ||||
"""Serializes the date as a string and omits the visit id if it is | """Serializes the date as a string and omits the visit id if it is | ||||
`None`.""" | `None`.""" | ||||
ov = super().to_dict() | ov = super().to_dict() | ||||
if ov['visit'] is None: | if ov['visit'] is None: | ||||
del ov['visit'] | del ov['visit'] | ||||
ov['origin'] = self.origin.to_dict() | |||||
return ov | return ov | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
"""Parses the date from a string, and accepts missing visit ids.""" | """Parses the date from a string, and accepts missing visit ids.""" | ||||
d = d.copy() | d = d.copy() | ||||
date = d.pop('date') | date = d.pop('date') | ||||
return cls( | return cls( | ||||
Show All 34 Lines | class SnapshotBranch(BaseModel): | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS: | if self.target_type != TargetType.ALIAS: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError('Wrong length for bytes identifier: %d' % | raise ValueError('Wrong length for bytes identifier: %d' % | ||||
len(value)) | len(value)) | ||||
def to_dict(self): | |||||
branch = attr.asdict(self) | |||||
branch['target_type'] = branch['target_type'].value | |||||
return branch | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
target=d['target'], | target=d['target'], | ||||
target_type=TargetType(d['target_type'])) | target_type=TargetType(d['target_type'])) | ||||
@attr.s | @attr.s | ||||
class Snapshot(BaseModel): | class Snapshot(BaseModel): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) | branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) | ||||
def to_dict(self): | |||||
return { | |||||
'id': self.id, | |||||
'branches': { | |||||
name: branch.to_dict() if branch else None | |||||
for (name, branch) in self.branches.items() | |||||
} | |||||
} | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
id=d['id'], | id=d['id'], | ||||
branches={ | branches={ | ||||
name: SnapshotBranch.from_dict(branch) if branch else None | name: SnapshotBranch.from_dict(branch) if branch else None | ||||
for (name, branch) in d['branches'].items() | for (name, branch) in d['branches'].items() | ||||
}) | }) | ||||
Show All 16 Lines | class Release(BaseModel): | ||||
@author.validator | @author.validator | ||||
def check_author(self, attribute, value): | def check_author(self, attribute, value): | ||||
"""If the author is `None`, checks the date is `None` too.""" | """If the author is `None`, checks the date is `None` too.""" | ||||
if self.author is None and self.date is not None: | if self.author is None and self.date is not None: | ||||
raise ValueError('release date must be None if author is None.') | raise ValueError('release date must be None if author is None.') | ||||
def to_dict(self): | def to_dict(self): | ||||
rel = attr.asdict(self) | rel = super().to_dict() | ||||
rel['date'] = self.date.to_dict() if self.date is not None else None | |||||
rel['target_type'] = rel['target_type'].value | |||||
if rel['metadata'] is None: | if rel['metadata'] is None: | ||||
del rel['metadata'] | del rel['metadata'] | ||||
return rel | return rel | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
if d.get('author'): | if d.get('author'): | ||||
Show All 24 Lines | class Revision(BaseModel): | ||||
type = attr.ib(type=RevisionType) | type = attr.ib(type=RevisionType) | ||||
directory = attr.ib(type=Sha1Git) | directory = attr.ib(type=Sha1Git) | ||||
synthetic = attr.ib(type=bool) | synthetic = attr.ib(type=bool) | ||||
metadata = attr.ib(type=Optional[Dict[str, object]], | metadata = attr.ib(type=Optional[Dict[str, object]], | ||||
default=None) | default=None) | ||||
parents = attr.ib(type=List[Sha1Git], | parents = attr.ib(type=List[Sha1Git], | ||||
default=attr.Factory(list)) | default=attr.Factory(list)) | ||||
def to_dict(self): | |||||
rev = attr.asdict(self) | |||||
rev['date'] = self.date.to_dict() | |||||
rev['committer_date'] = self.committer_date.to_dict() | |||||
rev['type'] = rev['type'].value | |||||
return rev | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d = d.copy() | d = d.copy() | ||||
return cls( | return cls( | ||||
id=d.pop('id'), | id=d.pop('id'), | ||||
author=Person.from_dict(d.pop('author')), | author=Person.from_dict(d.pop('author')), | ||||
committer=Person.from_dict(d.pop('committer')), | committer=Person.from_dict(d.pop('committer')), | ||||
date=TimestampWithTimezone.from_dict(d.pop('date')), | date=TimestampWithTimezone.from_dict(d.pop('date')), | ||||
Show All 13 Lines | class DirectoryEntry(BaseModel): | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s | @attr.s | ||||
class Directory(BaseModel): | class Directory(BaseModel): | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
entries = attr.ib(type=List[DirectoryEntry]) | entries = attr.ib(type=List[DirectoryEntry]) | ||||
def to_dict(self): | |||||
dir_ = attr.asdict(self) | |||||
dir_['entries'] = [entry.to_dict() for entry in self.entries] | |||||
return dir_ | |||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
id=d['id'], | id=d['id'], | ||||
entries=[DirectoryEntry.from_dict(entry) | entries=[DirectoryEntry.from_dict(entry) | ||||
for entry in d['entries']]) | for entry in d['entries']]) | ||||
Show All 30 Lines | def check_reason(self, attribute, value): | ||||
assert self.reason == value | assert self.reason == value | ||||
if self.status == 'absent' and value is None: | if self.status == 'absent' and value is None: | ||||
raise ValueError('Must provide a reason if content is absent.') | raise ValueError('Must provide a reason if content is absent.') | ||||
elif self.status != 'absent' and value is not None: | elif self.status != 'absent' and value is not None: | ||||
raise ValueError( | raise ValueError( | ||||
'Must not provide a reason if content is not absent.') | 'Must not provide a reason if content is not absent.') | ||||
def to_dict(self): | def to_dict(self): | ||||
content = attr.asdict(self) | content = super().to_dict() | ||||
for field in ('data', 'reason', 'ctime'): | for field in ('data', 'reason', 'ctime'): | ||||
if content[field] is None: | if content[field] is None: | ||||
del content[field] | del content[field] | ||||
return content | return content | ||||
def get_hash(self, hash_name): | def get_hash(self, hash_name): | ||||
if hash_name not in DEFAULT_ALGORITHMS: | if hash_name not in DEFAULT_ALGORITHMS: | ||||
raise ValueError('{} is not a valid hash name.'.format(hash_name)) | raise ValueError('{} is not a valid hash name.'.format(hash_name)) | ||||
return getattr(self, hash_name) | return getattr(self, hash_name) |