diff --git a/swh/journal/backend.py b/swh/journal/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backend.py
@@ -0,0 +1,55 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import psycopg2.extras
+
+
+def entry_to_bytes(entry):
+    """Convert an entry coming from the database to bytes"""
+    if isinstance(entry, memoryview):
+        return entry.tobytes()
+    if isinstance(entry, list):
+        return [entry_to_bytes(value) for value in entry]
+    return entry
+
+
+class Backend:
+    """Backend for batch retrieval of Software Heritage object identifiers.
+
+    The goal is to stream all identifiers of a given object type
+    efficiently. For this, the implementation uses:
+    - a server-side cursor
+    - one db connection per object type
+
+    """
+    _map_type_primary_key = {
+        'origin': 'id',
+        'content': 'sha1',
+        'directory': 'id',
+        'revision': 'id',
+        'release': 'id',
+    }
+
+    def __init__(self, db_conn):
+        self.db_conn = db_conn
+
+    def fetch(self, obj_type):
+        """Yield the identifiers of all objects of the given type, in order."""
+        primary_key = self._map_type_primary_key.get(obj_type)
+        if not primary_key:
+            raise ValueError('The object type %s is not supported. '
+                             'Only possible values are %s' % (
                                 obj_type, self._map_type_primary_key.keys()))
+
+        query = 'select %s from %s order by %s' % (
+            primary_key, obj_type, primary_key)
+        server_side_cursor_name = 'swh.journal.%s' % obj_type
+
+        with psycopg2.connect(dsn=self.db_conn) as db:
+            cursor = db.cursor(name=server_side_cursor_name)
+            cursor.execute(query)
+            for o in cursor:
+                yield entry_to_bytes(o[0])
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,86 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining journal checker classes.
+
+The goal of these checkers is to send back all objects, or only the
+missing ones, to the journal queues.
+
+At the moment, a first naive implementation is the
+SWHJournalSimpleCheckerProducer. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
+
+"""
+
+from kafka import KafkaProducer
+
+from swh.core.config import SWHConfig
+from .backend import Backend
+from .serializers import value_to_kafka
+
+
+SUPPORTED_OBJECT_TYPES = set([
+    'origin', 'content', 'directory', 'revision', 'release'])
+
+
+class SWHJournalSimpleCheckerProducer(SWHConfig):
+    """Class in charge of reading the storage's objects and sending them
+    back to the publisher queue.
+
+    This is designed to be run periodically.
+ + """ + DEFAULT_CONFIG = { + 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), + 'writing_prefix': ('str', 'swh.tmp_journal.new'), + 'publisher_id': ('str', 'swh.journal.publisher.test'), + 'object_types': ('list[str]', ['content', 'revision', 'release']), + 'storage_dbconn': ('str', 'service=swh-dev'), + } + + CONFIG_BASE_FILENAME = 'journal/checker' + + def __init__(self, extra_configuration=None): + self.config = config = self.parse_config_file() + if extra_configuration: + config.update(extra_configuration) + + self.object_types = self.config['object_types'] + for obj_type in self.object_types: + if obj_type not in SUPPORTED_OBJECT_TYPES: + raise ValueError('The object type %s is not supported. ' + 'Possible values are %s' % ( + obj_type, SUPPORTED_OBJECT_TYPES)) + + self.storage_backend = Backend(self.config['storage_dbconn']) + + self.producer = KafkaProducer( + bootstrap_servers=config['brokers'], + value_serializer=value_to_kafka, + client_id=config['publisher_id'], + ) + + def _read_storage(self): + """Read all the storage's objects and returns as dict of object_types, + set of identifiers. + + """ + for obj_type in self.object_types: + for obj_id in self.storage_backend.fetch(obj_type): + yield obj_type, obj_id + + def run(self): + """Reads storage's subscribed object types and send them all back to + the publisher queue. + + """ + + for obj_type, obj_id in self._read_storage(): + topic = '%s.%s' % (self.config['writing_prefix'], obj_type) + self.producer.send(topic, value=obj_id) + + +if __name__ == '__main__': + SWHJournalSimpleCheckerProducer().run()