Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
# Copyright (C) 2016-2019 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
from kafka import KafkaProducer, KafkaConsumer | from kafka import KafkaProducer, KafkaConsumer | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.algos import snapshot | from swh.storage.algos import snapshot | ||||
from .serializers import kafka_to_key, key_to_kafka | from .serializers import kafka_to_key, key_to_kafka | ||||
logger = logging.getLogger(__name__)

# Configuration keys that must be present and non-empty for the publisher
# to start; enforced by JournalPublisher.check_config().
MANDATORY_KEYS = [
    'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id',
    'publisher_id', 'object_types', 'storage'
]
class JournalPublisher: | class JournalPublisher: | ||||
"""The journal publisher is a layer in charge of: | """The journal publisher is a layer in charge of: | ||||
- consuming messages from topics (1 topic per object_type) | - consuming messages from topics (1 topic per object_type) | ||||
- reify the object ids read from those topics (using the storage) | - reify the object ids read from those topics (using the storage) | ||||
- producing those reified objects to output topics (1 topic per | - producing those reified objects to output topics (1 topic per | ||||
object type) | object type) | ||||
The main entry point for this class is the 'poll' method. | The main entry point for this class is the 'poll' method. | ||||
""" | """ | ||||
def __init__(self, config): | def __init__(self, config): | ||||
self.config = config | self.config = config | ||||
self.check_config(config) | |||||
self._prepare_storage(config) | self._prepare_storage(config) | ||||
self._prepare_journal(config) | self._prepare_journal(config) | ||||
self.max_messages = self.config['max_messages'] | self.max_messages = self.config['max_messages'] | ||||
logger.setLevel(logging.DEBUG) | |||||
def check_config(self, config): | |||||
"""Check the configuration is fine. | |||||
If not raise an error. | |||||
""" | |||||
missing_keys = [] | |||||
for key in MANDATORY_KEYS: | |||||
if not config.get(key): | |||||
missing_keys.append(key) | |||||
if missing_keys: | |||||
raise ValueError( | |||||
'Configuration error: The following keys must be' | |||||
' provided: %s' % (','.join(missing_keys), )) | |||||
def _prepare_journal(self, config): | def _prepare_journal(self, config): | ||||
"""Prepare the consumer and subscriber instances for the publisher to | """Prepare the consumer and subscriber instances for the publisher to | ||||
actually be able to discuss with the journal. | actually be able to discuss with the journal. | ||||
""" | """ | ||||
# yes, the temporary topics contain values that are actually _keys_ | # yes, the temporary topics contain values that are actually _keys_ | ||||
self.consumer = KafkaConsumer( | self.consumer = KafkaConsumer( | ||||
bootstrap_servers=config['brokers'], | bootstrap_servers=config['brokers'], | ||||
value_deserializer=kafka_to_key, | value_deserializer=kafka_to_key, | ||||
auto_offset_reset='earliest', | auto_offset_reset='earliest', | ||||
enable_auto_commit=False, | enable_auto_commit=False, | ||||
group_id=config['consumer_id'], | group_id=config['consumer_id'], | ||||
) | ) | ||||
self.producer = KafkaProducer( | self.producer = KafkaProducer( | ||||
bootstrap_servers=config['brokers'], | bootstrap_servers=config['brokers'], | ||||
key_serializer=key_to_kafka, | key_serializer=key_to_kafka, | ||||
value_serializer=key_to_kafka, | value_serializer=key_to_kafka, | ||||
client_id=config['publisher_id'], | client_id=config['publisher_id'], | ||||
) | ) | ||||
logging.debug('Subscribing to object types event: %s' % ( | logger.debug('Subscribing to object types event: %s' % ( | ||||
config['object_types'], )) | config['object_types'], )) | ||||
self.consumer.subscribe( | self.consumer.subscribe( | ||||
topics=['%s.%s' % (config['temporary_prefix'], object_type) | topics=['%s.%s' % (config['temporary_prefix'], object_type) | ||||
for object_type in config['object_types']], | for object_type in config['object_types']], | ||||
) | ) | ||||
def _prepare_storage(self, config): | def _prepare_storage(self, config): | ||||
"""Prepare the storage instance needed for the publisher to be able to | """Prepare the storage instance needed for the publisher to be able to | ||||
Show All 16 Lines | def poll(self, max_messages=None): | ||||
""" | """ | ||||
messages = defaultdict(list) | messages = defaultdict(list) | ||||
if max_messages is None: | if max_messages is None: | ||||
max_messages = self.max_messages | max_messages = self.max_messages | ||||
for num, message in enumerate(self.consumer): | for num, message in enumerate(self.consumer): | ||||
object_type = message.topic.split('.')[-1] | object_type = message.topic.split('.')[-1] | ||||
logging.debug('num: %s, object_type: %s, message: %s' % ( | logger.debug('num: %s, object_type: %s, message: %s' % ( | ||||
num, object_type, message)) | num, object_type, message)) | ||||
messages[object_type].append(message.value) | messages[object_type].append(message.value) | ||||
if num + 1 >= self.max_messages: | if num + 1 >= self.max_messages: | ||||
break | break | ||||
new_objects = self.process_objects(messages) | new_objects = self.process_objects(messages) | ||||
self.produce_messages(new_objects) | self.produce_messages(new_objects) | ||||
self.consumer.commit() | self.consumer.commit() | ||||
Show All 35 Lines | def produce_messages(self, messages): | ||||
object_type (str): content, revision, release | object_type (str): content, revision, release | ||||
tuple (bytes, dict): object id as bytes, object as swh dict. | tuple (bytes, dict): object id as bytes, object as swh dict. | ||||
""" | """ | ||||
for object_type, objects in messages.items(): | for object_type, objects in messages.items(): | ||||
topic = '%s.%s' % (self.config['final_prefix'], object_type) | topic = '%s.%s' % (self.config['final_prefix'], object_type) | ||||
for key, object in objects: | for key, object in objects: | ||||
logging.debug('topic: %s, key: %s, value: %s' % ( | logger.debug('topic: %s, key: %s, value: %s' % ( | ||||
topic, key, object)) | topic, key, object)) | ||||
self.producer.send(topic, key=key, value=object) | self.producer.send(topic, key=key, value=object) | ||||
self.producer.flush() | self.producer.flush() | ||||
def process_contents(self, content_objs): | def process_contents(self, content_objs): | ||||
logging.debug('contents: %s' % content_objs) | logger.debug('contents: %s' % content_objs) | ||||
metadata = self.storage.content_get_metadata( | metadata = self.storage.content_get_metadata( | ||||
(c[b'sha1'] for c in content_objs)) | (c[b'sha1'] for c in content_objs)) | ||||
return [(content['sha1'], content) for content in metadata] | return [(content['sha1'], content) for content in metadata] | ||||
def process_revisions(self, revision_objs): | def process_revisions(self, revision_objs): | ||||
logging.debug('revisions: %s' % revision_objs) | logger.debug('revisions: %s' % revision_objs) | ||||
metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) | metadata = self.storage.revision_get((r[b'id'] for r in revision_objs)) | ||||
return [(revision['id'], revision) | return [(revision['id'], revision) | ||||
for revision in metadata if revision] | for revision in metadata if revision] | ||||
def process_releases(self, release_objs): | def process_releases(self, release_objs): | ||||
logging.debug('releases: %s' % release_objs) | logger.debug('releases: %s' % release_objs) | ||||
metadata = self.storage.release_get((r[b'id'] for r in release_objs)) | metadata = self.storage.release_get((r[b'id'] for r in release_objs)) | ||||
return [(release['id'], release) for release in metadata] | return [(release['id'], release) for release in metadata] | ||||
def process_origins(self, origin_objs): | def process_origins(self, origin_objs): | ||||
logging.debug('origins: %s' % origin_objs) | logger.debug('origins: %s' % origin_objs) | ||||
r = [] | r = [] | ||||
for o in origin_objs: | for o in origin_objs: | ||||
origin = {'url': o[b'url'], 'type': o[b'type']} | origin = {'url': o[b'url'], 'type': o[b'type']} | ||||
r.append((origin, origin)) | r.append((origin, origin)) | ||||
return r | return r | ||||
def process_origin_visits(self, origin_visits): | def process_origin_visits(self, origin_visits): | ||||
logging.debug('origin_visits: %s' % origin_visits) | logger.debug('origin_visits: %s' % origin_visits) | ||||
metadata = [] | metadata = [] | ||||
for ov in origin_visits: | for ov in origin_visits: | ||||
origin_visit = self.storage.origin_visit_get_by( | origin_visit = self.storage.origin_visit_get_by( | ||||
ov[b'origin'], ov[b'visit']) | ov[b'origin'], ov[b'visit']) | ||||
if origin_visit: | if origin_visit: | ||||
pk = ov[b'origin'], ov[b'visit'] | pk = ov[b'origin'], ov[b'visit'] | ||||
origin_visit['date'] = str(origin_visit['date']) | origin_visit['date'] = str(origin_visit['date']) | ||||
metadata.append((pk, origin_visit)) | metadata.append((pk, origin_visit)) | ||||
return metadata | return metadata | ||||
def process_snapshots(self, snapshot_objs): | def process_snapshots(self, snapshot_objs): | ||||
logging.debug('snapshots: %s' % snapshot_objs) | logger.debug('snapshots: %s' % snapshot_objs) | ||||
metadata = [] | metadata = [] | ||||
for snap in snapshot_objs: | for snap in snapshot_objs: | ||||
full_obj = snapshot.snapshot_get_all_branches( | full_obj = snapshot.snapshot_get_all_branches( | ||||
self.storage, snap[b'id']) | self.storage, snap[b'id']) | ||||
metadata.append((full_obj['id'], full_obj)) | metadata.append((full_obj['id'], full_obj)) | ||||
return metadata | return metadata | ||||
# Running this module directly is not supported; point the user at the
# proper CLI entry point instead.
if __name__ == '__main__':
    print('Please use the "swh-journal publisher run" command')