diff --git a/swh/storage/proxies/buffer.py b/swh/storage/proxies/buffer.py --- a/swh/storage/proxies/buffer.py +++ b/swh/storage/proxies/buffer.py @@ -4,15 +4,18 @@ # See top-level LICENSE file for more information from functools import partial -from typing import Dict, Iterable, Mapping, Sequence, Tuple +import logging +from typing import Dict, Iterable, Mapping, Sequence, Tuple, cast from typing_extensions import Literal from swh.core.utils import grouper -from swh.model.model import BaseModel, Content, SkippedContent +from swh.model.model import BaseModel, Content, Directory, SkippedContent from swh.storage import get_storage from swh.storage.interface import StorageInterface +logger = logging.getLogger(__name__) + LObjectType = Literal[ "content", "skipped_content", @@ -37,6 +40,7 @@ "content_bytes": 100 * 1024 * 1024, "skipped_content": 10000, "directory": 25000, + "directory_entries": 200000, "revision": 100000, "release": 100000, "snapshot": 25000, @@ -65,6 +69,7 @@ content_bytes: 100000000 skipped_content: 10000 directory: 5000 + directory_entries: 100000 revision: 1000 release: 10000 snapshot: 5000 @@ -80,6 +85,7 @@ k: {} for k in OBJECT_TYPES } self._contents_size: int = 0 + self._directory_entries: int = 0 def __getattr__(self, key: str): if key.endswith("_add"): @@ -105,7 +111,8 @@ object_type="content", keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) - if not stats: # We did not flush already + if not stats: + # We did not flush based on number of objects; check total size self._contents_size += sum(c.length for c in contents) if self._contents_size >= self._buffer_thresholds["content_bytes"]: return self.flush(["content"]) @@ -119,6 +126,17 @@ keys=["sha1", "sha1_git", "sha256", "blake2s256"], ) + def directory_add(self, directories: Sequence[Directory]) -> Dict[str, int]: + stats = self.object_add(directories, object_type="directory", keys=["id"]) + + if not stats: + # We did not flush based on number of objects; check the number of entries + 
self._directory_entries += sum(len(d.entries) for d in directories) + if self._directory_entries >= self._buffer_thresholds["directory_entries"]: + return self.flush(["content", "directory"]) + + return stats + def object_add( self, objects: Sequence[BaseModel], @@ -150,6 +168,25 @@ for object_type in object_types: buffer_ = self._objects[object_type] + if not buffer_: + continue + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Flushing %s objects of type %s", len(buffer_), object_type + ) + if object_type == "content": + logger.debug( + "Flushed %s bytes", + sum(cast(Content, c).length for c in buffer_.values()), + ) + + if object_type == "directory": + logger.debug( + "Flushed %s directory entries", + sum(len(cast(Directory, d).entries) for d in buffer_.values()), + ) + batches = grouper(buffer_.values(), n=self._buffer_thresholds[object_type]) for batch in batches: add_fn = getattr(self.storage, "%s_add" % object_type) @@ -179,5 +216,7 @@ buffer_.clear() if object_type == "content": self._contents_size = 0 + elif object_type == "directory": + self._directory_entries = 0 self.storage.clear_buffers(object_types) diff --git a/swh/storage/proxies/filter.py b/swh/storage/proxies/filter.py --- a/swh/storage/proxies/filter.py +++ b/swh/storage/proxies/filter.py @@ -6,7 +6,14 @@ from typing import Dict, Iterable, List, Set -from swh.model.model import Content, Directory, Revision, Sha1Git, SkippedContent +from swh.model.model import ( + Content, + Directory, + Release, + Revision, + Sha1Git, + SkippedContent, +) from swh.storage import get_storage from swh.storage.interface import StorageInterface @@ -39,26 +46,46 @@ def content_add(self, content: List[Content]) -> Dict[str, int]: contents_to_add = self._filter_missing_contents(content) + if not contents_to_add: + return { + "content:add": 0, + "content:add:bytes": 0, + } return self.storage.content_add( [x for x in content if x.sha256 in contents_to_add] ) def skipped_content_add(self, content: 
List[SkippedContent]) -> Dict[str, int]: contents_to_add = self._filter_missing_skipped_contents(content) + # Entries whose sha1_git is None are always forwarded by the filter below. + if not contents_to_add and not any( + c.sha1_git is None for c in content + ): + return {"skipped_content:add": 0} return self.storage.skipped_content_add( [x for x in content if x.sha1_git is None or x.sha1_git in contents_to_add] ) def directory_add(self, directories: List[Directory]) -> Dict[str, int]: missing_ids = self._filter_missing_ids("directory", (d.id for d in directories)) + if not missing_ids: + return {"directory:add": 0} return self.storage.directory_add( [d for d in directories if d.id in missing_ids] ) def revision_add(self, revisions: List[Revision]) -> Dict[str, int]: missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions)) + if not missing_ids: + return {"revision:add": 0} return self.storage.revision_add([r for r in revisions if r.id in missing_ids]) + def release_add(self, releases: List[Release]) -> Dict[str, int]: + missing_ids = self._filter_missing_ids("release", (r.id for r in releases)) + if not missing_ids: + return {"release:add": 0} + return self.storage.release_add([r for r in releases if r.id in missing_ids]) + def _filter_missing_contents(self, contents: List[Content]) -> Set[bytes]: """Return only the content keys missing from swh