Changeset View
Changeset View
Standalone View
Standalone View
swh/indexer/storage/writer.py
Show First 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def __init__(self, tool_getter: Callable[[int], Dict[str, Any]], journal_writer): | ||||
) | ) | ||||
else: | else: | ||||
self.journal = None | self.journal = None | ||||
def write_additions(self, obj_type, entries: Iterable[BaseRow]) -> None:
    """Publish a batch of index additions to the journal.

    Does nothing when no journal is configured (``self.journal`` is
    falsy). Otherwise, every entry gets its ``indexer_configuration_id``
    replaced by the tool description returned by ``self._tool_getter``,
    and the translated entries are handed to the journal in a single
    ``write_additions`` call instead of one call per entry.

    Args:
        obj_type: object type each entry is expected to carry; it is also
            forwarded to the journal together with the translated entries.
        entries: rows to publish; each one must have a truthy
            ``indexer_configuration_id``.

    Raises:
        AssertionError: if an entry's ``object_type`` differs from
            ``obj_type``, or if its ``indexer_configuration_id`` is falsy.
    """
    if not self.journal:
        return

    translated = []

    # usually, all the additions in a batch are from the same indexer,
    # so this cache allows doing a single query for all the entries.
    tool_cache = {}

    for entry in entries:
        assert entry.object_type == obj_type  # type: ignore

        # get the tool used to generate this addition
        tool_id = entry.indexer_configuration_id
        assert tool_id
        if tool_id not in tool_cache:
            tool_cache[tool_id] = self._tool_getter(tool_id)

        # swap the numeric tool id for the full tool description
        entry = attr.evolve(
            entry, tool=tool_cache[tool_id], indexer_configuration_id=None
        )
        translated.append(entry)

    # write to kafka, once for the whole batch
    self.journal.write_additions(obj_type, translated)