Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/npm/tests/test_lister.py
Show First 20 Lines • Show All 69 Lines • ▼ Show 20 Lines | for row in tasks: | ||||
package_url = kwargs["url"] | package_url = kwargs["url"] | ||||
package_name = package_url.split("/")[-1] | package_name = package_url.split("/")[-1] | ||||
assert package_url == f"https://www.npmjs.com/package/{package_name}" | assert package_url == f"https://www.npmjs.com/package/{package_name}" | ||||
assert row["policy"] == "recurring" | assert row["policy"] == "recurring" | ||||
assert row["priority"] is None | assert row["priority"] is None | ||||
def test_lister_npm_basic_listing(lister_npm, requests_mock_datadir): | def test_lister_npm_basic_listing(initialized_lister, requests_mock_datadir): | ||||
lister_npm.run() | initialized_lister.run() | ||||
tasks = lister_npm.scheduler.search_tasks(task_type="load-npm") | tasks = initialized_lister.scheduler.search_tasks(task_type="load-npm") | ||||
assert len(tasks) == 100 | assert len(tasks) == 100 | ||||
check_tasks(tasks) | check_tasks(tasks) | ||||
def test_lister_npm_listing_pagination(lister_npm, requests_mock_datadir): | def test_lister_npm_listing_pagination(initialized_lister, requests_mock_datadir): | ||||
lister = lister_npm | lister = initialized_lister | ||||
# Patch per page pagination | # Patch per page pagination | ||||
lister.per_page = 10 + 1 | lister.per_page = 10 + 1 | ||||
lister.PATH_TEMPLATE = lister.PATH_TEMPLATE.replace( | lister.PATH_TEMPLATE = lister.PATH_TEMPLATE.replace( | ||||
"&limit=1001", "&limit=%s" % lister.per_page | "&limit=1001", "&limit=%s" % lister.per_page | ||||
) | ) | ||||
lister.run() | lister.run() | ||||
tasks = lister.scheduler.search_tasks(task_type="load-npm") | tasks = lister.scheduler.search_tasks(task_type="load-npm") | ||||
assert len(tasks) == 2 * 10 # only 2 files with 10 results each | assert len(tasks) == 2 * 10 # only 2 files with 10 results each | ||||
check_tasks(tasks) | check_tasks(tasks) |