diff --git a/swh/storage/filter.py b/swh/storage/filter.py index 692e224b..14d9882d 100644 --- a/swh/storage/filter.py +++ b/swh/storage/filter.py @@ -1,153 +1,125 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Dict, Iterable, Optional, Set +from typing import Dict, Iterable, Set from swh.model.model import ( Content, SkippedContent, Directory, Revision, ) from swh.storage import get_storage class FilteringProxyStorage: """Filtering Storage implementation. This is in charge of transparently filtering out known objects prior to adding them to storage. Sample configuration use case for filtering storage: .. code-block: yaml storage: cls: filter storage: cls: remote url: http://storage.internal.staging.swh.network:5002/ """ object_types = ["content", "skipped_content", "directory", "revision"] - objects_seen: Dict[str, Set[bytes]] = {} def __init__(self, storage): self.storage = get_storage(**storage) - for object_type in self.object_types: - self.objects_seen[object_type] = set() def __getattr__(self, key): if key == "storage": raise AttributeError(key) return getattr(self.storage, key) def content_add(self, content: Iterable[Content]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_contents(contents) return self.storage.content_add( x for x in contents if x.sha256 in contents_to_add ) def skipped_content_add(self, content: Iterable[SkippedContent]) -> Dict: contents = list(content) contents_to_add = self._filter_missing_skipped_contents(contents) return self.storage.skipped_content_add( x for x in contents if x.sha1_git is None or x.sha1_git in contents_to_add ) def directory_add(self, directories: Iterable[Directory]) -> Dict: directories = list(directories) missing_ids = self._filter_missing_ids("directory", (d.id for d in directories)) return self.storage.directory_add(d for d in directories if d.id in missing_ids) def revision_add(self, revisions: Iterable[Revision]) -> Dict: revisions = list(revisions) missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions)) return self.storage.revision_add(r for r in revisions if r.id in missing_ids) def _filter_missing_contents(self, contents: Iterable[Content]) -> Set[bytes]: """Return only the content keys missing from swh Args: content_hashes: List of sha256 to check for existence in swh storage """ - objects_seen = self.objects_seen["content"] missing_contents = [] for content in contents: - if content.sha256 in objects_seen: - continue - objects_seen.add(content.sha256) missing_contents.append(content.hashes()) return set(self.storage.content_missing(missing_contents, key_hash="sha256",)) def _filter_missing_skipped_contents( self, contents: Iterable[SkippedContent] ) -> Set[bytes]: """Return only the content keys missing from swh Args: content_hashes: List of sha1_git to check for existence in swh storage """ - objects_seen = self.objects_seen["skipped_content"] missing_contents = [] for content in contents: - if content.sha1_git is None or content.sha1_git in objects_seen: + if content.sha1_git is None: continue - objects_seen.add(content.sha1_git) missing_contents.append(content.hashes()) return { c.get("sha1_git") for c in self.storage.skipped_content_missing(missing_contents) } def _filter_missing_ids(self, object_type: str, ids: Iterable[bytes]) -> Set[bytes]: """Filter missing ids from the storage for a given object type. Args: object_type: object type to use {revision, directory} ids: Iterable of object_type ids Returns: Missing ids from the storage for object_type """ - objects_seen = self.objects_seen[object_type] missing_ids = [] for id in ids: - if id in objects_seen: - continue - objects_seen.add(id) missing_ids.append(id) fn_by_object_type = { "revision": self.storage.revision_missing, "directory": self.storage.directory_missing, } fn = fn_by_object_type[object_type] return set(fn(missing_ids)) - - def clear_buffers(self, object_types: Optional[Iterable[str]] = None) -> None: - """Clear objects from current buffer - - """ - if object_types is None: - object_types = self.object_types - - for object_type in object_types: - self.objects_seen[object_type] = set() - - return self.storage.clear_buffers(object_types) - - def flush(self, object_types: Optional[Iterable[str]] = None) -> Dict: - return self.storage.flush(object_types) diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py index 02187367..6043decb 100644 --- a/swh/storage/tests/test_filter.py +++ b/swh/storage/tests/test_filter.py @@ -1,184 +1,127 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import pytest from swh.storage import get_storage @pytest.fixture def swh_storage(): storage_config = { "cls": "pipeline", "steps": [{"cls": "validate"}, {"cls": "filter"}, {"cls": "memory"},], } return get_storage(**storage_config) def test_filtering_proxy_storage_content(swh_storage, sample_data): sample_content = sample_data["content"][0] content = next(swh_storage.content_get([sample_content["sha1"]])) assert not content s = swh_storage.content_add([sample_content]) assert s == { "content:add": 1, "content:add:bytes": sample_content["length"], } content = next(swh_storage.content_get([sample_content["sha1"]])) assert content is not None s = swh_storage.content_add([sample_content]) assert s == { "content:add": 0, "content:add:bytes": 0, } def test_filtering_proxy_storage_skipped_content(swh_storage, sample_data): sample_content = sample_data["skipped_content"][0] content = next(swh_storage.skipped_content_missing([sample_content])) assert content["sha1"] == sample_content["sha1"] s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 1, } content = list(swh_storage.skipped_content_missing([sample_content])) assert content == [] s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 0, } def test_filtering_proxy_storage_skipped_content_missing_sha1_git( swh_storage, sample_data ): sample_content = sample_data["skipped_content"][0] sample_content2 = sample_data["skipped_content"][1] sample_content["sha1_git"] = sample_content2["sha1_git"] = None content = next(swh_storage.skipped_content_missing([sample_content])) assert content["sha1"] == sample_content["sha1"] s = swh_storage.skipped_content_add([sample_content]) assert s == { "skipped_content:add": 1, } content = list(swh_storage.skipped_content_missing([sample_content])) assert content == [] s = swh_storage.skipped_content_add([sample_content2]) assert s == { "skipped_content:add": 1, } content = list(swh_storage.skipped_content_missing([sample_content2])) assert content == [] def test_filtering_proxy_storage_revision(swh_storage, sample_data): sample_revision = sample_data["revision"][0] revision = next(swh_storage.revision_get([sample_revision["id"]])) assert not revision s = swh_storage.revision_add([sample_revision]) assert s == { "revision:add": 1, } revision = next(swh_storage.revision_get([sample_revision["id"]])) assert revision is not None s = swh_storage.revision_add([sample_revision]) assert s == { "revision:add": 0, } def test_filtering_proxy_storage_directory(swh_storage, sample_data): sample_directory = sample_data["directory"][0] directory = next(swh_storage.directory_missing([sample_directory["id"]])) assert directory s = swh_storage.directory_add([sample_directory]) assert s == { "directory:add": 1, } directory = list(swh_storage.directory_missing([sample_directory["id"]])) assert not directory s = swh_storage.directory_add([sample_directory]) assert s == { "directory:add": 0, } - - -def test_filtering_proxy_storage_clear(swh_storage, sample_data): - """Clear operation on filter proxy - - """ - threshold = 10 - contents = sample_data["content"] - assert 0 < len(contents) < threshold - skipped_contents = sample_data["skipped_content"] - assert 0 < len(skipped_contents) < threshold - directories = sample_data["directory"] - assert 0 < len(directories) < threshold - revisions = sample_data["revision"] - assert 0 < len(revisions) < threshold - releases = sample_data["release"] - assert 0 < len(releases) < threshold - - s = swh_storage.content_add(contents) - assert s["content:add"] == len(contents) - s = swh_storage.skipped_content_add(skipped_contents) - assert s == { - "skipped_content:add": len(directories), - } - s = swh_storage.directory_add(directories) - assert s == { - "directory:add": len(directories), - } - s = swh_storage.revision_add(revisions) - assert s == { - "revision:add": len(revisions), - } - - assert len(swh_storage.objects_seen["content"]) == len(contents) - assert len(swh_storage.objects_seen["skipped_content"]) == len(skipped_contents) - assert len(swh_storage.objects_seen["directory"]) == len(directories) - assert len(swh_storage.objects_seen["revision"]) == len(revisions) - - # clear only content from the buffer - s = swh_storage.clear_buffers(["content"]) - assert s is None - - # specific clear operation on specific object type content only touched - # them - assert len(swh_storage.objects_seen["content"]) == 0 - assert len(swh_storage.objects_seen["skipped_content"]) == len(skipped_contents) - assert len(swh_storage.objects_seen["directory"]) == len(directories) - assert len(swh_storage.objects_seen["revision"]) == len(revisions) - - # clear current buffer from all object types - s = swh_storage.clear_buffers() - assert s is None - - assert len(swh_storage.objects_seen["content"]) == 0 - assert len(swh_storage.objects_seen["skipped_content"]) == 0 - assert len(swh_storage.objects_seen["directory"]) == 0 - assert len(swh_storage.objects_seen["revision"]) == 0