diff --git a/requirements-swh.txt b/requirements-swh.txt
index 185434a..f18315a 100644
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,3 +1,3 @@
 swh.model >= 0.0.18
-swh.storage >= 0.0.133
+swh.storage >= 0.0.152
 swh.deposit
diff --git a/swh/loader/core/tests/test_storage.py b/swh/loader/core/tests/test_storage.py
deleted file mode 100644
index cc42b42..0000000
--- a/swh/loader/core/tests/test_storage.py
+++ /dev/null
@@ -1,241 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-
-from swh.loader.package.storage import (
-    BufferingProxyStorage, FilteringProxyStorage
-)
-
-
-sample_content = {
-    'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
-    'sha1': b'g\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
-    'sha1_git': b'\xf2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
-    'sha256': b"\x87\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
-    'length': 48,
-    'data': b'temp file for testing content storage conversion',
-    'status': 'visible',
-}
-
-sample_content2 = {
-    'blake2s256': b'\xbf?\x05\xed\xc1U\xd2\xc5\x168Xm\x93\xde}f(HO@\xd0\xacn\x04\x1e\x9a\xb9\xfa\xbf\xcc\x08\xc7',  # noqa
-    'sha1': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
-    'sha1_git': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
-    'sha256': b"\x77\x022\xedZN\x84\xe8za\xf8'(oA\xc9k\xb1\x80c\x80\xe7J\x06\xea\xd2\xd5\xbeB\x19\xb8\xce",  # noqa
-    'length': 50,
-    'data': b'temp file for testing content storage conversion 2',
-    'status': 'visible',
-}
-
-
-sample_directory = {
-    'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
-    'entries': []
-}
-
-
-sample_person = {
-    'name': b'John Doe',
-    'email': b'john.doe@institute.org',
-    'fullname': b'John Doe <john.doe@institute.org>'
-}
-
-
-sample_revision = {
-    'id': b'f\x15y+\xcb][\\\n\xf28\xb2\x0c_P[\xc8\x89Hk',
-    'message': b'something',
-    'author': sample_person,
-    'committer': sample_person,
-    'date': 1567591673,
-    'committer_date': 1567591673,
-    'type': 'tar',
-    'directory': b'\xc2\xae\xfa\xba\xfa\xa6B\x9b^\xf9Z\xf5\x14\x0cna\xb0\xef\x8b',  # noqa
-    'synthetic': False,
-    'metadata': {},
-    'parents': [],
-}
-
-
-def test_buffering_proxy_storage_content_threshold_not_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'content': 10,
-        }
-    )
-    s = storage.content_add([sample_content, sample_content2])
-    assert s == {}
-
-    s = storage.flush()
-    assert s == {
-        'content:add': 1 + 1,
-        'content:add:bytes': 48 + 50,
-        'skipped_content:add': 0
-    }
-
-
-def test_buffering_proxy_storage_content_threshold_nb_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'content': 1,
-        }
-    )
-
-    s = storage.content_add([sample_content])
-    assert s == {
-        'content:add': 1,
-        'content:add:bytes': 48,
-        'skipped_content:add': 0
-    }
-    s = storage.flush()
-    assert s == {}
-
-
-def test_buffering_proxy_storage_content_threshold_bytes_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'content': 10,
-            'content_bytes': 20,
-        }
-    )
-
-    s = storage.content_add([sample_content])
-    assert s == {
-        'content:add': 1,
-        'content:add:bytes': 48,
-        'skipped_content:add': 0
-    }
-    s = storage.flush()
-    assert s == {}
-
-
-def test_buffering_proxy_storage_directory_threshold_not_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'directory': 10,
-        }
-    )
-    s = storage.directory_add([sample_directory])
-    assert s == {}
-
-    s = storage.flush()
-    assert s == {
-        'directory:add': 1,
-    }
-
-
-def test_buffering_proxy_storage_directory_threshold_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'directory': 1,
-        }
-    )
-    s = storage.directory_add([sample_directory])
-    assert s == {
-        'directory:add': 1,
-    }
-
-    s = storage.flush()
-    assert s == {}
-
-
-def test_buffering_proxy_storage_revision_threshold_not_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'revision': 10,
-        }
-    )
-    s = storage.revision_add([sample_revision])
-    assert s == {}
-
-    s = storage.flush()
-    assert s == {
-        'revision:add': 1,
-    }
-
-
-def test_buffering_proxy_storage_revision_threshold_hit():
-    storage = BufferingProxyStorage(
-        storage={'cls': 'memory', 'args': {}},
-        thresholds={
-            'revision': 1,
-        }
-    )
-    s = storage.revision_add([sample_revision])
-    assert s == {
-        'revision:add': 1,
-    }
-
-    s = storage.flush()
-    assert s == {}
-
-
-def test_filtering_proxy_storage_content():
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
-
-    content = next(storage.content_get([sample_content['sha1']]))
-    assert not content
-
-    s = storage.content_add([sample_content])
-    assert s == {
-        'content:add': 1,
-        'content:add:bytes': 48,
-        'skipped_content:add': 0
-    }
-
-    content = next(storage.content_get([sample_content['sha1']]))
-    assert content is not None
-
-    s = storage.content_add([sample_content])
-    assert s == {
-        'content:add': 0,
-        'content:add:bytes': 0,
-        'skipped_content:add': 0
-    }
-
-
-def test_filtering_proxy_storage_revision():
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
-
-    revision = next(storage.revision_get([sample_revision['id']]))
-    assert not revision
-
-    s = storage.revision_add([sample_revision])
-    assert s == {
-        'revision:add': 1,
-    }
-
-    revision = next(storage.revision_get([sample_revision['id']]))
-    assert revision is not None
-
-    s = storage.revision_add([sample_revision])
-    assert s == {
-        'revision:add': 0,
-    }
-
-
-def test_filtering_proxy_storage_directory():
-    storage = FilteringProxyStorage(storage={'cls': 'memory', 'args': {}})
-
-    directory = next(storage.directory_missing([sample_directory['id']]))
-    assert directory
-
-    s = storage.directory_add([sample_directory])
-    assert s == {
-        'directory:add': 1,
-    }
-
-    directory = list(storage.directory_missing([sample_directory['id']]))
-    assert not directory
-
-    s = storage.directory_add([sample_directory])
-    assert s == {
-        'directory:add': 0,
-    }
diff --git a/swh/loader/package/storage.py b/swh/loader/package/storage.py
deleted file mode 100644
index 535b7b3..0000000
--- a/swh/loader/package/storage.py
+++ /dev/null
@@ -1,185 +0,0 @@
-# Copyright (C) 2019 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-from typing import Optional, Sequence, Dict, Set
-from functools import partial
-from collections import deque
-
-from swh.core.utils import grouper
-from swh.storage import get_storage
-
-
-class BufferingProxyStorage:
-    """Storage implementation in charge of accumulating objects prior to
-    discussing with the "main" storage.
-
-    """
-    def __init__(self, storage, thresholds=None):
-        self.storage = get_storage(**storage)
-
-        if thresholds is None:
-            thresholds = {}
-
-        self.thresholds = {
-            'content': thresholds.get('content', 10000),
-            'content_bytes': thresholds.get('content_bytes', 100*1024*1024),
-            'directory': thresholds.get('directory', 25000),
-            'revision': thresholds.get('revision', 100000),
-        }
-        self.object_types = ['content', 'directory', 'revision']
-        self._objects = {k: deque() for k in self.object_types}
-
-    def __getattr__(self, key):
-        if key.endswith('_add'):
-            object_type = key.split('_')[0]
-            if object_type in self.object_types:
-                return partial(
-                    self.object_add, object_type=object_type
-                )
-        return getattr(self.storage, key)
-
-    def content_add(self, content: Sequence[Dict]) -> Dict:
-        """Enqueue contents to write to the storage.
-
-        Following policies apply:
-        - First, check if the queue's threshold is hit. If it is flush content
-          to the storage.
-
-        - If not, check if the total size of enqueued contents's threshold is
-          hit. If it is flush content to the storage.
-
-        """
-        s = self.object_add(content, object_type='content')
-        if not s:
-            q = self._objects['content']
-            total_size = sum(c['length'] for c in q)
-            if total_size >= self.thresholds['content_bytes']:
-                return self.flush(['content'])
-
-        return s
-
-    def flush(self, object_types: Optional[Sequence[str]] = None) -> Dict:
-        if object_types is None:
-            object_types = self.object_types
-        summary = {}
-        for object_type in object_types:
-            q = self._objects[object_type]
-            for objs in grouper(q, n=self.thresholds[object_type]):
-                add_fn = getattr(self.storage, '%s_add' % object_type)
-                s = add_fn(objs)
-                summary = {k: v + summary.get(k, 0)
-                           for k, v in s.items()}
-            q.clear()
-
-        return summary
-
-    def object_add(self, objects: Sequence[Dict], *, object_type: str) -> Dict:
-        """Enqueue objects to write to the storage. This checks if the queue's
-        threshold is hit. If it is actually write those to the storage.
-
-        """
-        q = self._objects[object_type]
-        threshold = self.thresholds[object_type]
-        q.extend(objects)
-        if len(q) >= threshold:
-            return self.flush()
-
-        return {}
-
-
-class FilteringProxyStorage:
-    """Storage implementation in charge of filtering existing objects prior to
-    calling the storage api for ingestion.
-
-    """
-    def __init__(self, storage):
-        self.storage = get_storage(**storage)
-        self.objects_seen = {
-            'content': set(),  # set of content hashes (sha256) seen
-            'directory': set(),
-            'revision': set(),
-        }
-
-    def __getattr__(self, key):
-        return getattr(self.storage, key)
-
-    def content_add(self, content: Sequence[Dict]) -> Dict:
-        contents = list(content)
-        contents_to_add = self._filter_missing_contents(contents)
-        return self.storage.content_add(
-            x for x in contents if x['sha256'] in contents_to_add
-        )
-
-    def directory_add(self, directories: Sequence[Dict]) -> Dict:
-        directories = list(directories)
-        missing_ids = self._filter_missing_ids(
-            'directory',
-            (d['id'] for d in directories)
-        )
-        return self.storage.directory_add(
-            d for d in directories if d['id'] in missing_ids
-        )
-
-    def revision_add(self, revisions):
-        revisions = list(revisions)
-        missing_ids = self._filter_missing_ids(
-            'revision',
-            (d['id'] for d in revisions)
-        )
-        return self.storage.revision_add(
-            r for r in revisions if r['id'] in missing_ids
-        )
-
-    def _filter_missing_contents(
-            self, content_hashes: Sequence[Dict]) -> Set[bytes]:
-        """Return only the content keys missing from swh
-
-        Args:
-            content_hashes: List of sha256 to check for existence in swh
-                storage
-
-        """
-        objects_seen = self.objects_seen['content']
-        missing_hashes = []
-        for hashes in content_hashes:
-            if hashes['sha256'] in objects_seen:
-                continue
-            objects_seen.add(hashes['sha256'])
-            missing_hashes.append(hashes)
-
-        return set(self.storage.content_missing(
-            missing_hashes,
-            key_hash='sha256',
-        ))
-
-    def _filter_missing_ids(
-            self,
-            object_type: str,
-            ids: Sequence[bytes]) -> Set[bytes]:
-        """Filter missing ids from the storage for a given object type.
-
-        Args:
-            object_type: object type to use {revision, directory}
-            ids: List of object_type ids
-
-        Returns:
-            Missing ids from the storage for object_type
-
-        """
-        objects_seen = self.objects_seen[object_type]
-        missing_ids = []
-        for id in ids:
-            if id in objects_seen:
-                continue
-            objects_seen.add(id)
-            missing_ids.append(id)
-
-        fn_by_object_type = {
-            'revision': self.storage.revision_missing,
-            'directory': self.storage.directory_missing,
-        }
-
-        fn = fn_by_object_type[object_type]
-        return set(fn(missing_ids))
diff --git a/swh/loader/package/tests/conftest.py b/swh/loader/package/tests/conftest.py
index 397a32a..d3b2b65 100644
--- a/swh/loader/package/tests/conftest.py
+++ b/swh/loader/package/tests/conftest.py
@@ -1,149 +1,133 @@
 # Copyright (C) 2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import logging
 import os
 import re
 
 import pytest
 
 from functools import partial
 from os import path
 from urllib.parse import urlparse
 
-import swh.storage
-from swh.storage import get_storage as initial_get_storage
-
 logger = logging.getLogger(__name__)
 
 
-def get_storage(cls, args):
-    if cls == 'filter':
-        from swh.loader.package.storage import FilteringProxyStorage
-        return FilteringProxyStorage(**args)
-    if cls == 'buffer':
-        from swh.loader.package.storage import BufferingProxyStorage
-        return BufferingProxyStorage(**args)
-    return initial_get_storage(cls, args)
-
-
-swh.storage.get_storage = get_storage
-
-
 # Check get_local_factory function
 # Maximum number of iteration checks to generate requests responses
 MAX_VISIT_FILES = 10
 
 
 @pytest.fixture
 def swh_config(monkeypatch, datadir):
     conffile = os.path.join(datadir, 'loader.yml')
     monkeypatch.setenv('SWH_CONFIG_FILENAME', conffile)
     return conffile
 
 
 def get_response_cb(request, context, datadir, ignore_urls=[], visits=None):
     """Mount point callback to fetch on disk the content of a request
 
     This is meant to be used as 'body' argument of the requests_mock.get()
     method.
 
     It will look for files on the local filesystem based on the requested
     URL, using the following rules:
 
     - files are searched in the datadir/<hostname> directory
 
     - the local file name is the path part of the URL with path hierarchy
       markers (aka '/') replaced by '_'
 
    Eg. if you use the requests_mock fixture in your test file as:
 
        requests_mock.get('https://nowhere.com', body=get_response_cb)
        # or even
        requests_mock.get(re.compile('https://'), body=get_response_cb)
 
    then a call requests.get like:
 
        requests.get('https://nowhere.com/path/to/resource')
 
    will look the content of the response in:
 
        datadir/nowhere.com/path_to_resource
 
    Args:
        request (requests.Request): Object requests
        context (requests.Context): Object holding response metadata
            information (status_code, headers, etc...)
        ignore_urls (List): urls whose status response should be 404 even if
            the local file exists
        visits (Optional[Dict]): Map of url, number of visits. If None,
            disable multi visit support (default)
 
    Returns:
        Optional[FileDescriptor] on the on disk file to read from the test
        context
 
    """
    logger.debug('get_response_cb(%s, %s)', request, context)
    logger.debug('url: %s', request.url)
    logger.debug('ignore_urls: %s', ignore_urls)
    if request.url in ignore_urls:
        context.status_code = 404
        return None
    url = urlparse(request.url)
    dirname = url.hostname  # pypi.org | files.pythonhosted.org
    # url.path: pypi/<project>/json -> local file: pypi_<project>_json
    filename = url.path[1:]
    if filename.endswith('/'):
        filename = filename[:-1]
    filename = filename.replace('/', '_')
    filepath = path.join(datadir, dirname, filename)
    if visits is not None:
        visit = visits.get(url, 0)
        visits[url] = visit + 1
        if visit:
            filepath = filepath + '_visit%s' % visit
    if not path.isfile(filepath):
        logger.debug('not found filepath: %s', filepath)
        context.status_code = 404
        return None
    fd = open(filepath, 'rb')
    context.headers['content-length'] = str(path.getsize(filepath))
    return fd
 
 
 @pytest.fixture
 def datadir(request):
     """By default, returns the test directory
 
     """
     return path.join(path.dirname(request.fspath), 'data')
 
 
 def local_get_factory(ignore_urls=[], has_multi_visit=False):
     @pytest.fixture
     def local_get(requests_mock, datadir):
         if not has_multi_visit:
             cb = partial(get_response_cb, ignore_urls=ignore_urls,
                          datadir=datadir)
             requests_mock.get(re.compile('https://'), body=cb)
         else:
             visits = {}
             requests_mock.get(re.compile('https://'), body=partial(
                 get_response_cb, ignore_urls=ignore_urls, visits=visits,
                 datadir=datadir)
             )
 
         return requests_mock
 
     return local_get
 
 
 local_get = local_get_factory([])
 
 local_get_visits = local_get_factory(has_multi_visit=True)
diff --git a/swh/loader/package/tests/data/loader.yml b/swh/loader/package/tests/data/loader.yml
index 177c338..f4d5076 100644
--- a/swh/loader/package/tests/data/loader.yml
+++ b/swh/loader/package/tests/data/loader.yml
@@ -1,16 +1,16 @@
 storage:
   cls: filter
   args:
     storage:
       cls: buffer
       args:
         storage:
           cls: memory
           args: {}
-        thresholds:
+        min_batch_size:
           content: 5
           content_bytes: 100
           directory: 5
           revision: 5
 
 url: https://deposit.softwareheritage.org/1/private
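
The two proxy classes deleted above were upstreamed into swh.storage, which is
what the bump to swh.storage >= 0.0.152 in requirements-swh.txt tracks: the
stock get_storage() now resolves the 'filter' and 'buffer' classes referenced
by loader.yml, so the test conftest no longer needs to monkey-patch
swh.storage.get_storage, and the buffering proxy's batching knob is configured
as min_batch_size instead of thresholds. A minimal sketch, not the canonical
upstream API, of wiring the same filter -> buffer -> memory stack
programmatically, assuming the get_storage(cls, args) call convention shown in
the removed conftest code and that the upstream proxies keep the constructor
shapes of the deleted local implementations:

    # Sketch under stated assumptions: swh.storage >= 0.0.152 registers the
    # 'filter' and 'buffer' proxy classes with get_storage(cls, args), and
    # the buffering proxy accepts 'min_batch_size' where the deleted local
    # BufferingProxyStorage accepted 'thresholds'.
    from swh.storage import get_storage

    storage = get_storage('filter', {
        'storage': {
            'cls': 'buffer',
            'args': {
                'storage': {'cls': 'memory', 'args': {}},
                'min_batch_size': {
                    'content': 5,          # flush once 5 contents are queued
                    'content_bytes': 100,  # or once 100 bytes are queued
                    'directory': 5,
                    'revision': 5,
                },
            },
        },
    })

    # Adds below the batch size stay buffered; an explicit flush() (proxied
    # through the filter layer's __getattr__) writes whatever remains.
    summary = storage.flush()

Both layers keep the plain storage API, so loader code is unaffected by the
swap; only the YAML configuration key changes, as the loader.yml hunk shows.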