Page MenuHomeSoftware Heritage

test_service.py
No OneTemporary

test_service.py

# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import itertools
import pytest
import random
from collections import defaultdict
from hypothesis import given
from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.from_disk import DentryPerms
from swh.model.identifiers import (
CONTENT, DIRECTORY, RELEASE, REVISION, SNAPSHOT
)
from swh.model.model import Directory, DirectoryEntry, Origin, Revision
from swh.web.common import service
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.tests.data import random_sha1, random_content
from swh.web.tests.strategies import (
content, unknown_content, contents, unknown_contents,
contents_with_ctags, origin, new_origin, visit_dates, directory,
unknown_directory, release, unknown_release, revision, unknown_revision,
revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1,
sha256, revision_with_submodules, empty_directory, new_revision,
snapshot, unknown_snapshot
)
from swh.web.tests.conftest import ctags_json_missing, fossology_missing
@given(contents())
def test_lookup_multiple_hashes_all_present(contents):
    """Every queried sha1 exists, so each lookup reports found=True."""
    input_data = [{'sha1': cnt['sha1']} for cnt in contents]
    expected_output = [{'sha1': cnt['sha1'], 'found': True}
                       for cnt in contents]
    assert service.lookup_multiple_hashes(input_data) == expected_output
@given(contents(), unknown_contents())
def test_lookup_multiple_hashes_some_missing(contents, unknown_contents):
    """Known and unknown sha1s mixed: 'found' mirrors archive membership."""
    input_contents = list(itertools.chain(contents, unknown_contents))
    random.shuffle(input_contents)
    input_data = [{'sha1': cnt['sha1']} for cnt in input_contents]
    expected_output = [{'sha1': cnt['sha1'], 'found': cnt in contents}
                       for cnt in input_contents]
    assert service.lookup_multiple_hashes(input_data) == expected_output
def test_lookup_hash_does_not_exist():
    """An unknown sha1_git lookup yields found=None with the algo echoed."""
    unknown_content_ = random_content()
    query = 'sha1_git:%s' % unknown_content_['sha1_git']
    assert service.lookup_hash(query) == {'found': None, 'algo': 'sha1_git'}
@given(content())
def test_lookup_hash_exist(archive_data, content):
    """A known sha1 lookup returns the content metadata from the archive."""
    expected_meta = archive_data.content_get_metadata(content['sha1'])
    result = service.lookup_hash('sha1:%s' % content['sha1'])
    assert result == {'found': expected_meta, 'algo': 'sha1'}
def test_search_hash_does_not_exist():
    """Searching an unknown sha1_git reports found=False."""
    unknown_content_ = random_content()
    query = 'sha1_git:%s' % unknown_content_['sha1_git']
    assert service.search_hash(query) == {'found': False}
@given(content())
def test_search_hash_exist(content):
    """Searching a known sha1 reports found=True."""
    result = service.search_hash('sha1:%s' % content['sha1'])
    assert result == {'found': True}
@pytest.mark.skipif(ctags_json_missing,
                    reason="requires ctags with json output support")
@given(contents_with_ctags())
def test_lookup_content_ctags(indexer_data, contents_with_ctags):
    """Ctags lookup returns the indexed ctags, keyed by the content sha1."""
    content_sha1 = random.choice(contents_with_ctags['sha1s'])
    indexer_data.content_add_ctags(content_sha1)
    expected_data = list(indexer_data.content_get_ctags(content_sha1))
    for ctag in expected_data:
        # The service reports the content sha1 in place of the indexer row id
        ctag['id'] = content_sha1
    actual_ctags = list(service.lookup_content_ctags('sha1:%s' % content_sha1))
    assert actual_ctags == expected_data
def test_lookup_content_ctags_no_hash():
    """Ctags lookup on an unknown content yields no results."""
    unknown_content_ = random_content()
    query = 'sha1:%s' % unknown_content_['sha1']
    assert list(service.lookup_content_ctags(query)) == []
@given(content())
def test_lookup_content_filetype(indexer_data, content):
    """Filetype lookup matches the mimetype stored by the indexer."""
    indexer_data.content_add_mimetype(content['sha1'])
    expected_filetype = indexer_data.content_get_mimetype(content['sha1'])
    assert service.lookup_content_filetype(content['sha1']) == \
        expected_filetype
