diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py index 865b0be..bd7747b 100644 --- a/swh/scheduler/tests/updater/__init__.py +++ b/swh/scheduler/tests/updater/__init__.py @@ -1,44 +1,53 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from arrow import utcnow +try: + from hypothesis.strategies import from_regex +except ImportError: + from hypothesis.strategies import text + + # Revert to using basic text generation + def from_regex(*args, **kwargs): + return text() + class UpdaterTestUtil: """Mixin intended for event generation purposes """ def _make_event(self, event_type, name, origin_type): return { 'type': event_type, 'repo': { 'name': name, }, 'created_at': utcnow(), 'origin_type': origin_type, } def _make_events(self, events): for event_type, repo_name, origin_type in events: yield self._make_event(event_type, repo_name, origin_type) def _make_incomplete_event(self, event_type, name, origin_type, missing_data_key): event = self._make_event(event_type, name, origin_type) del event[missing_data_key] return event def _make_incomplete_events(self, events): for event_type, repo_name, origin_type, missing_data_key in events: yield self._make_incomplete_event(event_type, repo_name, origin_type, missing_data_key) def _make_simple_event(self, event_type, name, origin_type): return { 'type': event_type, 'url': 'https://fakeurl/%s' % name, 'origin_type': origin_type, 'created_at': utcnow(), } diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py index 6eb67a7..c141f6c 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ b/swh/scheduler/tests/updater/test_backend.py @@ -1,68 +1,71 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from arrow import utcnow from nose.plugins.attrib import attr from nose.tools import istest from hypothesis import given -from hypothesis.strategies import sets, from_regex +from hypothesis.strategies import sets from swh.core.tests.db_testing import SingleDbTestFixture from swh.scheduler.updater.backend import SchedulerUpdaterBackend from swh.scheduler.updater.events import SWHEvent +from . import from_regex + + TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') @attr('db') class SchedulerUpdaterBackendTest(SingleDbTestFixture, unittest.TestCase): TEST_DB_NAME = 'softwareheritage-scheduler-updater-test' TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler-updater.dump') def setUp(self): super().setUp() config = { 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME, 'cache_read_limit': 1000, } self.backend = SchedulerUpdaterBackend(**config) def _empty_tables(self): self.cursor.execute( """SELECT table_name FROM information_schema.tables WHERE table_schema = %s""", ('public', )) tables = set(table for (table,) in self.cursor.fetchall()) for table in tables: self.cursor.execute('truncate table %s cascade' % table) self.conn.commit() def tearDown(self): self.backend.close_connection() self._empty_tables() super().tearDown() @istest @given(sets( from_regex( r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), min_size=10, max_size=15)) def cache_read(self, urls): def gen_events(urls): for url in urls: yield SWHEvent({ 'url': url, 'type': 'create', 'origin_type': 'git', }) self.backend.cache_put(gen_events(urls)) r = self.backend.cache_read(timestamp=utcnow()) self.assertNotEqual(r, []) diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py index e614d41..0944e48 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ b/swh/scheduler/tests/updater/test_consumer.py @@ -1,198 +1,199 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given -from hypothesis.strategies import sampled_from, from_regex, lists, tuples, text +from hypothesis.strategies import sampled_from, lists, tuples, text + from itertools import chain from nose.tools import istest -from swh.scheduler.tests.updater import UpdaterTestUtil +from . import UpdaterTestUtil, from_regex from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS from swh.scheduler.updater.consumer import UpdaterConsumer class FakeSchedulerUpdaterBackend: def __init__(self): self.events = [] def cache_put(self, events): self.events.append(events) class FakeUpdaterConsumerBase(UpdaterConsumer): def __init__(self, backend_class=FakeSchedulerUpdaterBackend): super().__init__(backend_class=backend_class) self.connection_opened = False self.connection_closed = False self.consume_called = False self.has_events_called = False def open_connection(self): self.connection_opened = True def close_connection(self): self.connection_closed = True def convert_event(self, event): pass class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return True def consume_events(self): self.consume_called = True raise ValueError('Broken stuff') class UpdaterConsumerRaisingTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerRaise() @istest def running_raise(self): """Raising during run should finish fine. """ # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when with self.assertRaisesRegex(ValueError, 'Broken stuff'): self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertTrue(self.updater.consume_called) class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): def has_events(self): self.has_events_called = True return False def consume_events(self): self.consume_called = True class UpdaterConsumerNoEventTest(unittest.TestCase): def setUp(self): self.updater = FakeUpdaterConsumerNoEvent() @istest def running_does_not_consume(self): """Run with no events should do just fine""" # given self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) # when self.updater.run() # then self.assertEqual(self.updater.count, 0) self.assertEqual(self.updater.seen_events, set()) self.assertEqual(self.updater.events, []) self.assertTrue(self.updater.connection_opened) self.assertTrue(self.updater.has_events_called) self.assertTrue(self.updater.connection_closed) self.assertFalse(self.updater.consume_called) EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] class FakeUpdaterConsumer(FakeUpdaterConsumerBase): def __init__(self, messages): super().__init__() self.messages = messages self.debug = False def has_events(self): self.has_events_called = True return len(self.messages) > 0 def consume_events(self): self.consume_called = True for msg in self.messages: yield msg self.messages.pop() def convert_event(self, event, keys=EVENT_KEYS): for k in keys: v = event.get(k) if v is None: return None e = { 'type': event['type'], 'url': 'https://fake.url/%s' % event['repo']['name'], 'last_seen': event['created_at'], 'origin_type': event['origin_type'], } return SWHEvent(e) class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): @istest @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(text(), # event type from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name text()), # origin type min_size=3, max_size=10), lists(tuples(sampled_from(LISTENED_EVENTS), # event type from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name text(), # origin type sampled_from(EVENT_KEYS)), # keys to drop min_size=3, max_size=10)) def running(self, events, uninteresting_events, incomplete_events): """Interesting events are written to cache, others are dropped """ # given ready_events = self._make_events(events) ready_uninteresting_events = self._make_events(uninteresting_events) ready_incomplete_events = self._make_incomplete_events( incomplete_events) updater = FakeUpdaterConsumer(list(chain( ready_events, ready_incomplete_events, ready_uninteresting_events))) self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) # when updater.run() # then self.assertEqual(updater.count, 0) self.assertEqual(updater.seen_events, set()) self.assertEqual(updater.events, []) self.assertTrue(updater.connection_opened) self.assertTrue(updater.has_events_called) self.assertTrue(updater.connection_closed) self.assertTrue(updater.consume_called) self.assertEqual(updater.messages, []) # uninteresting or incomplete events are dropped self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py index d36bd54..cb7489e 100644 --- a/swh/scheduler/tests/updater/test_events.py +++ b/swh/scheduler/tests/updater/test_events.py @@ -1,48 +1,48 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given from hypothesis.strategies import text, sampled_from from nose.tools import istest -from swh.scheduler.tests.updater import UpdaterTestUtil - from swh.scheduler.updater.events import SWHEvent, LISTENED_EVENTS from swh.scheduler.updater.ghtorrent import events +from . import UpdaterTestUtil + def event_values_ko(): return set(events['evt']).union( set(events['ent'])).difference( set(LISTENED_EVENTS)) WRONG_EVENTS = sorted(list(event_values_ko())) class EventTest(UpdaterTestUtil, unittest.TestCase): @istest @given(sampled_from(LISTENED_EVENTS), text(), text()) def is_interesting_ok(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertTrue(SWHEvent(evt).is_interesting()) @istest @given(text(), text(), text()) def is_interested_with_noisy_event_should_be_ko( self, event_type, name, origin_type): if event_type in LISTENED_EVENTS: # just in case something good is generated, skip it return evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) @istest @given(sampled_from(WRONG_EVENTS), text(), text()) def is_interesting_ko(self, event_type, name, origin_type): evt = self._make_simple_event(event_type, name, origin_type) self.assertFalse(SWHEvent(evt).is_interesting()) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py index f82d9b0..bfeecf2 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ b/swh/scheduler/tests/updater/test_ghtorrent.py @@ -1,171 +1,171 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from hypothesis import given -from hypothesis.strategies import sampled_from, from_regex +from hypothesis.strategies import sampled_from from nose.tools import istest from unittest.mock import patch -from swh.scheduler.tests.updater import UpdaterTestUtil - from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.ghtorrent import ( events, GHTorrentConsumer, INTERESTING_EVENT_KEYS) +from . import from_regex, UpdaterTestUtil + def event_values(): return set(events['evt']).union(set(events['ent'])) def ghtorrentize_event_name(event_name): return '%sEvent' % event_name.capitalize() EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) class FakeChannel: """Fake Channel (virtual connection inside a connection) """ def close(self): self.close = True class FakeConnection: """Fake Rabbitmq connection for test purposes """ def __init__(self, conn_string): self._conn_string = conn_string self._connect = False self._release = False self._channel = False def connect(self): self._connect = True return True def release(self): self._connect = False self._release = True def channel(self): self._channel = True return FakeChannel() class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): def setUp(self): self.fake_config = { 'conn': { 'url': 'amqp://u:p@https://somewhere:9807', }, 'debug': True, 'batch_cache_write': 10, 'rabbitmq_prefetch_read': 100, } self.consumer = GHTorrentConsumer(self.fake_config, _connection_class=FakeConnection) @istest def test_init(self): # given # check init is ok self.assertEqual(self.consumer.debug, self.fake_config['debug']) self.assertEqual(self.consumer.batch, self.fake_config['batch_cache_write']) self.assertEqual(self.consumer.prefetch_read, self.fake_config['rabbitmq_prefetch_read']) self.assertEqual(self.consumer.config, self.fake_config) @istest def test_has_events(self): self.assertTrue(self.consumer.has_events()) @istest def test_connection(self): # when self.consumer.open_connection() # then self.assertEqual(self.consumer.conn._conn_string, self.fake_config['conn']['url']) self.assertTrue(self.consumer.conn._connect) self.assertFalse(self.consumer.conn._release) # when self.consumer.close_connection() # then self.assertFalse(self.consumer.conn._connect) self.assertTrue(self.consumer.conn._release) self.assertIsInstance(self.consumer.channel, FakeChannel) @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) def convert_event_ok(self, event_type, name): input_event = self._make_event(event_type, name, 'git') actual_event = self.consumer.convert_event(input_event) self.assertTrue(isinstance(actual_event, SWHEvent)) event = actual_event.get() expected_event = { 'type': event_type.lower().rstrip('Event'), 'url': 'https://github.com/%s' % name, 'last_seen': input_event['created_at'], 'cnt': 1, 'origin_type': 'git', } self.assertEqual(event, expected_event) @istest @given(sampled_from(EVENT_TYPES), from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), sampled_from(INTERESTING_EVENT_KEYS)) def convert_event_ko(self, event_type, name, missing_data_key): input_event = self._make_incomplete_event( event_type, name, 'git', missing_data_key) actual_converted_event = self.consumer.convert_event(input_event) self.assertIsNone(actual_converted_event) @patch('swh.scheduler.updater.ghtorrent.collect_replies') @istest def consume_events(self, mock_collect_replies): # given self.consumer.queue = 'fake-queue' # hack self.consumer.open_connection() fake_events = [ self._make_event('PushEvent', 'user/some-repo', 'git'), self._make_event('PushEvent', 'user2/some-other-repo', 'git'), ] mock_collect_replies.return_value = fake_events # when actual_events = [] for e in self.consumer.consume_events(): actual_events.append(e) # then self.assertEqual(fake_events, actual_events) mock_collect_replies.assert_called_once_with( self.consumer.conn, self.consumer.channel, 'fake-queue', no_ack=False, limit=self.fake_config['rabbitmq_prefetch_read'] ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py index b35a6b1..2d98f28 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ b/swh/scheduler/tests/updater/test_writer.py @@ -1,162 +1,162 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest from nose.plugins.attrib import attr from nose.tools import istest from swh.core.tests.db_testing import DbTestFixture from swh.scheduler.updater.events import SWHEvent from swh.scheduler.updater.writer import UpdaterWriter from swh.scheduler.updater.events import LISTENED_EVENTS -from swh.scheduler.tests.updater import UpdaterTestUtil +from . import UpdaterTestUtil TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../../swh-storage-testdata') @attr('db') class CommonSchedulerTest(DbTestFixture): TEST_SCHED_DB = 'softwareheritage-scheduler-test' TEST_SCHED_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler.dump') TEST_SCHED_UPDATER_DB = 'softwareheritage-scheduler-updater-test' TEST_SCHED_UPDATER_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh-scheduler-updater.dump') @classmethod def setUpClass(cls): cls.add_db(cls.TEST_SCHED_DB, cls.TEST_SCHED_DUMP) cls.add_db(cls.TEST_SCHED_UPDATER_DB, cls.TEST_SCHED_UPDATER_DUMP) super().setUpClass() def tearDown(self): self.reset_db_tables(self.TEST_SCHED_UPDATER_DB) self.reset_db_tables(self.TEST_SCHED_DB, excluded=['task_type', 'priority_ratio']) super().tearDown() class UpdaterWriterTest(UpdaterTestUtil, CommonSchedulerTest, unittest.TestCase): def setUp(self): super().setUp() config = { 'scheduler': { 'cls': 'local', 'args': { 'scheduling_db': 'dbname=softwareheritage-scheduler-test', }, }, 'scheduler_updater': { 'scheduling_updater_db': 'dbname=softwareheritage-scheduler-updater-test', 'cache_read_limit': 5, }, 'pause': 0.1, 'verbose': False, } self.writer = UpdaterWriter(**config) self.scheduler_backend = self.writer.scheduler_backend self.scheduler_updater_backend = self.writer.scheduler_updater_backend def tearDown(self): self.scheduler_backend.close_connection() self.scheduler_updater_backend.close_connection() super().tearDown() @istest def run_ko(self): """Only git tasks are supported for now, other types are dismissed. """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'svn')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # other reads after writes are still empty since it's not supported self.assertEqual(len(r), 0) @istest def run_ok(self): """Only git origin are supported for now """ ready_events = [ SWHEvent( self._make_simple_event(event_type, 'origin-%s' % i, 'git')) for i, event_type in enumerate(LISTENED_EVENTS) ] expected_length = len(ready_events) self.scheduler_updater_backend.cache_put(ready_events) data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), expected_length) r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') # first read on an empty scheduling db results with nothing in it self.assertEqual(len(r), 0) # Read from cache to scheduler db self.writer.run() # now, we should have scheduling task ready r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length) # Check the task has been scheduled for t in r: self.assertEquals(t['type'], 'origin-update-git') self.assertEquals(t['priority'], 'normal') self.assertEquals(t['policy'], 'oneshot') self.assertEquals(t['status'], 'next_run_not_scheduled') # writer has nothing to do now self.writer.run() # so no more data in cache data = list(self.scheduler_updater_backend.cache_read()) self.assertEqual(len(data), 0) # provided, no runner is ran, still the same amount of scheduling tasks r = self.scheduler_backend.peek_ready_tasks( 'origin-update-git') self.assertEquals(len(r), expected_length)