Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/buffer.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from functools import partial | from functools import partial | ||||
from typing import Dict, Iterable, Mapping, Sequence, Tuple | from typing import Dict, Iterable, List, Mapping, Sequence, Tuple | ||||
from typing_extensions import Literal | from typing_extensions import Literal | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.model import BaseModel, Content, SkippedContent | from swh.model.model import BaseModel, Content, Origin, SkippedContent | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
LObjectType = Literal[ | LObjectType = Literal[ | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"extid", | "extid", | ||||
"origin", | |||||
"origin_visit_status", | |||||
"raw_extrinsic_metadata", | |||||
"metadata_fetcher", | |||||
"metadata_authority", | |||||
] | ] | ||||
OBJECT_TYPES: Tuple[LObjectType, ...] = ( | OBJECT_TYPES: Tuple[LObjectType, ...] = ( | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"extid", | "extid", | ||||
"origin", | |||||
"origin_visit_status", | |||||
"raw_extrinsic_metadata", | |||||
"metadata_fetcher", | |||||
"metadata_authority", | |||||
) | ) | ||||
DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { | DEFAULT_BUFFER_THRESHOLDS: Dict[str, int] = { | ||||
"content": 10000, | "content": 10000, | ||||
"content_bytes": 100 * 1024 * 1024, | "content_bytes": 100 * 1024 * 1024, | ||||
"skipped_content": 10000, | "skipped_content": 10000, | ||||
"directory": 25000, | "directory": 25000, | ||||
"revision": 100000, | "revision": 100000, | ||||
"release": 100000, | "release": 100000, | ||||
"snapshot": 25000, | "snapshot": 25000, | ||||
"extid": 10000, | "extid": 10000, | ||||
"origin": 100000, | |||||
"origin_visit_status": 10000, | |||||
vlorentz: hmm, they can be pretty big. You may want to divide this by 10 | |||||
Done Inline ActionsI wasn't sure, so ok douardda: I wasn't sure, so ok | |||||
"raw_extrinsic_metadata": 10000, | |||||
"metadata_fetcher": 100000, | |||||
"metadata_authority": 100000, | |||||
} | } | ||||
class BufferingProxyStorage: | class BufferingProxyStorage: | ||||
"""Storage implementation in charge of accumulating objects prior to | """Storage implementation in charge of accumulating objects prior to | ||||
discussing with the "main" storage. | discussing with the "main" storage. | ||||
Deduplicates values based on a tuple of keys depending on the object type. | Deduplicates values based on a tuple of keys depending on the object type. | ||||
▲ Show 20 Lines • Show All 62 Lines • ▼ Show 20 Lines | class BufferingProxyStorage: | ||||
def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]: | def skipped_content_add(self, contents: Sequence[SkippedContent]) -> Dict[str, int]: | ||||
return self.object_add( | return self.object_add( | ||||
contents, | contents, | ||||
object_type="skipped_content", | object_type="skipped_content", | ||||
keys=["sha1", "sha1_git", "sha256", "blake2s256"], | keys=["sha1", "sha1_git", "sha256", "blake2s256"], | ||||
) | ) | ||||
def origin_add(self, origins: List[Origin]) -> Dict[str, int]: | |||||
return self.object_add(origins, object_type="origin", keys=["url"],) | |||||
def object_add( | def object_add( | ||||
self, | self, | ||||
objects: Sequence[BaseModel], | objects: Sequence[BaseModel], | ||||
*, | *, | ||||
object_type: LObjectType, | object_type: LObjectType, | ||||
keys: Iterable[str], | keys: Iterable[str], | ||||
) -> Dict[str, int]: | ) -> Dict[str, int]: | ||||
"""Push objects to write to the storage in the buffer. Flushes the | """Push objects to write to the storage in the buffer. Flushes the | ||||
▲ Show 20 Lines • Show All 54 Lines • Show Last 20 Lines |
hmm, they can be pretty big. You may want to divide this by 10