Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from copy import deepcopy | |||||
from enum import Enum | from enum import Enum | ||||
from hashlib import sha256 | from hashlib import sha256 | ||||
from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union | from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
import attr | import attr | ||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from .collections import ImmutableDict | |||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | |||||
from .identifiers import ( | from .identifiers import ( | ||||
normalize_timestamp, | normalize_timestamp, | ||||
directory_identifier, | directory_identifier, | ||||
revision_identifier, | revision_identifier, | ||||
release_identifier, | release_identifier, | ||||
snapshot_identifier, | snapshot_identifier, | ||||
SWHID, | SWHID, | ||||
) | ) | ||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | |||||
class MissingData(Exception):
    """Raised by `Content.with_data` when it has no way of fetching the
    data (but not when fetching the data fails)."""

    pass
# Length in bytes of a SHA1 digest.
SHA1_SIZE = 20

# TODO: Limit this to 20 bytes
Sha1Git = bytes

# Key and value type parameters used by the generic freeze_optional_dict
# signature.
KT = TypeVar("KT")
VT = TypeVar("VT")
def freeze_optional_dict(
    d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]]  # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
    """Turn a plain ``dict`` into an ``ImmutableDict``; pass anything else on.

    Used as the ``converter`` of the dict-valued attributes in this module.

    Args:
        d: ``None``, a ``dict``, or an ``ImmutableDict``.

    Returns:
        ``ImmutableDict(d)`` when ``d`` is a ``dict``; otherwise ``d`` itself,
        unchanged (so ``None`` stays ``None``).
    """
    if isinstance(d, dict):
        return ImmutableDict(d)
    else:
        return d
def dictify(value):
    """Helper function used by BaseModel.to_dict().

    Recursively converts ``value`` into plain Python data:

    * a ``BaseModel`` becomes the result of its ``to_dict()``;
    * an ``Enum`` member becomes its ``.value``;
    * a ``dict`` or ``ImmutableDict`` becomes a new ``dict`` with the same
      keys and dictified values;
    * a ``tuple`` becomes a ``tuple`` of dictified items;
    * anything else is returned unchanged.
    """
    if isinstance(value, BaseModel):
        return value.to_dict()
    elif isinstance(value, Enum):
        return value.value
    elif isinstance(value, (dict, ImmutableDict)):
        # Keys are kept as-is; only the values are converted.
        return {k: dictify(v) for k, v in value.items()}
    elif isinstance(value, tuple):
        return tuple(dictify(v) for v in value)
    else:
        return value
ModelType = TypeVar("ModelType", bound="BaseModel") | ModelType = TypeVar("ModelType", bound="BaseModel") | ||||
▲ Show 20 Lines • Show All 213 Lines • ▼ Show 20 Lines | class OriginVisitStatus(BaseModel): | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | ||||
) | ) | ||||
snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
class TargetType(Enum): | class TargetType(Enum): | ||||
"""The type of content pointed to by a snapshot branch. Usually a | """The type of content pointed to by a snapshot branch. Usually a | ||||
revision or an alias.""" | revision or an alias.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
Show All 38 Lines | |||||
@attr.s(frozen=True)
class Snapshot(BaseModel, HashableObject):
    """Represents the full state of an origin at a given point in time."""

    object_type: Final = "snapshot"

    # Maps a branch name to its SnapshotBranch, or to None. A plain dict
    # passed by the caller is frozen into an ImmutableDict by the converter.
    branches = attr.ib(
        type=ImmutableDict[bytes, Optional[SnapshotBranch]],
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Return ``snapshot_identifier(object_dict)``."""
        return snapshot_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Build a Snapshot from its dict form.

        Each truthy value under ``d["branches"]`` is converted with
        ``SnapshotBranch.from_dict``; falsy values become ``None``. Every
        other key of ``d`` is passed to the constructor as a keyword
        argument.
        """
        # Shallow copy so popping "branches" does not alter the caller's dict.
        d = d.copy()
        return cls(
            branches=ImmutableDict(
                (name, SnapshotBranch.from_dict(branch) if branch else None)
                for (name, branch) in d.pop("branches").items()
            ),
            **d,
        )
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Release(BaseModel, HashableObject): | class Release(BaseModel, HashableObject): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | ||||
date = attr.ib( | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | ||||
) | ) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return release_identifier(object_dict) | return release_identifier(object_dict) | ||||
@author.validator | @author.validator | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | class Revision(BaseModel, HashableObject): | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=type_validator() | ||||
) | ) | ||||
type = attr.ib(type=RevisionType, validator=type_validator()) | type = attr.ib(type=RevisionType, validator=type_validator()) | ||||
directory = attr.ib(type=Sha1Git, validator=type_validator()) | directory = attr.ib(type=Sha1Git, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
extra_headers = attr.ib( | extra_headers = attr.ib( | ||||
type=Tuple[Tuple[bytes, bytes], ...], # but it makes mypy sad | type=Tuple[Tuple[bytes, bytes], ...], # but it makes mypy sad | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=tuplify_extra_headers, # type: ignore | converter=tuplify_extra_headers, # type: ignore | ||||
default=(), | default=(), | ||||
) | ) | ||||
def __attrs_post_init__(self):
    """Move an ``"extra_headers"`` entry from ``metadata`` to its attribute.

    When ``extra_headers`` is empty and ``metadata`` has an
    ``"extra_headers"`` key, the entry is converted with
    ``tuplify_extra_headers`` and stored as ``self.extra_headers``, and
    ``self.metadata`` is replaced by the mapping returned without that key.
    """
    super().__attrs_post_init__()
    # No defensive copy of metadata is made here: copy_pop returns the
    # popped value together with the remaining mapping.
    # NOTE(review): assumes copy_pop leaves the original mapping untouched
    # -- confirm against the ImmutableDict implementation.
    if self.metadata:
        metadata = self.metadata
        if not self.extra_headers and "extra_headers" in metadata:
            (extra_headers, metadata) = metadata.copy_pop("extra_headers")
            # object.__setattr__ does not run the attribute validators,
            # so validate explicitly once extra_headers is overwritten.
            object.__setattr__(
                self, "extra_headers", tuplify_extra_headers(extra_headers),
            )
            attr.validate(self)
        object.__setattr__(self, "metadata", metadata)
@staticmethod
def compute_hash(object_dict):
    """Return ``revision_identifier(object_dict)``."""
    return revision_identifier(object_dict)
▲ Show 20 Lines • Show All 244 Lines • ▼ Show 20 Lines | |||||
@attr.s(frozen=True)
class MetadataAuthority(BaseModel):
    """Represents an entity that provides metadata about an origin or
    software artifact."""

    type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
    url = attr.ib(type=str, validator=type_validator())
    # Optional; a plain dict is frozen into an ImmutableDict by the
    # converter, None is kept as None.
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
@attr.s(frozen=True)
class MetadataFetcher(BaseModel):
    """Represents a software component used to fetch metadata from a metadata
    authority, and ingest them into the Software Heritage archive."""

    name = attr.ib(type=str, validator=type_validator())
    version = attr.ib(type=str, validator=type_validator())
    # Optional; a plain dict is frozen into an ImmutableDict by the
    # converter, None is kept as None.
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
class MetadataTargetType(Enum): | class MetadataTargetType(Enum): | ||||
"""The type of object extrinsic metadata refer to.""" | """The type of object extrinsic metadata refer to.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
DIRECTORY = "directory" | DIRECTORY = "directory" | ||||
▲ Show 20 Lines • Show All 166 Lines • Show Last 20 Lines |