swh/storage/buffer.py
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from functools import partial
from typing import Collection, Dict, Iterable, List, Mapping, Optional, Tuple

from swh.core.utils import grouper
from swh.model.model import BaseModel, Content, SkippedContent
from swh.storage import get_storage
from swh.storage.interface import StorageInterface
OBJECT_TYPES: List[str] = [
    "content",
    "skipped_content",
    "directory",
    "revision",
    "release",
]
ardumont: T`H`RESHOLDS
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
    "content": 10000,
    "content_bytes": 100 * 1024 * 1024,
    "skipped_content": 10000,
    "directory": 25000,
    "revision": 100000,
    "release": 100000,
}
class BufferingProxyStorage:
    """Storage implementation in charge of accumulating objects prior to
    writing them to the "main" storage.

    Deduplicates values based on a tuple of keys depending on the object type.

    Sample configuration use case for buffering storage:

    .. code-block:: yaml

        storage:
          cls: buffer
          args:
            storage:
              cls: remote
              args: http://storage.internal.staging.swh.network:5002/
            min_batch_size:
              content: 10000
              content_bytes: 100000000
              skipped_content: 10000
              directory: 5000
              revision: 1000
              release: 10000
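
    A minimal Python instantiation sketch (an assumption for illustration:
    ``get_storage`` is given the proxy class name plus the proxy's constructor
    arguments as keywords, and an in-memory backend with small thresholds is
    used):

    .. code-block:: python

        from swh.storage import get_storage

        storage = get_storage(
            cls="buffer",
            storage={"cls": "memory"},
            min_batch_size={"content": 1000},
        )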
""" | """ | ||||
    def __init__(
        self, storage: Mapping, min_batch_size: Mapping = DEFAULT_BUFFER_THRESHOLDS,
    ):
ardumont: what does that mean?
ardumont: got it, renaming the parameters, `min_batch_size` to `buffer_thresholds`...
        self.storage: StorageInterface = get_storage(**storage)
        if min_batch_size is not DEFAULT_BUFFER_THRESHOLDS:
ardumont: i read that somewhere [1] ;)
[1] https://treyhunner.com/2016/02/how-to-merge-dictionaries-in-python/
tenma: That's common py>=3.5 dict merging, but I hesitated to use it: this is a shallow merge, vs. core/config's merge_configs, which is a deep merge. For now it is OK, but it would be great if that function were moved to a core utility module under a generic name like deepmerge_dicts so that we could really use it here.
ardumont: I think this is enough for our use case though.
vlorentz: Just use `min_batch_size: Optional[Dict]`, and test if it's `None` instead of doing identity comparison on dicts. As for merging dicts: https://forge.softwareheritage.org/source/swh-core/browse/master/swh/core/config.py$173
            # Caller overrides are shallow-merged over the defaults
            min_batch_size = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
        self._buffer_thresholds: Dict[str, int] = dict(min_batch_size)
ardumont: why not `!=`?
tenma: because it IS a case for identity equality rather than a structural one: we test against the default value. This check is not needed for correctness, but for both clarity and performance.
        self._objects: Dict[str, Dict[Tuple[str, ...], BaseModel]] = {
            k: {} for k in OBJECT_TYPES
        }
        self._contents_size: int = 0
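        # Shallow-merge sketch (illustrative values): min_batch_size={"directory": 5000}
        # yields thresholds where "directory" is 5000 while every other key
        # keeps its DEFAULT_BUFFER_THRESHOLDS value.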
    def __getattr__(self, key: str):
        if key.endswith("_add"):
            object_type = key.rsplit("_", 1)[0]
            if object_type in OBJECT_TYPES:
                return partial(self.object_add, object_type=object_type, keys=["id"],)
ardumont: Lifting some doubt I have now (or again): I gather `content` and `skipped_content` (included in OBJECT_TYPES) are not concerned by that conditional anyway (asking because their ids are not under the "id" name). I recall that since `content_add` and `skipped_content_add` are overridden here by name, they don't go through the `__getattr__` call. (I noticed that this is the current implementation anyway, but still, it is not really apparent ;)
tenma: Yeah, I also wondered, and checked that `__getattr__` gets called after attribute lookup fails, whereas `__getattribute__` gets called before.
        if key == "storage":
            raise AttributeError(key)
        return getattr(self.storage, key)
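    # Illustration (hypothetical variable names): thanks to __getattr__ above,
    #   storage.revision_add(revisions)
    # resolves to
    #   storage.object_add(revisions, object_type="revision", keys=["id"])
    # content_add and skipped_content_add are defined explicitly below, so
    # attribute lookup succeeds for them and __getattr__ is never consulted.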
    def content_add(self, contents: Collection[Content]) -> Dict:
        """Push contents into the buffer, to be written to the storage later.

        The following policies apply:

        - if the buffer's object-count threshold is hit, flush the buffered
          contents to the storage;
        - otherwise, if the total size of the buffered contents hits the
          ``content_bytes`` threshold, flush them to the storage.
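
        A hedged sketch of the resulting behavior (``small_batch`` is an
        illustrative, under-threshold collection of contents):

        .. code-block:: python

            assert storage.content_add(small_batch) == {}   # buffered only
            summary = storage.flush(["content"])            # force the write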
""" | """ | ||||
        stats = self.object_add(
            contents,
            object_type="content",
            keys=["sha1", "sha1_git", "sha256", "blake2s256"],
        )
        if not stats:  # we did not flush already
            self._contents_size += sum(c.length for c in contents)
            if self._contents_size >= self._buffer_thresholds["content_bytes"]:
                return self.flush(["content"])
        return stats
    def skipped_content_add(self, contents: Collection[SkippedContent]) -> Dict:
        return self.object_add(
            contents,
            object_type="skipped_content",
            keys=["sha1", "sha1_git", "sha256", "blake2s256"],
        )
    def object_add(
        self, objects: Collection[BaseModel], *, object_type: str, keys: Iterable[str],
    ) -> Dict[str, int]:
"""Enqueue objects to write to the storage. This checks if the queue's | """Push objects to write to the storage in the buffer. Flushes the | ||||
threshold is hit. If it is actually write those to the storage. | buffer to the storage if the threshold is hit. | ||||
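
        A deduplication sketch (hedged: ``rev`` stands for any revision
        object; the names are illustrative):

        .. code-block:: python

            storage.object_add([rev, rev], object_type="revision", keys=["id"])
            # Both entries share the ("id",)-derived key, so the buffer
            # holds the object only once.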
""" | """ | ||||
        buffer_ = self._objects[object_type]
        for obj in objects:
            obj_key = tuple(getattr(obj, key) for key in keys)
            buffer_[obj_key] = obj
        if len(buffer_) >= self._buffer_thresholds[object_type]:
            return self.flush()
        return {}
    def flush(self, object_types: Optional[List[str]] = None) -> Dict[str, int]:
        summary: Dict[str, int] = self.storage.flush(object_types)
        if object_types is None:
            object_types = OBJECT_TYPES
        for object_type in object_types:
            buffer_ = self._objects[object_type]
            batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type])
            for batch in batches:
                add_fn = getattr(self.storage, "%s_add" % object_type)
                stats = add_fn(list(batch))
                # Accumulate per-key counters across batches and object types
                for k, v in stats.items():
                    summary[k] = v + summary.get(k, 0)
        self.clear_buffers(object_types)
        return summary
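    # Batching sketch (illustrative numbers): with a "directory" threshold of
    # 25000 and 60000 buffered directories, grouper() yields batches of 25000,
    # 25000, and 10000 objects, each passed to storage.directory_add().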
    def clear_buffers(self, object_types: Optional[List[str]] = None) -> None:
        """Clear objects from the current buffer.

        WARNING:

            Data that has not been flushed to storage will be lost when this
            method is called. This should only be called when ``flush`` fails
            and you want to continue your processing.
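
        A hedged recovery sketch (assuming ``storage`` is the proxy
        configured above):

        .. code-block:: python

            try:
                storage.flush()
            except Exception:
                storage.clear_buffers()  # drop unflushed data and carry on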
""" | """ | ||||
        if object_types is None:
            object_types = OBJECT_TYPES
        for object_type in object_types:
            buffer_ = self._objects[object_type]
            buffer_.clear()
            if object_type == "content":
                self._contents_size = 0
        return self.storage.clear_buffers(object_types)