Changeset View
Standalone View
swh/journal/replay.py
- This file was added.
# Copyright (C) 2019 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from kafka import KafkaConsumer | |||||
from .serializers import kafka_to_value | |||||
logger = logging.getLogger(__name__) | |||||
OBJECT_TYPES = frozenset([ | |||||
'origin', 'origin_visit', 'snapshot', 'release', 'revision', | |||||
'directory', 'content', | |||||
]) | |||||
class StorageReplayer: | |||||
def __init__(self, brokers, prefix, consumer_id, | |||||
object_types=OBJECT_TYPES): | |||||
if not set(object_types).issubset(OBJECT_TYPES): | |||||
raise ValueError('Unknown object types: %s' % ', '.join( | |||||
set(object_types) - OBJECT_TYPES)) | |||||
self._object_types = object_types | |||||
self.consumer = KafkaConsumer( | |||||
bootstrap_servers=brokers, | |||||
value_deserializer=kafka_to_value, | |||||
auto_offset_reset='earliest', | |||||
enable_auto_commit=False, | |||||
group_id=consumer_id, | |||||
) | |||||
self.consumer.subscribe( | |||||
topics=['%s.%s' % (prefix, object_type) | |||||
for object_type in object_types], | |||||
) | |||||
douardda: why not pass the kafka consumer instance as argument of the constructor instead of passing… | |||||
Done Inline ActionsFor consistency with other related classes (eg. KafkaDirectWriter), whose arguments are deserialized directly from a yaml. vlorentz: For consistency with other related classes (eg. KafkaDirectWriter), whose arguments are… | |||||
def fill(self, storage, max_messages): | |||||
num = 0 | |||||
for message in self.consumer: | |||||
object_type = message.topic.split('.')[-1] | |||||
Done Inline ActionsI find a bit odd to put the enumerate inside the islice iterator. It makes it a bit harder to read this line IMHO. Also, since you have to initialize num=0 to handle empty look, why not add a num += 1 in the for loop instead of using enumerate? douardda: I find a bit odd to put the enumerate inside the islice iterator. It makes it a bit harder to… | |||||
Done Inline ActionsIndeed, I hesitated between the two approach so I unknowingly did both vlorentz: Indeed, I hesitated between the two approach so I unknowingly did both | |||||
# Got a message from a topic we did not subscribe to. | |||||
assert object_type in self._object_types, object_type | |||||
self.insert_object(storage, object_type, message.value) | |||||
num += 1 | |||||
if num >= max_messages: | |||||
break | |||||
return num | |||||
def insert_object(self, storage, object_type, object_): | |||||
if object_type in ('content', 'directory', 'revision', 'release', | |||||
'origin'): | |||||
Done Inline ActionsThis needs an explanation (comment) douardda: This needs an explanation (comment) | |||||
if object_type == 'content': | |||||
# TODO: we don't write contents in Kafka, so we need to | |||||
# find a way to insert them somehow. | |||||
object_['status'] = 'absent' | |||||
method = getattr(storage, object_type + '_add') | |||||
method([object_]) | |||||
elif object_type == 'origin_visit': | |||||
origin_id = storage.origin_add_one(object_.pop('origin')) | |||||
Not Done Inline ActionsThis is a bit rough douardda: This is a bit rough | |||||
Done Inline ActionsIt can't happen (because of the assertion in the other method), but I don't like my pattern-matchings to be non-exhaustive. vlorentz: It can't happen (because of the assertion in the other method), but I don't like my pattern… | |||||
Not Done Inline Actionsok, then raising a RuntimeError (or any suitable exception) would be preferrable IMHO. douardda: ok, then raising a RuntimeError (or any suitable exception) would be preferrable IMHO. | |||||
Done Inline ActionsWhy? That's what assertions are for vlorentz: Why? That's what assertions are for | |||||
Not Done Inline Actionswell I guess so. Not a huge fan of this usage (but meh). douardda: well I guess so. Not a huge fan of this usage (but meh). | |||||
storage.origin_visit_add(origin=origin_id, **object_) | |||||
method = getattr(storage, object_type + '_add') | |||||
else: | |||||
assert False |
why not pass the kafka consumer instance as argument of the constructor instead of passing KafkaConsumer init arguments?