Changeset View
Changeset View
Standalone View
Standalone View
swh/model/model.py
# Copyright (C) 2018-2020 The Software Heritage developers | # Copyright (C) 2018-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from abc import ABCMeta, abstractmethod | from abc import ABCMeta, abstractmethod | ||||
from copy import deepcopy | |||||
from enum import Enum | from enum import Enum | ||||
from hashlib import sha256 | from hashlib import sha256 | ||||
from typing import Dict, Iterable, Optional, Tuple, TypeVar, Union | from typing import Any, Dict, Iterable, Optional, Tuple, TypeVar, Union | ||||
from typing_extensions import Final | from typing_extensions import Final | ||||
import attr | import attr | ||||
from attrs_strict import type_validator | from attrs_strict import type_validator | ||||
import dateutil.parser | import dateutil.parser | ||||
import iso8601 | import iso8601 | ||||
from .collections import ImmutableDict | |||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | |||||
from .identifiers import ( | from .identifiers import ( | ||||
normalize_timestamp, | normalize_timestamp, | ||||
directory_identifier, | directory_identifier, | ||||
revision_identifier, | revision_identifier, | ||||
release_identifier, | release_identifier, | ||||
snapshot_identifier, | snapshot_identifier, | ||||
SWHID, | |||||
) | ) | ||||
from .hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, MultiHash | |||||
class MissingData(Exception):
    """Raised by `Content.with_data` when no way of fetching the content's
    data is available (not raised when an attempted fetch fails)."""
# Length, in bytes, of a sha1 digest.
SHA1_SIZE = 20

# Type alias for git-style sha1 identifiers.
# TODO: Limit this to 20 bytes
Sha1Git = bytes
KT = TypeVar("KT")
VT = TypeVar("VT")


def freeze_optional_dict(
    d: Union[None, Dict[KT, VT], ImmutableDict[KT, VT]]  # type: ignore
) -> Optional[ImmutableDict[KT, VT]]:
    """attrs converter: freeze a plain dict into an ImmutableDict.

    ``None`` and already-immutable dicts are passed through unchanged.
    """
    return ImmutableDict(d) if isinstance(d, dict) else d
def dictify(value):
    """Recursively convert a model value into plain Python data.

    Helper used by BaseModel.to_dict(): model objects become dicts via
    their own ``to_dict``, enums are replaced by their ``.value``, and
    (immutable) dicts and tuples are converted element by element.
    """
    if isinstance(value, BaseModel):
        return value.to_dict()
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, (dict, ImmutableDict)):
        return {key: dictify(item) for (key, item) in value.items()}
    if isinstance(value, tuple):
        return tuple(dictify(item) for item in value)
    return value


ModelType = TypeVar("ModelType", bound="BaseModel")
▲ Show 20 Lines • Show All 213 Lines • ▼ Show 20 Lines | class OriginVisitStatus(BaseModel): | ||||
date = attr.ib(type=datetime.datetime, validator=type_validator()) | date = attr.ib(type=datetime.datetime, validator=type_validator()) | ||||
status = attr.ib( | status = attr.ib( | ||||
type=str, | type=str, | ||||
validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | validator=attr.validators.in_(["created", "ongoing", "full", "partial"]), | ||||
) | ) | ||||
snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | snapshot = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
class TargetType(Enum): | class TargetType(Enum): | ||||
"""The type of content pointed to by a snapshot branch. Usually a | """The type of content pointed to by a snapshot branch. Usually a | ||||
revision or an alias.""" | revision or an alias.""" | ||||
CONTENT = "content" | CONTENT = "content" | ||||
Show All 38 Lines | |||||
@attr.s(frozen=True)
class Snapshot(BaseModel, HashableObject):
    """Represents the full state of an origin at a given point in time."""

    object_type: Final = "snapshot"

    # Mapping from branch name to branch target (None for dangling branches);
    # plain dicts are frozen to an ImmutableDict by the converter.
    branches = attr.ib(
        type=ImmutableDict[bytes, Optional[SnapshotBranch]],
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
    id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"")

    @staticmethod
    def compute_hash(object_dict):
        """Intrinsic identifier of the snapshot, from its dict form."""
        return snapshot_identifier(object_dict)

    @classmethod
    def from_dict(cls, d):
        """Alternate constructor from the dict representation."""
        d = d.copy()
        raw_branches = d.pop("branches")
        frozen_branches = ImmutableDict(
            (name, SnapshotBranch.from_dict(branch) if branch else None)
            for (name, branch) in raw_branches.items()
        )
        return cls(branches=frozen_branches, **d)
