NOTE(review): line breaks, indentation and blank lines in this patch were
reconstructed from a whitespace-collapsed copy; confirm against the original
commit before applying. The filter.py post-image hash on the `index` line
predates the two docstring fixes (code-block directive, object_type list).

diff --git a/swh/storage/proxies/filter.py b/swh/storage/proxies/filter.py
index f9ad74e5..14bff3cc 100644
--- a/swh/storage/proxies/filter.py
+++ b/swh/storage/proxies/filter.py
@@ -1,139 +1,145 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 
 from typing import Dict, Iterable, List, Set
 
-from swh.model.model import Content, Directory, Revision, Sha1Git, SkippedContent
+from swh.model.model import (
+    Content,
+    Directory,
+    Release,
+    Revision,
+    Sha1Git,
+    SkippedContent,
+)
 from swh.storage import get_storage
 from swh.storage.interface import StorageInterface
 
 
 class FilteringProxyStorage:
     """Filtering Storage implementation. This is in charge of transparently
     filtering out known objects prior to adding them to storage.
 
     Sample configuration use case for filtering storage:
 
-    .. code-block: yaml
+    .. code-block:: yaml
 
         storage:
           cls: filter
           storage:
             cls: remote
             url: http://storage.internal.staging.swh.network:5002/
 
     """
 
-    object_types = ["content", "skipped_content", "directory", "revision"]
+    object_types = ["content", "skipped_content", "directory", "revision", "release"]
 
     def __init__(self, storage):
         self.storage: StorageInterface = get_storage(**storage)
 
     def __getattr__(self, key):
         if key == "storage":
             raise AttributeError(key)
         return getattr(self.storage, key)
 
     def content_add(self, content: List[Content]) -> Dict[str, int]:
         empty_stat = {
             "content:add": 0,
             "content:add:bytes": 0,
         }
         if not content:
             return empty_stat
         contents_to_add = self._filter_missing_contents(content)
         if not contents_to_add:
             return empty_stat
         return self.storage.content_add(
             [x for x in content if x.sha256 in contents_to_add]
         )
 
     def skipped_content_add(self, content: List[SkippedContent]) -> Dict[str, int]:
         empty_stat = {"skipped_content:add": 0}
         if not content:
             return empty_stat
         contents_to_add = self._filter_missing_skipped_contents(content)
         if not contents_to_add and not any(c.sha1_git is None for c in content):
             return empty_stat
         return self.storage.skipped_content_add(
             [x for x in content if x.sha1_git is None or x.sha1_git in contents_to_add]
         )
 
     def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
         empty_stat = {"directory:add": 0}
         if not directories:
             return empty_stat
         missing_ids = self._filter_missing_ids("directory", (d.id for d in directories))
         if not missing_ids:
             return empty_stat
         return self.storage.directory_add(
             [d for d in directories if d.id in missing_ids]
         )
 
     def revision_add(self, revisions: List[Revision]) -> Dict[str, int]:
         empty_stat = {"revision:add": 0}
         if not revisions:
             return empty_stat
         missing_ids = self._filter_missing_ids("revision", (r.id for r in revisions))
         if not missing_ids:
             return empty_stat
         return self.storage.revision_add([r for r in revisions if r.id in missing_ids])
 
+    def release_add(self, releases: List[Release]) -> Dict[str, int]:
+        empty_stat = {"release:add": 0}
+        if not releases:
+            return empty_stat
+        missing_ids = self._filter_missing_ids("release", (r.id for r in releases))
+        if not missing_ids:
+            return empty_stat
+        return self.storage.release_add([r for r in releases if r.id in missing_ids])
+
     def _filter_missing_contents(self, contents: List[Content]) -> Set[bytes]:
         """Return only the content keys missing from swh
 
         Args:
             content_hashes: List of sha256 to check for existence in swh
                 storage
 
         """
         missing_contents = []
         for content in contents:
             missing_contents.append(content.hashes())
 
         return set(self.storage.content_missing(missing_contents, key_hash="sha256",))
 
     def _filter_missing_skipped_contents(
         self, contents: List[SkippedContent]
     ) -> Set[Sha1Git]:
         """Return only the content keys missing from swh
 
         Args:
             content_hashes: List of sha1_git to check for existence in swh
                 storage
 
         """
         missing_contents = [c.hashes() for c in contents if c.sha1_git is not None]
 
         ids = set()
         for c in self.storage.skipped_content_missing(missing_contents):
             if c is None or c.get("sha1_git") is None:
                 continue
             ids.add(c["sha1_git"])
         return ids
 
     def _filter_missing_ids(self, object_type: str, ids: Iterable[bytes]) -> Set[bytes]:
         """Filter missing ids from the storage for a given object type.
 
         Args:
-            object_type: object type to use {revision, directory}
+            object_type: object type to use {revision, directory, release}
             ids: List of object_type ids
 
         Returns:
             Missing ids from the storage for object_type
 
         """
