Page MenuHomeSoftware Heritage

D1028.diff
No OneTemporary

D1028.diff

diff --git a/swh/scheduler/cli.py b/swh/scheduler/cli.py
--- a/swh/scheduler/cli.py
+++ b/swh/scheduler/cli.py
@@ -633,5 +633,34 @@
click.echo('OK')
+@cli.command('updater')
+@click.option('--verbose/--no-verbose', '-v', default=False,
+ help='Verbose mode')
+@click.pass_context
+def updater(ctx, verbose):
+ """Insert tasks in the scheduler from the scheduler-updater's events
+
+ """
+ from swh.scheduler.updater.writer import UpdaterWriter
+ UpdaterWriter(**ctx.obj['config']).run()
+
+
+@cli.command('ghtorrent')
+@click.option('--verbose/--no-verbose', '-v', default=False,
+ help='Verbose mode')
+@click.pass_context
+def ghtorrent(ctx, verbose):
+ """Consume events from ghtorrent and write them to cache.
+
+ """
+ from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
+ from swh.scheduler.updater.backend import SchedulerUpdaterBackend
+
+ ght_config = ctx.obj['config'].get('ghtorrent', {})
+ back_config = ctx.obj['config'].get('scheduler_updater', {})
+ backend = SchedulerUpdaterBackend(**back_config)
+ GHTorrentConsumer(backend, **ght_config).run()
+
+
if __name__ == '__main__':
cli()
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
--- a/swh/scheduler/tests/updater/test_backend.py
+++ b/swh/scheduler/tests/updater/test_backend.py
@@ -27,7 +27,7 @@
def setUp(self):
super().setUp()
config = {
- 'scheduling_updater_db': 'dbname=' + self.TEST_DB_NAME,
+ 'db': 'dbname=' + self.TEST_DB_NAME,
'cache_read_limit': 1000,
}
self.backend = SchedulerUpdaterBackend(**config)
@@ -42,7 +42,6 @@
self.conn.commit()
def tearDown(self):
- self.backend.close_connection()
self._empty_tables()
super().tearDown()
@@ -58,8 +57,6 @@
'type': 'create',
'origin_type': 'git',
})
-
self.backend.cache_put(gen_events(urls))
r = self.backend.cache_read(timestamp=utcnow())
-
self.assertNotEqual(r, [])
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ b/swh/scheduler/tests/updater/test_consumer.py
@@ -24,8 +24,8 @@
class FakeUpdaterConsumerBase(UpdaterConsumer):
- def __init__(self, backend_class=FakeSchedulerUpdaterBackend):
- super().__init__(backend_class=backend_class)
+ def __init__(self, backend):
+ super().__init__(backend)
self.connection_opened = False
self.connection_closed = False
self.consume_called = False
@@ -53,7 +53,8 @@
class UpdaterConsumerRaisingTest(unittest.TestCase):
def setUp(self):
- self.updater = FakeUpdaterConsumerRaise()
+ self.updater = FakeUpdaterConsumerRaise(
+ FakeSchedulerUpdaterBackend())
def test_running_raise(self):
"""Raising during run should finish fine.
@@ -89,7 +90,8 @@
class UpdaterConsumerNoEventTest(unittest.TestCase):
def setUp(self):
- self.updater = FakeUpdaterConsumerNoEvent()
+ self.updater = FakeUpdaterConsumerNoEvent(
+ FakeSchedulerUpdaterBackend())
def test_running_does_not_consume(self):
"""Run with no events should do just fine"""
@@ -115,8 +117,8 @@
class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
- def __init__(self, messages):
- super().__init__()
+ def __init__(self, backend, messages):
+ super().__init__(backend)
self.messages = messages
self.debug = False
@@ -170,9 +172,11 @@
ready_incomplete_events = self._make_incomplete_events(
incomplete_events)
- updater = FakeUpdaterConsumer(list(chain(
- ready_events, ready_incomplete_events,
- ready_uninteresting_events)))
+ updater = FakeUpdaterConsumer(
+ FakeSchedulerUpdaterBackend(),
+ list(chain(
+ ready_events, ready_incomplete_events,
+ ready_uninteresting_events)))
self.assertEqual(updater.count, 0)
self.assertEqual(updater.seen_events, set())
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ b/swh/scheduler/tests/updater/test_ghtorrent.py
@@ -12,6 +12,7 @@
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS,
GHTorrentConsumer, events)
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
from . import UpdaterTestUtil, from_regex
@@ -60,28 +61,35 @@
class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
def setUp(self):
- self.fake_config = {
- 'conn': {
- 'url': 'amqp://u:p@https://somewhere:9807',
+ config = {
+ 'ghtorrent': {
+ 'rabbitmq': {
+ 'conn': {
+ 'url': 'amqp://u:p@https://somewhere:9807',
+ },
+ 'prefetch_read': 17,
+ },
+ 'batch_cache_write': 42,
+ },
+ 'scheduler_updater': {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-scheduler-updater-dev',
+ },
},
- 'debug': True,
- 'batch_cache_write': 10,
- 'rabbitmq_prefetch_read': 100,
}
- self.consumer = GHTorrentConsumer(self.fake_config,
- _connection_class=FakeConnection)
+ GHTorrentConsumer.connection_class = FakeConnection
+ with patch.object(
+ SchedulerUpdaterBackend, '__init__', return_value=None):
+ self.consumer = GHTorrentConsumer(**config)
- def test_init(self):
+ @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend')
+ def test_init(self, mock_backend):
# given
# check init is ok
- self.assertEqual(self.consumer.debug,
- self.fake_config['debug'])
- self.assertEqual(self.consumer.batch,
- self.fake_config['batch_cache_write'])
- self.assertEqual(self.consumer.prefetch_read,
- self.fake_config['rabbitmq_prefetch_read'])
- self.assertEqual(self.consumer.config, self.fake_config)
+ self.assertEqual(self.consumer.batch, 42)
+ self.assertEqual(self.consumer.prefetch_read, 17)
def test_has_events(self):
self.assertTrue(self.consumer.has_events())
@@ -92,7 +100,7 @@
# then
self.assertEqual(self.consumer.conn._conn_string,
- self.fake_config['conn']['url'])
+ 'amqp://u:p@https://somewhere:9807')
self.assertTrue(self.consumer.conn._connect)
self.assertFalse(self.consumer.conn._release)
@@ -130,8 +138,10 @@
input_event = self._make_incomplete_event(
event_type, name, 'git', missing_data_key)
+ logger = self.consumer.log
+ del self.consumer.log # prevent gazillions of warnings
actual_converted_event = self.consumer.convert_event(input_event)
-
+ self.consumer.log = logger
self.assertIsNone(actual_converted_event)
@patch('swh.scheduler.updater.ghtorrent.collect_replies')
@@ -160,5 +170,5 @@
self.consumer.channel,
'fake-queue',
no_ack=False,
- limit=self.fake_config['rabbitmq_prefetch_read']
+ limit=17
)
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
--- a/swh/scheduler/tests/updater/test_writer.py
+++ b/swh/scheduler/tests/updater/test_writer.py
@@ -52,26 +52,26 @@
'scheduler': {
'cls': 'local',
'args': {
- 'scheduling_db': 'dbname=softwareheritage-scheduler-test',
+ 'db': 'dbname=softwareheritage-scheduler-test',
},
},
'scheduler_updater': {
- 'scheduling_updater_db':
- 'dbname=softwareheritage-scheduler-updater-test',
- 'cache_read_limit': 5,
+ 'cls': 'local',
+ 'args': {
+ 'db':
+ 'dbname=softwareheritage-scheduler-updater-test',
+ 'cache_read_limit': 5,
+ },
+ },
+ 'updater_writer': {
+ 'pause': 0.1,
+ 'verbose': False,
},
- 'pause': 0.1,
- 'verbose': False,
}
self.writer = UpdaterWriter(**config)
self.scheduler_backend = self.writer.scheduler_backend
self.scheduler_updater_backend = self.writer.scheduler_updater_backend
- def tearDown(self):
- self.scheduler_backend.close_connection()
- self.scheduler_updater_backend.close_connection()
- super().tearDown()
-
def test_run_ko(self):
"""Only git tasks are supported for now, other types are dismissed.
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
--- a/swh/scheduler/updater/backend.py
+++ b/swh/scheduler/updater/backend.py
@@ -5,33 +5,43 @@
from arrow import utcnow
-from swh.core.config import SWHConfig
-from swh.scheduler.backend import DbBackend, autocommit
+import psycopg2.pool
+import psycopg2.extras
+from swh.scheduler.backend import DbBackend
+from swh.core.db.common import db_transaction, db_transaction_generator
-class SchedulerUpdaterBackend(SWHConfig, DbBackend):
+class SchedulerUpdaterBackend:
CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
- DEFAULT_CONFIG = {
- 'scheduling_updater_db': (
- 'str', 'dbname=softwareheritage-scheduler-updater-dev'),
- 'cache_read_limit': ('int', 1000),
- }
-
- def __init__(self, **override_config):
- super().__init__()
- if override_config:
- self.config = override_config
+# 'cache_read_limit': ('int', 1000),
+
+ def __init__(self, db, cache_read_limit=1000,
+ min_pool_conns=1, max_pool_conns=10):
+ """
+ Args:
+ db_conn: either a libpq connection string, or a psycopg2 connection
+
+ """
+ if isinstance(db, psycopg2.extensions.connection):
+ self._pool = None
+ self._db = DbBackend(db)
else:
- self.config = self.parse_config_file(global_config=False)
- self.db = None
- self.db_conn_dsn = self.config['scheduling_updater_db']
- self.limit = self.config['cache_read_limit']
- self.reconnect()
+ self._pool = psycopg2.pool.ThreadedConnectionPool(
+ min_pool_conns, max_pool_conns, db,
+ cursor_factory=psycopg2.extras.RealDictCursor,
+ )
+ self._db = None
+ self.limit = cache_read_limit
+
+ def get_db(self):
+ if self._db:
+ return self._db
+ return DbBackend.from_pool(self._pool)
cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']
- @autocommit
- def cache_put(self, events, timestamp=None, cursor=None):
+ @db_transaction()
+ def cache_put(self, events, timestamp=None, db=None, cur=None):
"""Write new events in the backend.
"""
@@ -45,17 +55,16 @@
if seen is None:
event['last_seen'] = timestamp
yield event
-
- cursor.execute('select swh_mktemp_cache()')
- self.copy_to(prepare_events(events),
- 'tmp_cache', self.cache_put_keys, cursor=cursor)
- cursor.execute('select swh_cache_put()')
+ cur.execute('select swh_mktemp_cache()')
+ db.copy_to(prepare_events(events),
+ 'tmp_cache', self.cache_put_keys, cursor=cur)
+ cur.execute('select swh_cache_put()')
cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
'last_seen']
- @autocommit
- def cache_read(self, timestamp=None, limit=None, cursor=None):
+ @db_transaction_generator()
+ def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
"""Read events from the cache prior to timestamp.
"""
@@ -65,18 +74,18 @@
if not limit:
limit = self.limit
- q = self._format_query('select {keys} from swh_cache_read(%s, %s)',
- self.cache_read_keys)
- cursor.execute(q, (timestamp, limit))
- for r in cursor.fetchall():
+ q = db._format_query('select {keys} from swh_cache_read(%s, %s)',
+ self.cache_read_keys)
+ cur.execute(q, (timestamp, limit))
+ for r in cur.fetchall():
r['id'] = r['id'].tobytes()
yield r
- @autocommit
- def cache_remove(self, entries, cursor=None):
+ @db_transaction()
+ def cache_remove(self, entries, db=None, cur=None):
"""Clean events from the cache
"""
q = 'delete from cache where url in (%s)' % (
', '.join(("'%s'" % e for e in entries)), )
- cursor.execute(q)
+ cur.execute(q)
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
--- a/swh/scheduler/updater/consumer.py
+++ b/swh/scheduler/updater/consumer.py
@@ -7,21 +7,19 @@
from abc import ABCMeta, abstractmethod
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
class UpdaterConsumer(metaclass=ABCMeta):
"""Event consumer
"""
- def __init__(self, batch=1000, backend_class=SchedulerUpdaterBackend,
- log_class='swh.scheduler.updater.consumer.UpdaterConsumer'):
+ def __init__(self, backend, batch_cache_write=1000):
super().__init__()
self._reset_cache()
- self.backend = backend_class()
- self.batch = batch
+ self.backend = backend
+ self.batch = int(batch_cache_write)
logging.basicConfig(level=logging.DEBUG)
- self.log = logging.getLogger(log_class)
+ self.log = logging.getLogger('%s.%s' % (
+ self.__class__.__module__, self.__class__.__name__))
def _reset_cache(self):
"""Reset internal cache.
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ b/swh/scheduler/updater/ghtorrent/__init__.py
@@ -8,9 +8,11 @@
from kombu import Connection, Exchange, Queue
from kombu.common import collect_replies
-from swh.core.config import SWHConfig
+from swh.core.config import merge_configs
+
from swh.scheduler.updater.events import SWHEvent
from swh.scheduler.updater.consumer import UpdaterConsumer
+from swh.scheduler.updater.backend import SchedulerUpdaterBackend
events = {
@@ -32,65 +34,60 @@
]
}
+INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-class RabbitMQConn(SWHConfig):
- """RabbitMQ Connection class
+DEFAULT_CONFIG = {
+ 'ghtorrent': {
+ 'batch_cache_write': 1000,
+ 'rabbitmq': {
+ 'prefetch_read': 100,
+ 'conn': {
+ 'url': 'amqp://guest:guest@localhost:5672',
+ 'exchange_name': 'ght-streams',
+ 'routing_key': 'something',
+ 'queue_name': 'fake-events',
+ },
+ },
+ },
+ 'scheduler_updater': {
+ 'cls': 'local',
+ 'args': {
+ 'db': 'dbname=softwareheritage-scheduler-updater-dev',
+ 'cache_read_limit': 1000,
+ },
+ },
+}
- """
- CONFIG_BASE_FILENAME = 'backend/ghtorrent'
-
- DEFAULT_CONFIG = {
- 'conn': ('dict', {
- 'url': 'amqp://guest:guest@localhost:5672',
- 'exchange_name': 'ght-streams',
- 'routing_key': 'something',
- 'queue_name': 'fake-events'
- })
- }
- ADDITIONAL_CONFIG = {}
+class GHTorrentConsumer(UpdaterConsumer):
+ """GHTorrent events consumer
+
+ """
+ connection_class = Connection
def __init__(self, **config):
- super().__init__()
- if config and set(config.keys()) - {'log_class'} != set():
- self.config = config
- else:
- self.config = self.parse_config_file(
- additional_configs=[self.ADDITIONAL_CONFIG])
-
- self.conn_string = self.config['conn']['url']
- self.exchange = Exchange(self.config['conn']['exchange_name'],
- 'topic', durable=True)
- self.routing_key = self.config['conn']['routing_key']
- self.queue = Queue(self.config['conn']['queue_name'],
- exchange=self.exchange,
- routing_key=self.routing_key,
+ self.config = merge_configs(DEFAULT_CONFIG, config)
+
+ ght_config = self.config['ghtorrent']
+ rmq_config = ght_config['rabbitmq']
+ self.prefetch_read = int(rmq_config.get('prefetch_read', 100))
+
+ exchange = Exchange(
+ rmq_config['conn']['exchange_name'],
+ 'topic', durable=True)
+ routing_key = rmq_config['conn']['routing_key']
+ self.queue = Queue(rmq_config['conn']['queue_name'],
+ exchange=exchange,
+ routing_key=routing_key,
auto_delete=True)
+ if self.config['scheduler_updater']['cls'] != 'local':
+ raise ValueError(
+ 'The scheduler_updater can only be a cls=local for now')
+ backend = SchedulerUpdaterBackend(
+ **self.config['scheduler_updater']['args'])
-INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-
-
-class GHTorrentConsumer(RabbitMQConn, UpdaterConsumer):
- """GHTorrent events consumer
-
- """
- ADDITIONAL_CONFIG = {
- 'debug': ('bool', False),
- 'batch_cache_write': ('int', 1000),
- 'rabbitmq_prefetch_read': ('int', 100),
- }
-
- def __init__(self, config=None, _connection_class=Connection):
- if config is None:
- super().__init__(
- log_class='swh.scheduler.updater.ghtorrent.GHTorrentConsumer')
- else:
- self.config = config
- self._connection_class = _connection_class
- self.debug = self.config['debug']
- self.batch = self.config['batch_cache_write']
- self.prefetch_read = self.config['rabbitmq_prefetch_read']
+ super().__init__(backend, ght_config.get('batch_cache_write', 1000))
def has_events(self):
"""Always has events
@@ -104,11 +101,10 @@
"""
if isinstance(event, str):
event = json.loads(event)
-
for k in INTERESTING_EVENT_KEYS:
if k not in event:
if hasattr(self, 'log'):
- self.log.warn(
+ self.log.warning(
'Event should have the \'%s\' entry defined' % k)
return None
@@ -126,7 +122,8 @@
"""Open rabbitmq connection
"""
- self.conn = self._connection_class(self.config['conn']['url'])
+ self.conn = self.connection_class(
+ self.config['ghtorrent']['rabbitmq']['conn']['url'])
self.conn.connect()
self.channel = self.conn.channel()
diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py
--- a/swh/scheduler/updater/ghtorrent/cli.py
+++ b/swh/scheduler/updater/ghtorrent/cli.py
@@ -4,24 +4,19 @@
# See top-level LICENSE file for more information
import click
-import logging
-
-from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
help='Verbose mode')
-def main(verbose):
+@click.pass_context
+def main(ctx, verbose):
"""Consume events from ghtorrent and write them to cache.
"""
- log = logging.getLogger('swh.scheduler.updater.ghtorrent.cli')
- log.addHandler(logging.StreamHandler())
- _loglevel = logging.DEBUG if verbose else logging.INFO
- log.setLevel(_loglevel)
-
- GHTorrentConsumer().run()
+ click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
+ err=True)
+ ctx.exit(1)
if __name__ == '__main__':
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
--- a/swh/scheduler/updater/writer.py
+++ b/swh/scheduler/updater/writer.py
@@ -9,14 +9,13 @@
from arrow import utcnow
-from swh.core.config import SWHConfig
from swh.core import utils
from swh.scheduler import get_scheduler
from swh.scheduler.utils import create_oneshot_task_dict
from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-class UpdaterWriter(SWHConfig):
+class UpdaterWriter:
"""Updater writer in charge of updating the scheduler db with latest
prioritized oneshot tasks
@@ -26,41 +25,18 @@
- dumps them into the scheduler db
"""
- CONFIG_BASE_FILENAME = 'backend/scheduler-updater-writer'
- DEFAULT_CONFIG = {
- # access to the scheduler backend
- 'scheduler': ('dict', {
- 'cls': 'local',
- 'args': {
- 'scheduling_db': 'dbname=softwareheritage-scheduler-dev',
- },
- }),
- # access to the scheduler updater cache
- 'scheduler_updater': ('dict', {
- 'scheduling_updater_db':
- 'dbname=softwareheritage-scheduler-updater-dev',
- 'cache_read_limit': 1000,
- }),
- # waiting time between db reads
- 'pause': ('int', 10),
- # verbose or not
- 'verbose': ('bool', False),
- }
def __init__(self, **config):
- if config:
- self.config = config
- else:
- self.config = self.parse_config_file()
-
+ self.config = config
+ if self.config['scheduler_updater']['cls'] != 'local':
+ raise ValueError(
+ 'The scheduler_updater can only be a cls=local for now')
self.scheduler_updater_backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater'])
+ **self.config['scheduler_updater']['args'])
self.scheduler_backend = get_scheduler(**self.config['scheduler'])
- self.pause = self.config['pause']
+ self.pause = self.config.get('updater_writer', {}).get('pause', 10)
self.log = logging.getLogger(
'swh.scheduler.updater.writer.UpdaterWriter')
- self.log.setLevel(
- logging.DEBUG if self.config['verbose'] else logging.INFO)
def convert_to_oneshot_task(self, event):
"""Given an event, convert it into oneshot task with priority
@@ -109,13 +85,11 @@
@click.command()
@click.option('--verbose/--no-verbose', '-v', default=False,
help='Verbose mode')
-def main(verbose):
- log = logging.getLogger('swh.scheduler.updater.writer')
- log.addHandler(logging.StreamHandler())
- _loglevel = logging.DEBUG if verbose else logging.INFO
- log.setLevel(_loglevel)
-
- UpdaterWriter().run()
+@click.pass_context
+def main(ctx, verbose):
+ click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
+ err=True)
+ ctx.exit(1)
if __name__ == '__main__':

File Metadata

Mime Type
text/plain
Expires
Tue, Dec 17, 3:31 PM (2 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3223919

Event Timeline