D199.id653.diff
D199: swh.journal.checker: Create a simple journal checker producer
diff --git a/swh/journal/backend.py b/swh/journal/backend.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/backend.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import psycopg2
+import psycopg2.extras
+
+
+def entry_to_bytes(entry):
+    """Convert an entry coming from the database to bytes"""
+    if isinstance(entry, memoryview):
+        return entry.tobytes()
+    if isinstance(entry, tuple):
+        return [entry_to_bytes(value) for value in entry]
+    return entry
+
+
+class Backend:
+    """Backend for Software Heritage object identifiers batch retrieval.
+
+    The need is to retrieve all the identifiers per object type fast (stream).
+    For this, the implementation uses:
+    - a server-side cursor
+    - one db connection per object type
+
+    """
+    _map_type_primary_key = {
+        'origin': ['id'],
+        'content': ['sha1'],
+        'directory': ['id'],
+        'revision': ['id'],
+        'release': ['id'],
+        'origin_visit': ['origin', 'visit'],
+        'skipped_content': ['sha1', 'sha1_git', 'sha256'],
+    }
+
+    def __init__(self, db_conn):
+        self.db_conn = db_conn
+
+    def fetch(self, obj_type):
+        """Fetch all obj_type's identifiers from the db.
+
+        This opens one connection, streams objects and, when done, closes
+        the connection.
+
+        Raises:
+            ValueError if obj_type is not supported
+
+        Yields:
+            Identifiers for the specific object_type
+
+        """
+        primary_key = self._map_type_primary_key.get(obj_type)
+        if not primary_key:
+            raise ValueError('The object type %s is not supported. '
+                             'Only possible values are %s' % (
+                                 obj_type, self._map_type_primary_key.keys()))
+
+        primary_key_str = ','.join(primary_key)
+        query = 'select %s from %s order by %s' % (
+            primary_key_str, obj_type, primary_key_str)
+        server_side_cursor_name = 'swh.journal.%s' % obj_type
+
+        with psycopg2.connect(self.db_conn) as db:
+            cursor = db.cursor(name=server_side_cursor_name)
+            cursor.execute(query)
+            for o in cursor:
+                yield dict(zip(primary_key, entry_to_bytes(o)))
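
To make the intent of Backend concrete, here is a minimal usage sketch (not part of the diff). It assumes a reachable PostgreSQL instance and reuses the example connection string from the checker's default configuration; fetch() streams rows through the named (server-side) psycopg2 cursor, so the full identifier set never has to be held in memory.

from swh.journal.backend import Backend

backend = Backend('service=swh-dev')  # example libpq connection string

# Stream all content identifiers; each row is a dict keyed by the
# primary key columns, with values converted to bytes by entry_to_bytes,
# e.g. {'sha1': b'...'} for contents, or {'origin': ..., 'visit': ...}
# for origin visits.
for identifier in backend.fetch('content'):
    print(identifier)
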
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,95 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining journal checker classes.
+
+The checkers' goal is to send back all, or missing, objects from the
+journal queues.
+
+At the moment, a first naive implementation is the
+SWHJournalSimpleCheckerProducer. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
+
+"""
+
+from kafka import KafkaProducer
+
+from swh.core.config import SWHConfig
+from .backend import Backend
+from .serializers import value_to_kafka
+
+
+SUPPORTED_OBJECT_TYPES = set([
+    'origin',
+    'content',
+    'directory',
+    'revision',
+    'release',
+    'origin_visit',
+    'skipped_content',
+])
+
+
+class SWHJournalSimpleCheckerProducer(SWHConfig):
+    """Class in charge of reading the storage's objects and sending them
+    back to the publisher queue.
+
+    This is designed to be run periodically.
+
+    """
+    DEFAULT_CONFIG = {
+        'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
+        'temporary_prefix': ('str', 'swh.tmp_journal.new'),
+        'publisher_id': ('str', 'swh.journal.publisher.test'),
+        'object_types': ('list[str]', ['content', 'revision', 'release']),
+        'storage_dbconn': ('str', 'service=swh-dev'),
+    }
+
+    CONFIG_BASE_FILENAME = 'journal/checker'
+
+    def __init__(self, extra_configuration=None):
+        self.config = config = self.parse_config_file()
+        if extra_configuration:
+            config.update(extra_configuration)
+
+        self.object_types = self.config['object_types']
+        for obj_type in self.object_types:
+            if obj_type not in SUPPORTED_OBJECT_TYPES:
+                raise ValueError('The object type %s is not supported. '
+                                 'Possible values are %s' % (
+                                     obj_type, SUPPORTED_OBJECT_TYPES))
+
+        self.storage_backend = Backend(self.config['storage_dbconn'])
+
+        self.producer = KafkaProducer(
+            bootstrap_servers=config['brokers'],
+            value_serializer=value_to_kafka,
+            client_id=config['publisher_id'],
+        )
+
+    def _read_storage(self):
+        """Read the storage's objects and generate tuples of (object_type,
+        object as dict).
+
+        Yields:
+            tuple of object_type, object as dict
+
+        """
+        for obj_type in self.object_types:
+            for obj in self.storage_backend.fetch(obj_type):
+                yield obj_type, obj
+
+    def run(self):
+        """Read the storage's subscribed object types and send them to the
+        publisher's reading queue.
+
+        """
+        for obj_type, obj in self._read_storage():
+            topic = '%s.%s' % (self.config['temporary_prefix'], obj_type)
+            self.producer.send(topic, value=obj)
+
+
+if __name__ == '__main__':
+    SWHJournalSimpleCheckerProducer().run()
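
A minimal invocation sketch for the checker (not part of the diff). It assumes a 'journal/checker' configuration file that SWHConfig.parse_config_file can load; the broker address and object type list below are illustrative overrides of DEFAULT_CONFIG, not values from this change.

from swh.journal.checker import SWHJournalSimpleCheckerProducer

checker = SWHJournalSimpleCheckerProducer(extra_configuration={
    'brokers': ['localhost:9092'],            # hypothetical Kafka broker
    'object_types': ['content', 'release'],   # subset of SUPPORTED_OBJECT_TYPES
    'storage_dbconn': 'service=swh-dev',
})
# Reads each selected object's primary key from the storage database and
# sends it to the 'swh.tmp_journal.new.<object_type>' temporary topics.
checker.run()
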
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -101,15 +101,16 @@
         self.producer.flush()
 
     def process_contents(self, content_objs):
-        metadata = self.storage.content_get_metadata(content_objs)
+        metadata = self.storage.content_get_metadata(
+            (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
-        metadata = self.storage.revision_get(revision_objs)
+        metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
         return [(revision['id'], revision) for revision in metadata]
 
     def process_releases(self, release_objs):
-        metadata = self.storage.release_get(release_objs)
+        metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
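
For context on the publisher hunk above: the objects pulled back from the temporary queues are the dicts produced by Backend.fetch, carrying bytes keys after deserialization (hence c[b'sha1'] and r[b'id']), while the storage API expects bare identifiers. A small sketch of the shapes involved, using a placeholder hash value:

# A content object as it arrives from the journal (placeholder sha1):
content_objs = [{b'sha1': b'\x00' * 20}]

# Extract the bare identifiers before calling the storage API,
# mirroring the generator expressions added in the diff:
sha1s = (c[b'sha1'] for c in content_objs)
# storage.content_get_metadata(sha1s) is then called as shown above.
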