Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/nixguix/tests/test_nixguix.py
Show All 21 Lines | from swh.loader.package.nixguix.loader import ( | ||||
parse_sources, | parse_sources, | ||||
retrieve_sources, | retrieve_sources, | ||||
) | ) | ||||
from swh.loader.package.utils import download | from swh.loader.package.utils import download | ||||
from swh.loader.tests import assert_last_visit_matches | from swh.loader.tests import assert_last_visit_matches | ||||
from swh.loader.tests import check_snapshot as check_snapshot_full | from swh.loader.tests import check_snapshot as check_snapshot_full | ||||
from swh.loader.tests import get_stats | from swh.loader.tests import get_stats | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.identifiers import SWHID | from swh.model.identifiers import ExtendedObjectType, ExtendedSWHID | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
MetadataAuthority, | MetadataAuthority, | ||||
MetadataAuthorityType, | MetadataAuthorityType, | ||||
MetadataFetcher, | MetadataFetcher, | ||||
MetadataTargetType, | |||||
RawExtrinsicMetadata, | RawExtrinsicMetadata, | ||||
Snapshot, | Snapshot, | ||||
SnapshotBranch, | SnapshotBranch, | ||||
TargetType, | TargetType, | ||||
) | ) | ||||
from swh.storage.algos.origin import origin_get_latest_visit_status | from swh.storage.algos.origin import origin_get_latest_visit_status | ||||
from swh.storage.algos.snapshot import snapshot_get_all_branches | from swh.storage.algos.snapshot import snapshot_get_all_branches | ||||
from swh.storage.exc import HashCollision | from swh.storage.exc import HashCollision | ||||
▲ Show 20 Lines • Show All 253 Lines • ▼ Show 20 Lines | def test_loader_one_visit(swh_storage, requests_mock_datadir, raw_sources): | ||||
# The visit is partial because urls pointing to non tarball file | # The visit is partial because urls pointing to non tarball file | ||||
# are not handled yet | # are not handled yet | ||||
assert_last_visit_matches( | assert_last_visit_matches( | ||||
swh_storage, sources_url, status="partial", type="nixguix" | swh_storage, sources_url, status="partial", type="nixguix" | ||||
) | ) | ||||
visit_status = origin_get_latest_visit_status(swh_storage, sources_url) | visit_status = origin_get_latest_visit_status(swh_storage, sources_url) | ||||
snapshot_swhid = SWHID( | snapshot_swhid = ExtendedSWHID( | ||||
object_type="snapshot", object_id=hash_to_hex(visit_status.snapshot) | object_type=ExtendedObjectType.SNAPSHOT, object_id=visit_status.snapshot | ||||
) | ) | ||||
metadata_authority = MetadataAuthority( | metadata_authority = MetadataAuthority( | ||||
type=MetadataAuthorityType.FORGE, url=sources_url, | type=MetadataAuthorityType.FORGE, url=sources_url, | ||||
) | ) | ||||
expected_metadata = [ | expected_metadata = [ | ||||
RawExtrinsicMetadata( | RawExtrinsicMetadata( | ||||
type=MetadataTargetType.SNAPSHOT, | |||||
target=snapshot_swhid, | target=snapshot_swhid, | ||||
authority=metadata_authority, | authority=metadata_authority, | ||||
fetcher=MetadataFetcher( | fetcher=MetadataFetcher( | ||||
name="swh.loader.package.nixguix.loader.NixGuixLoader", | name="swh.loader.package.nixguix.loader.NixGuixLoader", | ||||
version=__version__, | version=__version__, | ||||
), | ), | ||||
discovery_date=loader.visit_date, | discovery_date=loader.visit_date, | ||||
format="nixguix-sources-json", | format="nixguix-sources-json", | ||||
metadata=raw_sources, | metadata=raw_sources, | ||||
origin=sources_url, | origin=sources_url, | ||||
) | ) | ||||
] | ] | ||||
assert swh_storage.raw_extrinsic_metadata_get( | assert swh_storage.raw_extrinsic_metadata_get( | ||||
MetadataTargetType.SNAPSHOT, snapshot_swhid, metadata_authority, | snapshot_swhid, metadata_authority, | ||||
) == PagedResult(next_page_token=None, results=expected_metadata,) | ) == PagedResult(next_page_token=None, results=expected_metadata,) | ||||
def test_uncompress_failure(swh_storage, requests_mock_datadir): | def test_uncompress_failure(swh_storage, requests_mock_datadir): | ||||
"""Non tarball files are currently not supported and the uncompress | """Non tarball files are currently not supported and the uncompress | ||||
function fails on such kind of files. | function fails on such kind of files. | ||||
However, even in this case of failure (because of the url | However, even in this case of failure (because of the url | ||||
▲ Show 20 Lines • Show All 372 Lines • Show Last 20 Lines |