Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/direct_writer.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from kafka import KafkaProducer | from kafka import KafkaProducer | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | |||||
from .serializers import key_to_kafka, value_to_kafka | from .serializers import key_to_kafka, value_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class DirectKafkaWriter: | class DirectKafkaWriter: | ||||
"""This class is instantiated and used by swh-storage to write incoming | """This class is instantiated and used by swh-storage to write incoming | ||||
new objects to Kafka before adding them to the storage backend | new objects to Kafka before adding them to the storage backend | ||||
(eg. postgresql) itself.""" | (eg. postgresql) itself.""" | ||||
def __init__(self, brokers, prefix, client_id):
    """Store the prefix and create the underlying Kafka producer.

    Args:
        brokers: forwarded to ``KafkaProducer`` as ``bootstrap_servers``.
        prefix: kept on the instance as ``_prefix``.
        client_id: forwarded to ``KafkaProducer`` as ``client_id``.
    """
    self._prefix = prefix
    # Keys and values are serialized with the module's kafka serializers.
    producer_options = {
        'bootstrap_servers': brokers,
        'key_serializer': key_to_kafka,
        'value_serializer': value_to_kafka,
        'client_id': client_id,
    }
    self.producer = KafkaProducer(**producer_options)
def _get_key(self, object_type, object_): | def _get_key(self, object_type, object_): | ||||
if object_type in ('revision', 'release', 'directory', 'snapshot'): | if object_type in ('revision', 'release', 'directory', 'snapshot'): | ||||
return object_['id'] | return object_['id'] | ||||
elif object_type == 'content': | elif object_type == 'content': | ||||
return object_['sha1'] # TODO: use a dict of hashes | return object_['sha1'] # TODO: use a dict of hashes | ||||
elif object_type == 'skipped_content': | |||||
return { | |||||
hash: object_[hash] | |||||
for hash in DEFAULT_ALGORITHMS | |||||
} | |||||
elif object_type == 'origin': | elif object_type == 'origin': | ||||
return {'url': object_['url'], 'type': object_['type']} | return {'url': object_['url'], 'type': object_['type']} | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
return { | return { | ||||
'origin': object_['origin'], | 'origin': object_['origin'], | ||||
'date': str(object_['date']), | 'date': str(object_['date']), | ||||
} | } | ||||
else: | else: | ||||
Show All 23 Lines |