@pytest.mark.skip  # Language indexer is disabled.
@given(content())
def test_lookup_content_language(indexer_data, content):
    """Language lookup matches the language stored by the indexer."""
    indexer_data.content_add_language(content['sha1'])
    expected_language = indexer_data.content_get_language(content['sha1'])
    assert service.lookup_content_language(content['sha1']) == \
        expected_language
@given(contents_with_ctags())
def test_lookup_expression(indexer_data, contents_with_ctags):
    """lookup_expression returns at most per_page ctags matching the symbol.

    Indexes the generated contents' ctags, collects up to ``per_page``
    expected entries whose name matches the searched symbol, then checks
    the service returns exactly those entries.

    Fix: the service call previously hard-coded ``per_page=10``, silently
    duplicating the ``per_page`` local used to cap the expected list; both
    now share the same constant.
    """
    per_page = 10
    expected_ctags = []
    for content_sha1 in contents_with_ctags['sha1s']:
        if len(expected_ctags) == per_page:
            break
        indexer_data.content_add_ctags(content_sha1)
        for ctag in indexer_data.content_get_ctags(content_sha1):
            if len(expected_ctags) == per_page:
                break
            if ctag['name'] == contents_with_ctags['symbol_name']:
                # Service results carry the content sha1, not the row id
                del ctag['id']
                ctag['sha1'] = content_sha1
                expected_ctags.append(ctag)
    actual_ctags = list(
        service.lookup_expression(contents_with_ctags['symbol_name'],
                                  last_sha1=None, per_page=per_page))
    assert actual_ctags == expected_ctags
def test_lookup_expression_no_result():
    """An unmatched symbol yields no ctags."""
    actual_ctags = list(service.lookup_expression('barfoo', last_sha1=None,
                                                  per_page=10))
    assert actual_ctags == []
@pytest.mark.skipif(fossology_missing,
                    reason="requires fossology-nomossa installed")
@given(content())
def test_lookup_content_license(indexer_data, content):
    """License lookup matches the license stored by the indexer."""
    indexer_data.content_add_license(content['sha1'])
    expected_license = indexer_data.content_get_license(content['sha1'])
    assert service.lookup_content_license(content['sha1']) == expected_license
def test_stat_counters(archive_data):
    """Service stat counters mirror the archive's."""
    assert service.stat_counters() == archive_data.stat_counters()
@given(new_origin(), visit_dates())
def test_lookup_origin_visits(archive_data, new_origin, visit_dates):
    """All recorded visits of an origin are listed, tagged with its url."""
    archive_data.origin_add_one(new_origin)
    for visit_date in visit_dates:
        archive_data.origin_visit_add(new_origin.url, visit_date, type='git')
    expected_visits = archive_data.origin_visit_get(new_origin.url)
    for expected_visit in expected_visits:
        expected_visit['origin'] = new_origin.url
    actual_origin_visits = list(
        service.lookup_origin_visits(new_origin.url, per_page=100))
    assert actual_origin_visits == expected_visits
@given(new_origin(), visit_dates())
def test_lookup_origin_visit(archive_data, new_origin, visit_dates):
    """A single visit lookup matches the archive's record for that visit."""
    archive_data.origin_add_one(new_origin)
    visits = [archive_data.origin_visit_add(new_origin.url, ts, type='git')
              for ts in visit_dates]
    visit = random.choice(visits)['visit']
    expected_visit = dict(archive_data.origin_visit_get_by(
        new_origin.url, visit))
    actual_origin_visit = service.lookup_origin_visit(new_origin.url, visit)
    assert actual_origin_visit == expected_visit
@given(new_origin())
def test_lookup_origin(archive_data, new_origin):
    """Origin lookup by url matches the archive record."""
    archive_data.origin_add_one(new_origin)
    expected_origin = archive_data.origin_get({'url': new_origin.url})
    assert service.lookup_origin({'url': new_origin.url}) == expected_origin
