Page MenuHomeSoftware Heritage

D2354.diff
No OneTemporary

D2354.diff

diff --git a/sql/updater/sql/Makefile b/sql/updater/sql/Makefile
deleted file mode 100644
--- a/sql/updater/sql/Makefile
+++ /dev/null
@@ -1,73 +0,0 @@
-# Depends: postgresql-client, postgresql-autodoc
-
-DBNAME = softwareheritage-scheduler-updater-dev
-DOCDIR = autodoc
-
-SQL_INIT = 10-swh-init.sql
-SQL_SCHEMA = 30-swh-schema.sql
-SQL_FUNC = 40-swh-func.sql
-SQLS = $(SQL_INIT) $(SQL_SCHEMA) $(SQL_FUNC)
-SQL_FILES = $(abspath $(addprefix \
- $(CURDIR)/../../../swh/scheduler/sql/updater/,$(SQLS)))
-
-PSQL_BIN = psql
-PSQL_FLAGS = --echo-all -X -v ON_ERROR_STOP=1
-PSQL = $(PSQL_BIN) $(PSQL_FLAGS)
-
-PIFPAF=$(findstring postgresql://,$(PIFPAF_URLS))
-
-all:
-
-createdb: createdb-stamp
-createdb-stamp: $(SQL_FILES)
-ifeq ($(PIFPAF),)
- -dropdb $(DBNAME)
-endif
- createdb $(DBNAME)
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-filldb: filldb-stamp
-filldb-stamp: createdb-stamp
- cat $(SQL_FILES) | $(PSQL) $(DBNAME)
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-dropdb:
- -dropdb $(DBNAME)
-
-dumpdb: swh-scheduler-updater.dump
-swh-scheduler-updater.dump: filldb-stamp
- pg_dump -Fc $(DBNAME) > $@
-
-$(DOCDIR):
- test -d $(DOCDIR)/ || mkdir $(DOCDIR)
-
-doc: autodoc-stamp $(DOCDIR)/swh-scheduler-updater.pdf
-autodoc-stamp: filldb-stamp $(DOCDIR)
- postgresql_autodoc -d $(DBNAME) -f $(DOCDIR)/swh-scheduler-updater
- cp -a $(DOCDIR)/swh-scheduler-updater.dot $(DOCDIR)/swh-scheduler-updater.dot.orig
-ifeq ($(PIFPAF),)
- touch $@
-else
- rm -f $@
-endif
-
-$(DOCDIR)/swh-scheduler-updater.pdf: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp
- dot -T pdf $< > $@
-$(DOCDIR)/swh-scheduler-updater.svg: $(DOCDIR)/swh-scheduler-updater.dot autodoc-stamp
- dot -T svg $< > $@
-
-clean:
- rm -rf *-stamp $(DOCDIR)/
-
-distclean: clean dropdb
- rm -f swh-scheduler-updater.dump
-
-.PHONY: all initdb createdb dropdb doc clean
diff --git a/swh/scheduler/cli/admin.py b/swh/scheduler/cli/admin.py
--- a/swh/scheduler/cli/admin.py
+++ b/swh/scheduler/cli/admin.py
@@ -88,37 +88,3 @@
if debug is None:
debug = ctx.obj['log_level'] <= logging.DEBUG
server.app.run(host, port=port, debug=bool(debug))
-
-
-@cli.command('start-updater')
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def updater(ctx, verbose):
- """Starts a scheduler-updater service.
-
- Insert tasks in the scheduler from the scheduler-updater's events read from
- the db cache (filled e.g. by the ghtorrent consumer service) .
-
- """
- from swh.scheduler.updater.writer import UpdaterWriter
- UpdaterWriter(**ctx.obj['config']).run()
-
-
-@cli.command('start-ghtorrent')
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def ghtorrent(ctx, verbose):
- """Starts a ghtorrent consumer service.
-
- Consumes events from ghtorrent and write them to a cache.
-
- """
- from swh.scheduler.updater.ghtorrent import GHTorrentConsumer
- from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
- ght_config = ctx.obj['config'].get('ghtorrent', {})
- back_config = ctx.obj['config'].get('scheduler_updater', {})
- backend = SchedulerUpdaterBackend(**back_config)
- GHTorrentConsumer(backend, **ght_config).run()
diff --git a/swh/scheduler/sql/updater/10-swh-init.sql b/swh/scheduler/sql/updater/10-swh-init.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/10-swh-init.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-create extension if not exists btree_gist;
-create extension if not exists pgcrypto;
-
-create or replace language plpgsql;
diff --git a/swh/scheduler/sql/updater/30-swh-schema.sql b/swh/scheduler/sql/updater/30-swh-schema.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/30-swh-schema.sql
+++ /dev/null
@@ -1,29 +0,0 @@
-create table dbversion
-(
- version int primary key,
- release timestamptz not null,
- description text not null
-);
-
-comment on table dbversion is 'Schema update tracking';
-
--- a SHA1 checksum (not necessarily originating from Git)
-create domain sha1 as bytea check (length(value) = 20);
-
-insert into dbversion (version, release, description)
- values (1, now(), 'Work In Progress');
-
-create type origin_type as enum ('git', 'svn', 'hg', 'deb');
-comment on type origin_type is 'Url''s repository type';
-
-create table cache (
- id sha1 primary key,
- url text not null,
- origin_type origin_type not null,
- cnt int default 1,
- first_seen timestamptz not null default now(),
- last_seen timestamptz not null
- );
-
-create index on cache(url);
-create index on cache(last_seen);
diff --git a/swh/scheduler/sql/updater/40-swh-func.sql b/swh/scheduler/sql/updater/40-swh-func.sql
deleted file mode 100644
--- a/swh/scheduler/sql/updater/40-swh-func.sql
+++ /dev/null
@@ -1,48 +0,0 @@
--- Postgresql index helper function
-create or replace function hash_sha1(text)
- returns sha1
-as $$
- select public.digest($1, 'sha1') :: sha1
-$$ language sql strict immutable;
-
-comment on function hash_sha1(text) is 'Compute sha1 hash as text';
-
--- create a temporary table for cache tmp_cache,
-create or replace function swh_mktemp_cache()
- returns void
- language sql
-as $$
- create temporary table tmp_cache (
- like cache including defaults
- ) on commit drop;
- alter table tmp_cache drop column id;
-$$;
-
-create or replace function swh_cache_put()
- returns void
- language plpgsql
-as $$
-begin
- insert into cache (id, url, origin_type, cnt, last_seen)
- select hash_sha1(url), url, origin_type, cnt, last_seen
- from tmp_cache t
- on conflict(id)
- do update set cnt = (select cnt from cache where id=excluded.id) + excluded.cnt,
- last_seen = excluded.last_seen;
- return;
-end
-$$;
-
-comment on function swh_cache_put() is 'Write to cache temporary events';
-
-create or replace function swh_cache_read(ts timestamptz, lim integer)
- returns setof cache
- language sql stable
-as $$
- select id, url, origin_type, cnt, first_seen, last_seen
- from cache
- where last_seen <= ts
- limit lim;
-$$;
-
-comment on function swh_cache_read(timestamptz, integer) is 'Read cache entries';
diff --git a/swh/scheduler/tests/updater/__init__.py b/swh/scheduler/tests/updater/__init__.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/__init__.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from arrow import utcnow
-
-try:
- from hypothesis.strategies import from_regex
-except ImportError:
- from hypothesis.strategies import text
-
- # Revert to using basic text generation
- def from_regex(*args, **kwargs):
- return text()
-
-
-class UpdaterTestUtil:
- """Mixin intended for event generation purposes
-
- """
- def _make_event(self, event_type, name, origin_type):
- return {
- 'type': event_type,
- 'repo': {
- 'name': name,
- },
- 'created_at': utcnow(),
- 'origin_type': origin_type,
- }
-
- def _make_events(self, events):
- for event_type, repo_name, origin_type in events:
- yield self._make_event(event_type, repo_name, origin_type)
-
- def _make_incomplete_event(self, event_type, name, origin_type,
- missing_data_key):
- event = self._make_event(event_type, name, origin_type)
- del event[missing_data_key]
- return event
-
- def _make_incomplete_events(self, events):
- for event_type, repo_name, origin_type, missing_data_key in events:
- yield self._make_incomplete_event(event_type, repo_name,
- origin_type, missing_data_key)
-
- def _make_simple_event(self, event_type, name, origin_type):
- return {
- 'type': event_type,
- 'url': 'https://fakeurl/%s' % name,
- 'origin_type': origin_type,
- 'created_at': utcnow(),
- }
diff --git a/swh/scheduler/tests/updater/conftest.py b/swh/scheduler/tests/updater/conftest.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/conftest.py
+++ /dev/null
@@ -1,68 +0,0 @@
-import pytest
-import glob
-import os
-from arrow import utcnow # XXX
-
-from swh.core.utils import numfile_sortkey as sortkey
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-from swh.scheduler.tests import SQL_DIR
-import swh.scheduler.tests.conftest # noqa
-
-
-DUMP_FILES = os.path.join(SQL_DIR, 'updater', '*.sql')
-
-
-@pytest.fixture
-def swh_scheduler_updater(postgresql):
- config = {
- 'db': postgresql.dsn,
- }
-
- all_dump_files = sorted(glob.glob(DUMP_FILES), key=sortkey)
-
- cursor = postgresql.cursor()
- for fname in all_dump_files:
- with open(fname) as fobj:
- cursor.execute(fobj.read())
- postgresql.commit()
-
- backend = SchedulerUpdaterBackend(**config)
- return backend
-
-
-def make_event(event_type, name, origin_type):
- return {
- 'type': event_type,
- 'repo': {
- 'name': name,
- },
- 'created_at': utcnow(),
- 'origin_type': origin_type,
- }
-
-
-def make_simple_event(event_type, name, origin_type):
- return {
- 'type': event_type,
- 'url': 'https://fakeurl/%s' % name,
- 'origin_type': origin_type,
- 'created_at': utcnow(),
- }
-
-
-def make_events(events):
- for event_type, repo_name, origin_type in events:
- yield make_event(event_type, repo_name, origin_type)
-
-
-def make_incomplete_event(event_type, name, origin_type,
- missing_data_key):
- event = make_event(event_type, name, origin_type)
- del event[missing_data_key]
- return event
-
-
-def make_incomplete_events(events):
- for event_type, repo_name, origin_type, missing_data_key in events:
- yield make_incomplete_event(event_type, repo_name,
- origin_type, missing_data_key)
diff --git a/swh/scheduler/tests/updater/test_backend.py b/swh/scheduler/tests/updater/test_backend.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_backend.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from hypothesis import given
-from hypothesis.strategies import sets
-
-from swh.scheduler.updater.events import SWHEvent
-
-from . import from_regex
-
-
-@given(urls=sets(
- from_regex(
- r'^https://somewhere[.]org/[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
- min_size=10, max_size=15))
-def test_cache_read(urls, swh_scheduler_updater):
- # beware that the fixture is only called once for all the tests
- # generated by hypothesis, so the db is not cleared between calls.
- # see the end of
- # https://hypothesis.works/articles/hypothesis-pytest-fixtures/
- def gen_events(urls):
- for url in urls:
- yield SWHEvent({
- 'url': url,
- 'type': 'create',
- 'origin_type': 'git',
- })
- known_urls = set(e['url'] for e in
- swh_scheduler_updater.cache_read(limit=1000000))
- swh_scheduler_updater.cache_put(gen_events(urls))
- new_urls = {u.strip() for u in urls} - known_urls
- all_urls = set(e['url'] for e in
- swh_scheduler_updater.cache_read(limit=1000000))
- assert (all_urls - known_urls) == new_urls
diff --git a/swh/scheduler/tests/updater/test_consumer.py b/swh/scheduler/tests/updater/test_consumer.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_consumer.py
+++ /dev/null
@@ -1,199 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-from itertools import chain
-
-from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists, sampled_from, text, tuples
-
-from swh.scheduler.updater.consumer import UpdaterConsumer
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-
-from . import UpdaterTestUtil, from_regex
-
-
-class FakeSchedulerUpdaterBackend:
- def __init__(self):
- self.events = []
-
- def cache_put(self, events):
- self.events.append(events)
-
-
-class FakeUpdaterConsumerBase(UpdaterConsumer):
- def __init__(self, backend):
- super().__init__(backend)
- self.connection_opened = False
- self.connection_closed = False
- self.consume_called = False
- self.has_events_called = False
-
- def open_connection(self):
- self.connection_opened = True
-
- def close_connection(self):
- self.connection_closed = True
-
- def convert_event(self, event):
- pass
-
-
-class FakeUpdaterConsumerRaise(FakeUpdaterConsumerBase):
- def has_events(self):
- self.has_events_called = True
- return True
-
- def consume_events(self):
- self.consume_called = True
- raise ValueError('Broken stuff')
-
-
-class UpdaterConsumerRaisingTest(unittest.TestCase):
- def setUp(self):
- self.updater = FakeUpdaterConsumerRaise(
- FakeSchedulerUpdaterBackend())
-
- def test_running_raise(self):
- """Raising during run should finish fine.
-
- """
- # given
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
-
- # when
- with self.assertRaisesRegex(ValueError, 'Broken stuff'):
- self.updater.run()
-
- # then
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
- self.assertTrue(self.updater.connection_opened)
- self.assertTrue(self.updater.has_events_called)
- self.assertTrue(self.updater.connection_closed)
- self.assertTrue(self.updater.consume_called)
-
-
-class FakeUpdaterConsumerNoEvent(FakeUpdaterConsumerBase):
- def has_events(self):
- self.has_events_called = True
- return False
-
- def consume_events(self):
- self.consume_called = True
-
-
-class UpdaterConsumerNoEventTest(unittest.TestCase):
- def setUp(self):
- self.updater = FakeUpdaterConsumerNoEvent(
- FakeSchedulerUpdaterBackend())
-
- def test_running_does_not_consume(self):
- """Run with no events should do just fine"""
- # given
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
-
- # when
- self.updater.run()
-
- # then
- self.assertEqual(self.updater.count, 0)
- self.assertEqual(self.updater.seen_events, set())
- self.assertEqual(self.updater.events, [])
- self.assertTrue(self.updater.connection_opened)
- self.assertTrue(self.updater.has_events_called)
- self.assertTrue(self.updater.connection_closed)
- self.assertFalse(self.updater.consume_called)
-
-
-EVENT_KEYS = ['type', 'repo', 'created_at', 'origin_type']
-
-
-class FakeUpdaterConsumer(FakeUpdaterConsumerBase):
- def __init__(self, backend, messages):
- super().__init__(backend)
- self.messages = messages
- self.debug = False
-
- def has_events(self):
- self.has_events_called = True
- return len(self.messages) > 0
-
- def consume_events(self):
- self.consume_called = True
- for msg in self.messages:
- yield msg
- self.messages.pop()
-
- def convert_event(self, event, keys=EVENT_KEYS):
- for k in keys:
- v = event.get(k)
- if v is None:
- return None
-
- e = {
- 'type': event['type'],
- 'url': 'https://fake.url/%s' % event['repo']['name'],
- 'last_seen': event['created_at'],
- 'origin_type': event['origin_type'],
- }
- return SWHEvent(e)
-
-
-class UpdaterConsumerWithEventTest(UpdaterTestUtil, unittest.TestCase):
- @settings(suppress_health_check=[HealthCheck.too_slow])
- @given(lists(tuples(sampled_from(LISTENED_EVENTS), # event type
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
- text()), # origin type
- min_size=3, max_size=10),
- lists(tuples(text(), # event type
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'), # name
- text()), # origin type
- min_size=3, max_size=10),
- lists(tuples(sampled_from(LISTENED_EVENTS), # event type
- from_regex(r'^[a-z0-9]{5,10}/[a-z0-9]{7,12}$'), # name
- text(), # origin type
- sampled_from(EVENT_KEYS)), # keys to drop
- min_size=3, max_size=10))
- def test_running(self, events, uninteresting_events, incomplete_events):
- """Interesting events are written to cache, others are dropped
-
- """
- # given
- ready_events = self._make_events(events)
- ready_uninteresting_events = self._make_events(uninteresting_events)
- ready_incomplete_events = self._make_incomplete_events(
- incomplete_events)
-
- updater = FakeUpdaterConsumer(
- FakeSchedulerUpdaterBackend(),
- list(chain(
- ready_events, ready_incomplete_events,
- ready_uninteresting_events)))
-
- self.assertEqual(updater.count, 0)
- self.assertEqual(updater.seen_events, set())
- self.assertEqual(updater.events, [])
-
- # when
- updater.run()
-
- # then
- self.assertEqual(updater.count, 0)
- self.assertEqual(updater.seen_events, set())
- self.assertEqual(updater.events, [])
- self.assertTrue(updater.connection_opened)
- self.assertTrue(updater.has_events_called)
- self.assertTrue(updater.connection_closed)
- self.assertTrue(updater.consume_called)
-
- self.assertEqual(updater.messages, [])
- # uninteresting or incomplete events are dropped
- self.assertTrue(len(updater.backend.events), len(events))
diff --git a/swh/scheduler/tests/updater/test_events.py b/swh/scheduler/tests/updater/test_events.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_events.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-
-from hypothesis import given
-from hypothesis.strategies import sampled_from, text
-
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-from swh.scheduler.updater.ghtorrent import events
-
-from . import UpdaterTestUtil
-
-
-def event_values_ko():
- return set(events['evt']).union(
- set(events['ent'])).difference(
- set(LISTENED_EVENTS))
-
-
-WRONG_EVENTS = sorted(list(event_values_ko()))
-
-
-class EventTest(UpdaterTestUtil, unittest.TestCase):
- @given(sampled_from(LISTENED_EVENTS), text(), text())
- def test_is_interesting_ok(self, event_type, name, origin_type):
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertTrue(SWHEvent(evt).is_interesting())
-
- @given(text(), text(), text())
- def test_is_interested_with_noisy_event_should_be_ko(
- self, event_type, name, origin_type):
- if event_type in LISTENED_EVENTS:
- # just in case something good is generated, skip it
- return
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertFalse(SWHEvent(evt).is_interesting())
-
- @given(sampled_from(WRONG_EVENTS), text(), text())
- def test_is_interesting_ko(self, event_type, name, origin_type):
- evt = self._make_simple_event(event_type, name, origin_type)
- self.assertFalse(SWHEvent(evt).is_interesting())
diff --git a/swh/scheduler/tests/updater/test_ghtorrent.py b/swh/scheduler/tests/updater/test_ghtorrent.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_ghtorrent.py
+++ /dev/null
@@ -1,174 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import unittest
-from unittest.mock import patch
-
-from hypothesis import given
-from hypothesis.strategies import sampled_from
-
-from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.ghtorrent import (INTERESTING_EVENT_KEYS,
- GHTorrentConsumer, events)
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-from . import UpdaterTestUtil, from_regex
-
-
-def event_values():
- return set(events['evt']).union(set(events['ent']))
-
-
-def ghtorrentize_event_name(event_name):
- return '%sEvent' % event_name.capitalize()
-
-
-EVENT_TYPES = sorted([ghtorrentize_event_name(e) for e in event_values()])
-
-
-class FakeChannel:
- """Fake Channel (virtual connection inside a connection)
-
- """
- def close(self):
- self.close = True
-
-
-class FakeConnection:
- """Fake Rabbitmq connection for test purposes
-
- """
- def __init__(self, conn_string):
- self._conn_string = conn_string
- self._connect = False
- self._release = False
- self._channel = False
-
- def connect(self):
- self._connect = True
- return True
-
- def release(self):
- self._connect = False
- self._release = True
-
- def channel(self):
- self._channel = True
- return FakeChannel()
-
-
-class GHTorrentConsumerTest(UpdaterTestUtil, unittest.TestCase):
- def setUp(self):
- config = {
- 'ghtorrent': {
- 'rabbitmq': {
- 'conn': {
- 'url': 'amqp://u:p@https://somewhere:9807',
- },
- 'prefetch_read': 17,
- },
- 'batch_cache_write': 42,
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=softwareheritage-scheduler-updater-dev',
- },
- },
- }
-
- GHTorrentConsumer.connection_class = FakeConnection
- with patch.object(
- SchedulerUpdaterBackend, '__init__', return_value=None):
- self.consumer = GHTorrentConsumer(**config)
-
- @patch('swh.scheduler.updater.backend.SchedulerUpdaterBackend')
- def test_init(self, mock_backend):
- # given
- # check init is ok
- self.assertEqual(self.consumer.batch, 42)
- self.assertEqual(self.consumer.prefetch_read, 17)
-
- def test_has_events(self):
- self.assertTrue(self.consumer.has_events())
-
- def test_connection(self):
- # when
- self.consumer.open_connection()
-
- # then
- self.assertEqual(self.consumer.conn._conn_string,
- 'amqp://u:p@https://somewhere:9807')
- self.assertTrue(self.consumer.conn._connect)
- self.assertFalse(self.consumer.conn._release)
-
- # when
- self.consumer.close_connection()
-
- # then
- self.assertFalse(self.consumer.conn._connect)
- self.assertTrue(self.consumer.conn._release)
- self.assertIsInstance(self.consumer.channel, FakeChannel)
-
- @given(sampled_from(EVENT_TYPES),
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'))
- def test_convert_event_ok(self, event_type, name):
- input_event = self._make_event(event_type, name, 'git')
- actual_event = self.consumer.convert_event(input_event)
-
- self.assertTrue(isinstance(actual_event, SWHEvent))
-
- event = actual_event.get()
-
- expected_event = {
- 'type': event_type.lower().rstrip('Event'),
- 'url': 'https://github.com/%s' % name,
- 'last_seen': input_event['created_at'],
- 'cnt': 1,
- 'origin_type': 'git',
- }
- self.assertEqual(event, expected_event)
-
- @given(sampled_from(EVENT_TYPES),
- from_regex(r'^[a-z0-9]{5,7}/[a-z0-9]{3,10}$'),
- sampled_from(INTERESTING_EVENT_KEYS))
- def test_convert_event_ko(self, event_type, name, missing_data_key):
- input_event = self._make_incomplete_event(
- event_type, name, 'git', missing_data_key)
-
- logger = self.consumer.log
- del self.consumer.log # prevent gazillions of warnings
- actual_converted_event = self.consumer.convert_event(input_event)
- self.consumer.log = logger
- self.assertIsNone(actual_converted_event)
-
- @patch('swh.scheduler.updater.ghtorrent.collect_replies')
- def test_consume_events(self, mock_collect_replies):
- # given
- self.consumer.queue = 'fake-queue' # hack
- self.consumer.open_connection()
-
- fake_events = [
- self._make_event('PushEvent', 'user/some-repo', 'git'),
- self._make_event('PushEvent', 'user2/some-other-repo', 'git'),
- ]
-
- mock_collect_replies.return_value = fake_events
-
- # when
- actual_events = []
- for e in self.consumer.consume_events():
- actual_events.append(e)
-
- # then
- self.assertEqual(fake_events, actual_events)
-
- mock_collect_replies.assert_called_once_with(
- self.consumer.conn,
- self.consumer.channel,
- 'fake-queue',
- no_ack=False,
- limit=17
- )
diff --git a/swh/scheduler/tests/updater/test_writer.py b/swh/scheduler/tests/updater/test_writer.py
deleted file mode 100644
--- a/swh/scheduler/tests/updater/test_writer.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import os
-from glob import glob
-
-import pytest
-from pytest_postgresql.factories import postgresql as pg_fixture_factory
-
-from os.path import join
-from swh.core.utils import numfile_sortkey as sortkey
-from swh.scheduler.tests import SQL_DIR
-from swh.scheduler.updater.events import LISTENED_EVENTS, SWHEvent
-from swh.scheduler.updater.writer import UpdaterWriter
-
-from .conftest import make_simple_event
-
-
-pg_scheduler = pg_fixture_factory('postgresql_proc', 'scheduler')
-pg_updater = pg_fixture_factory('postgresql_proc', 'updater')
-
-
-def pg_sched_fact(dbname, sqldir):
- @pytest.fixture
- def pg_scheduler_db(request):
- pg = request.getfixturevalue('pg_%s' % dbname)
- dump_files = sorted(glob(os.path.join(sqldir, '*.sql')),
- key=sortkey)
- with pg.cursor() as cur:
- for fname in dump_files:
- with open(fname) as fobj:
- sql = fobj.read().replace('concurrently', '')
- cur.execute(sql)
- pg.commit()
- yield pg
-
- return pg_scheduler_db
-
-
-scheduler_db = pg_sched_fact('scheduler', SQL_DIR)
-updater_db = pg_sched_fact('updater', join(SQL_DIR, 'updater'))
-
-
-@pytest.fixture
-def swh_updater_writer(scheduler_db, updater_db):
- config = {
- 'scheduler': {
- 'cls': 'local',
- 'args': {
- 'db': scheduler_db.dsn,
- },
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': updater_db.dsn,
- 'cache_read_limit': 5,
- },
- },
- 'updater_writer': {
- 'pause': 0.1,
- 'verbose': False,
- },
- }
- return UpdaterWriter(**config)
-
-
-def test_run_ko(swh_updater_writer):
- """Only git tasks are supported for now, other types are dismissed.
-
- """
- scheduler = swh_updater_writer.scheduler_backend
- updater = swh_updater_writer.scheduler_updater_backend
-
- ready_events = [
- SWHEvent(
- make_simple_event(event_type, 'origin-%s' % i,
- 'svn'))
- for i, event_type in enumerate(LISTENED_EVENTS)
- ]
-
- updater.cache_put(ready_events)
- list(updater.cache_read())
-
- r = scheduler.peek_ready_tasks('load-git')
-
- # first read on an empty scheduling db results with nothing in it
- assert not r
-
- # Read from cache to scheduler db
- swh_updater_writer.run()
-
- r = scheduler.peek_ready_tasks('load-git')
-
- # other reads after writes are still empty since it's not supported
- assert not r
-
-
-def test_run_ok(swh_updater_writer):
- """Only git origin are supported for now
-
- """
- scheduler = swh_updater_writer.scheduler_backend
- updater = swh_updater_writer.scheduler_updater_backend
-
- ready_events = [
- SWHEvent(
- make_simple_event(event_type, 'origin-%s' % i, 'git'))
- for i, event_type in enumerate(LISTENED_EVENTS)
- ]
-
- expected_length = len(ready_events)
-
- updater.cache_put(ready_events)
-
- data = list(updater.cache_read())
- assert len(data) == expected_length
-
- r = scheduler.peek_ready_tasks('load-git')
-
- # first read on an empty scheduling db results with nothing in it
- assert not r
-
- # Read from cache to scheduler db
- swh_updater_writer.run()
-
- # now, we should have scheduling task ready
- r = scheduler.peek_ready_tasks('load-git')
-
- assert len(r) == expected_length
-
- # Check the task has been scheduled
- for t in r:
- assert t['type'] == 'load-git'
- assert t['priority'] == 'normal'
- assert t['policy'] == 'oneshot'
- assert t['status'] == 'next_run_not_scheduled'
-
- # writer has nothing to do now
- swh_updater_writer.run()
-
- # so no more data in cache
- data = list(updater.cache_read())
-
- assert not data
-
- # provided, no runner is ran, still the same amount of scheduling tasks
- r = scheduler.peek_ready_tasks('load-git')
-
- assert len(r) == expected_length
diff --git a/swh/scheduler/updater/__init__.py b/swh/scheduler/updater/__init__.py
deleted file mode 100644
diff --git a/swh/scheduler/updater/backend.py b/swh/scheduler/updater/backend.py
deleted file mode 100644
--- a/swh/scheduler/updater/backend.py
+++ /dev/null
@@ -1,113 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-from arrow import utcnow
-import psycopg2.pool
-import psycopg2.extras
-
-from swh.core.db import BaseDb
-from swh.core.db.common import db_transaction, db_transaction_generator
-from swh.scheduler.backend import format_query
-
-
-class SchedulerUpdaterBackend:
- CONFIG_BASE_FILENAME = 'backend/scheduler-updater'
-# 'cache_read_limit': ('int', 1000),
-
- def __init__(self, db, cache_read_limit=1000,
- min_pool_conns=1, max_pool_conns=10):
- """
- Args:
- db_conn: either a libpq connection string, or a psycopg2 connection
-
- """
- if isinstance(db, psycopg2.extensions.connection):
- self._pool = None
- self._db = BaseDb(db)
- else:
- self._pool = psycopg2.pool.ThreadedConnectionPool(
- min_pool_conns, max_pool_conns, db,
- cursor_factory=psycopg2.extras.RealDictCursor,
- )
- self._db = None
- self.limit = cache_read_limit
-
- def get_db(self):
- if self._db:
- return self._db
- return BaseDb.from_pool(self._pool)
-
- def put_db(self, db):
- if db is not self._db:
- db.put_conn()
-
- cache_put_keys = ['url', 'cnt', 'last_seen', 'origin_type']
-
- @db_transaction()
- def cache_put(self, events, timestamp=None, db=None, cur=None):
- """Write new events in the backend.
-
- """
- cur.execute('select swh_mktemp_cache()')
- db.copy_to(prepare_events(events, timestamp),
- 'tmp_cache', self.cache_put_keys, cur=cur)
- cur.execute('select swh_cache_put()')
-
- cache_read_keys = ['id', 'url', 'origin_type', 'cnt', 'first_seen',
- 'last_seen']
-
- @db_transaction_generator()
- def cache_read(self, timestamp=None, limit=None, db=None, cur=None):
- """Read events from the cache prior to timestamp.
-
- Note that limit=None does not mean 'no limit' but use the default
- limit (see cache_read_limit constructor argument).
-
- """
- if not timestamp:
- timestamp = utcnow()
-
- if not limit:
- limit = self.limit
-
- q = format_query('select {keys} from swh_cache_read(%s, %s)',
- self.cache_read_keys)
- cur.execute(q, (timestamp, limit))
- yield from cur.fetchall()
-
- @db_transaction()
- def cache_remove(self, entries, db=None, cur=None):
- """Clean events from the cache
-
- """
- q = 'delete from cache where url in (%s)' % (
- ', '.join(("'%s'" % e for e in entries)), )
- cur.execute(q)
-
-
-def prepare_events(events, timestamp=None):
- if timestamp is None:
- timestamp = utcnow()
- outevents = []
- urls = []
- for e in events:
- event = e.get()
- url = event['url'].strip()
- if event['last_seen'] is None:
- event['last_seen'] = timestamp
- event['url'] = url
-
- if url in urls:
- idx = urls.index(url)
- urls.append(urls.pop(idx))
- prev_event = outevents.pop(idx)
- event['cnt'] += prev_event['cnt']
- event['last_seen'] = max(
- event['last_seen'], prev_event['last_seen'])
- else:
- urls.append(url)
- outevents.append(event)
- return outevents
diff --git a/swh/scheduler/updater/consumer.py b/swh/scheduler/updater/consumer.py
deleted file mode 100644
--- a/swh/scheduler/updater/consumer.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import logging
-
-from abc import ABCMeta, abstractmethod
-
-
-class UpdaterConsumer(metaclass=ABCMeta):
- """Event consumer
-
- """
- def __init__(self, backend, batch_cache_write=1000):
- super().__init__()
- self._reset_cache()
- self.backend = backend
- self.batch = int(batch_cache_write)
- logging.basicConfig(level=logging.DEBUG)
- self.log = logging.getLogger('%s.%s' % (
- self.__class__.__module__, self.__class__.__name__))
-
- def _reset_cache(self):
- """Reset internal cache.
-
- """
- self.count = 0
- self.seen_events = set()
- self.events = []
-
- def is_interesting(self, event):
- """Determine if an event is interesting or not.
-
- Args:
- event (SWHEvent): SWH event
-
- """
- return event.is_interesting()
-
- @abstractmethod
- def convert_event(self, event):
- """Parse an event into an SWHEvent.
-
- """
- pass
-
- def process_event(self, event):
- """Process converted and interesting event.
-
- Args:
- event (SWHEvent): Event to process if deemed interesting
-
- """
- try:
- if event.url in self.seen_events:
- event.cnt += 1
- else:
- self.events.append(event)
- self.seen_events.add(event.url)
- self.count += 1
- finally:
- if self.count >= self.batch:
- if self.events:
- self.backend.cache_put(self.events)
- self._reset_cache()
-
- def _flush(self):
- """Flush remaining internal cache if any.
-
- """
- if self.events:
- self.backend.cache_put(self.events)
- self._reset_cache()
-
- @abstractmethod
- def has_events(self):
- """Determine if there remains events to consume.
-
- Returns
- boolean value, true for remaining events, False otherwise
-
- """
- pass
-
- @abstractmethod
- def consume_events(self):
- """The main entry point to consume events.
-
- This should either yield or return message for consumption.
-
- """
- pass
-
- @abstractmethod
- def open_connection(self):
- """Open a connection to the remote system we are supposed to consume
- from.
-
- """
- pass
-
- @abstractmethod
- def close_connection(self):
- """Close opened connection to the remote system.
-
- """
- pass
-
- def run(self):
- """The main entry point to consume events.
-
- """
- try:
- self.open_connection()
- while self.has_events():
- for _event in self.consume_events():
- event = self.convert_event(_event)
- if not event:
- self.log.warning(
- 'Incomplete event dropped %s' % _event)
- continue
- if not self.is_interesting(event):
- continue
- if self.debug:
- self.log.debug('Event: %s' % event)
- try:
- self.process_event(event)
- except Exception:
- self.log.exception(
- 'Problem when processing event %s' % _event)
- continue
- except Exception as e:
- self.log.error('Error raised during consumption: %s' % e)
- raise e
- finally:
- self.close_connection()
- self._flush()
diff --git a/swh/scheduler/updater/events.py b/swh/scheduler/updater/events.py
deleted file mode 100644
--- a/swh/scheduler/updater/events.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-LISTENED_EVENTS = [
- 'delete',
- 'public',
- 'push'
-]
-
-
-class SWHEvent:
- """SWH's interesting event (resulting in an origin update)
-
- """
- def __init__(self, evt, cnt=1):
- self.event = evt
- self.type = evt['type'].lower()
- self.url = evt['url']
- self.last_seen = evt.get('last_seen')
- self.cnt = cnt
- self.origin_type = evt.get('origin_type')
-
- def is_interesting(self):
- return self.type in LISTENED_EVENTS
-
- def get(self):
- return {
- 'type': self.type,
- 'url': self.url,
- 'last_seen': self.last_seen,
- 'cnt': self.cnt,
- 'origin_type': self.origin_type
- }
-
- def __str__(self):
- return str(self.get())
diff --git a/swh/scheduler/updater/ghtorrent/__init__.py b/swh/scheduler/updater/ghtorrent/__init__.py
deleted file mode 100644
--- a/swh/scheduler/updater/ghtorrent/__init__.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import json
-
-from kombu import Connection, Exchange, Queue
-from kombu.common import collect_replies
-
-from swh.core.config import merge_configs
-
-from swh.scheduler.updater.events import SWHEvent
-from swh.scheduler.updater.consumer import UpdaterConsumer
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-
-events = {
- # ghtorrent events related to github events (interesting)
- 'evt': [
- 'commitcomment', 'create', 'delete', 'deployment',
- 'deploymentstatus', 'download', 'follow', 'fork', 'forkapply',
- 'gist', 'gollum', 'issuecomment', 'issues', 'member',
- 'membership', 'pagebuild', 'public', 'pullrequest',
- 'pullrequestreviewcomment', 'push', 'release', 'repository',
- 'status', 'teamadd', 'watch'
- ],
- # ghtorrent events related to mongodb insert (not interesting)
- 'ent': [
- 'commit_comments', 'commits', 'followers', 'forks',
- 'geo_cache', 'issue_comments', 'issue_events', 'issues',
- 'org_members', 'pull_request_comments', 'pull_requests',
- 'repo_collaborators', 'repo_labels', 'repos', 'users', 'watchers'
- ]
-}
-
-INTERESTING_EVENT_KEYS = ['type', 'repo', 'created_at']
-
-DEFAULT_CONFIG = {
- 'ghtorrent': {
- 'batch_cache_write': 1000,
- 'rabbitmq': {
- 'prefetch_read': 100,
- 'conn': {
- 'url': 'amqp://guest:guest@localhost:5672',
- 'exchange_name': 'ght-streams',
- 'routing_key': 'something',
- 'queue_name': 'fake-events',
- },
- },
- },
- 'scheduler_updater': {
- 'cls': 'local',
- 'args': {
- 'db': 'dbname=softwareheritage-scheduler-updater-dev',
- 'cache_read_limit': 1000,
- },
- },
-}
-
-
-class GHTorrentConsumer(UpdaterConsumer):
- """GHTorrent events consumer
-
- """
- connection_class = Connection
-
- def __init__(self, **config):
- self.config = merge_configs(DEFAULT_CONFIG, config)
-
- ght_config = self.config['ghtorrent']
- rmq_config = ght_config['rabbitmq']
- self.prefetch_read = int(rmq_config.get('prefetch_read', 100))
-
- exchange = Exchange(
- rmq_config['conn']['exchange_name'],
- 'topic', durable=True)
- routing_key = rmq_config['conn']['routing_key']
- self.queue = Queue(rmq_config['conn']['queue_name'],
- exchange=exchange,
- routing_key=routing_key,
- auto_delete=True)
-
- if self.config['scheduler_updater']['cls'] != 'local':
- raise ValueError(
- 'The scheduler_updater can only be a cls=local for now')
- backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater']['args'])
-
- super().__init__(backend, ght_config.get('batch_cache_write', 1000))
-
- def has_events(self):
- """Always has events
-
- """
- return True
-
- def convert_event(self, event):
- """Given ghtorrent event, convert it to a SWHEvent instance.
-
- """
- if isinstance(event, str):
- event = json.loads(event)
- for k in INTERESTING_EVENT_KEYS:
- if k not in event:
- if hasattr(self, 'log'):
- self.log.warning(
- 'Event should have the \'%s\' entry defined' % k)
- return None
-
- _type = event['type'].lower().rstrip('Event')
- _repo_name = 'https://github.com/%s' % event['repo']['name']
-
- return SWHEvent({
- 'type': _type,
- 'url': _repo_name,
- 'last_seen': event['created_at'],
- 'origin_type': 'git',
- })
-
- def open_connection(self):
- """Open rabbitmq connection
-
- """
- self.conn = self.connection_class(
- self.config['ghtorrent']['rabbitmq']['conn']['url'])
- self.conn.connect()
- self.channel = self.conn.channel()
-
- def close_connection(self):
- """Close rabbitmq connection
-
- """
- self.channel.close()
- self.conn.release()
-
- def consume_events(self):
- """Consume and yield queue messages
-
- """
- yield from collect_replies(
- self.conn, self.channel, self.queue,
- no_ack=False, limit=self.prefetch_read)
diff --git a/swh/scheduler/updater/ghtorrent/cli.py b/swh/scheduler/updater/ghtorrent/cli.py
deleted file mode 100644
--- a/swh/scheduler/updater/ghtorrent/cli.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import click
-
-
-@click.command()
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def main(ctx, verbose):
- """Consume events from ghtorrent and write them to cache.
-
- """
- click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
- err=True)
- ctx.exit(1)
-
-
-if __name__ == '__main__':
- main()
diff --git a/swh/scheduler/updater/writer.py b/swh/scheduler/updater/writer.py
deleted file mode 100644
--- a/swh/scheduler/updater/writer.py
+++ /dev/null
@@ -1,96 +0,0 @@
-# Copyright (C) 2018 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import click
-import logging
-import time
-
-from arrow import utcnow
-
-from swh.core import utils
-from swh.scheduler import get_scheduler
-from swh.scheduler.utils import create_oneshot_task_dict
-from swh.scheduler.updater.backend import SchedulerUpdaterBackend
-
-
-class UpdaterWriter:
- """Updater writer in charge of updating the scheduler db with latest
- prioritized oneshot tasks
-
- In effect, this:
- - reads the events from scheduler updater's db
- - converts those events into priority oneshot tasks
- - dumps them into the scheduler db
-
- """
-
- def __init__(self, **config):
- self.config = config
- if self.config['scheduler_updater']['cls'] != 'local':
- raise ValueError(
- 'The scheduler_updater can only be a cls=local for now')
- self.scheduler_updater_backend = SchedulerUpdaterBackend(
- **self.config['scheduler_updater']['args'])
- self.scheduler_backend = get_scheduler(**self.config['scheduler'])
- self.pause = self.config.get('updater_writer', {}).get('pause', 10)
- self.log = logging.getLogger(
- 'swh.scheduler.updater.writer.UpdaterWriter')
-
- def convert_to_oneshot_task(self, event):
- """Given an event, convert it into oneshot task with priority
-
- Args:
- event (dict): The event to convert to task
-
- """
- if event['origin_type'] == 'git':
- return create_oneshot_task_dict(
- 'load-git',
- event['url'],
- priority='normal')
- self.log.warning('Type %s is not supported for now, only git' % (
- event['origin_type'], ))
- return None
-
- def write_event_to_scheduler(self, events):
- """Write events to the scheduler and yield ids when done"""
- # convert events to oneshot tasks
- oneshot_tasks = filter(lambda e: e is not None,
- map(self.convert_to_oneshot_task, events))
- # write event to scheduler
- self.scheduler_backend.create_tasks(list(oneshot_tasks))
- for e in events:
- yield e['url']
-
- def run(self):
- """First retrieve events from cache (including origin_type, cnt),
- then convert them into oneshot tasks with priority, then
- write them to the scheduler db, at last remove them from
- cache.
-
- """
- while True:
- timestamp = utcnow()
- events = list(self.scheduler_updater_backend.cache_read(timestamp))
- if not events:
- break
- for urls in utils.grouper(self.write_event_to_scheduler(events),
- n=100):
- self.scheduler_updater_backend.cache_remove(urls)
- time.sleep(self.pause)
-
-
-@click.command()
-@click.option('--verbose/--no-verbose', '-v', default=False,
- help='Verbose mode')
-@click.pass_context
-def main(ctx, verbose):
- click.echo("Deprecated! Use 'swh-scheduler updater' instead.",
- err=True)
- ctx.exit(1)
-
-
-if __name__ == '__main__':
- main()

File Metadata

Mime Type
text/plain
Expires
Thu, Jul 3, 3:57 PM (2 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3215664

Event Timeline