diff --git a/swh/storage/buffer.py b/swh/storage/buffer.py
index 4434de7b..382868b9 100644
--- a/swh/storage/buffer.py
+++ b/swh/storage/buffer.py
@@ -1,160 +1,157 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from functools import partial
 from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple

 from swh.core.utils import grouper
 from swh.model.model import BaseModel, Content, SkippedContent
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface

 OBJECT_TYPES: List[str] = [
     "content",
     "skipped_content",
     "directory",
     "revision",
     "release",
 ]

 DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
     "content": 10000,
     "content_bytes": 100 * 1024 * 1024,
     "skipped_content": 10000,
     "directory": 25000,
     "revision": 100000,
     "release": 100000,
 }


 class BufferingProxyStorage:
     """Storage implementation in charge of accumulating objects prior to
     writing them to the "main" storage.

     Deduplicates values based on a tuple of keys depending on the object type.

     Sample configuration for the buffering storage:

     .. code-block:: yaml

         storage:
             cls: buffer
             args:
                 storage:
                     cls: remote
                     args: http://storage.internal.staging.swh.network:5002/
                 min_batch_size:
                     content: 10000
                     content_bytes: 100000000
                     skipped_content: 10000
                     directory: 5000
                     revision: 1000
                     release: 10000

     """

-    def __init__(
-        self, storage: Mapping, min_batch_size: Mapping = DEFAULT_BUFFER_THRESHOLDS,
-    ):
+    def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
         self.storage: StorageInterface = get_storage(**storage)
-        if min_batch_size is not DEFAULT_BUFFER_THRESHOLDS:
-            self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
+        self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
         self._objects: Dict[str, Dict[Tuple[str, ...], BaseModel]] = {
             k: {} for k in OBJECT_TYPES
         }
         self._contents_size: int = 0

     def __getattr__(self, key: str):
         if key.endswith("_add"):
             object_type = key.rsplit("_", 1)[0]
             if object_type in OBJECT_TYPES:
                 return partial(self.object_add, object_type=object_type, keys=["id"],)
         if key == "storage":
             raise AttributeError(key)
         return getattr(self.storage, key)

     def content_add(self, contents: Collection[Content]) -> Dict:
         """Push contents to write to the storage in the buffer.

         The following policies apply:

         - if the buffer's threshold is hit, flush contents to the storage.

         - otherwise, if the total size of the buffered contents hits its
           threshold, flush contents to the storage.

         """
         stats = self.object_add(
             contents,
             object_type="content",
             keys=["sha1", "sha1_git", "sha256", "blake2s256"],
         )
         if not stats:  # We did not flush already
             self._contents_size += sum(c.length for c in contents)
             if self._contents_size >= self._buffer_thresholds["content_bytes"]:
                 return self.flush(["content"])

         return stats

     def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict:
         return self.object_add(
             contents,
             object_type="skipped_content",
             keys=["sha1", "sha1_git", "sha256", "blake2s256"],
         )

     def object_add(
         self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str],
     ) -> Dict[str, int]:
         """Push objects to write to the storage in the buffer. Flushes the
         buffer to the storage if the threshold is hit.
""" buffer_ = self._objects[object_type] for obj in objects: obj_key = tuple(getattr(obj, key) for key in keys) buffer_[obj_key] = obj if len(buffer_) >= self._buffer_thresholds[object_type]: return self.flush() return {} def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]: summary: Dict[str, int] = self.storage.flush(object_types) if object_types is None: object_types = OBJECT_TYPES for object_type in object_types: buffer_ = self._objects[object_type] batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) stats = add_fn(list(batch)) summary = {k: v + summary.get(k, 0) for k, v in stats.items()} self.clear_buffers(object_types) return summary def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: """Clear objects from current buffer. WARNING: data that has not been flushed to storage will be lost when this method is called. This should only be called when `flush` fails and you want to continue your processing. """ if object_types is None: object_types = OBJECT_TYPES for object_type in object_types: buffer_ = self._objects[object_type] buffer_.clear() if object_type == "content": self._contents_size = 0 return self.storage.clear_buffers(object_types)