Changeset View
Changeset View
Standalone View
Standalone View
swh/journal/writer/kafka.py
Show First 20 Lines • Show All 84 Lines • ▼ Show 20 Lines | def _sanitize_object(self, object_type, object_): | ||||
return { | return { | ||||
**object_, | **object_, | ||||
'date': str(object_['date']), | 'date': str(object_['date']), | ||||
} | } | ||||
elif object_type == 'origin': | elif object_type == 'origin': | ||||
assert 'id' not in object_ | assert 'id' not in object_ | ||||
return object_ | return object_ | ||||
def write_addition(self, object_type, object_, flush=True): | def _write_addition(self, object_type, object_): | ||||
"""Write a single object to the journal""" | """Write a single object to the journal""" | ||||
if isinstance(object_, BaseModel): | if isinstance(object_, BaseModel): | ||||
object_ = object_.to_dict() | object_ = object_.to_dict() | ||||
topic = '%s.%s' % (self._prefix, object_type) | topic = '%s.%s' % (self._prefix, object_type) | ||||
key = self._get_key(object_type, object_) | key = self._get_key(object_type, object_) | ||||
object_ = self._sanitize_object(object_type, object_) | dict_ = self._sanitize_object(object_type, object_) | ||||
logger.debug('topic: %s, key: %s, value: %s', topic, key, object_) | logger.debug('topic: %s, key: %s, value: %s', topic, key, dict_) | ||||
self.send(topic, key=key, value=object_) | self.send(topic, key=key, value=dict_) | ||||
if flush: | def write_addition(self, object_type, object_): | ||||
"""Write a single object to the journal""" | |||||
self._write_addition(object_type, object_) | |||||
self.flush() | self.flush() | ||||
write_update = write_addition | write_update = write_addition | ||||
def write_additions(self, object_type, objects, flush=True): | def write_additions(self, object_type, objects): | ||||
"""Write a set of objects to the journal""" | """Write a set of objects to the journal""" | ||||
for object_ in objects: | for object_ in objects: | ||||
self.write_addition(object_type, object_, flush=False) | self._write_addition(object_type, object_) | ||||
if flush: | |||||
self.flush() | self.flush() |