@given(invalid_sha1())
def test_lookup_release_ko_id_checksum_not_a_sha1(invalid_sha1):
    """A malformed checksum is rejected with BadInputExc."""
    with pytest.raises(BadInputExc) as exc:
        service.lookup_release(invalid_sha1)
    assert exc.match('Invalid checksum')
@given(sha256())
def test_lookup_release_ko_id_checksum_too_long(sha256):
    """A sha256-length checksum is rejected: only sha1_git is accepted."""
    with pytest.raises(BadInputExc) as exc:
        service.lookup_release(sha256)
    assert exc.match('Only sha1_git is supported.')
@given(directory())
def test_lookup_directory_with_path_not_found(directory):
    """A path absent from the directory raises NotFoundExc naming both."""
    path = 'some/invalid/path/here'
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_directory_with_path(directory, path)
    assert exc.match('Directory entry with path %s from %s not found' %
                     (path, directory))
@given(directory())
def test_lookup_directory_with_path_found(archive_data, directory):
    """A listed entry is retrievable by its path."""
    directory_entry = random.choice(archive_data.directory_ls(directory))
    result = service.lookup_directory_with_path(
        directory, directory_entry['name'])
    assert result == directory_entry
@given(release())
def test_lookup_release(archive_data, release):
    """Release lookup matches the archive record."""
    assert service.lookup_release(release) == \
        archive_data.release_get(release)
@given(revision(), invalid_sha1(), sha256())
def test_lookup_revision_with_context_ko_not_a_sha1(revision,
                                                    invalid_sha1,
                                                    sha256):
    """Malformed or non-sha1_git target checksums raise BadInputExc."""
    sha1_git_root = revision
    with pytest.raises(BadInputExc) as exc:
        service.lookup_revision_with_context(sha1_git_root, invalid_sha1)
    assert exc.match('Invalid checksum query string')
    with pytest.raises(BadInputExc) as exc:
        service.lookup_revision_with_context(sha1_git_root, sha256)
    assert exc.match('Only sha1_git is supported')
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
        revision, unknown_revision):
    """An unknown target revision raises NotFoundExc."""
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_revision_with_context(revision, unknown_revision)
    assert exc.match('Revision %s not found' % unknown_revision)
@given(revision(), unknown_revision())
def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
        revision, unknown_revision):
    """An unknown root revision raises NotFoundExc."""
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_revision_with_context(unknown_revision, revision)
    assert exc.match('Revision root %s not found' % unknown_revision)
@given(ancestor_revisions())
def test_lookup_revision_with_context(archive_data, ancestor_revisions):
    """An ancestor revision is returned with its children filled in.

    The root may be given either as a hex id or as a dict carrying the
    binary id; both forms must yield the same result.
    """
    sha1_git = ancestor_revisions['sha1_git']
    root_sha1_git = ancestor_revisions['sha1_git_root']
    # Children of sha1_git are the log entries that list it as a parent
    children = [rev['id']
                for rev in archive_data.revision_log(root_sha1_git)
                for p_rev in rev['parents']
                if hash_to_hex(p_rev) == sha1_git]
    expected_revision = archive_data.revision_get(sha1_git)
    expected_revision['children'] = children
    for sha1_git_root in (root_sha1_git,
                          {'id': hash_to_bytes(root_sha1_git)}):
        actual_revision = service.lookup_revision_with_context(sha1_git_root,
                                                               sha1_git)
        assert actual_revision == expected_revision
@given(non_ancestor_revisions())
def test_lookup_revision_with_context_ko(non_ancestor_revisions):
    """A revision that is not an ancestor of the root raises NotFoundExc."""
    sha1_git = non_ancestor_revisions['sha1_git']
    root_sha1_git = non_ancestor_revisions['sha1_git_root']
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_revision_with_context(root_sha1_git, sha1_git)
    assert exc.match('Revision %s is not an ancestor of %s' %
                     (sha1_git, root_sha1_git))
def test_lookup_directory_with_revision_not_found():
    """An unknown revision raises NotFoundExc."""
    unknown_revision_ = random_sha1()
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_directory_with_revision(unknown_revision_)
    assert exc.match('Revision %s not found' % unknown_revision_)
