Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/proxies/buffer.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from functools import partial | from functools import partial | ||||
from typing import Dict, Iterable, Mapping, Sequence, Tuple | from typing import Dict, Iterable, Mapping, Sequence, Tuple | ||||
from typing_extensions import Literal | from typing_extensions import Literal | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.model import BaseModel, Content, Directory, SkippedContent | from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
LObjectType = Literal[ | LObjectType = Literal[ | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
Show All 13 Lines | |||||
# Default flush thresholds for BufferingProxyStorage. Values passed as
# ``min_batch_size`` to the proxy are merged on top of these, key by key.
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = {
    # contents
    "content": 10000,
    "content_bytes": 100 * 1024 * 1024,
    "skipped_content": 10000,
    # directories: "directory_entries" bounds the summed ``len(d.entries)``
    # of the directories buffered since the last flush
    "directory": 25000,
    "directory_entries": 200000,
    # revisions: "revision_parents" bounds the summed ``len(r.parents)``
    # of the revisions buffered since the last flush
    "revision": 100000,
    "revision_parents": 200000,
    # remaining object types
    "release": 100000,
    "snapshot": 25000,
    "extid": 10000,
}
class BufferingProxyStorage: | class BufferingProxyStorage: | ||||
"""Storage implementation in charge of accumulating objects prior to | """Storage implementation in charge of accumulating objects prior to | ||||
Show All 13 Lines | .. code-block:: yaml | ||||
args: http://storage.internal.staging.swh.network:5002/ | args: http://storage.internal.staging.swh.network:5002/ | ||||
min_batch_size: | min_batch_size: | ||||
content: 10000 | content: 10000 | ||||
content_bytes: 100000000 | content_bytes: 100000000 | ||||
skipped_content: 10000 | skipped_content: 10000 | ||||
directory: 5000 | directory: 5000 | ||||
directory_entries: 100000 | directory_entries: 100000 | ||||
revision: 1000 | revision: 1000 | ||||
revision_parents: 2000 | |||||
release: 10000 | release: 10000 | ||||
snapshot: 5000 | snapshot: 5000 | ||||
""" | """ | ||||
def __init__(self, storage: Mapping, min_batch_size: Mapping = {}):
    """Build the wrapped storage and start with empty buffers.

    Args:
        storage: keyword arguments passed to ``get_storage`` to
            instantiate the wrapped storage.
        min_batch_size: per-key overrides merged on top of
            ``DEFAULT_BUFFER_THRESHOLDS``. The shared ``{}`` default is
            only unpacked into a new dict here, never mutated.
    """
    self.storage: StorageInterface = get_storage(**storage)

    # Caller-provided thresholds win over the defaults.
    thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size}
    self._buffer_thresholds = thresholds

    # One independent, initially empty buffer per known object type.
    buffers: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = {}
    for object_type in OBJECT_TYPES:
        buffers[object_type] = {}
    self._objects = buffers

    # Running totals checked against the thresholds that are not plain
    # object counts; all start at zero.
    self._contents_size: int = 0
    self._directory_entries: int = 0
    self._revision_parents: int = 0
def __getattr__(self, key: str): | def __getattr__(self, key: str): | ||||
if key.endswith("_add"): | if key.endswith("_add"): | ||||
object_type = key.rsplit("_", 1)[0] | object_type = key.rsplit("_", 1)[0] | ||||
if object_type in OBJECT_TYPES: | if object_type in OBJECT_TYPES: | ||||
return partial(self.object_add, object_type=object_type, keys=["id"],) | return partial(self.object_add, object_type=object_type, keys=["id"],) | ||||
if key == "storage": | if key == "storage": | ||||
raise AttributeError(key) | raise AttributeError(key) | ||||
Show All 35 Lines | def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: | ||||
if not stats: | if not stats: | ||||
# We did not flush based on number of objects; check the number of entries | # We did not flush based on number of objects; check the number of entries | ||||
self._directory_entries += sum(len(d.entries) for d in directories) | self._directory_entries += sum(len(d.entries) for d in directories) | ||||
if self._directory_entries >= self._buffer_thresholds["directory_entries"]: | if self._directory_entries >= self._buffer_thresholds["directory_entries"]: | ||||
return self.flush(["content", "directory"]) | return self.flush(["content", "directory"]) | ||||
return stats | return stats | ||||
def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]:
    """Buffer revisions, flushing when too many parents have accumulated.

    The revisions are first handed to ``object_add``. If that call returns
    a non-empty result, it is returned as is. Otherwise the parents of the
    given revisions are added to the running total, and once that total
    reaches the ``revision_parents`` threshold the content, directory and
    revision buffers are flushed.

    Args:
        revisions: the revisions to buffer.

    Returns:
        The non-empty result of ``object_add``, or the result of ``flush``
        when the parents threshold is reached, or the empty ``object_add``
        result otherwise.
    """
    added = self.object_add(revisions, object_type="revision", keys=["id"])
    if added:
        return added

    # We did not flush based on number of objects; check the number of parents
    new_parents = 0
    for revision in revisions:
        new_parents += len(revision.parents)
    self._revision_parents += new_parents

    if self._revision_parents < self._buffer_thresholds["revision_parents"]:
        return added
    return self.flush(["content", "directory", "revision"])
def object_add( | def object_add( | ||||
self, | self, | ||||
objects: Sequence[BaseModel], | objects: Sequence[BaseModel], | ||||
*, | *, | ||||
object_type: LObjectType, | object_type: LObjectType, | ||||
keys: Iterable[str], | keys: Iterable[str], | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
"""Push objects to write to the storage in the buffer. Flushes the | """Push objects to write to the storage in the buffer. Flushes the | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: | ||||
""" | """ | ||||
for object_type in object_types: | for object_type in object_types: | ||||
buffer_ = self._objects[object_type] | buffer_ = self._objects[object_type] | ||||
buffer_.clear() | buffer_.clear() | ||||
if object_type == "content": | if object_type == "content": | ||||
self._contents_size = 0 | self._contents_size = 0 | ||||
elif object_type == "directory": | elif object_type == "directory": | ||||
self._directory_entries = 0 | self._directory_entries = 0 | ||||
elif object_type == "revision": | |||||
self._revision_parents = 0 | |||||
self.storage.clear_buffers(object_types) | self.storage.clear_buffers(object_types) |