diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py new file mode 100644 --- /dev/null +++ b/swh/journal/backfill.py @@ -0,0 +1,184 @@ +# Copyright (C) 2017-2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Module defining journal backfiller classes. + +The goal of these backfillers is to produce back part or all of the +objects from the storage to the journal topics. + +At the moment, a first naive implementation is the +JournalBackfiller. It simply reads the objects from the +storage and sends every object back to the journal. + +""" + +import logging +import psycopg2 + +from kafka import KafkaProducer + +from .serializers import key_to_kafka + +from swh.core.db import typecast_bytea + + +# Defining the key components per object type +TYPE_TO_PRIMARY_KEY = { + 'content': ['sha1', 'sha1_git', 'sha256', 'blake2s256'], + 'skipped_content': ['sha1', 'sha1_git', 'sha256', 'blake2s256'], + 'origin': ['type', 'url'], + 'directory': ['id'], + 'revision': ['id'], + 'release': ['id'], + 'origin_visit': ['type', 'url', 'fetch_date', 'visit_date'], +} + +# The columns to read per object type +TYPE_TO_COLUMNS = { + 'content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status', + # 'ctime' # fix the conversion + ], + 'skipped_content': [ + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime', + 'status', 'reason', + ], + 'origin': ['type', 'url'], + 'origin_visit': ['type', 'url', 'fetch_date', 'visit_date'], + 'directory': ['id'], + 'revision': ['id'], + 'release': ['id'], + +} + + +def fetch(db_conn, obj_type): + """Fetch all obj_type records from the database. + + This opens one connection, streams records and, when done, closes + the connection. + + Raises: + ValueError if obj_type is not supported + + Yields: + Tuples of (composite key, record dict) for the given object_type + + """ + columns = TYPE_TO_COLUMNS.get(obj_type) + if not columns: + raise ValueError('The object type %s is not supported. 
' + 'Only possible values are %s' % ( + obj_type, TYPE_TO_COLUMNS.keys())) + + columns_str = ','.join(columns) + query = 'select %s from %s order by %s' % ( + columns_str, obj_type, columns_str) + server_side_cursor_name = 'swh.journal.%s' % obj_type + + def cursor_setup(conn, server_side_cursor_name): + """Setup a cursor, registering bytea typecasters on the connection""" + # cur = conn.cursor(name=server_side_cursor_name) + cur = conn.cursor() + cur.execute("SELECT null::bytea, null::bytea[]") + bytea_oid = cur.description[0][1] + bytea_array_oid = cur.description[1][1] + + t_bytes = psycopg2.extensions.new_type( + (bytea_oid,), "bytea", typecast_bytea) + psycopg2.extensions.register_type(t_bytes, conn) + + t_bytes_array = psycopg2.extensions.new_array_type( + (bytea_array_oid,), "bytea[]", t_bytes) + psycopg2.extensions.register_type(t_bytes_array, conn) + + return cur + + logging.basicConfig(level=logging.DEBUG) + with psycopg2.connect(db_conn) as conn: + cursor = cursor_setup(conn, server_side_cursor_name) + cursor.execute(query) + component_keys = TYPE_TO_PRIMARY_KEY[obj_type] + logging.debug('component_keys: %s' % component_keys) + for row in cursor: + record = dict(zip(columns, row)) + logging.debug('record: %s' % record) + logging.debug('keys: %s' % record.keys()) + composite_key = tuple((record[k] for k in component_keys)) + logging.debug(composite_key) + yield composite_key, record + + +MANDATORY_KEYS = [ + 'brokers', 'object_types', 'storage_dbconn', + 'final_prefix', 'client_id', +] + + +class JournalBackfiller: + """Class in charge of reading the storage's objects and sending + them back to the journal topics. + + This is designed to be run periodically. + + """ + def __init__(self, config=None): + self.config = config + self.check_config(config) + self.object_types = self.config['object_types'] + self.storage_dbconn = self.config['storage_dbconn'] + + self.producer = KafkaProducer( + bootstrap_servers=config['brokers'], + key_serializer=key_to_kafka, + value_serializer=key_to_kafka, + client_id=config['client_id'], + ) + + def check_config(self, config): + missing_keys = [] + for key in MANDATORY_KEYS: + if not config.get(key): + missing_keys.append(key) + + if missing_keys: + raise ValueError( + 'Configuration error: The following keys must be' + ' provided: %s' % (','.join(missing_keys), )) + + object_types = config['object_types'] + for obj_type in object_types: + if obj_type not in TYPE_TO_PRIMARY_KEY: + raise ValueError('The object type %s is not supported. ' + 'Possible values are %s' % ( + obj_type, + ', '.join(TYPE_TO_PRIMARY_KEY))) + + def _read_storage(self): + """Read the storage's objects and generate tuples of + (object_type, key, object). + + Yields: + tuples of (object_type, composite key, object as dict) + + """ + for obj_type in self.object_types: + for obj_key, obj in fetch(self.storage_dbconn, obj_type): + yield obj_type, obj_key, obj + + def run(self): + """Reads the storage's configured object types and sends them to the + journal's object topics. 
+ + """ + for obj_type, obj_key, obj in self._read_storage(): + topic = '%s.%s' % (self.config['final_prefix'], obj_type) + logging.debug('topic: %s, key: %s, value: %s' % ( + topic, obj_key, obj)) + self.producer.send(topic, key=obj_key, value=obj) + + +if __name__ == '__main__': + print('Please use the "swh-journal backfiller" command') diff --git a/swh/journal/checker.py b/swh/journal/checker.py deleted file mode 100644 --- a/swh/journal/checker.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright (C) 2017 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -"""Module defining journal checker classes. - -Those checker goal is to send back all, or missing objects from the -journal queues. - -At the moment, a first naive implementation is the -SimpleCheckerProducer. It simply reads the objects from the -storage and sends every object identifier back to the journal. - -""" - -import psycopg2 - -from kafka import KafkaProducer - -from swh.core.config import SWHConfig -from .serializers import key_to_kafka - - -TYPE_TO_PRIMARY_KEY = { - 'origin': ['id'], - 'content': ['sha1', 'sha1_git', 'sha256'], - 'directory': ['id'], - 'revision': ['id'], - 'release': ['id'], - 'origin_visit': ['origin', 'visit'], - 'skipped_content': ['sha1', 'sha1_git', 'sha256'], -} - - -def entry_to_bytes(entry): - """Convert an entry coming from the database to bytes""" - if isinstance(entry, memoryview): - return entry.tobytes() - if isinstance(entry, tuple): - return [entry_to_bytes(value) for value in entry] - return entry - - -def fetch(db_conn, obj_type): - """Fetch all obj_type's identifiers from db. - - This opens one connection, stream objects and when done, close - the connection. - - Raises: - ValueError if obj_type is not supported - - Yields: - Identifiers for the specific object_type - - """ - primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) - if not primary_key: - raise ValueError('The object type %s is not supported. ' - 'Only possible values are %s' % ( - obj_type, TYPE_TO_PRIMARY_KEY.keys())) - - primary_key_str = ','.join(primary_key) - query = 'select %s from %s order by %s' % ( - primary_key_str, obj_type, primary_key_str) - server_side_cursor_name = 'swh.journal.%s' % obj_type - - with psycopg2.connect(db_conn) as db: - cursor = db.cursor(name=server_side_cursor_name) - cursor.execute(query) - for o in cursor: - yield dict(zip(primary_key, entry_to_bytes(o))) - - -class SimpleCheckerProducer(SWHConfig): - """Class in charge of reading the storage's objects and sends those - back to the publisher queue. - - This is designed to be run periodically. - - """ - DEFAULT_CONFIG = { - 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), - 'temporary_prefix': ('str', 'swh.tmp_journal.new'), - 'publisher_id': ('str', 'swh.journal.publisher.test'), - 'object_types': ('list[str]', ['content', 'revision', 'release']), - 'storage_dbconn': ('str', 'service=swh-dev'), - } - - CONFIG_BASE_FILENAME = 'journal/checker' - - def __init__(self, extra_configuration=None): - self.config = config = self.parse_config_file() - if extra_configuration: - config.update(extra_configuration) - - self.object_types = self.config['object_types'] - for obj_type in self.object_types: - if obj_type not in TYPE_TO_PRIMARY_KEY: - raise ValueError('The object type %s is not supported. 
' - 'Possible values are %s' % ( - obj_type, - ', '.join(TYPE_TO_PRIMARY_KEY))) - - self.storage_dbconn = self.config['storage_dbconn'] - - self.producer = KafkaProducer( - bootstrap_servers=config['brokers'], - value_serializer=key_to_kafka, - client_id=config['publisher_id'], - ) - - def _read_storage(self): - """Read storage's objects and generates tuple as object_type, dict of - object. - - Yields: - tuple of object_type, object as dict - - """ - for obj_type in self.object_types: - for obj in fetch(self.storage_dbconn, obj_type): - yield obj_type, obj - - def run(self): - """Reads storage's subscribed object types and send them to the - publisher's reading queue. - - """ - for obj_type, obj in self._read_storage(): - topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) - self.producer.send(topic, value=obj) - - -if __name__ == '__main__': - SimpleCheckerProducer().run() diff --git a/swh/journal/cli.py b/swh/journal/cli.py --- a/swh/journal/cli.py +++ b/swh/journal/cli.py @@ -9,6 +9,7 @@ from swh.core import config from swh.journal.publisher import JournalPublisher +from swh.journal.backfill import JournalBackfiller CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @@ -66,6 +67,21 @@ ctx.exit(0) +@cli.command() +@click.pass_context +def backfiller(ctx): + """Run the backfiller + + """ + conf = ctx.obj['config'] + backfiller = JournalBackfiller(conf) + try: + while True: + backfiller.run() + except KeyboardInterrupt: + ctx.exit(0) + + def main(): return cli(auto_envvar_prefix='SWH_JOURNAL') diff --git a/swh/journal/tests/conftest.py b/swh/journal/tests/conftest.py --- a/swh/journal/tests/conftest.py +++ b/swh/journal/tests/conftest.py @@ -132,7 +132,7 @@ } -TEST_CONFIG = { +TEST_PUBLISHER_CONFIG = { 'temporary_prefix': 'swh.tmp_journal.new', 'final_prefix': 'swh.journal.objects', 'consumer_id': 'swh.journal.publisher', @@ -188,17 +188,17 @@ @pytest.fixture -def test_config(): +def test_publisher_config(): """Test configuration needed for publisher/producer/consumer """ - return TEST_CONFIG + return TEST_PUBLISHER_CONFIG @pytest.fixture def producer_to_publisher( kafka_server: Tuple[Popen, int], - test_config: Dict, + test_publisher_config: Dict, ) -> KafkaProducer: # noqa """Producer to send message to the publisher's consumer. @@ -208,19 +208,20 @@ bootstrap_servers='localhost:{}'.format(port), key_serializer=key_to_kafka, value_serializer=key_to_kafka, - client_id=test_config['consumer_id'], + client_id=test_publisher_config['consumer_id'], ) return producer @pytest.fixture def consumer_from_publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> KafkaConsumer: + test_publisher_config: Dict) -> KafkaConsumer: """Get a connected Kafka consumer. """ - kafka_topics = ['%s.%s' % (test_config['final_prefix'], object_type) - for object_type in test_config['object_types']] + kafka_topics = ['%s.%s' % (test_publisher_config['final_prefix'], + object_type) + for object_type in test_publisher_config['object_types']] _, kafka_port = kafka_server consumer = KafkaConsumer( *kafka_topics, @@ -244,7 +245,7 @@ @pytest.fixture def publisher(kafka_server: Tuple[Popen, int], - test_config: Dict) -> JournalPublisher: + test_publisher_config: Dict) -> JournalPublisher: """Test Publisher factory. We cannot use a fixture here as we need to modify the sample. 
@@ -252,6 +253,6 @@ # consumer and producer of the publisher needs to discuss with the # right instance _, port = kafka_server - test_config['brokers'] = ['localhost:{}'.format(port)] - publisher = JournalPublisherTest(test_config) + test_publisher_config['brokers'] = ['localhost:{}'.format(port)] + publisher = JournalPublisherTest(test_publisher_config) return publisher diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py new file mode 100644 --- /dev/null +++ b/swh/journal/tests/test_backfill.py @@ -0,0 +1,42 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import pytest + +from swh.journal.backfill import JournalBackfiller, TYPE_TO_PRIMARY_KEY + + +TEST_CONFIG = { + 'brokers': ['localhost'], + 'final_prefix': 'swh.tmp_journal.new', + 'client_id': 'swh.journal.publisher.test', + 'object_types': ['content', 'revision', 'release'], + 'storage_dbconn': 'service=swh-dev', +} + + +def test_config_ko_missing_mandatory_key(): + for key in TEST_CONFIG.keys(): + config = TEST_CONFIG.copy() + config.pop(key) + + with pytest.raises(ValueError) as e: + JournalBackfiller(config) + + error = ('Configuration error: The following keys must be' + ' provided: %s' % (','.join([key]), )) + assert e.value.args[0] == error + + +def test_config_ko_unknown_object_type(): + wrong_config = TEST_CONFIG.copy() + wrong_config['object_types'] = ['something-wrong'] + with pytest.raises(ValueError) as e: + JournalBackfiller(wrong_config) + + error = ('The object type something-wrong is not supported. ' + 'Possible values are %s' % ( + ', '.join(TYPE_TO_PRIMARY_KEY))) + assert e.value.args[0] == error diff --git a/swh/journal/tests/test_publisher_kafka.py b/swh/journal/tests/test_publisher_kafka.py --- a/swh/journal/tests/test_publisher_kafka.py +++ b/swh/journal/tests/test_publisher_kafka.py @@ -11,7 +11,7 @@ from swh.journal.serializers import value_to_kafka, kafka_to_value from swh.journal.publisher import JournalPublisher -from .conftest import TEST_CONFIG, OBJECT_TYPE_KEYS +from .conftest import TEST_PUBLISHER_CONFIG, OBJECT_TYPE_KEYS def assert_publish_ok(publisher: JournalPublisher, @@ -43,7 +43,7 @@ # send message to the publisher for obj in objects: producer_to_publisher.send( - '%s.%s' % (TEST_CONFIG['temporary_prefix'], object_type), + '%s.%s' % (TEST_PUBLISHER_CONFIG['temporary_prefix'], object_type), obj ) @@ -53,7 +53,8 @@ publisher.poll(max_messages=1) # then (client reads from the messages from output topic) - expected_topic = '%s.%s' % (TEST_CONFIG['final_prefix'], object_type) + expected_topic = '%s.%s' % (TEST_PUBLISHER_CONFIG['final_prefix'], + object_type) expected_msgs = [ ( object_[object_key_id], diff --git a/swh/journal/tests/test_publisher_no_kafka.py b/swh/journal/tests/test_publisher_no_kafka.py --- a/swh/journal/tests/test_publisher_no_kafka.py +++ b/swh/journal/tests/test_publisher_no_kafka.py @@ -7,7 +7,7 @@ import unittest from .conftest import ( - JournalPublisherTest, TEST_CONFIG, + JournalPublisherTest, TEST_PUBLISHER_CONFIG, CONTENTS, REVISIONS, RELEASES, ORIGINS ) from swh.journal.publisher import MANDATORY_KEYS @@ -37,7 +37,8 @@ """ def setUp(self): - self.publisher = JournalPublisherNoKafkaInMemoryStorage(TEST_CONFIG) + self.publisher = JournalPublisherNoKafkaInMemoryStorage( + TEST_PUBLISHER_CONFIG) self.contents = [{b'sha1': c['sha1']} for c in 
CONTENTS] self.revisions = [{b'id': c['id']} for c in REVISIONS] self.releases = [{b'id': c['id']} for c in RELEASES] @@ -132,7 +133,7 @@ """Instantiate a publisher with the right config is fine """ - publisher = JournalPublisherCheckTest(TEST_CONFIG) + publisher = JournalPublisherCheckTest(TEST_PUBLISHER_CONFIG) assert publisher is not None @@ -141,7 +142,7 @@ """ for k in MANDATORY_KEYS: - conf = TEST_CONFIG.copy() + conf = TEST_PUBLISHER_CONFIG.copy() conf.pop(k) with pytest.raises(ValueError) as e: JournalPublisherCheckTest(conf)
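
Note: below is a minimal usage sketch of the new JournalBackfiller, useful when trying this patch locally. It is not part of the change itself; the configuration keys mirror MANDATORY_KEYS as enforced by check_config(), while the broker address, client id and libpq DSN are placeholder values.

    from swh.journal.backfill import JournalBackfiller

    # Hypothetical configuration: every key below is required by check_config().
    config = {
        'brokers': ['localhost:9092'],          # Kafka bootstrap servers (placeholder)
        'final_prefix': 'swh.journal.objects',  # topics are named '<final_prefix>.<object_type>'
        'client_id': 'swh.journal.backfiller',  # Kafka client id (placeholder)
        'object_types': ['content', 'revision', 'release'],  # must be keys of TYPE_TO_PRIMARY_KEY
        'storage_dbconn': 'service=swh-dev',    # libpq DSN of the storage database (placeholder)
    }

    backfiller = JournalBackfiller(config)
    # Streams every record of the configured object types from the storage
    # database and sends it to the corresponding Kafka topic, keyed by the
    # object's primary key columns.
    backfiller.run()

The new 'backfiller' click command in swh/journal/cli.py runs this same loop with the configuration loaded by the CLI group.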