diff --git a/swh/journal/backfill.py b/swh/journal/backfill.py
index 256401a..a6f5cf8 100644
--- a/swh/journal/backfill.py
+++ b/swh/journal/backfill.py
@@ -1,151 +1,184 @@
 # Copyright (C) 2017-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Module defining journal backfiller classes.
 
-Those backfiller goal is to produce back part or all of the objects
-from the storage to the journal topics
+The backfillers' goal is to send back part or all of the objects from
+the storage to the journal topics.
 
-At the moment, a first naive implementation is the
-JournalBackfiller. It simply reads the objects from the storage and
-sends every object identifier back to the journal.
+At the moment, a first naive implementation is the
+JournalBackfiller. It simply reads the objects from the storage and
+sends every object, keyed by its composite primary key, back to the
+journal.
 
 """
 
+import logging
 import psycopg2
 
 from kafka import KafkaProducer
 
 from .serializers import key_to_kafka
+from swh.core.db import typecast_bytea
 
+# Defining the key components per object type
 TYPE_TO_PRIMARY_KEY = {
-    'origin': ['id'],
     'content': ['sha1', 'sha1_git', 'sha256', 'blake2s256'],
+    'skipped_content': ['sha1', 'sha1_git', 'sha256', 'blake2s256'],
+    'origin': ['type', 'url'],
     'directory': ['id'],
     'revision': ['id'],
     'release': ['id'],
-    'origin_visit': ['origin', 'visit'],
-    'skipped_content': ['sha1', 'sha1_git', 'sha256'],
+    'origin_visit': ['type', 'url', 'fetch_date', 'visit_date'],
 }
 
+# The columns to read per object type
+TYPE_TO_COLUMNS = {
+    'content': [
+        'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'status',
+        # 'ctime'  # fix the conversion
+    ],
+    'skipped_content': [
+        'sha1', 'sha1_git', 'sha256', 'blake2s256', 'length', 'ctime',
+        'status', 'reason',
+    ],
+    'origin': ['type', 'url'],
+    'origin_visit': ['type', 'url', 'fetch_date', 'visit_date'],
+    'directory': ['id'],
+    'revision': ['id'],
+    'release': ['id'],
 
-def entry_to_bytes(entry):
-    """Convert an entry coming from the database to bytes"""
-    if isinstance(entry, memoryview):
-        return entry.tobytes()
-    if isinstance(entry, tuple):
-        return [entry_to_bytes(value) for value in entry]
-    return entry
+}
 
 
 def fetch(db_conn, obj_type):
     """Fetch all obj_type's records from db.
 
     This opens one connection, streams objects and, when done, closes
     the connection.
 
     Raises:
         ValueError if obj_type is not supported
 
     Yields:
-        Identifiers for the specific object_type
+        (composite key, record) pairs for the specific object_type
 
     """
-    primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type)
-    if not primary_key:
+    columns = TYPE_TO_COLUMNS.get(obj_type)
+    if not columns:
         raise ValueError('The object type %s is not supported. '
                         'Only possible values are %s' % (
                             obj_type, TYPE_TO_PRIMARY_KEY.keys()))
-    primary_key_str = ','.join(primary_key)
+    columns_str = ','.join(columns)
     query = 'select %s from %s order by %s' % (
-        primary_key_str, obj_type, primary_key_str)
+        columns_str, obj_type, columns_str)
     server_side_cursor_name = 'swh.journal.%s' % obj_type
 
-    with psycopg2.connect(db_conn) as db:
-        cursor = db.cursor(name=server_side_cursor_name)
-        cursor.execute(query)
-        for o in cursor:
-            yield dict(zip(primary_key, entry_to_bytes(o)))
+    def cursor_setup(conn, server_side_cursor_name):
+        """Setup cursor to return dict of data"""
+        # cur = conn.cursor(name=server_side_cursor_name)
+        cur = conn.cursor()
+        cur.execute("SELECT null::bytea, null::bytea[]")
+        bytea_oid = cur.description[0][1]
+        bytea_array_oid = cur.description[1][1]
+        t_bytes = psycopg2.extensions.new_type(
+            (bytea_oid,), "bytea", typecast_bytea)
+        psycopg2.extensions.register_type(t_bytes, conn)
 
-DEFAULT_CONFIG = {
-    'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-    'temporary_prefix': ('str', 'swh.tmp_journal.new'),
-    'publisher_id': ('str', 'swh.journal.publisher.test'),
-    'object_types': ('list[str]', ['content', 'revision', 'release']),
-    'storage_dbconn': ('str', 'service=swh-dev'),
-}
+        t_bytes_array = psycopg2.extensions.new_array_type(
+            (bytea_array_oid,), "bytea[]", t_bytes)
+        psycopg2.extensions.register_type(t_bytes_array, conn)
+
+        return cur
+
+    logging.basicConfig(level=logging.DEBUG)
+    with psycopg2.connect(db_conn) as conn:
+        cursor = cursor_setup(conn, server_side_cursor_name)
+        cursor.execute(query)
+        component_keys = TYPE_TO_PRIMARY_KEY[obj_type]
+        logging.debug('component_keys: %s' % component_keys)
+        for row in cursor:
+            record = dict(zip(columns, row))
+            logging.debug('record: %s' % record)
+            logging.debug('keys: %s' % record.keys())
+            composite_key = tuple(record[k] for k in component_keys)
+            logging.debug(composite_key)
+            yield composite_key, record
 
 
 MANDATORY_KEYS = [
-    'brokers', 'temporary_prefix', 'publisher_id', 'object_types',
-    'storage_dbconn',
+    'brokers', 'object_types', 'storage_dbconn',
+    'final_prefix', 'client_id',
 ]
 
 
 class JournalBackfiller:
-    """Class in charge of reading the storage's objects and sends
-       those back to the publisher queue.
+    """Class in charge of reading the storage's objects and sending
+       them back to the journal's topics.
 
        This is designed to be run periodically.
 
     """
     def __init__(self, config=None):
         self.config = config
         self.check_config(config)
         self.object_types = self.config['object_types']
         self.storage_dbconn = self.config['storage_dbconn']
 
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
+            key_serializer=key_to_kafka,
             value_serializer=key_to_kafka,
-            client_id=config['publisher_id'],
+            client_id=config['client_id'],
         )
 
     def check_config(self, config):
         missing_keys = []
         for key in MANDATORY_KEYS:
             if not config.get(key):
                 missing_keys.append(key)
 
         if missing_keys:
             raise ValueError(
                 'Configuration error: The following keys must be'
                 ' provided: %s' % (','.join(missing_keys), ))
 
         object_types = config['object_types']
         for obj_type in object_types:
             if obj_type not in TYPE_TO_PRIMARY_KEY:
                 raise ValueError('The object type %s is not supported. '
                                  'Possible values are %s' % (
                                      obj_type, ', '.join(TYPE_TO_PRIMARY_KEY)))
 
     def _read_storage(self):
-        """Read storage's objects and generates tuple as object_type,
-           dict of object.
+        """Read the storage's objects and generate (object_type, key,
+           object as dict) tuples.
 
            Yields:
-              tuple of object_type, object as dict
+              tuple of (object_type, composite key, object as dict)
 
         """
         for obj_type in self.object_types:
-            for obj in fetch(self.storage_dbconn, obj_type):
-                yield obj_type, obj
+            for obj_key, obj in fetch(self.storage_dbconn, obj_type):
+                yield obj_type, obj_key, obj
 
     def run(self):
-        """Reads storage's subscribed object types and send them to the
-           publisher's reading queue.
+        """Read the storage's subscribed object types and send them to
+           the journal's topics.
 
         """
-        for obj_type, obj in self._read_storage():
-            topic = '%s.%s' % (self.config['temporary_prefix'], obj_type)
-            self.producer.send(topic, value=obj)
+        for obj_type, obj_key, obj in self._read_storage():
+            topic = '%s.%s' % (self.config['final_prefix'], obj_type)
+            logging.debug('topic: %s, key: %s, value: %s' % (
+                topic, obj_key, obj))
+            self.producer.send(topic, key=obj_key, value=obj)
 
 
 if __name__ == '__main__':
     print('Please use the "swh-journal backfiller run" command')
diff --git a/swh/journal/tests/test_backfill.py b/swh/journal/tests/test_backfill.py
index 1b37f7b..7b8ddee 100644
--- a/swh/journal/tests/test_backfill.py
+++ b/swh/journal/tests/test_backfill.py
@@ -1,42 +1,42 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import pytest
 
 from swh.journal.backfill import JournalBackfiller, TYPE_TO_PRIMARY_KEY
 
 
 TEST_CONFIG = {
     'brokers': ['localhost'],
-    'temporary_prefix': 'swh.tmp_journal.new',
-    'publisher_id': 'swh.journal.publisher.test',
+    'final_prefix': 'swh.tmp_journal.new',
+    'client_id': 'swh.journal.publisher.test',
     'object_types': ['content', 'revision', 'release'],
     'storage_dbconn': 'service=swh-dev',
 }
 
 
 def test_config_ko_missing_mandatory_key():
     for key in TEST_CONFIG.keys():
         config = TEST_CONFIG.copy()
         config.pop(key)
         with pytest.raises(ValueError) as e:
             JournalBackfiller(config)
 
         error = ('Configuration error: The following keys must be'
                  ' provided: %s' % (','.join([key]), ))
         assert e.value.args[0] == error
 
 
 def test_config_ko_unknown_object_type():
     wrong_config = TEST_CONFIG.copy()
     wrong_config['object_types'] = ['something-wrong']
     with pytest.raises(ValueError) as e:
         JournalBackfiller(wrong_config)
 
     error = ('The object type something-wrong is not supported. '
              'Possible values are %s' % (
                  ', '.join(TYPE_TO_PRIMARY_KEY)))
     assert e.value.args[0] == error
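
Reviewer note: the bytea typecast registration in cursor_setup() is plain
psycopg2 machinery rather than anything swh-specific. Below is a
self-contained sketch of the same technique; the DSN is a placeholder and
the local typecast_bytea merely stands in for swh.core.db.typecast_bytea,
so treat it as an illustration under those assumptions, not as this
patch's API:

    import psycopg2
    import psycopg2.extensions

    def typecast_bytea(value, cur):
        # Decode a bytea column into bytes instead of memoryview;
        # stand-in for swh.core.db.typecast_bytea.
        if value is not None:
            return psycopg2.BINARY(value, cur).tobytes()

    with psycopg2.connect('service=swh-dev') as conn:  # placeholder DSN
        cur = conn.cursor()
        # Ask the server for the OIDs of bytea and bytea[] on this cluster.
        cur.execute("SELECT null::bytea, null::bytea[]")
        bytea_oid = cur.description[0][1]
        bytea_array_oid = cur.description[1][1]

        # Register both casters, scoped to this connection only.
        t_bytes = psycopg2.extensions.new_type(
            (bytea_oid,), "bytea", typecast_bytea)
        psycopg2.extensions.register_type(t_bytes, conn)
        t_bytes_array = psycopg2.extensions.new_array_type(
            (bytea_array_oid,), "bytea[]", t_bytes)
        psycopg2.extensions.register_type(t_bytes_array, conn)

        # From here on, bytea values come back as bytes on this connection.
        cur.execute("SELECT 'abc'::bytea")
        assert cur.fetchone()[0] == b'abc'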
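For completeness, a minimal sketch of driving the new backfiller end to
end, assuming a reachable Kafka broker and a populated swh-storage
database; every config value below is illustrative, only the key names
come from MANDATORY_KEYS:

    from swh.journal.backfill import JournalBackfiller

    config = {
        'brokers': ['localhost:9092'],          # assumed local broker
        'final_prefix': 'swh.journal.objects',  # assumed topic prefix
        'client_id': 'swh.journal.backfiller.test',
        'object_types': ['origin'],             # a small table to start with
        'storage_dbconn': 'service=swh-dev',    # assumed libpq service entry
    }

    backfiller = JournalBackfiller(config)
    backfiller.run()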