Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/core/tests/test_lister.py
Show First 20 Lines • Show All 302 Lines • ▼ Show 20 Lines | def test_api_request(self, http_mocker): | ||||
"""Test API request for rate limit handling | """Test API request for rate limit handling | ||||
""" | """ | ||||
http_mocker.get(self.test_re, text=self.mock_limit_twice_response) | http_mocker.get(self.test_re, text=self.mock_limit_twice_response) | ||||
with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: | with patch.object(time, 'sleep', wraps=time.sleep) as sleepmock: | ||||
self.get_api_response(self.first_index) | self.get_api_response(self.first_index) | ||||
self.assertEqual(sleepmock.call_count, 2) | self.assertEqual(sleepmock.call_count, 2) | ||||
def scheduled_tasks_test(self, next_api_response_file, next_last_index, | |||||
http_mocker): | |||||
"""Check that no loading tasks get disabled when processing a new | |||||
page of repositories returned by a forge API | |||||
""" | |||||
fl = self.create_fl_with_db(http_mocker) | |||||
# process first page of repositories listing | |||||
fl.run() | |||||
# process second page of repositories listing | |||||
prev_last_index = self.last_index | |||||
self.first_index = self.last_index | |||||
self.last_index = next_last_index | |||||
self.good_api_response_file = next_api_response_file | |||||
fl.run(min_bound=prev_last_index) | |||||
# check expected number of ingested repos and loading tasks | |||||
ingested_repos = list(fl.db_query_range(0, self.last_index)) | |||||
self.assertEqual(len(ingested_repos), len(self.scheduler_tasks)) | |||||
self.assertEqual(len(ingested_repos), 2 * self.entries_per_page) | |||||
# check tasks are not disabled | |||||
for task in self.scheduler_tasks: | |||||
self.assertTrue(task['status'] != 'disabled') | |||||
class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): | class HttpSimpleListerTester(HttpListerTesterBase, abc.ABC): | ||||
"""Base testing class for subclass of | """Base testing class for subclass of | ||||
:class:`swh.lister.core.simple)_lister.SimpleLister` | :class:`swh.lister.core.simple)_lister.SimpleLister` | ||||
See :class:`swh.lister.pypi.tests.test_lister` for an example of how | See :class:`swh.lister.pypi.tests.test_lister` for an example of how | ||||
to customize for a specific listing service. | to customize for a specific listing service. | ||||
▲ Show 20 Lines • Show All 68 Lines • Show Last 20 Lines |