Page MenuHomeSoftware Heritage

D199.id653.diff
No OneTemporary

D199.id653.diff

diff --git a/swh/journal/backend.py b/swh/journal/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backend.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import psycopg2.extras
+
+
def entry_to_bytes(entry):
    """Convert a value read from the database into bytes.

    A database row (tuple) is converted element-wise into a list;
    a memoryview is materialized as bytes; any other value is
    returned unchanged.
    """
    if isinstance(entry, tuple):
        return [entry_to_bytes(item) for item in entry]
    if isinstance(entry, memoryview):
        return entry.tobytes()
    return entry
+
+
class Backend:
    """Backend for Software Heritage object identifiers batch retrieval.

    The need is to retrieve all the identifiers per object type fast
    (stream). For this, the implementation is using:
    - server side cursor
    - one db connection per fetch

    """
    # Primary key column(s) per supported object type; doubles as the
    # whitelist that validates obj_type before query construction.
    _map_type_primary_key = {
        'origin': ['id'],
        'content': ['sha1'],
        'directory': ['id'],
        'revision': ['id'],
        'release': ['id'],
        'origin_visit': ['origin', 'visit'],
        'skipped_content': ['sha1', 'sha1_git', 'sha256'],
    }

    def __init__(self, db_conn):
        """
        Args:
            db_conn (str): libpq connection string for the storage db.

        """
        self.db_conn = db_conn

    def fetch(self, obj_type):
        """Fetch all obj_type's identifiers from db.

        This opens one connection, stream objects and when done, close
        the connection.

        Raises:
            ValueError if obj_type is not supported

        Yields:
            Identifiers for the specific object_type

        """
        primary_key = self._map_type_primary_key.get(obj_type)
        if not primary_key:
            raise ValueError('The object type %s is not supported. '
                             'Only possible values are %s' % (
                                 obj_type, self._map_type_primary_key.keys()))

        primary_key_str = ','.join(primary_key)
        # obj_type and primary_key both come from the whitelist above,
        # so interpolating them into the query is safe.
        query = 'select %s from %s order by %s' % (
            primary_key_str, obj_type, primary_key_str)
        server_side_cursor_name = 'swh.journal.%s' % obj_type

        db = psycopg2.connect(self.db_conn)
        try:
            # A named cursor is server-side: rows are streamed instead
            # of being loaded into memory all at once.
            with db.cursor(name=server_side_cursor_name) as cursor:
                cursor.execute(query)
                for row in cursor:
                    yield dict(zip(primary_key, entry_to_bytes(row)))
        finally:
            # Bug fix: psycopg2's connection context manager only ends
            # the transaction (commit/rollback) and does NOT close the
            # connection, contrary to what the docstring promised --
            # close explicitly to avoid leaking one connection per call.
            db.close()
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
"""Module defining journal checker classes.

The goal of these checkers is to send back all, or only the missing,
objects to the journal queues.

At the moment, a first naive implementation is the
SWHJournalSimpleCheckerProducer. It simply reads the objects from the
storage and sends every object identifier back to the journal.

"""
+
+from kafka import KafkaProducer
+
+from swh.core.config import SWHConfig
+from .backend import Backend
+from .serializers import value_to_kafka
+
+
# Object types the checker knows how to replay; must stay in sync with
# the primary key map in swh.journal.backend.Backend.
# Idiom fix: set literal instead of set([...]) (flake8-comprehensions C405).
SUPPORTED_OBJECT_TYPES = {
    'origin',
    'content',
    'directory',
    'revision',
    'release',
    'origin_visit',
    'skipped_content',
}
+
+
class SWHJournalSimpleCheckerProducer(SWHConfig):
    """Read the storage's objects and send them back to the publisher
    queue.

    This is designed to be run periodically.

    """
    DEFAULT_CONFIG = {
        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
        'temporary_prefix': ('str', 'swh.tmp_journal.new'),
        'publisher_id': ('str', 'swh.journal.publisher.test'),
        'object_types': ('list[str]', ['content', 'revision', 'release']),
        'storage_dbconn': ('str', 'service=swh-dev'),
    }

    CONFIG_BASE_FILENAME = 'journal/checker'

    def __init__(self, extra_configuration=None):
        config = self.parse_config_file()
        if extra_configuration:
            config.update(extra_configuration)
        self.config = config

        # Reject any configured object type we do not know how to replay.
        self.object_types = config['object_types']
        for object_type in self.object_types:
            if object_type in SUPPORTED_OBJECT_TYPES:
                continue
            raise ValueError('The object type %s is not supported. '
                             'Possible values are %s' % (
                                 object_type, SUPPORTED_OBJECT_TYPES))

        self.storage_backend = Backend(config['storage_dbconn'])

        self.producer = KafkaProducer(
            bootstrap_servers=config['brokers'],
            value_serializer=value_to_kafka,
            client_id=config['publisher_id'],
        )

    def _read_storage(self):
        """Read storage's objects and generate (object_type, object)
        tuples.

        Yields:
            tuple of object_type, object as dict

        """
        for object_type in self.object_types:
            yield from ((object_type, obj)
                        for obj in self.storage_backend.fetch(object_type))

    def run(self):
        """Read storage's subscribed object types and send them to the
        publisher's reading queue.

        """
        prefix = self.config['temporary_prefix']
        for object_type, obj in self._read_storage():
            self.producer.send('%s.%s' % (prefix, object_type), value=obj)
+
+
# Script entry point: run a single checker pass.
if __name__ == '__main__':
    SWHJournalSimpleCheckerProducer().run()
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -101,15 +101,16 @@
self.producer.flush()
def process_contents(self, content_objs):
- metadata = self.storage.content_get_metadata(content_objs)
+ metadata = self.storage.content_get_metadata(
+ (c[b'sha1'] for c in content_objs))
return [(content['sha1'], content) for content in metadata]
def process_revisions(self, revision_objs):
- metadata = self.storage.revision_get(revision_objs)
+ metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
return [(revision['id'], revision) for revision in metadata]
def process_releases(self, release_objs):
- metadata = self.storage.release_get(release_objs)
+ metadata = self.storage.release_get((r[b'id'] for r in release_objs))
return [(release['id'], release) for release in metadata]

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 6:23 PM (11 w, 14 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3219172

Event Timeline