Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7123005
D1028.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
23 KB
Subscribers
None
D1028.diff
View Options
diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -633,5 +633,34 @@
click.echo('OK')
+@cli.command('updater')
+@click.option('--verbose/--no-verbose', '-v', default=False,
+ help='Verbose mode')
+@click.pass_context
+def updater(ctx, verbose):
+ """Insert tasks in the scheduler from the scheduler-updater's events
+
+ """
+ from swh.scheduler.updater.writer import UpdaterWriter
+ UpdaterWriter(**ctx.obj['config']).run()
+
+
+@cli.command('ghtorrent')
+@click.option('--verbose/--no-verbose', '-v', default=False,
+ help='Verbose mode')
+@click.pass_context
+def ghtorrent(ctx, verbose):
+ """Consume events from ghtorrent and write them to cache.
+
+ """
+ from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
+ from swh.scheduler.updater.backend import SchedulerUpdaterBackend
+
+ ght_config = ctx.obj['config'].get('ghtorrent', {})
+ back_config = ctx.obj['config'].get('scheduler_updater', {})
+ backend = SchedulerUpdaterBackend(**back_config)
+ GHTorrentConsumer(backend, **ght_config).run()
+
+
if __name__ == '__main__':
cli()
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -27,7 +27,7 @@
def setUp(self):
super().setUp()
config = {
- 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
+ 'db': 'dbname=' + self.TEST_DB_NAME,
'cache_read_limit': 1000,
}
self.backend = SchedulerUpdaterBackend(**config)
@@ -42,7 +42,6 @@
self.conn.commit()
def tearDown(self):
- self.backend.close_connection()
self._empty_tables()
super().tearDown()
@@ -58,8 +57,6 @@
'type': 'create',
'origin_type': 'git',
})
-
self.backend.cache_put(gen_events(urls))
r = self.backend.cache_read(timestamp=utcnow())
-
self.assertNotEqual(r, [])
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ b/swh/scheduler/tests/updater/test_consumer.py
@@ -24,8 +24,8 @@
class FakeUpdaterConsumerBase(UpdaterConsumer):
- def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
- super().__init__(backend_class=backend_class)
+ def __init__(self, backend):
+ super().__init__(backend)
self.connection_opened = False
self.connection_closed = False
self.consume_called = False
@@ -53,7 +53,8 @@
class UpdaterConsumerRaisingTest(unittest.TestCase):
def setUp(self):
- self.updater = FakeUpdaterConsumerRaise()
+ self.updater = FakeUpdaterConsumerRaise(
+ FakeSchedulerUpdaterBackend())
def test_running_raise(self):
"""Raising during run should finish fine.
@@ -89,7 +90,8 @@
class UpdaterConsumerNoEventTest(unittest.TestCase):
def setUp(self):
- self.updater = FakeUpdaterConsumerNoEvent()
+ self.updater = FakeUpdaterConsumerNoEvent(
+ FakeSchedulerUpdaterBackend())
def test_running_does_not_consume(self):
"""Run with no events should do just fine"""
@@ -115,8 +117,8 @@
class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
- def __init__(self, messages):
- super().__init__()
+ def __init__(self, backend, messages):
+ super().__init__(backend)
self.messages = messages
self.debug = False
@@ -170,9 +172,11 @@
ready_incomplete_events = self._make_incomplete_events(
incomplete_events)
- updater = FakeUpdaterConsumer(list(chain(
- ready_events, ready_incomplete_events,
- ready_uninteresting_events)))
+ updater = FakeUpdaterConsumer(
+ FakeSchedulerUpdaterBackend(),
+ list(chain(
+ ready_events, ready_incomplete_events,
+ ready_uninteresting_events)))
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -12,6 +12,7 @@
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS,
GHTorrentConsumer, events)
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from . import UpdaterTestUtil, from_regex
@@ -60,28 +61,35 @@
class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
def setUp(self):
- self.fake_config = {
- 'conn': {
- 'url': 'amqp://u:p@https://somewhere:9807',
+ config = {
+ 'ghtorrent': {
+ 'rabbitmq': {
+ 'conn': {
+ 'url': 'amqp://u:p@https://somewhere:9807',
+ },
+ 'prefetch_read': 17,
+ },
+ 'batch_cache_write': 42,
+ },
+ 'scheduler_updater': {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-scheduler-updater-dev',
+ },
},
- 'debug': True,
- 'batch_cache_write': 10,
- 'rabbitmq_prefetch_read': 100,
}
- self.consumer = GHTorrentConsumer(self.fake_config,
- _connection_class=FakeConnection)
+ GHTorrentConsumer.connection_class = FakeConnection
+ with patch.object(
+ SchedulerUpdaterBackend, '__init__', return_value=None):
+ self.consumer = GHTorrentConsumer(**config)
- def test_init(self):
+ @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend')
+ def test_init(self, mock_backend):
# given
# check init is ok
- self.assertEqual(self.consumer.debug,
- self.fake_config['debug'])
- self.assertEqual(self.consumer.batch,
- self.fake_config['batch_cache_write'])
- self.assertEqual(self.consumer.prefetch_read,
- self.fake_config['rabbitmq_prefetch_read'])
- self.assertEqual(self.consumer.config, self.fake_config)
+ self.assertEqual(self.consumer.batch, 42)
+ self.assertEqual(self.consumer.prefetch_read, 17)
def test_has_events(self):
self.assertTrue(self.consumer.has_events())
@@ -92,7 +100,7 @@
# then
self.assertEqual(self.consumer.conn._conn_string,
- self.fake_config['conn']['url'])
+ 'amqp://u:p@https://somewhere:9807')
self.assertTrue(self.consumer.conn._connect)
self.assertFalse(self.consumer.conn._release)
@@ -130,8 +138,10 @@
input_event = self._make_incomplete_event(
event_type, name, 'git', missing_data_key)
+ logger = self.consumer.log
+ del self.consumer.log # prevent gazillions of warnings
actual_converted_event = self.consumer.convert_event(input_event)
-
+ self.consumer.log = logger
self.assertIsNone(actual_converted_event)
@patch('swh.scheduler.updater.ghtorrent.collect_replies')
@@ -160,5 +170,5 @@
self.consumer.channel,
'fake-queue',
no_ack=False,
- limit=self.fake_config['rabbitmq_prefetch_read']
+ limit=17
)
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -52,26 +52,26 @@
'scheduler': {
'cls': 'local',
'args': {
- 'scheduling_db': 'dbname=softwareheritage-scheduler-test',
+ 'db': 'dbname=softwareheritage-scheduler-test',
},
},
'scheduler_updater': {
- 'scheduling_updater_db':
- 'dbname=softwareheritage-scheduler-updater-test',
- 'cache_read_limit': 5,
+ 'cls': 'local',
+ 'args': {
+ 'db':
+ 'dbname=softwareheritage-scheduler-updater-test',
+ 'cache_read_limit': 5,
+ },
+ },
+ 'updater_writer': {
+ 'pause': 0.1,
+ 'verbose': False,
},
- 'pause': 0.1,
- 'verbose': False,
}
self.writer = UpdaterWriter(**config)
self.scheduler_backend = self.writer.scheduler_backend
self.scheduler_updater_backend = self.writer.scheduler_updater_backend
- def tearDown(self):
- self.scheduler_backend.close_connection()
- self.scheduler_updater_backend.close_connection()
- super().tearDown()
-
def test_run_ko(self):
"""Only git tasks are supported for now, other types are dismissed.
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -5,33 +5,43 @@
from arrow import utcnow
-from swh.core.config import SWHConfig
-from swh.scheduler.backend import DbBackend, autocommit
+import psycopg2.pool
+import psycopg2.extras
+from swh.scheduler.backend import DbBackend
+from swh.core.db.common import db_transaction, db_transaction_generator
-class SchedulerUpdaterBackend(SWHConfig, DbBackend):
+class SchedulerUpdaterBackend:
CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
- DEFAULT_CONFIG = {
- 'scheduling_updater_db': (
- 'str', 'dbname=softwareheritage-scheduler-updater-dev'),
- 'cache_read_limit': ('int', 1000),
- }
-
- def __init__(self, **override_config):
- super().__init__()
- if override_config:
- self.config = override_config
+# 'cache_read_limit': ('int', 1000),
+
+ def __init__(self, db, cache_read_limit=1000,
+ min_pool_conns=1, max_pool_conns=10):
+ """
+ Args:
+ db: either a libpq connection string, or a psycopg2 connection
+
+ """
+ if isinstance(db, psycopg2.extensions.connection):
+ self._pool = None
+ self._db = DbBackend(db)
else:
- self.config = self.parse_config_file(global_config=False)
- self.db = None
- self.db_conn_dsn = self.config['scheduling_updater_db']
- self.limit = self.config['cache_read_limit']
- self.reconnect()
+ self._pool = psycopg2.pool.ThreadedConnectionPool(
+ min_pool_conns, max_pool_conns, db,
+ cursor_factory=psycopg2.extras.RealDictCursor,
+ )
+ self._db = None
+ self.limit = cache_read_limit
+
+ def get_db(self):
+ if self._db:
+ return self._db
+ return DbBackend.from_pool(self._pool)
cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']
- @autocommit
- def cache_put(self, events, timestamp=None, cursor=None):
+ @db_transaction()
+ def cache_put(self, events, timestamp=None, db=None, cur=None):
"""Write new events in the backend.
"""
@@ -45,17 +55,16 @@
if seen is None:
event['last_seen'] = timestamp
yield event
-
- cursor.execute('select swh_mktemp_cache()')
- self.copy_to(prepare_events(events),
- 'tmp_cache', self.cache_put_keys, cursor=cursor)
- cursor.execute('select swh_cache_put()')
+ cur.execute('select swh_mktemp_cache()')
+ db.copy_to(prepare_events(events),
+ 'tmp_cache', self.cache_put_keys, cursor=cur)
+ cur.execute('select swh_cache_put()')
cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
'last_seen']
- @autocommit
- def cache_read(self, timestamp=None, limit=None, cursor=None):
+ @db_transaction_generator()
+ def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
"""Read events from the cache prior to timestamp.
"""
@@ -65,18 +74,18 @@
if not limit:
limit = self.limit
- q = self._format_query('select {keys} from swh_cache_read(%s, %s)',
- self.cache_read_keys)
- cursor.execute(q, (timestamp, limit))
- for r in cursor.fetchall():
+ q = db._format_query('select {keys} from swh_cache_read(%s, %s)',
+ self.cache_read_keys)
+ cur.execute(q, (timestamp, limit))
+ for r in cur.fetchall():
r['id'] = r['id'].tobytes()
yield r
- @autocommit
- def cache_remove(self, entries, cursor=None):
+ @db_transaction()
+ def cache_remove(self, entries, db=None, cur=None):
"""Clean events from the cache
"""
q = 'delete from cache where url in (%s)' % (
', '.join(("'%s'" % e for e in entries)), )
- cursor.execute(q)
+ cur.execute(q)
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
--- a/swh/scheduler/updater/consumer.py
+++ b/swh/scheduler/updater/consumer.py
@@ -7,21 +7,19 @@
from abc import ABCMeta, abstractmethod
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
class UpdaterConsumer(metaclass=ABCMeta):
"""Event consumer
"""
- def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend,
- log_class='swh.scheduler.updater.consumer.UpdaterConsumer'):
+ def __init__(self, backend, batch_cache_write=1000):
super().__init__()
self._reset_cache()
- self.backend = backend_class()
- self.batch = batch
+ self.backend = backend
+ self.batch = int(batch_cache_write)
logging.basicConfig(level=logging.DEBUG)
- self.log = logging.getLogger(log_class)
+ self.log = logging.getLogger('%s.%s' % (
+ self.__class__.__module__, self.__class__.__name__))
def _reset_cache(self):
"""Reset internal cache.
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ b/swh/scheduler/updater/ghtorrent/__init__.py
@@ -8,9 +8,11 @@
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
-from swh.core.config import SWHConfig
+from swh.core.config import merge_configs
+
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
events = {
@@ -32,65 +34,60 @@
]
}
+INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-class RabbitMQConn(SWHConfig):
- """RabbitMQ Connection class
+DEFAULT_CONFIG = {
+ 'ghtorrent': {
+ 'batch_cache_write': 1000,
+ 'rabbitmq': {
+ 'prefetch_read': 100,
+ 'conn': {
+ 'url': 'amqp://guest:guest@localhost:5672',
+ 'exchange_name': 'ght-streams',
+ 'routing_key': 'something',
+ 'queue_name': 'fake-events',
+ },
+ },
+ },
+ 'scheduler_updater': {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-scheduler-updater-dev',
+ 'cache_read_limit': 1000,
+ },
+ },
+}
- """
- CONFIG_BASE_FILENAME = 'backend/ghtorrent'
-
- DEFAULT_CONFIG = {
- 'conn': ('dict', {
- 'url': 'amqp://guest:guest@localhost:5672',
- 'exchange_name': 'ght-streams',
- 'routing_key': 'something',
- 'queue_name': 'fake-events'
- })
- }
- ADDITIONAL_CONFIG = {}
+class GHTorrentConsumer(UpdaterConsumer):
+ """GHTorrent events consumer
+
+ """
+ connection_class = Connection
def __init__(self, **config):
- super().__init__()
- if config and set(config.keys()) - {'log_class'} != set():
- self.config = config
- else:
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
-
- self.conn_string = self.config['conn']['url']
- self.exchange = Exchange(self.config['conn']['exchange_name'],
- 'topic', durable=True)
- self.routing_key = self.config['conn']['routing_key']
- self.queue = Queue(self.config['conn']['queue_name'],
- exchange=self.exchange,
- routing_key=self.routing_key,
+ self.config = merge_configs(DEFAULT_CONFIG, config)
+
+ ght_config = self.config['ghtorrent']
+ rmq_config = ght_config['rabbitmq']
+ self.prefetch_read = int(rmq_config.get('prefetch_read', 100))
+
+ exchange = Exchange(
+ rmq_config['conn']['exchange_name'],
+ 'topic', durable=True)
+ routing_key = rmq_config['conn']['routing_key']
+ self.queue = Queue(rmq_config['conn']['queue_name'],
+ exchange=exchange,
+ routing_key=routing_key,
auto_delete=True)
+ if self.config['scheduler_updater']['cls'] != 'local':
+ raise ValueError(
+ 'The scheduler_updater can only be a cls=local for now')
+ backend = SchedulerUpdaterBackend(
+ **self.config['scheduler_updater']['args'])
-INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-
-
-class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
- """GHTorrent events consumer
-
- """
- ADDITIONAL_CONFIG = {
- 'debug': ('bool', False),
- 'batch_cache_write': ('int', 1000),
- 'rabbitmq_prefetch_read': ('int', 100),
- }
-
- def __init__(self, config=None, _connection_class=Connection):
- if config is None:
- super().__init__(
- log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer')
- else:
- self.config = config
- self._connection_class = _connection_class
- self.debug = self.config['debug']
- self.batch = self.config['batch_cache_write']
- self.prefetch_read = self.config['rabbitmq_prefetch_read']
+ super().__init__(backend, ght_config.get('batch_cache_write', 1000))
def has_events(self):
"""Always has events
@@ -104,11 +101,10 @@
"""
if isinstance(event, str):
event = json.loads(event)
-
for k in INTERESTING_EVENT_KEYS:
if k not in event:
if hasattr(self, 'log'):
- self.log.warn(
+ self.log.warning(
'Event should have the \'%s\' entry defined' % k)
return None
@@ -126,7 +122,8 @@
"""Open rabbitmq connection
"""
- self.conn = self._connection_class(self.config['conn']['url'])
+ self.conn = self.connection_class(
+ self.config['ghtorrent']['rabbitmq']['conn']['url'])
self.conn.connect()
self.channel = self.conn.channel()
diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py
--- a/swh/scheduler/updater/ghtorrent/cli.py
+++ b/swh/scheduler/updater/ghtorrent/cli.py
@@ -4,24 +4,19 @@
# See top-level LICENSE file for more information
import click
-import logging
-
-from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
help='Verbose mode')
-def main(verbose):
+@click.pass_context
+def main(ctx, verbose):
"""Consume events from ghtorrent and write them to cache.
"""
- log = logging.getLogger('swh.scheduler.updater.ghtorrent.cli')
- log.addHandler(logging.StreamHandler())
- _loglevel = logging.DEBUG if verbose else logging.INFO
- log.setLevel(_loglevel)
-
- GHTorrentConsumer().run()
+ click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
+ err=True)
+ ctx.exit(1)
if __name__ == '__main__':
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -9,14 +9,13 @@
from arrow import utcnow
-from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-class UpdaterWriter(SWHConfig):
+class UpdaterWriter:
"""Updater writer in charge of updating the scheduler db with latest
prioritized oneshot tasks
@@ -26,41 +25,18 @@
- dumps them into the scheduler db
"""
- CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
- DEFAULT_CONFIG = {
- # access to the scheduler backend
- 'scheduler': ('dict', {
- 'cls': 'local',
- 'args': {
- 'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
- },
- }),
- # access to the scheduler updater cache
- 'scheduler_updater': ('dict', {
- 'scheduling_updater_db':
- 'dbname=softwareheritage-scheduler-updater-dev',
- 'cache_read_limit': 1000,
- }),
- # waiting time between db reads
- 'pause': ('int', 10),
- # verbose or not
- 'verbose': ('bool', False),
- }
def __init__(self, **config):
- if config:
- self.config = config
- else:
- self.config = self.parse_config_file()
-
+ self.config = config
+ if self.config['scheduler_updater']['cls'] != 'local':
+ raise ValueError(
+ 'The scheduler_updater can only be a cls=local for now')
self.scheduler_updater_backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater'])
+ **self.config['scheduler_updater']['args'])
self.scheduler_backend = get_scheduler(**self.config['scheduler'])
- self.pause = self.config['pause']
+ self.pause = self.config.get('updater_writer', {}).get('pause', 10)
self.log = logging.getLogger(
'swh.scheduler.updater.writer.UpdaterWriter')
- self.log.setLevel(
- logging.DEBUG if self.config['verbose'] else logging.INFO)
def convert_to_oneshot_task(self, event):
"""Given an event, convert it into oneshot task with priority
@@ -109,13 +85,11 @@
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
help='Verbose mode')
-def main(verbose):
- log = logging.getLogger('swh.scheduler.updater.writer')
- log.addHandler(logging.StreamHandler())
- _loglevel = logging.DEBUG if verbose else logging.INFO
- log.setLevel(_loglevel)
-
- UpdaterWriter().run()
+@click.pass_context
+def main(ctx, verbose):
+ click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
+ err=True)
+ ctx.exit(1)
if __name__ == '__main__':
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Dec 17, 3:31 PM (2 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223919
Attached To
D1028: Refactor swh/scheduler/updater as well
Event Timeline
Log In to Comment