Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/npm/tests/test_npm.py
Show First 20 Lines • Show All 376 Lines • ▼ Show 20 Lines | |||||
def package_metadata_url(package): | def package_metadata_url(package): | ||||
return 'https://replicate.npmjs.com/%s/' % package | return 'https://replicate.npmjs.com/%s/' % package | ||||
def test_revision_metadata_structure(swh_config, requests_mock_datadir): | def test_revision_metadata_structure(swh_config, requests_mock_datadir): | ||||
package = 'org' | package = 'org' | ||||
loader = NpmLoader(package, | loader = NpmLoader(package_url(package)) | ||||
package_url(package), | |||||
package_metadata_url(package)) | |||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status['status'] == 'eventful' | assert actual_load_status['status'] == 'eventful' | ||||
assert actual_load_status['snapshot_id'] is not None | assert actual_load_status['snapshot_id'] is not None | ||||
expected_revision_id = hash_to_bytes( | expected_revision_id = hash_to_bytes( | ||||
'd8a1c7474d2956ac598a19f0f27d52f7015f117e') | 'd8a1c7474d2956ac598a19f0f27d52f7015f117e') | ||||
revision = list(loader.storage.revision_get([expected_revision_id]))[0] | revision = list(loader.storage.revision_get([expected_revision_id]))[0] | ||||
Show All 15 Lines | for original_artifact in revision['metadata']['original_artifact']: | ||||
('length', int), | ('length', int), | ||||
('checksums', dict), | ('checksums', dict), | ||||
]) | ]) | ||||
def test_npm_loader_first_visit(swh_config, requests_mock_datadir): | def test_npm_loader_first_visit(swh_config, requests_mock_datadir): | ||||
package = 'org' | package = 'org' | ||||
loader = NpmLoader(package, | loader = NpmLoader(package_url(package)) | ||||
package_url(package), | |||||
package_metadata_url(package)) | |||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
expected_snapshot_id = 'd0587e1195aed5a8800411a008f2f2d627f18e2d' | expected_snapshot_id = 'd0587e1195aed5a8800411a008f2f2d627f18e2d' | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
'status': 'eventful', | 'status': 'eventful', | ||||
'snapshot_id': expected_snapshot_id | 'snapshot_id': expected_snapshot_id | ||||
} | } | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def test_npm_loader_first_visit(swh_config, requests_mock_datadir): | ||||
} | } | ||||
check_snapshot(expected_snapshot, loader.storage) | check_snapshot(expected_snapshot, loader.storage) | ||||
def test_npm_loader_incremental_visit( | def test_npm_loader_incremental_visit( | ||||
swh_config, requests_mock_datadir_visits): | swh_config, requests_mock_datadir_visits): | ||||
package = 'org' | package = 'org' | ||||
url = package_url(package) | url = package_url(package) | ||||
metadata_url = package_metadata_url(package) | loader = NpmLoader(url) | ||||
loader = NpmLoader(package, url, metadata_url) | |||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status['status'] == 'eventful' | assert actual_load_status['status'] == 'eventful' | ||||
assert actual_load_status['status'] is not None | assert actual_load_status['status'] is not None | ||||
origin_visit = list(loader.storage.origin_visit_get(url))[-1] | origin_visit = list(loader.storage.origin_visit_get(url))[-1] | ||||
assert origin_visit['status'] == 'full' | assert origin_visit['status'] == 'full' | ||||
assert origin_visit['type'] == 'npm' | assert origin_visit['type'] == 'npm' | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def test_npm_loader_incremental_visit( | ||||
] | ] | ||||
assert len(urls) == len(set(urls)) # we visited each artifact once across | assert len(urls) == len(set(urls)) # we visited each artifact once across | ||||
@pytest.mark.usefixtures('requests_mock_datadir') | @pytest.mark.usefixtures('requests_mock_datadir') | ||||
def test_npm_loader_version_divergence(swh_config): | def test_npm_loader_version_divergence(swh_config): | ||||
package = '@aller_shared' | package = '@aller_shared' | ||||
url = package_url(package) | url = package_url(package) | ||||
loader = NpmLoader(package, url, package_metadata_url(package)) | loader = NpmLoader(url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status['status'] == 'eventful' | assert actual_load_status['status'] == 'eventful' | ||||
assert actual_load_status['status'] is not None | assert actual_load_status['status'] is not None | ||||
origin_visit = list(loader.storage.origin_visit_get(url))[-1] | origin_visit = list(loader.storage.origin_visit_get(url))[-1] | ||||
assert origin_visit['status'] == 'full' | assert origin_visit['status'] == 'full' | ||||
assert origin_visit['type'] == 'npm' | assert origin_visit['type'] == 'npm' | ||||
▲ Show 20 Lines • Show All 108 Lines • Show Last 20 Lines |