Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_service.py
Show All 9 Lines | |||||
from collections import defaultdict | from collections import defaultdict | ||||
from hypothesis import given | from hypothesis import given | ||||
from swh.model.hashutil import hash_to_bytes, hash_to_hex | from swh.model.hashutil import hash_to_bytes, hash_to_hex | ||||
from swh.model.from_disk import DentryPerms | from swh.model.from_disk import DentryPerms | ||||
from swh.web.common import service | from swh.web.common import service | ||||
from swh.web.common.exc import BadInputExc, NotFoundExc | from swh.web.common.exc import BadInputExc, NotFoundExc | ||||
from swh.web.tests.data import random_sha1, random_content | |||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
content, contents, unknown_content, unknown_contents, | content, contents, unknown_contents, | ||||
contents_with_ctags, origin, new_origin, visit_dates, directory, | contents_with_ctags, origin, new_origin, visit_dates, directory, | ||||
release, revision, unknown_revision, revisions, unknown_revisions, | release, revision, unknown_revision, revisions, | ||||
ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, | ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, | ||||
revision_with_submodules, unknown_directory, empty_directory, | revision_with_submodules, empty_directory, | ||||
new_revision, new_origins | new_revision, new_origins | ||||
) | ) | ||||
from swh.web.tests.testcase import ( | from swh.web.tests.testcase import ( | ||||
WebTestCase, ctags_json_missing, fossology_missing | WebTestCase, ctags_json_missing, fossology_missing | ||||
) | ) | ||||
class ServiceTestCase(WebTestCase): | class ServiceTestCase(WebTestCase): | ||||
Show All 21 Lines | def test_lookup_multiple_hashes_some_missing(self, contents, | ||||
for cnt in input_contents: | for cnt in input_contents: | ||||
input_data.append({'sha1': cnt['sha1']}) | input_data.append({'sha1': cnt['sha1']}) | ||||
expected_output.append({'sha1': cnt['sha1'], | expected_output.append({'sha1': cnt['sha1'], | ||||
'found': cnt in contents}) | 'found': cnt in contents}) | ||||
self.assertEqual(service.lookup_multiple_hashes(input_data), | self.assertEqual(service.lookup_multiple_hashes(input_data), | ||||
expected_output) | expected_output) | ||||
@given(unknown_content()) | def test_lookup_hash_does_not_exist(self): | ||||
def test_lookup_hash_does_not_exist(self, unknown_content): | unknown_content_ = random_content() | ||||
actual_lookup = service.lookup_hash('sha1_git:%s' % | actual_lookup = service.lookup_hash('sha1_git:%s' % | ||||
unknown_content['sha1_git']) | unknown_content_['sha1_git']) | ||||
self.assertEqual(actual_lookup, {'found': None, | self.assertEqual(actual_lookup, {'found': None, | ||||
'algo': 'sha1_git'}) | 'algo': 'sha1_git'}) | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_hash_exist(self, content): | def test_lookup_hash_exist(self, content): | ||||
actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) | actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) | ||||
content_metadata = self.content_get_metadata(content['sha1']) | content_metadata = self.content_get_metadata(content['sha1']) | ||||
self.assertEqual({'found': content_metadata, | self.assertEqual({'found': content_metadata, | ||||
'algo': 'sha1'}, actual_lookup) | 'algo': 'sha1'}, actual_lookup) | ||||
@given(unknown_content()) | def test_search_hash_does_not_exist(self): | ||||
def test_search_hash_does_not_exist(self, content): | unknown_content_ = random_content() | ||||
actual_lookup = service.search_hash('sha1_git:%s' % | actual_lookup = service.search_hash('sha1_git:%s' % | ||||
content['sha1_git']) | unknown_content_['sha1_git']) | ||||
self.assertEqual({'found': False}, actual_lookup) | self.assertEqual({'found': False}, actual_lookup) | ||||
@given(content()) | @given(content()) | ||||
def test_search_hash_exist(self, content): | def test_search_hash_exist(self, content): | ||||
actual_lookup = service.search_hash('sha1:%s' % content['sha1']) | actual_lookup = service.search_hash('sha1:%s' % content['sha1']) | ||||
Show All 10 Lines | def test_lookup_content_ctags(self, contents_with_ctags): | ||||
list(service.lookup_content_ctags('sha1:%s' % content_sha1)) | list(service.lookup_content_ctags('sha1:%s' % content_sha1)) | ||||
expected_data = list(self.content_get_ctags(content_sha1)) | expected_data = list(self.content_get_ctags(content_sha1)) | ||||
for ctag in expected_data: | for ctag in expected_data: | ||||
ctag['id'] = content_sha1 | ctag['id'] = content_sha1 | ||||
self.assertEqual(actual_ctags, expected_data) | self.assertEqual(actual_ctags, expected_data) | ||||
@given(unknown_content()) | def test_lookup_content_ctags_no_hash(self): | ||||
def test_lookup_content_ctags_no_hash(self, unknown_content): | unknown_content_ = random_content() | ||||
actual_ctags = \ | actual_ctags = \ | ||||
list(service.lookup_content_ctags('sha1:%s' % | list(service.lookup_content_ctags('sha1:%s' % | ||||
unknown_content['sha1'])) | unknown_content_['sha1'])) | ||||
self.assertEqual(actual_ctags, []) | self.assertEqual(actual_ctags, []) | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_filetype(self, content): | def test_lookup_content_filetype(self, content): | ||||
self.content_add_mimetype(content['sha1']) | self.content_add_mimetype(content['sha1']) | ||||
actual_filetype = service.lookup_content_filetype(content['sha1']) | actual_filetype = service.lookup_content_filetype(content['sha1']) | ||||
▲ Show 20 Lines • Show All 203 Lines • ▼ Show 20 Lines | def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): | ||||
sha1_git = non_ancestor_revisions['sha1_git'] | sha1_git = non_ancestor_revisions['sha1_git'] | ||||
root_sha1_git = non_ancestor_revisions['sha1_git_root'] | root_sha1_git = non_ancestor_revisions['sha1_git_root'] | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_with_context(root_sha1_git, sha1_git) | service.lookup_revision_with_context(root_sha1_git, sha1_git) | ||||
self.assertIn('Revision %s is not an ancestor of %s' % | self.assertIn('Revision %s is not an ancestor of %s' % | ||||
(sha1_git, root_sha1_git), cm.exception.args[0]) | (sha1_git, root_sha1_git), cm.exception.args[0]) | ||||
@given(unknown_revision()) | def test_lookup_directory_with_revision_not_found(self): | ||||
def test_lookup_directory_with_revision_not_found(self, unknown_revision): | unknown_revision_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory_with_revision(unknown_revision) | service.lookup_directory_with_revision(unknown_revision_) | ||||
self.assertIn('Revision %s not found' % unknown_revision, | self.assertIn('Revision %s not found' % unknown_revision_, | ||||
cm.exception.args[0]) | cm.exception.args[0]) | ||||
@given(unknown_content(), unknown_revision(), unknown_directory()) | def test_lookup_directory_with_revision_unknown_content(self): | ||||
def test_lookup_directory_with_revision_unknown_content( | unknown_content_ = random_content() | ||||
self, unknown_content, unknown_revision, unknown_directory): | unknown_revision_ = random_sha1() | ||||
unknown_directory_ = random_sha1() | |||||
dir_path = 'README.md' | dir_path = 'README.md' | ||||
# Create a revision that points to a directory | # Create a revision that points to a directory | ||||
# Which points to unknown content | # Which points to unknown content | ||||
revision = { | revision = { | ||||
'author': { | 'author': { | ||||
'name': b'abcd', | 'name': b'abcd', | ||||
'email': b'abcd@company.org', | 'email': b'abcd@company.org', | ||||
Show All 14 Lines | def test_lookup_directory_with_revision_unknown_content(self): | ||||
'offset': 0, | 'offset': 0, | ||||
'timestamp': 1437511651 | 'timestamp': 1437511651 | ||||
}, | }, | ||||
'message': b'bleh', | 'message': b'bleh', | ||||
'metadata': [], | 'metadata': [], | ||||
'parents': [], | 'parents': [], | ||||
'synthetic': False, | 'synthetic': False, | ||||
'type': 'file', | 'type': 'file', | ||||
'id': hash_to_bytes(unknown_revision), | 'id': hash_to_bytes(unknown_revision_), | ||||
'directory': hash_to_bytes(unknown_directory) | 'directory': hash_to_bytes(unknown_directory_) | ||||
} | } | ||||
# A directory that points to unknown content | # A directory that points to unknown content | ||||
dir = { | dir = { | ||||
'id': hash_to_bytes(unknown_directory), | 'id': hash_to_bytes(unknown_directory_), | ||||
'entries': [{ | 'entries': [{ | ||||
'name': bytes(dir_path.encode('utf-8')), | 'name': bytes(dir_path.encode('utf-8')), | ||||
'type': 'file', | 'type': 'file', | ||||
'target': hash_to_bytes(unknown_content['sha1_git']), | 'target': hash_to_bytes(unknown_content_['sha1_git']), | ||||
'perms': DentryPerms.content | 'perms': DentryPerms.content | ||||
}] | }] | ||||
} | } | ||||
# Add the directory and revision in mem | # Add the directory and revision in mem | ||||
self.storage.directory_add([dir]) | self.storage.directory_add([dir]) | ||||
self.storage.revision_add([revision]) | self.storage.revision_add([revision]) | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory_with_revision(unknown_revision, dir_path) | service.lookup_directory_with_revision( | ||||
self.assertIn('Content not found for revision %s' % unknown_revision, | unknown_revision_, dir_path) | ||||
self.assertIn('Content not found for revision %s' % | |||||
unknown_revision_, | |||||
cm.exception.args[0]) | cm.exception.args[0]) | ||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_directory_with_revision_ko_path_to_nowhere( | def test_lookup_directory_with_revision_ko_path_to_nowhere( | ||||
self, revision): | self, revision): | ||||
invalid_path = 'path/to/something/unknown' | invalid_path = 'path/to/something/unknown' | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory_with_revision(revision, invalid_path) | service.lookup_directory_with_revision(revision, invalid_path) | ||||
▲ Show 20 Lines • Show All 127 Lines • ▼ Show 20 Lines | def test_lookup_revision_msg_absent(self, new_revision): | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_message(new_revision_id) | service.lookup_revision_message(new_revision_id) | ||||
self.assertEqual( | self.assertEqual( | ||||
cm.exception.args[0], | cm.exception.args[0], | ||||
'No message for revision with sha1_git %s.' % new_revision_id | 'No message for revision with sha1_git %s.' % new_revision_id | ||||
) | ) | ||||
@given(unknown_revision()) | def test_lookup_revision_msg_no_rev(self): | ||||
def test_lookup_revision_msg_no_rev(self, unknown_revision): | unknown_revision_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_revision_message(unknown_revision) | service.lookup_revision_message(unknown_revision_) | ||||
self.assertEqual( | self.assertEqual( | ||||
cm.exception.args[0], | cm.exception.args[0], | ||||
'Revision with sha1_git %s not found.' % unknown_revision | 'Revision with sha1_git %s not found.' % unknown_revision_ | ||||
) | ) | ||||
@given(revisions()) | @given(revisions()) | ||||
def test_lookup_revision_multiple(self, revisions): | def test_lookup_revision_multiple(self, revisions): | ||||
actual_revisions = list(service.lookup_revision_multiple(revisions)) | actual_revisions = list(service.lookup_revision_multiple(revisions)) | ||||
expected_revisions = [] | expected_revisions = [] | ||||
for rev in revisions: | for rev in revisions: | ||||
expected_revisions.append(self.revision_get(rev)) | expected_revisions.append(self.revision_get(rev)) | ||||
self.assertEqual(actual_revisions, expected_revisions) | self.assertEqual(actual_revisions, expected_revisions) | ||||
@given(unknown_revisions()) | def test_lookup_revision_multiple_none_found(self): | ||||
def test_lookup_revision_multiple_none_found(self, unknown_revisions): | unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] | ||||
actual_revisions = \ | actual_revisions = \ | ||||
list(service.lookup_revision_multiple(unknown_revisions)) | list(service.lookup_revision_multiple(unknown_revisions_)) | ||||
self.assertEqual(actual_revisions, [None] * len(unknown_revisions)) | self.assertEqual(actual_revisions, [None] * len(unknown_revisions_)) | ||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_revision_log(self, revision): | def test_lookup_revision_log(self, revision): | ||||
actual_revision_log = \ | actual_revision_log = \ | ||||
list(service.lookup_revision_log(revision, limit=25)) | list(service.lookup_revision_log(revision, limit=25)) | ||||
expected_revision_log = self.revision_log(revision, limit=25) | expected_revision_log = self.revision_log(revision, limit=25) | ||||
Show All 23 Lines | class ServiceTestCase(WebTestCase): | ||||
@given(origin()) | @given(origin()) | ||||
def test_lookup_revision_log_by_notfound(self, origin): | def test_lookup_revision_log_by_notfound(self, origin): | ||||
with self.assertRaises(NotFoundExc): | with self.assertRaises(NotFoundExc): | ||||
service.lookup_revision_log_by( | service.lookup_revision_log_by( | ||||
origin['id'], 'unknown_branch_name', None, limit=100) | origin['id'], 'unknown_branch_name', None, limit=100) | ||||
@given(unknown_content()) | def test_lookup_content_raw_not_found(self): | ||||
def test_lookup_content_raw_not_found(self, unknown_content): | unknown_content_ = random_content() | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_content_raw('sha1:' + unknown_content['sha1']) | service.lookup_content_raw('sha1:' + unknown_content_['sha1']) | ||||
self.assertIn(cm.exception.args[0], | self.assertIn(cm.exception.args[0], | ||||
'Content with %s checksum equals to %s not found!' % | 'Content with %s checksum equals to %s not found!' % | ||||
('sha1', unknown_content['sha1'])) | ('sha1', unknown_content_['sha1'])) | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_raw(self, content): | def test_lookup_content_raw(self, content): | ||||
actual_content = service.lookup_content_raw( | actual_content = service.lookup_content_raw( | ||||
'sha256:%s' % content['sha256']) | 'sha256:%s' % content['sha256']) | ||||
expected_content = self.content_get(content['sha1']) | expected_content = self.content_get(content['sha1']) | ||||
self.assertEqual(actual_content, expected_content) | self.assertEqual(actual_content, expected_content) | ||||
@given(unknown_content()) | def test_lookup_content_not_found(self): | ||||
def test_lookup_content_not_found(self, unknown_content): | unknown_content_ = random_content() | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_content('sha1:%s' % unknown_content['sha1']) | service.lookup_content('sha1:%s' % unknown_content_['sha1']) | ||||
self.assertIn(cm.exception.args[0], | self.assertIn(cm.exception.args[0], | ||||
'Content with %s checksum equals to %s not found!' % | 'Content with %s checksum equals to %s not found!' % | ||||
('sha1', unknown_content['sha1'])) | ('sha1', unknown_content_['sha1'])) | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_with_sha1(self, content): | def test_lookup_content_with_sha1(self, content): | ||||
actual_content = service.lookup_content( | actual_content = service.lookup_content( | ||||
'sha1:%s' % content['sha1']) | 'sha1:%s' % content['sha1']) | ||||
expected_content = self.content_get_metadata(content['sha1']) | expected_content = self.content_get_metadata(content['sha1']) | ||||
Show All 19 Lines | def test_lookup_person(self, revision): | ||||
self.assertEqual(actual_person, rev_data['author']) | self.assertEqual(actual_person, rev_data['author']) | ||||
def test_lookup_directory_bad_checksum(self): | def test_lookup_directory_bad_checksum(self): | ||||
with self.assertRaises(BadInputExc): | with self.assertRaises(BadInputExc): | ||||
service.lookup_directory('directory_id') | service.lookup_directory('directory_id') | ||||
@given(unknown_directory()) | def test_lookup_directory_not_found(self): | ||||
def test_lookup_directory_not_found(self, unknown_directory): | unknown_directory_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with self.assertRaises(NotFoundExc) as cm: | ||||
service.lookup_directory(unknown_directory) | service.lookup_directory(unknown_directory_) | ||||
self.assertIn('Directory with sha1_git %s not found' | self.assertIn('Directory with sha1_git %s not found' | ||||
% unknown_directory, cm.exception.args[0]) | % unknown_directory_, cm.exception.args[0]) | ||||
@given(directory()) | @given(directory()) | ||||
def test_lookup_directory(self, directory): | def test_lookup_directory(self, directory): | ||||
actual_directory_ls = list(service.lookup_directory( | actual_directory_ls = list(service.lookup_directory( | ||||
directory)) | directory)) | ||||
expected_directory_ls = self.directory_ls(directory) | expected_directory_ls = self.directory_ls(directory) | ||||
▲ Show 20 Lines • Show All 189 Lines • Show Last 20 Lines |