diff --git a/debian/control b/debian/control
--- a/debian/control
+++ b/debian/control
@@ -8,6 +8,7 @@
  python3-nose,
  python3-setuptools,
  python3-swh.core,
+ python3-swh.model,
  python3-swh.storage,
  python3-vcversioner
 Standards-Version: 3.9.6
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -0,0 +1 @@
+swh.model >= 0.0.14
diff --git a/swh/journal/backend.py b/swh/journal/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backend.py
@@ -0,0 +1,168 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import inspect
+from functools import wraps
+
+import psycopg2
+import psycopg2.extras
+
+
+def entry_to_bytes(entry):
+    """Convert an entry coming from the database to bytes"""
+    if isinstance(entry, memoryview):
+        return entry.tobytes()
+    if isinstance(entry, list):
+        return [entry_to_bytes(value) for value in entry]
+    return entry
+
+
+def line_to_bytes(line):
+    """Convert a line coming from the database to bytes"""
+    if not line:
+        return line
+    if isinstance(line, dict):
+        return {k: entry_to_bytes(v) for k, v in line.items()}
+    return line.__class__(entry_to_bytes(entry) for entry in line)
+
+
+def cursor_to_bytes(cursor):
+    """Yield all the data from a cursor as bytes"""
+    yield from (line_to_bytes(line) for line in cursor)
+
+
+def autocommit(fn):
+    """Run the decorated method in its own transaction when the caller
+    does not provide a cursor.
+
+    When no (or a falsy) `cursor` keyword argument is passed, a fresh
+    cursor is injected and the transaction is committed on success or
+    rolled back on error. When the caller passes its own cursor, the
+    transaction is left for the caller to manage.
+
+    Generator functions are wrapped in a generator, so that the
+    commit/rollback happens once the iteration is over and not before
+    the query has even been executed.
+
+    """
+    def _inject_cursor(self, kwargs):
+        # Tell whether we had to open (and thus own) the transaction
+        if kwargs.get('cursor'):
+            return False
+        kwargs['cursor'] = self.cursor()
+        return True
+
+    if inspect.isgeneratorfunction(fn):
+        @wraps(fn)
+        def wrapped(self, *args, **kwargs):
+            autocommit = _inject_cursor(self, kwargs)
+            try:
+                yield from fn(self, *args, **kwargs)
+            except BaseException:
+                # also covers an iteration abandoned by the caller
+                if autocommit:
+                    self.rollback()
+                raise
+
+            if autocommit:
+                self.commit()
+    else:
+        @wraps(fn)
+        def wrapped(self, *args, **kwargs):
+            autocommit = _inject_cursor(self, kwargs)
+            try:
+                ret = fn(self, *args, **kwargs)
+            except BaseException:
+                if autocommit:
+                    self.rollback()
+                raise
+
+            if autocommit:
+                self.commit()
+
+            return ret
+
+    return wrapped
+
+
+class Backend:
+    """Backend for querying Software Heritage object identifiers.
+
+    """
+
+    def __init__(self, db_conn):
+        self.db_conn = db_conn
+        self.db = None
+        self.reconnect()
+
+    def reconnect(self):
+        """Open the database connection if there is none or it is closed
+
+        """
+        if not self.db or self.db.closed:
+            self.db = psycopg2.connect(dsn=self.db_conn)
+
+    def cursor(self):
+        """Return a fresh cursor on the database, with auto-reconnection in
+        case of failure
+
+        """
+        cur = None
+
+        # Get a fresh cursor and reconnect at most three times
+        tries = 0
+        while True:
+            tries += 1
+            try:
+                cur = self.db.cursor()
+                cur.execute('select 1')
+                break
+            except psycopg2.OperationalError:
+                if tries < 3:
+                    self.reconnect()
+                else:
+                    raise
+
+        return cur
+
+    def commit(self):
+        """Commit a transaction
+
+        """
+        self.db.commit()
+
+    def rollback(self):
+        """Rollback a transaction
+
+        """
+        self.db.rollback()
+
+    @autocommit
+    def content_get_ids(self, cursor=None):
+        """Yield the sha1 of every row of the content table"""
+        cursor.execute('select sha1 from content')
+        for c in cursor_to_bytes(cursor):
+            yield c[0]
+
+    @autocommit
+    def origin_get_ids(self, cursor=None):
+        """Yield the id of every row of the origin table"""
+        cursor.execute('select id from origin')
+        for o in cursor_to_bytes(cursor):
+            yield o[0]
+
+    @autocommit
+    def revision_get_ids(self, cursor=None):
+        """Yield the id of every row of the revision table"""
+        cursor.execute('select id from revision')
+        for r in cursor_to_bytes(cursor):
+            yield r[0]
+
+    @autocommit
+    def release_get_ids(self, cursor=None):
+        """Yield the id of every row of the release table"""
+        cursor.execute('select id from release')
+        for r in cursor_to_bytes(cursor):
+            yield r[0]
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,176 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining a class in charge of computing the missing objects
+from the journal queues and sending those back to the queues.
+
+"""
+
+from collections import defaultdict
+from kafka import KafkaProducer, KafkaConsumer
+
+from swh.core.config import SWHConfig
+from .backend import Backend
+from .serializers import kafka_to_value, value_to_kafka
+
+
+# Dict from object type to the function extracting an object's identifier
+OBJECT_TO_ID_FN = {
+    'content': lambda c: c[b'sha1'],
+    'origin': lambda o: o[b'id'],
+    'revision': lambda r: r[b'id'],
+    'release': lambda r: r[b'id'],
+}
+
+
+class SWHJournalChecker(SWHConfig):
+    """Class in charge of computing a diff against list of objects and the
+    actual swh-storage's objects. The missing objects resulting
+    from that diff are queued back in the publisher's queues.
+
+    This is designed to be run periodically.
+
+    """
+    DEFAULT_CONFIG = {
+        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
+
+        'reading_prefix': ('str', 'swh.journal.objects'),
+        'writing_prefix': ('str', 'swh.journal.objects'),
+
+        'consumer_id': ('str', 'swh.journal.publisher.test'),
+        'publisher_id': ('str', 'swh.journal.publisher.test'),
+
+        'object_types': ('list[str]', ['content', 'revision', 'release']),
+        'diff_journal': ('bool', False),
+
+        'storage_dbconn': ('str', 'service=swh-dev'),
+    }
+
+    CONFIG_BASE_FILENAME = 'journal/checker'
+
+    def __init__(self, extra_configuration=None):
+        """Load the configuration and set up the storage backend, the
+        kafka consumer and the kafka producer.
+
+        Args:
+            extra_configuration (dict): optional values overriding the
+                                        parsed configuration.
+
+        """
+        self.config = config = self.parse_config_file()
+        if extra_configuration:
+            config.update(extra_configuration)
+
+        self.storage_backend = Backend(self.config['storage_dbconn'])
+
+        self.consumer = KafkaConsumer(
+            bootstrap_servers=config['brokers'],
+            value_deserializer=kafka_to_value,
+            auto_offset_reset='earliest',
+            enable_auto_commit=False,
+            group_id=config['consumer_id'],
+        )
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=config['brokers'],
+            value_serializer=value_to_kafka,
+            client_id=config['publisher_id'],
+        )
+
+        self.diff_journal = self.config['diff_journal']
+
+        self.object_read_fn = {
+            'content': self.storage_backend.content_get_ids,
+            'origin': self.storage_backend.origin_get_ids,
+            'revision': self.storage_backend.revision_get_ids,
+            'release': self.storage_backend.release_get_ids,
+        }
+
+        if self.diff_journal:
+            self.consumer.subscribe(
+                topics=['%s.%s' % (config['reading_prefix'], obj_type)
+                        for obj_type in config['object_types']],
+            )
+
+    def _read_journal(self):
+        """Read all the journal objects and returns as a dict of obj_type,
+        set of identifiers.
+
+        """
+        # NOTE(review): the consumer is built without consumer_timeout_ms,
+        # so this loop presumably blocks waiting for new messages instead
+        # of ending once the topics are drained -- confirm and set a
+        # timeout if so.
+        journal_objs = defaultdict(set)
+        for message in self.consumer:
+            obj_type = message.topic.split('.')[-1]
+            obj_id = OBJECT_TO_ID_FN[obj_type](message.value)
+            journal_objs[obj_type].add(obj_id)
+
+        return journal_objs
+
+    def _read_storage(self):
+        """Read all the storage's objects and returns as dict of object_types,
+        set of identifiers.
+
+        """
+        storage_objs = {}
+        for obj_type in self.config['object_types']:
+            storage_objs[obj_type] = set(self.object_read_fn[obj_type]())
+
+        return storage_objs
+
+    def _compute_diff(self, storage_objs, journal_objs):
+        """Compute the difference between storage_objs and journal_objs.
+
+        Args:
+            storage_objs (dict): objects from storage, key is the
+                                 type, value is the set of ids for
+                                 that type.
+            journal_objs (dict): objects from journal, key is the
+                                 type, value is the set of ids for
+                                 that type.
+
+        Returns:
+            dict of difference for each object_type
+
+        """
+        objects = {}
+        for obj_type in self.config['object_types']:
+            # a type never seen in the journal misses all its objects
+            journal_ids = journal_objs.get(obj_type, set())
+            objects[obj_type] = storage_objs[obj_type] - journal_ids
+
+        return objects
+
+    def run(self):
+        """Reads storage's subscribed object types and send them all back to
+        the publisher queue.
+
+        Optionally, reads journal's objects and compute the
+        difference. The missing objects present in the storage and
+        missing from the journal are sent back to the journal.
+
+        """
+        storage_objs = self._read_storage()
+
+        if self.diff_journal:
+            journal_objs = self._read_journal()
+            objects = self._compute_diff(storage_objs, journal_objs)
+        else:
+            objects = storage_objs
+
+        for obj_type, objs in objects.items():
+            topic = '%s.%s' % (self.config['writing_prefix'], obj_type)
+            for obj_id in objs:
+                self.producer.send(topic, value=obj_id)
+
+        # send() only queues the message: make sure everything is
+        # actually handed to the brokers before returning
+        self.producer.flush()
+
+
+if __name__ == '__main__':
+    SWHJournalChecker().run()