Changeset View
Changeset View
Standalone View
Standalone View
swh/lister/core/tests/test_lister.py
# Copyright (C) 2019 the Software Heritage developers | # Copyright (C) 2019 the Software Heritage developers | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import abc | import abc | ||||
import datetime | |||||
import time | import time | ||||
from unittest import TestCase | from unittest import TestCase | ||||
from unittest.mock import Mock, patch | from unittest.mock import Mock, patch | ||||
import requests_mock | import requests_mock | ||||
from sqlalchemy import create_engine | from sqlalchemy import create_engine | ||||
from swh.lister.core.abstractattribute import AbstractAttribute | from swh.lister.core.abstractattribute import AbstractAttribute | ||||
Show All 32 Lines | def mock_rate_quota(self, n, request, context): | ||||
return '{"error":"dummy"}' | return '{"error":"dummy"}' | ||||
def __init__(self, *args, **kwargs): | def __init__(self, *args, **kwargs): | ||||
super().__init__(*args, **kwargs) | super().__init__(*args, **kwargs) | ||||
self.rate_limit = 1 | self.rate_limit = 1 | ||||
self.response = None | self.response = None | ||||
self.fl = None | self.fl = None | ||||
self.helper = None | self.helper = None | ||||
self.scheduler_tasks = [] | |||||
if self.__class__ != HttpListerTesterBase: | if self.__class__ != HttpListerTesterBase: | ||||
self.run = TestCase.run.__get__(self, self.__class__) | self.run = TestCase.run.__get__(self, self.__class__) | ||||
else: | else: | ||||
self.run = noop | self.run = noop | ||||
def mock_limit_n_response(self, n, request, context): | def mock_limit_n_response(self, n, request, context): | ||||
self.fl.reset_backoff() | self.fl.reset_backoff() | ||||
if self.rate_limit <= n: | if self.rate_limit <= n: | ||||
Show All 15 Lines | def get_fl(self, override_config=None): | ||||
""" | """ | ||||
if override_config or self.fl is None: | if override_config or self.fl is None: | ||||
self.fl = self.Lister(url='https://fakeurl', | self.fl = self.Lister(url='https://fakeurl', | ||||
override_config=override_config) | override_config=override_config) | ||||
self.fl.INITIAL_BACKOFF = 1 | self.fl.INITIAL_BACKOFF = 1 | ||||
self.fl.reset_backoff() | self.fl.reset_backoff() | ||||
self.scheduler_tasks = [] | |||||
return self.fl | return self.fl | ||||
def disable_scheduler(self, fl): | def disable_scheduler(self, fl): | ||||
fl.schedule_missing_tasks = Mock(return_value=None) | fl.schedule_missing_tasks = Mock(return_value=None) | ||||
def mock_scheduler(self, fl): | |||||
def _create_tasks(tasks): | |||||
task_id = 0 | |||||
current_nb_tasks = len(self.scheduler_tasks) | |||||
if current_nb_tasks > 0: | |||||
task_id = self.scheduler_tasks[-1]['id'] + 1 | |||||
for task in tasks: | |||||
scheduler_task = dict(task) | |||||
scheduler_task.update({ | |||||
'status': 'next_run_not_scheduled', | |||||
'retries_left': 0, | |||||
'priority': None, | |||||
'id': task_id, | |||||
'current_interval': datetime.timedelta(days=64) | |||||
}) | |||||
self.scheduler_tasks.append(scheduler_task) | |||||
task_id = task_id + 1 | |||||
return self.scheduler_tasks[current_nb_tasks:] | |||||
def _disable_tasks(task_ids): | |||||
for task_id in task_ids: | |||||
self.scheduler_tasks[task_id]['status'] = 'disabled' | |||||
fl.scheduler.create_tasks = Mock(wraps=_create_tasks) | |||||
ardumont: Thanks btw, i did not that one (since 3.5) [1]
I would have probably tried to use "side_effect"… | |||||
Not Done Inline ActionsI did not know* that one ardumont: I did not know* that one | |||||
fl.scheduler.disable_tasks = Mock(wraps=_disable_tasks) | |||||
def disable_db(self, fl): | def disable_db(self, fl): | ||||
fl.winnow_models = Mock(return_value=[]) | fl.winnow_models = Mock(return_value=[]) | ||||
fl.db_inject_repo = Mock(return_value=fl.MODEL()) | fl.db_inject_repo = Mock(return_value=fl.MODEL()) | ||||
fl.disable_deleted_repo_tasks = Mock(return_value=None) | fl.disable_deleted_repo_tasks = Mock(return_value=None) | ||||
def init_db(self, db, model): | def init_db(self, db, model): | ||||
engine = create_engine(db.url()) | engine = create_engine(db.url()) | ||||
model.metadata.create_all(engine) | model.metadata.create_all(engine) | ||||
▲ Show 20 Lines • Show All 73 Lines • ▼ Show 20 Lines | def create_fl_with_db(self, http_mocker): | ||||
'lister': { | 'lister': { | ||||
'cls': 'local', | 'cls': 'local', | ||||
'args': {'db': db.url()} | 'args': {'db': db.url()} | ||||
} | } | ||||
}) | }) | ||||
fl.db = db | fl.db = db | ||||
self.init_db(db, fl.MODEL) | self.init_db(db, fl.MODEL) | ||||
self.disable_scheduler(fl) | self.mock_scheduler(fl) | ||||
return fl | return fl | ||||
@requests_mock.Mocker() | @requests_mock.Mocker() | ||||
def test_fetch_multiple_pages_yesdb(self, http_mocker): | def test_fetch_multiple_pages_yesdb(self, http_mocker): | ||||
fl = self.create_fl_with_db(http_mocker) | fl = self.create_fl_with_db(http_mocker) | ||||
fl.run(min_bound=self.first_index) | fl.run(min_bound=self.first_index) | ||||
self.assertEqual(fl.db_last_index(), self.last_index) | self.assertEqual(fl.db_last_index(), self.last_index) | ||||
partitions = fl.db_partition_indices(5) | partitions = fl.db_partition_indices(5) | ||||
self.assertGreater(len(partitions), 0) | self.assertGreater(len(partitions), 0) | ||||
for k in partitions: | for k in partitions: | ||||
self.assertLessEqual(len(k), 5) | self.assertLessEqual(len(k), 5) | ||||
self.assertGreater(len(k), 0) | self.assertGreater(len(k), 0) | ||||
@requests_mock.Mocker() | @requests_mock.Mocker() | ||||
def test_fetch_none_nodb(self, http_mocker): | def test_fetch_none_nodb(self, http_mocker): | ||||
▲ Show 20 Lines • Show All 149 Lines • Show Last 20 Lines |
Thanks btw, i did not that one (since 3.5) [1]
I would have probably tried to use "side_effect" instead.
[1] https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock