diff --git a/swh/core/tests/test_config.py b/swh/core/tests/test_config.py index bdabb1a..b0461b4 100644 --- a/swh/core/tests/test_config.py +++ b/swh/core/tests/test_config.py @@ -1,210 +1,210 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import shutil import tempfile import unittest from swh.core import config class ConfReaderTest(unittest.TestCase): @classmethod def setUpClass(cls): # create a temporary folder cls.tmpdir = tempfile.mkdtemp(prefix='test-swh-core.') cls.conffile = os.path.join(cls.tmpdir, 'config.ini') conf_contents = """[main] a = 1 b = this is a string c = true h = false ls = list, of, strings li = 1, 2, 3, 4 """ with open(cls.conffile, 'w') as conf: conf.write(conf_contents) cls.non_existing_conffile = os.path.join(cls.tmpdir, 'config-nonexisting.ini') # Create an unreadable, proper configuration file cls.perms_broken_file = os.path.join(cls.tmpdir, 'unreadable.ini') with open(cls.perms_broken_file, 'w') as conf: conf.write(conf_contents) os.chmod(cls.perms_broken_file, 0o000) # Create a proper configuration file in an unreadable directory cls.perms_broken_dir = os.path.join(cls.tmpdir, 'unreadabledir') cls.file_in_broken_dir = os.path.join(cls.perms_broken_dir, 'unreadable.ini') os.makedirs(cls.perms_broken_dir) with open(cls.file_in_broken_dir, 'w') as conf: conf.write(conf_contents) os.chmod(cls.perms_broken_dir, 0o000) cls.empty_conffile = os.path.join(cls.tmpdir, 'empty.ini') open(cls.empty_conffile, 'w').close() cls.default_conf = { 'a': ('int', 2), 'b': ('string', 'default-string'), 'c': ('bool', True), 'd': ('int', 10), 'e': ('int', None), 'f': ('bool', None), 'g': ('string', None), 'h': ('bool', True), 'i': ('bool', True), 'ls': ('list[str]', ['a', 'b', 'c']), 'li': ('list[int]', [42, 43]), } cls.other_default_conf = { 'a': ('int', 3), } cls.full_default_conf = cls.default_conf.copy() cls.full_default_conf['a'] = cls.other_default_conf['a'] cls.parsed_default_conf = { key: value for key, (type, value) in cls.default_conf.items() } cls.parsed_conffile = { 'a': 1, 'b': 'this is a string', 'c': True, 'd': 10, 'e': None, 'f': None, 'g': None, 'h': False, 'i': True, 'ls': ['list', 'of', 'strings'], 'li': [1, 2, 3, 4], } @classmethod def tearDownClass(cls): # Make the broken perms items readable again to be able to remove them os.chmod(cls.perms_broken_dir, 0o755) os.chmod(cls.perms_broken_file, 0o644) shutil.rmtree(cls.tmpdir) def test_read(self): # when res = config.read(self.conffile, self.default_conf) # then - self.assertEquals(res, self.parsed_conffile) + self.assertEqual(res, self.parsed_conffile) def test_read_empty_file(self): # when res = config.read(None, self.default_conf) # then - self.assertEquals(res, self.parsed_default_conf) + self.assertEqual(res, self.parsed_default_conf) def test_support_non_existing_conffile(self): # when res = config.read(self.non_existing_conffile, self.default_conf) # then - self.assertEquals(res, self.parsed_default_conf) + self.assertEqual(res, self.parsed_default_conf) def test_support_empty_conffile(self): # when res = config.read(self.empty_conffile, self.default_conf) # then - self.assertEquals(res, self.parsed_default_conf) + self.assertEqual(res, self.parsed_default_conf) def test_raise_on_broken_directory_perms(self): with self.assertRaises(PermissionError): config.read(self.file_in_broken_dir, self.default_conf) def test_raise_on_broken_file_perms(self): with self.assertRaises(PermissionError): config.read(self.perms_broken_file, self.default_conf) def test_merge_default_configs(self): # when res = config.merge_default_configs(self.default_conf, self.other_default_conf) # then - self.assertEquals(res, self.full_default_conf) + self.assertEqual(res, self.full_default_conf) def test_priority_read_nonexist_conf(self): # when res = config.priority_read([self.non_existing_conffile, self.conffile], self.default_conf) # then - self.assertEquals(res, self.parsed_conffile) + self.assertEqual(res, self.parsed_conffile) def test_priority_read_conf_nonexist_empty(self): # when res = config.priority_read([ self.conffile, self.non_existing_conffile, self.empty_conffile, ], self.default_conf) # then - self.assertEquals(res, self.parsed_conffile) + self.assertEqual(res, self.parsed_conffile) def test_priority_read_empty_conf_nonexist(self): # when res = config.priority_read([ self.empty_conffile, self.conffile, self.non_existing_conffile, ], self.default_conf) # then - self.assertEquals(res, self.parsed_default_conf) + self.assertEqual(res, self.parsed_default_conf) def test_swh_config_paths(self): res = config.swh_config_paths('foo/bar.ini') self.assertEqual(res, [ '~/.config/swh/foo/bar.ini', '~/.swh/foo/bar.ini', '/etc/softwareheritage/foo/bar.ini', ]) def test_prepare_folder(self): # given conf = {'path1': os.path.join(self.tmpdir, 'path1'), 'path2': os.path.join(self.tmpdir, 'path2', 'depth1')} # the folders does not exists self.assertFalse(os.path.exists(conf['path1']), "path1 should not exist.") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # when config.prepare_folders(conf, 'path1') # path1 exists but not path2 self.assertTrue(os.path.exists(conf['path1']), "path1 should now exist!") self.assertFalse(os.path.exists(conf['path2']), "path2 should not exist.") # path1 already exists, skips it but creates path2 config.prepare_folders(conf, 'path1', 'path2') self.assertTrue(os.path.exists(conf['path1']), "path1 should still exist!") self.assertTrue(os.path.exists(conf['path2']), "path2 should now exist.") diff --git a/swh/core/tests/test_logger.py b/swh/core/tests/test_logger.py index 66461a5..ba4c9f6 100644 --- a/swh/core/tests/test_logger.py +++ b/swh/core/tests/test_logger.py @@ -1,45 +1,45 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import logging import os import unittest import pytest from swh.core.logger import PostgresHandler from swh.core.tests.db_testing import SingleDbTestFixture from swh.core.tests import SQL_DIR @pytest.mark.db class PgLogHandler(SingleDbTestFixture, unittest.TestCase): TEST_DB_DUMP = os.path.join(SQL_DIR, 'log-schema.sql') def setUp(self): super().setUp() self.modname = 'swh.core.tests.test_logger' self.logger = logging.Logger(self.modname, logging.DEBUG) self.logger.addHandler(PostgresHandler('dbname=' + self.TEST_DB_NAME)) def tearDown(self): logging.shutdown() super().tearDown() def test_log(self): self.logger.info('notice', extra={'swh_type': 'test entry', 'swh_data': 42}) - self.logger.warn('warning') + self.logger.warning('warning') with self.conn.cursor() as cur: cur.execute('SELECT level, message, data, src_module FROM log') db_log_entries = cur.fetchall() self.assertIn(('info', 'notice', {'type': 'test entry', 'data': 42}, self.modname), db_log_entries) self.assertIn(('warning', 'warning', {}, self.modname), db_log_entries) diff --git a/swh/core/tests/test_utils.py b/swh/core/tests/test_utils.py index 48dab5d..a2a4397 100644 --- a/swh/core/tests/test_utils.py +++ b/swh/core/tests/test_utils.py @@ -1,116 +1,116 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from swh.core import utils class UtilsLib(unittest.TestCase): def test_grouper(self): # given actual_data = utils.grouper((i for i in range(0, 9)), 2) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks self.assertEqual(out, [[0, 1], [2, 3], [4, 5], [6, 7], [8]]) # given actual_data = utils.grouper((i for i in range(9, 0, -1)), 4) out = [] for d in actual_data: out.append(list(d)) # force generator resolution for checks self.assertEqual(out, [[9, 8, 7, 6], [5, 4, 3, 2], [1]]) def test_backslashescape_errors(self): raw_data_err = b'abcd\x80' with self.assertRaises(UnicodeDecodeError): raw_data_err.decode('utf-8', 'strict') - self.assertEquals( + self.assertEqual( raw_data_err.decode('utf-8', 'backslashescape'), 'abcd\\x80', ) raw_data_ok = b'abcd\xc3\xa9' - self.assertEquals( + self.assertEqual( raw_data_ok.decode('utf-8', 'backslashescape'), raw_data_ok.decode('utf-8', 'strict'), ) unicode_data = 'abcdef\u00a3' - self.assertEquals( + self.assertEqual( unicode_data.encode('ascii', 'backslashescape'), b'abcdef\\xa3', ) def test_encode_with_unescape(self): valid_data = '\\x01020304\\x00' valid_data_encoded = b'\x01020304\x00' - self.assertEquals( + self.assertEqual( valid_data_encoded, utils.encode_with_unescape(valid_data) ) def test_encode_with_unescape_invalid_escape(self): invalid_data = 'test\\abcd' with self.assertRaises(ValueError) as exc: utils.encode_with_unescape(invalid_data) self.assertIn('invalid escape', exc.exception.args[0]) self.assertIn('position 4', exc.exception.args[0]) def test_decode_with_escape(self): backslashes = b'foo\\bar\\\\baz' backslashes_escaped = 'foo\\\\bar\\\\\\\\baz' - self.assertEquals( + self.assertEqual( backslashes_escaped, utils.decode_with_escape(backslashes), ) valid_utf8 = b'foo\xc3\xa2' valid_utf8_escaped = 'foo\u00e2' - self.assertEquals( + self.assertEqual( valid_utf8_escaped, utils.decode_with_escape(valid_utf8), ) invalid_utf8 = b'foo\xa2' invalid_utf8_escaped = 'foo\\xa2' - self.assertEquals( + self.assertEqual( invalid_utf8_escaped, utils.decode_with_escape(invalid_utf8), ) valid_utf8_nul = b'foo\xc3\xa2\x00' valid_utf8_nul_escaped = 'foo\u00e2\\x00' - self.assertEquals( + self.assertEqual( valid_utf8_nul_escaped, utils.decode_with_escape(valid_utf8_nul), ) def test_commonname(self): # when actual_commonname = utils.commonname('/some/where/to/', '/some/where/to/go/to') # then - self.assertEquals('go/to', actual_commonname) + self.assertEqual('go/to', actual_commonname) # when actual_commonname2 = utils.commonname(b'/some/where/to/', b'/some/where/to/go/to') # then - self.assertEquals(b'go/to', actual_commonname2) + self.assertEqual(b'go/to', actual_commonname2)