diff --git a/swh/journal/backend.py b/swh/journal/backend.py
deleted file mode 100644
index 9773a7a..0000000
--- a/swh/journal/backend.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright (C) 2017 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import psycopg2
-import psycopg2.extras
-
-
-def entry_to_bytes(entry):
-    """Convert an entry coming from the database to bytes"""
-    if isinstance(entry, memoryview):
-        return entry.tobytes()
-    if isinstance(entry, tuple):
-        return [entry_to_bytes(value) for value in entry]
-    return entry
-
-
-class Backend:
-    """Backend for Software Heritage object identifiers batch retrieval.
-
-    The need is to retrieve all the identifiers per object type fast (stream).
-    For this, the implementation is using:
-    - server side cursor
-    - one db connection per object type
-
-    """
-    _map_type_primary_key = {
-        'origin': ['id'],
-        'content': ['sha1'],
-        'directory': ['id'],
-        'revision': ['id'],
-        'release': ['id'],
-        'origin_visit': ['origin', 'visit'],
-        'skipped_content': ['sha1', 'sha1_git', 'sha256'],
-    }
-
-    def __init__(self, db_conn):
-        self.db_conn = db_conn
-
-    def fetch(self, obj_type):
-        """Fetch all obj_type's identifiers from db.
-
-        This opens one connection, stream objects and when done, close
-        the connection.
-
-        Raises:
-            ValueError if obj_type is not supported
-
-        Yields:
-            Identifiers for the specific object_type
-
-        """
-        primary_key = self._map_type_primary_key.get(obj_type)
-        if not primary_key:
-            raise ValueError('The object type %s is not supported. '
-                             'Only possible values are %s' % (
-                                 obj_type, self._map_type_primary_key.keys()))
-
-        primary_key_str = ','.join(primary_key)
-        query = 'select %s from %s order by %s' % (
-            primary_key_str, obj_type, primary_key_str)
-        server_side_cursor_name = 'swh.journal.%s' % obj_type
-
-        with psycopg2.connect(self.db_conn) as db:
-            cursor = db.cursor(name=server_side_cursor_name)
-            cursor.execute(query)
-            for o in cursor:
-                yield dict(zip(primary_key, entry_to_bytes(o)))
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
index 19ffcf6..525c139 100644
--- a/swh/journal/checker.py
+++ b/swh/journal/checker.py
@@ -1,95 +1,150 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Module defining journal checker classes.
 
 Those checker goal is to send back all, or missing objects from the
 journal queues.
 
 At the moment, a first naive implementation is the
 SWHSimpleCheckerProducer. It simply reads the objects from the
 storage and sends every object identifier back to the journal.
 
 """
 
+import psycopg2
+
 from kafka import KafkaProducer
 
 from swh.core.config import SWHConfig
-from .backend import Backend
 from .serializers import value_to_kafka
 
 
-SUPPORTED_OBJECT_TYPES = set([
-    'origin',
-    'content',
-    'directory',
-    'revision',
-    'release',
-    'origin_visit',
-    'skipped_content',
-])
+# Maps each supported object type (which is also the name of the table
+# queried by fetch) to the columns forming its primary key.
+TYPE_TO_PRIMARY_KEY = {
+    'origin': ['id'],
+    'content': ['sha1'],
+    'directory': ['id'],
+    'revision': ['id'],
+    'release': ['id'],
+    'origin_visit': ['origin', 'visit'],
+    'skipped_content': ['sha1', 'sha1_git', 'sha256'],
+}
+
+
+def entry_to_bytes(entry):
+    """Convert an entry coming from the database to bytes"""
+    if isinstance(entry, memoryview):
+        return entry.tobytes()
+    if isinstance(entry, tuple):
+        return [entry_to_bytes(value) for value in entry]
+    return entry
+
+
+def fetch(db_conn, obj_type):
+    """Fetch all obj_type's identifiers from db.
+
+    This opens one connection, streams the identifiers through a
+    server-side cursor and, when done, closes the connection.
+
+    Args:
+        db_conn: connection string handed to psycopg2.connect
+        obj_type: object type to list, one of TYPE_TO_PRIMARY_KEY's keys
+
+    Raises:
+        ValueError if obj_type is not supported
+
+    Yields:
+        dict mapping each primary key column to its value for one row
+
+    """
+    primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type)
+    if not primary_key:
+        raise ValueError('The object type %s is not supported. '
+                         'Only possible values are %s' % (
+                             obj_type, ', '.join(TYPE_TO_PRIMARY_KEY)))
+
+    # obj_type was validated against TYPE_TO_PRIMARY_KEY just above, so
+    # only known table and column names are interpolated in the query.
+    primary_key_str = ','.join(primary_key)
+    query = 'select %s from %s order by %s' % (
+        primary_key_str, obj_type, primary_key_str)
+    server_side_cursor_name = 'swh.journal.%s' % obj_type
+
+    # psycopg2's connection context manager only ends the transaction,
+    # it does not close the connection: close it explicitly.
+    db = psycopg2.connect(db_conn)
+    try:
+        cursor = db.cursor(name=server_side_cursor_name)
+        cursor.execute(query)
+        for o in cursor:
+            yield dict(zip(primary_key, entry_to_bytes(o)))
+    finally:
+        db.close()
 
 
 class SWHJournalSimpleCheckerProducer(SWHConfig):
     """Class in charge of reading the storage's objects and sends those
     back to the publisher queue.
 
     This is designed to be run periodically.
 
     """
     DEFAULT_CONFIG = {
         'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
         'temporary_prefix': ('str', 'swh.tmp_journal.new'),
         'publisher_id': ('str', 'swh.journal.publisher.test'),
         'object_types': ('list[str]', ['content', 'revision', 'release']),
         'storage_dbconn': ('str', 'service=swh-dev'),
     }
 
     CONFIG_BASE_FILENAME = 'journal/checker'
 
     def __init__(self, extra_configuration=None):
         self.config = config = self.parse_config_file()
         if extra_configuration:
             config.update(extra_configuration)
 
         self.object_types = self.config['object_types']
         for obj_type in self.object_types:
-            if obj_type not in SUPPORTED_OBJECT_TYPES:
+            if obj_type not in TYPE_TO_PRIMARY_KEY:
                 raise ValueError('The object type %s is not supported. '
                                  'Possible values are %s' % (
-                                     obj_type, SUPPORTED_OBJECT_TYPES))
+                                     obj_type,
+                                     ', '.join(TYPE_TO_PRIMARY_KEY)))
 
-        self.storage_backend = Backend(self.config['storage_dbconn'])
+        self.storage_dbconn = self.config['storage_dbconn']
 
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
             value_serializer=value_to_kafka,
             client_id=config['publisher_id'],
         )
 
     def _read_storage(self):
         """Read storage's objects and generates tuple as object_type, dict of
         object.
 
         Yields:
             tuple of object_type, object as dict
 
         """
         for obj_type in self.object_types:
-            for obj in self.storage_backend.fetch(obj_type):
+            for obj in fetch(self.storage_dbconn, obj_type):
                 yield obj_type, obj
 
     def run(self):
         """Reads storage's subscribed object types and send them to the
         publisher's reading queue.
 
         """
         for obj_type, obj in self._read_storage():
             topic = '%s.%s' % (self.config['temporary_prefix'], obj_type)
             self.producer.send(topic, value=obj)
 
 
 if __name__ == '__main__':
     SWHJournalSimpleCheckerProducer().run()