Changeset View
Changeset View
Standalone View
Standalone View
swh/loader/package/npm/tests/test_npm.py
Show All 14 Lines | from swh.loader.package.npm.loader import ( | ||||
extract_npm_package_author, | extract_npm_package_author, | ||||
artifact_to_revision_id, | artifact_to_revision_id, | ||||
) | ) | ||||
from swh.loader.package.tests.common import ( | from swh.loader.package.tests.common import ( | ||||
check_snapshot, | check_snapshot, | ||||
check_metadata_paths, | check_metadata_paths, | ||||
get_stats, | get_stats, | ||||
) | ) | ||||
from swh.loader.tests.common import assert_last_visit_ok | |||||
def test_extract_npm_package_author(datadir): | def test_extract_npm_package_author(datadir): | ||||
package_metadata_filepath = os.path.join( | package_metadata_filepath = os.path.join( | ||||
datadir, "https_replicate.npmjs.com", "org_visit1" | datadir, "https_replicate.npmjs.com", "org_visit1" | ||||
) | ) | ||||
with open(package_metadata_filepath) as json_file: | with open(package_metadata_filepath) as json_file: | ||||
▲ Show 20 Lines • Show All 314 Lines • ▼ Show 20 Lines | |||||
def test_npm_loader_incremental_visit(swh_config, requests_mock_datadir_visits): | def test_npm_loader_incremental_visit(swh_config, requests_mock_datadir_visits): | ||||
package = "org" | package = "org" | ||||
url = package_url(package) | url = package_url(package) | ||||
loader = NpmLoader(url) | loader = NpmLoader(url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "eventful" | assert actual_load_status["status"] == "eventful" | ||||
assert actual_load_status["status"] is not None | assert actual_load_status["status"] is not None | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="full", type="npm") | ||||
assert origin_visit["status"] == "full" | |||||
assert origin_visit["type"] == "npm" | |||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
assert { | assert { | ||||
"content": len(_expected_new_contents_first_visit), | "content": len(_expected_new_contents_first_visit), | ||||
"directory": len(_expected_new_directories_first_visit), | "directory": len(_expected_new_directories_first_visit), | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
"person": 2, | "person": 2, | ||||
"release": 0, | "release": 0, | ||||
"revision": len(_expected_new_revisions_first_visit), | "revision": len(_expected_new_revisions_first_visit), | ||||
"skipped_content": 0, | "skipped_content": 0, | ||||
"snapshot": 1, | "snapshot": 1, | ||||
} == stats | } == stats | ||||
loader._info = None # reset loader internal state | loader._info = None # reset loader internal state | ||||
actual_load_status2 = loader.load() | actual_load_status2 = loader.load() | ||||
assert actual_load_status2["status"] == "eventful" | assert actual_load_status2["status"] == "eventful" | ||||
snap_id2 = actual_load_status2["snapshot_id"] | snap_id2 = actual_load_status2["snapshot_id"] | ||||
assert snap_id2 is not None | assert snap_id2 is not None | ||||
assert snap_id2 != actual_load_status["snapshot_id"] | assert snap_id2 != actual_load_status["snapshot_id"] | ||||
origin_visit2 = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="full", type="npm") | ||||
assert origin_visit2["status"] == "full" | |||||
assert origin_visit2["type"] == "npm" | |||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
assert { # 3 new releases artifacts | assert { # 3 new releases artifacts | ||||
"content": len(_expected_new_contents_first_visit) + 14, | "content": len(_expected_new_contents_first_visit) + 14, | ||||
"directory": len(_expected_new_directories_first_visit) + 15, | "directory": len(_expected_new_directories_first_visit) + 15, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 2, | "origin_visit": 2, | ||||
Show All 16 Lines | |||||
def test_npm_loader_version_divergence(swh_config): | def test_npm_loader_version_divergence(swh_config): | ||||
package = "@aller_shared" | package = "@aller_shared" | ||||
url = package_url(package) | url = package_url(package) | ||||
loader = NpmLoader(url) | loader = NpmLoader(url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "eventful" | assert actual_load_status["status"] == "eventful" | ||||
assert actual_load_status["status"] is not None | assert actual_load_status["status"] is not None | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="full", type="npm") | ||||
assert origin_visit["status"] == "full" | |||||
assert origin_visit["type"] == "npm" | |||||
stats = get_stats(loader.storage) | stats = get_stats(loader.storage) | ||||
assert { # 1 new releases artifacts | assert { # 1 new releases artifacts | ||||
"content": 534, | "content": 534, | ||||
"directory": 153, | "directory": 153, | ||||
"origin": 1, | "origin": 1, | ||||
"origin_visit": 1, | "origin_visit": 1, | ||||
▲ Show 20 Lines • Show All 95 Lines • ▼ Show 20 Lines | def test_npm_artifact_with_no_intrinsic_metadata(swh_config, requests_mock_datadir): | ||||
# no branch as one artifact without any intrinsic metadata | # no branch as one artifact without any intrinsic metadata | ||||
expected_snapshot = { | expected_snapshot = { | ||||
"id": "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e", | "id": "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e", | ||||
"branches": {}, | "branches": {}, | ||||
} | } | ||||
check_snapshot(expected_snapshot, loader.storage) | check_snapshot(expected_snapshot, loader.storage) | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="full", type="npm") | ||||
assert origin_visit["status"] == "full" | |||||
assert origin_visit["type"] == "npm" | |||||
def test_npm_artifact_with_no_upload_time(swh_config, requests_mock_datadir): | def test_npm_artifact_with_no_upload_time(swh_config, requests_mock_datadir): | ||||
"""With no time upload, artifact is skipped | """With no time upload, artifact is skipped | ||||
""" | """ | ||||
package = "jammit-no-time" | package = "jammit-no-time" | ||||
url = package_url(package) | url = package_url(package) | ||||
loader = NpmLoader(url) | loader = NpmLoader(url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status["status"] == "uneventful" | assert actual_load_status["status"] == "uneventful" | ||||
# no branch as one artifact without any intrinsic metadata | # no branch as one artifact without any intrinsic metadata | ||||
expected_snapshot = { | expected_snapshot = { | ||||
"id": "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e", | "id": "1a8893e6a86f444e8be8e7bda6cb34fb1735a00e", | ||||
"branches": {}, | "branches": {}, | ||||
} | } | ||||
check_snapshot(expected_snapshot, loader.storage) | check_snapshot(expected_snapshot, loader.storage) | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="partial", type="npm") | ||||
assert origin_visit["status"] == "partial" | |||||
assert origin_visit["type"] == "npm" | |||||
def test_npm_artifact_use_mtime_if_no_time(swh_config, requests_mock_datadir): | def test_npm_artifact_use_mtime_if_no_time(swh_config, requests_mock_datadir): | ||||
"""With no time upload, artifact is skipped | """With no time upload, artifact is skipped | ||||
""" | """ | ||||
package = "jammit-express" | package = "jammit-express" | ||||
url = package_url(package) | url = package_url(package) | ||||
Show All 10 Lines | expected_snapshot = { | ||||
"releases/0.0.1": { | "releases/0.0.1": { | ||||
"target_type": "revision", | "target_type": "revision", | ||||
"target": "9e4dd2b40d1b46b70917c0949aa2195c823a648e", | "target": "9e4dd2b40d1b46b70917c0949aa2195c823a648e", | ||||
}, | }, | ||||
}, | }, | ||||
} | } | ||||
check_snapshot(expected_snapshot, loader.storage) | check_snapshot(expected_snapshot, loader.storage) | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="full", type="npm") | ||||
assert origin_visit["status"] == "full" | |||||
assert origin_visit["type"] == "npm" | |||||
def test_npm_no_artifact(swh_config, requests_mock_datadir): | def test_npm_no_artifact(swh_config, requests_mock_datadir): | ||||
"""If no artifacts at all is found for origin, the visit fails completely | """If no artifacts at all is found for origin, the visit fails completely | ||||
""" | """ | ||||
package = "catify" | package = "catify" | ||||
url = package_url(package) | url = package_url(package) | ||||
loader = NpmLoader(url) | loader = NpmLoader(url) | ||||
actual_load_status = loader.load() | actual_load_status = loader.load() | ||||
assert actual_load_status == { | assert actual_load_status == { | ||||
"status": "failed", | "status": "failed", | ||||
} | } | ||||
origin_visit = loader.storage.origin_visit_get_latest(url) | assert_last_visit_ok(loader.storage, url, status="partial", type="npm") | ||||
assert origin_visit["status"] == "partial" | |||||
assert origin_visit["type"] == "npm" |