diff --git a/requirements-test.txt b/requirements-test.txt --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,6 +1,7 @@ Click -pytest < 4 +pytest pytest-postgresql requests-mock hypothesis >= 3.11.0 pre-commit +pytz diff --git a/swh/core/tests/test_logger.py b/swh/core/tests/test_logger.py new file mode 100644 --- /dev/null +++ b/swh/core/tests/test_logger.py @@ -0,0 +1,120 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from datetime import datetime +import logging +import pytz +import inspect + +from unittest.mock import patch + +from swh.core import logger + + +def lineno(): + """Returns the current line number in our program.""" + return inspect.currentframe().f_back.f_lineno + + +def test_db_level(): + assert logger.db_level_of_py_level(10) == 'debug' + assert logger.db_level_of_py_level(20) == 'info' + assert logger.db_level_of_py_level(30) == 'warning' + assert logger.db_level_of_py_level(40) == 'error' + assert logger.db_level_of_py_level(50) == 'critical' + + +def test_flatten_scalar(): + assert list(logger.flatten('')) == [('', '')] + assert list(logger.flatten('toto')) == [('', 'toto')] + + assert list(logger.flatten(10)) == [('', 10)] + assert list(logger.flatten(10.5)) == [('', 10.5)] + + +def test_flatten_list(): + assert list(logger.flatten([])) == [] + assert list(logger.flatten([1])) == [('0', 1)] + + assert list(logger.flatten([1, 2, ['a', 'b']])) == [ + ('0', 1), + ('1', 2), + ('2_0', 'a'), + ('2_1', 'b'), + ] + + assert list(logger.flatten([1, 2, ['a', ('x', 1)]])) == [ + ('0', 1), + ('1', 2), + ('2_0', 'a'), + ('2_1_0', 'x'), + ('2_1_1', 1), + ] + + +def test_flatten_dict(): + assert list(logger.flatten({})) == [] + assert list(logger.flatten({'a': 1})) == [('a', 1)] + + assert sorted(logger.flatten({'a': 1, + 'b': (2, 3,), + 'c': {'d': 4, 'e': 'f'}})) == [ + ('a', 1), + ('b_0', 2), + ('b_1', 3), + ('c_d', 4), + ('c_e', 'f'), + ] + + +def test_stringify(): + assert logger.stringify(None) == 'None' + assert logger.stringify(123) == '123' + assert logger.stringify('abc') == 'abc' + + date = datetime(2019, 9, 1, 16, 32) + assert logger.stringify(date) == '2019-09-01T16:32:00' + + tzdate = datetime(2019, 9, 1, 16, 32, tzinfo=pytz.utc) + assert logger.stringify(tzdate) == '2019-09-01T16:32:00+00:00' + + +@patch('swh.core.logger.send') +def test_journal_handler(send): + log = logging.getLogger('test_logger') + log.addHandler(logger.JournalHandler()) + log.setLevel(logging.DEBUG) + + _, ln = log.info('hello world'), lineno() + + send.assert_called_with( + 'hello world', + CODE_FILE=__file__, + CODE_FUNC='test_journal_handler', + CODE_LINE=ln, + LOGGER='test_logger', + PRIORITY='6', + THREAD_NAME='MainThread') + + +@patch('swh.core.logger.send') +def test_journal_handler_w_data(send): + log = logging.getLogger('test_logger') + log.addHandler(logger.JournalHandler()) + log.setLevel(logging.DEBUG) + + _, ln = log.debug('something cool %s', ['with', {'extra': 'data'}]), lineno() # noqa + + send.assert_called_with( + "something cool ['with', {'extra': 'data'}]", + CODE_FILE=__file__, + CODE_FUNC='test_journal_handler_w_data', + CODE_LINE=ln, + LOGGER='test_logger', + PRIORITY='7', + THREAD_NAME='MainThread', + SWH_LOGGING_ARGS_0_0='with', + SWH_LOGGING_ARGS_0_1_EXTRA='data' + ) diff --git a/swh/core/tests/test_tarball.py b/swh/core/tests/test_tarball.py new file mode 100644 --- /dev/null +++ b/swh/core/tests/test_tarball.py @@ -0,0 +1,67 @@ +# Copyright (C) 2019 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from zipfile import ZipFile + +from swh.core import tarball + + +def test_is_tarball(tmp_path): + + nozip = tmp_path / 'nozip.zip' + nozip.write_text('Im no zip') + + assert tarball.is_tarball(str(nozip)) is False + + notar = tmp_path / 'notar.tar' + notar.write_text('Im no tar') + + assert tarball.is_tarball(str(notar)) is False + + zipfile = tmp_path / 'truezip.zip' + with ZipFile(str(zipfile), 'w') as myzip: + myzip.writestr('file1.txt', 'some content') + + assert tarball.is_tarball(str(zipfile)) is True + + +def test_compress_uncompress_zip(tmp_path): + tocompress = tmp_path / 'compressme' + tocompress.mkdir() + + for i in range(10): + fpath = tocompress / ('file%s.txt' % i) + fpath.write_text('content of file %s' % i) + + zipfile = tmp_path / 'archive.zip' + tarball.compress(str(zipfile), 'zip', str(tocompress)) + + assert tarball.is_tarball(str(zipfile)) + + destdir = tmp_path / 'destdir' + tarball.uncompress(str(zipfile), str(destdir)) + + lsdir = sorted(x.name for x in destdir.iterdir()) + assert ['file%s.txt' % i for i in range(10)] == lsdir + + +def test_compress_uncompress_tar(tmp_path): + tocompress = tmp_path / 'compressme' + tocompress.mkdir() + + for i in range(10): + fpath = tocompress / ('file%s.txt' % i) + fpath.write_text('content of file %s' % i) + + tarfile = tmp_path / 'archive.tar' + tarball.compress(str(tarfile), 'tar', str(tocompress)) + + assert tarball.is_tarball(str(tarfile)) + + destdir = tmp_path / 'destdir' + tarball.uncompress(str(tarfile), str(destdir)) + + lsdir = sorted(x.name for x in destdir.iterdir()) + assert ['file%s.txt' % i for i in range(10)] == lsdir