diff --git a/sql/updater/sql/Makefile b/sql/updater/sql/Makefile deleted file mode 100644 --- a/sql/updater/sql/Makefile +++ /dev/null @@ -1,73 +0,0 @@ -# Depends: postgresql-client, postgresql-autodoc - -DBNAME = softwareheritage-scheduler-updater-dev -DOCDIR = autodoc - -SQL_INIT = 10-swh-init.sql -SQL_SCHEMA = 30-swh-schema.sql -SQL_FUNC = 40-swh-func.sql -SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC) -SQL_FILES = $(abspath $(addprefix \ - $(CURDIR)/../../../swh/scheduler/sql/updater/,$(SQLS))) - -PSQL_BIN = psql -PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1 -PSQL = $(PSQL_BIN) $(PSQL_FLAGS) - -PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS)) - -all: - -createdb: createdb-stamp -createdb-stamp: $(SQL_FILES) -ifeq ($(PIFPAF),) - -dropdb $(DBNAME) -endif - createdb $(DBNAME) -ifeq ($(PIFPAF),) - touch $@ -else - rm -f $@ -endif - -filldb: filldb-stamp -filldb-stamp: createdb-stamp - cat $(SQL_FILES) | $(PSQL) $(DBNAME) -ifeq ($(PIFPAF),) - touch $@ -else - rm -f $@ -endif - -dropdb: - -dropdb $(DBNAME) - -dumpdb: swh-scheduler-updater.dump -swh-scheduler-updater.dump: filldb-stamp - pg_dump -Fc $(DBNAME) > $@ - -$(DOCDIR): - test -d $(DOCDIR)/ || mkdir $(DOCDIR) - -doc: autodoc-stamp $(DOCDIR)/swh-scheduler-updater.pdf -autodoc-stamp: filldb-stamp $(DOCDIR) - postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler-updater - cp -a $(DOCDIR)/swh-scheduler-updater.dot $(DOCDIR)/swh-scheduler-updater.dot.orig -ifeq ($(PIFPAF),) - touch $@ -else - rm -f $@ -endif - -$(DOCDIR)/swh-scheduler-updater.pdf: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp - dot -T pdf $< > $@ -$(DOCDIR)/swh-scheduler-updater.svg: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp - dot -T svg $< > $@ - -clean: - rm -rf *-stamp $(DOCDIR)/ - -distclean: clean dropdb - rm -f swh-scheduler-updater.dump - -.PHONY: all initdb createdb dropdb doc clean diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py --- a/swh/scheduler/cli/admin.py +++ b/swh/scheduler/cli/admin.py @@ -88,37 +88,3 @@ if debug is None: debug = ctx.obj['log_level'] <= logging.DEBUG server.app.run(host, port=port, debug=bool(debug)) - - -@cli.command('start-updater') -@click.option('--verbose/--no-verbose', '-v', default=False, - help='Verbose mode') -@click.pass_context -def updater(ctx, verbose): - """Starts a scheduler-updater service. - - Insert tasks in the scheduler from the scheduler-updater's events read from - the db cache (filled e.g. by the ghtorrent consumer service) . - - """ - from swh.scheduler.updater.writer import UpdaterWriter - UpdaterWriter(**ctx.obj['config']).run() - - -@cli.command('start-ghtorrent') -@click.option('--verbose/--no-verbose', '-v', default=False, - help='Verbose mode') -@click.pass_context -def ghtorrent(ctx, verbose): - """Starts a ghtorrent consumer service. - - Consumes events from ghtorrent and write them to a cache. 
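-
-    A configuration sketch for this command (the 'scheduler_updater' map is
-    passed to SchedulerUpdaterBackend, the 'ghtorrent' map to
-    GHTorrentConsumer; all values are illustrative)::
-
-        {
-            'ghtorrent': {
-                'rabbitmq': {
-                    'conn': {'url': 'amqp://guest:guest@localhost:5672'},
-                },
-            },
-            'scheduler_updater': {
-                'db': 'dbname=softwareheritage-scheduler-updater-dev',
-            },
-        }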
- - """ - from swh.scheduler.updater.ghtorrent import GHTorrentConsumer - from swh.scheduler.updater.backend import SchedulerUpdaterBackend - - ght_config = ctx.obj['config'].get('ghtorrent', {}) - back_config = ctx.obj['config'].get('scheduler_updater', {}) - backend = SchedulerUpdaterBackend(**back_config) - GHTorrentConsumer(backend, **ght_config).run() diff --git a/swh/scheduler/sql/updater/10-swh-init.sql b/swh/scheduler/sql/updater/10-swh-init.sql deleted file mode 100644 --- a/swh/scheduler/sql/updater/10-swh-init.sql +++ /dev/null @@ -1,4 +0,0 @@ -create extension if not exists btree_gist; -create extension if not exists pgcrypto; - -create or replace language plpgsql; diff --git a/swh/scheduler/sql/updater/30-swh-schema.sql b/swh/scheduler/sql/updater/30-swh-schema.sql deleted file mode 100644 --- a/swh/scheduler/sql/updater/30-swh-schema.sql +++ /dev/null @@ -1,29 +0,0 @@ -create table dbversion -( - version int primary key, - release timestamptz not null, - description text not null -); - -comment on table dbversion is 'Schema update tracking'; - --- a SHA1 checksum (not necessarily originating from Git) -create domain sha1 as bytea check (length(value) = 20); - -insert into dbversion (version, release, description) - values (1, now(), 'Work In Progress'); - -create type origin_type as enum ('git', 'svn', 'hg', 'deb'); -comment on type origin_type is 'Url''s repository type'; - -create table cache ( - id sha1 primary key, - url text not null, - origin_type origin_type not null, - cnt int default 1, - first_seen timestamptz not null default now(), - last_seen timestamptz not null - ); - -create index on cache(url); -create index on cache(last_seen); diff --git a/swh/scheduler/sql/updater/40-swh-func.sql b/swh/scheduler/sql/updater/40-swh-func.sql deleted file mode 100644 --- a/swh/scheduler/sql/updater/40-swh-func.sql +++ /dev/null @@ -1,48 +0,0 @@ --- Postgresql index helper function -create or replace function hash_sha1(text) - returns sha1 -as $$ - select public.digest($1, 'sha1') :: sha1 -$$ language sql strict immutable; - -comment on function hash_sha1(text) is 'Compute sha1 hash as text'; - --- create a temporary table for cache tmp_cache, -create or replace function swh_mktemp_cache() - returns void - language sql -as $$ - create temporary table tmp_cache ( - like cache including defaults - ) on commit drop; - alter table tmp_cache drop column id; -$$; - -create or replace function swh_cache_put() - returns void - language plpgsql -as $$ -begin - insert into cache (id, url, origin_type, cnt, last_seen) - select hash_sha1(url), url, origin_type, cnt, last_seen - from tmp_cache t - on conflict(id) - do update set cnt = (select cnt from cache where id=excluded.id) + excluded.cnt, - last_seen = excluded.last_seen; - return; -end -$$; - -comment on function swh_cache_put() is 'Write to cache temporary events'; - -create or replace function swh_cache_read(ts timestamptz, lim integer) - returns setof cache - language sql stable -as $$ - select id, url, origin_type, cnt, first_seen, last_seen - from cache - where last_seen <= ts - limit lim; -$$; - -comment on function swh_cache_read(timestamptz, integer) is 'Read cache entries'; diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/__init__.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public 
License version 3, or any later version -# See top-level LICENSE file for more information - -from arrow import utcnow - -try: - from hypothesis.strategies import from_regex -except ImportError: - from hypothesis.strategies import text - - # Revert to using basic text generation - def from_regex(*args, **kwargs): - return text() - - -class UpdaterTestUtil: - """Mixin intended for event generation purposes - - """ - def _make_event(self, event_type, name, origin_type): - return { - 'type': event_type, - 'repo': { - 'name': name, - }, - 'created_at': utcnow(), - 'origin_type': origin_type, - } - - def _make_events(self, events): - for event_type, repo_name, origin_type in events: - yield self._make_event(event_type, repo_name, origin_type) - - def _make_incomplete_event(self, event_type, name, origin_type, - missing_data_key): - event = self._make_event(event_type, name, origin_type) - del event[missing_data_key] - return event - - def _make_incomplete_events(self, events): - for event_type, repo_name, origin_type, missing_data_key in events: - yield self._make_incomplete_event(event_type, repo_name, - origin_type, missing_data_key) - - def _make_simple_event(self, event_type, name, origin_type): - return { - 'type': event_type, - 'url': 'https://fakeurl/%s' % name, - 'origin_type': origin_type, - 'created_at': utcnow(), - } diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/conftest.py +++ /dev/null @@ -1,68 +0,0 @@ -import pytest -import glob -import os -from arrow import utcnow # XXX - -from swh.core.utils import numfile_sortkey as sortkey -from swh.scheduler.updater.backend import SchedulerUpdaterBackend -from swh.scheduler.tests import SQL_DIR -import swh.scheduler.tests.conftest # noqa - - -DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql') - - -@pytest.fixture -def swh_scheduler_updater(postgresql): - config = { - 'db': postgresql.dsn, - } - - all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey) - - cursor = postgresql.cursor() - for fname in all_dump_files: - with open(fname) as fobj: - cursor.execute(fobj.read()) - postgresql.commit() - - backend = SchedulerUpdaterBackend(**config) - return backend - - -def make_event(event_type, name, origin_type): - return { - 'type': event_type, - 'repo': { - 'name': name, - }, - 'created_at': utcnow(), - 'origin_type': origin_type, - } - - -def make_simple_event(event_type, name, origin_type): - return { - 'type': event_type, - 'url': 'https://fakeurl/%s' % name, - 'origin_type': origin_type, - 'created_at': utcnow(), - } - - -def make_events(events): - for event_type, repo_name, origin_type in events: - yield make_event(event_type, repo_name, origin_type) - - -def make_incomplete_event(event_type, name, origin_type, - missing_data_key): - event = make_event(event_type, name, origin_type) - del event[missing_data_key] - return event - - -def make_incomplete_events(events): - for event_type, repo_name, origin_type, missing_data_key in events: - yield make_incomplete_event(event_type, repo_name, - origin_type, missing_data_key) diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/test_backend.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See 
top-level LICENSE file for more information - -from hypothesis import given -from hypothesis.strategies import sets - -from swh.scheduler.updater.events import SWHEvent - -from . import from_regex - - -@given(urls=sets( - from_regex( - r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), - min_size=10, max_size=15)) -def test_cache_read(urls, swh_scheduler_updater): - # beware that the fixture is only called once for all the tests - # generated by hypothesis, so the db is not cleared between calls. - # see the end of - # https://hypothesis.works/articles/hypothesis-pytest-fixtures/ - def gen_events(urls): - for url in urls: - yield SWHEvent({ - 'url': url, - 'type': 'create', - 'origin_type': 'git', - }) - known_urls = set(e['url'] for e in - swh_scheduler_updater.cache_read(limit=1000000)) - swh_scheduler_updater.cache_put(gen_events(urls)) - new_urls = {u.strip() for u in urls} - known_urls - all_urls = set(e['url'] for e in - swh_scheduler_updater.cache_read(limit=1000000)) - assert (all_urls - known_urls) == new_urls diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/test_consumer.py +++ /dev/null @@ -1,199 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest -from itertools import chain - -from hypothesis import given, settings, HealthCheck -from hypothesis.strategies import lists, sampled_from, text, tuples - -from swh.scheduler.updater.consumer import UpdaterConsumer -from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent - -from . import UpdaterTestUtil, from_regex - - -class FakeSchedulerUpdaterBackend: - def __init__(self): - self.events = [] - - def cache_put(self, events): - self.events.append(events) - - -class FakeUpdaterConsumerBase(UpdaterConsumer): - def __init__(self, backend): - super().__init__(backend) - self.connection_opened = False - self.connection_closed = False - self.consume_called = False - self.has_events_called = False - - def open_connection(self): - self.connection_opened = True - - def close_connection(self): - self.connection_closed = True - - def convert_event(self, event): - pass - - -class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase): - def has_events(self): - self.has_events_called = True - return True - - def consume_events(self): - self.consume_called = True - raise ValueError('Broken stuff') - - -class UpdaterConsumerRaisingTest(unittest.TestCase): - def setUp(self): - self.updater = FakeUpdaterConsumerRaise( - FakeSchedulerUpdaterBackend()) - - def test_running_raise(self): - """Raising during run should finish fine. 
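-
-        Even though consume_events raises, run() closes the connection and
-        flushes pending events from its finally clause, so every flag
-        checked below is expected to be set once the exception propagates.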
- - """ - # given - self.assertEqual(self.updater.count, 0) - self.assertEqual(self.updater.seen_events, set()) - self.assertEqual(self.updater.events, []) - - # when - with self.assertRaisesRegex(ValueError, 'Broken stuff'): - self.updater.run() - - # then - self.assertEqual(self.updater.count, 0) - self.assertEqual(self.updater.seen_events, set()) - self.assertEqual(self.updater.events, []) - self.assertTrue(self.updater.connection_opened) - self.assertTrue(self.updater.has_events_called) - self.assertTrue(self.updater.connection_closed) - self.assertTrue(self.updater.consume_called) - - -class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase): - def has_events(self): - self.has_events_called = True - return False - - def consume_events(self): - self.consume_called = True - - -class UpdaterConsumerNoEventTest(unittest.TestCase): - def setUp(self): - self.updater = FakeUpdaterConsumerNoEvent( - FakeSchedulerUpdaterBackend()) - - def test_running_does_not_consume(self): - """Run with no events should do just fine""" - # given - self.assertEqual(self.updater.count, 0) - self.assertEqual(self.updater.seen_events, set()) - self.assertEqual(self.updater.events, []) - - # when - self.updater.run() - - # then - self.assertEqual(self.updater.count, 0) - self.assertEqual(self.updater.seen_events, set()) - self.assertEqual(self.updater.events, []) - self.assertTrue(self.updater.connection_opened) - self.assertTrue(self.updater.has_events_called) - self.assertTrue(self.updater.connection_closed) - self.assertFalse(self.updater.consume_called) - - -EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type'] - - -class FakeUpdaterConsumer(FakeUpdaterConsumerBase): - def __init__(self, backend, messages): - super().__init__(backend) - self.messages = messages - self.debug = False - - def has_events(self): - self.has_events_called = True - return len(self.messages) > 0 - - def consume_events(self): - self.consume_called = True - for msg in self.messages: - yield msg - self.messages.pop() - - def convert_event(self, event, keys=EVENT_KEYS): - for k in keys: - v = event.get(k) - if v is None: - return None - - e = { - 'type': event['type'], - 'url': 'https://fake.url/%s' % event['repo']['name'], - 'last_seen': event['created_at'], - 'origin_type': event['origin_type'], - } - return SWHEvent(e) - - -class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase): - @settings(suppress_health_check=[HealthCheck.too_slow]) - @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type - from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name - text()), # origin type - min_size=3, max_size=10), - lists(tuples(text(), # event type - from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name - text()), # origin type - min_size=3, max_size=10), - lists(tuples(sampled_from(LISTENED_EVENTS), # event type - from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name - text(), # origin type - sampled_from(EVENT_KEYS)), # keys to drop - min_size=3, max_size=10)) - def test_running(self, events, uninteresting_events, incomplete_events): - """Interesting events are written to cache, others are dropped - - """ - # given - ready_events = self._make_events(events) - ready_uninteresting_events = self._make_events(uninteresting_events) - ready_incomplete_events = self._make_incomplete_events( - incomplete_events) - - updater = FakeUpdaterConsumer( - FakeSchedulerUpdaterBackend(), - list(chain( - ready_events, ready_incomplete_events, - ready_uninteresting_events))) - - self.assertEqual(updater.count, 0) - 
self.assertEqual(updater.seen_events, set()) - self.assertEqual(updater.events, []) - - # when - updater.run() - - # then - self.assertEqual(updater.count, 0) - self.assertEqual(updater.seen_events, set()) - self.assertEqual(updater.events, []) - self.assertTrue(updater.connection_opened) - self.assertTrue(updater.has_events_called) - self.assertTrue(updater.connection_closed) - self.assertTrue(updater.consume_called) - - self.assertEqual(updater.messages, []) - # uninteresting or incomplete events are dropped - self.assertTrue(len(updater.backend.events), len(events)) diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/test_events.py +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest - -from hypothesis import given -from hypothesis.strategies import sampled_from, text - -from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent -from swh.scheduler.updater.ghtorrent import events - -from . import UpdaterTestUtil - - -def event_values_ko(): - return set(events['evt']).union( - set(events['ent'])).difference( - set(LISTENED_EVENTS)) - - -WRONG_EVENTS = sorted(list(event_values_ko())) - - -class EventTest(UpdaterTestUtil, unittest.TestCase): - @given(sampled_from(LISTENED_EVENTS), text(), text()) - def test_is_interesting_ok(self, event_type, name, origin_type): - evt = self._make_simple_event(event_type, name, origin_type) - self.assertTrue(SWHEvent(evt).is_interesting()) - - @given(text(), text(), text()) - def test_is_interested_with_noisy_event_should_be_ko( - self, event_type, name, origin_type): - if event_type in LISTENED_EVENTS: - # just in case something good is generated, skip it - return - evt = self._make_simple_event(event_type, name, origin_type) - self.assertFalse(SWHEvent(evt).is_interesting()) - - @given(sampled_from(WRONG_EVENTS), text(), text()) - def test_is_interesting_ko(self, event_type, name, origin_type): - evt = self._make_simple_event(event_type, name, origin_type) - self.assertFalse(SWHEvent(evt).is_interesting()) diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/test_ghtorrent.py +++ /dev/null @@ -1,174 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import unittest -from unittest.mock import patch - -from hypothesis import given -from hypothesis.strategies import sampled_from - -from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS, - GHTorrentConsumer, events) -from swh.scheduler.updater.backend import SchedulerUpdaterBackend - -from . 
import UpdaterTestUtil, from_regex - - -def event_values(): - return set(events['evt']).union(set(events['ent'])) - - -def ghtorrentize_event_name(event_name): - return '%sEvent' % event_name.capitalize() - - -EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()]) - - -class FakeChannel: - """Fake Channel (virtual connection inside a connection) - - """ - def close(self): - self.close = True - - -class FakeConnection: - """Fake Rabbitmq connection for test purposes - - """ - def __init__(self, conn_string): - self._conn_string = conn_string - self._connect = False - self._release = False - self._channel = False - - def connect(self): - self._connect = True - return True - - def release(self): - self._connect = False - self._release = True - - def channel(self): - self._channel = True - return FakeChannel() - - -class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase): - def setUp(self): - config = { - 'ghtorrent': { - 'rabbitmq': { - 'conn': { - 'url': 'amqp://u:p@https://somewhere:9807', - }, - 'prefetch_read': 17, - }, - 'batch_cache_write': 42, - }, - 'scheduler_updater': { - 'cls': 'local', - 'args': { - 'db': 'dbname=softwareheritage-scheduler-updater-dev', - }, - }, - } - - GHTorrentConsumer.connection_class = FakeConnection - with patch.object( - SchedulerUpdaterBackend, '__init__', return_value=None): - self.consumer = GHTorrentConsumer(**config) - - @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend') - def test_init(self, mock_backend): - # given - # check init is ok - self.assertEqual(self.consumer.batch, 42) - self.assertEqual(self.consumer.prefetch_read, 17) - - def test_has_events(self): - self.assertTrue(self.consumer.has_events()) - - def test_connection(self): - # when - self.consumer.open_connection() - - # then - self.assertEqual(self.consumer.conn._conn_string, - 'amqp://u:p@https://somewhere:9807') - self.assertTrue(self.consumer.conn._connect) - self.assertFalse(self.consumer.conn._release) - - # when - self.consumer.close_connection() - - # then - self.assertFalse(self.consumer.conn._connect) - self.assertTrue(self.consumer.conn._release) - self.assertIsInstance(self.consumer.channel, FakeChannel) - - @given(sampled_from(EVENT_TYPES), - from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$')) - def test_convert_event_ok(self, event_type, name): - input_event = self._make_event(event_type, name, 'git') - actual_event = self.consumer.convert_event(input_event) - - self.assertTrue(isinstance(actual_event, SWHEvent)) - - event = actual_event.get() - - expected_event = { - 'type': event_type.lower().rstrip('Event'), - 'url': 'https://github.com/%s' % name, - 'last_seen': input_event['created_at'], - 'cnt': 1, - 'origin_type': 'git', - } - self.assertEqual(event, expected_event) - - @given(sampled_from(EVENT_TYPES), - from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), - sampled_from(INTERESTING_EVENT_KEYS)) - def test_convert_event_ko(self, event_type, name, missing_data_key): - input_event = self._make_incomplete_event( - event_type, name, 'git', missing_data_key) - - logger = self.consumer.log - del self.consumer.log # prevent gazillions of warnings - actual_converted_event = self.consumer.convert_event(input_event) - self.consumer.log = logger - self.assertIsNone(actual_converted_event) - - @patch('swh.scheduler.updater.ghtorrent.collect_replies') - def test_consume_events(self, mock_collect_replies): - # given - self.consumer.queue = 'fake-queue' # hack - self.consumer.open_connection() - - fake_events = [ - self._make_event('PushEvent', 
'user/some-repo', 'git'), - self._make_event('PushEvent', 'user2/some-other-repo', 'git'), - ] - - mock_collect_replies.return_value = fake_events - - # when - actual_events = [] - for e in self.consumer.consume_events(): - actual_events.append(e) - - # then - self.assertEqual(fake_events, actual_events) - - mock_collect_replies.assert_called_once_with( - self.consumer.conn, - self.consumer.channel, - 'fake-queue', - no_ack=False, - limit=17 - ) diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py deleted file mode 100644 --- a/swh/scheduler/tests/updater/test_writer.py +++ /dev/null @@ -1,152 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import os -from glob import glob - -import pytest -from pytest_postgresql.factories import postgresql as pg_fixture_factory - -from os.path import join -from swh.core.utils import numfile_sortkey as sortkey -from swh.scheduler.tests import SQL_DIR -from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent -from swh.scheduler.updater.writer import UpdaterWriter - -from .conftest import make_simple_event - - -pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler') -pg_updater = pg_fixture_factory('postgresql_proc', 'updater') - - -def pg_sched_fact(dbname, sqldir): - @pytest.fixture - def pg_scheduler_db(request): - pg = request.getfixturevalue('pg_%s' % dbname) - dump_files = sorted(glob(os.path.join(sqldir, '*.sql')), - key=sortkey) - with pg.cursor() as cur: - for fname in dump_files: - with open(fname) as fobj: - sql = fobj.read().replace('concurrently', '') - cur.execute(sql) - pg.commit() - yield pg - - return pg_scheduler_db - - -scheduler_db = pg_sched_fact('scheduler', SQL_DIR) -updater_db = pg_sched_fact('updater', join(SQL_DIR, 'updater')) - - -@pytest.fixture -def swh_updater_writer(scheduler_db, updater_db): - config = { - 'scheduler': { - 'cls': 'local', - 'args': { - 'db': scheduler_db.dsn, - }, - }, - 'scheduler_updater': { - 'cls': 'local', - 'args': { - 'db': updater_db.dsn, - 'cache_read_limit': 5, - }, - }, - 'updater_writer': { - 'pause': 0.1, - 'verbose': False, - }, - } - return UpdaterWriter(**config) - - -def test_run_ko(swh_updater_writer): - """Only git tasks are supported for now, other types are dismissed. 
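-
-    Non-git events still reach the updater cache; they are only dropped at
-    conversion time, when convert_to_oneshot_task() returns None for any
-    origin_type other than 'git'.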
- - """ - scheduler = swh_updater_writer.scheduler_backend - updater = swh_updater_writer.scheduler_updater_backend - - ready_events = [ - SWHEvent( - make_simple_event(event_type, 'origin-%s' % i, - 'svn')) - for i, event_type in enumerate(LISTENED_EVENTS) - ] - - updater.cache_put(ready_events) - list(updater.cache_read()) - - r = scheduler.peek_ready_tasks('load-git') - - # first read on an empty scheduling db results with nothing in it - assert not r - - # Read from cache to scheduler db - swh_updater_writer.run() - - r = scheduler.peek_ready_tasks('load-git') - - # other reads after writes are still empty since it's not supported - assert not r - - -def test_run_ok(swh_updater_writer): - """Only git origin are supported for now - - """ - scheduler = swh_updater_writer.scheduler_backend - updater = swh_updater_writer.scheduler_updater_backend - - ready_events = [ - SWHEvent( - make_simple_event(event_type, 'origin-%s' % i, 'git')) - for i, event_type in enumerate(LISTENED_EVENTS) - ] - - expected_length = len(ready_events) - - updater.cache_put(ready_events) - - data = list(updater.cache_read()) - assert len(data) == expected_length - - r = scheduler.peek_ready_tasks('load-git') - - # first read on an empty scheduling db results with nothing in it - assert not r - - # Read from cache to scheduler db - swh_updater_writer.run() - - # now, we should have scheduling task ready - r = scheduler.peek_ready_tasks('load-git') - - assert len(r) == expected_length - - # Check the task has been scheduled - for t in r: - assert t['type'] == 'load-git' - assert t['priority'] == 'normal' - assert t['policy'] == 'oneshot' - assert t['status'] == 'next_run_not_scheduled' - - # writer has nothing to do now - swh_updater_writer.run() - - # so no more data in cache - data = list(updater.cache_read()) - - assert not data - - # provided, no runner is ran, still the same amount of scheduling tasks - r = scheduler.peek_ready_tasks('load-git') - - assert len(r) == expected_length diff --git a/swh/scheduler/updater/__init__.py b/swh/scheduler/updater/__init__.py deleted file mode 100644 diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py deleted file mode 100644 --- a/swh/scheduler/updater/backend.py +++ /dev/null @@ -1,113 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -from arrow import utcnow -import psycopg2.pool -import psycopg2.extras - -from swh.core.db import BaseDb -from swh.core.db.common import db_transaction, db_transaction_generator -from swh.scheduler.backend import format_query - - -class SchedulerUpdaterBackend: - CONFIG_BASE_FILENAME = 'backend/scheduler-updater' -# 'cache_read_limit': ('int', 1000), - - def __init__(self, db, cache_read_limit=1000, - min_pool_conns=1, max_pool_conns=10): - """ - Args: - db_conn: either a libpq connection string, or a psycopg2 connection - - """ - if isinstance(db, psycopg2.extensions.connection): - self._pool = None - self._db = BaseDb(db) - else: - self._pool = psycopg2.pool.ThreadedConnectionPool( - min_pool_conns, max_pool_conns, db, - cursor_factory=psycopg2.extras.RealDictCursor, - ) - self._db = None - self.limit = cache_read_limit - - def get_db(self): - if self._db: - return self._db - return BaseDb.from_pool(self._pool) - - def put_db(self, db): - if db is not self._db: - db.put_conn() - - cache_put_keys = 
['url', 'cnt', 'last_seen', 'origin_type'] - - @db_transaction() - def cache_put(self, events, timestamp=None, db=None, cur=None): - """Write new events in the backend. - - """ - cur.execute('select swh_mktemp_cache()') - db.copy_to(prepare_events(events, timestamp), - 'tmp_cache', self.cache_put_keys, cur=cur) - cur.execute('select swh_cache_put()') - - cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen', - 'last_seen'] - - @db_transaction_generator() - def cache_read(self, timestamp=None, limit=None, db=None, cur=None): - """Read events from the cache prior to timestamp. - - Note that limit=None does not mean 'no limit' but use the default - limit (see cache_read_limit constructor argument). - - """ - if not timestamp: - timestamp = utcnow() - - if not limit: - limit = self.limit - - q = format_query('select {keys} from swh_cache_read(%s, %s)', - self.cache_read_keys) - cur.execute(q, (timestamp, limit)) - yield from cur.fetchall() - - @db_transaction() - def cache_remove(self, entries, db=None, cur=None): - """Clean events from the cache - - """ - q = 'delete from cache where url in (%s)' % ( - ', '.join(("'%s'" % e for e in entries)), ) - cur.execute(q) - - -def prepare_events(events, timestamp=None): - if timestamp is None: - timestamp = utcnow() - outevents = [] - urls = [] - for e in events: - event = e.get() - url = event['url'].strip() - if event['last_seen'] is None: - event['last_seen'] = timestamp - event['url'] = url - - if url in urls: - idx = urls.index(url) - urls.append(urls.pop(idx)) - prev_event = outevents.pop(idx) - event['cnt'] += prev_event['cnt'] - event['last_seen'] = max( - event['last_seen'], prev_event['last_seen']) - else: - urls.append(url) - outevents.append(event) - return outevents diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py deleted file mode 100644 --- a/swh/scheduler/updater/consumer.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import logging - -from abc import ABCMeta, abstractmethod - - -class UpdaterConsumer(metaclass=ABCMeta): - """Event consumer - - """ - def __init__(self, backend, batch_cache_write=1000): - super().__init__() - self._reset_cache() - self.backend = backend - self.batch = int(batch_cache_write) - logging.basicConfig(level=logging.DEBUG) - self.log = logging.getLogger('%s.%s' % ( - self.__class__.__module__, self.__class__.__name__)) - - def _reset_cache(self): - """Reset internal cache. - - """ - self.count = 0 - self.seen_events = set() - self.events = [] - - def is_interesting(self, event): - """Determine if an event is interesting or not. - - Args: - event (SWHEvent): SWH event - - """ - return event.is_interesting() - - @abstractmethod - def convert_event(self, event): - """Parse an event into an SWHEvent. - - """ - pass - - def process_event(self, event): - """Process converted and interesting event. - - Args: - event (SWHEvent): Event to process if deemed interesting - - """ - try: - if event.url in self.seen_events: - event.cnt += 1 - else: - self.events.append(event) - self.seen_events.add(event.url) - self.count += 1 - finally: - if self.count >= self.batch: - if self.events: - self.backend.cache_put(self.events) - self._reset_cache() - - def _flush(self): - """Flush remaining internal cache if any. 
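-
-        run() invokes this from its finally clause, so buffered events are
-        handed to backend.cache_put() even when consumption stops on an
-        error.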
- - """ - if self.events: - self.backend.cache_put(self.events) - self._reset_cache() - - @abstractmethod - def has_events(self): - """Determine if there remains events to consume. - - Returns - boolean value, true for remaining events, False otherwise - - """ - pass - - @abstractmethod - def consume_events(self): - """The main entry point to consume events. - - This should either yield or return message for consumption. - - """ - pass - - @abstractmethod - def open_connection(self): - """Open a connection to the remote system we are supposed to consume - from. - - """ - pass - - @abstractmethod - def close_connection(self): - """Close opened connection to the remote system. - - """ - pass - - def run(self): - """The main entry point to consume events. - - """ - try: - self.open_connection() - while self.has_events(): - for _event in self.consume_events(): - event = self.convert_event(_event) - if not event: - self.log.warning( - 'Incomplete event dropped %s' % _event) - continue - if not self.is_interesting(event): - continue - if self.debug: - self.log.debug('Event: %s' % event) - try: - self.process_event(event) - except Exception: - self.log.exception( - 'Problem when processing event %s' % _event) - continue - except Exception as e: - self.log.error('Error raised during consumption: %s' % e) - raise e - finally: - self.close_connection() - self._flush() diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py deleted file mode 100644 --- a/swh/scheduler/updater/events.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - - -LISTENED_EVENTS = [ - 'delete', - 'public', - 'push' -] - - -class SWHEvent: - """SWH's interesting event (resulting in an origin update) - - """ - def __init__(self, evt, cnt=1): - self.event = evt - self.type = evt['type'].lower() - self.url = evt['url'] - self.last_seen = evt.get('last_seen') - self.cnt = cnt - self.origin_type = evt.get('origin_type') - - def is_interesting(self): - return self.type in LISTENED_EVENTS - - def get(self): - return { - 'type': self.type, - 'url': self.url, - 'last_seen': self.last_seen, - 'cnt': self.cnt, - 'origin_type': self.origin_type - } - - def __str__(self): - return str(self.get()) diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py deleted file mode 100644 --- a/swh/scheduler/updater/ghtorrent/__init__.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import json - -from kombu import Connection, Exchange, Queue -from kombu.common import collect_replies - -from swh.core.config import merge_configs - -from swh.scheduler.updater.events import SWHEvent -from swh.scheduler.updater.consumer import UpdaterConsumer -from swh.scheduler.updater.backend import SchedulerUpdaterBackend - - -events = { - # ghtorrent events related to github events (interesting) - 'evt': [ - 'commitcomment', 'create', 'delete', 'deployment', - 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply', - 'gist', 'gollum', 'issuecomment', 'issues', 'member', - 'membership', 'pagebuild', 'public', 'pullrequest', - 
'pullrequestreviewcomment', 'push', 'release', 'repository', - 'status', 'teamadd', 'watch' - ], - # ghtorrent events related to mongodb insert (not interesting) - 'ent': [ - 'commit_comments', 'commits', 'followers', 'forks', - 'geo_cache', 'issue_comments', 'issue_events', 'issues', - 'org_members', 'pull_request_comments', 'pull_requests', - 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers' - ] -} - -INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at'] - -DEFAULT_CONFIG = { - 'ghtorrent': { - 'batch_cache_write': 1000, - 'rabbitmq': { - 'prefetch_read': 100, - 'conn': { - 'url': 'amqp://guest:guest@localhost:5672', - 'exchange_name': 'ght-streams', - 'routing_key': 'something', - 'queue_name': 'fake-events', - }, - }, - }, - 'scheduler_updater': { - 'cls': 'local', - 'args': { - 'db': 'dbname=softwareheritage-scheduler-updater-dev', - 'cache_read_limit': 1000, - }, - }, -} - - -class GHTorrentConsumer(UpdaterConsumer): - """GHTorrent events consumer - - """ - connection_class = Connection - - def __init__(self, **config): - self.config = merge_configs(DEFAULT_CONFIG, config) - - ght_config = self.config['ghtorrent'] - rmq_config = ght_config['rabbitmq'] - self.prefetch_read = int(rmq_config.get('prefetch_read', 100)) - - exchange = Exchange( - rmq_config['conn']['exchange_name'], - 'topic', durable=True) - routing_key = rmq_config['conn']['routing_key'] - self.queue = Queue(rmq_config['conn']['queue_name'], - exchange=exchange, - routing_key=routing_key, - auto_delete=True) - - if self.config['scheduler_updater']['cls'] != 'local': - raise ValueError( - 'The scheduler_updater can only be a cls=local for now') - backend = SchedulerUpdaterBackend( - **self.config['scheduler_updater']['args']) - - super().__init__(backend, ght_config.get('batch_cache_write', 1000)) - - def has_events(self): - """Always has events - - """ - return True - - def convert_event(self, event): - """Given ghtorrent event, convert it to a SWHEvent instance. 
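-
-        For instance (a sketch, assuming a well-formed payload), an input
-        event such as::
-
-            {'type': 'PushEvent', 'repo': {'name': 'user/repo'},
-             'created_at': '2018-01-01T00:00:00Z'}
-
-        is converted to an SWHEvent wrapping::
-
-            {'type': 'push', 'url': 'https://github.com/user/repo',
-             'last_seen': '2018-01-01T00:00:00Z', 'origin_type': 'git'}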
- - """ - if isinstance(event, str): - event = json.loads(event) - for k in INTERESTING_EVENT_KEYS: - if k not in event: - if hasattr(self, 'log'): - self.log.warning( - 'Event should have the \'%s\' entry defined' % k) - return None - - _type = event['type'].lower().rstrip('Event') - _repo_name = 'https://github.com/%s' % event['repo']['name'] - - return SWHEvent({ - 'type': _type, - 'url': _repo_name, - 'last_seen': event['created_at'], - 'origin_type': 'git', - }) - - def open_connection(self): - """Open rabbitmq connection - - """ - self.conn = self.connection_class( - self.config['ghtorrent']['rabbitmq']['conn']['url']) - self.conn.connect() - self.channel = self.conn.channel() - - def close_connection(self): - """Close rabbitmq connection - - """ - self.channel.close() - self.conn.release() - - def consume_events(self): - """Consume and yield queue messages - - """ - yield from collect_replies( - self.conn, self.channel, self.queue, - no_ack=False, limit=self.prefetch_read) diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py deleted file mode 100644 --- a/swh/scheduler/updater/ghtorrent/cli.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import click - - -@click.command() -@click.option('--verbose/--no-verbose', '-v', default=False, - help='Verbose mode') -@click.pass_context -def main(ctx, verbose): - """Consume events from ghtorrent and write them to cache. - - """ - click.echo("Deprecated! Use 'swh-scheduler updater' instead.", - err=True) - ctx.exit(1) - - -if __name__ == '__main__': - main() diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py deleted file mode 100644 --- a/swh/scheduler/updater/writer.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright (C) 2018 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import click -import logging -import time - -from arrow import utcnow - -from swh.core import utils -from swh.scheduler import get_scheduler -from swh.scheduler.utils import create_oneshot_task_dict -from swh.scheduler.updater.backend import SchedulerUpdaterBackend - - -class UpdaterWriter: - """Updater writer in charge of updating the scheduler db with latest - prioritized oneshot tasks - - In effect, this: - - reads the events from scheduler updater's db - - converts those events into priority oneshot tasks - - dumps them into the scheduler db - - """ - - def __init__(self, **config): - self.config = config - if self.config['scheduler_updater']['cls'] != 'local': - raise ValueError( - 'The scheduler_updater can only be a cls=local for now') - self.scheduler_updater_backend = SchedulerUpdaterBackend( - **self.config['scheduler_updater']['args']) - self.scheduler_backend = get_scheduler(**self.config['scheduler']) - self.pause = self.config.get('updater_writer', {}).get('pause', 10) - self.log = logging.getLogger( - 'swh.scheduler.updater.writer.UpdaterWriter') - - def convert_to_oneshot_task(self, event): - """Given an event, convert it into oneshot task with priority - - Args: - event (dict): The event to convert to task - - """ - if event['origin_type'] == 'git': - return create_oneshot_task_dict( 
- 'load-git', - event['url'], - priority='normal') - self.log.warning('Type %s is not supported for now, only git' % ( - event['origin_type'], )) - return None - - def write_event_to_scheduler(self, events): - """Write events to the scheduler and yield ids when done""" - # convert events to oneshot tasks - oneshot_tasks = filter(lambda e: e is not None, - map(self.convert_to_oneshot_task, events)) - # write event to scheduler - self.scheduler_backend.create_tasks(list(oneshot_tasks)) - for e in events: - yield e['url'] - - def run(self): - """First retrieve events from cache (including origin_type, cnt), - then convert them into oneshot tasks with priority, then - write them to the scheduler db, at last remove them from - cache. - - """ - while True: - timestamp = utcnow() - events = list(self.scheduler_updater_backend.cache_read(timestamp)) - if not events: - break - for urls in utils.grouper(self.write_event_to_scheduler(events), - n=100): - self.scheduler_updater_backend.cache_remove(urls) - time.sleep(self.pause) - - -@click.command() -@click.option('--verbose/--no-verbose', '-v', default=False, - help='Verbose mode') -@click.pass_context -def main(ctx, verbose): - click.echo("Deprecated! Use 'swh-scheduler updater' instead.", - err=True) - ctx.exit(1) - - -if __name__ == '__main__': - main()
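
For reference, a minimal sketch of the scheduling call the removed
UpdaterWriter performed, using only APIs that survive this removal (the dsn
and the origin URL are placeholders):

    # Sketch of the removed writer's effect; dsn and URL are placeholders.
    from swh.scheduler import get_scheduler
    from swh.scheduler.utils import create_oneshot_task_dict

    scheduler = get_scheduler(
        cls='local', args={'db': 'dbname=softwareheritage-scheduler-dev'})
    task = create_oneshot_task_dict(
        'load-git', 'https://github.com/user/repo', priority='normal')
    scheduler.create_tasks([task])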