diff --git a/swh/deposit/tests/test_utils.py b/swh/deposit/tests/test_utils.py
index e3495685..27e06047 100644
--- a/swh/deposit/tests/test_utils.py
+++ b/swh/deposit/tests/test_utils.py
@@ -1,196 +1,218 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import unittest
 
 from unittest.mock import patch
 
+import pytest
+
 from swh.deposit import utils
 from swh.deposit.models import Deposit, DepositClient
 
 
 def test_origin_url_from():
+    """With a correctly configured deposit, all is fine
+
+    """
     for provider_url, external_id in (
             ('http://somewhere.org', 'uuid'),
             ('http://overthejungle.org', 'diuu'),
     ):
         deposit = Deposit(
             client=DepositClient(provider_url=provider_url),
             external_id=external_id
         )
 
         actual_origin_url = utils.origin_url_from(deposit)
 
         assert actual_origin_url == '%s/%s' % (
             provider_url.rstrip('/'), external_id)
 
 
+def test_origin_url_from_ko():
+    """Deposit without provider_url or without external_id should raise
+
+    """
+    for provider_url, external_id in (
+            (None, 'uuid'),
+            ('http://overthejungle.org', None),
+    ):
+        deposit = Deposit(
+            client=DepositClient(provider_url=provider_url),
+            external_id=external_id
+        )
+
+        with pytest.raises(AssertionError):
+            utils.origin_url_from(deposit)
+
+
 class UtilsTestCase(unittest.TestCase):
     """Utils library
 
     """
     def test_merge(self):
         """Calling utils.merge on dicts should merge without losing information
 
         """
         d0 = {
             'author': 'someone',
             'license': [['gpl2']],
             'a': 1
         }
 
         d1 = {
             'author': ['author0', {'name': 'author1'}],
             'license': [['gpl3']],
             'b': {
                 '1': '2'
             }
         }
 
         d2 = {
             'author': map(lambda x: x, ['else']),
             'license': 'mit',
             'b': {
                 '2': '3',
             }
         }
 
         d3 = {
             'author': (v for v in ['no one']),
         }
 
         actual_merge = utils.merge(d0, d1, d2, d3)
 
         expected_merge = {
             'a': 1,
             'license': [['gpl2'], ['gpl3'], 'mit'],
             'author': [
                 'someone', 'author0', {'name': 'author1'}, 'else', 'no one'],
             'b': {
                 '1': '2',
                 '2': '3',
             }
         }
 
         self.assertEqual(actual_merge, expected_merge)
 
     def test_merge_2(self):
         d0 = {
             'license': 'gpl2',
             'runtime': {
                 'os': 'unix derivative'
             }
         }
 
         d1 = {
             'license': 'gpl3',
             'runtime': 'GNU/Linux'
         }
 
         expected = {
             'license': ['gpl2', 'gpl3'],
             'runtime': [
                 {
                     'os': 'unix derivative'
                 },
                 'GNU/Linux'
             ],
         }
 
         actual = utils.merge(d0, d1)
         self.assertEqual(actual, expected)
 
     def test_merge_edge_cases(self):
         input_dict = {
             'license': ['gpl2', 'gpl3'],
             'runtime': [
                 {
                     'os': 'unix derivative'
                 },
                 'GNU/Linux'
             ],
         }
         # against empty dict
         actual = utils.merge(input_dict, {})
         self.assertEqual(actual, input_dict)
 
         # against oneself
         actual = utils.merge(input_dict, input_dict, input_dict)
