Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
Show First 20 Lines • Show All 45 Lines • ▼ Show 20 Lines | class BaseModel: | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
"""Takes a dictionary representing a tree of SWH objects, and | """Takes a dictionary representing a tree of SWH objects, and | ||||
recursively builds the corresponding objects.""" | recursively builds the corresponding objects.""" | ||||
return cls(**d) | return cls(**d) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Person(BaseModel): | class Person(BaseModel): | ||||
"""Represents the author/committer of a revision or release.""" | """Represents the author/committer of a revision or release.""" | ||||
name = attr.ib(type=bytes) | name = attr.ib(type=bytes) | ||||
email = attr.ib(type=bytes) | email = attr.ib(type=bytes) | ||||
fullname = attr.ib(type=bytes) | fullname = attr.ib(type=bytes) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Timestamp(BaseModel): | class Timestamp(BaseModel): | ||||
"""Represents a naive timestamp from a VCS.""" | """Represents a naive timestamp from a VCS.""" | ||||
seconds = attr.ib(type=int) | seconds = attr.ib(type=int) | ||||
microseconds = attr.ib(type=int) | microseconds = attr.ib(type=int) | ||||
@seconds.validator | @seconds.validator | ||||
def check_seconds(self, attribute, value): | def check_seconds(self, attribute, value): | ||||
"""Check that seconds fit in a 64-bits signed integer.""" | """Check that seconds fit in a 64-bits signed integer.""" | ||||
if not (-2**63 <= value < 2**63): | if not (-2**63 <= value < 2**63): | ||||
raise ValueError('Seconds must be a signed 64-bits integer.') | raise ValueError('Seconds must be a signed 64-bits integer.') | ||||
@microseconds.validator | @microseconds.validator | ||||
def check_microseconds(self, attribute, value): | def check_microseconds(self, attribute, value): | ||||
"""Checks that microseconds are positive and < 1000000.""" | """Checks that microseconds are positive and < 1000000.""" | ||||
if not (0 <= value < 10**6): | if not (0 <= value < 10**6): | ||||
raise ValueError('Microseconds must be in [0, 1000000[.') | raise ValueError('Microseconds must be in [0, 1000000[.') | ||||
@attr.s | @attr.s(frozen=True) | ||||
class TimestampWithTimezone(BaseModel): | class TimestampWithTimezone(BaseModel): | ||||
"""Represents a TZ-aware timestamp from a VCS.""" | """Represents a TZ-aware timestamp from a VCS.""" | ||||
timestamp = attr.ib(type=Timestamp) | timestamp = attr.ib(type=Timestamp) | ||||
offset = attr.ib(type=int) | offset = attr.ib(type=int) | ||||
negative_utc = attr.ib(type=bool) | negative_utc = attr.ib(type=bool) | ||||
@offset.validator | @offset.validator | ||||
def check_offset(self, attribute, value): | def check_offset(self, attribute, value): | ||||
Show All 10 Lines | def from_dict(cls, d): | ||||
accepted by :py:`swh.model.normalize_timestamp`.""" | accepted by :py:`swh.model.normalize_timestamp`.""" | ||||
d = normalize_timestamp(d) | d = normalize_timestamp(d) | ||||
return cls( | return cls( | ||||
timestamp=Timestamp.from_dict(d['timestamp']), | timestamp=Timestamp.from_dict(d['timestamp']), | ||||
offset=d['offset'], | offset=d['offset'], | ||||
negative_utc=d['negative_utc']) | negative_utc=d['negative_utc']) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Origin(BaseModel): | class Origin(BaseModel): | ||||
"""Represents a software source: a VCS and an URL.""" | """Represents a software source: a VCS and an URL.""" | ||||
url = attr.ib(type=str) | url = attr.ib(type=str) | ||||
type = attr.ib(type=Optional[str], default=None) | type = attr.ib(type=Optional[str], default=None) | ||||
def to_dict(self): | def to_dict(self): | ||||
r = super().to_dict() | r = super().to_dict() | ||||
r.pop('type', None) | r.pop('type', None) | ||||
return r | return r | ||||
@attr.s | @attr.s(frozen=True) | ||||
class OriginVisit(BaseModel): | class OriginVisit(BaseModel): | ||||
"""Represents a visit of an origin at a given point in time, by a | """Represents a visit of an origin at a given point in time, by a | ||||
SWH loader.""" | SWH loader.""" | ||||
origin = attr.ib(type=Origin) | origin = attr.ib(type=Origin) | ||||
date = attr.ib(type=datetime.datetime) | date = attr.ib(type=datetime.datetime) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | validator=attr.validators.in_(['ongoing', 'full', 'partial'])) | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | class ObjectType(Enum): | ||||
"""The type of content pointed to by a release. Usually a revision""" | """The type of content pointed to by a release. Usually a revision""" | ||||
CONTENT = 'content' | CONTENT = 'content' | ||||
DIRECTORY = 'directory' | DIRECTORY = 'directory' | ||||
REVISION = 'revision' | REVISION = 'revision' | ||||
RELEASE = 'release' | RELEASE = 'release' | ||||
SNAPSHOT = 'snapshot' | SNAPSHOT = 'snapshot' | ||||
@attr.s | @attr.s(frozen=True) | ||||
class SnapshotBranch(BaseModel): | class SnapshotBranch(BaseModel): | ||||
"""Represents one of the branches of a snapshot.""" | """Represents one of the branches of a snapshot.""" | ||||
target = attr.ib(type=bytes) | target = attr.ib(type=bytes) | ||||
target_type = attr.ib(type=TargetType) | target_type = attr.ib(type=TargetType) | ||||
@target.validator | @target.validator | ||||
def check_target(self, attribute, value): | def check_target(self, attribute, value): | ||||
"""Checks the target type is not an alias, checks the target is a | """Checks the target type is not an alias, checks the target is a | ||||
valid sha1_git.""" | valid sha1_git.""" | ||||
if self.target_type != TargetType.ALIAS: | if self.target_type != TargetType.ALIAS: | ||||
if len(value) != 20: | if len(value) != 20: | ||||
raise ValueError('Wrong length for bytes identifier: %d' % | raise ValueError('Wrong length for bytes identifier: %d' % | ||||
len(value)) | len(value)) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
target=d['target'], | target=d['target'], | ||||
target_type=TargetType(d['target_type'])) | target_type=TargetType(d['target_type'])) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Snapshot(BaseModel): | class Snapshot(BaseModel): | ||||
"""Represents the full state of an origin at a given point in time.""" | """Represents the full state of an origin at a given point in time.""" | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) | branches = attr.ib(type=Dict[bytes, Optional[SnapshotBranch]]) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
id=d['id'], | id=d['id'], | ||||
branches={ | branches={ | ||||
name: SnapshotBranch.from_dict(branch) if branch else None | name: SnapshotBranch.from_dict(branch) if branch else None | ||||
for (name, branch) in d['branches'].items() | for (name, branch) in d['branches'].items() | ||||
}) | }) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Release(BaseModel): | class Release(BaseModel): | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
name = attr.ib(type=bytes) | name = attr.ib(type=bytes) | ||||
message = attr.ib(type=bytes) | message = attr.ib(type=bytes) | ||||
target = attr.ib(type=Optional[Sha1Git]) | target = attr.ib(type=Optional[Sha1Git]) | ||||
target_type = attr.ib(type=ObjectType) | target_type = attr.ib(type=ObjectType) | ||||
synthetic = attr.ib(type=bool) | synthetic = attr.ib(type=bool) | ||||
author = attr.ib(type=Optional[Person], | author = attr.ib(type=Optional[Person], | ||||
Show All 30 Lines | |||||
class RevisionType(Enum): | class RevisionType(Enum): | ||||
GIT = 'git' | GIT = 'git' | ||||
TAR = 'tar' | TAR = 'tar' | ||||
DSC = 'dsc' | DSC = 'dsc' | ||||
SUBVERSION = 'svn' | SUBVERSION = 'svn' | ||||
MERCURIAL = 'hg' | MERCURIAL = 'hg' | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Revision(BaseModel): | class Revision(BaseModel): | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
message = attr.ib(type=bytes) | message = attr.ib(type=bytes) | ||||
author = attr.ib(type=Person) | author = attr.ib(type=Person) | ||||
committer = attr.ib(type=Person) | committer = attr.ib(type=Person) | ||||
date = attr.ib(type=TimestampWithTimezone) | date = attr.ib(type=TimestampWithTimezone) | ||||
committer_date = attr.ib(type=TimestampWithTimezone) | committer_date = attr.ib(type=TimestampWithTimezone) | ||||
type = attr.ib(type=RevisionType) | type = attr.ib(type=RevisionType) | ||||
Show All 13 Lines | def from_dict(cls, d): | ||||
committer=Person.from_dict(d.pop('committer')), | committer=Person.from_dict(d.pop('committer')), | ||||
date=TimestampWithTimezone.from_dict(d.pop('date')), | date=TimestampWithTimezone.from_dict(d.pop('date')), | ||||
committer_date=TimestampWithTimezone.from_dict( | committer_date=TimestampWithTimezone.from_dict( | ||||
d.pop('committer_date')), | d.pop('committer_date')), | ||||
type=RevisionType(d.pop('type')), | type=RevisionType(d.pop('type')), | ||||
**d) | **d) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class DirectoryEntry(BaseModel): | class DirectoryEntry(BaseModel): | ||||
name = attr.ib(type=bytes) | name = attr.ib(type=bytes) | ||||
type = attr.ib(type=str, | type = attr.ib(type=str, | ||||
validator=attr.validators.in_(['file', 'dir', 'rev'])) | validator=attr.validators.in_(['file', 'dir', 'rev'])) | ||||
target = attr.ib(type=Sha1Git) | target = attr.ib(type=Sha1Git) | ||||
perms = attr.ib(type=int) | perms = attr.ib(type=int) | ||||
"""Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | """Usually one of the values of `swh.model.from_disk.DentryPerms`.""" | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Directory(BaseModel): | class Directory(BaseModel): | ||||
id = attr.ib(type=Sha1Git) | id = attr.ib(type=Sha1Git) | ||||
entries = attr.ib(type=List[DirectoryEntry]) | entries = attr.ib(type=List[DirectoryEntry]) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
return cls( | return cls( | ||||
id=d['id'], | id=d['id'], | ||||
entries=[DirectoryEntry.from_dict(entry) | entries=[DirectoryEntry.from_dict(entry) | ||||
for entry in d['entries']]) | for entry in d['entries']]) | ||||
@attr.s | @attr.s(frozen=True) | ||||
class Content(BaseModel): | class Content(BaseModel): | ||||
sha1 = attr.ib(type=bytes) | sha1 = attr.ib(type=bytes) | ||||
sha1_git = attr.ib(type=Sha1Git) | sha1_git = attr.ib(type=Sha1Git) | ||||
sha256 = attr.ib(type=bytes) | sha256 = attr.ib(type=bytes) | ||||
blake2s256 = attr.ib(type=bytes) | blake2s256 = attr.ib(type=bytes) | ||||
length = attr.ib(type=int) | length = attr.ib(type=int) | ||||
status = attr.ib( | status = attr.ib( | ||||
Show All 39 Lines |