@given(new_revision())
def test_lookup_directory_with_revision_unknown_content(archive_data,
                                                        new_revision):
    """A revision whose tree points at missing content raises NotFoundExc.

    Builds a directory entry targeting an unknown content, attaches it to
    a fresh revision, and checks the content lookup fails clearly.

    Fixes: the directory local was named ``dir``, shadowing the builtin;
    ``bytes(x.encode('utf-8'))`` was a redundant double conversion.
    """
    unknown_content_ = random_content()
    dir_path = 'README.md'
    # A directory that points to unknown content
    directory_obj = Directory(entries=[
        DirectoryEntry(
            name=dir_path.encode('utf-8'),
            type='file',
            target=hash_to_bytes(unknown_content_['sha1_git']),
            perms=DentryPerms.content
        )
    ])
    # Create a revision that points to that directory; drop the stale id
    # so it is recomputed from the new 'directory' field.
    new_revision = new_revision.to_dict()
    new_revision['directory'] = directory_obj.id
    del new_revision['id']
    new_revision = Revision.from_dict(new_revision)
    # Add the directory and revision in mem
    archive_data.directory_add([directory_obj])
    archive_data.revision_add([new_revision])
    new_revision_id = hash_to_hex(new_revision.id)
    with pytest.raises(NotFoundExc) as e:
        service.lookup_directory_with_revision(new_revision_id, dir_path)
    assert e.match('Content not found for revision %s' % new_revision_id)
@given(revision())
def test_lookup_directory_with_revision_ko_path_to_nowhere(revision):
    """A nonexistent path inside a revision raises a descriptive error."""
    invalid_path = 'path/to/something/unknown'
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_directory_with_revision(revision, invalid_path)
    for fragment in ('Directory or File', invalid_path,
                     'revision %s' % revision, 'not found'):
        assert exc.match(fragment)
@given(revision_with_submodules())
def test_lookup_directory_with_revision_submodules(archive_data,
                                                   revision_with_submodules):
    """A submodule ('rev') directory entry resolves to the target revision."""
    rev_sha1_git = revision_with_submodules['rev_sha1_git']
    rev_dir_path = revision_with_submodules['rev_dir_rev_path']
    revision = archive_data.revision_get(rev_sha1_git)
    directory = archive_data.directory_ls(revision['directory'])
    rev_entry = next(e for e in directory if e['name'] == rev_dir_path)
    expected_data = {
        'content': archive_data.revision_get(rev_entry['target']),
        'path': rev_dir_path,
        'revision': rev_sha1_git,
        'type': 'rev'
    }
    actual_data = service.lookup_directory_with_revision(
        rev_sha1_git, rev_dir_path)
    assert actual_data == expected_data
@given(revision())
def test_lookup_directory_with_revision_without_path(archive_data, revision):
    """Without a path, the revision's root directory listing is returned."""
    revision_data = archive_data.revision_get(revision)
    expected_directory_entries = archive_data.directory_ls(
        revision_data['directory'])
    result = service.lookup_directory_with_revision(revision)
    assert result['type'] == 'dir'
    assert result['content'] == expected_directory_entries
@given(revision())
def test_lookup_directory_with_revision_with_path(archive_data, revision):
    """A file or sub-directory entry of a revision is retrievable by path."""
    rev_data = archive_data.revision_get(revision)
    dir_entries = [e for e in archive_data.directory_ls(rev_data['directory'])
                   if e['type'] in ('file', 'dir')]
    expected_dir_entry = random.choice(dir_entries)
    actual_dir_entry = service.lookup_directory_with_revision(
        revision, expected_dir_entry['name'])
    assert actual_dir_entry['type'] == expected_dir_entry['type']
    assert actual_dir_entry['revision'] == revision
    assert actual_dir_entry['path'] == expected_dir_entry['name']
    if actual_dir_entry['type'] == 'file':
        # blake2s256 is dropped before comparing — presumably absent from
        # the directory_ls entry's checksums; TODO confirm
        del actual_dir_entry['content']['checksums']['blake2s256']
        for key in ('checksums', 'status', 'length'):
            assert actual_dir_entry['content'][key] == expected_dir_entry[key]
    else:
        # For a sub-directory, 'content' is that directory's own listing
        sub_dir_entries = archive_data.directory_ls(
            expected_dir_entry['target'])
        assert actual_dir_entry['content'] == sub_dir_entries
