diff --git a/.gitignore b/.gitignore index dab40fd..55cc78f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,12 +1,13 @@ *.pyc *.sw? *~ /.coverage /.coverage.* .eggs/ __pycache__ *.egg-info/ build dist version.txt .tox +.mypy_cache/ diff --git a/MANIFEST.in b/MANIFEST.in index e7c46fc..8dc6972 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,4 +1,5 @@ include Makefile include requirements.txt include requirements-swh.txt include version.txt +recursive-include swh py.typed diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..6535cfa --- /dev/null +++ b/mypy.ini @@ -0,0 +1,21 @@ +[mypy] +namespace_packages = True +warn_unused_ignores = True + + +# 3rd party libraries without stubs (yet) + +[mypy-azure.*] +ignore_missing_imports = True + +[mypy-libcloud.*] +ignore_missing_imports = True + +[mypy-pkg_resources.*] +ignore_missing_imports = True + +[mypy-pytest.*] +ignore_missing_imports = True + +[mypy-rados.*] +ignore_missing_imports = True diff --git a/swh/__init__.py b/swh/__init__.py index 69e3be5..de9df06 100644 --- a/swh/__init__.py +++ b/swh/__init__.py @@ -1 +1,4 @@ -__path__ = __import__('pkgutil').extend_path(__path__, __name__) +from typing import Iterable + +__path__ = __import__('pkgutil').extend_path(__path__, + __name__) # type: Iterable[str] diff --git a/swh/objstorage/__init__.py b/swh/objstorage/__init__.py index d254b9a..8c530f7 100644 --- a/swh/objstorage/__init__.py +++ b/swh/objstorage/__init__.py @@ -1,108 +1,109 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.objstorage.objstorage import ObjStorage, ID_HASH_LENGTH # noqa from swh.objstorage.backends.pathslicing import PathSlicingObjStorage from swh.objstorage.backends.in_memory import InMemoryObjStorage from swh.objstorage.api.client import RemoteObjStorage from swh.objstorage.multiplexer import ( MultiplexerObjStorage, StripingObjStorage) from swh.objstorage.multiplexer.filter import add_filters from swh.objstorage.backends.seaweed import WeedObjStorage from swh.objstorage.backends.generator import RandomGeneratorObjStorage +from typing import Callable, Dict, Union __all__ = ['get_objstorage', 'ObjStorage'] -_STORAGE_CLASSES = { +_STORAGE_CLASSES: Dict[str, Union[type, Callable[..., type]]] = { 'pathslicing': PathSlicingObjStorage, 'remote': RemoteObjStorage, 'memory': InMemoryObjStorage, 'weed': WeedObjStorage, 'random': RandomGeneratorObjStorage, } _STORAGE_CLASSES_MISSING = { } try: from swh.objstorage.backends.azure import ( AzureCloudObjStorage, PrefixedAzureCloudObjStorage, ) _STORAGE_CLASSES['azure'] = AzureCloudObjStorage _STORAGE_CLASSES['azure-prefixed'] = PrefixedAzureCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING['azure'] = e.args[0] _STORAGE_CLASSES_MISSING['azure-prefixed'] = e.args[0] try: from swh.objstorage.backends.rados import RADOSObjStorage _STORAGE_CLASSES['rados'] = RADOSObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING['rados'] = e.args[0] try: from swh.objstorage.backends.libcloud import ( AwsCloudObjStorage, OpenStackCloudObjStorage, ) _STORAGE_CLASSES['s3'] = AwsCloudObjStorage _STORAGE_CLASSES['swift'] = OpenStackCloudObjStorage except ImportError as e: _STORAGE_CLASSES_MISSING['s3'] = e.args[0] _STORAGE_CLASSES_MISSING['swift'] = e.args[0] def get_objstorage(cls, args): """ Create an ObjStorage using the given implementation class. Args: cls (str): objstorage class unique key contained in the _STORAGE_CLASSES dict. args (dict): arguments for the required class of objstorage that must match exactly the one in the `__init__` method of the class. Returns: subclass of ObjStorage that match the given `storage_class` argument. Raises: ValueError: if the given storage class is not a valid objstorage key. """ if cls in _STORAGE_CLASSES: return _STORAGE_CLASSES[cls](**args) else: raise ValueError('Storage class {} is not available: {}'.format( cls, _STORAGE_CLASSES_MISSING.get(cls, 'unknown name'))) def _construct_filtered_objstorage(storage_conf, filters_conf): return add_filters( get_objstorage(**storage_conf), filters_conf ) _STORAGE_CLASSES['filtered'] = _construct_filtered_objstorage def _construct_multiplexer_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return MultiplexerObjStorage(storages) _STORAGE_CLASSES['multiplexer'] = _construct_multiplexer_objstorage def _construct_striping_objstorage(objstorages): storages = [get_objstorage(**conf) for conf in objstorages] return StripingObjStorage(storages) _STORAGE_CLASSES['striping'] = _construct_striping_objstorage diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py index 11a4132..986acb0 100644 --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -1,267 +1,268 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import aiohttp.web +import json from swh.core.config import read as config_read from swh.core.api.asynchronous import (RPCServerApp, decode_request, encode_data_server as encode_data) from swh.core.api.serializers import msgpack_loads, SWHJSONDecoder from swh.model import hashutil from swh.objstorage import get_objstorage from swh.objstorage.objstorage import DEFAULT_LIMIT from swh.objstorage.exc import ObjNotFoundError from swh.core.statsd import statsd def timed(f): async def w(*a, **kw): with statsd.timed( 'swh_objstorage_request_duration_seconds', tags={'endpoint': f.__name__}): return await f(*a, **kw) return w @timed async def index(request): return aiohttp.web.Response(body="SWH Objstorage API server") @timed async def check_config(request): req = await decode_request(request) return encode_data(request.app['objstorage'].check_config(**req)) @timed async def contains(request): req = await decode_request(request) return encode_data(request.app['objstorage'].__contains__(**req)) @timed async def add_bytes(request): req = await decode_request(request) statsd.increment('swh_objstorage_in_bytes_total', len(req['content']), tags={'endpoint': 'add_bytes'}) return encode_data(request.app['objstorage'].add(**req)) @timed async def add_batch(request): req = await decode_request(request) return encode_data(request.app['objstorage'].add_batch(**req)) @timed async def get_bytes(request): req = await decode_request(request) try: ret = request.app['objstorage'].get(**req) except ObjNotFoundError: ret = { 'error': 'object_not_found', 'request': req, } return encode_data(ret, status=404) else: statsd.increment('swh_objstorage_out_bytes_total', len(ret), tags={'endpoint': 'get_bytes'}) return encode_data(ret) @timed async def get_batch(request): req = await decode_request(request) return encode_data(request.app['objstorage'].get_batch(**req)) @timed async def check(request): req = await decode_request(request) return encode_data(request.app['objstorage'].check(**req)) @timed async def delete(request): req = await decode_request(request) return encode_data(request.app['objstorage'].delete(**req)) # Management methods @timed async def get_random_contents(request): req = await decode_request(request) return encode_data(request.app['objstorage'].get_random(**req)) # Streaming methods @timed async def add_stream(request): hex_id = request.match_info['hex_id'] obj_id = hashutil.hash_to_bytes(hex_id) check_pres = (request.query.get('check_presence', '').lower() == 'true') objstorage = request.app['objstorage'] if check_pres and obj_id in objstorage: return encode_data(obj_id) # XXX this really should go in a decode_stream_request coroutine in # swh.core, but since py35 does not support async generators, it cannot # easily be made for now content_type = request.headers.get('Content-Type') if content_type == 'application/x-msgpack': decode = msgpack_loads elif content_type == 'application/json': decode = lambda x: json.loads(x, cls=SWHJSONDecoder) # noqa else: raise ValueError('Wrong content type `%s` for API request' % content_type) buffer = b'' with objstorage.chunk_writer(obj_id) as write: while not request.content.at_eof(): data, eot = await request.content.readchunk() buffer += data if eot: write(decode(buffer)) buffer = b'' return encode_data(obj_id) @timed async def get_stream(request): hex_id = request.match_info['hex_id'] obj_id = hashutil.hash_to_bytes(hex_id) response = aiohttp.web.StreamResponse() await response.prepare(request) for chunk in request.app['objstorage'].get_stream(obj_id, 2 << 20): await response.write(chunk) await response.write_eof() return response @timed async def list_content(request): last_obj_id = request.query.get('last_obj_id') if last_obj_id: last_obj_id = bytes.fromhex(last_obj_id) limit = int(request.query.get('limit', DEFAULT_LIMIT)) response = aiohttp.web.StreamResponse() response.enable_chunked_encoding() await response.prepare(request) for obj_id in request.app['objstorage'].list_content( last_obj_id, limit=limit): await response.write(obj_id) await response.write_eof() return response def make_app(config): """Initialize the remote api application. """ client_max_size = config.get('client_max_size', 1024 * 1024 * 1024) app = RPCServerApp(client_max_size=client_max_size) # retro compatibility configuration settings app['config'] = config _cfg = config['objstorage'] app['objstorage'] = get_objstorage(_cfg['cls'], _cfg['args']) app.router.add_route('GET', '/', index) app.router.add_route('POST', '/check_config', check_config) app.router.add_route('POST', '/content/contains', contains) app.router.add_route('POST', '/content/add', add_bytes) app.router.add_route('POST', '/content/add/batch', add_batch) app.router.add_route('POST', '/content/get', get_bytes) app.router.add_route('POST', '/content/get/batch', get_batch) app.router.add_route('POST', '/content/get/random', get_random_contents) app.router.add_route('POST', '/content/check', check) app.router.add_route('POST', '/content/delete', delete) app.router.add_route('GET', '/content', list_content) app.router.add_route('POST', '/content/add_stream/{hex_id}', add_stream) app.router.add_route('GET', '/content/get_stream/{hex_id}', get_stream) return app def load_and_check_config(config_file): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load type (str): configuration type. For 'local' type, more checks are done. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError('Configuration file must be defined') if not os.path.exists(config_file): raise FileNotFoundError('Configuration file %s does not exist' % ( config_file, )) cfg = config_read(config_file) if 'objstorage' not in cfg: raise KeyError( "Invalid configuration; missing objstorage config entry") missing_keys = [] vcfg = cfg['objstorage'] for key in ('cls', 'args'): v = vcfg.get(key) if v is None: missing_keys.append(key) if missing_keys: raise KeyError( "Invalid configuration; missing %s config entry" % ( ', '.join(missing_keys), )) cls = vcfg.get('cls') if cls == 'pathslicing': args = vcfg['args'] for key in ('root', 'slicing'): v = args.get(key) if v is None: missing_keys.append(key) if missing_keys: raise KeyError( "Invalid configuration; missing args.%s config entry" % ( ', '.join(missing_keys), )) return cfg def make_app_from_configfile(): """Load configuration and then build application to run """ config_file = os.environ.get('SWH_CONFIG_FILENAME') config = load_and_check_config(config_file) return make_app(config=config) if __name__ == '__main__': print('Deprecated. Use swh-objstorage') diff --git a/swh/objstorage/py.typed b/swh/objstorage/py.typed new file mode 100644 index 0000000..1242d43 --- /dev/null +++ b/swh/objstorage/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. diff --git a/swh/objstorage/tests/test_objstorage_azure.py b/swh/objstorage/tests/test_objstorage_azure.py index 8adccb2..6c5af71 100644 --- a/swh/objstorage/tests/test_objstorage_azure.py +++ b/swh/objstorage/tests/test_objstorage_azure.py @@ -1,133 +1,135 @@ # Copyright (C) 2016-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from collections import defaultdict from unittest.mock import patch +from typing import Any, Dict + from azure.common import AzureMissingResourceHttpError from swh.model.hashutil import hash_to_hex from swh.objstorage import get_objstorage from .objstorage_testing import ObjStorageTestFixture class MockBlob(): """ Libcloud object mock that replicates its API """ def __init__(self, name, content): self.name = name self.content = content class MockBlockBlobService(): """Mock internal azure library which AzureCloudObjStorage depends upon. """ - data = {} + data = {} # type: Dict[str, Any] def __init__(self, account_name, account_key, **kwargs): # do not care for the account_name and the api_secret_key here self.data = defaultdict(dict) def get_container_properties(self, container_name): self.data[container_name] return container_name in self.data def create_blob_from_bytes(self, container_name, blob_name, blob): self.data[container_name][blob_name] = blob def get_blob_to_bytes(self, container_name, blob_name): if blob_name not in self.data[container_name]: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return MockBlob(name=blob_name, content=self.data[container_name][blob_name]) def delete_blob(self, container_name, blob_name): try: self.data[container_name].pop(blob_name) except KeyError: raise AzureMissingResourceHttpError( 'Blob %s not found' % blob_name, 404) return True def exists(self, container_name, blob_name): return blob_name in self.data[container_name] def list_blobs(self, container_name, marker=None, maxresults=None): for blob_name, content in sorted(self.data[container_name].items()): if marker is None or blob_name > marker: yield MockBlob(name=blob_name, content=content) class TestAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.backends.azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.storage = get_objstorage('azure', { 'account_name': 'account-name', 'api_secret_key': 'api-secret-key', 'container_name': 'container-name', }) class TestPrefixedAzureCloudObjStorage(ObjStorageTestFixture, unittest.TestCase): def setUp(self): super().setUp() patcher = patch( 'swh.objstorage.backends.azure.BlockBlobService', MockBlockBlobService, ) patcher.start() self.addCleanup(patcher.stop) self.accounts = {} for prefix in '0123456789abcdef': self.accounts[prefix] = { 'account_name': 'account_%s' % prefix, 'api_secret_key': 'secret_key_%s' % prefix, 'container_name': 'container_%s' % prefix, } self.storage = get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_instantiation_missing_prefixes(self): del self.accounts['d'] del self.accounts['e'] with self.assertRaisesRegex(ValueError, 'Missing prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_instantiation_inconsistent_prefixes(self): self.accounts['00'] = self.accounts['0'] with self.assertRaisesRegex(ValueError, 'Inconsistent prefixes'): get_objstorage('azure-prefixed', { 'accounts': self.accounts }) def test_prefixedazure_sharding_behavior(self): for i in range(100): content, obj_id = self.hash_content(b'test_content_%02d' % i) self.storage.add(content, obj_id=obj_id) hex_obj_id = hash_to_hex(obj_id) prefix = hex_obj_id[0] self.assertTrue( self.storage.prefixes[prefix][0].exists( self.accounts[prefix]['container_name'], hex_obj_id )) diff --git a/tox.ini b/tox.ini index cce1c66..3cd7258 100644 --- a/tox.ini +++ b/tox.ini @@ -1,16 +1,24 @@ [tox] -envlist=flake8,py3 +envlist=flake8,py3,mypy [testenv] deps = .[testing] pytest-cov commands = pytest --cov=swh --cov-branch {posargs} [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 + +[testenv:mypy] +skip_install = true +deps = + .[testing] + mypy +commands = + mypy swh