Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/filter.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019-2020 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Dict, Iterable, Set | from typing import Dict, Iterable, Optional, Set | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
Content, | Content, | ||||
SkippedContent, | SkippedContent, | ||||
Directory, | Directory, | ||||
Revision, | Revision, | ||||
) | ) | ||||
Show All 11 Lines | .. code-block:: yaml | ||||
storage: | storage: | ||||
cls: filter | cls: filter | ||||
storage: | storage: | ||||
cls: remote | cls: remote | ||||
url: http://storage.internal.staging.swh.network:5002/ | url: http://storage.internal.staging.swh.network:5002/ | ||||
""" | """ | ||||
# Object types whose already-seen ids are tracked by this proxy.
object_types = ["content", "skipped_content", "directory", "revision"]
# Class-level declaration kept for typing and backward compatibility only;
# __init__ binds a fresh per-instance dict, so this shared one is never filled.
objects_seen: Dict[str, Set[bytes]] = {}

def __init__(self, storage):
    """Instantiate the wrapped storage and start with empty seen-id sets.

    Args:
        storage: keyword arguments forwarded to ``get_storage`` to build the
            wrapped storage.
    """
    self.storage = get_storage(**storage)
    # Bind a new dict on the instance rather than filling the class-level
    # one in place: mutating the class attribute would share the seen ids
    # between all instances and reset them whenever a new one is created.
    self.objects_seen = {object_type: set() for object_type in self.object_types}
def __getattr__(self, key):
    """Delegate attributes missing on the proxy to the wrapped storage.

    Args:
        key: name of the attribute that regular lookup did not find.

    Returns:
        The attribute of the same name on ``self.storage``.

    Raises:
        AttributeError: if ``key`` is ``"storage"`` itself, or if the wrapped
            storage does not have the attribute either.
    """
    # Reading ``self.storage`` below re-enters this hook when that attribute
    # is not set on the instance, so "storage" must never be delegated.
    if key != "storage":
        return getattr(self.storage, key)
    raise AttributeError(key)
def content_add(self, content: Iterable[Content]) -> Dict: | def content_add(self, content: Iterable[Content]) -> Dict: | ||||
contents = list(content) | contents = list(content) | ||||
▲ Show 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | def _filter_missing_ids(self, object_type: str, ids: Iterable[bytes]) -> Set[bytes]: | ||||
fn_by_object_type = { | fn_by_object_type = { | ||||
"revision": self.storage.revision_missing, | "revision": self.storage.revision_missing, | ||||
"directory": self.storage.directory_missing, | "directory": self.storage.directory_missing, | ||||
} | } | ||||
fn = fn_by_object_type[object_type] | fn = fn_by_object_type[object_type] | ||||
return set(fn(missing_ids)) | return set(fn(missing_ids)) | ||||
def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None:
    """Forget the ids already seen for the given object types.

    Resets the matching ``objects_seen`` entries to empty sets, then forwards
    the same object types to the wrapped storage's ``clear_buffers``.

    Args:
        object_types: object types to reset; when ``None``, every type listed
            in ``self.object_types`` is reset.

    Returns:
        Whatever the wrapped storage's ``clear_buffers`` returns.
    """
    if object_types is None:
        object_types = self.object_types
    # Materialize once: a one-shot iterable (e.g. a generator) would be
    # exhausted by the loop below and reach the wrapped storage empty.
    types_to_clear = list(object_types)
    for object_type in types_to_clear:
        self.objects_seen[object_type] = set()
    return self.storage.clear_buffers(types_to_clear)