Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/publisher.py
# Copyright (C) 2016-2018 The Software Heritage developers | # Copyright (C) 2016-2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import logging | import logging | ||||
from kafka import KafkaProducer, KafkaConsumer | from kafka import KafkaProducer, KafkaConsumer | ||||
from swh.core.config import SWHConfig | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.algos import snapshot | from swh.storage.algos import snapshot | ||||
from .serializers import kafka_to_key, key_to_kafka | from .serializers import kafka_to_key, key_to_kafka | ||||
class JournalPublisher(SWHConfig): | class JournalPublisher: | ||||
"""The journal publisher is a layer in charge of: | """The journal publisher is a layer in charge of: | ||||
- consuming messages from topics (1 topic per object_type) | - consuming messages from topics (1 topic per object_type) | ||||
- reify the object ids read from those topics (using the storage) | - reify the object ids read from those topics (using the storage) | ||||
- producing those reified objects to output topics (1 topic per | - producing those reified objects to output topics (1 topic per | ||||
object type) | object type) | ||||
The main entry point for this class is the 'poll' method. | The main entry point for this class is the 'poll' method. | ||||
""" | """ | ||||
DEFAULT_CONFIG = { | def __init__(self, config): | ||||
'brokers': ('list[str]', ['getty.internal.softwareheritage.org']), | self.config = config | ||||
'temporary_prefix': ('str', 'swh.tmp_journal.new'), | |||||
'final_prefix': ('str', 'swh.journal.objects'), | |||||
'consumer_id': ('str', 'swh.journal.publisher'), | |||||
'publisher_id': ('str', 'swh.journal.publisher'), | |||||
'object_types': ('list[str]', ['content', 'revision', 'release']), | |||||
'storage': ('dict', { | |||||
'cls': 'remote', | |||||
'args': { | |||||
'url': 'http://localhost:5002/', | |||||
} | |||||
}), | |||||
'max_messages': ('int', 10000), | |||||
} | |||||
CONFIG_BASE_FILENAME = 'journal/publisher' | |||||
def __init__(self, extra_configuration=None): | |||||
self.config = config = self.parse_config_file() | |||||
if extra_configuration: | |||||
config.update(extra_configuration) | |||||
self._prepare_storage(config) | self._prepare_storage(config) | ||||
self._prepare_journal(config) | self._prepare_journal(config) | ||||
self.max_messages = self.config['max_messages'] | self.max_messages = self.config['max_messages'] | ||||
def _prepare_journal(self, config): | def _prepare_journal(self, config): | ||||
"""Prepare the consumer and subscriber instances for the publisher to | """Prepare the consumer and subscriber instances for the publisher to | ||||
actually be able to discuss with the journal. | actually be able to discuss with the journal. | ||||
""" | """ | ||||
# yes, the temporary topics contain values that are actually _keys_ | # yes, the temporary topics contain values that are actually _keys_ | ||||
▲ Show 20 Lines • Show All 146 Lines • ▼ Show 20 Lines | def process_snapshots(self, snapshot_objs): | ||||
full_obj = snapshot.snapshot_get_all_branches( | full_obj = snapshot.snapshot_get_all_branches( | ||||
self.storage, snap[b'id']) | self.storage, snap[b'id']) | ||||
metadata.append((full_obj['id'], full_obj)) | metadata.append((full_obj['id'], full_obj)) | ||||
return metadata | return metadata | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
import click | print('Please use the "swh-journal publisher run" command') | ||||
@click.command() | |||||
@click.option('--verbose', is_flag=True, default=False, | |||||
help='Be verbose if asked.') | |||||
def main(verbose): | |||||
logging.basicConfig( | |||||
level=logging.DEBUG if verbose else logging.INFO, | |||||
format='%(asctime)s %(process)d %(levelname)s %(message)s' | |||||
) | |||||
_log = logging.getLogger('kafka') | |||||
_log.setLevel(logging.INFO) | |||||
publisher = JournalPublisher() | |||||
while True: | |||||
publisher.poll() | |||||
main() |