Changeset View
Changeset View
Standalone View
Standalone View
swh/web/tests/common/test_service.py
Show All 18 Lines | |||||
from swh.web.tests.strategies import ( | from swh.web.tests.strategies import ( | ||||
content, contents, unknown_contents, | content, contents, unknown_contents, | ||||
contents_with_ctags, origin, new_origin, visit_dates, directory, | contents_with_ctags, origin, new_origin, visit_dates, directory, | ||||
release, revision, unknown_revision, revisions, | release, revision, unknown_revision, revisions, | ||||
ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, | ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, | ||||
revision_with_submodules, empty_directory, | revision_with_submodules, empty_directory, | ||||
new_revision | new_revision | ||||
) | ) | ||||
from swh.web.tests.testcase import ( | from swh.web.tests.conftest import ctags_json_missing, fossology_missing | ||||
WebTestCase, ctags_json_missing, fossology_missing | |||||
) | |||||
class ServiceTestCase(WebTestCase): | |||||
@given(contents()) | @given(contents()) | ||||
def test_lookup_multiple_hashes_all_present(self, contents): | def test_lookup_multiple_hashes_all_present(contents): | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in contents: | for cnt in contents: | ||||
input_data.append({'sha1': cnt['sha1']}) | input_data.append({'sha1': cnt['sha1']}) | ||||
expected_output.append({'sha1': cnt['sha1'], | expected_output.append({'sha1': cnt['sha1'], | ||||
'found': True}) | 'found': True}) | ||||
self.assertEqual(service.lookup_multiple_hashes(input_data), | assert service.lookup_multiple_hashes(input_data) == expected_output | ||||
expected_output) | |||||
@given(contents(), unknown_contents()) | @given(contents(), unknown_contents()) | ||||
def test_lookup_multiple_hashes_some_missing(self, contents, | def test_lookup_multiple_hashes_some_missing(contents, unknown_contents): | ||||
unknown_contents): | |||||
input_contents = list(itertools.chain(contents, unknown_contents)) | input_contents = list(itertools.chain(contents, unknown_contents)) | ||||
random.shuffle(input_contents) | random.shuffle(input_contents) | ||||
input_data = [] | input_data = [] | ||||
expected_output = [] | expected_output = [] | ||||
for cnt in input_contents: | for cnt in input_contents: | ||||
input_data.append({'sha1': cnt['sha1']}) | input_data.append({'sha1': cnt['sha1']}) | ||||
expected_output.append({'sha1': cnt['sha1'], | expected_output.append({'sha1': cnt['sha1'], | ||||
'found': cnt in contents}) | 'found': cnt in contents}) | ||||
self.assertEqual(service.lookup_multiple_hashes(input_data), | assert service.lookup_multiple_hashes(input_data) == expected_output | ||||
expected_output) | |||||
def test_lookup_hash_does_not_exist(self): | def test_lookup_hash_does_not_exist(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_lookup = service.lookup_hash('sha1_git:%s' % | actual_lookup = service.lookup_hash('sha1_git:%s' % | ||||
unknown_content_['sha1_git']) | unknown_content_['sha1_git']) | ||||
self.assertEqual(actual_lookup, {'found': None, | assert actual_lookup == {'found': None, 'algo': 'sha1_git'} | ||||
'algo': 'sha1_git'}) | |||||
@given(content()) | |||||
def test_lookup_hash_exist(self, content): | |||||
@given(content()) | |||||
def test_lookup_hash_exist(archive_data, content): | |||||
actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) | actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) | ||||
content_metadata = self.content_get_metadata(content['sha1']) | content_metadata = archive_data.content_get_metadata(content['sha1']) | ||||
assert {'found': content_metadata, 'algo': 'sha1'} == actual_lookup | |||||
self.assertEqual({'found': content_metadata, | |||||
'algo': 'sha1'}, actual_lookup) | |||||
def test_search_hash_does_not_exist(self): | def test_search_hash_does_not_exist(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_lookup = service.search_hash('sha1_git:%s' % | actual_lookup = service.search_hash('sha1_git:%s' % | ||||
unknown_content_['sha1_git']) | unknown_content_['sha1_git']) | ||||
self.assertEqual({'found': False}, actual_lookup) | assert {'found': False} == actual_lookup | ||||
@given(content()) | |||||
def test_search_hash_exist(self, content): | |||||
@given(content()) | |||||
def test_search_hash_exist(content): | |||||
actual_lookup = service.search_hash('sha1:%s' % content['sha1']) | actual_lookup = service.search_hash('sha1:%s' % content['sha1']) | ||||
self.assertEqual({'found': True}, actual_lookup) | assert {'found': True} == actual_lookup | ||||
@pytest.mark.skipif(ctags_json_missing, | @pytest.mark.skipif(ctags_json_missing, | ||||
reason="requires ctags with json output support") | reason="requires ctags with json output support") | ||||
@given(contents_with_ctags()) | @given(contents_with_ctags()) | ||||
def test_lookup_content_ctags(self, contents_with_ctags): | def test_lookup_content_ctags(indexer_data, contents_with_ctags): | ||||
content_sha1 = random.choice(contents_with_ctags['sha1s']) | content_sha1 = random.choice(contents_with_ctags['sha1s']) | ||||
self.content_add_ctags(content_sha1) | indexer_data.content_add_ctags(content_sha1) | ||||
actual_ctags = \ | actual_ctags = list(service.lookup_content_ctags('sha1:%s' % content_sha1)) | ||||
list(service.lookup_content_ctags('sha1:%s' % content_sha1)) | |||||
expected_data = list(self.content_get_ctags(content_sha1)) | expected_data = list(indexer_data.content_get_ctags(content_sha1)) | ||||
for ctag in expected_data: | for ctag in expected_data: | ||||
ctag['id'] = content_sha1 | ctag['id'] = content_sha1 | ||||
self.assertEqual(actual_ctags, expected_data) | assert actual_ctags == expected_data | ||||
def test_lookup_content_ctags_no_hash(self): | def test_lookup_content_ctags_no_hash(): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
actual_ctags = \ | actual_ctags = list(service.lookup_content_ctags('sha1:%s' % | ||||
list(service.lookup_content_ctags('sha1:%s' % | |||||
unknown_content_['sha1'])) | unknown_content_['sha1'])) | ||||
self.assertEqual(actual_ctags, []) | assert actual_ctags == [] | ||||
@given(content()) | |||||
def test_lookup_content_filetype(self, content): | |||||
self.content_add_mimetype(content['sha1']) | @given(content()) | ||||
def test_lookup_content_filetype(indexer_data, content): | |||||
indexer_data.content_add_mimetype(content['sha1']) | |||||
actual_filetype = service.lookup_content_filetype(content['sha1']) | actual_filetype = service.lookup_content_filetype(content['sha1']) | ||||
expected_filetype = self.content_get_mimetype(content['sha1']) | expected_filetype = indexer_data.content_get_mimetype(content['sha1']) | ||||
self.assertEqual(actual_filetype, expected_filetype) | assert actual_filetype == expected_filetype | ||||
@pytest.mark.xfail # Language indexer is disabled. | |||||
@given(content()) | |||||
def test_lookup_content_language(self, content): | |||||
self.content_add_language(content['sha1']) | @pytest.mark.skip # Language indexer is disabled. | ||||
@given(content()) | |||||
def test_lookup_content_language(indexer_data, content): | |||||
indexer_data.content_add_language(content['sha1']) | |||||
actual_language = service.lookup_content_language(content['sha1']) | actual_language = service.lookup_content_language(content['sha1']) | ||||
expected_language = self.content_get_language(content['sha1']) | expected_language = indexer_data.content_get_language(content['sha1']) | ||||
self.assertEqual(actual_language, expected_language) | assert actual_language == expected_language | ||||
@given(contents_with_ctags()) | |||||
def test_lookup_expression(self, contents_with_ctags): | |||||
@given(contents_with_ctags()) | |||||
def test_lookup_expression(indexer_data, contents_with_ctags): | |||||
per_page = 10 | per_page = 10 | ||||
expected_ctags = [] | expected_ctags = [] | ||||
for content_sha1 in contents_with_ctags['sha1s']: | for content_sha1 in contents_with_ctags['sha1s']: | ||||
if len(expected_ctags) == per_page: | if len(expected_ctags) == per_page: | ||||
break | break | ||||
self.content_add_ctags(content_sha1) | indexer_data.content_add_ctags(content_sha1) | ||||
for ctag in self.content_get_ctags(content_sha1): | for ctag in indexer_data.content_get_ctags(content_sha1): | ||||
if len(expected_ctags) == per_page: | if len(expected_ctags) == per_page: | ||||
break | break | ||||
if ctag['name'] == contents_with_ctags['symbol_name']: | if ctag['name'] == contents_with_ctags['symbol_name']: | ||||
del ctag['id'] | del ctag['id'] | ||||
ctag['sha1'] = content_sha1 | ctag['sha1'] = content_sha1 | ||||
expected_ctags.append(ctag) | expected_ctags.append(ctag) | ||||
actual_ctags = \ | actual_ctags = list( | ||||
list(service.lookup_expression(contents_with_ctags['symbol_name'], | service.lookup_expression(contents_with_ctags['symbol_name'], | ||||
last_sha1=None, per_page=10)) | last_sha1=None, per_page=10)) | ||||
self.assertEqual(actual_ctags, expected_ctags) | assert actual_ctags == expected_ctags | ||||
def test_lookup_expression_no_result(self): | |||||
def test_lookup_expression_no_result(): | |||||
expected_ctags = [] | expected_ctags = [] | ||||
actual_ctags = \ | actual_ctags = list(service.lookup_expression('barfoo', last_sha1=None, | ||||
list(service.lookup_expression('barfoo', last_sha1=None, | |||||
per_page=10)) | per_page=10)) | ||||
self.assertEqual(actual_ctags, expected_ctags) | assert actual_ctags == expected_ctags | ||||
@pytest.mark.skipif(fossology_missing, | @pytest.mark.skipif(fossology_missing, | ||||
reason="requires fossology-nomossa installed") | reason="requires fossology-nomossa installed") | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_license(self, content): | def test_lookup_content_license(indexer_data, content): | ||||
indexer_data.content_add_license(content['sha1']) | |||||
self.content_add_license(content['sha1']) | |||||
actual_license = service.lookup_content_license(content['sha1']) | actual_license = service.lookup_content_license(content['sha1']) | ||||
expected_license = self.content_get_license(content['sha1']) | expected_license = indexer_data.content_get_license(content['sha1']) | ||||
self.assertEqual(actual_license, expected_license) | assert actual_license == expected_license | ||||
def test_stat_counters(self): | def test_stat_counters(archive_data): | ||||
actual_stats = service.stat_counters() | actual_stats = service.stat_counters() | ||||
self.assertEqual(actual_stats, self.storage.stat_counters()) | assert actual_stats == archive_data.stat_counters() | ||||
@given(new_origin(), visit_dates()) | |||||
def test_lookup_origin_visits(self, new_origin, visit_dates): | |||||
self.storage.origin_add_one(new_origin) | @given(new_origin(), visit_dates()) | ||||
def test_lookup_origin_visits(archive_data, new_origin, visit_dates): | |||||
archive_data.origin_add_one(new_origin) | |||||
for ts in visit_dates: | for ts in visit_dates: | ||||
self.storage.origin_visit_add(new_origin['url'], ts, type='git') | archive_data.origin_visit_add( | ||||
new_origin['url'], ts, type='git') | |||||
actual_origin_visits = list( | actual_origin_visits = list( | ||||
service.lookup_origin_visits(new_origin['url'], per_page=100)) | service.lookup_origin_visits(new_origin['url'], per_page=100)) | ||||
expected_visits = self.origin_visit_get(new_origin['url']) | expected_visits = archive_data.origin_visit_get(new_origin['url']) | ||||
for expected_visit in expected_visits: | for expected_visit in expected_visits: | ||||
expected_visit['origin'] = new_origin['url'] | expected_visit['origin'] = new_origin['url'] | ||||
self.assertEqual(actual_origin_visits, expected_visits) | assert actual_origin_visits == expected_visits | ||||
@given(new_origin(), visit_dates()) | @given(new_origin(), visit_dates()) | ||||
def test_lookup_origin_visit(self, new_origin, visit_dates): | def test_lookup_origin_visit(archive_data, new_origin, visit_dates): | ||||
self.storage.origin_add_one(new_origin) | archive_data.origin_add_one(new_origin) | ||||
visits = [] | visits = [] | ||||
for ts in visit_dates: | for ts in visit_dates: | ||||
visits.append(self.storage.origin_visit_add( | visits.append(archive_data.origin_visit_add( | ||||
new_origin['url'], ts, type='git')) | new_origin['url'], ts, type='git')) | ||||
visit = random.choice(visits)['visit'] | visit = random.choice(visits)['visit'] | ||||
actual_origin_visit = service.lookup_origin_visit( | actual_origin_visit = service.lookup_origin_visit( | ||||
new_origin['url'], visit) | new_origin['url'], visit) | ||||
expected_visit = dict(self.storage.origin_visit_get_by( | expected_visit = dict(archive_data.origin_visit_get_by( | ||||
new_origin['url'], visit)) | new_origin['url'], visit)) | ||||
expected_visit['date'] = expected_visit['date'].isoformat() | |||||
expected_visit['metadata'] = {} | |||||
expected_visit['origin'] = new_origin['url'] | |||||
self.assertEqual(actual_origin_visit, expected_visit) | assert actual_origin_visit == expected_visit | ||||
@given(new_origin()) | @given(new_origin()) | ||||
def test_lookup_origin(self, new_origin): | def test_lookup_origin(archive_data, new_origin): | ||||
self.storage.origin_add_one(new_origin) | archive_data.origin_add_one(new_origin) | ||||
actual_origin = service.lookup_origin({'url': new_origin['url']}) | actual_origin = service.lookup_origin({'url': new_origin['url']}) | ||||
expected_origin = self.storage.origin_get({'url': new_origin['url']}) | expected_origin = archive_data.origin_get( | ||||
self.assertEqual(actual_origin, expected_origin) | {'url': new_origin['url']}) | ||||
assert actual_origin == expected_origin | |||||
@given(invalid_sha1()) | @given(invalid_sha1()) | ||||
def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): | def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1): | ||||
with self.assertRaises(BadInputExc) as cm: | with pytest.raises(BadInputExc) as e: | ||||
service.lookup_release(invalid_sha1) | service.lookup_release(invalid_sha1) | ||||
self.assertIn('invalid checksum', cm.exception.args[0].lower()) | assert e.match('Invalid checksum') | ||||
@given(sha256()) | @given(sha256()) | ||||
def test_lookup_release_ko_id_checksum_too_long(self, sha256): | def test_lookup_release_ko_id_checksum_too_long(sha256): | ||||
with self.assertRaises(BadInputExc) as cm: | with pytest.raises(BadInputExc) as e: | ||||
service.lookup_release(sha256) | service.lookup_release(sha256) | ||||
self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) | assert e.match('Only sha1_git is supported.') | ||||
@given(directory()) | @given(directory()) | ||||
def test_lookup_directory_with_path_not_found(self, directory): | def test_lookup_directory_with_path_not_found(directory): | ||||
path = 'some/invalid/path/here' | path = 'some/invalid/path/here' | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_directory_with_path(directory, path) | service.lookup_directory_with_path(directory, path) | ||||
self.assertEqual('Directory entry with path %s from %s ' | assert e.match('Directory entry with path %s from %s not found' % | ||||
'not found' % (path, directory), | (path, directory)) | ||||
cm.exception.args[0]) | |||||
@given(directory()) | @given(directory()) | ||||
def test_lookup_directory_with_path_found(self, directory): | def test_lookup_directory_with_path_found(archive_data, directory): | ||||
directory_content = self.directory_ls(directory) | directory_content = archive_data.directory_ls(directory) | ||||
directory_entry = random.choice(directory_content) | directory_entry = random.choice(directory_content) | ||||
path = directory_entry['name'] | path = directory_entry['name'] | ||||
actual_result = service.lookup_directory_with_path(directory, path) | actual_result = service.lookup_directory_with_path(directory, path) | ||||
self.assertEqual(actual_result, directory_entry) | assert actual_result == directory_entry | ||||
@given(release()) | @given(release()) | ||||
def test_lookup_release(self, release): | def test_lookup_release(archive_data, release): | ||||
actual_release = service.lookup_release(release) | actual_release = service.lookup_release(release) | ||||
self.assertEqual(actual_release, | assert actual_release == archive_data.release_get(release) | ||||
self.release_get(release)) | |||||
@given(revision(), invalid_sha1(), sha256()) | @given(revision(), invalid_sha1(), sha256()) | ||||
def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, | def test_lookup_revision_with_context_ko_not_a_sha1(revision, | ||||
invalid_sha1, | invalid_sha1, | ||||
sha256): | sha256): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = invalid_sha1 | sha1_git = invalid_sha1 | ||||
with self.assertRaises(BadInputExc) as cm: | with pytest.raises(BadInputExc) as e: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Invalid checksum query string', cm.exception.args[0]) | assert e.match('Invalid checksum query string') | ||||
sha1_git = sha256 | sha1_git = sha256 | ||||
with self.assertRaises(BadInputExc) as cm: | with pytest.raises(BadInputExc) as e: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Only sha1_git is supported', cm.exception.args[0]) | assert e.match('Only sha1_git is supported') | ||||
@given(revision(), unknown_revision()) | @given(revision(), unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( | ||||
self, revision, unknown_revision): | revision, unknown_revision): | ||||
sha1_git_root = revision | sha1_git_root = revision | ||||
sha1_git = unknown_revision | sha1_git = unknown_revision | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) | assert e.match('Revision %s not found' % sha1_git) | ||||
@given(revision(), unknown_revision()) | @given(revision(), unknown_revision()) | ||||
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( | ||||
self, revision, unknown_revision): | revision, unknown_revision): | ||||
sha1_git_root = unknown_revision | sha1_git_root = unknown_revision | ||||
sha1_git = revision | sha1_git = revision | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_revision_with_context(sha1_git_root, sha1_git) | service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
self.assertIn('Revision root %s not found' % sha1_git_root, | assert e.match('Revision root %s not found' % sha1_git_root) | ||||
cm.exception.args[0]) | |||||
@given(ancestor_revisions()) | @given(ancestor_revisions()) | ||||
def test_lookup_revision_with_context(self, ancestor_revisions): | def test_lookup_revision_with_context(archive_data, ancestor_revisions): | ||||
sha1_git = ancestor_revisions['sha1_git'] | sha1_git = ancestor_revisions['sha1_git'] | ||||
root_sha1_git = ancestor_revisions['sha1_git_root'] | root_sha1_git = ancestor_revisions['sha1_git_root'] | ||||
for sha1_git_root in (root_sha1_git, | for sha1_git_root in (root_sha1_git, | ||||
{'id': hash_to_bytes(root_sha1_git)}): | {'id': hash_to_bytes(root_sha1_git)}): | ||||
actual_revision = \ | actual_revision = service.lookup_revision_with_context(sha1_git_root, | ||||
service.lookup_revision_with_context(sha1_git_root, | |||||
sha1_git) | sha1_git) | ||||
children = [] | children = [] | ||||
for rev in self.revision_log(root_sha1_git): | for rev in archive_data.revision_log(root_sha1_git): | ||||
for p_rev in rev['parents']: | for p_rev in rev['parents']: | ||||
p_rev_hex = hash_to_hex(p_rev) | p_rev_hex = hash_to_hex(p_rev) | ||||
if p_rev_hex == sha1_git: | if p_rev_hex == sha1_git: | ||||
children.append(rev['id']) | children.append(rev['id']) | ||||
expected_revision = self.revision_get(sha1_git) | expected_revision = archive_data.revision_get(sha1_git) | ||||
expected_revision['children'] = children | expected_revision['children'] = children | ||||
self.assertEqual(actual_revision, expected_revision) | assert actual_revision == expected_revision | ||||
@given(non_ancestor_revisions()) | @given(non_ancestor_revisions()) | ||||
def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): | def test_lookup_revision_with_context_ko(non_ancestor_revisions): | ||||
sha1_git = non_ancestor_revisions['sha1_git'] | sha1_git = non_ancestor_revisions['sha1_git'] | ||||
root_sha1_git = non_ancestor_revisions['sha1_git_root'] | root_sha1_git = non_ancestor_revisions['sha1_git_root'] | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_revision_with_context(root_sha1_git, sha1_git) | service.lookup_revision_with_context(root_sha1_git, sha1_git) | ||||
self.assertIn('Revision %s is not an ancestor of %s' % | assert e.match('Revision %s is not an ancestor of %s' % | ||||
(sha1_git, root_sha1_git), cm.exception.args[0]) | (sha1_git, root_sha1_git)) | ||||
def test_lookup_directory_with_revision_not_found(self): | def test_lookup_directory_with_revision_not_found(): | ||||
unknown_revision_ = random_sha1() | unknown_revision_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_directory_with_revision(unknown_revision_) | service.lookup_directory_with_revision(unknown_revision_) | ||||
self.assertIn('Revision %s not found' % unknown_revision_, | assert e.match('Revision %s not found' % unknown_revision_) | ||||
cm.exception.args[0]) | |||||
def test_lookup_directory_with_revision_unknown_content(self): | def test_lookup_directory_with_revision_unknown_content(archive_data): | ||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
unknown_revision_ = random_sha1() | unknown_revision_ = random_sha1() | ||||
unknown_directory_ = random_sha1() | unknown_directory_ = random_sha1() | ||||
dir_path = 'README.md' | dir_path = 'README.md' | ||||
# Create a revision that points to a directory | # Create a revision that points to a directory | ||||
# Which points to unknown content | # Which points to unknown content | ||||
revision = { | revision = { | ||||
'author': { | 'author': { | ||||
'name': b'abcd', | 'name': b'abcd', | ||||
'email': b'abcd@company.org', | 'email': b'abcd@company.org', | ||||
'fullname': b'abcd abcd' | 'fullname': b'abcd abcd' | ||||
}, | }, | ||||
'committer': { | 'committer': { | ||||
'email': b'aaaa@company.org', | 'email': b'aaaa@company.org', | ||||
'fullname': b'aaaa aaa', | 'fullname': b'aaaa aaa', | ||||
'name': b'aaa' | 'name': b'aaa' | ||||
}, | }, | ||||
'committer_date': { | 'committer_date': { | ||||
'negative_utc': False, | 'negative_utc': False, | ||||
'offset': 0, | 'offset': 0, | ||||
'timestamp': 1437511651 | 'timestamp': 1437511651 | ||||
}, | }, | ||||
'date': { | 'date': { | ||||
'negative_utc': False, | 'negative_utc': False, | ||||
'offset': 0, | 'offset': 0, | ||||
'timestamp': 1437511651 | 'timestamp': 1437511651 | ||||
}, | }, | ||||
'message': b'bleh', | 'message': b'bleh', | ||||
'metadata': [], | 'metadata': [], | ||||
'parents': [], | 'parents': [], | ||||
'synthetic': False, | 'synthetic': False, | ||||
'type': 'git', | 'type': 'git', | ||||
'id': hash_to_bytes(unknown_revision_), | 'id': hash_to_bytes(unknown_revision_), | ||||
'directory': hash_to_bytes(unknown_directory_) | 'directory': hash_to_bytes(unknown_directory_) | ||||
} | } | ||||
# A directory that points to unknown content | # A directory that points to unknown content | ||||
dir = { | dir = { | ||||
'id': hash_to_bytes(unknown_directory_), | 'id': hash_to_bytes(unknown_directory_), | ||||
'entries': [{ | 'entries': [{ | ||||
'name': bytes(dir_path.encode('utf-8')), | 'name': bytes(dir_path.encode('utf-8')), | ||||
'type': 'file', | 'type': 'file', | ||||
'target': hash_to_bytes(unknown_content_['sha1_git']), | 'target': hash_to_bytes(unknown_content_['sha1_git']), | ||||
'perms': DentryPerms.content | 'perms': DentryPerms.content | ||||
}] | }] | ||||
} | } | ||||
# Add the directory and revision in mem | # Add the directory and revision in mem | ||||
self.storage.directory_add([dir]) | archive_data.directory_add([dir]) | ||||
self.storage.revision_add([revision]) | archive_data.revision_add([revision]) | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_directory_with_revision( | service.lookup_directory_with_revision(unknown_revision_, dir_path) | ||||
unknown_revision_, dir_path) | assert e.match('Content not found for revision %s' % unknown_revision_) | ||||
self.assertIn('Content not found for revision %s' % | |||||
unknown_revision_, | |||||
cm.exception.args[0]) | |||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_directory_with_revision_ko_path_to_nowhere( | def test_lookup_directory_with_revision_ko_path_to_nowhere(revision): | ||||
self, revision): | |||||
invalid_path = 'path/to/something/unknown' | invalid_path = 'path/to/something/unknown' | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_directory_with_revision(revision, invalid_path) | service.lookup_directory_with_revision(revision, invalid_path) | ||||
exception_text = cm.exception.args[0].lower() | assert e.match('Directory or File') | ||||
self.assertIn('directory or file', exception_text) | assert e.match(invalid_path) | ||||
self.assertIn(invalid_path, exception_text) | assert e.match('revision %s' % revision) | ||||
self.assertIn('revision %s' % revision, exception_text) | assert e.match('not found') | ||||
self.assertIn('not found', exception_text) | |||||
@given(revision_with_submodules()) | |||||
def test_lookup_directory_with_revision_submodules( | |||||
self, revision_with_submodules): | |||||
@given(revision_with_submodules()) | |||||
def test_lookup_directory_with_revision_submodules(archive_data, | |||||
revision_with_submodules): | |||||
rev_sha1_git = revision_with_submodules['rev_sha1_git'] | rev_sha1_git = revision_with_submodules['rev_sha1_git'] | ||||
rev_dir_path = revision_with_submodules['rev_dir_rev_path'] | rev_dir_path = revision_with_submodules['rev_dir_rev_path'] | ||||
actual_data = service.lookup_directory_with_revision( | actual_data = service.lookup_directory_with_revision( | ||||
rev_sha1_git, rev_dir_path) | rev_sha1_git, rev_dir_path) | ||||
revision = self.revision_get(revision_with_submodules['rev_sha1_git']) | revision = archive_data.revision_get( | ||||
directory = self.directory_ls(revision['directory']) | revision_with_submodules['rev_sha1_git']) | ||||
directory = archive_data.directory_ls(revision['directory']) | |||||
rev_entry = next(e for e in directory if e['name'] == rev_dir_path) | rev_entry = next(e for e in directory if e['name'] == rev_dir_path) | ||||
expected_data = { | expected_data = { | ||||
'content': self.revision_get(rev_entry['target']), | 'content': archive_data.revision_get(rev_entry['target']), | ||||
'path': rev_dir_path, | 'path': rev_dir_path, | ||||
'revision': rev_sha1_git, | 'revision': rev_sha1_git, | ||||
'type': 'rev' | 'type': 'rev' | ||||
} | } | ||||
self.assertEqual(actual_data, expected_data) | assert actual_data == expected_data | ||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_directory_with_revision_without_path(self, revision): | def test_lookup_directory_with_revision_without_path(archive_data, revision): | ||||
actual_directory_entries = service.lookup_directory_with_revision(revision) | |||||
actual_directory_entries = \ | revision_data = archive_data.revision_get(revision) | ||||
service.lookup_directory_with_revision(revision) | expected_directory_entries = archive_data.directory_ls( | ||||
revision_data['directory']) | |||||
revision_data = self.revision_get(revision) | assert actual_directory_entries['type'] == 'dir' | ||||
expected_directory_entries = \ | assert actual_directory_entries['content'] == expected_directory_entries | ||||
self.directory_ls(revision_data['directory']) | |||||
self.assertEqual(actual_directory_entries['type'], 'dir') | |||||
self.assertEqual(actual_directory_entries['content'], | |||||
expected_directory_entries) | |||||
@given(revision()) | |||||
def test_lookup_directory_with_revision_with_path(self, revision): | |||||
revision_data = self.revision_get(revision) | @given(revision()) | ||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | def test_lookup_directory_with_revision_with_path(archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | |||||
dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) | |||||
if e['type'] in ('file', 'dir')] | if e['type'] in ('file', 'dir')] | ||||
expected_dir_entry = random.choice(dir_entries) | expected_dir_entry = random.choice(dir_entries) | ||||
actual_dir_entry = \ | actual_dir_entry = service.lookup_directory_with_revision( | ||||
service.lookup_directory_with_revision(revision, | revision, expected_dir_entry['name']) | ||||
expected_dir_entry['name']) | |||||
assert actual_dir_entry['type'] == expected_dir_entry['type'] | |||||
self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) | assert actual_dir_entry['revision'] == revision | ||||
self.assertEqual(actual_dir_entry['revision'], revision) | assert actual_dir_entry['path'] == expected_dir_entry['name'] | ||||
self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) | |||||
if actual_dir_entry['type'] == 'file': | if actual_dir_entry['type'] == 'file': | ||||
del actual_dir_entry['content']['checksums']['blake2s256'] | del actual_dir_entry['content']['checksums']['blake2s256'] | ||||
for key in ('checksums', 'status', 'length'): | for key in ('checksums', 'status', 'length'): | ||||
self.assertEqual(actual_dir_entry['content'][key], | assert actual_dir_entry['content'][key] == expected_dir_entry[key] | ||||
expected_dir_entry[key]) | |||||
else: | else: | ||||
sub_dir_entries = self.directory_ls(expected_dir_entry['target']) | sub_dir_entries = archive_data.directory_ls( | ||||
self.assertEqual(actual_dir_entry['content'], sub_dir_entries) | expected_dir_entry['target']) | ||||
assert actual_dir_entry['content'] == sub_dir_entries | |||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_directory_with_revision_with_path_to_file_and_data( | def test_lookup_directory_with_revision_with_path_to_file_and_data( | ||||
self, revision): | archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | |||||
revision_data = self.revision_get(revision) | dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) | ||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | |||||
if e['type'] == 'file'] | if e['type'] == 'file'] | ||||
expected_dir_entry = random.choice(dir_entries) | expected_dir_entry = random.choice(dir_entries) | ||||
expected_data = \ | expected_data = archive_data.content_get( | ||||
self.content_get(expected_dir_entry['checksums']['sha1']) | expected_dir_entry['checksums']['sha1']) | ||||
actual_dir_entry = \ | actual_dir_entry = service.lookup_directory_with_revision( | ||||
service.lookup_directory_with_revision(revision, | revision, expected_dir_entry['name'], with_data=True) | ||||
expected_dir_entry['name'], | |||||
with_data=True) | assert actual_dir_entry['type'] == expected_dir_entry['type'] | ||||
assert actual_dir_entry['revision'] == revision | |||||
self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) | assert actual_dir_entry['path'] == expected_dir_entry['name'] | ||||
self.assertEqual(actual_dir_entry['revision'], revision) | |||||
self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) | |||||
del actual_dir_entry['content']['checksums']['blake2s256'] | del actual_dir_entry['content']['checksums']['blake2s256'] | ||||
for key in ('checksums', 'status', 'length'): | for key in ('checksums', 'status', 'length'): | ||||
self.assertEqual(actual_dir_entry['content'][key], | assert actual_dir_entry['content'][key] == expected_dir_entry[key] | ||||
expected_dir_entry[key]) | assert actual_dir_entry['content']['data'] == expected_data['data'] | ||||
self.assertEqual(actual_dir_entry['content']['data'], | |||||
expected_data['data']) | |||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_revision(self, revision): | def test_lookup_revision(archive_data, revision): | ||||
actual_revision = service.lookup_revision(revision) | actual_revision = service.lookup_revision(revision) | ||||
self.assertEqual(actual_revision, self.revision_get(revision)) | assert actual_revision == archive_data.revision_get(revision) | ||||
@given(new_revision()) | |||||
def test_lookup_revision_invalid_msg(self, new_revision): | |||||
@given(new_revision()) | |||||
def test_lookup_revision_invalid_msg(archive_data, new_revision): | |||||
new_revision['message'] = b'elegant fix for bug \xff' | new_revision['message'] = b'elegant fix for bug \xff' | ||||
self.storage.revision_add([new_revision]) | archive_data.revision_add([new_revision]) | ||||
revision = service.lookup_revision(hash_to_hex(new_revision['id'])) | revision = service.lookup_revision(hash_to_hex(new_revision['id'])) | ||||
self.assertEqual(revision['message'], None) | assert revision['message'] is None | ||||
self.assertEqual(revision['message_decoding_failed'], True) | assert revision['message_decoding_failed'] is True | ||||
@given(new_revision()) | |||||
def test_lookup_revision_msg_ok(self, new_revision): | |||||
self.storage.revision_add([new_revision]) | @given(new_revision()) | ||||
def test_lookup_revision_msg_ok(archive_data, new_revision): | |||||
archive_data.revision_add([new_revision]) | |||||
revision_message = service.lookup_revision_message( | revision_message = service.lookup_revision_message( | ||||
hash_to_hex(new_revision['id'])) | hash_to_hex(new_revision['id'])) | ||||
self.assertEqual(revision_message, | assert revision_message == {'message': new_revision['message']} | ||||
{'message': new_revision['message']}) | |||||
def test_lookup_revision_msg_no_rev(self): | def test_lookup_revision_msg_no_rev(): | ||||
unknown_revision_ = random_sha1() | unknown_revision_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_revision_message(unknown_revision_) | service.lookup_revision_message(unknown_revision_) | ||||
self.assertEqual( | assert e.match('Revision with sha1_git %s not found.' % unknown_revision_) | ||||
cm.exception.args[0], | |||||
'Revision with sha1_git %s not found.' % unknown_revision_ | |||||
) | |||||
@given(revisions()) | |||||
def test_lookup_revision_multiple(self, revisions): | |||||
@given(revisions()) | |||||
def test_lookup_revision_multiple(archive_data, revisions): | |||||
actual_revisions = list(service.lookup_revision_multiple(revisions)) | actual_revisions = list(service.lookup_revision_multiple(revisions)) | ||||
expected_revisions = [] | expected_revisions = [] | ||||
for rev in revisions: | for rev in revisions: | ||||
expected_revisions.append(self.revision_get(rev)) | expected_revisions.append(archive_data.revision_get(rev)) | ||||
self.assertEqual(actual_revisions, expected_revisions) | assert actual_revisions == expected_revisions | ||||
def test_lookup_revision_multiple_none_found(self): | |||||
def test_lookup_revision_multiple_none_found(): | |||||
unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] | unknown_revisions_ = [random_sha1(), random_sha1(), random_sha1()] | ||||
actual_revisions = \ | actual_revisions = list( | ||||
list(service.lookup_revision_multiple(unknown_revisions_)) | service.lookup_revision_multiple(unknown_revisions_)) | ||||
assert actual_revisions == [None] * len(unknown_revisions_) | |||||
self.assertEqual(actual_revisions, [None] * len(unknown_revisions_)) | |||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_revision_log(self, revision): | def test_lookup_revision_log(archive_data, revision): | ||||
actual_revision_log = list(service.lookup_revision_log(revision, limit=25)) | |||||
expected_revision_log = archive_data.revision_log(revision, limit=25) | |||||
actual_revision_log = \ | assert actual_revision_log == expected_revision_log | ||||
list(service.lookup_revision_log(revision, limit=25)) | |||||
expected_revision_log = self.revision_log(revision, limit=25) | |||||
self.assertEqual(actual_revision_log, expected_revision_log) | |||||
def _get_origin_branches(self, origin): | def _get_origin_branches(archive_data, origin): | ||||
origin_visit = self.origin_visit_get(origin['url'])[-1] | origin_visit = archive_data.origin_visit_get(origin['url'])[-1] | ||||
snapshot = self.snapshot_get(origin_visit['snapshot']) | snapshot = archive_data.snapshot_get(origin_visit['snapshot']) | ||||
branches = {k: v for (k, v) in snapshot['branches'].items() | branches = {k: v for (k, v) in snapshot['branches'].items() | ||||
if v['target_type'] == 'revision'} | if v['target_type'] == 'revision'} | ||||
return branches | return branches | ||||
@given(origin()) | |||||
def test_lookup_revision_log_by(self, origin): | |||||
branches = self._get_origin_branches(origin) | @given(origin()) | ||||
def test_lookup_revision_log_by(archive_data, origin): | |||||
branches = _get_origin_branches(archive_data, origin) | |||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
actual_log = \ | actual_log = list( | ||||
list(service.lookup_revision_log_by(origin['url'], branch_name, | service.lookup_revision_log_by(origin['url'], branch_name, | ||||
None, limit=25)) | None, limit=25)) | ||||
expected_log = \ | expected_log = archive_data.revision_log( | ||||
self.revision_log(branches[branch_name]['target'], limit=25) | branches[branch_name]['target'], limit=25) | ||||
self.assertEqual(actual_log, expected_log) | assert actual_log == expected_log | ||||
@given(origin()) | |||||
def test_lookup_revision_log_by_notfound(self, origin): | |||||
with self.assertRaises(NotFoundExc): | @given(origin()) | ||||
def test_lookup_revision_log_by_notfound(origin): | |||||
with pytest.raises(NotFoundExc): | |||||
service.lookup_revision_log_by( | service.lookup_revision_log_by( | ||||
origin['url'], 'unknown_branch_name', None, limit=100) | origin['url'], 'unknown_branch_name', None, limit=100) | ||||
def test_lookup_content_raw_not_found(self): | |||||
def test_lookup_content_raw_not_found(): | |||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_content_raw('sha1:' + unknown_content_['sha1']) | service.lookup_content_raw('sha1:' + unknown_content_['sha1']) | ||||
self.assertIn(cm.exception.args[0], | assert e.match('Content with %s checksum equals to %s not found!' % | ||||
'Content with %s checksum equals to %s not found!' % | |||||
('sha1', unknown_content_['sha1'])) | ('sha1', unknown_content_['sha1'])) | ||||
@given(content()) | |||||
def test_lookup_content_raw(self, content): | |||||
@given(content()) | |||||
def test_lookup_content_raw(archive_data, content): | |||||
actual_content = service.lookup_content_raw( | actual_content = service.lookup_content_raw( | ||||
'sha256:%s' % content['sha256']) | 'sha256:%s' % content['sha256']) | ||||
expected_content = self.content_get(content['sha1']) | expected_content = archive_data.content_get(content['sha1']) | ||||
self.assertEqual(actual_content, expected_content) | assert actual_content == expected_content | ||||
def test_lookup_content_not_found(self): | |||||
def test_lookup_content_not_found(): | |||||
unknown_content_ = random_content() | unknown_content_ = random_content() | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_content('sha1:%s' % unknown_content_['sha1']) | service.lookup_content('sha1:%s' % unknown_content_['sha1']) | ||||
self.assertIn(cm.exception.args[0], | assert e.match('Content with %s checksum equals to %s not found!' % | ||||
'Content with %s checksum equals to %s not found!' % | |||||
('sha1', unknown_content_['sha1'])) | ('sha1', unknown_content_['sha1'])) | ||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_with_sha1(self, content): | def test_lookup_content_with_sha1(archive_data, content): | ||||
actual_content = service.lookup_content('sha1:%s' % content['sha1']) | |||||
actual_content = service.lookup_content( | expected_content = archive_data.content_get_metadata(content['sha1']) | ||||
'sha1:%s' % content['sha1']) | |||||
expected_content = self.content_get_metadata(content['sha1']) | assert actual_content == expected_content | ||||
self.assertEqual(actual_content, expected_content) | |||||
@given(content()) | @given(content()) | ||||
def test_lookup_content_with_sha256(self, content): | def test_lookup_content_with_sha256(archive_data, content): | ||||
actual_content = service.lookup_content('sha256:%s' % content['sha256']) | |||||
actual_content = service.lookup_content( | |||||
'sha256:%s' % content['sha256']) | |||||
expected_content = self.content_get_metadata(content['sha1']) | expected_content = archive_data.content_get_metadata(content['sha1']) | ||||
self.assertEqual(actual_content, expected_content) | assert actual_content == expected_content | ||||
def test_lookup_directory_bad_checksum(self): | |||||
with self.assertRaises(BadInputExc): | def test_lookup_directory_bad_checksum(): | ||||
with pytest.raises(BadInputExc): | |||||
service.lookup_directory('directory_id') | service.lookup_directory('directory_id') | ||||
def test_lookup_directory_not_found(self): | |||||
def test_lookup_directory_not_found(): | |||||
unknown_directory_ = random_sha1() | unknown_directory_ = random_sha1() | ||||
with self.assertRaises(NotFoundExc) as cm: | with pytest.raises(NotFoundExc) as e: | ||||
service.lookup_directory(unknown_directory_) | service.lookup_directory(unknown_directory_) | ||||
self.assertIn('Directory with sha1_git %s not found' | assert e.match('Directory with sha1_git %s not found' % unknown_directory_) | ||||
% unknown_directory_, cm.exception.args[0]) | |||||
@given(directory()) | @given(directory()) | ||||
def test_lookup_directory(self, directory): | def test_lookup_directory(archive_data, directory): | ||||
actual_directory_ls = list(service.lookup_directory(directory)) | |||||
actual_directory_ls = list(service.lookup_directory( | expected_directory_ls = archive_data.directory_ls(directory) | ||||
directory)) | |||||
expected_directory_ls = self.directory_ls(directory) | assert actual_directory_ls == expected_directory_ls | ||||
self.assertEqual(actual_directory_ls, expected_directory_ls) | |||||
@given(empty_directory()) | @given(empty_directory()) | ||||
def test_lookup_directory_empty(self, empty_directory): | def test_lookup_directory_empty(empty_directory): | ||||
actual_directory_ls = list(service.lookup_directory(empty_directory)) | actual_directory_ls = list(service.lookup_directory(empty_directory)) | ||||
self.assertEqual(actual_directory_ls, []) | assert actual_directory_ls == [] | ||||
@given(origin()) | @given(origin()) | ||||
def test_lookup_revision_by_nothing_found(self, origin): | def test_lookup_revision_by_nothing_found(origin): | ||||
with pytest.raises(NotFoundExc): | |||||
service.lookup_revision_by(origin['url'], 'invalid-branch-name') | |||||
with self.assertRaises(NotFoundExc): | |||||
service.lookup_revision_by( | |||||
origin['url'], 'invalid-branch-name') | |||||
@given(origin()) | @given(origin()) | ||||
def test_lookup_revision_by(self, origin): | def test_lookup_revision_by(archive_data, origin): | ||||
branches = _get_origin_branches(archive_data, origin) | |||||
branches = self._get_origin_branches(origin) | |||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
actual_revision = \ | actual_revision = service.lookup_revision_by(origin['url'], branch_name) | ||||
service.lookup_revision_by(origin['url'], branch_name, None) | |||||
expected_revision = \ | expected_revision = archive_data.revision_get( | ||||
self.revision_get(branches[branch_name]['target']) | branches[branch_name]['target']) | ||||
self.assertEqual(actual_revision, expected_revision) | assert actual_revision == expected_revision | ||||
@given(origin(), revision()) | |||||
def test_lookup_revision_with_context_by_ko(self, origin, revision): | |||||
with self.assertRaises(NotFoundExc): | @given(origin(), revision()) | ||||
def test_lookup_revision_with_context_by_ko(origin, revision): | |||||
with pytest.raises(NotFoundExc): | |||||
service.lookup_revision_with_context_by(origin['url'], | service.lookup_revision_with_context_by(origin['url'], | ||||
'invalid-branch-name', | 'invalid-branch-name', | ||||
None, | None, revision) | ||||
revision) | |||||
@given(origin()) | |||||
def test_lookup_revision_with_context_by(self, origin): | |||||
branches = self._get_origin_branches(origin) | @given(origin()) | ||||
def test_lookup_revision_with_context_by(archive_data, origin): | |||||
branches = _get_origin_branches(archive_data, origin) | |||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
root_rev = branches[branch_name]['target'] | root_rev = branches[branch_name]['target'] | ||||
root_rev_log = self.revision_log(root_rev) | root_rev_log = archive_data.revision_log(root_rev) | ||||
children = defaultdict(list) | children = defaultdict(list) | ||||
for rev in root_rev_log: | for rev in root_rev_log: | ||||
for rev_p in rev['parents']: | for rev_p in rev['parents']: | ||||
children[rev_p].append(rev['id']) | children[rev_p].append(rev['id']) | ||||
rev = root_rev_log[-1]['id'] | rev = root_rev_log[-1]['id'] | ||||
actual_root_rev, actual_rev = service.lookup_revision_with_context_by( | actual_root_rev, actual_rev = service.lookup_revision_with_context_by( | ||||
origin['url'], branch_name, None, rev) | origin['url'], branch_name, None, rev) | ||||
expected_root_rev = self.revision_get(root_rev) | expected_root_rev = archive_data.revision_get(root_rev) | ||||
expected_rev = self.revision_get(rev) | expected_rev = archive_data.revision_get(rev) | ||||
expected_rev['children'] = children[rev] | expected_rev['children'] = children[rev] | ||||
self.assertEqual(actual_root_rev, expected_root_rev) | assert actual_root_rev == expected_root_rev | ||||
self.assertEqual(actual_rev, expected_rev) | assert actual_rev == expected_rev | ||||
def test_lookup_revision_through_ko_not_implemented(self): | |||||
with self.assertRaises(NotImplementedError): | def test_lookup_revision_through_ko_not_implemented(): | ||||
service.lookup_revision_through({ | with pytest.raises(NotImplementedError): | ||||
'something-unknown': 10, | service.lookup_revision_through({'something-unknown': 10}) | ||||
}) | |||||
@given(origin()) | |||||
def test_lookup_revision_through_with_context_by(self, origin): | |||||
branches = self._get_origin_branches(origin) | @given(origin()) | ||||
def test_lookup_revision_through_with_context_by(archive_data, origin): | |||||
branches = _get_origin_branches(archive_data, origin) | |||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
root_rev = branches[branch_name]['target'] | root_rev = branches[branch_name]['target'] | ||||
root_rev_log = self.revision_log(root_rev) | root_rev_log = archive_data.revision_log(root_rev) | ||||
rev = root_rev_log[-1]['id'] | rev = root_rev_log[-1]['id'] | ||||
self.assertEqual(service.lookup_revision_through({ | assert service.lookup_revision_through({ | ||||
'origin_url': origin['url'], | 'origin_url': origin['url'], | ||||
'branch_name': branch_name, | 'branch_name': branch_name, | ||||
'ts': None, | 'ts': None, | ||||
'sha1_git': rev | 'sha1_git': rev | ||||
}), | }) == service.lookup_revision_with_context_by(origin['url'], branch_name, | ||||
service.lookup_revision_with_context_by( | None, rev) | ||||
origin['url'], branch_name, None, rev) | |||||
) | |||||
@given(origin()) | |||||
def test_lookup_revision_through_with_revision_by(self, origin): | |||||
branches = self._get_origin_branches(origin) | @given(origin()) | ||||
def test_lookup_revision_through_with_revision_by(archive_data, origin): | |||||
branches = _get_origin_branches(archive_data, origin) | |||||
branch_name = random.choice(list(branches.keys())) | branch_name = random.choice(list(branches.keys())) | ||||
self.assertEqual(service.lookup_revision_through({ | assert service.lookup_revision_through({ | ||||
'origin_url': origin['url'], | 'origin_url': origin['url'], | ||||
'branch_name': branch_name, | 'branch_name': branch_name, | ||||
'ts': None, | 'ts': None, | ||||
}), | }) == service.lookup_revision_by(origin['url'], branch_name, None) | ||||
service.lookup_revision_by( | |||||
origin['url'], branch_name, None) | |||||
) | |||||
@given(ancestor_revisions()) | |||||
def test_lookup_revision_through_with_context(self, ancestor_revisions): | |||||
@given(ancestor_revisions()) | |||||
def test_lookup_revision_through_with_context(ancestor_revisions): | |||||
sha1_git = ancestor_revisions['sha1_git'] | sha1_git = ancestor_revisions['sha1_git'] | ||||
sha1_git_root = ancestor_revisions['sha1_git_root'] | sha1_git_root = ancestor_revisions['sha1_git_root'] | ||||
self.assertEqual(service.lookup_revision_through({ | assert service.lookup_revision_through({ | ||||
'sha1_git_root': sha1_git_root, | 'sha1_git_root': sha1_git_root, | ||||
'sha1_git': sha1_git, | 'sha1_git': sha1_git, | ||||
}), | }) == service.lookup_revision_with_context(sha1_git_root, sha1_git) | ||||
service.lookup_revision_with_context( | |||||
sha1_git_root, sha1_git) | |||||
) | |||||
@given(revision()) | @given(revision()) | ||||
def test_lookup_revision_through_with_revision(self, revision): | def test_lookup_revision_through_with_revision(revision): | ||||
assert service.lookup_revision_through({ | |||||
self.assertEqual(service.lookup_revision_through({ | |||||
'sha1_git': revision | 'sha1_git': revision | ||||
}), | }) == service.lookup_revision(revision) | ||||
service.lookup_revision(revision) | |||||
) | |||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ko_not_found(self, revision): | |||||
with self.assertRaises(NotFoundExc): | @given(revision()) | ||||
def test_lookup_directory_through_revision_ko_not_found(revision): | |||||
with pytest.raises(NotFoundExc): | |||||
service.lookup_directory_through_revision( | service.lookup_directory_through_revision( | ||||
{'sha1_git': revision}, 'some/invalid/path') | {'sha1_git': revision}, 'some/invalid/path') | ||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ok(self, revision): | |||||
revision_data = self.revision_get(revision) | @given(revision()) | ||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | def test_lookup_directory_through_revision_ok(archive_data, revision): | ||||
rev_data = archive_data.revision_get(revision) | |||||
dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) | |||||
if e['type'] == 'file'] | if e['type'] == 'file'] | ||||
dir_entry = random.choice(dir_entries) | dir_entry = random.choice(dir_entries) | ||||
self.assertEqual( | assert service.lookup_directory_through_revision( | ||||
service.lookup_directory_through_revision({'sha1_git': revision}, | {'sha1_git': revision}, dir_entry['name'] | ||||
dir_entry['name']), | ) == (revision, | ||||
(revision, | service.lookup_directory_with_revision(revision, dir_entry['name'])) | ||||
service.lookup_directory_with_revision( | |||||
revision, dir_entry['name'])) | |||||
) | |||||
@given(revision()) | |||||
def test_lookup_directory_through_revision_ok_with_data(self, revision): | |||||
revision_data = self.revision_get(revision) | @given(revision()) | ||||
dir_entries = [e for e in self.directory_ls(revision_data['directory']) | def test_lookup_directory_through_revision_ok_with_data( | ||||
archive_data, revision): | |||||
rev_data = archive_data.revision_get(revision) | |||||
dir_entries = [e for e in archive_data.directory_ls(rev_data['directory']) | |||||
if e['type'] == 'file'] | if e['type'] == 'file'] | ||||
dir_entry = random.choice(dir_entries) | dir_entry = random.choice(dir_entries) | ||||
self.assertEqual( | assert service.lookup_directory_through_revision( | ||||
service.lookup_directory_through_revision({'sha1_git': revision}, | {'sha1_git': revision}, dir_entry['name'], with_data=True | ||||
dir_entry['name'], | ) == (revision, | ||||
with_data=True), | service.lookup_directory_with_revision(revision, dir_entry['name'], | ||||
(revision, | with_data=True)) | ||||
service.lookup_directory_with_revision( | |||||
revision, dir_entry['name'], with_data=True)) | |||||
) |