Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_service.py
# Copyright (C) 2015-2018 The Software Heritage developers | # Copyright (C) 2015-2018 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU Affero General Public License version 3, or any later version | # License: GNU Affero General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import copy | |||||
import datetime | import datetime | ||||
import itertools | |||||
import pytest | |||||
import random | |||||
from unittest.mock import MagicMock, patch, call | from collections import defaultdict | ||||
from hypothesis import given | |||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.web.common import service | from swh.web.common import service | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.tests.testcase import WebTestCase | from swh.web.tests.strategies import ( | ||||
content, contents, unknown_content, unknown_contents, | |||||
contents_with_ctags, origin, new_origin, visit_dates, directory, | |||||
release, revision, unknown_revision, revisions, unknown_revisions, | |||||
ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, | |||||
revision_with_submodules, unknown_directory, empty_directory | |||||
) | |||||
from swh.web.tests.testcase import ( | |||||
WebTestCase, ctags_json_missing, fossology_missing | |||||
) | |||||
class ServiceTestCase(WebTestCase): | class ServiceTestCase(WebTestCase): | ||||
def setUp(self): | def setUp(self): | ||||
self.BLAKE2S256_SAMPLE = ('685395c5dc57cada459364f0946d3dd45b' | |||||
'ad5fcbabc1048edb44380f1d31d0aa') | |||||
self.BLAKE2S256_SAMPLE_BIN = hash_to_bytes(self.BLAKE2S256_SAMPLE) | |||||
self.SHA1_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' | self.SHA1_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' | ||||
self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE) | self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE) | ||||
self.SHA256_SAMPLE = ('8abb0aa566452620ecce816eecdef4792d77a' | |||||
'293ad8ea82a4d5ecb4d36f7e560') | |||||
self.SHA256_SAMPLE_BIN = hash_to_bytes(self.SHA256_SAMPLE) | |||||
self.SHA1GIT_SAMPLE = '25d1a2e8f32937b0f498a5ca87f823d8df013c01' | |||||
self.SHA1GIT_SAMPLE_BIN = hash_to_bytes(self.SHA1GIT_SAMPLE) | |||||
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' | self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' | ||||
self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID) | self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID) | ||||
self.AUTHOR_ID_BIN = { | self.AUTHOR_ID_BIN = { | ||||
'name': b'author', | 'name': b'author', | ||||
'email': b'author@company.org', | 'email': b'author@company.org', | ||||
'fullname': b'author <author@company.org>' | |||||
} | } | ||||
self.AUTHOR_ID = { | self.AUTHOR_ID = { | ||||
'name': 'author', | 'name': 'author', | ||||
'email': 'author@company.org', | 'email': 'author@company.org', | ||||
'fullname': 'author <author@company.org>' | |||||
} | } | ||||
self.COMMITTER_ID_BIN = { | self.COMMITTER_ID_BIN = { | ||||
'name': b'committer', | 'name': b'committer', | ||||
'email': b'committer@corp.org', | 'email': b'committer@corp.org', | ||||
'fullname': b'committer <committer@corp.org>' | |||||
} | } | ||||
self.COMMITTER_ID = { | self.COMMITTER_ID = { | ||||
'name': 'committer', | 'name': 'committer', | ||||
'email': 'committer@corp.org', | 'email': 'committer@corp.org', | ||||
'fullname': 'committer <committer@corp.org>' | |||||
} | } | ||||
self.SAMPLE_DATE_RAW = { | self.SAMPLE_DATE_RAW = { | ||||
'timestamp': datetime.datetime( | 'timestamp': int(datetime.datetime( | ||||
2000, 1, 17, 11, 23, 54, | 2000, 1, 17, 11, 23, 54, | ||||
tzinfo=datetime.timezone.utc, | tzinfo=datetime.timezone.utc, | ||||
).timestamp(), | ).timestamp()), | ||||
'offset': 0, | 'offset': 0, | ||||
'negative_utc': False, | 'negative_utc': False, | ||||
} | } | ||||
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' | self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' | ||||
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' | self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' | ||||
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' | self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' | ||||
self.SAMPLE_REVISION = { | self.SAMPLE_REVISION = { | ||||
'id': self.SHA1_SAMPLE, | 'id': self.SHA1_SAMPLE, | ||||
'directory': self.DIRECTORY_ID, | 'directory': self.DIRECTORY_ID, | ||||
'author': self.AUTHOR_ID, | 'author': self.AUTHOR_ID, | ||||
'committer': self.COMMITTER_ID, | 'committer': self.COMMITTER_ID, | ||||
'message': self.SAMPLE_MESSAGE, | 'message': self.SAMPLE_MESSAGE, | ||||
'date': self.SAMPLE_DATE, | 'date': self.SAMPLE_DATE, | ||||
'committer_date': self.SAMPLE_DATE, | 'committer_date': self.SAMPLE_DATE, | ||||
'synthetic': False, | 'synthetic': False, | ||||
'type': 'git', | 'type': 'git', | ||||
'parents': [], | 'parents': [], | ||||
'metadata': {}, | 'metadata': {}, | ||||
'merge': False | 'merge': False | ||||
} | } | ||||
self.SAMPLE_REVISION_RAW = { | self.SAMPLE_REVISION_RAW = { | ||||
'id': self.SHA1_SAMPLE_BIN, | 'id': self.SHA1_SAMPLE_BIN, | ||||
'directory': self.DIRECTORY_ID_BIN, | 'directory': self.DIRECTORY_ID_BIN, | ||||
'author': self.AUTHOR_ID_BIN, | 'author': self.AUTHOR_ID_BIN, | ||||
'committer': self.COMMITTER_ID_BIN, | 'committer': self.COMMITTER_ID_BIN, | ||||
'message': self.SAMPLE_MESSAGE_BIN, | 'message': self.SAMPLE_MESSAGE_BIN, | ||||
'date': self.SAMPLE_DATE_RAW, | 'date': self.SAMPLE_DATE_RAW, | ||||
'committer_date': self.SAMPLE_DATE_RAW, | 'committer_date': self.SAMPLE_DATE_RAW, | ||||
'synthetic': False, | 'synthetic': False, | ||||
'type': 'git', | 'type': 'git', | ||||
'parents': [], | 'parents': [], | ||||
'metadata': [], | 'metadata': [], | ||||
} | } | ||||
self.SAMPLE_CONTENT = { | @given(contents()) | ||||
'checksums': { | def test_lookup_multiple_hashes_all_present(self, contents): | ||||
'blake2s256': self.BLAKE2S256_SAMPLE, | input_data = [] | ||||
'sha1': self.SHA1_SAMPLE, | expected_output = [] | ||||
'sha256': self.SHA256_SAMPLE, | for cnt in contents: | ||||
'sha1_git': self.SHA1GIT_SAMPLE, | input_data.append({'sha1': cnt['sha1']}) | ||||
}, | expected_output.append({'sha1': cnt['sha1'], | ||||
'length': 190, | 'found': True}) | ||||
'status': 'absent' | |||||
} | self.assertEqual(service.lookup_multiple_hashes(input_data), | ||||
self.SAMPLE_CONTENT_RAW = { | expected_output) | ||||
'blake2s256': self.BLAKE2S256_SAMPLE_BIN, | |||||
'sha1': self.SHA1_SAMPLE_BIN, | @given(contents(), unknown_contents()) | ||||
'sha256': self.SHA256_SAMPLE_BIN, | def test_lookup_multiple_hashes_some_missing(self, contents, | ||||
'sha1_git': self.SHA1GIT_SAMPLE_BIN, | unknown_contents): | ||||
'length': 190, | input_contents = list(itertools.chain(contents, unknown_contents)) | ||||
'status': 'hidden' | random.shuffle(input_contents) | ||||
} | |||||
input_data = [] | |||||
expected_output = [] | |||||
for cnt in input_contents: | |||||
input_data.append({'sha1': cnt['sha1']}) | |||||
expected_output.append({'sha1': cnt['sha1'], | |||||
'found': cnt in contents}) | |||||
self.assertEqual(service.lookup_multiple_hashes(input_data), | |||||
expected_output) | |||||
@given(unknown_content()) | |||||
def test_lookup_hash_does_not_exist(self, unknown_content): | |||||
actual_lookup = service.lookup_hash('sha1_git:%s' % | |||||
unknown_content['sha1_git']) | |||||
self.assertEqual(actual_lookup, {'found': None, | |||||
'algo': 'sha1_git'}) | |||||
@given(content()) | |||||
def test_lookup_hash_exist(self, content): | |||||
actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) | |||||
content_metadata = self.content_get_metadata(content['sha1']) | |||||
self.date_origin_visit1 = datetime.datetime( | self.assertEqual({'found': content_metadata, | ||||
2015, 1, 1, 22, 0, 0, | 'algo': 'sha1'}, actual_lookup) | ||||
tzinfo=datetime.timezone.utc) | |||||
self.origin_visit1 = { | |||||
'date': self.date_origin_visit1, | |||||
'origin': 1, | |||||
'visit': 1 | |||||
} | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_multiple_hashes_ball_missing(self, mock_storage): | |||||
# given | |||||
mock_storage.content_missing_per_sha1 = MagicMock(return_value=[]) | |||||
# when | |||||
actual_lookup = service.lookup_multiple_hashes( | |||||
[{'filename': 'a', | |||||
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, | |||||
{'filename': 'b', | |||||
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) | |||||
# then | |||||
self.assertEqual(actual_lookup, [ | |||||
{'filename': 'a', | |||||
'sha1': '456caf10e9535160d90e874b45aa426de762f19f', | |||||
'found': True}, | |||||
{'filename': 'b', | |||||
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', | |||||
'found': True} | |||||
]) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_multiple_hashes_some_missing(self, mock_storage): | |||||
# given | |||||
mock_storage.content_missing_per_sha1 = MagicMock(return_value=[ | |||||
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f') | |||||
]) | |||||
# when | |||||
actual_lookup = service.lookup_multiple_hashes( | |||||
[{'filename': 'a', | |||||
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, | |||||
{'filename': 'b', | |||||
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) | |||||
# then | |||||
self.assertEqual(actual_lookup, [ | |||||
{'filename': 'a', | |||||
'sha1': '456caf10e9535160d90e874b45aa426de762f19f', | |||||
'found': False}, | |||||
{'filename': 'b', | |||||
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', | |||||
'found': True} | |||||
]) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_hash_does_not_exist(self, mock_storage): | |||||
# given | |||||
mock_storage.content_find = MagicMock(return_value=None) | |||||
# when | |||||
actual_lookup = service.lookup_hash( | |||||
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual({'found': None, | |||||
'algo': 'sha1_git'}, actual_lookup) | |||||
# check the function has been called with parameters | |||||
mock_storage.content_find.assert_called_with( | |||||
{'sha1_git': | |||||
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_hash_exist(self, mock_storage): | |||||
# given | |||||
stub_content = { | |||||
'sha1': hash_to_bytes( | |||||
'456caf10e9535160d90e874b45aa426de762f19f') | |||||
} | |||||
mock_storage.content_find = MagicMock(return_value=stub_content) | |||||
# when | @given(unknown_content()) | ||||
actual_lookup = service.lookup_hash( | def test_search_hash_does_not_exist(self, content): | ||||
'sha1:456caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual({ | |||||
'found': { | |||||
'checksums': { | |||||
'sha1': '456caf10e9535160d90e874b45aa426de762f19f' | |||||
} | |||||
}, | |||||
'algo': 'sha1' | |||||
}, actual_lookup) | |||||
mock_storage.content_find.assert_called_with( | |||||
{'sha1': | |||||
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')} | |||||
) | |||||
@patch('swh.web.common.service.storage') | actual_lookup = service.search_hash('sha1_git:%s' % | ||||
def test_search_hash_does_not_exist(self, mock_storage): | content['sha1_git']) | ||||
# given | |||||
mock_storage.content_find = MagicMock(return_value=None) | |||||
# when | |||||
actual_lookup = service.search_hash( | |||||
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual({'found': False}, actual_lookup) | self.assertEqual({'found': False}, actual_lookup) | ||||
# check the function has been called with parameters | @given(content()) | ||||
mock_storage.content_find.assert_called_with( | def test_search_hash_exist(self, content): | ||||
{'sha1_git': | |||||
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_search_hash_exist(self, mock_storage): | |||||
# given | |||||
stub_content = { | |||||
'sha1': hash_to_bytes( | |||||
'456caf10e9535160d90e874b45aa426de762f19f') | |||||
} | |||||
mock_storage.content_find = MagicMock(return_value=stub_content) | |||||
# when | actual_lookup = service.search_hash('sha1:%s' % content['sha1']) | ||||
actual_lookup = service.search_hash( | |||||
'sha1:456caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual({'found': True}, actual_lookup) | self.assertEqual({'found': True}, actual_lookup) | ||||
mock_storage.content_find.assert_called_with( | @pytest.mark.skipif(ctags_json_missing, | ||||
{'sha1': | reason="requires ctags with json output support") | ||||
hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}, | @given(contents_with_ctags()) | ||||
) | def test_lookup_content_ctags(self, contents_with_ctags): | ||||
@patch('swh.web.common.service.idx_storage') | content_sha1 = random.choice(contents_with_ctags['sha1s']) | ||||
def test_lookup_content_ctags(self, mock_idx_storage): | self.content_add_ctags(content_sha1) | ||||
# given | actual_ctags = \ | ||||
mock_idx_storage.content_ctags_get = MagicMock( | list(service.lookup_content_ctags('sha1:%s' % content_sha1)) | ||||
return_value=[{ | |||||
'id': hash_to_bytes( | expected_data = list(self.content_get_ctags(content_sha1)) | ||||
'123caf10e9535160d90e874b45aa426de762f19f'), | for ctag in expected_data: | ||||
'line': 100, | ctag['id'] = content_sha1 | ||||
'name': 'hello', | |||||
'kind': 'function', | self.assertEqual(actual_ctags, expected_data) | ||||
'tool_name': 'ctags', | |||||
'tool_version': 'some-version', | @given(unknown_content()) | ||||
}]) | def test_lookup_content_ctags_no_hash(self, unknown_content): | ||||
expected_ctags = [{ | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | actual_ctags = \ | ||||
'line': 100, | list(service.lookup_content_ctags('sha1:%s' % | ||||
'name': 'hello', | unknown_content['sha1'])) | ||||
'kind': 'function', | |||||
'tool_name': 'ctags', | |||||
'tool_version': 'some-version', | |||||
}] | |||||
# when | |||||
actual_ctags = list(service.lookup_content_ctags( | |||||
'sha1:123caf10e9535160d90e874b45aa426de762f19f')) | |||||
# then | |||||
self.assertEqual(actual_ctags, expected_ctags) | |||||
mock_idx_storage.content_ctags_get.assert_called_with( | |||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | |||||
@patch('swh.web.common.service.idx_storage') | |||||
def test_lookup_content_ctags_no_hash(self, mock_idx_storage): | |||||
# given | |||||
mock_idx_storage.content_ctags_get = MagicMock(return_value=[]) | |||||
# when | |||||
actual_ctags = list(service.lookup_content_ctags( | |||||
'sha1:123caf10e9535160d90e874b45aa426de762f19f')) | |||||
# then | |||||
self.assertEqual(actual_ctags, []) | self.assertEqual(actual_ctags, []) | ||||
@patch('swh.web.common.service.idx_storage') | @given(content()) | ||||
def test_lookup_content_filetype(self, mock_idx_storage): | def test_lookup_content_filetype(self, content): | ||||
# given | |||||
mock_idx_storage.content_mimetype_get = MagicMock( | |||||
return_value=[{ | |||||
'id': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f'), | |||||
'mimetype': 'text/x-c++', | |||||
'encoding': 'us-ascii', | |||||
}]) | |||||
expected_filetype = { | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'mimetype': 'text/x-c++', | |||||
'encoding': 'us-ascii', | |||||
} | |||||
# when | self.content_add_mimetype(content['sha1']) | ||||
actual_filetype = service.lookup_content_filetype( | actual_filetype = service.lookup_content_filetype(content['sha1']) | ||||
'sha1:123caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | expected_filetype = self.content_get_mimetype(content['sha1']) | ||||
self.assertEqual(actual_filetype, expected_filetype) | self.assertEqual(actual_filetype, expected_filetype) | ||||
mock_idx_storage.content_mimetype_get.assert_called_with( | @given(content()) | ||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | def test_lookup_content_language(self, content): | ||||
@patch('swh.web.common.service.idx_storage') | self.content_add_language(content['sha1']) | ||||
@patch('swh.web.common.service.storage') | actual_language = service.lookup_content_language(content['sha1']) | ||||
def test_lookup_content_filetype_2(self, mock_storage, mock_idx_storage): | |||||
# given | |||||
mock_storage.content_find = MagicMock( | |||||
return_value={ | |||||
'sha1': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f') | |||||
} | |||||
) | |||||
mock_idx_storage.content_mimetype_get = MagicMock( | |||||
return_value=[{ | |||||
'id': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f'), | |||||
'mimetype': 'text/x-python', | |||||
'encoding': 'us-ascii', | |||||
}] | |||||
) | |||||
expected_filetype = { | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'mimetype': 'text/x-python', | |||||
'encoding': 'us-ascii', | |||||
} | |||||
# when | |||||
actual_filetype = service.lookup_content_filetype( | |||||
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | expected_language = self.content_get_language(content['sha1']) | ||||
self.assertEqual(actual_filetype, expected_filetype) | |||||
mock_storage.content_find( | |||||
'sha1_git', hash_to_bytes( | |||||
'456caf10e9535160d90e874b45aa426de762f19f') | |||||
) | |||||
mock_idx_storage.content_mimetype_get.assert_called_with( | |||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | |||||
@patch('swh.web.common.service.idx_storage') | |||||
def test_lookup_content_language(self, mock_idx_storage): | |||||
# given | |||||
mock_idx_storage.content_language_get = MagicMock( | |||||
return_value=[{ | |||||
'id': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f'), | |||||
'lang': 'python', | |||||
}]) | |||||
expected_language = { | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'lang': 'python', | |||||
} | |||||
# when | |||||
actual_language = service.lookup_content_language( | |||||
'sha1:123caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual(actual_language, expected_language) | self.assertEqual(actual_language, expected_language) | ||||
mock_idx_storage.content_language_get.assert_called_with( | @given(contents_with_ctags()) | ||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | def test_lookup_expression(self, contents_with_ctags): | ||||
@patch('swh.web.common.service.idx_storage') | per_page = 10 | ||||
@patch('swh.web.common.service.storage') | expected_ctags = [] | ||||
def test_lookup_content_language_2(self, mock_storage, mock_idx_storage): | |||||
# given | |||||
mock_storage.content_find = MagicMock( | |||||
return_value={ | |||||
'sha1': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f') | |||||
} | |||||
) | |||||
mock_idx_storage.content_language_get = MagicMock( | |||||
return_value=[{ | |||||
'id': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f'), | |||||
'lang': 'haskell', | |||||
}] | |||||
) | |||||
expected_language = { | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'lang': 'haskell', | |||||
} | |||||
# when | |||||
actual_language = service.lookup_content_language( | |||||
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | |||||
self.assertEqual(actual_language, expected_language) | |||||
mock_storage.content_find( | |||||
'sha1_git', hash_to_bytes( | |||||
'456caf10e9535160d90e874b45aa426de762f19f') | |||||
) | |||||
mock_idx_storage.content_language_get.assert_called_with( | |||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | |||||
@patch('swh.web.common.service.idx_storage') | for content_sha1 in contents_with_ctags['sha1s']: | ||||
def test_lookup_expression(self, mock_idx_storage): | if len(expected_ctags) == per_page: | ||||
# given | break | ||||
mock_idx_storage.content_ctags_search = MagicMock( | self.content_add_ctags(content_sha1) | ||||
return_value=[{ | for ctag in self.content_get_ctags(content_sha1): | ||||
'id': hash_to_bytes( | if len(expected_ctags) == per_page: | ||||
'123caf10e9535160d90e874b45aa426de762f19f'), | break | ||||
'name': 'foobar', | if ctag['name'] == contents_with_ctags['symbol_name']: | ||||
'kind': 'variable', | del ctag['id'] | ||||
'lang': 'C', | ctag['sha1'] = content_sha1 | ||||
'line': 10 | expected_ctags.append(ctag) | ||||
}]) | |||||
expected_ctags = [{ | actual_ctags = \ | ||||
'sha1': '123caf10e9535160d90e874b45aa426de762f19f', | list(service.lookup_expression(contents_with_ctags['symbol_name'], | ||||
'name': 'foobar', | last_sha1=None, per_page=10)) | ||||
'kind': 'variable', | |||||
'lang': 'C', | |||||
'line': 10 | |||||
}] | |||||
# when | |||||
actual_ctags = list(service.lookup_expression( | |||||
'foobar', last_sha1='hash', per_page=10)) | |||||
# then | |||||
self.assertEqual(actual_ctags, expected_ctags) | self.assertEqual(actual_ctags, expected_ctags) | ||||
mock_idx_storage.content_ctags_search.assert_called_with( | def test_lookup_expression_no_result(self): | ||||
'foobar', last_sha1='hash', limit=10) | |||||
@patch('swh.web.common.service.idx_storage') | |||||
def test_lookup_expression_no_result(self, mock_idx_storage): | |||||
# given | |||||
mock_idx_storage.content_ctags_search = MagicMock( | |||||
return_value=[]) | |||||
expected_ctags = [] | expected_ctags = [] | ||||
# when | actual_ctags = \ | ||||
actual_ctags = list(service.lookup_expression( | list(service.lookup_expression('barfoo', last_sha1=None, | ||||
'barfoo', last_sha1='hash', per_page=10)) | per_page=10)) | ||||
# then | |||||
self.assertEqual(actual_ctags, expected_ctags) | self.assertEqual(actual_ctags, expected_ctags) | ||||
mock_idx_storage.content_ctags_search.assert_called_with( | @pytest.mark.skipif(fossology_missing, | ||||
'barfoo', last_sha1='hash', limit=10) | reason="requires fossology-nomossa installed") | ||||
@given(content()) | |||||
@patch('swh.web.common.service.idx_storage') | def test_lookup_content_license(self, content): | ||||
def test_lookup_content_license(self, mock_idx_storage): | |||||
# given | |||||
mock_idx_storage.content_fossology_license_get = MagicMock( | |||||
return_value=[{ | |||||
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f'): [{ | |||||
'licenses': ['GPL-3.0+'], | |||||
'tool': {} | |||||
}] | |||||
}]) | |||||
expected_license = { | |||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'facts': [{ | |||||
'licenses': ['GPL-3.0+'], | |||||
'tool': {} | |||||
}] | |||||
} | |||||
# when | self.content_add_license(content['sha1']) | ||||
actual_license = service.lookup_content_license( | actual_license = service.lookup_content_license(content['sha1']) | ||||
'sha1:123caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | expected_license = self.content_get_license(content['sha1']) | ||||
self.assertEqual(actual_license, expected_license) | self.assertEqual(actual_license, expected_license) | ||||
mock_idx_storage.content_fossology_license_get.assert_called_with( | def test_stat_counters(self): | ||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | actual_stats = service.stat_counters() | ||||
self.assertEqual(actual_stats, self.storage.stat_counters()) | |||||
@patch('swh.web.common.service.idx_storage') | @given(new_origin(), visit_dates()) | ||||
@patch('swh.web.common.service.storage') | def test_lookup_origin_visits(self, new_origin, visit_dates): | ||||
def test_lookup_content_license_2(self, mock_storage, mock_idx_storage): | |||||
# given | |||||
mock_storage.content_find = MagicMock( | |||||
return_value={ | |||||
'sha1': hash_to_bytes( | |||||
'123caf10e9535160d90e874b45aa426de762f19f') | |||||
} | |||||
) | |||||
mock_idx_storage.content_fossology_license_get = MagicMock( | |||||
return_value=[{ | |||||
hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f'): [{ | |||||
'licenses': ['BSD-2-Clause'], | |||||
'tool': {} | |||||
}] | |||||
}] | origin_id = self.storage.origin_add_one(new_origin) | ||||
) | for ts in visit_dates: | ||||
expected_license = { | self.storage.origin_visit_add(origin_id, ts) | ||||
'id': '123caf10e9535160d90e874b45aa426de762f19f', | |||||
'facts': [{ | |||||
'licenses': ['BSD-2-Clause'], | |||||
'tool': {} | |||||
}] | |||||
} | |||||
# when | actual_origin_visits = list(service.lookup_origin_visits(origin_id)) | ||||
actual_license = service.lookup_content_license( | |||||
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') | |||||
# then | expected_visits = list(self.storage.origin_visit_get(origin_id)) | ||||
self.assertEqual(actual_license, expected_license) | for visit in expected_visits: | ||||
visit['date'] = visit['date'].isoformat() | |||||
visit['metadata'] = {} | |||||
mock_storage.content_find( | self.assertEqual(actual_origin_visits, expected_visits) | ||||
'sha1_git', hash_to_bytes( | |||||
'456caf10e9535160d90e874b45aa426de762f19f') | |||||
) | |||||
mock_idx_storage.content_fossology_license_get.assert_called_with( | |||||
[hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) | |||||
@patch('swh.web.common.service.storage') | @given(new_origin(), visit_dates()) | ||||
def test_stat_counters(self, mock_storage): | def test_lookup_origin_visit(self, new_origin, visit_dates): | ||||
# given | origin_id = self.storage.origin_add_one(new_origin) | ||||
input_stats = { | visits = [] | ||||
"content": 1770830, | for ts in visit_dates: | ||||
"directory": 211683, | visits.append(self.storage.origin_visit_add(origin_id, ts)) | ||||
"directory_entry_dir": 209167, | |||||
"directory_entry_file": 1807094, | |||||
"directory_entry_rev": 0, | |||||
"origin": 1096, | |||||
"person": 0, | |||||
"release": 8584, | |||||
"revision": 7792, | |||||
"revision_history": 0, | |||||
"skipped_content": 0 | |||||
} | |||||
mock_storage.stat_counters = MagicMock(return_value=input_stats) | |||||
# when | visit = random.choice(visits)['visit'] | ||||
actual_stats = service.stat_counters() | actual_origin_visit = service.lookup_origin_visit(origin_id, visit) | ||||
# then | expected_visit = dict(self.storage.origin_visit_get_by(origin_id, | ||||
expected_stats = input_stats | visit)) | ||||
self.assertEqual(actual_stats, expected_stats) | expected_visit['date'] = expected_visit['date'].isoformat() | ||||
expected_visit['metadata'] = {} | |||||
mock_storage.stat_counters.assert_called_with() | |||||
@patch('swh.web.common.service._lookup_origin_visits') | |||||
def test_lookup_origin_visits(self, mock_lookup_visits): | |||||
# given | |||||
date_origin_visit2 = datetime.datetime( | |||||
2013, 7, 1, 20, 0, 0, | |||||
tzinfo=datetime.timezone.utc) | |||||
date_origin_visit3 = datetime.datetime( | |||||
2015, 1, 1, 21, 0, 0, | |||||
tzinfo=datetime.timezone.utc) | |||||
stub_result = [self.origin_visit1, { | |||||
'date': date_origin_visit2, | |||||
'origin': 1, | |||||
'visit': 2, | |||||
'target': hash_to_bytes( | |||||
'65a55bbdf3629f916219feb3dcc7393ded1bc8db'), | |||||
'branch': b'master', | |||||
'target_type': 'release', | |||||
'metadata': None, | |||||
}, { | |||||
'date': date_origin_visit3, | |||||
'origin': 1, | |||||
'visit': 3 | |||||
}] | |||||
mock_lookup_visits.return_value = stub_result | |||||
# when | |||||
expected_origin_visits = [{ | |||||
'date': self.origin_visit1['date'].isoformat(), | |||||
'origin': self.origin_visit1['origin'], | |||||
'visit': self.origin_visit1['visit'] | |||||
}, { | |||||
'date': date_origin_visit2.isoformat(), | |||||
'origin': 1, | |||||
'visit': 2, | |||||
'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', | |||||
'branch': 'master', | |||||
'target_type': 'release', | |||||
'metadata': {}, | |||||
}, { | |||||
'date': date_origin_visit3.isoformat(), | |||||
'origin': 1, | |||||
'visit': 3 | |||||
}] | |||||
actual_origin_visits = service.lookup_origin_visits(6) | |||||
# then | |||||
self.assertEqual(list(actual_origin_visits), expected_origin_visits) | |||||
mock_lookup_visits.assert_called_once_with( | |||||
6, last_visit=None, limit=10) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_origin_visit(self, mock_storage): | |||||
# given | |||||
stub_result = self.origin_visit1 | |||||
mock_storage.origin_visit_get_by.return_value = stub_result | |||||
expected_origin_visit = { | |||||
'date': self.origin_visit1['date'].isoformat(), | |||||
'origin': self.origin_visit1['origin'], | |||||
'visit': self.origin_visit1['visit'] | |||||
} | |||||
# when | self.assertEqual(actual_origin_visit, expected_visit) | ||||
actual_origin_visit = service.lookup_origin_visit(1, 1) | |||||
# then | @given(new_origin()) | ||||
self.assertEqual(actual_origin_visit, expected_origin_visit) | def test_lookup_origin(self, new_origin): | ||||
origin_id = self.storage.origin_add_one(new_origin) | |||||
mock_storage.origin_visit_get_by.assert_called_once_with(1, 1) | actual_origin = service.lookup_origin({'id': origin_id}) | ||||
expected_origin = self.storage.origin_get({'id': origin_id}) | |||||
self.assertEqual(actual_origin, expected_origin) | |||||
@patch('swh.web.common.service.storage') | actual_origin = service.lookup_origin({'type': new_origin['type'], | ||||
def test_lookup_origin(self, mock_storage): | 'url': new_origin['url']}) | ||||
# given | expected_origin = self.storage.origin_get({'type': new_origin['type'], | ||||
mock_storage.origin_get = MagicMock(return_value={ | 'url': new_origin['url']}) | ||||
'id': 'origin-id', | self.assertEqual(actual_origin, expected_origin) | ||||
'url': 'ftp://some/url/to/origin', | |||||
'type': 'ftp'}) | |||||
# when | |||||
actual_origin = service.lookup_origin({'id': 'origin-id'}) | |||||
# then | |||||
self.assertEqual(actual_origin, {'id': 'origin-id', | |||||
'url': 'ftp://some/url/to/origin', | |||||
'type': 'ftp'}) | |||||
mock_storage.origin_get.assert_called_with({'id': 'origin-id'}) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_release_ko_id_checksum_not_a_sha1(self, mock_storage): | |||||
# given | |||||
mock_storage.release_get = MagicMock() | |||||
@given(invalid_sha1()) | |||||
def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): | |||||
with self.assertRaises(BadInputExc) as cm: | with self.assertRaises(BadInputExc) as cm: | ||||
# when | service.lookup_release(invalid_sha1) | ||||
service.lookup_release('not-a-sha1') | |||||
self.assertIn('invalid checksum', cm.exception.args[0].lower()) | self.assertIn('invalid checksum', cm.exception.args[0].lower()) | ||||
mock_storage.release_get.called = False | @given(sha256()) | ||||
def test_lookup_release_ko_id_checksum_too_long(self, sha256): | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_release_ko_id_checksum_too_long(self, mock_storage): | |||||
# given | |||||
mock_storage.release_get = MagicMock() | |||||
# when | |||||
with self.assertRaises(BadInputExc) as cm: | with self.assertRaises(BadInputExc) as cm: | ||||
service.lookup_release( | service.lookup_release(sha256) | ||||
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' | |||||
'1aea892abe') | |||||
self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) | self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) | ||||
mock_storage.release_get.called = False | @given(directory()) | ||||
def test_lookup_directory_with_path_not_found(self, directory): | |||||
@patch('swh.web.common.service.storage') | path = 'some/invalid/path/here' | ||||
def test_lookup_directory_with_path_not_found(self, mock_storage): | with self.assertRaises(NotFoundExc) as cm: | ||||
# given | service.lookup_directory_with_path(directory, path) | ||||
mock_storage.lookup_directory_with_path = MagicMock(return_value=None) | self.assertEqual('Directory entry with path %s from %s ' | ||||
'not found' % (path, directory), | |||||
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | cm.exception.args[0]) | ||||
# when | |||||
actual_directory = mock_storage.lookup_directory_with_path( | |||||
sha1_git, 'some/path/here') | |||||
self.assertIsNone(actual_directory) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_directory_with_path_found(self, mock_storage): | |||||
# given | |||||
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | |||||
entry = {'id': 'dir-id', | |||||
'type': 'dir', | |||||
'name': 'some/path/foo'} | |||||
mock_storage.lookup_directory_with_path = MagicMock(return_value=entry) | |||||
# when | |||||
actual_directory = mock_storage.lookup_directory_with_path( | |||||
sha1_git, 'some/path/here') | |||||
self.assertEqual(entry, actual_directory) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_release(self, mock_storage): | |||||
# given | |||||
mock_storage.release_get = MagicMock(return_value=[{ | |||||
'id': hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), | |||||
'target': None, | |||||
'date': { | |||||
'timestamp': datetime.datetime( | |||||
2015, 1, 1, 22, 0, 0, | |||||
tzinfo=datetime.timezone.utc).timestamp(), | |||||
'offset': 0, | |||||
'negative_utc': True, | |||||
}, | |||||
'name': b'v0.0.1', | |||||
'message': b'synthetic release', | |||||
'synthetic': True, | |||||
}]) | |||||
# when | |||||
actual_release = service.lookup_release( | |||||
'65a55bbdf3629f916219feb3dcc7393ded1bc8db') | |||||
# then | |||||
self.assertEqual(actual_release, { | |||||
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', | |||||
'target': None, | |||||
'date': '2015-01-01T22:00:00-00:00', | |||||
'name': 'v0.0.1', | |||||
'message': 'synthetic release', | |||||
'synthetic': True, | |||||
}) | |||||
mock_storage.release_get.assert_called_with( | |||||
[hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) | |||||
def test_lookup_revision_with_context_ko_not_a_sha1_1(self): | @given(directory()) | ||||
# given | def test_lookup_directory_with_path_found(self, directory): | ||||
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ | directory_content = self.directory_ls(directory) | ||||
'daf51aea892abe' | directory_entry = random.choice(directory_content) | ||||
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | path = directory_entry['name'] | ||||
actual_result = service.lookup_directory_with_path(directory, path) | |||||
self.assertEqual(actual_result, directory_entry) | |||||
@given(release()) | |||||
def test_lookup_release(self, release): | |||||
actual_release = service.lookup_release(release) | |||||
self.assertEqual(actual_release, | |||||
self.release_get(release)) | |||||
@given(revision(), invalid_sha1(), sha256()) | |||||
def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, | |||||
invalid_sha1, | |||||
sha256): | |||||
sha1_git_root = revision | |||||
sha1_git = invalid_sha1 | |||||
# when | |||||
with self.assertRaises(BadInputExc) as cm: | with self.assertRaises(BadInputExc) as cm: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Only sha1_git is supported', cm.exception.args[0]) | self.assertIn('Invalid checksum query string', cm.exception.args[0]) | ||||
def test_lookup_revision_with_context_ko_not_a_sha1_2(self): | sha1_git = sha256 | ||||
# given | |||||
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | |||||
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ | |||||
'2d4daf51aea892abe' | |||||
# when | |||||
with self.assertRaises(BadInputExc) as cm: | with self.assertRaises(BadInputExc) as cm: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Only sha1_git is supported', cm.exception.args[0]) | self.assertIn('Only sha1_git is supported', cm.exception.args[0]) | ||||
@patch('swh.web.common.service.storage') | @given(revision(), unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | ||||
self, | self, revision, unknown_revision): | ||||
mock_storage): | sha1_git_root = revision | ||||
# given | sha1_git = unknown_revision | ||||
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | |||||
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' | |||||
sha1_git_bin = hash_to_bytes(sha1_git) | |||||
mock_storage.revision_get.return_value = None | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' | self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) | ||||
' not found', cm.exception.args[0]) | |||||
mock_storage.revision_get.assert_called_once_with( | |||||
[sha1_git_bin]) | |||||
@patch('swh.web.common.service.storage') | @given(revision(), unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | ||||
self, | self, revision, unknown_revision): | ||||
mock_storage): | sha1_git_root = unknown_revision | ||||
# given | sha1_git = revision | ||||
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' | |||||
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' | |||||
sha1_git_root_bin = hash_to_bytes(sha1_git_root) | |||||
sha1_git_bin = hash_to_bytes(sha1_git) | |||||
mock_storage.revision_get.side_effect = ['foo', None] | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Revision root 65a55bbdf3629f916219feb3dcc7393ded1bc8db' | self.assertIn('Revision root %s not found' % sha1_git_root, | ||||
' not found', cm.exception.args[0]) | cm.exception.args[0]) | ||||
mock_storage.revision_get.assert_has_calls([call([sha1_git_bin]), | |||||
call([sha1_git_root_bin])]) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_revision_with_context(self, mock_query, mock_storage): | |||||
# given | |||||
sha1_git_root = '666' | |||||
sha1_git = '883' | |||||
sha1_git_root_bin = b'666' | |||||
sha1_git_bin = b'883' | |||||
sha1_git_root_dict = { | |||||
'id': sha1_git_root_bin, | |||||
'parents': [b'999'], | |||||
} | |||||
sha1_git_dict = { | |||||
'id': sha1_git_bin, | |||||
'parents': [], | |||||
'directory': b'278', | |||||
} | |||||
stub_revisions = [ | |||||
sha1_git_root_dict, | |||||
{ | |||||
'id': b'999', | |||||
'parents': [b'777', b'883', b'888'], | |||||
}, | |||||
{ | |||||
'id': b'777', | |||||
'parents': [b'883'], | |||||
}, | |||||
sha1_git_dict, | |||||
{ | |||||
'id': b'888', | |||||
'parents': [b'889'], | |||||
}, | |||||
{ | |||||
'id': b'889', | |||||
'parents': [], | |||||
}, | |||||
] | |||||
# inputs ok | |||||
mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ | |||||
('sha1', sha1_git_bin), | |||||
('sha1', sha1_git_root_bin) | |||||
] | |||||
# lookup revision first 883, then 666 (both exists) | |||||
mock_storage.revision_get.return_value = [ | |||||
sha1_git_dict, | |||||
sha1_git_root_dict | |||||
] | |||||
mock_storage.revision_log = MagicMock( | |||||
return_value=stub_revisions) | |||||
# when | |||||
actual_revision = service.lookup_revision_with_context( | |||||
sha1_git_root, | |||||
sha1_git) | |||||
# then | |||||
self.assertEqual(actual_revision, { | |||||
'id': hash_to_hex(sha1_git_bin), | |||||
'parents': [], | |||||
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], | |||||
'directory': hash_to_hex(b'278'), | |||||
'merge': False | |||||
}) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( | |||||
[call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), | |||||
call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) | |||||
mock_storage.revision_log.assert_called_with( | |||||
[sha1_git_root_bin], 100) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_revision_with_context_retrieved_as_dict( | |||||
self, mock_query, mock_storage): | |||||
# given | |||||
sha1_git = '883' | |||||
sha1_git_root_bin = b'666' | |||||
sha1_git_bin = b'883' | |||||
sha1_git_root_dict = { | |||||
'id': sha1_git_root_bin, | |||||
'parents': [b'999'], | |||||
} | |||||
sha1_git_dict = { | |||||
'id': sha1_git_bin, | |||||
'parents': [], | |||||
'directory': b'278', | |||||
} | |||||
stub_revisions = [ | |||||
sha1_git_root_dict, | |||||
{ | |||||
'id': b'999', | |||||
'parents': [b'777', b'883', b'888'], | |||||
}, | |||||
{ | |||||
'id': b'777', | |||||
'parents': [b'883'], | |||||
}, | |||||
sha1_git_dict, | |||||
{ | |||||
'id': b'888', | |||||
'parents': [b'889'], | |||||
}, | |||||
{ | |||||
'id': b'889', | |||||
'parents': [], | |||||
}, | |||||
] | |||||
# inputs ok | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ( | |||||
'sha1', sha1_git_bin) | |||||
# lookup only on sha1 | |||||
mock_storage.revision_get.return_value = [sha1_git_dict] | |||||
mock_storage.revision_log.return_value = stub_revisions | |||||
# when | @given(ancestor_revisions()) | ||||
actual_revision = service.lookup_revision_with_context( | def test_lookup_revision_with_context(self, ancestor_revisions): | ||||
{'id': sha1_git_root_bin}, | sha1_git = ancestor_revisions['sha1_git'] | ||||
root_sha1_git = ancestor_revisions['sha1_git_root'] | |||||
for sha1_git_root in (root_sha1_git, | |||||
{'id': hash_to_bytes(root_sha1_git)}): | |||||
actual_revision = \ | |||||
service.lookup_revision_with_context(sha1_git_root, | |||||
sha1_git) | sha1_git) | ||||
# then | children = [] | ||||
self.assertEqual(actual_revision, { | for rev in self.revision_log(root_sha1_git): | ||||
'id': hash_to_hex(sha1_git_bin), | for p_rev in rev['parents']: | ||||
'parents': [], | p_rev_hex = hash_to_hex(p_rev) | ||||
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], | if p_rev_hex == sha1_git: | ||||
'directory': hash_to_hex(b'278'), | children.append(rev['id']) | ||||
'merge': False | |||||
}) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa | expected_revision = self.revision_get(sha1_git) | ||||
sha1_git, ['sha1'], 'Only sha1_git is supported.') | expected_revision['children'] = children | ||||
self.assertEqual(actual_revision, expected_revision) | |||||
mock_storage.revision_get.assert_called_once_with([sha1_git_bin]) | @given(non_ancestor_revisions()) | ||||
def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): | |||||
sha1_git = non_ancestor_revisions['sha1_git'] | |||||
root_sha1_git = non_ancestor_revisions['sha1_git_root'] | |||||
mock_storage.revision_log.assert_called_with( | with self.assertRaises(NotFoundExc) as cm: | ||||
[sha1_git_root_bin], 100) | service.lookup_revision_with_context(root_sha1_git, sha1_git) | ||||
self.assertIn('Revision %s is not an ancestor of %s' % | |||||
(sha1_git, root_sha1_git), cm.exception.args[0]) | |||||
@patch('swh.web.common.service.storage') | @given(unknown_revision()) | ||||
@patch('swh.web.common.service.query') | def test_lookup_directory_with_revision_not_found(self, unknown_revision): | ||||
def test_lookup_directory_with_revision_not_found(self, | |||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
mock_storage.revision_get.return_value = None | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory_with_revision('123') | service.lookup_directory_with_revision(unknown_revision) | ||||
self.assertIn('Revision 123 not found', cm.exception.args[0]) | self.assertIn('Revision %s not found' % unknown_revision, | ||||
cm.exception.args[0]) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | |||||
('123', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_ko_revision_with_path_to_nowhere( | |||||
self, | |||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
mock_storage.directory_entry_get_by_path.return_value = None | @given(revision()) | ||||
def test_lookup_directory_with_revision_ko_path_to_nowhere(self, revision): | |||||
# when | invalid_path = 'path/to/something/unknown' | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory_with_revision( | service.lookup_directory_with_revision(revision, invalid_path) | ||||
'123', | |||||
'path/to/something/unknown') | |||||
exception_text = cm.exception.args[0].lower() | exception_text = cm.exception.args[0].lower() | ||||
self.assertIn('directory or file', exception_text) | self.assertIn('directory or file', exception_text) | ||||
self.assertIn('path/to/something/unknown', exception_text) | self.assertIn(invalid_path, exception_text) | ||||
self.assertIn('revision 123', exception_text) | self.assertIn('revision %s' % revision, exception_text) | ||||
self.assertIn('not found', exception_text) | self.assertIn('not found', exception_text) | ||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | @given(revision_with_submodules()) | ||||
('123', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
mock_storage.directory_entry_get_by_path.assert_called_once_with( | |||||
b'dir-id-as-sha1', [b'path', b'to', b'something', b'unknown']) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_ko_type_not_implemented( | def test_lookup_directory_with_revision_ko_type_not_implemented( | ||||
self, | self, revision_with_submodules): | ||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
mock_storage.directory_entry_get_by_path.return_value = { | |||||
'type': 'rev', | |||||
'name': b'some/path/to/rev', | |||||
'target': b'456' | |||||
} | |||||
stub_content = { | |||||
'id': b'12', | |||||
'type': 'file' | |||||
} | |||||
mock_storage.content_get.return_value = stub_content | |||||
# when | |||||
with self.assertRaises(NotImplementedError) as cm: | with self.assertRaises(NotImplementedError) as cm: | ||||
service.lookup_directory_with_revision( | service.lookup_directory_with_revision( | ||||
'123', | revision_with_submodules['rev_sha1_git'], | ||||
'some/path/to/rev') | revision_with_submodules['rev_dir_rev_path']) | ||||
self.assertIn("Entity of type rev not implemented.", | self.assertIn("Entity of type rev not implemented.", | ||||
cm.exception.args[0]) | cm.exception.args[0]) | ||||
# then | @given(revision()) | ||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | def test_lookup_directory_with_revision_without_path(self, revision): | ||||
('123', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
mock_storage.directory_entry_get_by_path.assert_called_once_with( | |||||
b'dir-id-as-sha1', [b'some', b'path', b'to', b'rev']) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_revision_without_path( | |||||
self, mock_query, mock_storage, | |||||
): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
stub_dir_entries = [{ | |||||
'id': b'123', | |||||
'type': 'dir' | |||||
}, { | |||||
'id': b'456', | |||||
'type': 'file' | |||||
}] | |||||
mock_storage.directory_ls.return_value = stub_dir_entries | |||||
# when | |||||
actual_directory_entries = service.lookup_directory_with_revision( | |||||
'123') | |||||
self.assertEqual(actual_directory_entries['type'], 'dir') | |||||
self.assertEqual(list(actual_directory_entries['content']), | |||||
stub_dir_entries) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | actual_directory_entries = \ | ||||
('123', ['sha1'], 'Only sha1_git is supported.') | service.lookup_directory_with_revision(revision) | ||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
mock_storage.directory_ls.assert_called_once_with(dir_id) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_with_path_to_dir(self, | |||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
stub_dir_entries = [{ | |||||
'id': b'12', | |||||
'type': 'dir' | |||||
}, { | |||||
'id': b'34', | |||||
'type': 'file' | |||||
}] | |||||
mock_storage.directory_entry_get_by_path.return_value = { | |||||
'type': 'dir', | |||||
'name': b'some/path', | |||||
'target': b'456' | |||||
} | |||||
mock_storage.directory_ls.return_value = stub_dir_entries | |||||
# when | revision_data = self.revision_get(revision) | ||||
actual_directory_entries = service.lookup_directory_with_revision( | expected_directory_entries = \ | ||||
'123', | self.directory_ls(revision_data['directory']) | ||||
'some/path') | |||||
self.assertEqual(actual_directory_entries['type'], 'dir') | self.assertEqual(actual_directory_entries['type'], 'dir') | ||||
self.assertEqual(actual_directory_entries['revision'], '123') | self.assertEqual(actual_directory_entries['content'], | ||||
self.assertEqual(actual_directory_entries['path'], 'some/path') | expected_directory_entries) | ||||
self.assertEqual(list(actual_directory_entries['content']), | |||||
stub_dir_entries) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | |||||
('123', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
mock_storage.directory_entry_get_by_path.assert_called_once_with( | |||||
dir_id, | |||||
[b'some', b'path']) | |||||
mock_storage.directory_ls.assert_called_once_with(b'456') | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_with_path_to_file_wo_data( | |||||
self, | |||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
mock_storage.directory_entry_get_by_path.return_value = { | |||||
'type': 'file', | |||||
'name': b'some/path/to/file', | |||||
'target': b'789' | |||||
} | |||||
stub_content = { | |||||
'status': 'visible', | |||||
} | |||||
mock_storage.content_find.return_value = stub_content | |||||
# when | |||||
actual_content = service.lookup_directory_with_revision( | |||||
'123', | |||||
'some/path/to/file') | |||||
# then | |||||
self.assertEqual(actual_content, {'type': 'file', | |||||
'revision': '123', | |||||
'path': 'some/path/to/file', | |||||
'content': stub_content}) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | |||||
('123', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.revision_get.assert_called_once_with([b'123']) | |||||
mock_storage.directory_entry_get_by_path.assert_called_once_with( | |||||
b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) | |||||
mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_with_revision_with_path_to_file_w_data( | |||||
self, | |||||
mock_query, | |||||
mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', | |||||
b'123') | |||||
dir_id = b'dir-id-as-sha1' | |||||
mock_storage.revision_get.return_value = [{ | |||||
'directory': dir_id, | |||||
}] | |||||
mock_storage.directory_entry_get_by_path.return_value = { | |||||
'type': 'file', | |||||
'name': b'some/path/to/file', | |||||
'target': b'789' | |||||
} | |||||
stub_content = { | @given(revision()) | ||||
'status': 'visible', | def test_lookup_directory_with_revision_with_path(self, revision): | ||||
'sha1': b'content-sha1' | |||||
} | |||||
mock_storage.content_find.return_value = stub_content | |||||
mock_storage.content_get.return_value = [{ | |||||
'sha1': b'content-sha1', | |||||
'data': b'some raw data' | |||||
}] | |||||
expected_content = { | |||||
'status': 'visible', | |||||
'checksums': { | |||||
'sha1': hash_to_hex(b'content-sha1'), | |||||
}, | |||||
'data': b'some raw data' | |||||
} | |||||
# when | revision_data = self.revision_get(revision) | ||||
actual_content = service.lookup_directory_with_revision( | dir_entries = [e for e in self.directory_ls(revision_data['directory']) | ||||
'123', | if e['type'] in ('file', 'dir')] | ||||
'some/path/to/file', | expected_dir_entry = random.choice(dir_entries) | ||||
actual_dir_entry = \ | |||||
service.lookup_directory_with_revision(revision, | |||||
expected_dir_entry['name']) | |||||
self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) | |||||
self.assertEqual(actual_dir_entry['revision'], revision) | |||||
self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) | |||||
if actual_dir_entry['type'] == 'file': | |||||
del actual_dir_entry['content']['checksums']['blake2s256'] | |||||
for key in ('checksums', 'status', 'length'): | |||||
self.assertEqual(actual_dir_entry['content'][key], | |||||
expected_dir_entry[key]) | |||||
else: | |||||
sub_dir_entries = self.directory_ls(expected_dir_entry['target']) | |||||
self.assertEqual(actual_dir_entry['content'], sub_dir_entries) | |||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_with_path_to_file_and_data( | |||||
self, revision): | |||||
revision_data = self.revision_get(revision) | |||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | |||||
if e['type'] == 'file'] | |||||
expected_dir_entry = random.choice(dir_entries) | |||||
expected_data = \ | |||||
self.content_get(expected_dir_entry['checksums']['sha1']) | |||||
actual_dir_entry = \ | |||||
service.lookup_directory_with_revision(revision, | |||||
expected_dir_entry['name'], | |||||
with_data=True) | with_data=True) | ||||
# then | self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) | ||||
self.assertEqual(actual_content, {'type': 'file', | self.assertEqual(actual_dir_entry['revision'], revision) | ||||
'revision': '123', | self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) | ||||
'path': 'some/path/to/file', | del actual_dir_entry['content']['checksums']['blake2s256'] | ||||
'content': expected_content}) | for key in ('checksums', 'status', 'length'): | ||||
self.assertEqual(actual_dir_entry['content'][key], | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with | expected_dir_entry[key]) | ||||
('123', ['sha1'], 'Only sha1_git is supported.') | self.assertEqual(actual_dir_entry['content']['data'], | ||||
mock_storage.revision_get.assert_called_once_with([b'123']) | expected_data['data']) | ||||
mock_storage.directory_entry_get_by_path.assert_called_once_with( | |||||
b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) | @given(revision()) | ||||
mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) | def test_lookup_revision(self, revision): | ||||
mock_storage.content_get.assert_called_once_with([b'content-sha1']) | actual_revision = service.lookup_revision(revision) | ||||
self.assertEqual(actual_revision, self.revision_get(revision)) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_revision(self, mock_storage): | @given(unknown_revision()) | ||||
# given | def test_lookup_revision_invalid_msg(self, new_revision_id): | ||||
mock_storage.revision_get = MagicMock( | |||||
return_value=[self.SAMPLE_REVISION_RAW]) | new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) | ||||
new_revision['id'] = hash_to_bytes(new_revision_id) | |||||
# when | new_revision['message'] = b'elegant fix for bug \xff' | ||||
actual_revision = service.lookup_revision( | self.storage.revision_add([new_revision]) | ||||
self.SHA1_SAMPLE) | |||||
revision = service.lookup_revision(new_revision_id) | |||||
# then | self.assertEqual(revision['message'], None) | ||||
self.assertEqual(actual_revision, self.SAMPLE_REVISION) | self.assertEqual(revision['message_decoding_failed'], True) | ||||
mock_storage.revision_get.assert_called_with( | @given(unknown_revision()) | ||||
[self.SHA1_SAMPLE_BIN]) | def test_lookup_revision_msg_ok(self, new_revision_id): | ||||
@patch('swh.web.common.service.storage') | new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) | ||||
def test_lookup_revision_invalid_msg(self, mock_storage): | new_revision['id'] = hash_to_bytes(new_revision_id) | ||||
# given | self.storage.revision_add([new_revision]) | ||||
stub_rev = self.SAMPLE_REVISION_RAW | |||||
stub_rev['message'] = b'elegant fix for bug \xff' | revision_message = service.lookup_revision_message(new_revision_id) | ||||
expected_revision = self.SAMPLE_REVISION | self.assertEqual(revision_message, | ||||
expected_revision['message'] = None | {'message': self.SAMPLE_MESSAGE_BIN}) | ||||
expected_revision['message_decoding_failed'] = True | |||||
mock_storage.revision_get = MagicMock(return_value=[stub_rev]) | @given(unknown_revision()) | ||||
def test_lookup_revision_msg_absent(self, new_revision_id): | |||||
# when | |||||
actual_revision = service.lookup_revision( | new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) | ||||
self.SHA1_SAMPLE) | new_revision['id'] = hash_to_bytes(new_revision_id) | ||||
del new_revision['message'] | |||||
self.storage.revision_add([new_revision]) | |||||
# then | |||||
self.assertEqual(actual_revision, expected_revision) | |||||
mock_storage.revision_get.assert_called_with( | |||||
[self.SHA1_SAMPLE_BIN]) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_revision_msg_ok(self, mock_storage): | |||||
# given | |||||
mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW] | |||||
# when | |||||
rv = service.lookup_revision_message( | |||||
self.SHA1_SAMPLE) | |||||
# then | |||||
self.assertEqual(rv, {'message': self.SAMPLE_MESSAGE_BIN}) | |||||
mock_storage.revision_get.assert_called_with( | |||||
[self.SHA1_SAMPLE_BIN]) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_revision_msg_absent(self, mock_storage): | |||||
# given | |||||
stub_revision = self.SAMPLE_REVISION_RAW | |||||
del stub_revision['message'] | |||||
mock_storage.revision_get.return_value = stub_revision | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_message( | service.lookup_revision_message(new_revision_id) | ||||
self.SHA1_SAMPLE) | |||||
# then | |||||
mock_storage.revision_get.assert_called_with( | |||||
[self.SHA1_SAMPLE_BIN]) | |||||
self.assertEqual( | self.assertEqual( | ||||
cm.exception.args[0], | cm.exception.args[0], | ||||
'No message for revision with sha1_git %s.' % self.SHA1_SAMPLE, | 'No message for revision with sha1_git %s.' % new_revision_id | ||||
) | ) | ||||
@patch('swh.web.common.service.storage') | @given(unknown_revision()) | ||||
def test_lookup_revision_msg_norev(self, mock_storage): | def test_lookup_revision_msg_no_rev(self, unknown_revision): | ||||
# given | |||||
mock_storage.revision_get.return_value = None | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_message( | service.lookup_revision_message(unknown_revision) | ||||
self.SHA1_SAMPLE) | |||||
# then | |||||
mock_storage.revision_get.assert_called_with( | |||||
[self.SHA1_SAMPLE_BIN]) | |||||
self.assertEqual( | self.assertEqual( | ||||
cm.exception.args[0], | cm.exception.args[0], | ||||
'Revision with sha1_git %s not found.' % self.SHA1_SAMPLE, | 'Revision with sha1_git %s not found.' % unknown_revision | ||||
) | ) | ||||
@patch('swh.web.common.service.storage') | @given(revisions()) | ||||
def test_lookup_revision_multiple(self, mock_storage): | def test_lookup_revision_multiple(self, revisions): | ||||
# given | |||||
sha1 = self.SHA1_SAMPLE | |||||
sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' | |||||
stub_revisions = [ | |||||
self.SAMPLE_REVISION_RAW, | |||||
{ | |||||
'id': hash_to_bytes(sha1_other), | |||||
'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', | |||||
'author': { | |||||
'name': b'name', | |||||
'email': b'name@surname.org', | |||||
}, | |||||
'committer': { | |||||
'name': b'name', | |||||
'email': b'name@surname.org', | |||||
}, | |||||
'message': b'ugly fix for bug 42', | |||||
'date': { | |||||
'timestamp': datetime.datetime( | |||||
2000, 1, 12, 5, 23, 54, | |||||
tzinfo=datetime.timezone.utc).timestamp(), | |||||
'offset': 0, | |||||
'negative_utc': False | |||||
}, | |||||
'date_offset': 0, | |||||
'committer_date': { | |||||
'timestamp': datetime.datetime( | |||||
2000, 1, 12, 5, 23, 54, | |||||
tzinfo=datetime.timezone.utc).timestamp(), | |||||
'offset': 0, | |||||
'negative_utc': False | |||||
}, | |||||
'committer_date_offset': 0, | |||||
'synthetic': False, | |||||
'type': 'git', | |||||
'parents': [], | |||||
'metadata': [], | |||||
} | |||||
] | |||||
mock_storage.revision_get.return_value = stub_revisions | actual_revisions = list(service.lookup_revision_multiple(revisions)) | ||||
# when | expected_revisions = [] | ||||
actual_revisions = service.lookup_revision_multiple( | for rev in revisions: | ||||
[sha1, sha1_other]) | expected_revisions.append(self.revision_get(rev)) | ||||
# then | |||||
self.assertEqual(list(actual_revisions), [ | |||||
self.SAMPLE_REVISION, | |||||
{ | |||||
'id': sha1_other, | |||||
'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', | |||||
'author': { | |||||
'name': 'name', | |||||
'email': 'name@surname.org', | |||||
}, | |||||
'committer': { | |||||
'name': 'name', | |||||
'email': 'name@surname.org', | |||||
}, | |||||
'message': 'ugly fix for bug 42', | |||||
'date': '2000-01-12T05:23:54+00:00', | |||||
'date_offset': 0, | |||||
'committer_date': '2000-01-12T05:23:54+00:00', | |||||
'committer_date_offset': 0, | |||||
'synthetic': False, | |||||
'type': 'git', | |||||
'parents': [], | |||||
'metadata': {}, | |||||
'merge': False | |||||
} | |||||
]) | |||||
self.assertEqual( | self.assertEqual(actual_revisions, expected_revisions) | ||||
list(mock_storage.revision_get.call_args[0][0]), | |||||
[hash_to_bytes(sha1), | |||||
hash_to_bytes(sha1_other)]) | |||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_revision_multiple_none_found(self, mock_storage): | |||||
# given | |||||
sha1_bin = self.SHA1_SAMPLE | |||||
sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' | |||||
mock_storage.revision_get.return_value = [] | |||||
# then | |||||
actual_revisions = service.lookup_revision_multiple( | |||||
[sha1_bin, sha1_other]) | |||||
self.assertEqual(list(actual_revisions), []) | @given(unknown_revisions()) | ||||
def test_lookup_revision_multiple_none_found(self, unknown_revisions): | |||||
self.assertEqual( | actual_revisions = \ | ||||
list(mock_storage.revision_get.call_args[0][0]), | list(service.lookup_revision_multiple(unknown_revisions)) | ||||
[hash_to_bytes(self.SHA1_SAMPLE), | |||||
hash_to_bytes(sha1_other)]) | self.assertEqual(actual_revisions, [None] * len(unknown_revisions)) | ||||
@patch('swh.web.common.service.storage') | @given(revision()) | ||||
def test_lookup_revision_log(self, mock_storage): | def test_lookup_revision_log(self, revision): | ||||
# given | |||||
stub_revision_log = [self.SAMPLE_REVISION_RAW] | actual_revision_log = \ | ||||
mock_storage.revision_log = MagicMock(return_value=stub_revision_log) | list(service.lookup_revision_log(revision, limit=25)) | ||||
expected_revision_log = self.revision_log(revision, limit=25) | |||||
# when | |||||
actual_revision = service.lookup_revision_log( | self.assertEqual(actual_revision_log, expected_revision_log) | ||||
'abcdbe353ed3480476f032475e7c233eff7371d5', | |||||
limit=25) | def _get_origin_branches(self, origin): | ||||
origin_visit = self.origin_visit_get(origin['id'])[0] | |||||
# then | snapshot = self.snapshot_get(origin_visit['snapshot']) | ||||
self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION]) | branches = {k: v for (k, v) in snapshot['branches'].items() | ||||
if v['target_type'] == 'revision'} | |||||
mock_storage.revision_log.assert_called_with( | return branches | ||||
[hash_to_bytes('abcdbe353ed3480476f032475e7c233eff7371d5')], 25) | |||||
@given(origin()) | |||||
@patch('swh.web.common.service.lookup_revision_log') | def test_lookup_revision_log_by(self, origin): | ||||
@patch('swh.web.common.service.lookup_snapshot') | |||||
@patch('swh.web.common.service.get_origin_visit') | branches = self._get_origin_branches(origin) | ||||
def test_lookup_revision_log_by(self, mock_get_origin_visit, | branch_name = random.choice(list(branches.keys())) | ||||
mock_lookup_snapshot, | |||||
mock_lookup_revision_log): | actual_log = \ | ||||
# given | list(service.lookup_revision_log_by(origin['id'], branch_name, | ||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | None, limit=25)) | ||||
mock_lookup_snapshot.return_value = \ | |||||
{ | expected_log = \ | ||||
'branches': { | self.revision_log(branches[branch_name]['target'], limit=25) | ||||
'refs/heads/master': { | |||||
'target_type': 'revision', | |||||
'target': self.SAMPLE_REVISION['id'] | |||||
} | |||||
} | |||||
} | |||||
mock_lookup_revision_log.return_value = [self.SAMPLE_REVISION] | self.assertEqual(actual_log, expected_log) | ||||
# when | @given(origin()) | ||||
actual_log = service.lookup_revision_log_by( | def test_lookup_revision_log_by_notfound(self, origin): | ||||
1, 'refs/heads/master', None, limit=100) | |||||
# then | |||||
self.assertEqual(list(actual_log), [self.SAMPLE_REVISION]) | |||||
@patch('swh.web.common.service.lookup_snapshot') | |||||
@patch('swh.web.common.service.get_origin_visit') | |||||
def test_lookup_revision_log_by_notfound(self, mock_get_origin_visit, | |||||
mock_lookup_snapshot): | |||||
# given | |||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | |||||
mock_lookup_snapshot.return_value = {'branches': {}} | |||||
# when | |||||
with self.assertRaises(NotFoundExc): | with self.assertRaises(NotFoundExc): | ||||
service.lookup_revision_log_by( | service.lookup_revision_log_by( | ||||
1, 'refs/heads/master', None, limit=100) | origin['id'], 'unknown_branch_name', None, limit=100) | ||||
@patch('swh.web.common.service.storage') | @given(unknown_content()) | ||||
def test_lookup_content_raw_not_found(self, mock_storage): | def test_lookup_content_raw_not_found(self, unknown_content): | ||||
# given | |||||
mock_storage.content_find = MagicMock(return_value=None) | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_content_raw('sha1:' + self.SHA1_SAMPLE) | service.lookup_content_raw('sha1:' + unknown_content['sha1']) | ||||
self.assertIn(cm.exception.args[0], | self.assertIn(cm.exception.args[0], | ||||
'Content with %s checksum equals to %s not found!' % | 'Content with %s checksum equals to %s not found!' % | ||||
('sha1', self.SHA1_SAMPLE)) | ('sha1', unknown_content['sha1'])) | ||||
mock_storage.content_find.assert_called_with( | |||||
{'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) | |||||
@patch('swh.web.common.service.storage') | @given(content()) | ||||
def test_lookup_content_raw(self, mock_storage): | def test_lookup_content_raw(self, content): | ||||
# given | |||||
mock_storage.content_find = MagicMock(return_value={ | |||||
'sha1': self.SHA1_SAMPLE, | |||||
}) | |||||
mock_storage.content_get = MagicMock(return_value=[{ | |||||
'data': b'binary data'}]) | |||||
# when | |||||
actual_content = service.lookup_content_raw( | actual_content = service.lookup_content_raw( | ||||
'sha256:%s' % self.SHA256_SAMPLE) | 'sha256:%s' % content['sha256']) | ||||
# then | expected_content = self.content_get(content['sha1']) | ||||
self.assertEqual(actual_content, {'data': b'binary data'}) | |||||
mock_storage.content_find.assert_called_once_with( | self.assertEqual(actual_content, expected_content) | ||||
{'sha256': self.SHA256_SAMPLE_BIN}) | |||||
mock_storage.content_get.assert_called_once_with( | @given(unknown_content()) | ||||
[hash_to_bytes(self.SHA1_SAMPLE)]) | def test_lookup_content_not_found(self, unknown_content): | ||||
@patch('swh.web.common.service.storage') | |||||
def test_lookup_content_not_found(self, mock_storage): | |||||
# given | |||||
mock_storage.content_find = MagicMock(return_value=None) | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
# then | service.lookup_content('sha1:%s' % unknown_content['sha1']) | ||||
service.lookup_content('sha1:%s' % self.SHA1_SAMPLE) | |||||
self.assertIn(cm.exception.args[0], | self.assertIn(cm.exception.args[0], | ||||
'Content with %s checksum equals to %s not found!' % | 'Content with %s checksum equals to %s not found!' % | ||||
('sha1', self.SHA1_SAMPLE)) | ('sha1', unknown_content['sha1'])) | ||||
mock_storage.content_find.assert_called_with( | |||||
{'sha1': self.SHA1_SAMPLE_BIN}) | |||||
@patch('swh.web.common.service.storage') | @given(content()) | ||||
def test_lookup_content_with_sha1(self, mock_storage): | def test_lookup_content_with_sha1(self, content): | ||||
# given | |||||
mock_storage.content_find = MagicMock( | |||||
return_value=self.SAMPLE_CONTENT_RAW) | |||||
# when | |||||
actual_content = service.lookup_content( | actual_content = service.lookup_content( | ||||
'sha1:%s' % self.SHA1_SAMPLE) | 'sha1:%s' % content['sha1']) | ||||
# then | expected_content = self.content_get_metadata(content['sha1']) | ||||
self.assertEqual(actual_content, self.SAMPLE_CONTENT) | |||||
mock_storage.content_find.assert_called_with( | self.assertEqual(actual_content, expected_content) | ||||
{'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) | |||||
@patch('swh.web.common.service.storage') | @given(content()) | ||||
def test_lookup_content_with_sha256(self, mock_storage): | def test_lookup_content_with_sha256(self, content): | ||||
# given | |||||
stub_content = self.SAMPLE_CONTENT_RAW | |||||
stub_content['status'] = 'visible' | |||||
expected_content = self.SAMPLE_CONTENT | |||||
expected_content['status'] = 'visible' | |||||
mock_storage.content_find = MagicMock( | |||||
return_value=stub_content) | |||||
# when | |||||
actual_content = service.lookup_content( | actual_content = service.lookup_content( | ||||
'sha256:%s' % self.SHA256_SAMPLE) | 'sha256:%s' % content['sha256']) | ||||
expected_content = self.content_get_metadata(content['sha1']) | |||||
# then | |||||
self.assertEqual(actual_content, expected_content) | self.assertEqual(actual_content, expected_content) | ||||
mock_storage.content_find.assert_called_with( | @given(revision()) | ||||
{'sha256': self.SHA256_SAMPLE_BIN}) | def test_lookup_person(self, revision): | ||||
@patch('swh.web.common.service.storage') | rev_data = self.revision_get(revision) | ||||
def test_lookup_person(self, mock_storage): | |||||
# given | |||||
mock_storage.person_get = MagicMock(return_value=[{ | |||||
'id': 'person_id', | |||||
'name': b'some_name', | |||||
'email': b'some-email', | |||||
}]) | |||||
# when | |||||
actual_person = service.lookup_person('person_id') | |||||
# then | |||||
self.assertEqual(actual_person, { | |||||
'id': 'person_id', | |||||
'name': 'some_name', | |||||
'email': 'some-email', | |||||
}) | |||||
mock_storage.person_get.assert_called_with(['person_id']) | actual_person = service.lookup_person(rev_data['author']['id']) | ||||
@patch('swh.web.common.service.storage') | self.assertEqual(actual_person, rev_data['author']) | ||||
def test_lookup_directory_bad_checksum(self, mock_storage): | |||||
# given | def test_lookup_directory_bad_checksum(self): | ||||
mock_storage.directory_ls = MagicMock() | |||||
# when | |||||
with self.assertRaises(BadInputExc): | with self.assertRaises(BadInputExc): | ||||
service.lookup_directory('directory_id') | service.lookup_directory('directory_id') | ||||
# then | @given(unknown_directory()) | ||||
mock_storage.directory_ls.called = False | def test_lookup_directory_not_found(self, unknown_directory): | ||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory_not_found(self, mock_query, mock_storage): | |||||
# given | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ( | |||||
'sha1', | |||||
'directory-id-bin') | |||||
mock_storage.directory_missing.return_value = ['directory-id-bin'] | |||||
# when | |||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory('directory_id') | service.lookup_directory(unknown_directory) | ||||
self.assertIn('Directory with sha1_git directory_id not found', | self.assertIn('Directory with sha1_git %s not found' | ||||
cm.exception.args[0]) | % unknown_directory, cm.exception.args[0]) | ||||
# then | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( | |||||
'directory_id', ['sha1'], 'Only sha1_git is supported.') | |||||
@patch('swh.web.common.service.storage') | |||||
@patch('swh.web.common.service.query') | |||||
def test_lookup_directory(self, mock_query, mock_storage): | |||||
mock_query.parse_hash_with_algorithms_or_throws.return_value = ( | |||||
'sha1', | |||||
'directory-sha1-bin') | |||||
# given | |||||
stub_dir_entries = [{ | |||||
'sha1': self.SHA1_SAMPLE_BIN, | |||||
'sha256': self.SHA256_SAMPLE_BIN, | |||||
'sha1_git': self.SHA1GIT_SAMPLE_BIN, | |||||
'blake2s256': self.BLAKE2S256_SAMPLE_BIN, | |||||
'target': hash_to_bytes( | |||||
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), | |||||
'dir_id': self.DIRECTORY_ID_BIN, | |||||
'name': b'bob', | |||||
'type': 10, | |||||
}] | |||||
expected_dir_entries = [{ | |||||
'checksums': { | |||||
'sha1': self.SHA1_SAMPLE, | |||||
'sha256': self.SHA256_SAMPLE, | |||||
'sha1_git': self.SHA1GIT_SAMPLE, | |||||
'blake2s256': self.BLAKE2S256_SAMPLE | |||||
}, | |||||
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', | |||||
'dir_id': self.DIRECTORY_ID, | |||||
'name': 'bob', | |||||
'type': 10, | |||||
}] | |||||
mock_storage.directory_ls.return_value = stub_dir_entries | @given(directory()) | ||||
mock_storage.directory_missing.return_value = [] | def test_lookup_directory(self, directory): | ||||
# when | |||||
actual_directory_ls = list(service.lookup_directory( | actual_directory_ls = list(service.lookup_directory( | ||||
'directory-sha1')) | directory)) | ||||
# then | expected_directory_ls = self.directory_ls(directory) | ||||
self.assertEqual(actual_directory_ls, expected_dir_entries) | |||||
mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( | self.assertEqual(actual_directory_ls, expected_directory_ls) | ||||
'directory-sha1', ['sha1'], 'Only sha1_git is supported.') | |||||
mock_storage.directory_ls.assert_called_with( | |||||
'directory-sha1-bin') | |||||
@patch('swh.web.common.service.storage') | @given(empty_directory()) | ||||
def test_lookup_directory_empty(self, mock_storage): | def test_lookup_directory_empty(self, empty_directory): | ||||
empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904' | |||||
mock_storage.directory_ls.return_value = [] | |||||
# when | actual_directory_ls = list(service.lookup_directory(empty_directory)) | ||||
actual_directory_ls = list(service.lookup_directory(empty_dir_sha1)) | |||||
# then | |||||
self.assertEqual(actual_directory_ls, []) | self.assertEqual(actual_directory_ls, []) | ||||
self.assertFalse(mock_storage.directory_ls.called) | @given(origin()) | ||||
def test_lookup_revision_by_nothing_found(self, origin): | |||||
@patch('swh.web.common.service.lookup_snapshot') | |||||
@patch('swh.web.common.service.get_origin_visit') | |||||
def test_lookup_revision_by_nothing_found(self, mock_get_origin_visit, | |||||
mock_lookup_snapshot): | |||||
# given | |||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | |||||
mock_lookup_snapshot.return_value = {'branches': {}} | |||||
# when | |||||
with self.assertRaises(NotFoundExc): | with self.assertRaises(NotFoundExc): | ||||
service.lookup_revision_by(1) | service.lookup_revision_by(origin['id'], 'invalid-branch-name') | ||||
@patch('swh.web.common.service.lookup_revision') | @given(origin()) | ||||
@patch('swh.web.common.service.lookup_snapshot') | def test_lookup_revision_by(self, origin): | ||||
@patch('swh.web.common.service.get_origin_visit') | |||||
def test_lookup_revision_by(self, mock_get_origin_visit, | |||||
mock_lookup_snapshot, mock_lookup_revision): | |||||
# given | |||||
expected_rev = self.SAMPLE_REVISION | |||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | |||||
mock_lookup_snapshot.return_value = \ | |||||
{ | |||||
'branches': { | |||||
'master2': { | |||||
'target_type': 'revision', | |||||
'target': expected_rev['id'] | |||||
} | |||||
} | |||||
} | |||||
mock_lookup_revision.return_value = expected_rev | branches = self._get_origin_branches(origin) | ||||
branch_name = random.choice(list(branches.keys())) | |||||
# when | actual_revision = \ | ||||
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') | service.lookup_revision_by(origin['id'], branch_name, None) | ||||
# then | expected_revision = \ | ||||
self.assertEqual(actual_revision, expected_rev) | self.revision_get(branches[branch_name]['target']) | ||||
self.assertEqual(actual_revision, expected_revision) | |||||
@given(origin(), revision()) | |||||
def test_lookup_revision_with_context_by_ko(self, origin, revision): | |||||
@patch('swh.web.common.service.lookup_snapshot') | |||||
@patch('swh.web.common.service.get_origin_visit') | |||||
def test_lookup_revision_with_context_by_ko(self, mock_get_origin_visit, | |||||
mock_lookup_snapshot): | |||||
# given | |||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | |||||
mock_lookup_snapshot.return_value = {'branches': {}} | |||||
# when | |||||
origin_id = 1 | |||||
branch_name = 'master3' | |||||
ts = None | |||||
with self.assertRaises(NotFoundExc): | with self.assertRaises(NotFoundExc): | ||||
service.lookup_revision_with_context_by(origin_id, branch_name, ts, | service.lookup_revision_with_context_by(origin['id'], | ||||
'sha1') | 'invalid-branch-name', | ||||
None, | |||||
revision) | |||||
@patch('swh.web.common.service.storage') | @given(origin()) | ||||
@patch('swh.web.common.service.lookup_revision') | def test_lookup_revision_with_context_by(self, origin): | ||||
@patch('swh.web.common.service.lookup_snapshot') | |||||
@patch('swh.web.common.service.get_origin_visit') | branches = self._get_origin_branches(origin) | ||||
@patch('swh.web.common.service.lookup_revision_with_context') | branch_name = random.choice(list(branches.keys())) | ||||
def test_lookup_revision_with_context_by( | |||||
self, mock_lookup_revision_with_context, mock_get_origin_visit, | root_rev = branches[branch_name]['target'] | ||||
mock_lookup_snapshot, mock_lookup_revision, mock_storage | root_rev_log = self.revision_log(root_rev) | ||||
): | |||||
# given | children = defaultdict(list) | ||||
stub_root_rev = self.SAMPLE_REVISION | |||||
for rev in root_rev_log: | |||||
mock_get_origin_visit.return_value = {'snapshot': self.SHA1_SAMPLE} | for rev_p in rev['parents']: | ||||
mock_lookup_snapshot.return_value = \ | children[rev_p].append(rev['id']) | ||||
{ | |||||
'branches': { | rev = root_rev_log[-1]['id'] | ||||
'master2': { | |||||
'target_type': 'revision', | |||||
'target': stub_root_rev['id'] | |||||
} | |||||
} | |||||
} | |||||
mock_lookup_revision.return_value = stub_root_rev | |||||
stub_rev = {'id': 'rev-found'} | |||||
mock_lookup_revision_with_context.return_value = stub_rev | |||||
mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW] | |||||
# when | |||||
origin_id = 1 | |||||
branch_name = 'master2' | |||||
ts = None | |||||
sha1_git = 'sha1' | |||||
actual_root_rev, actual_rev = service.lookup_revision_with_context_by( | actual_root_rev, actual_rev = service.lookup_revision_with_context_by( | ||||
origin_id, branch_name, ts, sha1_git) | origin['id'], branch_name, None, rev) | ||||
# then | expected_root_rev = self.revision_get(root_rev) | ||||
self.assertEqual(actual_root_rev, stub_root_rev) | expected_rev = self.revision_get(rev) | ||||
self.assertEqual(actual_rev, stub_rev) | expected_rev['children'] = children[rev] | ||||
mock_lookup_revision_with_context.assert_called_once_with( | self.assertEqual(actual_root_rev, expected_root_rev) | ||||
self.SAMPLE_REVISION_RAW, sha1_git, 100) | self.assertEqual(actual_rev, expected_rev) | ||||
def test_lookup_revision_through_ko_not_implemented(self): | def test_lookup_revision_through_ko_not_implemented(self): | ||||
# then | |||||
with self.assertRaises(NotImplementedError): | with self.assertRaises(NotImplementedError): | ||||
service.lookup_revision_through({ | service.lookup_revision_through({ | ||||
'something-unknown': 10, | 'something-unknown': 10, | ||||
}) | }) | ||||
@patch('swh.web.common.service.lookup_revision_with_context_by') | @given(origin()) | ||||
def test_lookup_revision_through_with_context_by(self, mock_lookup): | def test_lookup_revision_through_with_context_by(self, origin): | ||||
# given | |||||
stub_rev = {'id': 'rev'} | branches = self._get_origin_branches(origin) | ||||
mock_lookup.return_value = stub_rev | branch_name = random.choice(list(branches.keys())) | ||||
# when | root_rev = branches[branch_name]['target'] | ||||
actual_revision = service.lookup_revision_through({ | root_rev_log = self.revision_log(root_rev) | ||||
'origin_id': 1, | rev = root_rev_log[-1]['id'] | ||||
'branch_name': 'master', | |||||
self.assertEqual(service.lookup_revision_through({ | |||||
'origin_id': origin['id'], | |||||
'branch_name': branch_name, | |||||
'ts': None, | 'ts': None, | ||||
'sha1_git': 'sha1-git' | 'sha1_git': rev | ||||
}, limit=1000) | }), | ||||
service.lookup_revision_with_context_by( | |||||
origin['id'], branch_name, None, rev) | |||||
) | |||||
# then | @given(origin()) | ||||
self.assertEqual(actual_revision, stub_rev) | def test_lookup_revision_through_with_revision_by(self, origin): | ||||
mock_lookup.assert_called_once_with( | branches = self._get_origin_branches(origin) | ||||
1, 'master', None, 'sha1-git', 1000) | branch_name = random.choice(list(branches.keys())) | ||||
@patch('swh.web.common.service.lookup_revision_by') | self.assertEqual(service.lookup_revision_through({ | ||||
def test_lookup_revision_through_with_revision_by(self, mock_lookup): | 'origin_id': origin['id'], | ||||
# given | 'branch_name': branch_name, | ||||
stub_rev = {'id': 'rev'} | 'ts': None, | ||||
mock_lookup.return_value = stub_rev | }), | ||||
service.lookup_revision_by( | |||||
# when | origin['id'], branch_name, None) | ||||
actual_revision = service.lookup_revision_through({ | ) | ||||
'origin_id': 2, | |||||
'branch_name': 'master2', | |||||
'ts': 'some-ts', | |||||
}, limit=10) | |||||
# then | |||||
self.assertEqual(actual_revision, stub_rev) | |||||
mock_lookup.assert_called_once_with( | |||||
2, 'master2', 'some-ts') | |||||
@patch('swh.web.common.service.lookup_revision_with_context') | |||||
def test_lookup_revision_through_with_context(self, mock_lookup): | |||||
# given | |||||
stub_rev = {'id': 'rev'} | |||||
mock_lookup.return_value = stub_rev | |||||
# when | |||||
actual_revision = service.lookup_revision_through({ | |||||
'sha1_git_root': 'some-sha1-root', | |||||
'sha1_git': 'some-sha1', | |||||
}) | |||||
# then | @given(ancestor_revisions()) | ||||
self.assertEqual(actual_revision, stub_rev) | def test_lookup_revision_through_with_context(self, ancestor_revisions): | ||||
mock_lookup.assert_called_once_with( | sha1_git = ancestor_revisions['sha1_git'] | ||||
'some-sha1-root', 'some-sha1', 100) | sha1_git_root = ancestor_revisions['sha1_git_root'] | ||||
@patch('swh.web.common.service.lookup_revision') | self.assertEqual(service.lookup_revision_through({ | ||||
def test_lookup_revision_through_with_revision(self, mock_lookup): | 'sha1_git_root': sha1_git_root, | ||||
# given | 'sha1_git': sha1_git, | ||||
stub_rev = {'id': 'rev'} | }), | ||||
mock_lookup.return_value = stub_rev | service.lookup_revision_with_context( | ||||
sha1_git_root, sha1_git) | |||||
# when | |||||
actual_revision = service.lookup_revision_through({ | |||||
'sha1_git': 'some-sha1', | |||||
}) | |||||
# then | ) | ||||
self.assertEqual(actual_revision, stub_rev) | |||||
@given(revision()) | |||||
def test_lookup_revision_through_with_revision(self, revision): | |||||
mock_lookup.assert_called_once_with( | self.assertEqual(service.lookup_revision_through({ | ||||
'some-sha1') | 'sha1_git': revision | ||||
}), | |||||
service.lookup_revision(revision) | |||||
) | |||||
@patch('swh.web.common.service.lookup_revision_through') | @given(revision()) | ||||
def test_lookup_directory_through_revision_ko_not_found( | def test_lookup_directory_through_revision_ko_not_found(self, revision): | ||||
self, mock_lookup_rev): | |||||
# given | |||||
mock_lookup_rev.return_value = None | |||||
# when | |||||
with self.assertRaises(NotFoundExc): | with self.assertRaises(NotFoundExc): | ||||
service.lookup_directory_through_revision( | service.lookup_directory_through_revision( | ||||
{'id': 'rev'}, 'some/path', 100) | {'sha1_git': revision}, 'some/invalid/path') | ||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ok(self, revision): | |||||
revision_data = self.revision_get(revision) | |||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | |||||
if e['type'] == 'file'] | |||||
dir_entry = random.choice(dir_entries) | |||||
self.assertEqual( | |||||
service.lookup_directory_through_revision({'sha1_git': revision}, | |||||
dir_entry['name']), | |||||
(revision, | |||||
service.lookup_directory_with_revision( | |||||
revision, dir_entry['name'])) | |||||
) | |||||
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) | @given(revision()) | ||||
def test_lookup_directory_through_revision_ok_with_data(self, revision): | |||||
@patch('swh.web.common.service.lookup_revision_through') | revision_data = self.revision_get(revision) | ||||
@patch('swh.web.common.service.lookup_directory_with_revision') | dir_entries = [e for e in self.directory_ls(revision_data['directory']) | ||||
def test_lookup_directory_through_revision_ok_with_data( | if e['type'] == 'file'] | ||||
self, mock_lookup_dir, mock_lookup_rev): | dir_entry = random.choice(dir_entries) | ||||
# given | |||||
mock_lookup_rev.return_value = {'id': 'rev-id'} | |||||
mock_lookup_dir.return_value = {'type': 'dir', | |||||
'content': []} | |||||
# when | |||||
rev_id, dir_result = service.lookup_directory_through_revision( | |||||
{'id': 'rev'}, 'some/path', 100) | |||||
# then | |||||
self.assertEqual(rev_id, 'rev-id') | |||||
self.assertEqual(dir_result, {'type': 'dir', | |||||
'content': []}) | |||||
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) | |||||
mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False) | |||||
@patch('swh.web.common.service.lookup_revision_through') | |||||
@patch('swh.web.common.service.lookup_directory_with_revision') | |||||
def test_lookup_directory_through_revision_ok_with_content( | |||||
self, mock_lookup_dir, mock_lookup_rev): | |||||
# given | |||||
mock_lookup_rev.return_value = {'id': 'rev-id'} | |||||
stub_result = {'type': 'file', | |||||
'revision': 'rev-id', | |||||
'content': {'data': b'blah', | |||||
'sha1': 'sha1'}} | |||||
mock_lookup_dir.return_value = stub_result | |||||
# when | |||||
rev_id, dir_result = service.lookup_directory_through_revision( | |||||
{'id': 'rev'}, 'some/path', 10, with_data=True) | |||||
# then | |||||
self.assertEqual(rev_id, 'rev-id') | |||||
self.assertEqual(dir_result, stub_result) | |||||
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10) | self.assertEqual( | ||||
mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True) | service.lookup_directory_through_revision({'sha1_git': revision}, | ||||
dir_entry['name'], | |||||
with_data=True), | |||||
(revision, | |||||
service.lookup_directory_with_revision( | |||||
revision, dir_entry['name'], with_data=True)) | |||||
) |