@attr.s(frozen=True) | @attr.s(frozen=True) | ||||
class Release(BaseModel, HashableObject): | class Release(BaseModel, HashableObject): | ||||
object_type: Final = "release" | object_type: Final = "release" | ||||
name = attr.ib(type=bytes, validator=type_validator()) | name = attr.ib(type=bytes, validator=type_validator()) | ||||
message = attr.ib(type=Optional[bytes], validator=type_validator()) | message = attr.ib(type=Optional[bytes], validator=type_validator()) | ||||
target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | target = attr.ib(type=Optional[Sha1Git], validator=type_validator()) | ||||
target_type = attr.ib(type=ObjectType, validator=type_validator()) | target_type = attr.ib(type=ObjectType, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | author = attr.ib(type=Optional[Person], validator=type_validator(), default=None) | ||||
date = attr.ib( | date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | type=Optional[TimestampWithTimezone], validator=type_validator(), default=None | ||||
) | ) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return release_identifier(object_dict) | return release_identifier(object_dict) | ||||
@author.validator | @author.validator | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | class Revision(BaseModel, HashableObject): | ||||
date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | date = attr.ib(type=Optional[TimestampWithTimezone], validator=type_validator()) | ||||
committer_date = attr.ib( | committer_date = attr.ib( | ||||
type=Optional[TimestampWithTimezone], validator=type_validator() | type=Optional[TimestampWithTimezone], validator=type_validator() | ||||
) | ) | ||||
type = attr.ib(type=RevisionType, validator=type_validator()) | type = attr.ib(type=RevisionType, validator=type_validator()) | ||||
directory = attr.ib(type=Sha1Git, validator=type_validator()) | directory = attr.ib(type=Sha1Git, validator=type_validator()) | ||||
synthetic = attr.ib(type=bool, validator=type_validator()) | synthetic = attr.ib(type=bool, validator=type_validator()) | ||||
metadata = attr.ib( | metadata = attr.ib( | ||||
type=Optional[Dict[str, object]], validator=type_validator(), default=None | type=Optional[ImmutableDict[str, object]], | ||||
validator=type_validator(), | |||||
converter=freeze_optional_dict, | |||||
default=None, | |||||
) | ) | ||||
parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | parents = attr.ib(type=Tuple[Sha1Git, ...], validator=type_validator(), default=()) | ||||
id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | id = attr.ib(type=Sha1Git, validator=type_validator(), default=b"") | ||||
extra_headers = attr.ib( | extra_headers = attr.ib( | ||||
type=Tuple[Tuple[bytes, bytes], ...], # but it makes mypy sad | type=Tuple[Tuple[bytes, bytes], ...], | ||||
validator=type_validator(), | validator=type_validator(), | ||||
converter=tuplify_extra_headers, # type: ignore | converter=tuplify_extra_headers, | ||||
default=(), | default=(), | ||||
) | ) | ||||
def __attrs_post_init__(self): | def __attrs_post_init__(self): | ||||
super().__attrs_post_init__() | super().__attrs_post_init__() | ||||
# ensure metadata is a deep copy of whatever was given, and if needed | # ensure metadata is a deep copy of whatever was given, and if needed | ||||
# extract extra_headers from there | # extract extra_headers from there | ||||
if self.metadata: | if self.metadata: | ||||
metadata = deepcopy(self.metadata) | metadata = self.metadata | ||||
if not self.extra_headers and "extra_headers" in metadata: | if not self.extra_headers and "extra_headers" in metadata: | ||||
(extra_headers, metadata) = metadata.copy_pop("extra_headers") | |||||
object.__setattr__( | object.__setattr__( | ||||
self, | self, "extra_headers", tuplify_extra_headers(extra_headers), | ||||
"extra_headers", | |||||
tuplify_extra_headers(metadata.pop("extra_headers")), | |||||
) | ) | ||||
attr.validate(self) | attr.validate(self) | ||||
object.__setattr__(self, "metadata", metadata) | object.__setattr__(self, "metadata", metadata) | ||||
@staticmethod | @staticmethod | ||||
def compute_hash(object_dict): | def compute_hash(object_dict): | ||||
return revision_identifier(object_dict) | return revision_identifier(object_dict) | ||||
▲ Show 20 Lines • Show All 228 Lines • ▼ Show 20 Lines | ) -> "SkippedContent": | ||||
return cls(**d) | return cls(**d) | ||||
@classmethod | @classmethod | ||||
def from_dict(cls, d): | def from_dict(cls, d): | ||||
d2 = d.copy() | d2 = d.copy() | ||||
if d2.pop("data", None) is not None: | if d2.pop("data", None) is not None: | ||||
raise ValueError('SkippedContent has no "data" attribute %r' % d) | raise ValueError('SkippedContent has no "data" attribute %r' % d) | ||||
return super().from_dict(d2, use_subclass=False) | return super().from_dict(d2, use_subclass=False) | ||||
class MetadataAuthorityType(Enum):
    """Kind of entity a MetadataAuthority represents."""

    DEPOSIT = "deposit"
    FORGE = "forge"
    REGISTRY = "registry"
