Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/filter.py
# Copyright (C) 2019 The Software Heritage developers | # Copyright (C) 2019 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from typing import Dict, Generator, Sequence, Set | from typing import Dict, Generator, Iterable, Set | ||||
from swh.storage import get_storage | from swh.storage import get_storage | ||||
class FilteringProxyStorage: | class FilteringProxyStorage: | ||||
"""Filtering Storage implementation. This is in charge of transparently | """Filtering Storage implementation. This is in charge of transparently | ||||
filtering out known objects prior to adding them to storage. | filtering out known objects prior to adding them to storage. | ||||
Sample configuration use case for filtering storage: | Sample configuration use case for filtering storage: | ||||
.. code-block: yaml | .. code-block: yaml | ||||
storage: | storage: | ||||
cls: filter | cls: filter | ||||
storage: | storage: | ||||
cls: remote | cls: remote | ||||
url: http://storage.internal.staging.swh.network:5002/ | url: http://storage.internal.staging.swh.network:5002/ | ||||
""" | """ | ||||
def __init__(self, storage): | def __init__(self, storage): | ||||
self.storage = get_storage(**storage) | self.storage = get_storage(**storage) | ||||
self.objects_seen = { | self.objects_seen = { | ||||
'content': set(), # set of content hashes (sha256) seen | 'content': set(), # sha256 | ||||
'directory': set(), | 'skipped_content': set(), # sha1_git | ||||
'revision': set(), | 'directory': set(), # sha1_git | ||||
'revision': set(), # sha1_git | |||||
} | } | ||||
def __getattr__(self, key): | def __getattr__(self, key): | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
def content_add(self, content: Sequence[Dict]) -> Dict: | def content_add(self, content: Iterable[Dict]) -> Dict: | ||||
contents = list(content) | contents = list(content) | ||||
contents_to_add = self._filter_missing_contents(contents) | contents_to_add = self._filter_missing_contents(contents) | ||||
return self.storage.content_add( | return self.storage.content_add( | ||||
x for x in contents if x['sha256'] in contents_to_add | x for x in contents if x['sha256'] in contents_to_add | ||||
) | ) | ||||
def directory_add(self, directories: Sequence[Dict]) -> Dict: | def skipped_content_add(self, content: Iterable[Dict]) -> Dict: | ||||
contents = list(content) | |||||
contents_to_add = self._filter_missing_skipped_contents(contents) | |||||
return self.storage.skipped_content_add( | |||||
x for x in contents if x['sha1_git'] in contents_to_add | |||||
) | |||||
def directory_add(self, directories: Iterable[Dict]) -> Dict: | |||||
directories = list(directories) | directories = list(directories) | ||||
missing_ids = self._filter_missing_ids( | missing_ids = self._filter_missing_ids( | ||||
'directory', | 'directory', | ||||
(d['id'] for d in directories) | (d['id'] for d in directories) | ||||
) | ) | ||||
return self.storage.directory_add( | return self.storage.directory_add( | ||||
d for d in directories if d['id'] in missing_ids | d for d in directories if d['id'] in missing_ids | ||||
) | ) | ||||
def revision_add(self, revisions): | def revision_add(self, revisions): | ||||
revisions = list(revisions) | revisions = list(revisions) | ||||
missing_ids = self._filter_missing_ids( | missing_ids = self._filter_missing_ids( | ||||
'revision', | 'revision', | ||||
(d['id'] for d in revisions) | (d['id'] for d in revisions) | ||||
) | ) | ||||
return self.storage.revision_add( | return self.storage.revision_add( | ||||
r for r in revisions if r['id'] in missing_ids | r for r in revisions if r['id'] in missing_ids | ||||
) | ) | ||||
def _filter_missing_contents( | def _filter_missing_contents( | ||||
self, content_hashes: Sequence[Dict]) -> Set[bytes]: | self, content_hashes: Iterable[Dict]) -> Set[bytes]: | ||||
"""Return only the content keys missing from swh | """Return only the content keys missing from swh | ||||
Args: | Args: | ||||
content_hashes: List of sha256 to check for existence in swh | content_hashes: List of sha256 to check for existence in swh | ||||
storage | storage | ||||
""" | """ | ||||
objects_seen = self.objects_seen['content'] | objects_seen = self.objects_seen['content'] | ||||
missing_hashes = [] | missing_hashes = [] | ||||
for hashes in content_hashes: | for hashes in content_hashes: | ||||
if hashes['sha256'] in objects_seen: | if hashes['sha256'] in objects_seen: | ||||
continue | continue | ||||
objects_seen.add(hashes['sha256']) | objects_seen.add(hashes['sha256']) | ||||
missing_hashes.append(hashes) | missing_hashes.append(hashes) | ||||
return set(self.storage.content_missing( | return set(self.storage.content_missing( | ||||
missing_hashes, | missing_hashes, | ||||
key_hash='sha256', | key_hash='sha256', | ||||
)) | )) | ||||
def _filter_missing_skipped_contents( | |||||
self, content_hashes: Iterable[Dict]) -> Set[bytes]: | |||||
"""Return only the content keys missing from swh | |||||
Args: | |||||
content_hashes: List of sha1_git to check for existence in swh | |||||
storage | |||||
""" | |||||
objects_seen = self.objects_seen['skipped_content'] | |||||
missing_hashes = [] | |||||
for hashes in content_hashes: | |||||
if hashes['sha1_git'] in objects_seen: | |||||
continue | |||||
objects_seen.add(hashes['sha1_git']) | |||||
missing_hashes.append(hashes) | |||||
return {c['sha1_git'] | |||||
for c in self.storage.skipped_content_missing(missing_hashes)} | |||||
def _filter_missing_ids( | def _filter_missing_ids( | ||||
self, | self, | ||||
object_type: str, | object_type: str, | ||||
ids: Generator[bytes, None, None]) -> Set[bytes]: | ids: Generator[bytes, None, None]) -> Set[bytes]: | ||||
"""Filter missing ids from the storage for a given object type. | """Filter missing ids from the storage for a given object type. | ||||
Args: | Args: | ||||
object_type: object type to use {revision, directory} | object_type: object type to use {revision, directory} | ||||
ids: Sequence of object_type ids | ids: Iterable of object_type ids | ||||
Returns: | Returns: | ||||
Missing ids from the storage for object_type | Missing ids from the storage for object_type | ||||
""" | """ | ||||
objects_seen = self.objects_seen[object_type] | objects_seen = self.objects_seen[object_type] | ||||
missing_ids = [] | missing_ids = [] | ||||
for id in ids: | for id in ids: | ||||
Show All 12 Lines |