# swh/storage/buffer.py
# NOTE(review): this file was extracted from a Phabricator changeset view;
# diff-viewer chrome and duplicated old/new columns have been cleaned up.
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Dict, Iterable, List, Optional

from swh.core.utils import grouper
from swh.model.model import BaseModel, Content
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
class BufferingProxyStorage: | class BufferingProxyStorage: | ||||
"""Storage implementation in charge of accumulating objects prior to | """Storage implementation in charge of accumulating objects prior to | ||||
discussing with the "main" storage. | discussing with the "main" storage. | ||||
Sample configuration use case for buffering storage: | Sample configuration use case for buffering storage: | ||||
Show All 11 Lines | .. code-block:: yaml | ||||
skipped_content: 10000 | skipped_content: 10000 | ||||
directory: 5000 | directory: 5000 | ||||
revision: 1000 | revision: 1000 | ||||
release: 10000 | release: 10000 | ||||
""" | """ | ||||
def __init__(self, storage, min_batch_size=None): | def __init__(self, storage, min_batch_size=None): | ||||
self.storage = get_storage(**storage) | self.storage: StorageInterface = get_storage(**storage) | ||||
if min_batch_size is None: | if min_batch_size is None: | ||||
min_batch_size = {} | min_batch_size = {} | ||||
self.min_batch_size = { | self.min_batch_size = { | ||||
"content": min_batch_size.get("content", 10000), | "content": min_batch_size.get("content", 10000), | ||||
"content_bytes": min_batch_size.get("content_bytes", 100 * 1024 * 1024), | "content_bytes": min_batch_size.get("content_bytes", 100 * 1024 * 1024), | ||||
"skipped_content": min_batch_size.get("skipped_content", 10000), | "skipped_content": min_batch_size.get("skipped_content", 10000), | ||||
Show All 14 Lines | def __getattr__(self, key): | ||||
if key.endswith("_add"): | if key.endswith("_add"): | ||||
object_type = key.rsplit("_", 1)[0] | object_type = key.rsplit("_", 1)[0] | ||||
if object_type in self.object_types: | if object_type in self.object_types: | ||||
return partial(self.object_add, object_type=object_type, keys=["id"],) | return partial(self.object_add, object_type=object_type, keys=["id"],) | ||||
if key == "storage": | if key == "storage": | ||||
raise AttributeError(key) | raise AttributeError(key) | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: List[Content]) -> Dict: | ||||
"""Enqueue contents to write to the storage. | """Enqueue contents to write to the storage. | ||||
Following policies apply: | Following policies apply: | ||||
- First, check if the queue's threshold is hit. | - First, check if the queue's threshold is hit. | ||||
If it is flush content to the storage. | If it is flush content to the storage. | ||||
- If not, check if the total size of enqueued contents's | - If not, check if the total size of enqueued contents's | ||||
threshold is hit. If it is flush content to the storage. | threshold is hit. If it is flush content to the storage. | ||||
""" | """ | ||||
content = list(content) | |||||
s = self.object_add( | s = self.object_add( | ||||
content, | content, | ||||
object_type="content", | object_type="content", | ||||
keys=["sha1", "sha1_git", "sha256", "blake2s256"], | keys=["sha1", "sha1_git", "sha256", "blake2s256"], | ||||
) | ) | ||||
if not s: | if not s: | ||||
buffer_ = self._objects["content"].values() | buffer_ = self._objects["content"].values() | ||||
total_size = sum(c.length for c in buffer_) | total_size = sum(c.length for c in buffer_) | ||||
if total_size >= self.min_batch_size["content_bytes"]: | if total_size >= self.min_batch_size["content_bytes"]: | ||||
return self.flush(["content"]) | return self.flush(["content"]) | ||||
return s | return s | ||||
def skipped_content_add(self, content: Iterable[Content]) -> Dict: | def skipped_content_add(self, content: List[Content]) -> Dict: | ||||
return self.object_add( | return self.object_add( | ||||
content, | content, | ||||
object_type="skipped_content", | object_type="skipped_content", | ||||
keys=["sha1", "sha1_git", "sha256", "blake2s256"], | keys=["sha1", "sha1_git", "sha256", "blake2s256"], | ||||
) | ) | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[List[str]] = None) -> Dict: | ||||
summary: Dict[str, int] = self.storage.flush(object_types) | summary: Dict[str, int] = self.storage.flush(object_types) | ||||
if object_types is None: | if object_types is None: | ||||
object_types = self.object_types | object_types = self.object_types | ||||
for object_type in object_types: | for object_type in object_types: | ||||
buffer_ = self._objects[object_type] | buffer_ = self._objects[object_type] | ||||
batches = grouper(buffer_.values(), n=self.min_batch_size[object_type]) | batches = grouper(buffer_.values(), n=self.min_batch_size[object_type]) | ||||
for batch in batches: | for batch in batches: | ||||
add_fn = getattr(self.storage, "%s_add" % object_type) | add_fn = getattr(self.storage, "%s_add" % object_type) | ||||
Show All 15 Lines | ) -> Dict: | ||||
for obj in objects: | for obj in objects: | ||||
obj_key = tuple(getattr(obj, key) for key in keys) | obj_key = tuple(getattr(obj, key) for key in keys) | ||||
buffer_[obj_key] = obj | buffer_[obj_key] = obj | ||||
if len(buffer_) >= threshold: | if len(buffer_) >= threshold: | ||||
return self.flush() | return self.flush() | ||||
return {} | return {} | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: | def clear_buffers(self, object_types: Optional[List[str]] = None) -> None: | ||||
"""Clear objects from current buffer. | """Clear objects from current buffer. | ||||
WARNING: | WARNING: | ||||
data that has not been flushed to storage will be lost when this | data that has not been flushed to storage will be lost when this | ||||
method is called. This should only be called when `flush` fails and | method is called. This should only be called when `flush` fails and | ||||
you want to continue your processing. | you want to continue your processing. | ||||
Show All 9 Lines |