diff --git a/swh/journal/backend.py b/swh/journal/backend.py
index 1db0d2c..a9966b2 100644
--- a/swh/journal/backend.py
+++ b/swh/journal/backend.py
@@ -1,129 +1,55 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-from functools import wraps
-
 import psycopg2
 import psycopg2.extras
 
 
 def entry_to_bytes(entry):
     """Convert an entry coming from the database to bytes"""
     if isinstance(entry, memoryview):
         return entry.tobytes()
     if isinstance(entry, list):
         return [entry_to_bytes(value) for value in entry]
     return entry
 
 
-def line_to_bytes(line):
-    """Convert a line coming from the database to bytes"""
-    if not line:
-        return line
-    if isinstance(line, dict):
-        return {k: entry_to_bytes(v) for k, v in line.items()}
-    return line.__class__(entry_to_bytes(entry) for entry in line)
-
-
-def cursor_to_bytes(cursor):
-    """Yield all the data from a cursor as bytes"""
-    yield from (line_to_bytes(line) for line in cursor)
-
-
-def autocommit(fn):
-    @wraps(fn)
-    def wrapped(self, *args, **kwargs):
-        autocommit = False
-        if 'cursor' not in kwargs or not kwargs['cursor']:
-            autocommit = True
-            kwargs['cursor'] = self.cursor()
-
-        try:
-            ret = fn(self, *args, **kwargs)
-        except:
-            if autocommit:
-                self.rollback()
-            raise
-
-        if autocommit:
-            self.commit()
-
-        return ret
-
-    return wrapped
-
-
 class Backend:
-    """Backend for querying Software Heritage object identifiers.
+    """Backend for Software Heritage object identifiers batch retrieval.
+
+    The need is to retrieve all the identifiers per object type fast (stream).
+    For this, the implementation is using:
+    - server side cursor
+    - one db connection per object type
 
     """
+    _map_type_primary_key = {
+        'origin': 'id',
+        'content': 'sha1',
+        'directory': 'id',
+        'revision': 'id',
+        'release': 'id',
+    }
+
     def __init__(self, db_conn):
         self.db_conn = db_conn
-        self.db = None
-        self.reconnect()
-
-    def reconnect(self):
-        if not self.db or self.db.closed:
-            self.db = psycopg2.connect(dsn=self.db_conn)
-
-    def cursor(self):
-        """Return a fresh cursor on the database, with auto-reconnection in
-        case of failure
-
-        """
-        cur = None
-
-        # Get a fresh cursor and reconnect at most three times
-        tries = 0
-        while True:
-            tries += 1
-            try:
-                cur = self.db.cursor()
-                cur.execute('select 1')
-                break
-            except psycopg2.OperationalError:
-                if tries < 3:
-                    self.reconnect()
-                else:
-                    raise
-
-        return cur
-
-    def commit(self):
-        """Commit a transaction
-
-        """
-        self.db.commit()
-
-    def rollback(self):
-        """Rollback a transaction
-
-        """
-        self.db.rollback()
-
-    @autocommit
-    def content_get_ids(self, cursor=None):
-        cursor.execute('select sha1 from content')
-        for c in cursor_to_bytes(cursor):
-            yield c[0]
-
-    @autocommit
-    def origin_get_ids(self, cursor=None):
-        cursor.execute('select id from origin')
-        for o in cursor_to_bytes(cursor):
-            yield o[0]
-
-    @autocommit
-    def revision_get_ids(self, cursor=None):
-        cursor.execute('select id from revision')
-        for r in cursor_to_bytes(cursor):
-            yield r[0]
 
-    @autocommit
-    def release_get_ids(self, cursor=None):
-        cursor.execute('select id from release')
-        for r in cursor_to_bytes(cursor):
-            yield r[0]
+    def fetch(self, obj_type):
+        """Stream all identifiers of the given object type, as bytes."""
+        primary_key = self._map_type_primary_key.get(obj_type)
+        if not primary_key:
+            raise ValueError('The object type %s is not supported. '
+                             'Only possible values are %s' % (
+                                 obj_type, self._map_type_primary_key.keys()))
+
+        query = 'select %s from %s order by %s' % (
+            primary_key, obj_type, primary_key)
+        server_side_cursor_name = 'swh.journal.%s' % obj_type
+
+        with psycopg2.connect(dsn=self.db_conn) as db:
+            cursor = db.cursor(name=server_side_cursor_name)
+            cursor.execute(query)
+            for o in cursor:
+                yield entry_to_bytes(o[0])
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
index a52dbe8..81e05be 100644
--- a/swh/journal/checker.py
+++ b/swh/journal/checker.py
@@ -1,82 +1,86 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 """Module defining journal checker classes.
 
 Those checker goal is to send back all, or missing objects from the
 journal queues.
 
 At the moment, a first naive implementation is the
 SWHSimpleCheckerProducer.  It simply reads the objects from the
 storage and sends every object identifier back to the journal.
 
 """
 
 from kafka import KafkaProducer
 
 from swh.core.config import SWHConfig
 from .backend import Backend
 from .serializers import value_to_kafka
 
 
+SUPPORTED_OBJECT_TYPES = set([
+    'origin', 'content', 'directory', 'revision', 'release'])
+
+
 class SWHJournalSimpleCheckerProducer(SWHConfig):
     """Class in charge of reading the storage's objects and sends those
        back to the publisher queue.
 
        This is designed to be run periodically.
 
     """
     DEFAULT_CONFIG = {
         'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
-        'writing_prefix': ('str', 'swh.journal.objects'),
+        'writing_prefix': ('str', 'swh.tmp_journal.new'),
         'publisher_id': ('str', 'swh.journal.publisher.test'),
         'object_types': ('list[str]', ['content', 'revision', 'release']),
         'storage_dbconn': ('str', 'service=swh-dev'),
     }
 
     CONFIG_BASE_FILENAME = 'journal/checker'
 
     def __init__(self, extra_configuration=None):
         self.config = config = self.parse_config_file()
         if extra_configuration:
             config.update(extra_configuration)
 
+        self.object_types = self.config['object_types']
+        for obj_type in self.object_types:
+            if obj_type not in SUPPORTED_OBJECT_TYPES:
+                raise ValueError('The object type %s is not supported. '
+                                 'Possible values are %s' % (
+                                     obj_type, SUPPORTED_OBJECT_TYPES))
+
         self.storage_backend = Backend(self.config['storage_dbconn'])
 
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
             value_serializer=value_to_kafka,
             client_id=config['publisher_id'],
         )
 
-        self.object_read_fn = {
-            'content': self.storage_backend.content_get_ids,
-            'origin': self.storage_backend.origin_get_ids,
-            'revision': self.storage_backend.revision_get_ids,
-            'release': self.storage_backend.release_get_ids,
-        }
-
     def _read_storage(self):
         """Read all the storage's objects and returns as dict of
            object_types, set of identifiers.
 
         """
-        for obj_type in self.config['object_types']:
-            for obj_id in self.object_read_fn[obj_type]():
+        for obj_type in self.object_types:
+            for obj_id in self.storage_backend.fetch(obj_type):
                 yield obj_type, obj_id
 
     def run(self):
         """Reads storage's subscribed object types and send them all back
            to the publisher queue.
 
         """
         for obj_type, obj_id in self._read_storage():
             topic = '%s.%s' % (self.config['writing_prefix'], obj_type)
             self.producer.send(topic, value=obj_id)
 
 
 if __name__ == '__main__':
     SWHJournalSimpleCheckerProducer().run()