diff --git a/PKG-INFO b/PKG-INFO
index 50938cc..33420a6 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.journal
-Version: 0.0.1
+Version: 0.0.2
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/PKG-INFO b/swh.journal.egg-info/PKG-INFO
similarity index 94%
copy from PKG-INFO
copy to swh.journal.egg-info/PKG-INFO
index 50938cc..33420a6 100644
--- a/PKG-INFO
+++ b/swh.journal.egg-info/PKG-INFO
@@ -1,10 +1,10 @@
 Metadata-Version: 1.0
 Name: swh.journal
-Version: 0.0.1
+Version: 0.0.2
 Summary: Software Heritage Journal utilities
 Home-page: https://forge.softwareheritage.org/diffusion/DJNL/
 Author: Software Heritage developers
 Author-email: swh-devel@inria.fr
 License: UNKNOWN
 Description: UNKNOWN
 Platform: UNKNOWN
diff --git a/swh.journal.egg-info/SOURCES.txt b/swh.journal.egg-info/SOURCES.txt
new file mode 100644
index 0000000..d84bb89
--- /dev/null
+++ b/swh.journal.egg-info/SOURCES.txt
@@ -0,0 +1,26 @@
+.gitignore
+AUTHORS
+LICENSE
+MANIFEST.in
+Makefile
+requirements-swh.txt
+requirements.txt
+setup.py
+version.txt
+debian/changelog
+debian/compat
+debian/control
+debian/copyright
+debian/rules
+debian/source/format
+swh.journal.egg-info/PKG-INFO
+swh.journal.egg-info/SOURCES.txt
+swh.journal.egg-info/dependency_links.txt
+swh.journal.egg-info/requires.txt
+swh.journal.egg-info/top_level.txt
+swh/journal/__init__.py
+swh/journal/checker.py
+swh/journal/client.py
+swh/journal/publisher.py
+swh/journal/serializers.py
+swh/journal/tests/test_serializers.py
\ No newline at end of file
diff --git a/swh.journal.egg-info/dependency_links.txt b/swh.journal.egg-info/dependency_links.txt
new file mode 100644
index 0000000..8b13789
--- /dev/null
+++ b/swh.journal.egg-info/dependency_links.txt
@@ -0,0 +1 @@
+
diff --git a/swh.journal.egg-info/requires.txt b/swh.journal.egg-info/requires.txt
new file mode 100644
index 0000000..a08feb1
--- /dev/null
+++ b/swh.journal.egg-info/requires.txt
@@ -0,0 +1,2 @@
+kafka
+vcversioner
diff --git a/swh.journal.egg-info/top_level.txt b/swh.journal.egg-info/top_level.txt
new file mode 100644
index 0000000..0cb0f8f
--- /dev/null
+++ b/swh.journal.egg-info/top_level.txt
@@ -0,0 +1 @@
+swh
diff --git a/swh/journal/checker.py b/swh/journal/checker.py
new file mode 100644
index 0000000..ab2cff2
--- /dev/null
+++ b/swh/journal/checker.py
@@ -0,0 +1,137 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Module defining journal checker classes.
+
+The goal of these checkers is to send back all objects, or only the
+missing ones, to the journal queues.
+
+At the moment, a first naive implementation is the
+SWHJournalSimpleCheckerProducer. It simply reads the objects from the
+storage and sends every object identifier back to the journal.
+ +""" + +import psycopg2 + +from kafka import KafkaProducer + +from swh.core.config import SWHConfig +from .serializers import key_to_kafka + + +TYPE_TO_PRIMARY_KEY = { + 'origin': ['id'], + 'content': ['sha1', 'sha1_git', 'sha256'], + 'directory': ['id'], + 'revision': ['id'], + 'release': ['id'], + 'origin_visit': ['origin', 'visit'], + 'skipped_content': ['sha1', 'sha1_git', 'sha256'], +} + + +def entry_to_bytes(entry): + """Convert an entry coming from the database to bytes""" + if isinstance(entry, memoryview): + return entry.tobytes() + if isinstance(entry, tuple): + return [entry_to_bytes(value) for value in entry] + return entry + + +def fetch(db_conn, obj_type): + """Fetch all obj_type's identifiers from db. + + This opens one connection, stream objects and when done, close + the connection. + + Raises: + ValueError if obj_type is not supported + + Yields: + Identifiers for the specific object_type + + """ + primary_key = TYPE_TO_PRIMARY_KEY.get(obj_type) + if not primary_key: + raise ValueError('The object type %s is not supported. ' + 'Only possible values are %s' % ( + obj_type, TYPE_TO_PRIMARY_KEY.keys())) + + primary_key_str = ','.join(primary_key) + query = 'select %s from %s order by %s' % ( + primary_key_str, obj_type, primary_key_str) + server_side_cursor_name = 'swh.journal.%s' % obj_type + + with psycopg2.connect(db_conn) as db: + cursor = db.cursor(name=server_side_cursor_name) + cursor.execute(query) + for o in cursor: + yield dict(zip(primary_key, entry_to_bytes(o))) + + +class SWHJournalSimpleCheckerProducer(SWHConfig): + """Class in charge of reading the storage's objects and sends those + back to the publisher queue. + + This is designed to be run periodically. + + """ + DEFAULT_CONFIG = { + 'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), + 'temporary_prefix': ('str', 'swh.tmp_journal.new'), + 'publisher_id': ('str', 'swh.journal.publisher.test'), + 'object_types': ('list[str]', ['content', 'revision', 'release']), + 'storage_dbconn': ('str', 'service=swh-dev'), + } + + CONFIG_BASE_FILENAME = 'journal/checker' + + def __init__(self, extra_configuration=None): + self.config = config = self.parse_config_file() + if extra_configuration: + config.update(extra_configuration) + + self.object_types = self.config['object_types'] + for obj_type in self.object_types: + if obj_type not in TYPE_TO_PRIMARY_KEY: + raise ValueError('The object type %s is not supported. ' + 'Possible values are %s' % ( + obj_type, + ', '.join(TYPE_TO_PRIMARY_KEY))) + + self.storage_dbconn = self.config['storage_dbconn'] + + self.producer = KafkaProducer( + bootstrap_servers=config['brokers'], + value_serializer=key_to_kafka, + client_id=config['publisher_id'], + ) + + def _read_storage(self): + """Read storage's objects and generates tuple as object_type, dict of + object. + + Yields: + tuple of object_type, object as dict + + """ + for obj_type in self.object_types: + for obj in fetch(self.storage_dbconn, obj_type): + yield obj_type, obj + + def run(self): + """Reads storage's subscribed object types and send them to the + publisher's reading queue. 
+ + """ + for obj_type, obj in self._read_storage(): + topic = '%s.%s' % (self.config['temporary_prefix'], obj_type) + self.producer.send(topic, value=obj) + + +if __name__ == '__main__': + SWHJournalSimpleCheckerProducer().run() diff --git a/swh/journal/client.py b/swh/journal/client.py index 386b400..c1b56c8 100644 --- a/swh/journal/client.py +++ b/swh/journal/client.py @@ -1,133 +1,134 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging from abc import ABCMeta, abstractmethod from collections import defaultdict from kafka import KafkaConsumer from swh.core.config import SWHConfig -from .serializers import kafka_to_value +from .serializers import kafka_to_key, kafka_to_value # Only accepted offset reset policy accepted ACCEPTED_OFFSET_RESET = ['earliest', 'latest'] # Only accepted object types ACCEPTED_OBJECT_TYPES = [ 'content', 'revision', 'release', 'occurrence', 'origin', 'origin_visit' ] class SWHJournalClient(SWHConfig, metaclass=ABCMeta): """A base client for the Software Heritage journal. The current implementation of the journal uses Apache Kafka brokers to publish messages under a given topic prefix, with each object type using a specific topic under that prefix. Clients subscribe to events specific to each object type by using the `object_types` configuration variable. Clients can be sharded by setting the `client_id` to a common value across instances. The journal will share the message throughput across the nodes sharing the same client_id. Messages are processed by the `process_objects` method in batches of maximum `max_messages`. """ DEFAULT_CONFIG = { # Broker to connect to 'brokers': ('list[str]', ['localhost']), # Prefix topic to receive notification from 'topic_prefix': ('str', 'swh.journal.objects'), # Consumer identifier - 'consumer_identifier': ('str', 'swh.journal.client.test'), + 'consumer_id': ('str', 'swh.journal.client'), # Object types to deal with (in a subscription manner) 'object_types': ('list[str]', [ 'content', 'revision', 'release', 'occurrence', 'origin', 'origin_visit' ]), # Number of messages to batch process 'max_messages': ('int', 100), 'auto_offset_reset': ('str', 'earliest') } CONFIG_BASE_FILENAME = 'journal/client' ADDITIONAL_CONFIG = None def __init__(self, extra_configuration={}): self.config = self.parse_config_file( additional_configs=[self.ADDITIONAL_CONFIG]) if extra_configuration: self.config.update(extra_configuration) self.log = logging.getLogger('swh.journal.client.SWHJournalClient') auto_offset_reset = self.config['auto_offset_reset'] if auto_offset_reset not in ACCEPTED_OFFSET_RESET: raise ValueError( 'Option \'auto_offset_reset\' only accept %s.' % ACCEPTED_OFFSET_RESET) object_types = self.config['object_types'] for object_type in object_types: if object_type not in ACCEPTED_OBJECT_TYPES: raise ValueError( 'Option \'object_types\' only accepts %s.' 
                     % ACCEPTED_OBJECT_TYPES)
 
         self.consumer = KafkaConsumer(
             bootstrap_servers=self.config['brokers'],
+            key_deserializer=kafka_to_key,
             value_deserializer=kafka_to_value,
             auto_offset_reset=auto_offset_reset,
             enable_auto_commit=False,
-            group_id=self.config['consumer_identifier'],
+            group_id=self.config['consumer_id'],
         )
 
         self.consumer.subscribe(
             topics=['%s.%s' % (self.config['topic_prefix'], object_type)
                     for object_type in object_types],
         )
 
         self.max_messages = self.config['max_messages']
 
     def process(self):
         """Main entry point to process event message reception.
 
         """
         while True:
             messages = defaultdict(list)
 
             for num, message in enumerate(self.consumer):
                 object_type = message.topic.split('.')[-1]
                 messages[object_type].append(message.value)
                 if num >= self.max_messages:
                     break
 
             self.process_objects(messages)
             self.consumer.commit()
 
     # Override the following method in the sub-classes
     @abstractmethod
     def process_objects(self, messages):
         """Process the objects (store, compute, etc...)
 
         Args:
             messages (dict): Dict of key object_type (as per
             configuration) and their associated values.
 
         """
         pass
diff --git a/swh/journal/publisher.py b/swh/journal/publisher.py
index 366e633..de9e936 100644
--- a/swh/journal/publisher.py
+++ b/swh/journal/publisher.py
@@ -1,123 +1,126 @@
-# Copyright (C) 2016 The Software Heritage developers
+# Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from collections import defaultdict
 import logging
 
 from kafka import KafkaProducer, KafkaConsumer
 
 from swh.core.config import SWHConfig
 from swh.storage import get_storage
 
-from .serializers import kafka_to_value, value_to_kafka
+from .serializers import kafka_to_key, key_to_kafka
 
 
 class SWHJournalPublisher(SWHConfig):
     DEFAULT_CONFIG = {
         'brokers': ('list[str]', ['getty.internal.softwareheritage.org']),
         'temporary_prefix': ('str', 'swh.tmp_journal.new'),
-        'final_prefix': ('str', 'swh.journal.test_publisher'),
+        'final_prefix': ('str', 'swh.journal.objects'),
-        'consumer_id': ('str', 'swh.journal.publisher.test'),
-        'publisher_id': ('str', 'swh.journal.publisher.test'),
+        'consumer_id': ('str', 'swh.journal.publisher'),
+        'publisher_id': ('str', 'swh.journal.publisher'),
         'object_types': ('list[str]', ['content', 'revision', 'release']),
         'storage': ('dict', {
             'cls': 'remote',
             'args': {
                 'url': 'http://localhost:5002/',
             }
         }),
         'max_messages': ('int', 10000),
     }
 
     CONFIG_BASE_FILENAME = 'journal/publisher'
 
     def __init__(self, extra_configuration=None):
         self.config = config = self.parse_config_file()
         if extra_configuration:
             config.update(extra_configuration)
 
         self.storage = get_storage(**config['storage'])
 
+        # yes, the temporary topics contain values that are actually _keys_
         self.consumer = KafkaConsumer(
             bootstrap_servers=config['brokers'],
-            value_deserializer=kafka_to_value,
+            value_deserializer=kafka_to_key,
             auto_offset_reset='earliest',
             enable_auto_commit=False,
             group_id=config['consumer_id'],
         )
 
         self.producer = KafkaProducer(
             bootstrap_servers=config['brokers'],
-            value_serializer=value_to_kafka,
+            key_serializer=key_to_kafka,
+            value_serializer=key_to_kafka,
             client_id=config['publisher_id'],
         )
 
         self.consumer.subscribe(
             topics=['%s.%s' % (config['temporary_prefix'], object_type)
                     for object_type in config['object_types']],
         )
 
         self.max_messages = self.config['max_messages']
 
     def poll(self):
         """Process a batch of messages"""
         num = 0
         messages = defaultdict(list)
         for num, message in enumerate(self.consumer):
             object_type = message.topic.split('.')[-1]
             messages[object_type].append(message.value)
             if num >= self.max_messages:
                 break
 
         new_objects = self.process_objects(messages)
         self.produce_messages(new_objects)
         self.consumer.commit()
 
     def process_objects(self, messages):
         processors = {
             'content': self.process_contents,
             'revision': self.process_revisions,
             'release': self.process_releases,
         }
 
         return {
             key: processors[key](value)
             for key, value in messages.items()
         }
 
     def produce_messages(self, messages):
         for object_type, objects in messages.items():
             topic = '%s.%s' % (self.config['final_prefix'], object_type)
             for key, object in objects:
                 self.producer.send(topic, key=key, value=object)
 
         self.producer.flush()
 
     def process_contents(self, content_objs):
-        metadata = self.storage.content_get_metadata(content_objs)
+        metadata = self.storage.content_get_metadata(
+            (c[b'sha1'] for c in content_objs))
         return [(content['sha1'], content) for content in metadata]
 
     def process_revisions(self, revision_objs):
-        metadata = self.storage.revision_get(revision_objs)
+        metadata = self.storage.revision_get((r[b'id'] for r in revision_objs))
         return [(revision['id'], revision) for revision in metadata]
 
     def process_releases(self, release_objs):
-        metadata = self.storage.release_get(release_objs)
+        metadata = self.storage.release_get((r[b'id'] for r in release_objs))
         return [(release['id'], release) for release in metadata]
 
 
 if __name__ == '__main__':
     logging.basicConfig(
         level=logging.INFO,
         format='%(asctime)s %(process)d %(levelname)s %(message)s'
     )
     publisher = SWHJournalPublisher()
     while True:
         publisher.poll()
diff --git a/swh/journal/serializers.py b/swh/journal/serializers.py
index b8d5c4d..9895757 100644
--- a/swh/journal/serializers.py
+++ b/swh/journal/serializers.py
@@ -1,16 +1,30 @@
-# Copyright (C) 2016 The Software Heritage developers
+# Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import msgpack
 
 
+def key_to_kafka(key):
+    """Serialize a key, possibly a dict, in a predictable way"""
+    p = msgpack.Packer(use_bin_type=True)
+    if isinstance(key, dict):
+        return p.pack_map_pairs(sorted(key.items()))
+    else:
+        return p.pack(key)
+
+
+def kafka_to_key(kafka_key):
+    """Deserialize a key"""
+    return msgpack.loads(kafka_key)
+
+
 def value_to_kafka(value):
     """Serialize some data for storage in kafka"""
     return msgpack.dumps(value, use_bin_type=True)
 
 
 def kafka_to_value(kafka_value):
     """Deserialize some data stored in kafka"""
     return msgpack.loads(kafka_value)
diff --git a/swh/journal/tests/test_serializers.py b/swh/journal/tests/test_serializers.py
new file mode 100644
index 0000000..9d4bdd4
--- /dev/null
+++ b/swh/journal/tests/test_serializers.py
@@ -0,0 +1,29 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from collections import OrderedDict
+import itertools
+import unittest
+
+from swh.journal import serializers
+
+
+class TestSerializers(unittest.TestCase):
+    def test_key_to_kafka_repeatable(self):
+        """Check the kafka key encoding is repeatable"""
+        base_dict = {
+            'a': 'foo',
+            'b': 'bar',
+            'c': 'baz',
+        }
+
+        key = serializers.key_to_kafka(base_dict)
+
+        for dict_keys in itertools.permutations(base_dict):
+            d = OrderedDict()
+            for k in dict_keys:
+                d[k] = base_dict[k]
+
+            self.assertEqual(key, serializers.key_to_kafka(d))
diff --git a/version.txt b/version.txt
new file mode 100644
index 0000000..b53c18c
--- /dev/null
+++ b/version.txt
@@ -0,0 +1 @@
+v0.0.2-0-g1312c34
\ No newline at end of file
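
Usage note: SWHJournalClient above is abstract; consumers subclass it and implement process_objects. A minimal sketch (illustrative only, not part of this changeset) could look like the following, assuming a 'journal/client' configuration file providing brokers and topic_prefix as described by SWHJournalClient.DEFAULT_CONFIG; the PrintingJournalClient name and its print-based handling are hypothetical.

# Illustrative sketch only -- not part of the diff above.
from swh.journal.client import SWHJournalClient


class PrintingJournalClient(SWHJournalClient):
    """Toy journal consumer: print a summary of each received batch."""

    def process_objects(self, messages):
        # messages maps an object type (e.g. 'content', 'revision') to the
        # list of message values consumed from that type's topic.
        for object_type, objects in messages.items():
            print('%s: %d objects' % (object_type, len(objects)))


if __name__ == '__main__':
    # Reads the journal/client configuration, then consumes batches forever;
    # the offsets are committed after each processed batch.
    PrintingJournalClient().process()

process() loops indefinitely, handing process_objects at most max_messages values per iteration, grouped by object type.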