-        missing_ids = []
-        for id in ids:
-            missing_ids.append(id)
-
-        fn_by_object_type = {
-            "revision": self.storage.revision_missing,
-            "directory": self.storage.directory_missing,
-        }
-
-        fn = fn_by_object_type[object_type]
-        return set(fn(missing_ids))
+        return set(getattr(self.storage, f"{object_type}_missing")(list(ids)))
diff --git a/swh/storage/tests/test_filter.py b/swh/storage/tests/test_filter.py
index 6a90050b..0ffe9cea 100644
--- a/swh/storage/tests/test_filter.py
+++ b/swh/storage/tests/test_filter.py
@@ -1,161 +1,181 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 from unittest.mock import Mock
 
 import attr
 import pytest
 
 from swh.storage import get_storage
 
 
 @pytest.fixture
 def swh_storage():
     storage_config = {
         "cls": "pipeline",
         "steps": [{"cls": "filter"}, {"cls": "memory"},],
     }
 
     return get_storage(**storage_config)
 
 
 def test_filtering_proxy_storage_content(swh_storage, sample_data):
     sample_content = sample_data.content
 
     content = swh_storage.content_get_data(sample_content.sha1)
     assert content is None
 
     s = swh_storage.content_add([sample_content])
     assert s == {
         "content:add": 1,
         "content:add:bytes": sample_content.length,
     }
 
     content = swh_storage.content_get_data(sample_content.sha1)
     assert content is not None
 
     s = swh_storage.content_add([sample_content])
     assert s == {
         "content:add": 0,
         "content:add:bytes": 0,
     }
 
 
 def test_filtering_proxy_storage_skipped_content(swh_storage, sample_data):
     sample_content = sample_data.skipped_content
     sample_content_dict = sample_content.to_dict()
 
     content = next(swh_storage.skipped_content_missing([sample_content_dict]))
     assert content["sha1"] == sample_content.sha1
 
     s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         "skipped_content:add": 1,
     }
 
     content = list(swh_storage.skipped_content_missing([sample_content_dict]))
     assert content == []
 
     s = swh_storage.skipped_content_add([sample_content])
     assert s == {
         "skipped_content:add": 0,
     }
 
 
 def test_filtering_proxy_storage_skipped_content_missing_sha1_git(
     swh_storage, sample_data
 ):
     sample_contents = [
         attr.evolve(c, sha1_git=None) for c in sample_data.skipped_contents
     ]
     sample_content, sample_content2 = [c.to_dict() for c in sample_contents[:2]]
 
     content = next(swh_storage.skipped_content_missing([sample_content]))
     assert content["sha1"] == sample_content["sha1"]
 
     s = swh_storage.skipped_content_add([sample_contents[0]])
     assert s == {
         "skipped_content:add": 1,
     }
 
     content = list(swh_storage.skipped_content_missing([sample_content]))
     assert content == []
 
     s = swh_storage.skipped_content_add([sample_contents[1]])
     assert s == {
         "skipped_content:add": 1,
     }
 
     content = list(swh_storage.skipped_content_missing([sample_content2]))
     assert content == []
 
 
 def test_filtering_proxy_storage_revision(swh_storage, sample_data):
     sample_revision = sample_data.revision
 
     revision = swh_storage.revision_get([sample_revision.id])[0]
     assert revision is None
 
     s = swh_storage.revision_add([sample_revision])
     assert s == {
         "revision:add": 1,
     }
 
     revision = swh_storage.revision_get([sample_revision.id])[0]
     assert revision is not None
 
     s = swh_storage.revision_add([sample_revision])
     assert s == {
         "revision:add": 0,
     }
 
 
+def test_filtering_proxy_storage_release(swh_storage, sample_data):
+    sample_release = sample_data.release
+
+    release = swh_storage.release_get([sample_release.id])[0]
+    assert release is None
+
+    s = swh_storage.release_add([sample_release])
+    assert s == {
+        "release:add": 1,
+    }
+
+    release = swh_storage.release_get([sample_release.id])[0]
+    assert release is not None
+
+    s = swh_storage.release_add([sample_release])
+    assert s == {
+        "release:add": 0,
+    }
+
+
 def test_filtering_proxy_storage_directory(swh_storage, sample_data):
     sample_directory = sample_data.directory
 
     directory = list(swh_storage.directory_missing([sample_directory.id]))[0]
     assert directory
 
     s = swh_storage.directory_add([sample_directory])
     assert s == {
         "directory:add": 1,
     }
 
     directory = list(swh_storage.directory_missing([sample_directory.id]))
     assert not directory
 
     s = swh_storage.directory_add([sample_directory])
     assert s == {
         "directory:add": 0,
     }
 
 
 def test_filtering_proxy_storage_empty_list(swh_storage, sample_data):
     swh_storage.storage = mock_storage = Mock(wraps=swh_storage.storage)
 
     calls = 0
     for object_type in swh_storage.object_types:
         calls += 1
 
         method_name = f"{object_type}_add"
         method = getattr(swh_storage, method_name)
         one_object = getattr(sample_data, object_type)
 
         # Call with empty list: ensure underlying storage not called
         method([])
         assert method_name not in {c[0] for c in mock_storage.method_calls}
         mock_storage.reset_mock()
 
         # Call with an object: ensure underlying storage is called
         method([one_object])
         assert method_name in {c[0] for c in mock_storage.method_calls}
         mock_storage.reset_mock()
 
         # Call with the same object: ensure underlying storage is not called again
         method([one_object])
         assert method_name not in {c[0] for c in mock_storage.method_calls}
         mock_storage.reset_mock()
 
     assert calls > 0, "Empty list never tested"