D2354.diff
D2354: scheduler.updater: Remove dead code
diff --git a/sql/updater/sql/Makefile b/sql/updater/sql/Makefile
deleted file mode 100644
--- a/sql/updater/sql/Makefile
+++ /dev/null
@@ -1,73 +0,0 @@
-# Depends: postgresql-client, postgresql-autodoc
-
-DBNAME = softwareheritage-scheduler-updater-dev
-DOCDIR = autodoc
-
-SQL_INIT = 10-swh-init.sql
-SQL_SCHEMA = 30-swh-schema.sql
-SQL_FUNC = 40-swh-func.sql
-SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC)
-SQL_FILES = $(abspath $(addprefix \
- $(CURDIR)/../../../swh/scheduler/sql/updater/,$(SQLS)))
-
-PSQL_BIN = psql
-PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1
-PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
-
-PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS))
-
-all:
-
-createdb: createdb-stamp
-createdb-stamp: $(SQL_FILES)
-ifeq ($(PIFPAF),)
- -dropdb $(DBNAME)
-endif
- createdb $(DBNAME)
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-filldb: filldb-stamp
-filldb-stamp: createdb-stamp
- cat $(SQL_FILES) | $(PSQL) $(DBNAME)
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-dropdb:
- -dropdb $(DBNAME)
-
-dumpdb: swh-scheduler-updater.dump
-swh-scheduler-updater.dump: filldb-stamp
- pg_dump -Fc $(DBNAME) > $@
-
-$(DOCDIR):
- test -d $(DOCDIR)/ || mkdir $(DOCDIR)
-
-doc: autodoc-stamp $(DOCDIR)/swh-scheduler-updater.pdf
-autodoc-stamp: filldb-stamp $(DOCDIR)
- postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler-updater
- cp -a $(DOCDIR)/swh-scheduler-updater.dot $(DOCDIR)/swh-scheduler-updater.dot.orig
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-$(DOCDIR)/swh-scheduler-updater.pdf: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp
- dot -T pdf $< > $@
-$(DOCDIR)/swh-scheduler-updater.svg: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp
- dot -T svg $< > $@
-
-clean:
- rm -rf *-stamp $(DOCDIR)/
-
-distclean: clean dropdb
- rm -f swh-scheduler-updater.dump
-
-.PHONY: all initdb createdb dropdb doc clean
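
The Makefile above drove database setup by shelling out to the PostgreSQL client tools. For orientation, here is a minimal Python sketch of the same create-and-fill flow, assuming createdb/dropdb/psql are on PATH and the SQL files live at the path SQL_FILES points to; this is an illustration, not part of the change:

    import subprocess
    from pathlib import Path

    DBNAME = 'softwareheritage-scheduler-updater-dev'
    # location taken from SQL_FILES above, relative to the repository root
    SQL_DIR = Path('swh/scheduler/sql/updater')
    PSQL_FLAGS = ['--echo-all', '-X', '-v', 'ON_ERROR_STOP=1']

    def create_and_fill():
        # mirrors `make createdb filldb`: recreate the db, then feed the
        # SQL files to psql in lexical order (10-, 30-, 40-)
        subprocess.run(['dropdb', '--if-exists', DBNAME], check=True)
        subprocess.run(['createdb', DBNAME], check=True)
        sql = '\n'.join(p.read_text() for p in sorted(SQL_DIR.glob('*.sql')))
        subprocess.run(['psql', *PSQL_FLAGS, DBNAME],
                       input=sql, text=True, check=True)
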
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -88,37 +88,3 @@
if debug is None:
debug = ctx.obj['log_level'] <= logging.DEBUG
server.app.run(host, port=port, debug=bool(debug))
-
-
-@cli.command('start-updater')
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def updater(ctx, verbose):
- """Starts a scheduler-updater service.
-
-    Insert tasks into the scheduler from the scheduler-updater's events read
-    from the db cache (filled e.g. by the ghtorrent consumer service).
-
- """
- from swh.scheduler.updater.writer import UpdaterWriter
- UpdaterWriter(**ctx.obj['config']).run()
-
-
-@cli.command('start-ghtorrent')
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def ghtorrent(ctx, verbose):
- """Starts a ghtorrent consumer service.
-
-    Consumes events from ghtorrent and writes them to a cache.
-
- """
- from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
- from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
- ght_config = ctx.obj['config'].get('ghtorrent', {})
- back_config = ctx.obj['config'].get('scheduler_updater', {})
- backend = SchedulerUpdaterBackend(**back_config)
- GHTorrentConsumer(backend, **ght_config).run()
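
The two subcommands removed here were thin wrappers around the updater classes deleted below: start-updater ran an UpdaterWriter built from the loaded config, and start-ghtorrent wired a SchedulerUpdaterBackend into a GHTorrentConsumer. For reference, a sketch of the config shape they consumed; key names come from DEFAULT_CONFIG and the test fixtures further down, values are illustrative:

    config = {
        'scheduler': {
            'cls': 'local',
            'args': {'db': 'dbname=softwareheritage-scheduler-dev'},
        },
        'scheduler_updater': {
            'cls': 'local',
            'args': {
                'db': 'dbname=softwareheritage-scheduler-updater-dev',
                'cache_read_limit': 1000,
            },
        },
        'updater_writer': {'pause': 10, 'verbose': False},
        'ghtorrent': {
            'batch_cache_write': 1000,
            'rabbitmq': {
                'prefetch_read': 100,
                'conn': {'url': 'amqp://guest:guest@localhost:5672'},
            },
        },
    }
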
diff --git a/swh/scheduler/sql/updater/10-swh-init.sql b/swh/scheduler/sql/updater/10-swh-init.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/10-swh-init.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-create extension if not exists btree_gist;
-create extension if not exists pgcrypto;
-
-create or replace language plpgsql;
diff --git a/swh/scheduler/sql/updater/30-swh-schema.sql b/swh/scheduler/sql/updater/30-swh-schema.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/30-swh-schema.sql
+++ /dev/null
@@ -1,29 +0,0 @@
-create table dbversion
-(
- version int primary key,
- release timestamptz not null,
- description text not null
-);
-
-comment on table dbversion is 'Schema update tracking';
-
--- a SHA1 checksum (not necessarily originating from Git)
-create domain sha1 as bytea check (length(value) = 20);
-
-insert into dbversion (version, release, description)
- values (1, now(), 'Work In Progress');
-
-create type origin_type as enum ('git', 'svn', 'hg', 'deb');
-comment on type origin_type is 'Url''s repository type';
-
-create table cache (
- id sha1 primary key,
- url text not null,
- origin_type origin_type not null,
- cnt int default 1,
- first_seen timestamptz not null default now(),
- last_seen timestamptz not null
- );
-
-create index on cache(url);
-create index on cache(last_seen);
diff --git a/swh/scheduler/sql/updater/40-swh-func.sql b/swh/scheduler/sql/updater/40-swh-func.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/40-swh-func.sql
+++ /dev/null
@@ -1,48 +0,0 @@
--- Postgresql index helper function
-create or replace function hash_sha1(text)
- returns sha1
-as $$
- select public.digest($1, 'sha1') :: sha1
-$$ language sql strict immutable;
-
-comment on function hash_sha1(text) is 'Compute the sha1 hash of a text value';
-
--- create a temporary table tmp_cache mirroring the cache table
-create or replace function swh_mktemp_cache()
- returns void
- language sql
-as $$
- create temporary table tmp_cache (
- like cache including defaults
- ) on commit drop;
- alter table tmp_cache drop column id;
-$$;
-
-create or replace function swh_cache_put()
- returns void
- language plpgsql
-as $$
-begin
- insert into cache (id, url, origin_type, cnt, last_seen)
- select hash_sha1(url), url, origin_type, cnt, last_seen
- from tmp_cache t
- on conflict(id)
- do update set cnt = (select cnt from cache where id=excluded.id) + excluded.cnt,
- last_seen = excluded.last_seen;
- return;
-end
-$$;
-
-comment on function swh_cache_put() is 'Write temporary events to the cache';
-
-create or replace function swh_cache_read(ts timestamptz, lim integer)
- returns setof cache
- language sql stable
-as $$
- select id, url, origin_type, cnt, first_seen, last_seen
- from cache
- where last_seen <= ts
- limit lim;
-$$;
-
-comment on function swh_cache_read(timestamptz, integer) is 'Read cache entries';
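
Together these functions implement a copy-then-merge upsert: callers fill tmp_cache, then swh_cache_put folds the rows into cache, summing cnt on conflict and taking the incoming last_seen. A minimal psycopg2 sketch of the calling sequence, mirroring SchedulerUpdaterBackend.cache_put further down (the real backend streams rows in with COPY; plain inserts keep the sketch short):

    import psycopg2

    def cache_put_raw(dsn, rows):
        """rows: iterable of (url, origin_type, cnt, last_seen) tuples."""
        with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
            # tmp_cache is `on commit drop`, so all three statements must
            # share one transaction; the with-block commits on exit
            cur.execute('select swh_mktemp_cache()')
            cur.executemany(
                'insert into tmp_cache (url, origin_type, cnt, last_seen)'
                ' values (%s, %s, %s, %s)', rows)
            cur.execute('select swh_cache_put()')  # merge tmp_cache into cache
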
diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/__init__.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from arrow import utcnow
-
-try:
- from hypothesis.strategies import from_regex
-except ImportError:
- from hypothesis.strategies import text
-
- # Revert to using basic text generation
- def from_regex(*args, **kwargs):
- return text()
-
-
-class UpdaterTestUtil:
- """Mixin intended for event generation purposes
-
- """
- def _make_event(self, event_type, name, origin_type):
- return {
- 'type': event_type,
- 'repo': {
- 'name': name,
- },
- 'created_at': utcnow(),
- 'origin_type': origin_type,
- }
-
- def _make_events(self, events):
- for event_type, repo_name, origin_type in events:
- yield self._make_event(event_type, repo_name, origin_type)
-
- def _make_incomplete_event(self, event_type, name, origin_type,
- missing_data_key):
- event = self._make_event(event_type, name, origin_type)
- del event[missing_data_key]
- return event
-
- def _make_incomplete_events(self, events):
- for event_type, repo_name, origin_type, missing_data_key in events:
- yield self._make_incomplete_event(event_type, repo_name,
- origin_type, missing_data_key)
-
- def _make_simple_event(self, event_type, name, origin_type):
- return {
- 'type': event_type,
- 'url': 'https://fakeurl/%s' % name,
- 'origin_type': origin_type,
- 'created_at': utcnow(),
- }
diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/conftest.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import pytest
-import glob
-import os
-from arrow import utcnow # XXX
-
-from swh.core.utils import numfile_sortkey as sortkey
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-from swh.scheduler.tests import SQL_DIR
-import swh.scheduler.tests.conftest # noqa
-
-
-DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql')
-
-
-@pytest.fixture
-def swh_scheduler_updater(postgresql):
- config = {
- 'db': postgresql.dsn,
- }
-
- all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
-
- cursor = postgresql.cursor()
- for fname in all_dump_files:
- with open(fname) as fobj:
- cursor.execute(fobj.read())
- postgresql.commit()
-
- backend = SchedulerUpdaterBackend(**config)
- return backend
-
-
-def make_event(event_type, name, origin_type):
- return {
- 'type': event_type,
- 'repo': {
- 'name': name,
- },
- 'created_at': utcnow(),
- 'origin_type': origin_type,
- }
-
-
-def make_simple_event(event_type, name, origin_type):
- return {
- 'type': event_type,
- 'url': 'https://fakeurl/%s' % name,
- 'origin_type': origin_type,
- 'created_at': utcnow(),
- }
-
-
-def make_events(events):
- for event_type, repo_name, origin_type in events:
- yield make_event(event_type, repo_name, origin_type)
-
-
-def make_incomplete_event(event_type, name, origin_type,
- missing_data_key):
- event = make_event(event_type, name, origin_type)
- del event[missing_data_key]
- return event
-
-
-def make_incomplete_events(events):
- for event_type, repo_name, origin_type, missing_data_key in events:
- yield make_incomplete_event(event_type, repo_name,
- origin_type, missing_data_key)
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from hypothesis import given
-from hypothesis.strategies import sets
-
-from swh.scheduler.updater.events import SWHEvent
-
-from . import from_regex
-
-
-@given(urls=sets(
- from_regex(
- r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
- min_size=10, max_size=15))
-def test_cache_read(urls, swh_scheduler_updater):
- # beware that the fixture is only called once for all the tests
- # generated by hypothesis, so the db is not cleared between calls.
- # see the end of
- # https://hypothesis.works/articles/hypothesis-pytest-fixtures/
- def gen_events(urls):
- for url in urls:
- yield SWHEvent({
- 'url': url,
- 'type': 'create',
- 'origin_type': 'git',
- })
- known_urls = set(e['url'] for e in
- swh_scheduler_updater.cache_read(limit=1000000))
- swh_scheduler_updater.cache_put(gen_events(urls))
- new_urls = {u.strip() for u in urls} - known_urls
- all_urls = set(e['url'] for e in
- swh_scheduler_updater.cache_read(limit=1000000))
- assert (all_urls - known_urls) == new_urls
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ /dev/null
@@ -1,199 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-from itertools import chain
-
-from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists, sampled_from, text, tuples
-
-from swh.scheduler.updater.consumer import UpdaterConsumer
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-
-from . import UpdaterTestUtil, from_regex
-
-
-class FakeSchedulerUpdaterBackend:
- def __init__(self):
- self.events = []
-
- def cache_put(self, events):
- self.events.append(events)
-
-
-class FakeUpdaterConsumerBase(UpdaterConsumer):
- def __init__(self, backend):
- super().__init__(backend)
- self.connection_opened = False
- self.connection_closed = False
- self.consume_called = False
- self.has_events_called = False
-
- def open_connection(self):
- self.connection_opened = True
-
- def close_connection(self):
- self.connection_closed = True
-
- def convert_event(self, event):
- pass
-
-
-class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
- def has_events(self):
- self.has_events_called = True
- return True
-
- def consume_events(self):
- self.consume_called = True
- raise ValueError('Broken stuff')
-
-
-class UpdaterConsumerRaisingTest(unittest.TestCase):
- def setUp(self):
- self.updater = FakeUpdaterConsumerRaise(
- FakeSchedulerUpdaterBackend())
-
- def test_running_raise(self):
- """Raising during run should finish fine.
-
- """
- # given
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
-
- # when
- with self.assertRaisesRegex(ValueError, 'Broken stuff'):
- self.updater.run()
-
- # then
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
- self.assertTrue(self.updater.connection_opened)
- self.assertTrue(self.updater.has_events_called)
- self.assertTrue(self.updater.connection_closed)
- self.assertTrue(self.updater.consume_called)
-
-
-class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
- def has_events(self):
- self.has_events_called = True
- return False
-
- def consume_events(self):
- self.consume_called = True
-
-
-class UpdaterConsumerNoEventTest(unittest.TestCase):
- def setUp(self):
- self.updater = FakeUpdaterConsumerNoEvent(
- FakeSchedulerUpdaterBackend())
-
- def test_running_does_not_consume(self):
- """Run with no events should do just fine"""
- # given
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
-
- # when
- self.updater.run()
-
- # then
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
- self.assertTrue(self.updater.connection_opened)
- self.assertTrue(self.updater.has_events_called)
- self.assertTrue(self.updater.connection_closed)
- self.assertFalse(self.updater.consume_called)
-
-
-EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type']
-
-
-class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
- def __init__(self, backend, messages):
- super().__init__(backend)
- self.messages = messages
- self.debug = False
-
- def has_events(self):
- self.has_events_called = True
- return len(self.messages) > 0
-
- def consume_events(self):
- self.consume_called = True
- for msg in self.messages:
- yield msg
- self.messages.pop()
-
- def convert_event(self, event, keys=EVENT_KEYS):
- for k in keys:
- v = event.get(k)
- if v is None:
- return None
-
- e = {
- 'type': event['type'],
- 'url': 'https://fake.url/%s' % event['repo']['name'],
- 'last_seen': event['created_at'],
- 'origin_type': event['origin_type'],
- }
- return SWHEvent(e)
-
-
-class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase):
- @settings(suppress_health_check=[HealthCheck.too_slow])
- @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
- text()), # origin type
- min_size=3, max_size=10),
- lists(tuples(text(), # event type
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name
- text()), # origin type
- min_size=3, max_size=10),
- lists(tuples(sampled_from(LISTENED_EVENTS), # event type
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
- text(), # origin type
- sampled_from(EVENT_KEYS)), # keys to drop
- min_size=3, max_size=10))
- def test_running(self, events, uninteresting_events, incomplete_events):
- """Interesting events are written to cache, others are dropped
-
- """
- # given
- ready_events = self._make_events(events)
- ready_uninteresting_events = self._make_events(uninteresting_events)
- ready_incomplete_events = self._make_incomplete_events(
- incomplete_events)
-
- updater = FakeUpdaterConsumer(
- FakeSchedulerUpdaterBackend(),
- list(chain(
- ready_events, ready_incomplete_events,
- ready_uninteresting_events)))
-
- self.assertEqual(updater.count, 0)
- self.assertEqual(updater.seen_events, set())
- self.assertEqual(updater.events, [])
-
- # when
- updater.run()
-
- # then
- self.assertEqual(updater.count, 0)
- self.assertEqual(updater.seen_events, set())
- self.assertEqual(updater.events, [])
- self.assertTrue(updater.connection_opened)
- self.assertTrue(updater.has_events_called)
- self.assertTrue(updater.connection_closed)
- self.assertTrue(updater.consume_called)
-
- self.assertEqual(updater.messages, [])
- # uninteresting or incomplete events are dropped
- self.assertTrue(len(updater.backend.events), len(events))
diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_events.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-from hypothesis import given
-from hypothesis.strategies import sampled_from, text
-
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-from swh.scheduler.updater.ghtorrent import events
-
-from . import UpdaterTestUtil
-
-
-def event_values_ko():
- return set(events['evt']).union(
- set(events['ent'])).difference(
- set(LISTENED_EVENTS))
-
-
-WRONG_EVENTS = sorted(list(event_values_ko()))
-
-
-class EventTest(UpdaterTestUtil, unittest.TestCase):
- @given(sampled_from(LISTENED_EVENTS), text(), text())
- def test_is_interesting_ok(self, event_type, name, origin_type):
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertTrue(SWHEvent(evt).is_interesting())
-
- @given(text(), text(), text())
- def test_is_interested_with_noisy_event_should_be_ko(
- self, event_type, name, origin_type):
- if event_type in LISTENED_EVENTS:
- # just in case something good is generated, skip it
- return
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertFalse(SWHEvent(evt).is_interesting())
-
- @given(sampled_from(WRONG_EVENTS), text(), text())
- def test_is_interesting_ko(self, event_type, name, origin_type):
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertFalse(SWHEvent(evt).is_interesting())
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-from unittest.mock import patch
-
-from hypothesis import given
-from hypothesis.strategies import sampled_from
-
-from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS,
- GHTorrentConsumer, events)
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-from . import UpdaterTestUtil, from_regex
-
-
-def event_values():
- return set(events['evt']).union(set(events['ent']))
-
-
-def ghtorrentize_event_name(event_name):
- return '%sEvent' % event_name.capitalize()
-
-
-EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
-
-
-class FakeChannel:
- """Fake Channel (virtual connection inside a connection)
-
- """
- def close(self):
- self.close = True
-
-
-class FakeConnection:
- """Fake Rabbitmq connection for test purposes
-
- """
- def __init__(self, conn_string):
- self._conn_string = conn_string
- self._connect = False
- self._release = False
- self._channel = False
-
- def connect(self):
- self._connect = True
- return True
-
- def release(self):
- self._connect = False
- self._release = True
-
- def channel(self):
- self._channel = True
- return FakeChannel()
-
-
-class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
- def setUp(self):
- config = {
- 'ghtorrent': {
- 'rabbitmq': {
- 'conn': {
- 'url': 'amqp://u:p@https://somewhere:9807',
- },
- 'prefetch_read': 17,
- },
- 'batch_cache_write': 42,
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=softwareheritage-scheduler-updater-dev',
- },
- },
- }
-
- GHTorrentConsumer.connection_class = FakeConnection
- with patch.object(
- SchedulerUpdaterBackend, '__init__', return_value=None):
- self.consumer = GHTorrentConsumer(**config)
-
- @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend')
- def test_init(self, mock_backend):
- # given
- # check init is ok
- self.assertEqual(self.consumer.batch, 42)
- self.assertEqual(self.consumer.prefetch_read, 17)
-
- def test_has_events(self):
- self.assertTrue(self.consumer.has_events())
-
- def test_connection(self):
- # when
- self.consumer.open_connection()
-
- # then
- self.assertEqual(self.consumer.conn._conn_string,
- 'amqp://u:p@https://somewhere:9807')
- self.assertTrue(self.consumer.conn._connect)
- self.assertFalse(self.consumer.conn._release)
-
- # when
- self.consumer.close_connection()
-
- # then
- self.assertFalse(self.consumer.conn._connect)
- self.assertTrue(self.consumer.conn._release)
- self.assertIsInstance(self.consumer.channel, FakeChannel)
-
- @given(sampled_from(EVENT_TYPES),
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
- def test_convert_event_ok(self, event_type, name):
- input_event = self._make_event(event_type, name, 'git')
- actual_event = self.consumer.convert_event(input_event)
-
- self.assertTrue(isinstance(actual_event, SWHEvent))
-
- event = actual_event.get()
-
- expected_event = {
- 'type': event_type.lower().rstrip('Event'),
- 'url': 'https://github.com/%s' % name,
- 'last_seen': input_event['created_at'],
- 'cnt': 1,
- 'origin_type': 'git',
- }
- self.assertEqual(event, expected_event)
-
- @given(sampled_from(EVENT_TYPES),
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
- sampled_from(INTERESTING_EVENT_KEYS))
- def test_convert_event_ko(self, event_type, name, missing_data_key):
- input_event = self._make_incomplete_event(
- event_type, name, 'git', missing_data_key)
-
- logger = self.consumer.log
- del self.consumer.log # prevent gazillions of warnings
- actual_converted_event = self.consumer.convert_event(input_event)
- self.consumer.log = logger
- self.assertIsNone(actual_converted_event)
-
- @patch('swh.scheduler.updater.ghtorrent.collect_replies')
- def test_consume_events(self, mock_collect_replies):
- # given
- self.consumer.queue = 'fake-queue' # hack
- self.consumer.open_connection()
-
- fake_events = [
- self._make_event('PushEvent', 'user/some-repo', 'git'),
- self._make_event('PushEvent', 'user2/some-other-repo', 'git'),
- ]
-
- mock_collect_replies.return_value = fake_events
-
- # when
- actual_events = []
- for e in self.consumer.consume_events():
- actual_events.append(e)
-
- # then
- self.assertEqual(fake_events, actual_events)
-
- mock_collect_replies.assert_called_once_with(
- self.consumer.conn,
- self.consumer.channel,
- 'fake-queue',
- no_ack=False,
- limit=17
- )
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import os
-from glob import glob
-
-import pytest
-from pytest_postgresql.factories import postgresql as pg_fixture_factory
-
-from os.path import join
-from swh.core.utils import numfile_sortkey as sortkey
-from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-from swh.scheduler.updater.writer import UpdaterWriter
-
-from .conftest import make_simple_event
-
-
-pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler')
-pg_updater = pg_fixture_factory('postgresql_proc', 'updater')
-
-
-def pg_sched_fact(dbname, sqldir):
- @pytest.fixture
- def pg_scheduler_db(request):
- pg = request.getfixturevalue('pg_%s' % dbname)
- dump_files = sorted(glob(os.path.join(sqldir, '*.sql')),
- key=sortkey)
- with pg.cursor() as cur:
- for fname in dump_files:
- with open(fname) as fobj:
- sql = fobj.read().replace('concurrently', '')
- cur.execute(sql)
- pg.commit()
- yield pg
-
- return pg_scheduler_db
-
-
-scheduler_db = pg_sched_fact('scheduler', SQL_DIR)
-updater_db = pg_sched_fact('updater', join(SQL_DIR, 'updater'))
-
-
-@pytest.fixture
-def swh_updater_writer(scheduler_db, updater_db):
- config = {
- 'scheduler': {
- 'cls': 'local',
- 'args': {
- 'db': scheduler_db.dsn,
- },
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': updater_db.dsn,
- 'cache_read_limit': 5,
- },
- },
- 'updater_writer': {
- 'pause': 0.1,
- 'verbose': False,
- },
- }
- return UpdaterWriter(**config)
-
-
-def test_run_ko(swh_updater_writer):
- """Only git tasks are supported for now, other types are dismissed.
-
- """
- scheduler = swh_updater_writer.scheduler_backend
- updater = swh_updater_writer.scheduler_updater_backend
-
- ready_events = [
- SWHEvent(
- make_simple_event(event_type, 'origin-%s' % i,
- 'svn'))
- for i, event_type in enumerate(LISTENED_EVENTS)
- ]
-
- updater.cache_put(ready_events)
- list(updater.cache_read())
-
- r = scheduler.peek_ready_tasks('load-git')
-
-    # a first read from an empty scheduler db returns nothing
- assert not r
-
- # Read from cache to scheduler db
- swh_updater_writer.run()
-
- r = scheduler.peek_ready_tasks('load-git')
-
-    # reads after the write are still empty since svn origins are unsupported
- assert not r
-
-
-def test_run_ok(swh_updater_writer):
- """Only git origin are supported for now
-
- """
- scheduler = swh_updater_writer.scheduler_backend
- updater = swh_updater_writer.scheduler_updater_backend
-
- ready_events = [
- SWHEvent(
- make_simple_event(event_type, 'origin-%s' % i, 'git'))
- for i, event_type in enumerate(LISTENED_EVENTS)
- ]
-
- expected_length = len(ready_events)
-
- updater.cache_put(ready_events)
-
- data = list(updater.cache_read())
- assert len(data) == expected_length
-
- r = scheduler.peek_ready_tasks('load-git')
-
-    # a first read from an empty scheduler db returns nothing
- assert not r
-
- # Read from cache to scheduler db
- swh_updater_writer.run()
-
-    # now we should have scheduling tasks ready
- r = scheduler.peek_ready_tasks('load-git')
-
- assert len(r) == expected_length
-
- # Check the task has been scheduled
- for t in r:
- assert t['type'] == 'load-git'
- assert t['priority'] == 'normal'
- assert t['policy'] == 'oneshot'
- assert t['status'] == 'next_run_not_scheduled'
-
- # writer has nothing to do now
- swh_updater_writer.run()
-
- # so no more data in cache
- data = list(updater.cache_read())
-
- assert not data
-
-    # provided no runner has run, the number of scheduling tasks is unchanged
- r = scheduler.peek_ready_tasks('load-git')
-
- assert len(r) == expected_length
diff --git a/swh/scheduler/updater/__init__.py b/swh/scheduler/updater/__init__.py
deleted file mode 100644
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
deleted file mode 100644
--- a/swh/scheduler/updater/backend.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-from arrow import utcnow
-import psycopg2.pool
-import psycopg2.extras
-
-from swh.core.db import BaseDb
-from swh.core.db.common import db_transaction, db_transaction_generator
-from swh.scheduler.backend import format_query
-
-
-class SchedulerUpdaterBackend:
- CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
-# 'cache_read_limit': ('int', 1000),
-
- def __init__(self, db, cache_read_limit=1000,
- min_pool_conns=1, max_pool_conns=10):
- """
- Args:
-            db: either a libpq connection string, or a psycopg2 connection
-
- """
- if isinstance(db, psycopg2.extensions.connection):
- self._pool = None
- self._db = BaseDb(db)
- else:
- self._pool = psycopg2.pool.ThreadedConnectionPool(
- min_pool_conns, max_pool_conns, db,
- cursor_factory=psycopg2.extras.RealDictCursor,
- )
- self._db = None
- self.limit = cache_read_limit
-
- def get_db(self):
- if self._db:
- return self._db
- return BaseDb.from_pool(self._pool)
-
- def put_db(self, db):
- if db is not self._db:
- db.put_conn()
-
- cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']
-
- @db_transaction()
- def cache_put(self, events, timestamp=None, db=None, cur=None):
- """Write new events in the backend.
-
- """
- cur.execute('select swh_mktemp_cache()')
- db.copy_to(prepare_events(events, timestamp),
- 'tmp_cache', self.cache_put_keys, cur=cur)
- cur.execute('select swh_cache_put()')
-
- cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
- 'last_seen']
-
- @db_transaction_generator()
- def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
- """Read events from the cache prior to timestamp.
-
-        Note that limit=None does not mean 'no limit' but falls back to the
-        default limit (see the cache_read_limit constructor argument).
-
- """
- if not timestamp:
- timestamp = utcnow()
-
- if not limit:
- limit = self.limit
-
- q = format_query('select {keys} from swh_cache_read(%s, %s)',
- self.cache_read_keys)
- cur.execute(q, (timestamp, limit))
- yield from cur.fetchall()
-
- @db_transaction()
- def cache_remove(self, entries, db=None, cur=None):
- """Clean events from the cache
-
- """
- q = 'delete from cache where url in (%s)' % (
- ', '.join(("'%s'" % e for e in entries)), )
- cur.execute(q)
-
-
-def prepare_events(events, timestamp=None):
- if timestamp is None:
- timestamp = utcnow()
- outevents = []
- urls = []
- for e in events:
- event = e.get()
- url = event['url'].strip()
- if event['last_seen'] is None:
- event['last_seen'] = timestamp
- event['url'] = url
-
- if url in urls:
- idx = urls.index(url)
- urls.append(urls.pop(idx))
- prev_event = outevents.pop(idx)
- event['cnt'] += prev_event['cnt']
- event['last_seen'] = max(
- event['last_seen'], prev_event['last_seen'])
- else:
- urls.append(url)
- outevents.append(event)
- return outevents
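
prepare_events normalizes a batch before it is copied to tmp_cache: URLs are stripped, a missing last_seen defaults to the given timestamp, and duplicate URLs are merged by summing cnt and keeping the most recent last_seen. A worked example, assuming prepare_events as defined above and a minimal stand-in for SWHEvent (only .get() is used):

    from arrow import utcnow

    class Ev:
        """Stand-in for SWHEvent; prepare_events only calls .get()."""
        def __init__(self, d):
            self._d = d

        def get(self):
            return dict(self._d)

    t1, t2 = utcnow(), utcnow()
    batch = [
        Ev({'type': 'push', 'url': 'https://github.com/u/r ',  # note the space
            'origin_type': 'git', 'cnt': 1, 'last_seen': t1}),
        Ev({'type': 'push', 'url': 'https://github.com/u/r',
            'origin_type': 'git', 'cnt': 2, 'last_seen': t2}),
    ]
    out = prepare_events(batch)
    assert len(out) == 1              # same url after strip(), so merged
    assert out[0]['cnt'] == 3         # counts accumulate
    assert out[0]['last_seen'] == t2  # newest timestamp wins
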
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
deleted file mode 100644
--- a/swh/scheduler/updater/consumer.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-from abc import ABCMeta, abstractmethod
-
-
-class UpdaterConsumer(metaclass=ABCMeta):
- """Event consumer
-
- """
- def __init__(self, backend, batch_cache_write=1000):
- super().__init__()
- self._reset_cache()
- self.backend = backend
- self.batch = int(batch_cache_write)
- logging.basicConfig(level=logging.DEBUG)
- self.log = logging.getLogger('%s.%s' % (
- self.__class__.__module__, self.__class__.__name__))
-
- def _reset_cache(self):
- """Reset internal cache.
-
- """
- self.count = 0
- self.seen_events = set()
- self.events = []
-
- def is_interesting(self, event):
- """Determine if an event is interesting or not.
-
- Args:
- event (SWHEvent): SWH event
-
- """
- return event.is_interesting()
-
- @abstractmethod
- def convert_event(self, event):
- """Parse an event into an SWHEvent.
-
- """
- pass
-
- def process_event(self, event):
- """Process converted and interesting event.
-
- Args:
- event (SWHEvent): Event to process if deemed interesting
-
- """
- try:
- if event.url in self.seen_events:
- event.cnt += 1
- else:
- self.events.append(event)
- self.seen_events.add(event.url)
- self.count += 1
- finally:
- if self.count >= self.batch:
- if self.events:
- self.backend.cache_put(self.events)
- self._reset_cache()
-
- def _flush(self):
- """Flush remaining internal cache if any.
-
- """
- if self.events:
- self.backend.cache_put(self.events)
- self._reset_cache()
-
- @abstractmethod
- def has_events(self):
- """Determine if there remains events to consume.
-
-        Returns:
-            bool: True if events remain, False otherwise
-
- """
- pass
-
- @abstractmethod
- def consume_events(self):
- """The main entry point to consume events.
-
-        This should either yield or return messages for consumption.
-
- """
- pass
-
- @abstractmethod
- def open_connection(self):
- """Open a connection to the remote system we are supposed to consume
- from.
-
- """
- pass
-
- @abstractmethod
- def close_connection(self):
- """Close opened connection to the remote system.
-
- """
- pass
-
- def run(self):
- """The main entry point to consume events.
-
- """
- try:
- self.open_connection()
- while self.has_events():
- for _event in self.consume_events():
- event = self.convert_event(_event)
- if not event:
- self.log.warning(
- 'Incomplete event dropped %s' % _event)
- continue
- if not self.is_interesting(event):
- continue
- if self.debug:
- self.log.debug('Event: %s' % event)
- try:
- self.process_event(event)
- except Exception:
- self.log.exception(
- 'Problem when processing event %s' % _event)
- continue
- except Exception as e:
- self.log.error('Error raised during consumption: %s' % e)
- raise e
- finally:
- self.close_connection()
- self._flush()
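
A concrete consumer only has to fill in the five abstract hooks; the test doubles earlier in this diff follow the same shape. A toy in-memory example, assuming the modules as they stood before this removal (note that run() reads self.debug, which the base class never sets, so subclasses had to define it themselves):

    from swh.scheduler.updater.consumer import UpdaterConsumer
    from swh.scheduler.updater.events import SWHEvent

    class ListBackend:
        """Collects flushed batches; stands in for SchedulerUpdaterBackend."""
        def __init__(self):
            self.batches = []

        def cache_put(self, events):
            self.batches.append(list(events))

    class ListConsumer(UpdaterConsumer):
        """Drains a list of pre-built event dicts."""
        def __init__(self, backend, messages):
            super().__init__(backend)
            self.messages = list(messages)
            self.debug = False  # required: run() checks it before logging

        def open_connection(self):
            pass  # nothing to open

        def close_connection(self):
            pass  # nothing to close

        def has_events(self):
            return bool(self.messages)

        def consume_events(self):
            while self.messages:
                yield self.messages.pop()

        def convert_event(self, event):
            return SWHEvent(event)

    backend = ListBackend()
    ListConsumer(backend, [
        {'type': 'push', 'url': 'https://github.com/u/r'},
    ]).run()
    assert len(backend.batches) == 1  # remaining events flushed on exit
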
diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py
deleted file mode 100644
--- a/swh/scheduler/updater/events.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-LISTENED_EVENTS = [
- 'delete',
- 'public',
- 'push'
-]
-
-
-class SWHEvent:
- """SWH's interesting event (resulting in an origin update)
-
- """
- def __init__(self, evt, cnt=1):
- self.event = evt
- self.type = evt['type'].lower()
- self.url = evt['url']
- self.last_seen = evt.get('last_seen')
- self.cnt = cnt
- self.origin_type = evt.get('origin_type')
-
- def is_interesting(self):
- return self.type in LISTENED_EVENTS
-
- def get(self):
- return {
- 'type': self.type,
- 'url': self.url,
- 'last_seen': self.last_seen,
- 'cnt': self.cnt,
- 'origin_type': self.origin_type
- }
-
- def __str__(self):
- return str(self.get())
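
SWHEvent is a thin record: the constructor lower-cases the type, and is_interesting simply checks membership in LISTENED_EVENTS. With the class above:

    evt = SWHEvent({'type': 'Push', 'url': 'https://github.com/user/repo',
                    'origin_type': 'git'})
    assert evt.is_interesting()       # type is lower-cased to 'push'
    fork = SWHEvent({'type': 'fork', 'url': 'https://github.com/user/repo'})
    assert not fork.is_interesting()  # 'fork' is not a listened event
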
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
deleted file mode 100644
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-
-from kombu import Connection, Exchange, Queue
-from kombu.common import collect_replies
-
-from swh.core.config import merge_configs
-
-from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.consumer import UpdaterConsumer
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-
-events = {
- # ghtorrent events related to github events (interesting)
- 'evt': [
- 'commitcomment', 'create', 'delete', 'deployment',
- 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
- 'gist', 'gollum', 'issuecomment', 'issues', 'member',
- 'membership', 'pagebuild', 'public', 'pullrequest',
- 'pullrequestreviewcomment', 'push', 'release', 'repository',
- 'status', 'teamadd', 'watch'
- ],
- # ghtorrent events related to mongodb insert (not interesting)
- 'ent': [
- 'commit_comments', 'commits', 'followers', 'forks',
- 'geo_cache', 'issue_comments', 'issue_events', 'issues',
- 'org_members', 'pull_request_comments', 'pull_requests',
- 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
- ]
-}
-
-INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-
-DEFAULT_CONFIG = {
- 'ghtorrent': {
- 'batch_cache_write': 1000,
- 'rabbitmq': {
- 'prefetch_read': 100,
- 'conn': {
- 'url': 'amqp://guest:guest@localhost:5672',
- 'exchange_name': 'ght-streams',
- 'routing_key': 'something',
- 'queue_name': 'fake-events',
- },
- },
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=softwareheritage-scheduler-updater-dev',
- 'cache_read_limit': 1000,
- },
- },
-}
-
-
-class GHTorrentConsumer(UpdaterConsumer):
- """GHTorrent events consumer
-
- """
- connection_class = Connection
-
- def __init__(self, **config):
- self.config = merge_configs(DEFAULT_CONFIG, config)
-
- ght_config = self.config['ghtorrent']
- rmq_config = ght_config['rabbitmq']
- self.prefetch_read = int(rmq_config.get('prefetch_read', 100))
-
- exchange = Exchange(
- rmq_config['conn']['exchange_name'],
- 'topic', durable=True)
- routing_key = rmq_config['conn']['routing_key']
- self.queue = Queue(rmq_config['conn']['queue_name'],
- exchange=exchange,
- routing_key=routing_key,
- auto_delete=True)
-
- if self.config['scheduler_updater']['cls'] != 'local':
- raise ValueError(
- 'The scheduler_updater can only be a cls=local for now')
- backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater']['args'])
-
- super().__init__(backend, ght_config.get('batch_cache_write', 1000))
-
- def has_events(self):
- """Always has events
-
- """
- return True
-
- def convert_event(self, event):
- """Given ghtorrent event, convert it to a SWHEvent instance.
-
- """
- if isinstance(event, str):
- event = json.loads(event)
- for k in INTERESTING_EVENT_KEYS:
- if k not in event:
- if hasattr(self, 'log'):
- self.log.warning(
- 'Event should have the \'%s\' entry defined' % k)
- return None
-
- _type = event['type'].lower().rstrip('Event')
- _repo_name = 'https://github.com/%s' % event['repo']['name']
-
- return SWHEvent({
- 'type': _type,
- 'url': _repo_name,
- 'last_seen': event['created_at'],
- 'origin_type': 'git',
- })
-
- def open_connection(self):
- """Open rabbitmq connection
-
- """
- self.conn = self.connection_class(
- self.config['ghtorrent']['rabbitmq']['conn']['url'])
- self.conn.connect()
- self.channel = self.conn.channel()
-
- def close_connection(self):
- """Close rabbitmq connection
-
- """
- self.channel.close()
- self.conn.release()
-
- def consume_events(self):
- """Consume and yield queue messages
-
- """
- yield from collect_replies(
- self.conn, self.channel, self.queue,
- no_ack=False, limit=self.prefetch_read)
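
convert_event lower-cases the GHTorrent event type and derives a GitHub URL from repo.name; test_convert_event_ok earlier in this diff pins the expected shape. One caveat: str.rstrip('Event') strips a trailing run of the characters E, v, e, n, t rather than the literal suffix, so 'CreateEvent'.lower().rstrip('Event') yields 'crea', not 'create'; the tests pass because they compute the expected value the same way. A runnable sketch, stubbing out the db-backed backend exactly as the tests do:

    from unittest.mock import patch

    from arrow import utcnow

    from swh.scheduler.updater.backend import SchedulerUpdaterBackend
    from swh.scheduler.updater.ghtorrent import GHTorrentConsumer

    # avoid opening a real db connection pool during construction
    with patch.object(SchedulerUpdaterBackend, '__init__', return_value=None):
        consumer = GHTorrentConsumer()

    now = utcnow()
    event = consumer.convert_event({
        'type': 'PushEvent',
        'repo': {'name': 'user/repo'},
        'created_at': now,
    })
    assert event.get() == {
        'type': 'push',
        'url': 'https://github.com/user/repo',
        'last_seen': now,
        'cnt': 1,
        'origin_type': 'git',
    }
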
diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py
deleted file mode 100644
--- a/swh/scheduler/updater/ghtorrent/cli.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import click
-
-
-@click.command()
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def main(ctx, verbose):
- """Consume events from ghtorrent and write them to cache.
-
- """
- click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
- err=True)
- ctx.exit(1)
-
-
-if __name__ == '__main__':
- main()
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
deleted file mode 100644
--- a/swh/scheduler/updater/writer.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import click
-import logging
-import time
-
-from arrow import utcnow
-
-from swh.core import utils
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_oneshot_task_dict
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-
-class UpdaterWriter:
- """Updater writer in charge of updating the scheduler db with latest
- prioritized oneshot tasks
-
- In effect, this:
- - reads the events from scheduler updater's db
- - converts those events into priority oneshot tasks
- - dumps them into the scheduler db
-
- """
-
- def __init__(self, **config):
- self.config = config
- if self.config['scheduler_updater']['cls'] != 'local':
- raise ValueError(
- 'The scheduler_updater can only be a cls=local for now')
- self.scheduler_updater_backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater']['args'])
- self.scheduler_backend = get_scheduler(**self.config['scheduler'])
- self.pause = self.config.get('updater_writer', {}).get('pause', 10)
- self.log = logging.getLogger(
- 'swh.scheduler.updater.writer.UpdaterWriter')
-
- def convert_to_oneshot_task(self, event):
- """Given an event, convert it into oneshot task with priority
-
- Args:
- event (dict): The event to convert to task
-
- """
- if event['origin_type'] == 'git':
- return create_oneshot_task_dict(
- 'load-git',
- event['url'],
- priority='normal')
- self.log.warning('Type %s is not supported for now, only git' % (
- event['origin_type'], ))
- return None
-
- def write_event_to_scheduler(self, events):
- """Write events to the scheduler and yield ids when done"""
- # convert events to oneshot tasks
- oneshot_tasks = filter(lambda e: e is not None,
- map(self.convert_to_oneshot_task, events))
- # write event to scheduler
- self.scheduler_backend.create_tasks(list(oneshot_tasks))
- for e in events:
- yield e['url']
-
- def run(self):
- """First retrieve events from cache (including origin_type, cnt),
- then convert them into oneshot tasks with priority, then
- write them to the scheduler db, at last remove them from
- cache.
-
- """
- while True:
- timestamp = utcnow()
- events = list(self.scheduler_updater_backend.cache_read(timestamp))
- if not events:
- break
- for urls in utils.grouper(self.write_event_to_scheduler(events),
- n=100):
- self.scheduler_updater_backend.cache_remove(urls)
- time.sleep(self.pause)
-
-
-@click.command()
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def main(ctx, verbose):
- click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
- err=True)
- ctx.exit(1)
-
-
-if __name__ == '__main__':
- main()
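
convert_to_oneshot_task handled git origins only, mapping each cache event to a normal-priority oneshot load-git task; the call below appears verbatim in the writer above, and test_run_ok pins how such a task surfaces in the scheduler db:

    from swh.scheduler.utils import create_oneshot_task_dict

    # the exact call the writer issued for a git event read from the cache
    task = create_oneshot_task_dict(
        'load-git', 'https://github.com/user/repo', priority='normal')
    # per test_run_ok, once created in the scheduler db the task reads back
    # with type='load-git', priority='normal', policy='oneshot' and
    # status='next_run_not_scheduled'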