# swh/web/api/views/snapshot.py
# Copyright (C) 2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from swh.web.common import service
from swh.web.common.utils import reverse
from swh.web.config import get_config
from swh.web.api.apidoc import api_doc
from swh.web.api import utils
from swh.web.api.apiurls import api_route
from swh.web.api.views.utils import api_lookup


@api_route(r'/snapshot/(?P<snapshot_id>[0-9a-f]+)/', 'api-snapshot')
@api_doc('/snapshot/')
def api_snapshot(request, snapshot_id):
    """
    .. http:get:: /api/1/snapshot/(snapshot_id)/

        Get information about a snapshot in the archive.

        A snapshot is a set of named branches, which are pointers to objects
        at any level of the Software Heritage DAG. It represents a full picture
        of an origin at a given time.

        As well as pointing to other objects in the Software Heritage DAG,
        branches can also be aliases, in which case their target is the name
        of another branch in the same snapshot, or dangling, in which case
        the target is unknown.

        A snapshot identifier is a salted sha1. See :func:`swh.model.identifiers.snapshot_identifier`
        in our data model module for details about how they are computed.

        :param sha1 snapshot_id: a snapshot identifier
        :query str branches_from: optional parameter used to skip branches
            whose name is lesser than it before returning them
        :query int branches_count: optional parameter used to restrain
            the amount of returned branches (default to 1000)
        :query str target_types: optional comma separated list parameter
            used to filter the target types of branch to return (possible values
            that can be contained in that list are ``content``, ``directory``,
            ``revision``, ``release``, ``snapshot`` or ``alias``)

        :reqheader Accept: the requested response content type,
            either ``application/json`` (default) or ``application/yaml``
        :resheader Content-Type: this depends on :http:header:`Accept` header of request
        :resheader Link: indicates that a subsequent result page is available and contains
            the url pointing to it

        :>json object branches: object containing all branches associated to the snapshot,
            for each of them the associated target type and id are given but also
            a link to get information about that target
        :>json string next_branch: name of the branch the next result page starts
            from, or null when there is no subsequent page
        :>json string id: the unique identifier of the snapshot

        **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options`

        :statuscode 200: no error
        :statuscode 400: an invalid snapshot identifier has been provided
        :statuscode 404: requested snapshot can not be found in the archive

        **Example:**

        .. parsed-literal::

            :swh_web_api:`snapshot/6a3a2cf0b2b90ce7ae1cf0a221ed68035b686f5a/`
    """ # noqa

    def _enrich_snapshot(snapshot):
        """Return a shallow copy of ``snapshot`` with enriched branches.

        Every non-dangling branch goes through ``utils.enrich_object``;
        alias branches additionally get the ``target_url`` of the branch
        they point to, when that branch can be resolved.
        """
        s = snapshot.copy()
        if 'branches' in s:
            # Dangling branches (falsy values) are normalized to None.
            s['branches'] = {
                k: utils.enrich_object(v) if v else None
                for k, v in s['branches'].items()
            }
            for v in s['branches'].values():
                if not v or v['target_type'] != 'alias':
                    continue
                if v['target'] in s['branches']:
                    branch_alias = s['branches'][v['target']]
                    # The aliased branch may itself be dangling (None):
                    # there is no url to copy in that case.
                    if branch_alias:
                        v['target_url'] = branch_alias['target_url']
                else:
                    # The aliased branch is not part of the returned page:
                    # fetch it separately, starting the lookup at its name.
                    snp = service.lookup_snapshot(s['id'],
                                                  branches_from=v['target'],
                                                  branches_count=1)
                    branch = snp['branches'].get(v['target']) if snp else None
                    if branch:
                        branch = utils.enrich_object(branch)
                        v['target_url'] = branch['target_url']
        return s

    snapshot_content_max_size = get_config()['snapshot_content_max_size']

    branches_from = request.GET.get('branches_from', '')
    branches_count = int(request.GET.get('branches_count',
                                         snapshot_content_max_size))
    target_types = request.GET.get('target_types', None)
    target_types = target_types.split(',') if target_types else None

    results = api_lookup(
        service.lookup_snapshot, snapshot_id, branches_from,
        branches_count, target_types,
        notfound_msg='Snapshot with id {} not found.'.format(snapshot_id),
        enrich_fn=_enrich_snapshot)

    response = {'results': results, 'headers': {}}

    if results['next_branch'] is not None:
        # Serialize the filter back to the comma separated form parsed above,
        # so that following the link reproduces the same query.
        next_target_types = ','.join(target_types) if target_types else None
        response['headers']['link-next'] = \
            reverse('api-snapshot',
                    url_args={'snapshot_id': snapshot_id},
                    query_params={'branches_from': results['next_branch'],
                                  'branches_count': branches_count,
                                  'target_types': next_target_types})

    return response