@given(revision())
def test_lookup_directory_with_revision_with_path_to_file_and_data(
        archive_data, revision):
    """With with_data=True, the file's raw bytes are returned as well."""
    rev_data = archive_data.revision_get(revision)
    file_entry = random.choice(
        [e for e in archive_data.directory_ls(rev_data['directory'])
         if e['type'] == 'file'])
    expected_data = archive_data.content_get(
        file_entry['checksums']['sha1'])
    result = service.lookup_directory_with_revision(
        revision, file_entry['name'], with_data=True)
    assert result['type'] == file_entry['type']
    assert result['revision'] == revision
    assert result['path'] == file_entry['name']
    # blake2s256 is dropped before comparing checksums
    del result['content']['checksums']['blake2s256']
    for key in ('checksums', 'status', 'length'):
        assert result['content'][key] == file_entry[key]
    assert result['content']['data'] == expected_data['data']
@given(revision())
def test_lookup_revision(archive_data, revision):
    """Revision lookup matches the archive record."""
    assert service.lookup_revision(revision) == \
        archive_data.revision_get(revision)
@given(new_revision())
def test_lookup_revision_invalid_msg(archive_data, new_revision):
    """A message that is not valid UTF-8 is surfaced as a decoding failure."""
    rev_dict = new_revision.to_dict()
    rev_dict['message'] = b'elegant fix for bug \xff'  # not decodable utf-8
    archive_data.revision_add([Revision.from_dict(rev_dict)])
    looked_up = service.lookup_revision(hash_to_hex(rev_dict['id']))
    assert looked_up['message'] is None
    assert looked_up['message_decoding_failed'] is True
@given(new_revision())
def test_lookup_revision_msg_ok(archive_data, new_revision):
    """The revision message is returned verbatim for a stored revision."""
    archive_data.revision_add([new_revision])
    message = service.lookup_revision_message(hash_to_hex(new_revision.id))
    assert message == {'message': new_revision.message}
def test_lookup_revision_msg_no_rev():
    """Requesting the message of an unknown revision raises NotFoundExc."""
    unknown_revision_ = random_sha1()
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_revision_message(unknown_revision_)
    assert exc.match('Revision with sha1_git %s not found.' %
                     unknown_revision_)
@given(revisions())
def test_lookup_revision_multiple(archive_data, revisions):
    """Batch revision lookup preserves order and matches single lookups."""
    expected_revisions = [archive_data.revision_get(rev) for rev in revisions]
    actual_revisions = list(service.lookup_revision_multiple(revisions))
    assert actual_revisions == expected_revisions
def test_lookup_revision_multiple_none_found():
    """Unknown revisions each yield None in the batch result."""
    unknown_revisions_ = [random_sha1() for _ in range(3)]
    actual_revisions = list(
        service.lookup_revision_multiple(unknown_revisions_))
    assert actual_revisions == [None] * len(unknown_revisions_)
@given(revision())
def test_lookup_revision_log(archive_data, revision):
    """The revision log matches the archive's log for the same limit."""
    expected_log = archive_data.revision_log(revision, limit=25)
    actual_log = list(service.lookup_revision_log(revision, limit=25))
    assert actual_log == expected_log
def _get_origin_branches(archive_data, origin):
    """Return the revision-targeted branches of the origin's last snapshot."""
    last_visit = archive_data.origin_visit_get(origin['url'])[-1]
    snapshot = archive_data.snapshot_get(last_visit['snapshot'])
    return {name: branch
            for name, branch in snapshot['branches'].items()
            if branch['target_type'] == 'revision'}
@given(origin())
def test_lookup_revision_log_by(archive_data, origin):
    """Log lookup by origin/branch equals the log of the branch target."""
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches))
    expected_log = archive_data.revision_log(
        branches[branch_name]['target'], limit=25)
    actual_log = list(
        service.lookup_revision_log_by(origin['url'], branch_name,
                                       None, limit=25))
    assert actual_log == expected_log
