# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import inspect
from functools import wraps

import psycopg2
import psycopg2.extras


def entry_to_bytes(entry):
    """Convert a single value coming from the database to bytes.

    psycopg2 returns bytea columns as memoryview objects; convert those
    (recursively inside lists) to bytes, leaving other values untouched.
    """
    if isinstance(entry, memoryview):
        return entry.tobytes()
    if isinstance(entry, list):
        return [entry_to_bytes(value) for value in entry]
    return entry


def line_to_bytes(line):
    """Convert a whole result row coming from the database to bytes.

    Preserves the row's container type (tuple, namedtuple-like, dict)
    while converting each of its values with :func:`entry_to_bytes`.
    """
    if not line:
        return line
    if isinstance(line, dict):
        return {k: entry_to_bytes(v) for k, v in line.items()}
    return line.__class__(entry_to_bytes(entry) for entry in line)


def cursor_to_bytes(cursor):
    """Yield all rows from a cursor, with their values converted to bytes."""
    yield from (line_to_bytes(line) for line in cursor)


def autocommit(fn):
    """Decorator managing the transaction around the decorated method.

    If the caller did not provide a cursor, open a fresh one, commit on
    success and rollback on error.

    Generator results are materialized before committing: otherwise the
    wrapped function's body (and thus its queries) would only run when
    the caller iterates the generator, *after* the commit below has
    already closed the transaction.
    """
    @wraps(fn)
    def wrapped(self, *args, **kwargs):
        autocommit = False
        # Covers both "cursor missing" and "cursor explicitly None"
        if not kwargs.get('cursor'):
            autocommit = True
            kwargs['cursor'] = self.cursor()

        try:
            ret = fn(self, *args, **kwargs)
            if inspect.isgenerator(ret):
                # Drain the generator now, while the transaction and
                # cursor are still valid (see docstring).
                ret = list(ret)
        except BaseException:
            # Roll back on *any* exception, including KeyboardInterrupt,
            # then re-raise unchanged (was a bare `except:` before).
            if autocommit:
                self.rollback()
            raise

        if autocommit:
            self.commit()

        return ret

    return wrapped


class Backend:
    """Backend for querying Software Heritage object identifiers."""

    def __init__(self, db_conn):
        # libpq connection string (DSN); connection opened eagerly
        self.db_conn = db_conn
        self.db = None
        self.reconnect()

    def reconnect(self):
        """(Re)open the database connection if absent or closed."""
        if not self.db or self.db.closed:
            self.db = psycopg2.connect(dsn=self.db_conn)

    def cursor(self):
        """Return a fresh cursor on the database, with auto-reconnection
        in case of failure.

        """
        cur = None

        # Get a fresh cursor and reconnect at most three times
        tries = 0
        while True:
            tries += 1
            try:
                cur = self.db.cursor()
                cur.execute('select 1')
                break
            except psycopg2.OperationalError:
                if tries < 3:
                    self.reconnect()
                else:
                    raise

        return cur

    def commit(self):
        """Commit the current transaction."""
        self.db.commit()

    def rollback(self):
        """Rollback the current transaction."""
        self.db.rollback()

    @autocommit
    def content_get_ids(self, cursor=None):
        """Retrieve the sha1 identifiers of all contents."""
        cursor.execute('select sha1 from content')
        for c in cursor_to_bytes(cursor):
            yield c[0]

    @autocommit
    def origin_get_ids(self, cursor=None):
        """Retrieve the identifiers of all origins."""
        cursor.execute('select id from origin')
        for o in cursor_to_bytes(cursor):
            yield o[0]

    @autocommit
    def revision_get_ids(self, cursor=None):
        """Retrieve the identifiers of all revisions."""
        cursor.execute('select id from revision')
        for r in cursor_to_bytes(cursor):
            yield r[0]

    @autocommit
    def release_get_ids(self, cursor=None):
        """Retrieve the identifiers of all releases."""
        cursor.execute('select id from release')
        for r in cursor_to_bytes(cursor):
            yield r[0]
# Copyright (C) 2017  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

"""Module defining checker classes.

Those checkers' goal is to send back all, or missing, objects to the
journal queues.

At the moment, a first naive implementation is the
SWHJournalSimpleCheckerProducer.  It simply reads the objects from the
storage and sends everything back to the journal.

"""

from kafka import KafkaProducer

from swh.core.config import SWHConfig
from .backend import Backend
from .serializers import value_to_kafka


class SWHJournalSimpleCheckerProducer(SWHConfig):
    """Class in charge of reading the storage's objects and sending them
    back to the publisher queue.

    This is designed to be run periodically.

    """
    DEFAULT_CONFIG = {
        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
        'writing_prefix': ('str', 'swh.journal.objects'),
        'publisher_id': ('str', 'swh.journal.publisher.test'),
        'object_types': ('list[str]', ['content', 'revision', 'release']),
        'storage_dbconn': ('str', 'service=swh-dev'),
    }

    CONFIG_BASE_FILENAME = 'journal/checker'

    def __init__(self, extra_configuration=None):
        self.config = config = self.parse_config_file()
        if extra_configuration:
            config.update(extra_configuration)

        self.storage_backend = Backend(self.config['storage_dbconn'])

        self.producer = KafkaProducer(
            bootstrap_servers=config['brokers'],
            value_serializer=value_to_kafka,
            client_id=config['publisher_id'],
        )

        # Map from object type to the backend function listing its ids
        self.object_read_fn = {
            'content': self.storage_backend.content_get_ids,
            'origin': self.storage_backend.origin_get_ids,
            'revision': self.storage_backend.revision_get_ids,
            'release': self.storage_backend.release_get_ids,
        }

        # Fail fast on a misconfigured object type instead of raising a
        # bare KeyError in the middle of the run
        unknown_types = set(config['object_types']) - set(self.object_read_fn)
        if unknown_types:
            raise ValueError('Unknown object types: %s' %
                             ', '.join(sorted(unknown_types)))

    def _read_storage(self):
        """Read all the storage's objects for the subscribed object types.

        Yields:
            (object_type, object_id) tuples.

        """
        for obj_type in self.config['object_types']:
            for obj_id in self.object_read_fn[obj_type]():
                yield obj_type, obj_id

    def run(self):
        """Read the storage's subscribed object types and send them all
        back to the publisher queue.

        """
        for obj_type, obj_id in self._read_storage():
            topic = '%s.%s' % (self.config['writing_prefix'], obj_type)
            self.producer.send(topic, value=obj_id)

        # KafkaProducer.send() is asynchronous: flush so the buffered
        # messages are actually delivered before returning (important
        # when run as a short-lived script, as in __main__ below).
        self.producer.flush()


if __name__ == '__main__':
    SWHJournalSimpleCheckerProducer().run()