# swh/web/tests/api/views/test_snapshot.py
# Copyright (C) 2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given
from rest_framework.test import APITestCase

from swh.web.common.utils import reverse
from swh.web.tests.strategies import snapshot, unknown_snapshot
from swh.web.tests.testcase import WebTestCase


class SnapshotApiTestCase(WebTestCase, APITestCase):
    """Tests of the ``api-snapshot`` endpoint against the tests data."""

    @given(snapshot())
    def test_api_snapshot(self, snapshot):
        """A plain request returns the enriched snapshot."""
        url = reverse('api-snapshot',
                      url_args={'snapshot_id': snapshot})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')
        expected_data = self.snapshot_get(snapshot)
        expected_data = self._enrich_snapshot(expected_data)
        self.assertEqual(rv.data, expected_data)

    @given(snapshot())
    def test_api_snapshot_paginated(self, snapshot):
        """Walking the pages two branches at a time rebuilds the snapshot."""
        branches_offset = 0
        branches_count = 2

        snapshot_branches = self._sorted_branches(snapshot)

        whole_snapshot = {'id': snapshot, 'branches': {}, 'next_branch': None}

        while branches_offset < len(snapshot_branches):
            branches_from = snapshot_branches[branches_offset]['name']
            url = reverse('api-snapshot',
                          url_args={'snapshot_id': snapshot},
                          query_params={'branches_from': branches_from,
                                        'branches_count': branches_count})
            rv = self.client.get(url)
            self.assertEqual(rv.status_code, 200)
            self.assertEqual(rv['Content-Type'], 'application/json')
            expected_data = self.snapshot_get_branches(snapshot, branches_from,
                                                       branches_count)

            expected_data = self._enrich_snapshot(expected_data)

            # The name of the first branch of the following page, if any,
            # is expected both in the payload and in the Link header.
            branches_offset += branches_count
            if branches_offset < len(snapshot_branches):
                next_branch = snapshot_branches[branches_offset]['name']
            else:
                next_branch = None
            expected_data['next_branch'] = next_branch

            self.assertEqual(rv.data, expected_data)
            whole_snapshot['branches'].update(expected_data['branches'])

            if next_branch is not None:
                next_url = reverse(
                    'api-snapshot',
                    url_args={'snapshot_id': snapshot},
                    query_params={'branches_from': next_branch,
                                  'branches_count': branches_count})
                self.assertEqual(rv['Link'], '<%s>; rel="next"' % next_url)
            else:
                self.assertFalse(rv.has_header('Link'))

        url = reverse('api-snapshot',
                      url_args={'snapshot_id': snapshot})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, whole_snapshot)

    @given(snapshot())
    def test_api_snapshot_filtered(self, snapshot):
        """Filtering on one target type only returns matching branches."""
        snapshot_branches = self._sorted_branches(snapshot)
        if not snapshot_branches:
            # Nothing to filter on: random.choice would raise IndexError.
            return

        target_type = random.choice(snapshot_branches)['target_type']

        url = reverse('api-snapshot',
                      url_args={'snapshot_id': snapshot},
                      query_params={'target_types': target_type})
        rv = self.client.get(url)

        # The endpoint splits ``target_types`` into a list before querying,
        # so the expected data is fetched with a list as well.
        expected_data = self.snapshot_get_branches(
            snapshot, target_types=[target_type])
        expected_data = self._enrich_snapshot(expected_data)

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_data)

    @given(unknown_snapshot())
    def test_api_snapshot_errors(self, unknown_snapshot):
        """The endpoint answers 400, then 404, to the two bad identifiers."""
        url = reverse('api-snapshot',
                      url_args={'snapshot_id': '63ce369'})
        rv = self.client.get(url)
        self.assertEqual(rv.status_code, 400)

        url = reverse('api-snapshot',
                      url_args={'snapshot_id': unknown_snapshot})
        rv = self.client.get(url)
        self.assertEqual(rv.status_code, 404)

    def _sorted_branches(self, snapshot):
        """Return the branches of ``snapshot`` as a list sorted by name.

        Each item is a dict with the ``name``, ``target_type`` and
        ``target`` of one branch.
        """
        branches = self.snapshot_get(snapshot)['branches']
        return [{'name': name,
                 'target_type': branches[name]['target_type'],
                 'target': branches[name]['target']}
                for name in sorted(branches)]

    def _enrich_snapshot(self, snapshot):
        """Add the expected ``target_url`` to every branch, in place.

        Revision and release branches get the url of the matching API
        endpoint, other target types get ``None``. Alias branches then
        take the url of the branch they point to.
        """
        def _get_branch_url(target_type, target):
            url = None
            if target_type == 'revision':
                url = reverse('api-revision', url_args={'sha1_git': target})
            if target_type == 'release':
                url = reverse('api-release', url_args={'sha1_git': target})
            return url

        branches = snapshot['branches']

        for branch in branches.values():
            branch['target_url'] = \
                _get_branch_url(branch['target_type'], branch['target'])

        for branch in branches.values():
            if branch['target_type'] != 'alias':
                continue
            target = branch['target']
            if target in branches:
                branch['target_url'] = branches[target]['target_url']
            else:
                # The aliased branch is outside of the current page:
                # fetch it on its own.
                snp = self.snapshot_get_branches(snapshot['id'],
                                                 branches_from=target,
                                                 branches_count=1)
                alias_target = snp['branches'][target]['target']
                alias_target_type = snp['branches'][target]['target_type']
                branch['target_url'] = \
                    _get_branch_url(alias_target_type, alias_target)

        return snapshot
