diff --git a/swh/core/tests/test_api.py b/swh/core/tests/test_api.py index 1bab537..e1009b1 100644 --- a/swh/core/tests/test_api.py +++ b/swh/core/tests/test_api.py @@ -1,84 +1,81 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest import requests_mock from werkzeug.wrappers import BaseResponse from werkzeug.test import Client as WerkzeugTestClient from swh.core.api import ( error_handler, encode_data_server, remote_api_endpoint, SWHRemoteAPI, SWHServerAPIApp) class ApiTest(unittest.TestCase): - @istest def test_server(self): testcase = self nb_endpoint_calls = 0 class TestStorage: @remote_api_endpoint('test_endpoint_url') def test_endpoint(self, test_data, db=None, cur=None): nonlocal nb_endpoint_calls nb_endpoint_calls += 1 testcase.assertEqual(test_data, 'spam') return 'egg' app = SWHServerAPIApp('testapp', backend_class=TestStorage, backend_factory=lambda: TestStorage()) @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data_server) client = WerkzeugTestClient(app, BaseResponse) res = client.post('/test_endpoint_url', headers={'Content-Type': 'application/x-msgpack'}, data=b'\x81\xa9test_data\xa4spam') self.assertEqual(nb_endpoint_calls, 1) self.assertEqual(b''.join(res.response), b'\xa3egg') - @istest def test_client(self): class TestStorage: @remote_api_endpoint('test_endpoint_url') def test_endpoint(self, test_data, db=None, cur=None): pass nb_http_calls = 0 def callback(request, context): nonlocal nb_http_calls nb_http_calls += 1 self.assertEqual(request.headers['Content-Type'], 'application/x-msgpack') self.assertEqual(request.body, b'\x81\xa9test_data\xa4spam') context.headers['Content-Type'] = 'application/x-msgpack' context.content = b'\xa3egg' return b'\xa3egg' adapter = requests_mock.Adapter() adapter.register_uri('POST', 'mock://example.com/test_endpoint_url', content=callback) class Testclient(SWHRemoteAPI): backend_class = TestStorage def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.session.mount('mock', adapter) c = Testclient('foo', 'mock://example.com/') res = c.test_endpoint('spam') self.assertEqual(nb_http_calls, 1) self.assertEqual(res, 'egg') diff --git a/swh/core/tests/test_config.py b/swh/core/tests/test_config.py index c517962..bdabb1a 100644 --- a/swh/core/tests/test_config.py +++ b/swh/core/tests/test_config.py @@ -1,224 +1,210 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import tempfile -import unittest import os import shutil - -from nose.tools import istest +import tempfile +import unittest from swh.core import config class ConfReaderTest(unittest.TestCase): @classmethod def setUpClass(cls): # create a temporary folder cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-core.') cls.conffile = os.path.join(cls.tmpdir, 'config.ini') conf_contents = """[main] a = 1 b = this is a string c = true h = false ls = list, of, strings li = 1, 2, 3, 4 """ with open(cls.conffile, 'w') as conf: conf.write(conf_contents) cls.non_existing_conffile = os.path.join(cls.tmpdir, 'config-nonexisting.ini') # Create an unreadable, proper configuration file cls.perms_broken_file = os.path.join(cls.tmpdir, 'unreadable.ini') with open(cls.perms_broken_file, 'w') as conf: conf.write(conf_contents) os.chmod(cls.perms_broken_file, 0o000) # Create a proper configuration file in an unreadable directory cls.perms_broken_dir = os.path.join(cls.tmpdir, 'unreadabledir') cls.file_in_broken_dir = os.path.join(cls.perms_broken_dir, 'unreadable.ini') os.makedirs(cls.perms_broken_dir) with open(cls.file_in_broken_dir, 'w') as conf: conf.write(conf_contents) os.chmod(cls.perms_broken_dir, 0o000) cls.empty_conffile = os.path.join(cls.tmpdir, 'empty.ini') open(cls.empty_conffile, 'w').close() cls.default_conf = { 'a': ('int', 2), 'b': ('string', 'default-string'), 'c': ('bool', True), 'd': ('int', 10), 'e': ('int', None), 'f': ('bool', None), 'g': ('string', None), 'h': ('bool', True), 'i': ('bool', True), 'ls': ('list[str]', ['a', 'b', 'c']), 'li': ('list[int]', [42, 43]), } cls.other_default_conf = { 'a': ('int', 3), } cls.full_default_conf = cls.default_conf.copy() cls.full_default_conf['a'] = cls.other_default_conf['a'] cls.parsed_default_conf = { key: value for key, (type, value) in cls.default_conf.items() } cls.parsed_conffile = { 'a': 1, 'b': 'this is a string', 'c': True, 'd': 10, 'e': None, 'f': None, 'g': None, 'h': False, 'i': True, 'ls': ['list', 'of', 'strings'], 'li': [1, 2, 3, 4], } @classmethod def tearDownClass(cls): # Make the broken perms items readable again to be able to remove them os.chmod(cls.perms_broken_dir, 0o755) os.chmod(cls.perms_broken_file, 0o644) shutil.rmtree(cls.tmpdir) - @istest - def read(self): + def test_read(self): # when res = config.read(self.conffile, self.default_conf) # then self.assertEquals(res, self.parsed_conffile) - @istest - def read_empty_file(self): + def test_read_empty_file(self): # when res = config.read(None, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) - @istest - def support_non_existing_conffile(self): + def test_support_non_existing_conffile(self): # when res = config.read(self.non_existing_conffile, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) - @istest - def support_empty_conffile(self): + def test_support_empty_conffile(self): # when res = config.read(self.empty_conffile, self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) - @istest - def raise_on_broken_directory_perms(self): + def test_raise_on_broken_directory_perms(self): with self.assertRaises(PermissionError): config.read(self.file_in_broken_dir, self.default_conf) - @istest - def raise_on_broken_file_perms(self): + def test_raise_on_broken_file_perms(self): with self.assertRaises(PermissionError): config.read(self.perms_broken_file, self.default_conf) - @istest - def merge_default_configs(self): + def test_merge_default_configs(self): # when res = config.merge_default_configs(self.default_conf, self.other_default_conf) # then self.assertEquals(res, self.full_default_conf) - @istest - def priority_read_nonexist_conf(self): + def test_priority_read_nonexist_conf(self): # when res = config.priority_read([self.non_existing_conffile, self.conffile], self.default_conf) # then self.assertEquals(res, self.parsed_conffile) - @istest - def priority_read_conf_nonexist_empty(self): + def test_priority_read_conf_nonexist_empty(self): # when res = config.priority_read([ self.conffile, self.non_existing_conffile, self.empty_conffile, ], self.default_conf) # then self.assertEquals(res, self.parsed_conffile) - @istest - def priority_read_empty_conf_nonexist(self): + def test_priority_read_empty_conf_nonexist(self): # when res = config.priority_read([ self.empty_conffile, self.conffile, self.non_existing_conffile, ], self.default_conf) # then self.assertEquals(res, self.parsed_default_conf) - @istest - def swh_config_paths(self): + def test_swh_config_paths(self): res = config.swh_config_paths('foo/bar.ini') self.assertEqual(res, [ '~/.config/swh/foo/bar.ini', '~/.swh/foo/bar.ini', '/etc/softwareheritage/foo/bar.ini', ]) - @istest - def prepare_folder(self): + def test_prepare_folder(self): # given conf = {'path1': os.path.join(self.tmpdir, 'path1'), 'path2': os.path.join(self.tmpdir, 'path2', 'depth1')} # the folders does not exists self.assertFalse(os.path.exists(conf['path1']), "path1 should not exist.") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # when config.prepare_folders(conf, 'path1') # path1 exists but not path2 self.assertTrue(os.path.exists(conf['path1']), "path1 should now exist!") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # path1 already exists, skips it but creates path2 config.prepare_folders(conf, 'path1', 'path2') self.assertTrue(os.path.exists(conf['path1']), "path1 should still exist!") self.assertTrue(os.path.exists(conf['path2']), "path2 should now exist.") diff --git a/swh/core/tests/test_logger.py b/swh/core/tests/test_logger.py index 0547873..1cbe308 100644 --- a/swh/core/tests/test_logger.py +++ b/swh/core/tests/test_logger.py @@ -1,50 +1,47 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import unittest -from nose.tools import istest from nose.plugins.attrib import attr from swh.core.logger import PostgresHandler from swh.core.tests.db_testing import SingleDbTestFixture - TEST_DIR = os.path.dirname(os.path.abspath(__file__)) SQL_DIR = os.path.join(TEST_DIR, '../../../sql') @attr('db') class PgLogHandler(SingleDbTestFixture, unittest.TestCase): TEST_DB_DUMP = os.path.join(SQL_DIR, 'log-schema.sql') TEST_DB_DUMP_TYPE = 'psql' def setUp(self): super().setUp() self.modname = 'swh.core.tests.test_logger' self.logger = logging.Logger(self.modname, logging.DEBUG) self.logger.addHandler(PostgresHandler('dbname=' + self.TEST_DB_NAME)) def tearDown(self): logging.shutdown() super().tearDown() - @istest - def log(self): + def test_log(self): self.logger.info('notice', extra={'swh_type': 'test entry', 'swh_data': 42}) self.logger.warn('warning') with self.conn.cursor() as cur: cur.execute('SELECT level, message, data, src_module FROM log') db_log_entries = cur.fetchall() self.assertIn(('info', 'notice', {'type': 'test entry', 'data': 42}, self.modname), db_log_entries) self.assertIn(('warning', 'warning', {}, self.modname), db_log_entries) diff --git a/swh/core/tests/test_serializers.py b/swh/core/tests/test_serializers.py index 0ffb594..166744a 100644 --- a/swh/core/tests/test_serializers.py +++ b/swh/core/tests/test_serializers.py @@ -1,82 +1,80 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import arrow import datetime import json import unittest from uuid import UUID -from nose.tools import istest +import arrow -from swh.core.serializers import SWHJSONDecoder, SWHJSONEncoder -from swh.core.serializers import msgpack_dumps, msgpack_loads +from swh.core.serializers import ( + SWHJSONDecoder, + SWHJSONEncoder, + msgpack_dumps, + msgpack_loads +) class Serializers(unittest.TestCase): def setUp(self): self.tz = datetime.timezone(datetime.timedelta(minutes=118)) self.data = { 'bytes': b'123456789\x99\xaf\xff\x00\x12', 'datetime_naive': datetime.datetime(2015, 1, 1, 12, 4, 42, 231455), 'datetime_tz': datetime.datetime(2015, 3, 4, 18, 25, 13, 1234, tzinfo=self.tz), 'datetime_utc': datetime.datetime(2015, 3, 4, 18, 25, 13, 1234, tzinfo=datetime.timezone.utc), 'datetime_delta': datetime.timedelta(64), 'arrow_date': arrow.get('2018-04-25T16:17:53.533672+00:00'), 'swhtype': 'fake', 'swh_dict': {'swhtype': 42, 'd': 'test'}, 'random_dict': {'swhtype': 43}, 'uuid': UUID('cdd8f804-9db6-40c3-93ab-5955d3836234'), } self.encoded_data = { 'bytes': {'swhtype': 'bytes', 'd': 'F)}kWH8wXmIhn8j01^'}, 'datetime_naive': {'swhtype': 'datetime', 'd': '2015-01-01T12:04:42.231455'}, 'datetime_tz': {'swhtype': 'datetime', 'd': '2015-03-04T18:25:13.001234+01:58'}, 'datetime_utc': {'swhtype': 'datetime', 'd': '2015-03-04T18:25:13.001234+00:00'}, 'datetime_delta': {'swhtype': 'timedelta', 'd': 'datetime.timedelta(64)'}, 'arrow_date': {'swhtype': 'arrow', 'd': '2018-04-25T16:17:53.533672+00:00'}, 'swhtype': 'fake', 'swh_dict': {'swhtype': 42, 'd': 'test'}, 'random_dict': {'swhtype': 43}, 'uuid': {'swhtype': 'uuid', 'd': 'cdd8f804-9db6-40c3-93ab-5955d3836234'}, } self.generator = (i for i in range(5)) self.gen_lst = list(range(5)) - @istest - def round_trip_json(self): + def test_round_trip_json(self): data = json.dumps(self.data, cls=SWHJSONEncoder) self.assertEqual(self.data, json.loads(data, cls=SWHJSONDecoder)) - @istest - def encode_swh_json(self): + def test_encode_swh_json(self): data = json.dumps(self.data, cls=SWHJSONEncoder) self.assertEqual(self.encoded_data, json.loads(data)) - @istest - def round_trip_msgpack(self): + def test_round_trip_msgpack(self): data = msgpack_dumps(self.data) self.assertEqual(self.data, msgpack_loads(data)) - @istest - def generator_json(self): + def test_generator_json(self): data = json.dumps(self.generator, cls=SWHJSONEncoder) self.assertEqual(self.gen_lst, json.loads(data, cls=SWHJSONDecoder)) - @istest - def generator_msgpack(self): + def test_generator_msgpack(self): data = msgpack_dumps(self.generator) self.assertEqual(self.gen_lst, msgpack_loads(data)) diff --git a/swh/core/tests/test_utils.py b/swh/core/tests/test_utils.py index 85403c4..48dab5d 100644 --- a/swh/core/tests/test_utils.py +++ b/swh/core/tests/test_utils.py @@ -1,124 +1,116 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest - from swh.core import utils class UtilsLib(unittest.TestCase): - @istest - def grouper(self): + def test_grouper(self): # given actual_data = utils.grouper((i for i in range(0, 9)), 2) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks self.assertEqual(out, [[0, 1], [2, 3], [4, 5], [6, 7], [8]]) # given actual_data = utils.grouper((i for i in range(9, 0, -1)), 4) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks self.assertEqual(out, [[9, 8, 7, 6], [5, 4, 3, 2], [1]]) - @istest - def backslashescape_errors(self): + def test_backslashescape_errors(self): raw_data_err = b'abcd\x80' with self.assertRaises(UnicodeDecodeError): raw_data_err.decode('utf-8', 'strict') self.assertEquals( raw_data_err.decode('utf-8', 'backslashescape'), 'abcd\\x80', ) raw_data_ok = b'abcd\xc3\xa9' self.assertEquals( raw_data_ok.decode('utf-8', 'backslashescape'), raw_data_ok.decode('utf-8', 'strict'), ) unicode_data = 'abcdef\u00a3' self.assertEquals( unicode_data.encode('ascii', 'backslashescape'), b'abcdef\\xa3', ) - @istest - def encode_with_unescape(self): + def test_encode_with_unescape(self): valid_data = '\\x01020304\\x00' valid_data_encoded = b'\x01020304\x00' self.assertEquals( valid_data_encoded, utils.encode_with_unescape(valid_data) ) - @istest - def encode_with_unescape_invalid_escape(self): + def test_encode_with_unescape_invalid_escape(self): invalid_data = 'test\\abcd' with self.assertRaises(ValueError) as exc: utils.encode_with_unescape(invalid_data) self.assertIn('invalid escape', exc.exception.args[0]) self.assertIn('position 4', exc.exception.args[0]) - @istest - def decode_with_escape(self): + def test_decode_with_escape(self): backslashes = b'foo\\bar\\\\baz' backslashes_escaped = 'foo\\\\bar\\\\\\\\baz' self.assertEquals( backslashes_escaped, utils.decode_with_escape(backslashes), ) valid_utf8 = b'foo\xc3\xa2' valid_utf8_escaped = 'foo\u00e2' self.assertEquals( valid_utf8_escaped, utils.decode_with_escape(valid_utf8), ) invalid_utf8 = b'foo\xa2' invalid_utf8_escaped = 'foo\\xa2' self.assertEquals( invalid_utf8_escaped, utils.decode_with_escape(invalid_utf8), ) valid_utf8_nul = b'foo\xc3\xa2\x00' valid_utf8_nul_escaped = 'foo\u00e2\\x00' self.assertEquals( valid_utf8_nul_escaped, utils.decode_with_escape(valid_utf8_nul), ) - @istest - def commonname(self): + def test_commonname(self): # when actual_commonname = utils.commonname('/some/where/to/', '/some/where/to/go/to') # then self.assertEquals('go/to', actual_commonname) # when actual_commonname2 = utils.commonname(b'/some/where/to/', b'/some/where/to/go/to') # then self.assertEquals(b'go/to', actual_commonname2)