Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/buffer.py
Show First 20 Lines • Show All 128 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
return self.flush() | return self.flush() | ||||
return {} | return {} | ||||
def flush( | def flush( | ||||
self, object_types: Sequence[LObjectType] = OBJECT_TYPES | self, object_types: Sequence[LObjectType] = OBJECT_TYPES | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
summary: Dict[str, int] = self.storage.flush(object_types) | summary: Dict[str, int] = self.storage.flush(object_types) | ||||
def update_summary(stats): | |||||
for k, v in stats.items(): | |||||
summary[k] = v + summary.get(k, 0) | |||||
for object_type in object_types: | for object_type in object_types: | ||||
buffer_ = self._objects[object_type] | buffer_ = self._objects[object_type] | ||||
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) | batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) | ||||
for batch in batches: | for batch in batches: | ||||
add_fn = getattr(self.storage, "%s_add" % object_type) | add_fn = getattr(self.storage, "%s_add" % object_type) | ||||
stats = add_fn(list(batch)) | stats = add_fn(list(batch)) | ||||
summary = {k: v + summary.get(k, 0) for k, v in stats.items()} | update_summary(stats) | ||||
self.clear_buffers(object_types) | self.clear_buffers(object_types) | ||||
return summary | return summary | ||||
def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: | def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: | ||||
"""Clear objects from current buffer. | """Clear objects from current buffer. | ||||
Show All 14 Lines |