Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/direct_writer.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from kafka import KafkaProducer | from kafka import KafkaProducer | ||||
from swh.model.hashutil import DEFAULT_ALGORITHMS | |||||
from .serializers import key_to_kafka, value_to_kafka | from .serializers import key_to_kafka, value_to_kafka | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class DirectKafkaWriter: | class DirectKafkaWriter: | ||||
"""This class is instantiated and used by swh-storage to write incoming | """This class is instantiated and used by swh-storage to write incoming | ||||
new objects to Kafka before adding them to the storage backend | new objects to Kafka before adding them to the storage backend | ||||
Show All 11 Lines | class DirectKafkaWriter: | ||||
def send(self, topic, key, value):
    """Publish a single record on ``topic`` through the wrapped producer.

    ``topic``, ``key`` and ``value`` are handed to ``self.producer.send``
    unchanged, as keyword arguments. Whatever that call returns is
    discarded, so this method always returns ``None``.
    """
    record = {'topic': topic, 'key': key, 'value': value}
    self.producer.send(**record)
def _get_key(self, object_type, object_): | def _get_key(self, object_type, object_): | ||||
if object_type in ('revision', 'release', 'directory', 'snapshot'): | if object_type in ('revision', 'release', 'directory', 'snapshot'): | ||||
return object_['id'] | return object_['id'] | ||||
elif object_type == 'content': | elif object_type == 'content': | ||||
return object_['sha1'] # TODO: use a dict of hashes | return object_['sha1'] # TODO: use a dict of hashes | ||||
elif object_type == 'skipped_content': | |||||
return { | |||||
hash: object_[hash] | |||||
for hash in DEFAULT_ALGORITHMS | |||||
} | |||||
elif object_type == 'origin': | elif object_type == 'origin': | ||||
return {'url': object_['url'], 'type': object_['type']} | return {'url': object_['url'], 'type': object_['type']} | ||||
elif object_type == 'origin_visit': | elif object_type == 'origin_visit': | ||||
return { | return { | ||||
'origin': object_['origin'], | 'origin': object_['origin'], | ||||
'date': str(object_['date']), | 'date': str(object_['date']), | ||||
} | } | ||||
else: | else: | ||||
Show All 25 Lines |