@given(origin())
def test_lookup_revision_log_by_notfound(origin):
    """An unknown branch name raises NotFoundExc."""
    branch_name = 'unknown_branch_name'
    with pytest.raises(NotFoundExc):
        service.lookup_revision_log_by(origin['url'], branch_name,
                                       None, limit=100)
def test_lookup_content_raw_not_found():
    """Raw content lookup on an unknown sha1 raises NotFoundExc."""
    unknown_content_ = random_content()
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_content_raw('sha1:' + unknown_content_['sha1'])
    assert exc.match('Content with %s checksum equals to %s not found!' %
                     ('sha1', unknown_content_['sha1']))
@given(content())
def test_lookup_content_raw(archive_data, content):
    """Raw content lookup by sha256 returns the stored content."""
    expected_content = archive_data.content_get(content['sha1'])
    actual_content = service.lookup_content_raw(
        'sha256:%s' % content['sha256'])
    assert actual_content == expected_content
def test_lookup_content_not_found():
    """Content lookup on an unknown sha1 raises NotFoundExc."""
    unknown_content_ = random_content()
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_content('sha1:%s' % unknown_content_['sha1'])
    assert exc.match('Content with %s checksum equals to %s not found!' %
                     ('sha1', unknown_content_['sha1']))
@given(content())
def test_lookup_content_with_sha1(archive_data, content):
    """Content lookup by sha1 returns the content metadata."""
    expected = archive_data.content_get_metadata(content['sha1'])
    assert service.lookup_content('sha1:%s' % content['sha1']) == expected
@given(content())
def test_lookup_content_with_sha256(archive_data, content):
    """Content lookup by sha256 yields the same metadata as by sha1."""
    expected = archive_data.content_get_metadata(content['sha1'])
    assert service.lookup_content('sha256:%s' % content['sha256']) == expected
def test_lookup_directory_bad_checksum():
    """A non-hexadecimal directory id is rejected with BadInputExc."""
    with pytest.raises(BadInputExc):
        service.lookup_directory('directory_id')
def test_lookup_directory_not_found():
    """An unknown directory id raises NotFoundExc."""
    unknown_directory_ = random_sha1()
    with pytest.raises(NotFoundExc) as exc:
        service.lookup_directory(unknown_directory_)
    assert exc.match('Directory with sha1_git %s not found' %
                     unknown_directory_)
@given(directory())
def test_lookup_directory(archive_data, directory):
    """Directory lookup lists the same entries as the archive."""
    expected_ls = archive_data.directory_ls(directory)
    actual_ls = list(service.lookup_directory(directory))
    assert actual_ls == expected_ls
@given(empty_directory())
def test_lookup_directory_empty(empty_directory):
    """The empty directory has no entries."""
    assert list(service.lookup_directory(empty_directory)) == []
@given(origin())
def test_lookup_revision_by_nothing_found(origin):
    """An unknown branch name raises NotFoundExc."""
    branch_name = 'invalid-branch-name'
    with pytest.raises(NotFoundExc):
        service.lookup_revision_by(origin['url'], branch_name)
@given(origin())
def test_lookup_revision_by(archive_data, origin):
    """Revision lookup by origin/branch returns the branch target."""
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches))
    expected_revision = archive_data.revision_get(
        branches[branch_name]['target'])
    assert service.lookup_revision_by(
        origin['url'], branch_name) == expected_revision
@given(origin(), revision())
def test_lookup_revision_with_context_by_ko(origin, revision):
    """An unknown branch name raises NotFoundExc."""
    branch_name = 'invalid-branch-name'
    with pytest.raises(NotFoundExc):
        service.lookup_revision_with_context_by(origin['url'], branch_name,
                                                None, revision)
@given(origin())
def test_lookup_revision_with_context_by(archive_data, origin):
    """Context lookup by origin/branch returns the root revision and the
    target revision enriched with its children."""
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches.keys()))
    root_rev = branches[branch_name]['target']
    root_rev_log = archive_data.revision_log(root_rev)
    # Map each parent id to the ids of its children within the root's log
    children = defaultdict(list)
    for rev in root_rev_log:
        for rev_p in rev['parents']:
            children[rev_p].append(rev['id'])
    # Use the last entry of the log as the context target revision
    rev = root_rev_log[-1]['id']
    actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
        origin['url'], branch_name, None, rev)
    expected_root_rev = archive_data.revision_get(root_rev)
    expected_rev = archive_data.revision_get(rev)
    expected_rev['children'] = children[rev]
    assert actual_root_rev == expected_root_rev
    assert actual_rev == expected_rev