@attr.s(frozen=True)
class MetadataAuthority(BaseModel):
    """Represents an entity that provides metadata about an origin or
    software artifact."""

    type = attr.ib(type=MetadataAuthorityType, validator=type_validator())
    url = attr.ib(type=str, validator=type_validator())
    # optional key/value details about the authority; plain dicts are
    # frozen to an ImmutableDict by the converter
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
@attr.s(frozen=True)
class MetadataFetcher(BaseModel):
    """Represents a software component used to fetch metadata from a metadata
    authority, and ingest them into the Software Heritage archive."""

    name = attr.ib(type=str, validator=type_validator())
    version = attr.ib(type=str, validator=type_validator())
    # optional key/value details about the fetcher; plain dicts are
    # frozen to an ImmutableDict by the converter
    metadata = attr.ib(
        type=Optional[ImmutableDict[str, Any]],
        default=None,
        validator=type_validator(),
        converter=freeze_optional_dict,
    )
class MetadataTargetType(Enum):
    """The type of object extrinsic metadata refer to."""

    CONTENT = "content"
    DIRECTORY = "directory"
    REVISION = "revision"
    RELEASE = "release"
    SNAPSHOT = "snapshot"
    ORIGIN = "origin"
@attr.s(frozen=True)
class RawExtrinsicMetadata(BaseModel):
    """A blob of metadata (in an arbitrary ``format``) about a software
    artifact or an origin, discovered at a given date, attributed to a
    MetadataAuthority and collected by a MetadataFetcher.

    The optional ``origin``/``visit``/``snapshot``/``release``/``revision``/
    ``path``/``directory`` attributes give the context the metadata was
    found in; validators restrict which context fields are allowed for
    each target type.
    """

    # target object
    type = attr.ib(type=MetadataTargetType, validator=type_validator())
    # URL if type=MetadataTargetType.ORIGIN, else core SWHID
    id = attr.ib(type=Union[str, SWHID], validator=type_validator())

    # source
    discovery_date = attr.ib(type=datetime.datetime, validator=type_validator())
    authority = attr.ib(type=MetadataAuthority, validator=type_validator())
    fetcher = attr.ib(type=MetadataFetcher, validator=type_validator())

    # the metadata itself
    format = attr.ib(type=str, validator=type_validator())
    metadata = attr.ib(type=bytes, validator=type_validator())

    # context
    origin = attr.ib(type=Optional[str], default=None, validator=type_validator())
    visit = attr.ib(type=Optional[int], default=None, validator=type_validator())
    snapshot = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    release = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    revision = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())
    path = attr.ib(type=Optional[bytes], default=None, validator=type_validator())
    directory = attr.ib(type=Optional[SWHID], default=None, validator=type_validator())

    @id.validator
    def check_id(self, attribute, value):
        # Origin metadata is keyed by URL; all other target types are
        # keyed by a core SWHID whose object type must match self.type.
        if self.type == MetadataTargetType.ORIGIN:
            if isinstance(value, SWHID) or value.startswith("swh:"):
                raise ValueError(
                    "Got SWHID as id for origin metadata (expected an URL)."
                )
        else:
            self._check_pid(self.type.value, value)

    @origin.validator
    def check_origin(self, attribute, value):
        if value is None:
            return
        # an 'origin' context makes sense for every target except origins
        if self.type not in {
            MetadataTargetType.SNAPSHOT,
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'origin' context for {self.type.value} object: {value}"
            )
        if value.startswith("swh:"):
            # Technically this is valid; but:
            # 1. SWHIDs are URIs, not URLs
            # 2. if a SWHID gets here, it's very likely to be a mistake
            #    (and we can remove this check if it turns out there is a
            #    legitimate use for it).
            raise ValueError(f"SWHID used as context origin URL: {value}")

    @visit.validator
    def check_visit(self, attribute, value):
        if value is None:
            return
        if self.type not in {
            MetadataTargetType.SNAPSHOT,
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'visit' context for {self.type.value} object: {value}"
            )
        # a visit id is meaningless without the origin it belongs to
        if self.origin is None:
            raise ValueError("'origin' context must be set if 'visit' is.")
        if value <= 0:
            raise ValueError("Nonpositive visit id")

    @snapshot.validator
    def check_snapshot(self, attribute, value):
        if value is None:
            return
        if self.type not in {
            MetadataTargetType.RELEASE,
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'snapshot' context for {self.type.value} object: {value}"
            )
        self._check_pid("snapshot", value)

    @release.validator
    def check_release(self, attribute, value):
        if value is None:
            return
        if self.type not in {
            MetadataTargetType.REVISION,
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'release' context for {self.type.value} object: {value}"
            )
        self._check_pid("release", value)

    @revision.validator
    def check_revision(self, attribute, value):
        if value is None:
            return
        if self.type not in {
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'revision' context for {self.type.value} object: {value}"
            )
        self._check_pid("revision", value)

    @path.validator
    def check_path(self, attribute, value):
        if value is None:
            return
        if self.type not in {
            MetadataTargetType.DIRECTORY,
            MetadataTargetType.CONTENT,
        }:
            raise ValueError(
                f"Unexpected 'path' context for {self.type.value} object: {value}"
            )

    @directory.validator
    def check_directory(self, attribute, value):
        if value is None:
            return
        if self.type != MetadataTargetType.CONTENT:
            raise ValueError(
                f"Unexpected 'directory' context for {self.type.value} object: {value}"
            )
        self._check_pid("directory", value)

    def _check_pid(self, expected_object_type, pid):
        # Context SWHIDs must be actual SWHID instances, of the expected
        # object type, and "core" (no extra metadata/qualifiers attached).
        if isinstance(pid, str):
            raise ValueError(f"Expected SWHID, got a string: {pid}")
        if pid.object_type != expected_object_type:
            raise ValueError(
                f"Expected SWHID type '{expected_object_type}', "
                f"got '{pid.object_type}' in {pid}"
            )
        if pid.metadata:
            raise ValueError(f"Expected core SWHID, but got: {pid}")