diff --git a/swh/journal/backend.py b/swh/journal/backend.py index a9966b2..9773a7a 100644 --- a/swh/journal/backend.py +++ b/swh/journal/backend.py @@ -1,55 +1,69 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import psycopg2 import psycopg2.extras def entry_to_bytes(entry): """Convert an entry coming from the database to bytes""" if isinstance(entry, memoryview): return entry.tobytes() - if isinstance(entry, list): + if isinstance(entry, tuple): return [entry_to_bytes(value) for value in entry] return entry class Backend: """Backend for Software Heritage object identifiers batch retrieval. The need is to retrieve all the identifiers per object type fast (stream). For this, the implementation is using: - server side cursor - one db connection per object type """ _map_type_primary_key = { - 'origin': 'id', - 'content': 'sha1', - 'directory': 'id', - 'revision': 'id', - 'release': 'id', + 'origin': ['id'], + 'content': ['sha1'], + 'directory': ['id'], + 'revision': ['id'], + 'release': ['id'], + 'origin_visit': ['origin', 'visit'], + 'skipped_content': ['sha1', 'sha1_git', 'sha256'], } def __init__(self, db_conn): self.db_conn = db_conn def fetch(self, obj_type): - """""" + """Fetch all obj_type's identifiers from db. + + This opens one connection, streams objects and, when done, closes + the connection. + + Raises: + ValueError if obj_type is not supported + + Yields: + Identifiers for the specific object_type + + """ primary_key = self._map_type_primary_key.get(obj_type) if not primary_key: raise ValueError('The object type %s is not supported. 
' 'Only possible values are %s' % ( obj_type, self._map_type_primary_key.keys())) + primary_key_str = ','.join(primary_key) query = 'select %s from %s order by %s' % ( - primary_key, obj_type, primary_key) + primary_key_str, obj_type, primary_key_str) server_side_cursor_name = 'swh.journal.%s' % obj_type - with psycopg2.connect(dsn=self.db_conn) as db: + with psycopg2.connect(self.db_conn) as db: cursor = db.cursor(name=server_side_cursor_name) cursor.execute(query) for o in cursor: - yield entry_to_bytes(o[0]) + yield dict(zip(primary_key, entry_to_bytes(o))) diff --git a/swh/journal/checker.py b/swh/journal/checker.py index 81e05be..69d39a4 100644 --- a/swh/journal/checker.py +++ b/swh/journal/checker.py @@ -1,86 +1,93 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Module defining journal checker classes. Those checker goal is to send back all, or missing objects from the journal queues. At the moment, a first naive implementation is the SWHSimpleCheckerProducer. It simply reads the objects from the storage and sends every object identifier back to the journal. """ from kafka import KafkaProducer from swh.core.config import SWHConfig from .backend import Backend from .serializers import value_to_kafka SUPPORTED_OBJECT_TYPES = set([ - 'origin', 'content', 'directory', 'revision', 'release']) + 'origin', + 'content', + 'directory', + 'revision', + 'release', + 'origin_visit', + 'skipped_content', +]) class SWHJournalSimpleCheckerProducer(SWHConfig): """Class in charge of reading the storage's objects and sends those back to the publisher queue. This is designed to be run periodically. 
""" DEFAULT_CONFIG = { 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), 'writing_prefix': ('str', 'swh.tmp_journal.new'), 'publisher_id': ('str', 'swh.journal.publisher.test'), 'object_types': ('list[str]', ['content', 'revision', 'release']), 'storage_dbconn': ('str', 'service=swh-dev'), } CONFIG_BASE_FILENAME = 'journal/checker' def __init__(self, extra_configuration=None): self.config = config = self.parse_config_file() if extra_configuration: config.update(extra_configuration) self.object_types = self.config['object_types'] for obj_type in self.object_types: if obj_type not in SUPPORTED_OBJECT_TYPES: raise ValueError('The object type %s is not supported. ' 'Possible values are %s' % ( obj_type, SUPPORTED_OBJECT_TYPES)) self.storage_backend = Backend(self.config['storage_dbconn']) self.producer = KafkaProducer( bootstrap_servers=config['brokers'], value_serializer=value_to_kafka, client_id=config['publisher_id'], ) def _read_storage(self): """Read all the storage's objects and returns as dict of object_types, set of identifiers. """ for obj_type in self.object_types: - for obj_id in self.storage_backend.fetch(obj_type): - yield obj_type, obj_id + for obj in self.storage_backend.fetch(obj_type): + yield obj_type, obj def run(self): """Reads storage's subscribed object types and send them all back to the publisher queue. """ - for obj_type, obj_id in self._read_storage(): + for obj_type, obj in self._read_storage(): topic = '%s.%s' % (self.config['writing_prefix'], obj_type) - self.producer.send(topic, value=obj_id) + self.producer.send(topic, value=obj) if __name__ == '__main__': SWHJournalSimpleCheckerProducer().run()