def test_lookup_revision_through_ko_not_implemented():
    """Unrecognized criteria keys raise NotImplementedError."""
    with pytest.raises(NotImplementedError):
        service.lookup_revision_through({'something-unknown': 10})
@given(origin())
def test_lookup_revision_through_with_context_by(archive_data, origin):
    """Criteria with origin/branch/ts/sha1_git dispatch to
    lookup_revision_with_context_by."""
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches))
    root_rev = branches[branch_name]['target']
    rev = archive_data.revision_log(root_rev)[-1]['id']
    criteria = {
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
        'sha1_git': rev
    }
    expected = service.lookup_revision_with_context_by(origin['url'],
                                                       branch_name, None, rev)
    assert service.lookup_revision_through(criteria) == expected
@given(origin())
def test_lookup_revision_through_with_revision_by(archive_data, origin):
    """Criteria without sha1_git dispatch to lookup_revision_by."""
    branches = _get_origin_branches(archive_data, origin)
    branch_name = random.choice(list(branches))
    criteria = {
        'origin_url': origin['url'],
        'branch_name': branch_name,
        'ts': None,
    }
    expected = service.lookup_revision_by(origin['url'], branch_name, None)
    assert service.lookup_revision_through(criteria) == expected
@given(ancestor_revisions())
def test_lookup_revision_through_with_context(ancestor_revisions):
    """Criteria with sha1_git_root dispatch to
    lookup_revision_with_context."""
    sha1_git = ancestor_revisions['sha1_git']
    sha1_git_root = ancestor_revisions['sha1_git_root']
    expected = service.lookup_revision_with_context(sha1_git_root, sha1_git)
    assert service.lookup_revision_through({
        'sha1_git_root': sha1_git_root,
        'sha1_git': sha1_git,
    }) == expected
@given(revision())
def test_lookup_revision_through_with_revision(revision):
    """Criteria with only sha1_git dispatch to lookup_revision."""
    expected = service.lookup_revision(revision)
    assert service.lookup_revision_through({'sha1_git': revision}) == expected
@given(revision())
def test_lookup_directory_through_revision_ko_not_found(revision):
    """An invalid path within a revision raises NotFoundExc."""
    bad_path = 'some/invalid/path'
    with pytest.raises(NotFoundExc):
        service.lookup_directory_through_revision({'sha1_git': revision},
                                                  bad_path)
@given(revision())
def test_lookup_directory_through_revision_ok(archive_data, revision):
    """Lookup through a revision matches lookup_directory_with_revision."""
    rev_data = archive_data.revision_get(revision)
    file_entries = [e
                    for e in archive_data.directory_ls(rev_data['directory'])
                    if e['type'] == 'file']
    entry = random.choice(file_entries)
    expected = (revision,
                service.lookup_directory_with_revision(revision,
                                                       entry['name']))
    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, entry['name']) == expected
@given(revision())
def test_lookup_directory_through_revision_ok_with_data(
        archive_data, revision):
    """Same as the plain lookup, but with file data requested."""
    rev_data = archive_data.revision_get(revision)
    file_entries = [e
                    for e in archive_data.directory_ls(rev_data['directory'])
                    if e['type'] == 'file']
    entry = random.choice(file_entries)
    expected = (revision,
                service.lookup_directory_with_revision(revision,
                                                       entry['name'],
                                                       with_data=True))
    assert service.lookup_directory_through_revision(
        {'sha1_git': revision}, entry['name'], with_data=True) == expected
