diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,137 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining journal checker classes.
+
+The goal of these checkers is to send all objects, or only the missing
+ones, back to the journal queues.
+
+At the moment, a first naive implementation is the
+SWHJournalSimpleCheckerProducer. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
+
+"""
+
+import psycopg2
+
+from kafka import KafkaProducer
+
+from swh.core.config import SWHConfig
+from .serializers import value_to_kafka
+
+
+TYPE_TO_PRIMARY_KEY = {
+    'origins': ['id'],
+    'content': ['sha1'],
+    'directory': ['id'],
+    'revision': ['id'],
+    'release': ['id'],
+    'origin_visit': ['origin', 'visit'],
+    'skipped_content': ['sha1', 'sha1_git', 'sha256'],
+}
+
+
+def entry_to_bytes(entry):
+    """Convert an entry coming from the database to bytes"""
+    if isinstance(entry, memoryview):
+        return entry.tobytes()
+    if isinstance(entry, tuple):
+        return [entry_to_bytes(value) for value in entry]
+    return entry
+
+
+def fetch(db_conn, obj_type):
+    """Fetch all identifiers for obj_type from the database.
+
+    This opens one connection, streams objects and, when done, closes
+    the connection.
+
+    Raises:
+        ValueError if obj_type is not supported
+
+    Yields:
+        Identifiers for the specific object_type
+
+    """
+    primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type)
+    if not primary_key:
+        raise ValueError('The object type %s is not supported. '
+                         'Only possible values are %s' % (
+                             obj_type, ', '.join(TYPE_TO_PRIMARY_KEY)))
+
+    primary_key_str = ','.join(primary_key)
+    query = 'select %s from %s order by %s' % (
+        primary_key_str, obj_type, primary_key_str)
+    server_side_cursor_name = 'swh.journal.%s' % obj_type
+
+    with psycopg2.connect(db_conn) as db:
+        cursor = db.cursor(name=server_side_cursor_name)
+        cursor.execute(query)
+        for o in cursor:
+            yield dict(zip(primary_key, entry_to_bytes(o)))
+
+
+class SWHJournalSimpleCheckerProducer(SWHConfig):
+    """Class in charge of reading the storage's objects and sending
+    them back to the publisher's queue.
+
+    This is designed to be run periodically.
+
+    """
+    DEFAULT_CONFIG = {
+        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
+        'temporary_prefix': ('str', 'swh.tmp_journal.new'),
+        'publisher_id': ('str', 'swh.journal.publisher.test'),
+        'object_types': ('list[str]', ['content', 'revision', 'release']),
+        'storage_dbconn': ('str', 'service=swh-dev'),
+    }
+
+    CONFIG_BASE_FILENAME = 'journal/checker'
+
+    def __init__(self, extra_configuration=None):
+        self.config = config = self.parse_config_file()
+        if extra_configuration:
+            config.update(extra_configuration)
+
+        self.object_types = self.config['object_types']
+        for obj_type in self.object_types:
+            if obj_type not in TYPE_TO_PRIMARY_KEY:
+                raise ValueError('The object type %s is not supported. '
+                                 'Possible values are %s' % (
+                                     obj_type,
+                                     ', '.join(TYPE_TO_PRIMARY_KEY)))
+
+        self.storage_dbconn = self.config['storage_dbconn']
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=config['brokers'],
+            value_serializer=value_to_kafka,
+            client_id=config['publisher_id'],
+        )
+
+    def _read_storage(self):
+        """Read the storage's objects and generate (object_type, object
+        as dict) tuples.
+
+        Yields:
+            tuple of object_type, object as dict
+
+        """
+        for obj_type in self.object_types:
+            for obj in fetch(self.storage_dbconn, obj_type):
+                yield obj_type, obj
+
+    def run(self):
+        """Read the storage's subscribed object types and send them to
+        the publisher's reading queue.
+
+        """
+        for obj_type, obj in self._read_storage():
+            topic = '%s.%s' % (self.config['temporary_prefix'], obj_type)
+            self.producer.send(topic, value=obj)
+
+
+if __name__ == '__main__':
+    SWHJournalSimpleCheckerProducer().run()
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -103,15 +103,16 @@
         self.producer.flush()
 
     def process_contents(self, content_objs):
-        metadata = self.storage.content_get_metadata(content_objs)
+        metadata = self.storage.content_get_metadata(
+            (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
-        metadata = self.storage.revision_get(revision_objs)
+        metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
         return [(revision['id'], revision) for revision in metadata]
 
     def process_releases(self, release_objs):
-        metadata = self.storage.release_get(release_objs)
+        metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
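
Usage notes (not part of the patch):

The new fetch() helper streams identifiers through a named
(server-side) psycopg2 cursor, so the full result set is never
materialized on the client. A minimal sketch of how one might consume
it; the DSN below is a placeholder, not a real service:

    from itertools import islice

    from swh.journal.checker import fetch

    # Placeholder libpq connection string; adjust to your setup.
    db_conn = 'service=swh-dev'

    # fetch() yields one dict per row, keyed by the table's primary
    # key columns, e.g. {'sha1': b'...'} for 'content'. islice only
    # consumes the first ten rows; the server-side cursor streams the
    # rest lazily instead of loading the whole table.
    for identifier in islice(fetch(db_conn, 'content'), 10):
        print(identifier)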
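
Running the checker end to end is just instantiation plus run(). A
hedged sketch, assuming a journal/checker configuration file is
resolvable (parse_config_file() still runs before extra_configuration
is applied); the broker and DSN values are illustrative, not real
endpoints:

    from swh.journal.checker import SWHJournalSimpleCheckerProducer

    checker = SWHJournalSimpleCheckerProducer(extra_configuration={
        'brokers': ['kafka1.example.org'],    # placeholder broker
        'storage_dbconn': 'service=swh-dev',  # placeholder DSN
        'object_types': ['content', 'revision', 'release'],
    })

    # Streams every subscribed object type from storage and sends each
    # identifier dict to the 'swh.tmp_journal.new.<object_type>' topic.
    checker.run()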
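
The publisher hunk reflects that the objects popped from the temporary
topics are dicts with bytes keys (hence the b'sha1' and b'id' lookups),
while the storage getters expect bare identifiers. A tiny illustration
with made-up sha1 values:

    # Shape of the objects the publisher receives from the temporary
    # topics after deserialization: bytes keys, bytes values.
    content_objs = [{b'sha1': b'\x01' * 20}, {b'sha1': b'\x02' * 20}]

    # storage.content_get_metadata() wants the identifiers themselves,
    # so the patch extracts them first, as a lazy generator:
    sha1s = (c[b'sha1'] for c in content_objs)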