# swh/web/tests/testcase.py
# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import shutil
from subprocess import run, PIPE

from django.core.cache import cache
from hypothesis.extra.django import TestCase

from swh.model.hashutil import hash_to_bytes
from swh.web import config
from swh.web.common import converters, service
from swh.web.tests.data import get_tests_data


ctags_json_missing = \
    shutil.which('ctags') is None or \
    b'+json' not in run(['ctags', '--version'], stdout=PIPE).stdout

fossology_missing = shutil.which('nomossa') is None


class WebTestCase(TestCase):
    """Common base class of the swh-web test cases.

    On class setup it grabs the storages and indexers returned by
    ``get_tests_data`` and plugs the two storages into the swh-web
    configuration and ``service`` module. Its class methods read the
    tests data back through the ``converters`` module so that tests can
    compare them with what the web application returns.
    """

    @classmethod
    def setUpClass(cls):
        """Bind the tests data to the class and to the swh-web services."""
        super().setUpClass()
        data = get_tests_data()
        for name in ('storage', 'idx_storage', 'mimetype_indexer',
                     'language_indexer', 'license_indexer', 'ctags_indexer'):
            setattr(cls, name, data[name])

        # Make swh-web use the storage and the indexer storage
        # instantiated in the tests.data module
        swh_config = config.get_config()
        swh_config.update({'storage': cls.storage,
                           'indexer_storage': cls.idx_storage})
        service.storage = cls.storage
        service.idx_storage = cls.idx_storage

    @classmethod
    def content_add_mimetype(cls, cnt_id):
        """Run the mimetype indexer on content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        cls.mimetype_indexer.run([sha1], 'update-dups')

    @classmethod
    def content_get_mimetype(cls, cnt_id):
        """Return the converted first mimetype entry of content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        entries = cls.idx_storage.content_mimetype_get([sha1])
        return converters.from_filetype(next(entries))

    @classmethod
    def content_add_language(cls, cnt_id):
        """Run the language indexer on content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        cls.language_indexer.run([sha1], 'update-dups')

    @classmethod
    def content_get_language(cls, cnt_id):
        """Return the converted first language entry of content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        entries = cls.idx_storage.content_language_get([sha1])
        return converters.from_swh(next(entries), hashess={'id'})

    @classmethod
    def content_add_license(cls, cnt_id):
        """Run the license indexer on content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        cls.license_indexer.run([sha1], 'update-dups')

    @classmethod
    def content_get_license(cls, cnt_id):
        """Return the converted license facts of content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        licenses = next(cls.idx_storage.content_fossology_license_get([sha1]))
        license_data = {'id': sha1, 'facts': licenses[sha1]}
        return converters.from_swh(license_data, hashess={'id'})

    @classmethod
    def content_add_ctags(cls, cnt_id):
        """Run the ctags indexer on content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        cls.ctags_indexer.run([sha1], 'update-dups')

    @classmethod
    def content_get_ctags(cls, cnt_id):
        """Yield the converted ctags entries of content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        yield from (converters.from_swh(entry, hashess={'id'})
                    for entry in cls.idx_storage.content_ctags_get([sha1]))

    @classmethod
    def content_get_metadata(cls, cnt_id):
        """Return the converted metadata of content ``cnt_id``."""
        sha1 = hash_to_bytes(cnt_id)
        raw_metadata = next(cls.storage.content_get_metadata([sha1]))
        hash_keys = {'sha1', 'sha1_git', 'sha256', 'blake2s256'}
        return converters.from_swh(raw_metadata, hashess=hash_keys)

    @classmethod
    def content_get(cls, cnt_id):
        """Return the converted content ``cnt_id`` read from the storage."""
        sha1 = hash_to_bytes(cnt_id)
        raw_content = next(cls.storage.content_get([sha1]))
        return converters.from_content(raw_content)

    @classmethod
    def directory_ls(cls, dir_id):
        """Return the converted entries of directory ``dir_id`` as a list."""
        entries = cls.storage.directory_ls(hash_to_bytes(dir_id))
        return [converters.from_directory_entry(entry) for entry in entries]

    @classmethod
    def release_get(cls, rel_id):
        """Return the converted release ``rel_id``."""
        raw_release = next(cls.storage.release_get([hash_to_bytes(rel_id)]))
        return converters.from_release(raw_release)

    @classmethod
    def revision_get(cls, rev_id):
        """Return the converted revision ``rev_id``."""
        raw_revision = next(cls.storage.revision_get([hash_to_bytes(rev_id)]))
        return converters.from_revision(raw_revision)

    @classmethod
    def revision_log(cls, rev_id, limit=None):
        """Return the converted log of revision ``rev_id`` as a list.

        ``limit`` is handed over to the storage unchanged.
        """
        log = cls.storage.revision_log([hash_to_bytes(rev_id)], limit=limit)
        return [converters.from_revision(revision) for revision in log]

    @classmethod
    def snapshot_get_latest(cls, origin_id):
        """Return the converted latest snapshot of origin ``origin_id``."""
        return converters.from_snapshot(
            cls.storage.snapshot_get_latest(origin_id))

    @classmethod
    def origin_get(cls, origin_info):
        """Return the converted origin matching ``origin_info``."""
        return converters.from_origin(cls.storage.origin_get(origin_info))

    @classmethod
    def origin_visit_get(cls, origin_id):
        """Return the converted visits of origin ``origin_id`` as a list."""
        return [converters.from_origin_visit(visit)
                for visit in cls.storage.origin_visit_get(origin_id)]

    @classmethod
    def origin_visit_get_by(cls, origin_id, visit_id):
        """Return the converted visit ``visit_id`` of origin ``origin_id``."""
        return converters.from_origin_visit(
            cls.storage.origin_visit_get_by(origin_id, visit_id))

    @classmethod
    def snapshot_get(cls, snapshot_id):
        """Return the converted snapshot ``snapshot_id``."""
        raw_snapshot = cls.storage.snapshot_get(hash_to_bytes(snapshot_id))
        return converters.from_snapshot(raw_snapshot)

    @classmethod
    def snapshot_get_branches(cls, snapshot_id, branches_from='',
                              branches_count=1000, target_types=None):
        """Return the converted branches selection of a snapshot.

        ``branches_from`` is encoded to bytes; ``branches_count`` and
        ``target_types`` are handed over to the storage unchanged.
        """
        raw_snapshot = cls.storage.snapshot_get_branches(
            hash_to_bytes(snapshot_id), branches_from.encode(),
            branches_count, target_types)
        return converters.from_snapshot(raw_snapshot)

    @classmethod
    def person_get(cls, person_id):
        """Return the converted person ``person_id``."""
        raw_person = next(cls.storage.person_get([person_id]))
        return converters.from_person(raw_person)

    def setUp(self):
        """Clear the Django cache before each test."""
        cache.clear()