Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/proxies/buffer.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from functools import partial | from functools import partial | ||||
from typing import Dict, Iterable, Mapping, Sequence, Tuple | from typing import Dict, Iterable, Mapping, Sequence, Tuple | ||||
from typing_extensions import Literal | from typing_extensions import Literal | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.model import BaseModel, Content, Directory, Revision, SkippedContent | from swh.model.model import ( | ||||
BaseModel, | |||||
Content, | |||||
Directory, | |||||
Release, | |||||
Revision, | |||||
SkippedContent, | |||||
) | |||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
LObjectType = Literal[ | LObjectType = Literal[ | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
Show All 14 Lines | |||||
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { | DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { | ||||
"content": 10000, | "content": 10000, | ||||
"content_bytes": 100 * 1024 * 1024, | "content_bytes": 100 * 1024 * 1024, | ||||
"skipped_content": 10000, | "skipped_content": 10000, | ||||
"directory": 25000, | "directory": 25000, | ||||
"directory_entries": 200000, | "directory_entries": 200000, | ||||
"revision": 100000, | "revision": 100000, | ||||
"revision_parents": 200000, | "revision_parents": 200000, | ||||
"revision_bytes": 100 * 1024 * 1024, | |||||
"release": 100000, | "release": 100000, | ||||
"release_bytes": 100 * 1024 * 1024, | |||||
"snapshot": 25000, | "snapshot": 25000, | ||||
"extid": 10000, | "extid": 10000, | ||||
} | } | ||||
def estimate_revision_size(revision: Revision) -> int: | |||||
"""Estimate the size of a revision, by summing the size of variable length fields""" | |||||
s = 20 * len(revision.parents) | |||||
if revision.message: | |||||
s += len(revision.message) | |||||
s += len(revision.author.fullname) | |||||
s += len(revision.committer.fullname) | |||||
s += sum(len(h) + len(v) for h, v in revision.extra_headers) | |||||
return s | |||||
def estimate_release_size(release: Release) -> int: | |||||
"""Estimate the size of a release, by summing the size of variable length fields""" | |||||
s = 0 | |||||
if release.message: | |||||
s += len(release.message) | |||||
if release.author: | |||||
s += len(release.author.fullname) | |||||
return s | |||||
class BufferingProxyStorage: | class BufferingProxyStorage: | ||||
"""Storage implementation in charge of accumulating objects prior to | """Storage implementation in charge of accumulating objects prior to | ||||
discussing with the "main" storage. | discussing with the "main" storage. | ||||
Deduplicates values based on a tuple of keys depending on the object type. | Deduplicates values based on a tuple of keys depending on the object type. | ||||
Sample configuration use case for buffering storage: | Sample configuration use case for buffering storage: | ||||
.. code-block:: yaml | .. code-block:: yaml | ||||
storage: | storage: | ||||
cls: buffer | cls: buffer | ||||
args: | args: | ||||
storage: | storage: | ||||
cls: remote | cls: remote | ||||
args: http://storage.internal.staging.swh.network:5002/ | args: http://storage.internal.staging.swh.network:5002/ | ||||
min_batch_size: | min_batch_size: | ||||
content: 10000 | content: 10000 | ||||
content_bytes: 100000000 | content_bytes: 100000000 | ||||
skipped_content: 10000 | skipped_content: 10000 | ||||
directory: 5000 | directory: 5000 | ||||
directory_entries: 100000 | directory_entries: 100000 | ||||
revision: 1000 | revision: 1000 | ||||
revision_parents: 2000 | revision_parents: 2000 | ||||
revision_bytes: 100000000 | |||||
release: 10000 | release: 10000 | ||||
release_bytes: 100000000 | |||||
snapshot: 5000 | snapshot: 5000 | ||||
""" | """ | ||||
def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): | def __init__(self, storage: Mapping, min_batch_size: Mapping = {}): | ||||
self.storage: StorageInterface = get_storage(**storage) | self.storage: StorageInterface = get_storage(**storage) | ||||
self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} | self._buffer_thresholds = {**DEFAULT_BUFFER_THRESHOLDS, **min_batch_size} | ||||
self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { | self._objects: Dict[LObjectType, Dict[Tuple[str, ...], BaseModel]] = { | ||||
k: {} for k in OBJECT_TYPES | k: {} for k in OBJECT_TYPES | ||||
} | } | ||||
self._contents_size: int = 0 | self._contents_size: int = 0 | ||||
self._directory_entries: int = 0 | self._directory_entries: int = 0 | ||||
self._revision_parents: int = 0 | self._revision_parents: int = 0 | ||||
self._revision_size: int = 0 | |||||
self._release_size: int = 0 | |||||
def __getattr__(self, key: str): | def __getattr__(self, key: str): | ||||
if key.endswith("_add"): | if key.endswith("_add"): | ||||
object_type = key.rsplit("_", 1)[0] | object_type = key.rsplit("_", 1)[0] | ||||
if object_type in OBJECT_TYPES: | if object_type in OBJECT_TYPES: | ||||
return partial(self.object_add, object_type=object_type, keys=["id"],) | return partial(self.object_add, object_type=object_type, keys=["id"],) | ||||
if key == "storage": | if key == "storage": | ||||
raise AttributeError(key) | raise AttributeError(key) | ||||
Show All 39 Lines | def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: | ||||
return self.flush(["content", "directory"]) | return self.flush(["content", "directory"]) | ||||
return stats | return stats | ||||
def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]: | def revision_add(self, revisions: Sequence[Revision]) -> Dict[str, int]: | ||||
stats = self.object_add(revisions, object_type="revision", keys=["id"]) | stats = self.object_add(revisions, object_type="revision", keys=["id"]) | ||||
if not stats: | if not stats: | ||||
# We did not flush based on number of objects; check the number of parents | # We did not flush based on number of objects; check the number of | ||||
# parents and estimated size | |||||
self._revision_parents += sum(len(r.parents) for r in revisions) | self._revision_parents += sum(len(r.parents) for r in revisions) | ||||
if self._revision_parents >= self._buffer_thresholds["revision_parents"]: | self._revision_size += sum(estimate_revision_size(r) for r in revisions) | ||||
if ( | |||||
self._revision_parents >= self._buffer_thresholds["revision_parents"] | |||||
or self._revision_size >= self._buffer_thresholds["revision_bytes"] | |||||
): | |||||
return self.flush(["content", "directory", "revision"]) | return self.flush(["content", "directory", "revision"]) | ||||
return stats | return stats | ||||
def release_add(self, releases: Sequence[Release]) -> Dict[str, int]: | |||||
stats = self.object_add(releases, object_type="release", keys=["id"]) | |||||
if not stats: | |||||
# We did not flush based on number of objects; check the estimated size | |||||
self._release_size += sum(estimate_release_size(r) for r in releases) | |||||
if self._release_size >= self._buffer_thresholds["release_bytes"]: | |||||
ardumont: oops ;) | |||||
return self.flush(["content", "directory", "revision", "release"]) | |||||
return stats | |||||
def object_add( | def object_add( | ||||
self, | self, | ||||
objects: Sequence[BaseModel], | objects: Sequence[BaseModel], | ||||
*, | *, | ||||
object_type: LObjectType, | object_type: LObjectType, | ||||
keys: Iterable[str], | keys: Iterable[str], | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
"""Push objects to write to the storage in the buffer. Flushes the | """Push objects to write to the storage in the buffer. Flushes the | ||||
▲ Show 20 Lines • Show All 48 Lines • ▼ Show 20 Lines | def clear_buffers(self, object_types: Sequence[LObjectType] = OBJECT_TYPES) -> None: | ||||
buffer_ = self._objects[object_type] | buffer_ = self._objects[object_type] | ||||
buffer_.clear() | buffer_.clear() | ||||
if object_type == "content": | if object_type == "content": | ||||
self._contents_size = 0 | self._contents_size = 0 | ||||
elif object_type == "directory": | elif object_type == "directory": | ||||
self._directory_entries = 0 | self._directory_entries = 0 | ||||
elif object_type == "revision": | elif object_type == "revision": | ||||
self._revision_parents = 0 | self._revision_parents = 0 | ||||
self._revision_size = 0 | |||||
elif object_type == "release": | |||||
self._release_size = 0 | |||||
self.storage.clear_buffers(object_types) | self.storage.clear_buffers(object_types) |
oops ;)