Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
Show All 10 Lines | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.algos import snapshot | from swh.storage.algos import snapshot | ||||
from .serializers import kafka_to_key, key_to_kafka | from .serializers import kafka_to_key, key_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
MANDATORY_KEYS = [ | |||||
'brokers', 'temporary_prefix', 'final_prefix', 'consumer_id', | |||||
'publisher_id', 'object_types', 'storage' | |||||
] | |||||
class JournalPublisher: | class JournalPublisher: | ||||
"""The journal publisher is a layer in charge of: | """The journal publisher is a layer in charge of: | ||||
- consuming messages from topics (1 topic per object_type) | - consuming messages from topics (1 topic per object_type) | ||||
- reify the object ids read from those topics (using the storage) | - reify the object ids read from those topics (using the storage) | ||||
- producing those reified objects to output topics (1 topic per | - producing those reified objects to output topics (1 topic per | ||||
object type) | object type) | ||||
The main entry point for this class is the 'poll' method. | The main entry point for this class is the 'poll' method. | ||||
""" | """ | ||||
def __init__(self, config): | def __init__(self, config): | ||||
self.config = config | self.config = config | ||||
self.check_config(config) | |||||
self._prepare_storage(config) | self._prepare_storage(config) | ||||
self._prepare_journal(config) | self._prepare_journal(config) | ||||
self.max_messages = self.config['max_messages'] | self.max_messages = self.config['max_messages'] | ||||
logger.setLevel(logging.DEBUG) | |||||
def check_config(self, config): | |||||
"""Check the configuration is fine. | |||||
If not raise an error. | |||||
""" | |||||
missing_keys = [] | |||||
for key in MANDATORY_KEYS: | |||||
if not config.get(key): | |||||
missing_keys.append(key) | |||||
if missing_keys: | |||||
raise ValueError( | |||||
'Configuration error: The following keys must be' | |||||
' provided: %s' % (','.join(missing_keys), )) | |||||
def _prepare_journal(self, config): | def _prepare_journal(self, config): | ||||
"""Prepare the consumer and subscriber instances for the publisher to | """Prepare the consumer and subscriber instances for the publisher to | ||||
actually be able to discuss with the journal. | actually be able to discuss with the journal. | ||||
""" | """ | ||||
# yes, the temporary topics contain values that are actually _keys_ | # yes, the temporary topics contain values that are actually _keys_ | ||||
self.consumer = KafkaConsumer( | self.consumer = KafkaConsumer( | ||||
▲ Show 20 Lines • Show All 156 Lines • Show Last 20 Lines |