Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/buffer.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import deque | from collections import deque | ||||
from functools import partial | from functools import partial | ||||
from typing import Optional, Iterable, Dict | from typing import Optional, Iterable, Dict | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.model import Content, BaseModel | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
class BufferingProxyStorage: | class BufferingProxyStorage: | ||||
"""Storage implementation in charge of accumulating objects prior to | """Storage implementation in charge of accumulating objects prior to | ||||
discussing with the "main" storage. | discussing with the "main" storage. | ||||
Sample configuration use case for buffering storage: | Sample configuration use case for buffering storage: | ||||
Show All 40 Lines | def __getattr__(self, key): | ||||
if object_type in self.object_types: | if object_type in self.object_types: | ||||
return partial( | return partial( | ||||
self.object_add, object_type=object_type | self.object_add, object_type=object_type | ||||
) | ) | ||||
if key == 'storage': | if key == 'storage': | ||||
raise AttributeError(key) | raise AttributeError(key) | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
def content_add(self, content: Iterable[Dict]) -> Dict: | def content_add(self, content: Iterable[Content]) -> Dict: | ||||
"""Enqueue contents to write to the storage. | """Enqueue contents to write to the storage. | ||||
Following policies apply: | Following policies apply: | ||||
- First, check if the queue's threshold is hit. | - First, check if the queue's threshold is hit. | ||||
If it is flush content to the storage. | If it is flush content to the storage. | ||||
- If not, check if the total size of enqueued contents's | - If not, check if the total size of enqueued contents's | ||||
threshold is hit. If it is flush content to the storage. | threshold is hit. If it is flush content to the storage. | ||||
""" | """ | ||||
content = list(content) | |||||
s = self.object_add(content, object_type='content') | s = self.object_add(content, object_type='content') | ||||
if not s: | if not s: | ||||
olasd: Renaming `s` to `stats` would be clearer to me. It'd be nice to do the renaming throughout, at… | |||||
q = self._objects['content'] | q = self._objects['content'] | ||||
total_size = sum(c['length'] for c in q) | total_size = sum(c.length for c in q) | ||||
Not Done Inline Actionsqueue, content instead of q, c? olasd: `queue`, `content` instead of `q`, `c`? | |||||
Not Done Inline ActionsAlso, unrelated to this diff, we should store and update the sum of content lengths in an attribute on the fly to avoid a gratuitous quatratic behavior. olasd: Also, unrelated to this diff, we should store and update the sum of content lengths in an… | |||||
if total_size >= self.min_batch_size['content_bytes']: | if total_size >= self.min_batch_size['content_bytes']: | ||||
Not Done Inline ActionsAt some point we should rename this min_batch_size to buffer_threshold or something clearer. olasd: At some point we should rename this `min_batch_size` to `buffer_threshold` or something clearer. | |||||
return self.flush(['content']) | return self.flush(['content']) | ||||
return s | return s | ||||
def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: | ||||
if object_types is None: | if object_types is None: | ||||
object_types = self.object_types | object_types = self.object_types | ||||
summary = {} # type: Dict[str, Dict] | summary = {} # type: Dict[str, Dict] | ||||
for object_type in object_types: | for object_type in object_types: | ||||
q = self._objects[object_type] | q = self._objects[object_type] | ||||
for objs in grouper(q, n=self.min_batch_size[object_type]): | for objs in grouper(q, n=self.min_batch_size[object_type]): | ||||
add_fn = getattr(self.storage, '%s_add' % object_type) | add_fn = getattr(self.storage, '%s_add' % object_type) | ||||
s = add_fn(objs) | s = add_fn(objs) | ||||
summary = {k: v + summary.get(k, 0) | summary = {k: v + summary.get(k, 0) | ||||
for k, v in s.items()} | for k, v in s.items()} | ||||
q.clear() | q.clear() | ||||
return summary | return summary | ||||
def object_add(self, objects: Iterable[Dict], *, object_type: str) -> Dict: | def object_add( | ||||
self, objects: Iterable[BaseModel], *, object_type: str) -> Dict: | |||||
"""Enqueue objects to write to the storage. This checks if the queue's | """Enqueue objects to write to the storage. This checks if the queue's | ||||
threshold is hit. If it is actually write those to the storage. | threshold is hit. If it is actually write those to the storage. | ||||
""" | """ | ||||
q = self._objects[object_type] | q = self._objects[object_type] | ||||
threshold = self.min_batch_size[object_type] | threshold = self.min_batch_size[object_type] | ||||
q.extend(objects) | q.extend(objects) | ||||
if len(q) >= threshold: | if len(q) >= threshold: | ||||
return self.flush() | return self.flush() | ||||
return {} | return {} |
Renaming s to stats would be clearer to me. It'd be nice to do the renaming throughout, at some point.
You could also add a comment saying that if not stats: means that object_add didn't flush the buffers, and we should check for the volume of contents.