Changeset View
Changeset View
Standalone View
Standalone View
swh/scheduler/tests/updater/test_writer.py
# Copyright (C) 2018 The Software Heritage developers | # Copyright (C) 2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import os | import os | ||||
import unittest | import unittest | ||||
from nose.plugins.attrib import attr | from nose.plugins.attrib import attr | ||||
from nose.tools import istest | |||||
from swh.core.tests.db_testing import DbTestFixture | from swh.core.tests.db_testing import DbTestFixture | ||||
from swh.scheduler.updater.events import SWHEvent | from swh.scheduler.tests import DATA_DIR | ||||
from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent | |||||
from swh.scheduler.updater.writer import UpdaterWriter | from swh.scheduler.updater.writer import UpdaterWriter | ||||
from swh.scheduler.updater.events import LISTENED_EVENTS | |||||
from swh.scheduler.tests import DATA_DIR | |||||
from . import UpdaterTestUtil | from . import UpdaterTestUtil | ||||
@attr('db') | @attr('db') | ||||
class CommonSchedulerTest(DbTestFixture): | class CommonSchedulerTest(DbTestFixture): | ||||
TEST_SCHED_DB = 'softwareheritage-scheduler-test' | TEST_SCHED_DB = 'softwareheritage-scheduler-test' | ||||
TEST_SCHED_DUMP = os.path.join( | TEST_SCHED_DUMP = os.path.join( | ||||
DATA_DIR, 'dumps/swh-scheduler.sql') | DATA_DIR, 'dumps/swh-scheduler.sql') | ||||
▲ Show 20 Lines • Show All 43 Lines • ▼ Show 20 Lines | def setUp(self): | ||||
self.scheduler_backend = self.writer.scheduler_backend | self.scheduler_backend = self.writer.scheduler_backend | ||||
self.scheduler_updater_backend = self.writer.scheduler_updater_backend | self.scheduler_updater_backend = self.writer.scheduler_updater_backend | ||||
def tearDown(self): | def tearDown(self): | ||||
self.scheduler_backend.close_connection() | self.scheduler_backend.close_connection() | ||||
self.scheduler_updater_backend.close_connection() | self.scheduler_updater_backend.close_connection() | ||||
super().tearDown() | super().tearDown() | ||||
@istest | def test_run_ko(self): | ||||
def run_ko(self): | |||||
"""Only git tasks are supported for now, other types are dismissed. | """Only git tasks are supported for now, other types are dismissed. | ||||
""" | """ | ||||
ready_events = [ | ready_events = [ | ||||
SWHEvent( | SWHEvent( | ||||
self._make_simple_event(event_type, 'origin-%s' % i, | self._make_simple_event(event_type, 'origin-%s' % i, | ||||
'svn')) | 'svn')) | ||||
for i, event_type in enumerate(LISTENED_EVENTS) | for i, event_type in enumerate(LISTENED_EVENTS) | ||||
Show All 15 Lines | def test_run_ko(self): | ||||
self.writer.run() | self.writer.run() | ||||
r = self.scheduler_backend.peek_ready_tasks( | r = self.scheduler_backend.peek_ready_tasks( | ||||
'origin-update-git') | 'origin-update-git') | ||||
# other reads after writes are still empty since it's not supported | # other reads after writes are still empty since it's not supported | ||||
self.assertEqual(len(r), 0) | self.assertEqual(len(r), 0) | ||||
@istest | def test_run_ok(self): | ||||
def run_ok(self): | |||||
"""Only git origin are supported for now | """Only git origin are supported for now | ||||
""" | """ | ||||
ready_events = [ | ready_events = [ | ||||
SWHEvent( | SWHEvent( | ||||
self._make_simple_event(event_type, 'origin-%s' % i, 'git')) | self._make_simple_event(event_type, 'origin-%s' % i, 'git')) | ||||
for i, event_type in enumerate(LISTENED_EVENTS) | for i, event_type in enumerate(LISTENED_EVENTS) | ||||
] | ] | ||||
▲ Show 20 Lines • Show All 43 Lines • Show Last 20 Lines |