Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/storage_data.py
# Copyright (C) 2015-2020 The Software Heritage developers | # Copyright (C) 2015-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import datetime | import datetime | ||||
from typing import Tuple | from typing import Tuple | ||||
import attr | import attr | ||||
from swh.model import from_disk | from swh.model import from_disk | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.identifiers import parse_swhid | |||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | |||||
ObjectType, | ObjectType, | ||||
Origin, | Origin, | ||||
OriginVisit, | OriginVisit, | ||||
Person, | Person, | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
SkippedContent, | SkippedContent, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
Timestamp, | Timestamp, | ||||
TimestampWithTimezone, | TimestampWithTimezone, | ||||
) | ) | ||||
from swh.model.swhid import SWHID | |||||
from swh.model.swhid import _swhid_type_map as swhid_type_map | |||||
from swh.model.swhid import parse_swhid | |||||
class SWHIDProvider: | |||||
def __init__(self, data): | |||||
self._data = data | |||||
def __getattr__(self, name): | |||||
return mkswhid(getattr(self._data, name)) | |||||
def mkswhid(obj): | |||||
object_type = swhid_type_map.get(obj.object_type) | |||||
if object_type: | |||||
return SWHID(object_type=object_type, object_id=obj.id) | |||||
class StorageData: | class StorageData: | ||||
"""Data model objects to use within tests. | """Data model objects to use within tests. | ||||
""" | """ | ||||
swhid: SWHIDProvider | |||||
content = Content( | content = Content( | ||||
data=b"42\n", | data=b"42\n", | ||||
length=3, | length=3, | ||||
sha1=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), | sha1=hash_to_bytes("34973274ccef6ab4dfaaf86599792fa9c3fe4689"), | ||||
sha1_git=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), | sha1_git=hash_to_bytes("d81cc0710eb6cf9efd5b920a8453e1e07157b6cd"), | ||||
sha256=hash_to_bytes( | sha256=hash_to_bytes( | ||||
"084c799cd551dd1d8d5c5f9a5d593b2e931f5e36122ee5c793c1d08a19839cc0" | "084c799cd551dd1d8d5c5f9a5d593b2e931f5e36122ee5c793c1d08a19839cc0" | ||||
), | ), | ||||
▲ Show 20 Lines • Show All 406 Lines • ▼ Show 20 Lines | complete_snapshot = Snapshot( | ||||
), | ), | ||||
b"dangling": None, | b"dangling": None, | ||||
}, | }, | ||||
) | ) | ||||
snapshots: Tuple[Snapshot, ...] = (snapshot, empty_snapshot, complete_snapshot) | snapshots: Tuple[Snapshot, ...] = (snapshot, empty_snapshot, complete_snapshot) | ||||
content_metadata1 = RawExtrinsicMetadata( | content_metadata1 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.CONTENT, | |||||
target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | ||||
origin=origin.url, | origin=mkswhid(origin), | ||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc | 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority, metadata=None), | authority=attr.evolve(metadata_authority, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher, metadata=None), | fetcher=attr.evolve(metadata_fetcher, metadata=None), | ||||
format="json", | format="json", | ||||
metadata=b'{"foo": "bar"}', | metadata=b'{"foo": "bar"}', | ||||
) | ) | ||||
content_metadata2 = RawExtrinsicMetadata( | content_metadata2 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.CONTENT, | |||||
target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | ||||
origin=origin2.url, | origin=mkswhid(origin2), | ||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority, metadata=None), | authority=attr.evolve(metadata_authority, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher, metadata=None), | fetcher=attr.evolve(metadata_fetcher, metadata=None), | ||||
format="yaml", | format="yaml", | ||||
metadata=b"foo: bar", | metadata=b"foo: bar", | ||||
) | ) | ||||
content_metadata3 = RawExtrinsicMetadata( | content_metadata3 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.CONTENT, | |||||
target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | target=parse_swhid(f"swh:1:cnt:{hash_to_hex(content.sha1_git)}"), | ||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority2, metadata=None), | authority=attr.evolve(metadata_authority2, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher2, metadata=None), | fetcher=attr.evolve(metadata_fetcher2, metadata=None), | ||||
format="yaml", | format="yaml", | ||||
metadata=b"foo: bar", | metadata=b"foo: bar", | ||||
origin=origin.url, | origin=mkswhid(origin), | ||||
visit=42, | visit=42, | ||||
snapshot=parse_swhid(f"swh:1:snp:{hash_to_hex(snapshot.id)}"), | snapshot=parse_swhid(f"swh:1:snp:{hash_to_hex(snapshot.id)}"), | ||||
release=parse_swhid(f"swh:1:rel:{hash_to_hex(release.id)}"), | release=parse_swhid(f"swh:1:rel:{hash_to_hex(release.id)}"), | ||||
revision=parse_swhid(f"swh:1:rev:{hash_to_hex(revision.id)}"), | revision=parse_swhid(f"swh:1:rev:{hash_to_hex(revision.id)}"), | ||||
directory=parse_swhid(f"swh:1:dir:{hash_to_hex(directory.id)}"), | directory=parse_swhid(f"swh:1:dir:{hash_to_hex(directory.id)}"), | ||||
path=b"/foo/bar", | path=b"/foo/bar", | ||||
) | ) | ||||
content_metadata: Tuple[RawExtrinsicMetadata, ...] = ( | content_metadata: Tuple[RawExtrinsicMetadata, ...] = ( | ||||
content_metadata1, | content_metadata1, | ||||
content_metadata2, | content_metadata2, | ||||
content_metadata3, | content_metadata3, | ||||
) | ) | ||||
origin_metadata1 = RawExtrinsicMetadata( | origin_metadata1 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.ORIGIN, | target=mkswhid(origin), | ||||
target=origin.url, | |||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc | 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority, metadata=None), | authority=attr.evolve(metadata_authority, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher, metadata=None), | fetcher=attr.evolve(metadata_fetcher, metadata=None), | ||||
format="json", | format="json", | ||||
metadata=b'{"foo": "bar"}', | metadata=b'{"foo": "bar"}', | ||||
) | ) | ||||
origin_metadata2 = RawExtrinsicMetadata( | origin_metadata2 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.ORIGIN, | target=mkswhid(origin), | ||||
target=origin.url, | |||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority, metadata=None), | authority=attr.evolve(metadata_authority, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher, metadata=None), | fetcher=attr.evolve(metadata_fetcher, metadata=None), | ||||
format="yaml", | format="yaml", | ||||
metadata=b"foo: bar", | metadata=b"foo: bar", | ||||
) | ) | ||||
origin_metadata3 = RawExtrinsicMetadata( | origin_metadata3 = RawExtrinsicMetadata( | ||||
type=MetadataTargetType.ORIGIN, | target=mkswhid(origin), | ||||
target=origin.url, | |||||
discovery_date=datetime.datetime( | discovery_date=datetime.datetime( | ||||
2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | 2017, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc | ||||
), | ), | ||||
authority=attr.evolve(metadata_authority2, metadata=None), | authority=attr.evolve(metadata_authority2, metadata=None), | ||||
fetcher=attr.evolve(metadata_fetcher2, metadata=None), | fetcher=attr.evolve(metadata_fetcher2, metadata=None), | ||||
format="yaml", | format="yaml", | ||||
metadata=b"foo: bar", | metadata=b"foo: bar", | ||||
) | ) | ||||
origin_metadata: Tuple[RawExtrinsicMetadata, ...] = ( | origin_metadata: Tuple[RawExtrinsicMetadata, ...] = ( | ||||
origin_metadata1, | origin_metadata1, | ||||
origin_metadata2, | origin_metadata2, | ||||
origin_metadata3, | origin_metadata3, | ||||
) | ) | ||||
StorageData.swhid = SWHIDProvider(StorageData) |