Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/proxies/buffer.py
# Copyright (C) 2019-2020 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from functools import partial | from functools import partial | ||||
from typing import Dict, Iterable, Mapping, Sequence, Tuple | import logging | ||||
from typing import Dict, Iterable, Mapping, Sequence, Tuple, cast | |||||
from typing_extensions import Literal | from typing_extensions import Literal | ||||
from swh.core.utils import grouper | from swh.core.utils import grouper | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
BaseModel, | BaseModel, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
Release, | Release, | ||||
Revision, | Revision, | ||||
SkippedContent, | SkippedContent, | ||||
) | ) | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
from swh.storage.interface import StorageInterface | from swh.storage.interface import StorageInterface | ||||
logger = logging.getLogger(__name__) | |||||
LObjectType = Literal[ | LObjectType = Literal[ | ||||
"content", | "content", | ||||
"skipped_content", | "skipped_content", | ||||
"directory", | "directory", | ||||
"revision", | "revision", | ||||
"release", | "release", | ||||
"snapshot", | "snapshot", | ||||
"extid", | "extid", | ||||
▲ Show 20 Lines • Show All 197 Lines • ▼ Show 20 Lines | ) -> Dict[str, int]: | ||||
summary: Dict[str, int] = {} | summary: Dict[str, int] = {} | ||||
def update_summary(stats): | def update_summary(stats): | ||||
for k, v in stats.items(): | for k, v in stats.items(): | ||||
summary[k] = v + summary.get(k, 0) | summary[k] = v + summary.get(k, 0) | ||||
for object_type in object_types: | for object_type in object_types: | ||||
buffer_ = self._objects[object_type] | buffer_ = self._objects[object_type] | ||||
if not buffer_: | |||||
continue | |||||
if logger.isEnabledFor(logging.DEBUG): | |||||
log = "Flushing %s objects of type %s" | |||||
log_args = [len(buffer_), object_type] | |||||
if object_type == "content": | |||||
log += " (%s bytes)" | |||||
log_args.append( | |||||
sum(cast(Content, c).length for c in buffer_.values()) | |||||
) | |||||
elif object_type == "directory": | |||||
log += " (%s entries)" | |||||
log_args.append( | |||||
sum(len(cast(Directory, d).entries) for d in buffer_.values()) | |||||
) | |||||
elif object_type == "revision": | |||||
log += " (%s parents, %s estimated bytes)" | |||||
log_args.extend( | |||||
( | |||||
sum( | |||||
len(cast(Revision, r).parents) for r in buffer_.values() | |||||
), | |||||
sum( | |||||
estimate_revision_size(cast(Revision, r)) | |||||
for r in buffer_.values() | |||||
), | |||||
) | |||||
) | |||||
elif object_type == "release": | |||||
log += " (%s estimated bytes)" | |||||
log_args.append( | |||||
sum( | |||||
estimate_release_size(cast(Release, r)) | |||||
for r in buffer_.values() | |||||
) | |||||
) | |||||
logger.debug(log, *log_args) | |||||
batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) | batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) | ||||
for batch in batches: | for batch in batches: | ||||
add_fn = getattr(self.storage, "%s_add" % object_type) | add_fn = getattr(self.storage, "%s_add" % object_type) | ||||
stats = add_fn(list(batch)) | stats = add_fn(list(batch)) | ||||
update_summary(stats) | update_summary(stats) | ||||
# Flush underlying storage | # Flush underlying storage | ||||
stats = self.storage.flush(object_types) | stats = self.storage.flush(object_types) | ||||
Show All 30 Lines |