Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/tests/test_loader_metadata.py
Show All 23 Lines | from swh.model.model import ( | ||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Revision, | Revision, | ||||
RevisionType, | RevisionType, | ||||
Sha1Git, | Sha1Git, | ||||
) | ) | ||||
from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType | from swh.model.swhids import CoreSWHID, ExtendedObjectType, ExtendedSWHID, ObjectType | ||||
EMPTY_SNAPSHOT_ID = "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e" | EMPTY_SNAPSHOT_ID = "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e" | ||||
FULL_SNAPSHOT_ID = "4a9b608c9f01860a627237dd2409d1d50ec4b054" | FULL_SNAPSHOT_ID = "4ac5730a9393f5099b63a35a17b6c33d36d70c3a" | ||||
AUTHORITY = MetadataAuthority( | AUTHORITY = MetadataAuthority( | ||||
type=MetadataAuthorityType.FORGE, url="http://example.org/", | type=MetadataAuthorityType.FORGE, url="http://example.org/", | ||||
) | ) | ||||
ORIGIN_URL = "http://example.org/archive.tgz" | ORIGIN_URL = "http://example.org/archive.tgz" | ||||
ORIGIN_SWHID = Origin(ORIGIN_URL).swhid() | ORIGIN_SWHID = Origin(ORIGIN_URL).swhid() | ||||
REVISION_ID = hash_to_bytes("8ff44f081d43176474b267de5451f2c2e88089d0") | REVISION_ID = hash_to_bytes("8ff44f081d43176474b267de5451f2c2e88089d0") | ||||
REVISION_SWHID = CoreSWHID(object_type=ObjectType.REVISION, object_id=REVISION_ID) | RELEASE_ID = hash_to_bytes("9477a708196b44e59efb4e47b7d979a4146bd428") | ||||
RELEASE_SWHID = CoreSWHID(object_type=ObjectType.RELEASE, object_id=RELEASE_ID) | |||||
DIRECTORY_ID = hash_to_bytes("aa" * 20) | DIRECTORY_ID = hash_to_bytes("aa" * 20) | ||||
DIRECTORY_SWHID = ExtendedSWHID( | DIRECTORY_SWHID = ExtendedSWHID( | ||||
object_type=ExtendedObjectType.DIRECTORY, object_id=DIRECTORY_ID | object_type=ExtendedObjectType.DIRECTORY, object_id=DIRECTORY_ID | ||||
) | ) | ||||
FETCHER = MetadataFetcher( | FETCHER = MetadataFetcher( | ||||
name="swh.loader.package.tests.test_loader_metadata.MetadataTestLoader", | name="swh.loader.package.tests.test_loader_metadata.MetadataTestLoader", | ||||
version=__version__, | version=__version__, | ||||
) | ) | ||||
DISCOVERY_DATE = datetime.datetime.now(tz=datetime.timezone.utc) | DISCOVERY_DATE = datetime.datetime.now(tz=datetime.timezone.utc) | ||||
DIRECTORY_METADATA = [ | DIRECTORY_METADATA = [ | ||||
RawExtrinsicMetadata( | RawExtrinsicMetadata( | ||||
target=DIRECTORY_SWHID, | target=DIRECTORY_SWHID, | ||||
discovery_date=DISCOVERY_DATE, | discovery_date=DISCOVERY_DATE, | ||||
authority=AUTHORITY, | authority=AUTHORITY, | ||||
fetcher=FETCHER, | fetcher=FETCHER, | ||||
format="test-format1", | format="test-format1", | ||||
metadata=b"foo bar", | metadata=b"foo bar", | ||||
origin=ORIGIN_URL, | origin=ORIGIN_URL, | ||||
revision=REVISION_SWHID, | release=RELEASE_SWHID, | ||||
), | ), | ||||
RawExtrinsicMetadata( | RawExtrinsicMetadata( | ||||
target=DIRECTORY_SWHID, | target=DIRECTORY_SWHID, | ||||
discovery_date=DISCOVERY_DATE + datetime.timedelta(seconds=1), | discovery_date=DISCOVERY_DATE + datetime.timedelta(seconds=1), | ||||
authority=AUTHORITY, | authority=AUTHORITY, | ||||
fetcher=FETCHER, | fetcher=FETCHER, | ||||
format="test-format2", | format="test-format2", | ||||
metadata=b"bar baz", | metadata=b"bar baz", | ||||
origin=ORIGIN_URL, | origin=ORIGIN_URL, | ||||
revision=REVISION_SWHID, | release=RELEASE_SWHID, | ||||
), | ), | ||||
] | ] | ||||
ORIGIN_METADATA = [ | ORIGIN_METADATA = [ | ||||
RawExtrinsicMetadata( | RawExtrinsicMetadata( | ||||
target=ORIGIN_SWHID, | target=ORIGIN_SWHID, | ||||
discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), | discovery_date=datetime.datetime.now(tz=datetime.timezone.utc), | ||||
authority=AUTHORITY, | authority=AUTHORITY, | ||||
fetcher=FETCHER, | fetcher=FETCHER, | ||||
format="test-format3", | format="test-format3", | ||||
metadata=b"baz qux", | metadata=b"baz qux", | ||||
), | ), | ||||
] | ] | ||||
class MetadataTestLoader(PackageLoader[BasePackageInfo]): | class MetadataTestLoader(PackageLoader[BasePackageInfo]): | ||||
def get_versions(self) -> Sequence[str]: | def get_versions(self) -> Sequence[str]: | ||||
return ["v1.0.0"] | return ["v1.0.0"] | ||||
def _load_directory(self, dl_artifacts, tmpdir): | def _load_directory(self, dl_artifacts, tmpdir): | ||||
class directory: | class directory: | ||||
hash = DIRECTORY_ID | hash = DIRECTORY_ID | ||||
return (None, directory) # just enough for _load_revision to work | return (None, directory) # just enough for _load_release to work | ||||
def download_package(self, p_info: BasePackageInfo, tmpdir: str): | def download_package(self, p_info: BasePackageInfo, tmpdir: str): | ||||
return [("path", {"artifact_key": "value", "length": 0})] | return [("path", {"artifact_key": "value", "length": 0})] | ||||
def build_revision( | def build_revision( | ||||
self, p_info: BasePackageInfo, uncompressed_path: str, directory: Sha1Git | self, p_info: BasePackageInfo, uncompressed_path: str, directory: Sha1Git | ||||
): | ): | ||||
return Revision( | return Revision( | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def test_load_artifact_metadata(swh_storage, caplog): | ||||
assert result.results[0] == RawExtrinsicMetadata( | assert result.results[0] == RawExtrinsicMetadata( | ||||
target=DIRECTORY_SWHID, | target=DIRECTORY_SWHID, | ||||
discovery_date=result.results[0].discovery_date, | discovery_date=result.results[0].discovery_date, | ||||
authority=authority, | authority=authority, | ||||
fetcher=FETCHER, | fetcher=FETCHER, | ||||
format="original-artifacts-json", | format="original-artifacts-json", | ||||
metadata=b'[{"artifact_key": "value", "length": 0}]', | metadata=b'[{"artifact_key": "value", "length": 0}]', | ||||
origin=ORIGIN_URL, | origin=ORIGIN_URL, | ||||
revision=REVISION_SWHID, | release=RELEASE_SWHID, | ||||
) | ) | ||||
def test_load_metadata(swh_storage, caplog): | def test_load_metadata(swh_storage, caplog): | ||||
loader = MetadataTestLoader(swh_storage, ORIGIN_URL) | loader = MetadataTestLoader(swh_storage, ORIGIN_URL) | ||||
load_status = loader.load() | load_status = loader.load() | ||||
assert load_status == { | assert load_status == { | ||||
▲ Show 20 Lines • Show All 45 Lines • Show Last 20 Lines |