-        self.assertEqual(input_dict, input_dict)
+        self.assertEqual(actual, input_dict)
 
     def test_merge_one_dict(self):
         """Merge one dict should result in the same dict value
 
         """
         input_and_expected = {'anything': 'really'}
         actual = utils.merge(input_and_expected)
         self.assertEqual(actual, input_and_expected)
 
     def test_merge_raise(self):
         """Calling utils.merge with any no dict argument should raise
 
         """
         d0 = {
             'author': 'someone',
             'a': 1
         }
 
         d1 = ['not a dict']
 
         with self.assertRaises(ValueError):
             utils.merge(d0, d1)
 
         with self.assertRaises(ValueError):
             utils.merge(d1, d0)
 
         with self.assertRaises(ValueError):
             utils.merge(d1)
 
         self.assertEqual(utils.merge(d0), d0)
 
 
 @patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
 def test_normalize_date_0(mock_normalize):
     """When date is a list, choose the first date and normalize it
 
     Note: We do not test swh.model.identifiers which is already tested
     in swh.model
 
     """
     actual_date = utils.normalize_date(['2017-10-12', 'date1'])
 
     expected_date = '2017-10-12 00:00:00+00:00'
 
     assert str(actual_date) == expected_date
 
 
 @patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
 def test_normalize_date_1(mock_normalize):
     """Providing a date in a reasonable format, everything is fine
 
     Note: We do not test swh.model.identifiers which is already tested
     in swh.model
 
     """
     actual_date = utils.normalize_date('2018-06-11 17:02:02')
 
     expected_date = '2018-06-11 17:02:02+00:00'
 
     assert str(actual_date) == expected_date
 
 
 @patch('swh.deposit.utils.normalize_timestamp', side_effect=lambda x: x)
 def test_normalize_date_doing_irrelevant_stuff(mock_normalize):
     """Providing a date with only the year results in a reasonable date
 
     Note: We do not test swh.model.identifiers which is already tested
     in swh.model
 
     """
     actual_date = utils.normalize_date('2017')
 
     expected_date = '2017-01-01 00:00:00+00:00'
 
     assert str(actual_date) == expected_date
diff --git a/swh/deposit/utils.py b/swh/deposit/utils.py
index 4818fc74..beb31ef6 100644
--- a/swh/deposit/utils.py
+++ b/swh/deposit/utils.py
@@ -1,98 +1,112 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
 import iso8601
 
 from types import GeneratorType
 
 from swh.model.identifiers import normalize_timestamp
 
 
 def origin_url_from(deposit):
-    """Given a deposit instance, return the associated origin url
+    """Given a deposit instance, return the associated origin url.
+
+    This expects a deposit and the associated client to be correctly
+    configured.
 
     Args:
         deposit (Deposit): The deposit from which derives the origin url
 
+    Raises:
+        AssertionError if:
+        - the client's provider_url field is not configured.
+        - the deposit's external_id field is not configured.
+
-    Returns
+    Returns:
         The associated origin url
 
     """
-    base_url = deposit.client.provider_url
     external_id = deposit.external_id
+    if external_id is None:
+        # Raise explicitly rather than `assert`: assert statements are
+        # stripped when python runs with -O
+        raise AssertionError("deposit's external_id is not configured")
+    base_url = deposit.client.provider_url
+    if base_url is None:
+        raise AssertionError("client's provider_url is not configured")
     return '%s/%s' % (base_url.rstrip('/'), external_id)
 
 
 def merge(*dicts):
     """Given an iterator of dicts, merge them losing no information.
 
     Args:
         *dicts: arguments are all supposed to be dict to merge into one
 
     Returns:
         dict merged without losing information
 
     """
     def _extend(existing_val, value):
         """Given an existing value and a value (as potential lists), merge
            them together without repetition.
 
         """
         if isinstance(value, (list, map, GeneratorType)):
             vals = value
         else:
             vals = [value]
         for v in vals:
             if v in existing_val:
                 continue
             existing_val.append(v)
         return existing_val
 
     d = {}
     for data in dicts:
         if not isinstance(data, dict):
             raise ValueError(
                 'dicts is supposed to be a variable arguments of dict')
 
         for key, value in data.items():
             existing_val = d.get(key)
             if not existing_val:
                 d[key] = value
                 continue
 
             if isinstance(existing_val, (list, map, GeneratorType)):
                 new_val = _extend(existing_val, value)
             elif isinstance(existing_val, dict):
                 if isinstance(value, dict):
                     new_val = merge(existing_val, value)
                 else:
                     new_val = _extend([existing_val], value)
             else:
                 new_val = _extend([existing_val], value)
 
             d[key] = new_val
     return d
 
 
 def normalize_date(date):
     """Normalize date fields as expected by swh workers.
 
     If date is a list, elect arbitrarily the first element of that list
     If date is (then) a string, parse it through
-    dateutil.parser.parse to extract a datetime.
+    iso8601.parse_date to extract a datetime.
 
     Then normalize it through
     swh.model.identifiers.normalize_timestamp.
 
-    Returns
+    Returns:
         The swh date object
 
     """
     if isinstance(date, list):
         date = date[0]
     if isinstance(date, str):
         date = iso8601.parse_date(date)
 
     return normalize_timestamp(date)