@given(content(), directory(), release(), revision(), snapshot())
def test_lookup_known_objects(archive_data, content, directory, release,
                              revision, snapshot):
    """lookup_object dispatches each object type to its per-type lookup."""
    checks = [
        (CONTENT, content['sha1_git'], archive_data.content_find(content)),
        (DIRECTORY, directory, archive_data.directory_get(directory)),
        (RELEASE, release, archive_data.release_get(release)),
        (REVISION, revision, archive_data.revision_get(revision)),
        (SNAPSHOT, snapshot, archive_data.snapshot_get(snapshot)),
    ]
    for object_type, object_id, expected in checks:
        assert service.lookup_object(object_type, object_id) == expected
@given(unknown_content(), unknown_directory(), unknown_release(),
       unknown_revision(), unknown_snapshot())
def test_lookup_unknown_objects(unknown_content, unknown_directory,
                                unknown_release, unknown_revision,
                                unknown_snapshot):
    """lookup_object raises NotFoundExc with a type-specific message."""
    cases = [
        (CONTENT, unknown_content['sha1_git'], r'Content.*not found'),
        (DIRECTORY, unknown_directory, r'Directory.*not found'),
        (RELEASE, unknown_release, r'Release.*not found'),
        (REVISION, unknown_revision, r'Revision.*not found'),
        (SNAPSHOT, unknown_snapshot, r'Snapshot.*not found'),
    ]
    for object_type, object_id, message in cases:
        with pytest.raises(NotFoundExc) as exc:
            service.lookup_object(object_type, object_id)
        assert exc.match(message)
@given(invalid_sha1())
def test_lookup_invalid_objects(invalid_sha1):
    """Bad object types and malformed ids raise BadInputExc."""
    with pytest.raises(BadInputExc) as exc:
        service.lookup_object('foo', invalid_sha1)
    assert exc.match('Invalid swh object type')
    cases = [
        (CONTENT, 'Invalid hash'),
        (DIRECTORY, 'Invalid checksum'),
        (RELEASE, 'Invalid checksum'),
        (REVISION, 'Invalid checksum'),
        (SNAPSHOT, 'Invalid checksum'),
    ]
    for object_type, message in cases:
        with pytest.raises(BadInputExc) as exc:
            service.lookup_object(object_type, invalid_sha1)
        assert exc.match(message)
def test_lookup_missing_hashes_non_present():
    """All queried hashes are unknown, so all are reported missing."""
    missing = {
        CONTENT: random_sha1(),
        DIRECTORY: random_sha1(),
        REVISION: random_sha1(),
        RELEASE: random_sha1(),
        SNAPSHOT: random_sha1(),
    }
    grouped_pids = {object_type: [hash_to_bytes(sha1)]
                    for object_type, sha1 in missing.items()}
    actual_result = service.lookup_missing_hashes(grouped_pids)
    assert actual_result == set(missing.values())
@given(content(), directory())
def test_lookup_missing_hashes_some_present(archive_data, content, directory):
    """Known hashes are filtered out; only unknown ones are reported."""
    missing_rev = random_sha1()
    missing_rel = random_sha1()
    missing_snp = random_sha1()
    grouped_pids = {
        CONTENT: [hash_to_bytes(content['sha1_git'])],
        DIRECTORY: [hash_to_bytes(directory)],
        REVISION: [hash_to_bytes(missing_rev)],
        RELEASE: [hash_to_bytes(missing_rel)],
        SNAPSHOT: [hash_to_bytes(missing_snp)],
    }
    assert service.lookup_missing_hashes(grouped_pids) == {
        missing_rev, missing_rel, missing_snp}
@given(origin())
def test_lookup_origin_extra_trailing_slash(origin):
    """Origin lookup with an extra trailing slash still finds the origin."""
    origin_info = service.lookup_origin({'url': f"{origin['url']}/"})
    assert origin_info['url'] == origin['url']
def test_lookup_origin_missing_trailing_slash(archive_data):
    """Origin lookup without the trailing slash still finds the origin."""
    deb_origin = Origin(url='http://snapshot.debian.org/package/r-base/')
    archive_data.origin_add_one(deb_origin)
    # Query without the final '/' — lookup should normalize and match
    origin_info = service.lookup_origin({'url': deb_origin.url[:-1]})
    assert origin_info['url'] == deb_origin.url

File Metadata

Mime Type
text/x-python
Expires
Jun 4 2025, 6:37 PM (14 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3398661

Event Timeline