diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py --- a/swh/storage/buffer.py +++ b/swh/storage/buffer.py @@ -4,18 +4,37 @@ # See top-level LICENSE file for more information from functools import partial -from typing import Dict, Iterable, List, Optional +from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple from swh.core.utils import grouper -from swh.model.model import BaseModel, Content +from swh.model.model import BaseModel, Content, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface +OBJECT_TYPES: List[str] = [ + "content", + "skipped_content", + "directory", + "revision", + "release", +] + +DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { + "content": 10000, + "content_bytes": 100 * 1024 * 1024, + "skipped_content": 10000, + "directory": 25000, + "revision": 100000, + "release": 100000, +} + class BufferingProxyStorage: """Storage implementation in charge of accumulating objects prior to discussing with the "main" storage. + Deduplicates values based on a tuple of keys depending on the object type. + Sample configuration use case for buffering storage: ..
code-block:: yaml @@ -36,102 +55,96 @@ """ - def __init__(self, storage, min_batch_size=None): + def __init__( + self, storage: Mapping, min_batch_size: Mapping = DEFAULT_BUFFER_THRESHOLDS, + ): self.storage: StorageInterface = get_storage(**storage) - if min_batch_size is None: - min_batch_size = {} + # Caller-provided thresholds override the defaults, key by key. + self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} - self.min_batch_size = { - "content": min_batch_size.get("content", 10000), - "content_bytes": min_batch_size.get("content_bytes", 100 * 1024 * 1024), - "skipped_content": min_batch_size.get("skipped_content", 10000), - "directory": min_batch_size.get("directory", 25000), - "revision": min_batch_size.get("revision", 100000), - "release": min_batch_size.get("release", 100000), + self._objects: Dict[str, Dict[Tuple[str, ...], BaseModel]] = { + k: {} for k in OBJECT_TYPES } - self.object_types = [ - "content", - "skipped_content", - "directory", - "revision", - "release", - ] - self._objects = {k: {} for k in self.object_types} - - def __getattr__(self, key): + self._contents_size: int = 0 + + def __getattr__(self, key: str): if key.endswith("_add"): object_type = key.rsplit("_", 1)[0] - if object_type in self.object_types: + if object_type in OBJECT_TYPES: return partial(self.object_add, object_type=object_type, keys=["id"],) if key == "storage": raise AttributeError(key) return getattr(self.storage, key) - def content_add(self, content: List[Content]) -> Dict: - """Enqueue contents to write to the storage. + def content_add(self, contents: Collection[Content]) -> Dict: + """Push contents to write to the storage in the buffer. Following policies apply: - - - First, check if the queue's threshold is hit. - If it is flush content to the storage. - - - If not, check if the total size of enqueued contents's - threshold is hit. If it is flush content to the storage. + - if the buffer's threshold is hit, flush content to the storage.
+ - otherwise, if the total size of buffered contents hits its threshold, + flush content to the storage. """ - s = self.object_add( - content, + stats = self.object_add( + contents, object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) - if not s: - buffer_ = self._objects["content"].values() - total_size = sum(c.length for c in buffer_) - if total_size >= self.min_batch_size["content_bytes"]: + if not stats: # We did not flush already + self._contents_size += sum(c.length for c in contents) + if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) - return s + return stats - def skipped_content_add(self, content: List[Content]) -> Dict: + def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict: return self.object_add( - content, + contents, object_type="skipped_content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) - def flush(self, object_types: Optional[List[str]] = None) -> Dict: - summary: Dict[str, int] = self.storage.flush(object_types) - if object_types is None: - object_types = self.object_types - for object_type in object_types: - buffer_ = self._objects[object_type] - batches = grouper(buffer_.values(), n=self.min_batch_size[object_type]) - for batch in batches: - add_fn = getattr(self.storage, "%s_add" % object_type) - s = add_fn(list(batch)) - summary = {k: v + summary.get(k, 0) for k, v in s.items()} - buffer_.clear() - - return summary - def object_add( - self, objects: Iterable[BaseModel], *, object_type: str, keys: List[str] - ) -> Dict: - """Enqueue objects to write to the storage. This checks if the queue's - threshold is hit. If it is actually write those to the storage. + self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str], + ) -> Dict[str, int]: + """Push objects to write to the storage in the buffer. Flushes the + buffer to the storage if the threshold is hit.
""" buffer_ = self._objects[object_type] - threshold = self.min_batch_size[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj - if len(buffer_) >= threshold: + if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} + def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]: + summary: Dict[str, int] = {} + types_to_flush = OBJECT_TYPES if object_types is None else object_types + + def update_summary(stats: Dict[str, int]) -> None: + for k, v in stats.items(): + summary[k] = summary.get(k, 0) + v + + for object_type in types_to_flush: + buffer_ = self._objects[object_type] + batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) + for batch in batches: + add_fn = getattr(self.storage, "%s_add" % object_type) + update_summary(add_fn(list(batch))) + + # Flush the underlying storage only once the buffered objects were pushed + # to it: flushing it first would leave them in its own buffers (if any), + # where the clear_buffers() call below would then discard them. + update_summary(self.storage.flush(object_types)) + + self.clear_buffers(types_to_flush) + + return summary + def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: """Clear objects from current buffer. @@ -143,10 +156,12 @@ """ if object_types is None: - object_types = self.object_types + object_types = OBJECT_TYPES for object_type in object_types: - q = self._objects[object_type] - q.clear() + buffer_ = self._objects[object_type] + buffer_.clear() + if object_type == "content": + self._contents_size = 0 return self.storage.clear_buffers(object_types)