diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index cb3c0625..7fe2801a 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,382 +1,383 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from swh.web.common import service from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse from swh.web.api.apidoc import api_doc from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = \ reverse('api-origin-visits', url_args={'origin_id': origin['id']}) return o return origin @api_route(r'/origin/(?P[0-9]+)/', 'api-origin') @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)/', 'api-origin') @api_doc('/origin/') def api_origin(request, origin_id=None, origin_type=None, origin_url=None): """ .. http:get:: /api/1/origin/(origin_id)/ Get information about a software origin. 
:param int origin_id: a software origin identifier :>json number id: the origin unique identifier :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string type: the type of software origin (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``ftp`` or ``deposit``) :>json string url: the origin canonical url :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/1/` .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/ Get information about a software origin. :param string origin_type: the origin type (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``ftp`` or ``deposit``) :param string origin_url: the origin url :>json number id: the origin unique identifier :>json string origin_visits_url: link to in order to get information about the visits for that origin :>json string type: the type of software origin :>json string url: the origin canonical url :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/` """ # noqa ori_dict = { - 'id': origin_id, + 'id': int(origin_id) if origin_id else None, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' % ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) return api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) @api_route(r'/origin/search/(?P.+)/', 'api-origin-search') @api_doc('/origin/search/') def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. :param string url_pattern: a string pattern or a regular expression :query int offset: the number of found origins to skip before returning results :query int limit: the maximum number of found origins to return :query boolean regexp: if true, consider provided pattern as a regular expression and search origins whose urls match it :query boolean with_visit: if true, only return origins with at least one visit by Software heritage :>jsonarr number id: the origin unique identifier :>jsonarr string origin_visits_url: link to in order to get information about the visits for that origin :>jsonarr string type: the type of software origin :>jsonarr string url: the origin canonical url :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Example:** .. 
parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ # noqa result = {} offset = int(request.query_params.get('offset', '0')) limit = int(request.query_params.get('limit', '70')) regexp = request.query_params.get('regexp', 'false') with_visit = request.query_params.get('with_visit', 'false') results = api_lookup(service.search_origin, url_pattern, offset, limit, bool(strtobool(regexp)), bool(strtobool(with_visit)), enrich_fn=_enrich_origin) nb_results = len(results) if nb_results == limit: query_params = {} query_params['offset'] = offset + limit query_params['limit'] = limit query_params['regexp'] = regexp result['headers'] = { 'link-next': reverse('api-origin-search', url_args={'url_pattern': url_pattern}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/metadata-search/', 'api-origin-metadata-search') @api_doc('/origin/metadata-search/', noargs=True) def api_origin_metadata_search(request): """ .. http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. 
:query int limit: the maximum number of found origins to return (bounded to 100) :>jsonarr number origin_id: the origin unique identifier :>jsonarr dict metadata: metadata of the origin (as a JSON-LD/CodeMeta dictionary) :>jsonarr string from_revision: the revision used to extract these metadata (the current HEAD or one of the former HEADs) :>jsonarr dict tool: the tool used to extract these metadata :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ # noqa fulltext = request.query_params.get('fulltext', None) limit = min(int(request.query_params.get('limit', '70')), 100) if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup(service.search_origin_metadata, fulltext, limit) return { 'results': results, } @api_route(r'/origin/(?P[0-9]+)/visits/', 'api-origin-visits') @api_doc('/origin/visits/') def api_origin_visits(request, origin_id): """ .. http:get:: /api/1/origin/(origin_id)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. 
:param int origin_id: a software origin identifier :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request :resheader Link: indicates that a subsequent result page is available and contains the url pointing to it :>jsonarr string date: ISO representation of the visit date (in UTC) :>jsonarr number id: the unique identifier of the origin :>jsonarr string origin_visit_url: link to :http:get:`/api/1/origin/(origin_id)/visit/(visit_id)/` in order to get information about the visit :>jsonarr string snapshot: the snapshot identifier of the visit :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit :>jsonarr string status: status of the visit (either **full**, **partial** or **ongoing**) :>jsonarr number visit: the unique identifier of the visit **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/1/visits/` """ # noqa result = {} + origin_id = int(origin_id) per_page = int(request.query_params.get('per_page', '10')) last_visit = request.query_params.get('last_visit') if last_visit: last_visit = int(last_visit) def _lookup_origin_visits( origin_id, last_visit=last_visit, per_page=per_page): all_visits = get_origin_visits({'id': origin_id}) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v['visit'] == last_visit: visits = all_visits[i+1:i+1+per_page] break for v in visits: yield v def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = reverse('api-origin-visit', url_args={'origin_id': origin_id, 'visit_id': ov['visit']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('api-snapshot', url_args={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None return ov results = api_lookup(_lookup_origin_visits, origin_id, notfound_msg='No origin {} found'.format(origin_id), enrich_fn=_enrich_origin_visit) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]['visit'] query_params = {} query_params['last_visit'] = new_last_visit if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('api-origin-visits', url_args={'origin_id': origin_id}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/(?P[0-9]+)/visit/(?P[0-9]+)/', 'api-origin-visit') @api_doc('/origin/visit/') def api_origin_visit(request, origin_id, visit_id): """ .. http:get:: /api/1/origin/(origin_id)/visit/(visit_id)/ Get information about a specific visit of a software origin. 
:param int origin_id: a software origin identifier :param int visit_id: a visit identifier :reqheader Accept: the requested response content type, either ``application/json`` (default) or ``application/yaml`` :resheader Content-Type: this depends on :http:header:`Accept` header of request :>json string date: ISO representation of the visit date (in UTC) :>json number origin: the origin unique identifier :>json string origin_url: link to get information about the origin :>jsonarr string snapshot: the snapshot identifier of the visit :>jsonarr string snapshot_url: link to :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get information about the snapshot of the visit :>json string status: status of the visit (either **full**, **partial** or **ongoing**) :>json number visit: the unique identifier of the visit **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/1500/visit/1/` """ # noqa def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_url'] = reverse('api-origin', url_args={'origin_id': ov['origin']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('api-snapshot', url_args={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None return ov return api_lookup( - service.lookup_origin_visit, origin_id, visit_id, + service.lookup_origin_visit, int(origin_id), int(visit_id), notfound_msg=('No visit {} for origin {} found' .format(visit_id, origin_id)), enrich_fn=_enrich_origin_visit) diff --git a/swh/web/common/origin_visits.py b/swh/web/common/origin_visits.py index 10b87d96..9bb1c7cb 100644 --- a/swh/web/common/origin_visits.py +++ b/swh/web/common/origin_visits.py @@ -1,186 +1,186 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import math from django.core.cache import cache from swh.web.common.exc import NotFoundExc from swh.web.common.utils import parse_timestamp def get_origin_visits(origin_info): """Function that returns the list of visits for a swh origin. That list is put in cache in order to speedup the navigation in the swh web browse ui. 
Args: - origin_id (int): the id of the swh origin to fetch visits from + origin_info (dict): dict describing the origin to fetch visits from Returns: list: A list of dict describing the origin visits with the following keys: * **date**: UTC visit date in ISO format, * **origin**: the origin id * **status**: the visit status, either **full**, **partial** or **ongoing** * **visit**: the visit id Raises: NotFoundExc: if the origin is not found """ from swh.web.common import service cache_entry_id = 'origin_%s_visits' % origin_info['id'] cache_entry = cache.get(cache_entry_id) if cache_entry: last_visit = cache_entry[-1]['visit'] new_visits = list(service.lookup_origin_visits(origin_info['id'], last_visit=last_visit)) if not new_visits: return cache_entry origin_visits = [] per_page = service.MAX_LIMIT last_visit = None while 1: visits = list(service.lookup_origin_visits(origin_info['id'], last_visit=last_visit, per_page=per_page)) origin_visits += visits if len(visits) < per_page: break else: if not last_visit: last_visit = per_page else: last_visit += per_page def _visit_sort_key(visit): ts = parse_timestamp(visit['date']).timestamp() return ts + (float(visit['visit']) / 10e3) for v in origin_visits: if 'metadata' in v: del v['metadata'] origin_visits = [dict(t) for t in set([tuple(d.items()) for d in origin_visits])] origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v)) cache.set(cache_entry_id, origin_visits) return origin_visits def get_origin_visit(origin_info, visit_ts=None, visit_id=None, snapshot_id=None): """Function that returns information about a visit for a given origin. The visit is retrieved from a provided timestamp. The closest visit from that timestamp is selected. 
Args: origin_info (dict): a dict filled with origin information (id, url, type) visit_ts (int or str): an ISO date string or Unix timestamp to parse Returns: A dict containing the visit info as described below:: {'origin': 2, 'date': '2017-10-08T11:54:25.582463+00:00', 'metadata': {}, 'visit': 25, 'status': 'full'} """ visits = get_origin_visits(origin_info) if not visits: if 'type' in origin_info and 'url' in origin_info: message = ('No visit associated to origin with' ' type %s and url %s!' % (origin_info['type'], origin_info['url'])) else: message = ('No visit associated to origin with' ' id %s!' % origin_info['id']) raise NotFoundExc(message) if snapshot_id: visit = [v for v in visits if v['snapshot'] == snapshot_id] if len(visit) == 0: if 'type' in origin_info and 'url' in origin_info: message = ('Visit for snapshot with id %s for origin with type' ' %s and url %s not found!' % (snapshot_id, origin_info['type'], origin_info['url'])) else: message = ('Visit for snapshot with id %s for origin with' ' id %s not found!' % (snapshot_id, origin_info['id'])) raise NotFoundExc(message) return visit[0] if visit_id: visit = [v for v in visits if v['visit'] == int(visit_id)] if len(visit) == 0: if 'type' in origin_info and 'url' in origin_info: message = ('Visit with id %s for origin with type %s' ' and url %s not found!' % (visit_id, origin_info['type'], origin_info['url'])) else: message = ('Visit with id %s for origin with id %s' ' not found!' 
% (visit_id, origin_info['id'])) raise NotFoundExc(message) return visit[0] if not visit_ts: # returns the latest full visit when no timestamp is provided for v in reversed(visits): if v['status'] == 'full': return v return visits[-1] parsed_visit_ts = math.floor(parse_timestamp(visit_ts).timestamp()) visit_idx = None for i, visit in enumerate(visits): ts = math.floor(parse_timestamp(visit['date']).timestamp()) if i == 0 and parsed_visit_ts <= ts: return visit elif i == len(visits) - 1: if parsed_visit_ts >= ts: return visit else: next_ts = math.floor( parse_timestamp(visits[i+1]['date']).timestamp()) if parsed_visit_ts >= ts and parsed_visit_ts < next_ts: if (parsed_visit_ts - ts) < (next_ts - parsed_visit_ts): visit_idx = i break else: visit_idx = i+1 break if visit_idx is not None: visit = visits[visit_idx] while visit_idx < len(visits) - 1 and \ visit['date'] == visits[visit_idx+1]['date']: visit_idx = visit_idx + 1 visit = visits[visit_idx] return visit else: if 'type' in origin_info and 'url' in origin_info: message = ('Visit with timestamp %s for origin with type %s ' 'and url %s not found!' % (visit_ts, origin_info['type'], origin_info['url'])) else: message = ('Visit with timestamp %s for origin with id %s ' 'not found!' 
% (visit_ts, origin_info['id'])) raise NotFoundExc(message) diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 3be9e9e3..ba18c92d 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,363 +1,332 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information +from hypothesis import given from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError +from swh.web.common.utils import reverse +from swh.web.common.origin_visits import get_origin_visits +from swh.web.tests.strategies import ( + origin, new_origin, visit_dates, new_snapshots +) from swh.web.tests.testcase import WebTestCase class OriginApiTestCase(WebTestCase, APITestCase): - def setUp(self): - self.origin_visit1 = { - 'date': 1104616800.0, - 'origin': 10, - 'visit': 100, - 'metadata': None, - 'status': 'full', - } - - self.origin1 = { - 'id': 1234, - 'url': 'ftp://some/url/to/origin/0', - 'type': 'ftp' - } - @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_1_lookup_origin_visits_raise_error( + def test_api_lookup_origin_visits_raise_error( self, mock_get_origin_visits, ): - # given - mock_get_origin_visits.side_effect = ValueError( - 'voluntary error to check the bad request middleware.') - # when - rv = self.client.get('/api/1/origin/2/visits/') - # then + + err_msg = 'voluntary error to check the bad request middleware.' 
+ + mock_get_origin_visits.side_effect = ValueError(err_msg) + + url = reverse('api-origin-visits', url_args={'origin_id': 2}) + rv = self.client.get(url) + self.assertEqual(rv.status_code, 400) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'ValueError', - 'reason': 'voluntary error to check the bad request middleware.'}) + 'reason': err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_1_lookup_origin_visits_raise_swh_storage_error_db( + def test_api_lookup_origin_visits_raise_swh_storage_error_db( self, mock_get_origin_visits): - # given - mock_get_origin_visits.side_effect = StorageDBError( - 'Storage exploded! Will be back online shortly!') - # when - rv = self.client.get('/api/1/origin/2/visits/') - # then + + err_msg = 'Storage exploded! Will be back online shortly!' + + mock_get_origin_visits.side_effect = StorageDBError(err_msg) + + url = reverse('api-origin-visits', url_args={'origin_id': 2}) + rv = self.client.get(url) + self.assertEqual(rv.status_code, 503) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageDBError', 'reason': - 'An unexpected error occurred in the backend: ' - 'Storage exploded! Will be back online shortly!'}) + 'An unexpected error occurred in the backend: %s' % err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_1_lookup_origin_visits_raise_swh_storage_error_api( + def test_api_lookup_origin_visits_raise_swh_storage_error_api( self, mock_get_origin_visits): - # given - mock_get_origin_visits.side_effect = StorageAPIError( - 'Storage API dropped dead! Will resurrect from its ashes asap!' - ) - # when - rv = self.client.get('/api/1/origin/2/visits/') - # then + + err_msg = 'Storage API dropped dead! Will resurrect asap!' 
+ + mock_get_origin_visits.side_effect = StorageAPIError(err_msg) + + url = reverse('api-origin-visits', url_args={'origin_id': 2}) + rv = self.client.get(url) + self.assertEqual(rv.status_code, 503) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageAPIError', 'reason': - 'An unexpected error occurred in the api backend: ' - 'Storage API dropped dead! Will resurrect from its ashes asap!' + 'An unexpected error occurred in the api backend: %s' % err_msg }) - @patch('swh.web.api.views.origin.get_origin_visits') - def test_api_1_lookup_origin_visits(self, mock_get_origin_visits): - # given - stub_visits = [ - { - 'date': 1293919200.0, - 'origin': 2, - 'snapshot': '1234', - 'visit': 1 - }, - { - 'date': 1293919200.0, - 'origin': 2, - 'snapshot': '1234', - 'visit': 2 - }, - { - 'date': 1420149600.0, - 'origin': 2, - 'snapshot': '5678', - 'visit': 3 - }, - { - 'date': 1420149600.0, - 'origin': 2, - 'snapshot': '5678', - 'visit': 4 - } - ] + @given(new_origin(), visit_dates(4), new_snapshots(4)) + def test_api_lookup_origin_visits(self, new_origin, visit_dates, + new_snapshots): - mock_get_origin_visits.return_value = stub_visits + origin_id = self.storage.origin_add_one(new_origin) + new_origin['id'] = origin_id + for i, visit_date in enumerate(visit_dates): + origin_visit = self.storage.origin_visit_add(origin_id, visit_date) + self.storage.snapshot_add(origin_id, origin_visit['visit'], + new_snapshots[i]) - # when - rv = self.client.get('/api/1/origin/2/visits/?per_page=2&last_visit=3') + all_visits = list(reversed(get_origin_visits(new_origin))) - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, [ - { - 'date': 1293919200.0, - 'origin': 2, - 'snapshot': '1234', - 'visit': 2, - 'origin_visit_url': '/api/1/origin/2/visit/2/', - 'snapshot_url': '/api/1/snapshot/1234/' - }, - { - 'date': 1293919200.0, - 'origin': 2, - 'snapshot': '1234', - 
'visit': 1, - 'origin_visit_url': '/api/1/origin/2/visit/1/', - 'snapshot_url': '/api/1/snapshot/1234/' - }, - - ]) - - @patch('swh.web.api.views.origin.service') - def test_api_1_lookup_origin_visit(self, mock_service): - # given - origin_visit = self.origin_visit1.copy() - origin_visit.update({ - 'snapshot': '57478754' - }) + for last_visit, expected_visits in ( + (None, all_visits[:2]), + (all_visits[1]['visit'], all_visits[2:4])): - mock_service.lookup_origin_visit.return_value = origin_visit + url = reverse('api-origin-visits', + url_args={'origin_id': origin_id}, + query_params={'per_page': 2, + 'last_visit': last_visit}) - expected_origin_visit = self.origin_visit1.copy() - expected_origin_visit.update({ - 'origin_url': '/api/1/origin/10/', - 'snapshot': '57478754', - 'snapshot_url': '/api/1/snapshot/57478754/' - }) + rv = self.client.get(url) - # when - rv = self.client.get('/api/1/origin/10/visit/100/') + self.assertEqual(rv.status_code, 200) + self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.status_code, 200) - self.assertEqual(rv['Content-Type'], 'application/json') - self.assertEqual(rv.data, expected_origin_visit) + for expected_visit in expected_visits: + origin_visit_url = reverse( + 'api-origin-visit', + url_args={'origin_id': origin_id, + 'visit_id': expected_visit['visit']}) + snapshot_url = reverse( + 'api-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + expected_visit['origin_visit_url'] = origin_visit_url + expected_visit['snapshot_url'] = snapshot_url - mock_service.lookup_origin_visit.assert_called_once_with('10', '100') + self.assertEqual(rv.data, expected_visits) - @patch('swh.web.api.views.origin.service') - def test_api_1_lookup_origin_visit_not_found(self, mock_service): - # given - mock_service.lookup_origin_visit.return_value = None + @given(new_origin(), visit_dates(4), new_snapshots(4)) + def test_api_lookup_origin_visit(self, new_origin, visit_dates, + new_snapshots): - # when - rv = 
self.client.get('/api/1/origin/1/visit/1000/') + origin_id = self.storage.origin_add_one(new_origin) + new_origin['id'] = origin_id + for i, visit_date in enumerate(visit_dates): + origin_visit = self.storage.origin_visit_add(origin_id, visit_date) + visit_id = origin_visit['visit'] + self.storage.snapshot_add(origin_id, origin_visit['visit'], + new_snapshots[i]) + url = reverse('api-origin-visit', + url_args={'origin_id': origin_id, + 'visit_id': visit_id}) + + rv = self.client.get(url) + self.assertEqual(rv.status_code, 200) + self.assertEqual(rv['Content-Type'], 'application/json') + + expected_visit = self.origin_visit_get_by(origin_id, visit_id) + + origin_url = reverse('api-origin', + url_args={'origin_id': origin_id}) + snapshot_url = reverse( + 'api-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + + expected_visit['origin_url'] = origin_url + expected_visit['snapshot_url'] = snapshot_url + + self.assertEqual(rv.data, expected_visit) + + @given(origin()) + def test_api_lookup_origin_visit_not_found(self, origin): + + all_visits = list(reversed(get_origin_visits(origin))) + + max_visit_id = max([v['visit'] for v in all_visits]) + + url = reverse('api-origin-visit', + url_args={'origin_id': origin['id'], + 'visit_id': max_visit_id + 1}) + + rv = self.client.get(url) self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'No visit 1000 for origin 1 found' + 'reason': 'Origin with id %s or its visit with id %s not found!' 
% + (origin['id'], max_visit_id+1) }) - mock_service.lookup_origin_visit.assert_called_once_with('1', '1000') + @given(origin()) + def test_api_origin_by_id(self, origin): - @patch('swh.web.api.views.origin.service') - def test_api_origin_by_id(self, mock_service): - # given - mock_service.lookup_origin.return_value = self.origin1 + url = reverse('api-origin', url_args={'origin_id': origin['id']}) - expected_origin = self.origin1.copy() - expected_origin.update({ - 'origin_visits_url': '/api/1/origin/1234/visits/' - }) + rv = self.client.get(url) - # when - rv = self.client.get('/api/1/origin/1234/') + expected_origin = self.origin_get(origin) + + origin_visits_url = reverse('api-origin-visits', + url_args={'origin_id': origin['id']}) + + expected_origin['origin_visits_url'] = origin_visits_url - # then self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) - mock_service.lookup_origin.assert_called_with({'id': '1234'}) + @given(origin()) + def test_api_origin_by_type_url(self, origin): - @patch('swh.web.api.views.origin.service') - def test_api_origin_by_type_url(self, mock_service): - # given - stub_origin = self.origin1.copy() - stub_origin.update({ - 'id': 987 - }) - mock_service.lookup_origin.return_value = stub_origin + url = reverse('api-origin', + url_args={'origin_type': origin['type'], + 'origin_url': origin['url']}) + rv = self.client.get(url) - expected_origin = stub_origin.copy() - expected_origin.update({ - 'origin_visits_url': '/api/1/origin/987/visits/' - }) + expected_origin = self.origin_get(origin) + + origin_visits_url = reverse('api-origin-visits', + url_args={'origin_id': origin['id']}) - # when - rv = self.client.get('/api/1/origin/ftp/url' - '/ftp://some/url/to/origin/0/') + expected_origin['origin_visits_url'] = origin_visits_url - # then self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, 
expected_origin) - mock_service.lookup_origin.assert_called_with( - {'url': 'ftp://some/url/to/origin/0', - 'type': 'ftp'}) + @given(new_origin()) + def test_api_origin_not_found(self, new_origin): - @patch('swh.web.api.views.origin.service') - def test_api_origin_not_found(self, mock_service): - # given - mock_service.lookup_origin.return_value = None + url = reverse('api-origin', + url_args={'origin_type': new_origin['type'], + 'origin_url': new_origin['url']}) + rv = self.client.get(url) - # when - rv = self.client.get('/api/1/origin/4321/') - - # then self.assertEqual(rv.status_code, 404) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Origin with id 4321 not found.' + 'reason': 'Origin with type %s and url %s not found!' % + (new_origin['type'], new_origin['url']) }) - mock_service.lookup_origin.assert_called_with({'id': '4321'}) - @patch('swh.web.common.service.idx_storage') - @patch('swh.web.common.service.storage') - def test_api_origin_metadata_search(self, mock_storage, mock_idx_storage): - # given - mock_storage.origin_get.return_value = { - 'id': 54974445, 'type': 'git', 'url': '/dev/null'} + @given(origin()) + def test_api_origin_metadata_search(self, mock_idx_storage, origin): + mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8', 'metadata': {'author': 'Jane Doe'}, - 'origin_id': 54974445, + 'origin_id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] - # when - rv = self.client.get( - '/api/1/origin/metadata-search/?fulltext=Jane%20Doe') + url = reverse('api-origin-metadata-search', + query_params={'fulltext': 'Jane Doe'}) + rv = self.client.get(url) - # then self.assertEqual(rv.status_code, 200, rv.content) 
self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ - 'id': 54974445, - 'type': 'git', - 'url': '/dev/null', + 'id': origin['id'], + 'type': origin['type'], + 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': '7026b7c1a2af56521e951c01ed20f255fa054238', 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) @patch('swh.web.common.service.idx_storage') - @patch('swh.web.common.service.storage') - def test_api_origin_metadata_search_limit(self, mock_storage, - mock_idx_storage): - # given - mock_storage.origin_get.return_value = { - 'id': 54974445, 'type': 'git', 'url': '/dev/null'} + @given(origin()) + def test_api_origin_metadata_search_limit(self, mock_idx_storage, origin): + mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed \xf2U\xfa\x05B8', 'metadata': {'author': 'Jane Doe'}, - 'origin_id': 54974445, + 'origin_id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] - # when - rv = self.client.get( - '/api/1/origin/metadata-search/?fulltext=Jane%20Doe') + url = reverse('api-origin-metadata-search', + query_params={'fulltext': 'Jane Doe'}) + rv = self.client.get(url) - # then self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) - # when - rv = self.client.get( - 
'/api/1/origin/metadata-search/?fulltext=Jane%20Doe&limit=10') + url = reverse('api-origin-metadata-search', + query_params={'fulltext': 'Jane Doe', + 'limit': 10}) + rv = self.client.get(url) - # then self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=10) - # when - rv = self.client.get( - '/api/1/origin/metadata-search/?fulltext=Jane%20Doe&limit=987') + url = reverse('api-origin-metadata-search', + query_params={'fulltext': 'Jane Doe', + 'limit': 987}) + rv = self.client.get(url) - # then self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=100) @patch('swh.web.common.service.idx_storage') def test_api_origin_metadata_search_invalid(self, mock_idx_storage): - rv = self.client.get('/api/1/origin/metadata-search/') - # then + url = reverse('api-origin-metadata-search') + rv = self.client.get(url) + self.assertEqual(rv.status_code, 400, rv.content) mock_idx_storage.assert_not_called() diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index 4ac849c1..4247fc9e 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,865 +1,862 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import copy import datetime import itertools import pytest import random from collections import defaultdict from hypothesis import given from swh.model.hashutil import hash_to_bytes, hash_to_hex from 
swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc from swh.web.tests.strategies import ( content, contents, unknown_content, unknown_contents, contents_with_ctags, origin, new_origin, visit_dates, directory, release, revision, unknown_revision, revisions, unknown_revisions, ancestor_revisions, non_ancestor_revisions, invalid_sha1, sha256, revision_with_submodules, unknown_directory, empty_directory ) from swh.web.tests.testcase import ( WebTestCase, ctags_json_missing, fossology_missing ) class ServiceTestCase(WebTestCase): def setUp(self): self.SHA1_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE) self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID) self.AUTHOR_ID_BIN = { 'name': b'author', 'email': b'author@company.org', 'fullname': b'author ' } self.AUTHOR_ID = { 'name': 'author', 'email': 'author@company.org', 'fullname': 'author ' } self.COMMITTER_ID_BIN = { 'name': b'committer', 'email': b'committer@corp.org', 'fullname': b'committer ' } self.COMMITTER_ID = { 'name': 'committer', 'email': 'committer@corp.org', 'fullname': 'committer ' } self.SAMPLE_DATE_RAW = { 'timestamp': int(datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc, ).timestamp()), 'offset': 0, 'negative_utc': False, } self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' self.SAMPLE_REVISION = { 'id': self.SHA1_SAMPLE, 'directory': self.DIRECTORY_ID, 'author': self.AUTHOR_ID, 'committer': self.COMMITTER_ID, 'message': self.SAMPLE_MESSAGE, 'date': self.SAMPLE_DATE, 'committer_date': self.SAMPLE_DATE, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': {}, 'merge': False } self.SAMPLE_REVISION_RAW = { 'id': self.SHA1_SAMPLE_BIN, 'directory': self.DIRECTORY_ID_BIN, 'author': self.AUTHOR_ID_BIN, 'committer': 
self.COMMITTER_ID_BIN, 'message': self.SAMPLE_MESSAGE_BIN, 'date': self.SAMPLE_DATE_RAW, 'committer_date': self.SAMPLE_DATE_RAW, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } @given(contents()) def test_lookup_multiple_hashes_all_present(self, contents): input_data = [] expected_output = [] for cnt in contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': True}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(contents(), unknown_contents()) def test_lookup_multiple_hashes_some_missing(self, contents, unknown_contents): input_contents = list(itertools.chain(contents, unknown_contents)) random.shuffle(input_contents) input_data = [] expected_output = [] for cnt in input_contents: input_data.append({'sha1': cnt['sha1']}) expected_output.append({'sha1': cnt['sha1'], 'found': cnt in contents}) self.assertEqual(service.lookup_multiple_hashes(input_data), expected_output) @given(unknown_content()) def test_lookup_hash_does_not_exist(self, unknown_content): actual_lookup = service.lookup_hash('sha1_git:%s' % unknown_content['sha1_git']) self.assertEqual(actual_lookup, {'found': None, 'algo': 'sha1_git'}) @given(content()) def test_lookup_hash_exist(self, content): actual_lookup = service.lookup_hash('sha1:%s' % content['sha1']) content_metadata = self.content_get_metadata(content['sha1']) self.assertEqual({'found': content_metadata, 'algo': 'sha1'}, actual_lookup) @given(unknown_content()) def test_search_hash_does_not_exist(self, content): actual_lookup = service.search_hash('sha1_git:%s' % content['sha1_git']) self.assertEqual({'found': False}, actual_lookup) @given(content()) def test_search_hash_exist(self, content): actual_lookup = service.search_hash('sha1:%s' % content['sha1']) self.assertEqual({'found': True}, actual_lookup) @pytest.mark.skipif(ctags_json_missing, reason="requires ctags with json output support") @given(contents_with_ctags()) def 
test_lookup_content_ctags(self, contents_with_ctags): content_sha1 = random.choice(contents_with_ctags['sha1s']) self.content_add_ctags(content_sha1) actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % content_sha1)) expected_data = list(self.content_get_ctags(content_sha1)) for ctag in expected_data: ctag['id'] = content_sha1 self.assertEqual(actual_ctags, expected_data) @given(unknown_content()) def test_lookup_content_ctags_no_hash(self, unknown_content): actual_ctags = \ list(service.lookup_content_ctags('sha1:%s' % unknown_content['sha1'])) self.assertEqual(actual_ctags, []) @given(content()) def test_lookup_content_filetype(self, content): self.content_add_mimetype(content['sha1']) actual_filetype = service.lookup_content_filetype(content['sha1']) expected_filetype = self.content_get_mimetype(content['sha1']) self.assertEqual(actual_filetype, expected_filetype) @given(content()) def test_lookup_content_language(self, content): self.content_add_language(content['sha1']) actual_language = service.lookup_content_language(content['sha1']) expected_language = self.content_get_language(content['sha1']) self.assertEqual(actual_language, expected_language) @given(contents_with_ctags()) def test_lookup_expression(self, contents_with_ctags): per_page = 10 expected_ctags = [] for content_sha1 in contents_with_ctags['sha1s']: if len(expected_ctags) == per_page: break self.content_add_ctags(content_sha1) for ctag in self.content_get_ctags(content_sha1): if len(expected_ctags) == per_page: break if ctag['name'] == contents_with_ctags['symbol_name']: del ctag['id'] ctag['sha1'] = content_sha1 expected_ctags.append(ctag) actual_ctags = \ list(service.lookup_expression(contents_with_ctags['symbol_name'], last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, expected_ctags) def test_lookup_expression_no_result(self): expected_ctags = [] actual_ctags = \ list(service.lookup_expression('barfoo', last_sha1=None, per_page=10)) self.assertEqual(actual_ctags, 
expected_ctags) @pytest.mark.skipif(fossology_missing, reason="requires fossology-nomossa installed") @given(content()) def test_lookup_content_license(self, content): self.content_add_license(content['sha1']) actual_license = service.lookup_content_license(content['sha1']) expected_license = self.content_get_license(content['sha1']) self.assertEqual(actual_license, expected_license) def test_stat_counters(self): actual_stats = service.stat_counters() self.assertEqual(actual_stats, self.storage.stat_counters()) @given(new_origin(), visit_dates()) def test_lookup_origin_visits(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) for ts in visit_dates: self.storage.origin_visit_add(origin_id, ts) actual_origin_visits = list( service.lookup_origin_visits(origin_id, per_page=100)) - expected_visits = list(self.storage.origin_visit_get(origin_id)) - for visit in expected_visits: - visit['date'] = visit['date'].isoformat() - visit['metadata'] = {} + expected_visits = self.origin_visit_get(origin_id) self.assertEqual(actual_origin_visits, expected_visits) @given(new_origin(), visit_dates()) def test_lookup_origin_visit(self, new_origin, visit_dates): origin_id = self.storage.origin_add_one(new_origin) visits = [] for ts in visit_dates: visits.append(self.storage.origin_visit_add(origin_id, ts)) visit = random.choice(visits)['visit'] actual_origin_visit = service.lookup_origin_visit(origin_id, visit) expected_visit = dict(self.storage.origin_visit_get_by(origin_id, visit)) expected_visit['date'] = expected_visit['date'].isoformat() expected_visit['metadata'] = {} self.assertEqual(actual_origin_visit, expected_visit) @given(new_origin()) def test_lookup_origin(self, new_origin): origin_id = self.storage.origin_add_one(new_origin) actual_origin = service.lookup_origin({'id': origin_id}) expected_origin = self.storage.origin_get({'id': origin_id}) self.assertEqual(actual_origin, expected_origin) actual_origin = service.lookup_origin({'type': 
new_origin['type'], 'url': new_origin['url']}) expected_origin = self.storage.origin_get({'type': new_origin['type'], 'url': new_origin['url']}) self.assertEqual(actual_origin, expected_origin) @given(invalid_sha1()) def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1): with self.assertRaises(BadInputExc) as cm: service.lookup_release(invalid_sha1) self.assertIn('invalid checksum', cm.exception.args[0].lower()) @given(sha256()) def test_lookup_release_ko_id_checksum_too_long(self, sha256): with self.assertRaises(BadInputExc) as cm: service.lookup_release(sha256) self.assertEqual('Only sha1_git is supported.', cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_not_found(self, directory): path = 'some/invalid/path/here' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_path(directory, path) self.assertEqual('Directory entry with path %s from %s ' 'not found' % (path, directory), cm.exception.args[0]) @given(directory()) def test_lookup_directory_with_path_found(self, directory): directory_content = self.directory_ls(directory) directory_entry = random.choice(directory_content) path = directory_entry['name'] actual_result = service.lookup_directory_with_path(directory, path) self.assertEqual(actual_result, directory_entry) @given(release()) def test_lookup_release(self, release): actual_release = service.lookup_release(release) self.assertEqual(actual_release, self.release_get(release)) @given(revision(), invalid_sha1(), sha256()) def test_lookup_revision_with_context_ko_not_a_sha1(self, revision, invalid_sha1, sha256): sha1_git_root = revision sha1_git = invalid_sha1 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Invalid checksum query string', cm.exception.args[0]) sha1_git = sha256 with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', 
cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = revision sha1_git = unknown_revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision %s not found' % sha1_git, cm.exception.args[0]) @given(revision(), unknown_revision()) def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, revision, unknown_revision): sha1_git_root = unknown_revision sha1_git = revision with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision root %s not found' % sha1_git_root, cm.exception.args[0]) @given(ancestor_revisions()) def test_lookup_revision_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] root_sha1_git = ancestor_revisions['sha1_git_root'] for sha1_git_root in (root_sha1_git, {'id': hash_to_bytes(root_sha1_git)}): actual_revision = \ service.lookup_revision_with_context(sha1_git_root, sha1_git) children = [] for rev in self.revision_log(root_sha1_git): for p_rev in rev['parents']: p_rev_hex = hash_to_hex(p_rev) if p_rev_hex == sha1_git: children.append(rev['id']) expected_revision = self.revision_get(sha1_git) expected_revision['children'] = children self.assertEqual(actual_revision, expected_revision) @given(non_ancestor_revisions()) def test_lookup_revision_with_context_ko(self, non_ancestor_revisions): sha1_git = non_ancestor_revisions['sha1_git'] root_sha1_git = non_ancestor_revisions['sha1_git_root'] with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(root_sha1_git, sha1_git) self.assertIn('Revision %s is not an ancestor of %s' % (sha1_git, root_sha1_git), cm.exception.args[0]) @given(unknown_revision()) def test_lookup_directory_with_revision_not_found(self, unknown_revision): with self.assertRaises(NotFoundExc) as cm: 
service.lookup_directory_with_revision(unknown_revision) self.assertIn('Revision %s not found' % unknown_revision, cm.exception.args[0]) @given(revision()) def test_lookup_directory_with_revision_ko_path_to_nowhere(self, revision): invalid_path = 'path/to/something/unknown' with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision(revision, invalid_path) exception_text = cm.exception.args[0].lower() self.assertIn('directory or file', exception_text) self.assertIn(invalid_path, exception_text) self.assertIn('revision %s' % revision, exception_text) self.assertIn('not found', exception_text) @given(revision_with_submodules()) def test_lookup_directory_with_revision_ko_type_not_implemented( self, revision_with_submodules): with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( revision_with_submodules['rev_sha1_git'], revision_with_submodules['rev_dir_rev_path']) self.assertIn("Entity of type rev not implemented.", cm.exception.args[0]) @given(revision()) def test_lookup_directory_with_revision_without_path(self, revision): actual_directory_entries = \ service.lookup_directory_with_revision(revision) revision_data = self.revision_get(revision) expected_directory_entries = \ self.directory_ls(revision_data['directory']) self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['content'], expected_directory_entries) @given(revision()) def test_lookup_directory_with_revision_with_path(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] in ('file', 'dir')] expected_dir_entry = random.choice(dir_entries) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name']) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], 
expected_dir_entry['name']) if actual_dir_entry['type'] == 'file': del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) else: sub_dir_entries = self.directory_ls(expected_dir_entry['target']) self.assertEqual(actual_dir_entry['content'], sub_dir_entries) @given(revision()) def test_lookup_directory_with_revision_with_path_to_file_and_data( self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] expected_dir_entry = random.choice(dir_entries) expected_data = \ self.content_get(expected_dir_entry['checksums']['sha1']) actual_dir_entry = \ service.lookup_directory_with_revision(revision, expected_dir_entry['name'], with_data=True) self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type']) self.assertEqual(actual_dir_entry['revision'], revision) self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name']) del actual_dir_entry['content']['checksums']['blake2s256'] for key in ('checksums', 'status', 'length'): self.assertEqual(actual_dir_entry['content'][key], expected_dir_entry[key]) self.assertEqual(actual_dir_entry['content']['data'], expected_data['data']) @given(revision()) def test_lookup_revision(self, revision): actual_revision = service.lookup_revision(revision) self.assertEqual(actual_revision, self.revision_get(revision)) @given(unknown_revision()) def test_lookup_revision_invalid_msg(self, new_revision_id): new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) new_revision['id'] = hash_to_bytes(new_revision_id) new_revision['message'] = b'elegant fix for bug \xff' self.storage.revision_add([new_revision]) revision = service.lookup_revision(new_revision_id) self.assertEqual(revision['message'], None) self.assertEqual(revision['message_decoding_failed'], True) @given(unknown_revision()) def 
test_lookup_revision_msg_ok(self, new_revision_id): new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) new_revision['id'] = hash_to_bytes(new_revision_id) self.storage.revision_add([new_revision]) revision_message = service.lookup_revision_message(new_revision_id) self.assertEqual(revision_message, {'message': self.SAMPLE_MESSAGE_BIN}) @given(unknown_revision()) def test_lookup_revision_msg_absent(self, new_revision_id): new_revision = copy.deepcopy(self.SAMPLE_REVISION_RAW) new_revision['id'] = hash_to_bytes(new_revision_id) del new_revision['message'] self.storage.revision_add([new_revision]) with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(new_revision_id) self.assertEqual( cm.exception.args[0], 'No message for revision with sha1_git %s.' % new_revision_id ) @given(unknown_revision()) def test_lookup_revision_msg_no_rev(self, unknown_revision): with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message(unknown_revision) self.assertEqual( cm.exception.args[0], 'Revision with sha1_git %s not found.' 
% unknown_revision ) @given(revisions()) def test_lookup_revision_multiple(self, revisions): actual_revisions = list(service.lookup_revision_multiple(revisions)) expected_revisions = [] for rev in revisions: expected_revisions.append(self.revision_get(rev)) self.assertEqual(actual_revisions, expected_revisions) @given(unknown_revisions()) def test_lookup_revision_multiple_none_found(self, unknown_revisions): actual_revisions = \ list(service.lookup_revision_multiple(unknown_revisions)) self.assertEqual(actual_revisions, [None] * len(unknown_revisions)) @given(revision()) def test_lookup_revision_log(self, revision): actual_revision_log = \ list(service.lookup_revision_log(revision, limit=25)) expected_revision_log = self.revision_log(revision, limit=25) self.assertEqual(actual_revision_log, expected_revision_log) def _get_origin_branches(self, origin): origin_visit = self.origin_visit_get(origin['id'])[0] snapshot = self.snapshot_get(origin_visit['snapshot']) branches = {k: v for (k, v) in snapshot['branches'].items() if v['target_type'] == 'revision'} return branches @given(origin()) def test_lookup_revision_log_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_log = \ list(service.lookup_revision_log_by(origin['id'], branch_name, None, limit=25)) expected_log = \ self.revision_log(branches[branch_name]['target'], limit=25) self.assertEqual(actual_log, expected_log) @given(origin()) def test_lookup_revision_log_by_notfound(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_log_by( origin['id'], 'unknown_branch_name', None, limit=100) @given(unknown_content()) def test_lookup_content_raw_not_found(self, unknown_content): with self.assertRaises(NotFoundExc) as cm: service.lookup_content_raw('sha1:' + unknown_content['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' 
% ('sha1', unknown_content['sha1'])) @given(content()) def test_lookup_content_raw(self, content): actual_content = service.lookup_content_raw( 'sha256:%s' % content['sha256']) expected_content = self.content_get(content['sha1']) self.assertEqual(actual_content, expected_content) @given(unknown_content()) def test_lookup_content_not_found(self, unknown_content): with self.assertRaises(NotFoundExc) as cm: service.lookup_content('sha1:%s' % unknown_content['sha1']) self.assertIn(cm.exception.args[0], 'Content with %s checksum equals to %s not found!' % ('sha1', unknown_content['sha1'])) @given(content()) def test_lookup_content_with_sha1(self, content): actual_content = service.lookup_content( 'sha1:%s' % content['sha1']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(content()) def test_lookup_content_with_sha256(self, content): actual_content = service.lookup_content( 'sha256:%s' % content['sha256']) expected_content = self.content_get_metadata(content['sha1']) self.assertEqual(actual_content, expected_content) @given(revision()) def test_lookup_person(self, revision): rev_data = self.revision_get(revision) actual_person = service.lookup_person(rev_data['author']['id']) self.assertEqual(actual_person, rev_data['author']) def test_lookup_directory_bad_checksum(self): with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') @given(unknown_directory()) def test_lookup_directory_not_found(self, unknown_directory): with self.assertRaises(NotFoundExc) as cm: service.lookup_directory(unknown_directory) self.assertIn('Directory with sha1_git %s not found' % unknown_directory, cm.exception.args[0]) @given(directory()) def test_lookup_directory(self, directory): actual_directory_ls = list(service.lookup_directory( directory)) expected_directory_ls = self.directory_ls(directory) self.assertEqual(actual_directory_ls, expected_directory_ls) @given(empty_directory()) def 
test_lookup_directory_empty(self, empty_directory): actual_directory_ls = list(service.lookup_directory(empty_directory)) self.assertEqual(actual_directory_ls, []) @given(origin()) def test_lookup_revision_by_nothing_found(self, origin): with self.assertRaises(NotFoundExc): service.lookup_revision_by(origin['id'], 'invalid-branch-name') @given(origin()) def test_lookup_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) actual_revision = \ service.lookup_revision_by(origin['id'], branch_name, None) expected_revision = \ self.revision_get(branches[branch_name]['target']) self.assertEqual(actual_revision, expected_revision) @given(origin(), revision()) def test_lookup_revision_with_context_by_ko(self, origin, revision): with self.assertRaises(NotFoundExc): service.lookup_revision_with_context_by(origin['id'], 'invalid-branch-name', None, revision) @given(origin()) def test_lookup_revision_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) children = defaultdict(list) for rev in root_rev_log: for rev_p in rev['parents']: children[rev_p].append(rev['id']) rev = root_rev_log[-1]['id'] actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) expected_root_rev = self.revision_get(root_rev) expected_rev = self.revision_get(rev) expected_rev['children'] = children[rev] self.assertEqual(actual_root_rev, expected_root_rev) self.assertEqual(actual_rev, expected_rev) def test_lookup_revision_through_ko_not_implemented(self): with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @given(origin()) def test_lookup_revision_through_with_context_by(self, origin): branches = self._get_origin_branches(origin) branch_name = 
random.choice(list(branches.keys())) root_rev = branches[branch_name]['target'] root_rev_log = self.revision_log(root_rev) rev = root_rev_log[-1]['id'] self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, 'sha1_git': rev }), service.lookup_revision_with_context_by( origin['id'], branch_name, None, rev) ) @given(origin()) def test_lookup_revision_through_with_revision_by(self, origin): branches = self._get_origin_branches(origin) branch_name = random.choice(list(branches.keys())) self.assertEqual(service.lookup_revision_through({ 'origin_id': origin['id'], 'branch_name': branch_name, 'ts': None, }), service.lookup_revision_by( origin['id'], branch_name, None) ) @given(ancestor_revisions()) def test_lookup_revision_through_with_context(self, ancestor_revisions): sha1_git = ancestor_revisions['sha1_git'] sha1_git_root = ancestor_revisions['sha1_git_root'] self.assertEqual(service.lookup_revision_through({ 'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, }), service.lookup_revision_with_context( sha1_git_root, sha1_git) ) @given(revision()) def test_lookup_revision_through_with_revision(self, revision): self.assertEqual(service.lookup_revision_through({ 'sha1_git': revision }), service.lookup_revision(revision) ) @given(revision()) def test_lookup_directory_through_revision_ko_not_found(self, revision): with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'sha1_git': revision}, 'some/invalid/path') @given(revision()) def test_lookup_directory_through_revision_ok(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name']), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'])) ) @given(revision()) def 
test_lookup_directory_through_revision_ok_with_data(self, revision): revision_data = self.revision_get(revision) dir_entries = [e for e in self.directory_ls(revision_data['directory']) if e['type'] == 'file'] dir_entry = random.choice(dir_entries) self.assertEqual( service.lookup_directory_through_revision({'sha1_git': revision}, dir_entry['name'], with_data=True), (revision, service.lookup_directory_with_revision( revision, dir_entry['name'], with_data=True)) ) diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py index c1ed4883..02e1c293 100644 --- a/swh/web/tests/strategies.py +++ b/swh/web/tests/strategies.py @@ -1,359 +1,381 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from collections import defaultdict from datetime import datetime from hypothesis import settings from hypothesis.strategies import ( just, sampled_from, lists, composite, datetimes, integers ) from string import ascii_letters, hexdigits from swh.model.hashutil import hash_to_hex, hash_to_bytes from swh.model.identifiers import directory_identifier from swh.storage.algos.revisions_walker import get_revisions_walker -from swh.storage.tests.algos.test_snapshot import origins as new_origin # noqa +from swh.storage.tests.algos.test_snapshot import ( # noqa + origins as new_origin_strategy, snapshots as new_snapshot +) from swh.web.tests.data import get_tests_data # Module dedicated to the generation of input data for tests through # the use of hypothesis. # Some of these data are sampled from a test archive created and populated # in the swh.web.tests.data module. 
# Set some hypothesis settings
settings.register_profile("swh-web", settings(deadline=None))
settings.load_profile("swh-web")

# The following strategies exploit the hypothesis capabilities


def _known_swh_object(object_type):
    # Sample from the objects of the given type already loaded
    # into the in-memory test archive.
    tests_data = get_tests_data()
    return sampled_from(tests_data[object_type])


def _unknown_swh_object(draw, object_type):
    # Draw a sha1 guaranteed not to reference an object of the
    # given type in the test archive.
    tests_data = get_tests_data()
    storage = tests_data['storage']
    while True:
        sha1_git = draw(sha1())
        # some tests will use the generated id to create a revision on the fly
        if object_type == 'revisions':
            obj = next(storage.revision_get([hash_to_bytes(sha1_git)]))
            if obj is None:
                return sha1_git
        elif sha1_git not in tests_data[object_type]:
            return sha1_git


def sha1():
    """
    Hypothesis strategy returning a valid hexadecimal sha1 value.
    """
    sha1 = ''.join(random.choice(hexdigits) for x in range(40))
    return just(sha1.lower())


def invalid_sha1():
    """
    Hypothesis strategy returning an invalid sha1 representation.
    """
    invalid_sha1 = ''.join(random.choice(ascii_letters) for x in range(50))
    return just(invalid_sha1.lower())


def sha256():
    """
    Hypothesis strategy returning a valid hexadecimal sha256 value.
    """
    sha256 = ''.join(random.choice(hexdigits) for x in range(64))
    return just(sha256.lower())


def content():
    """
    Hypothesis strategy returning a random content ingested
    into the test archive.
    """
    return _known_swh_object('contents')


def contents():
    """
    Hypothesis strategy returning random contents ingested
    into the test archive.
    """
    return lists(content(), min_size=2, max_size=8)


@composite
def unknown_content(draw):
    """
    Hypothesis strategy returning a random content not ingested
    into the test archive.
    """
    tests_data = get_tests_data()
    while True:
        unknown_content = {
            'blake2s256': draw(sha256()),
            'sha1': draw(sha1()),
            'sha1_git': draw(sha1()),
            'sha256': draw(sha256())
        }
        if unknown_content not in tests_data['contents']:
            return unknown_content


def unknown_contents():
    """
    Hypothesis strategy returning random contents not ingested
    into the test archive.
    """
    return lists(unknown_content(), min_size=2, max_size=8)


def directory():
    """
    Hypothesis strategy returning a random directory ingested
    into the test archive.
    """
    return _known_swh_object('directories')


def empty_directory():
    """
    Hypothesis strategy returning the empty directory ingested
    into the test archive.
    """
    return just(directory_identifier({'entries': []}))


@composite
def unknown_directory(draw):
    """
    Hypothesis strategy returning a random directory not ingested
    into the test archive.
    """
    return _unknown_swh_object(draw, 'directories')


def origin():
    """
    Hypothesis strategy returning a random origin ingested
    into the test archive.
    """
    return _known_swh_object('origins')


def new_origin():
    """
    Hypothesis strategy returning a random origin not ingested
    into the test archive.
    """
    tests_data = get_tests_data()
    storage = tests_data['storage']
    return new_origin_strategy().filter(
        lambda origin: storage.origin_get(origin) is None)


def visit_dates(nb_dates=None):
    """
    Hypothesis strategy returning a sorted list of visit dates.

    Args:
        nb_dates (int): if provided, the exact number of dates to
            generate; otherwise between 2 and 8 dates are drawn.
    """
    min_size = nb_dates if nb_dates else 2
    max_size = nb_dates if nb_dates else 8
    return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0),
                           max_value=datetime(2018, 12, 31, 0, 0)),
                 min_size=min_size, max_size=max_size, unique=True).map(sorted)


def release():
    """
    Hypothesis strategy returning a random release ingested
    into the test archive.
    """
    return _known_swh_object('releases')


@composite
def unknown_release(draw):
    """
    Hypothesis strategy returning a random release not ingested
    into the test archive.
    """
    return _unknown_swh_object(draw, 'releases')


def revision():
    """
    Hypothesis strategy returning a random revision ingested
    into the test archive.
    """
    return _known_swh_object('revisions')


@composite
def unknown_revision(draw):
    """
    Hypothesis strategy returning a random revision not ingested
    into the test archive.
    """
    return _unknown_swh_object(draw, 'revisions')


def revisions():
    """
    Hypothesis strategy returning random revisions ingested
    into the test archive.
    """
    return lists(revision(), min_size=2, max_size=8)


def unknown_revisions():
    """
    Hypothesis strategy returning random revisions not ingested
    into the test archive.
    """
    return lists(unknown_revision(), min_size=2, max_size=8)


def snapshot():
    """
    Hypothesis strategy returning a random snapshot ingested
    into the test archive.
    """
    return _known_swh_object('snapshots')


def new_snapshots(nb_snapshots=None):
    """
    Hypothesis strategy returning random snapshots not ingested
    into the test archive.

    Args:
        nb_snapshots (int): if provided, the exact number of snapshots
            to generate; otherwise between 2 and 8 snapshots are drawn.
    """
    min_size = nb_snapshots if nb_snapshots else 2
    max_size = nb_snapshots if nb_snapshots else 8
    return lists(new_snapshot(min_size=2, max_size=10, only_objects=True),
                 min_size=min_size, max_size=max_size)


@composite
def unknown_snapshot(draw):
    """
    Hypothesis strategy returning a random snapshot not ingested
    into the test archive.
    """
    return _unknown_swh_object(draw, 'snapshots')


def person():
    """
    Hypothesis strategy returning a random person ingested
    into the test archive.
    """
    return _known_swh_object('persons')


def unknown_person():
    """
    Hypothesis strategy returning a random person not ingested
    into the test archive.
    """
    persons = get_tests_data()['persons']
    return integers(min_value=max(persons)+1)


def _get_origin_dfs_revisions_walker():
    # Build a depth-first revisions walker starting from the HEAD
    # branch of the latest snapshot of a random test origin.
    storage = get_tests_data()['storage']
    origin = random.choice(get_tests_data()['origins'][:-1])
    snapshot = storage.snapshot_get_latest(origin['id'])
    head = snapshot['branches'][b'HEAD']['target']
    return get_revisions_walker('dfs', storage, head)


def ancestor_revisions():
    """
    Hypothesis strategy returning a pair of revisions ingested into the
    test archive with an ancestor relation.
    """
    # get a dfs revisions walker for one of the origins
    # loaded into the test archive
    revisions_walker = _get_origin_dfs_revisions_walker()
    master_revisions = []
    children = defaultdict(list)
    init_rev_found = False
    # get revisions only authored in the master branch
    for rev in revisions_walker:
        for rev_p in rev['parents']:
            children[rev_p].append(rev['id'])
        if not init_rev_found:
            master_revisions.append(rev)
        if not rev['parents']:
            init_rev_found = True

    # head revision
    root_rev = master_revisions[0]
    # pick a random revision, different from head, only authored
    # in the master branch
    ancestor_rev_idx = random.choice(list(
        range(1, len(master_revisions)-1)))
    ancestor_rev = master_revisions[ancestor_rev_idx]
    ancestor_child_revs = children[ancestor_rev['id']]

    return just({
        'sha1_git_root': hash_to_hex(root_rev['id']),
        'sha1_git': hash_to_hex(ancestor_rev['id']),
        'children': [hash_to_hex(r) for r in ancestor_child_revs]
    })


def non_ancestor_revisions():
    """
    Hypothesis strategy returning a pair of revisions ingested into the
    test archive with no ancestor relation.
    """
    # get a dfs revisions walker for one of the origins
    # loaded into the test archive
    revisions_walker = _get_origin_dfs_revisions_walker()
    merge_revs = []
    children = defaultdict(list)
    # get all merge revisions
    for rev in revisions_walker:
        if len(rev['parents']) > 1:
            merge_revs.append(rev)
        for rev_p in rev['parents']:
            children[rev_p].append(rev['id'])
    # find a merge revision whose parents have a unique child revision
    random.shuffle(merge_revs)
    selected_revs = None
    for merge_rev in merge_revs:
        if all(len(children[rev_p]) == 1
               for rev_p in merge_rev['parents']):
            selected_revs = merge_rev['parents']
            # stop at the first suitable merge revision; the previous
            # code kept scanning and silently overwrote the selection
            break

    return just({
        'sha1_git_root': hash_to_hex(selected_revs[0]),
        'sha1_git': hash_to_hex(selected_revs[1])
    })


# The following strategies returns data specific to some tests
# that can not be generated and thus are hardcoded.


def contents_with_ctags():
    """
    Hypothesis strategy returning contents ingested into the test
    archive. Those contents are ctags compatible, that is, running
    ctags on them lays results.
    """
    return just({
        'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351',
                  '15554cf7608dde6bfefac7e3d525596343a85b6f',
                  '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd',
                  '30acd0b47fc25e159e27a980102ddb1c4bea0b95',
                  '4f81f05aaea3efb981f9d90144f746d6b682285b',
                  '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682',
                  '59d08bafa6a749110dfb65ba43a61963d5a5bf9f',
                  '7568285b2d7f31ae483ae71617bd3db873deaa2c',
                  '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4',
                  '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03',
                  '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c',
                  '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd',
                  'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b',
                  'e89e55a12def4cd54d5bff58378a3b5119878eb7',
                  'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e',
                  'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'],
        'symbol_name': 'ABS'
    })


def revision_with_submodules():
    """
    Hypothesis strategy returning a revision that is known to
    point to a directory with revision entries (aka git submodule)
    """
    return just({
        'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234',
        'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2',
        'rev_dir_rev_path': 'libtess2'
    })


# ---------------------------------------------------------------------------
# swh/web/tests/testcase.py
# ---------------------------------------------------------------------------

# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import shutil

from subprocess import run, PIPE

from django.core.cache import cache
from hypothesis.extra.django import TestCase

from swh.model.hashutil import hash_to_bytes
from swh.web import config
from swh.web.common import converters, service
from swh.web.tests.data import get_tests_data

ctags_json_missing = \
    shutil.which('ctags') is None or \
    b'+json' not in run(['ctags', '--version'], stdout=PIPE).stdout

fossology_missing = shutil.which('nomossa') is None
class WebTestCase(TestCase):
    """Base TestCase class for swh-web.

    It is initialized with references to in-memory storages containing
    raw tests data.

    It also defines class methods to retrieve those tests data in
    a json serializable format in order to ease tests implementation.
    """

    @classmethod
    def setUpClass(cls):
        """Wire the in-memory test storages into the swh-web service layer."""
        super().setUpClass()
        tests_data = get_tests_data()
        cls.storage = tests_data['storage']
        cls.idx_storage = tests_data['idx_storage']
        cls.mimetype_indexer = tests_data['mimetype_indexer']
        cls.language_indexer = tests_data['language_indexer']
        cls.license_indexer = tests_data['license_indexer']
        cls.ctags_indexer = tests_data['ctags_indexer']

        # Update swh-web configuration to use the in-memory storage
        # instantiated in the tests.data module
        swh_config = config.get_config()
        swh_config.update({'storage': cls.storage})
        service.storage = cls.storage

        # Update swh-web configuration to use the in-memory indexer storage
        # instantiated in the tests.data modules
        swh_config.update({'indexer_storage': cls.idx_storage})
        service.idx_storage = cls.idx_storage

    @classmethod
    def content_add_mimetype(cls, cnt_id):
        """Run the mimetype indexer on the content with hex id *cnt_id*."""
        cls.mimetype_indexer.run([hash_to_bytes(cnt_id)],
                                 'update-dups')

    @classmethod
    def content_get_mimetype(cls, cnt_id):
        """Return the indexed mimetype of a content, converted for json."""
        mimetype = next(cls.idx_storage.content_mimetype_get(
                        [hash_to_bytes(cnt_id)]))
        return converters.from_filetype(mimetype)

    @classmethod
    def content_add_language(cls, cnt_id):
        """Run the language indexer on the content with hex id *cnt_id*."""
        cls.language_indexer.run([hash_to_bytes(cnt_id)],
                                 'update-dups')

    @classmethod
    def content_get_language(cls, cnt_id):
        """Return the indexed language of a content, converted for json."""
        lang = next(cls.idx_storage.content_language_get(
                    [hash_to_bytes(cnt_id)]))
        return converters.from_swh(lang, hashess={'id'})

    @classmethod
    def content_add_license(cls, cnt_id):
        """Run the license indexer on the content with hex id *cnt_id*."""
        cls.license_indexer.run([hash_to_bytes(cnt_id)],
                                'update-dups')

    @classmethod
    def content_get_license(cls, cnt_id):
        """Return the indexed license facts of a content, converted for json."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        lic = next(cls.idx_storage.content_fossology_license_get(
                   [cnt_id_bytes]))
        return converters.from_swh({'id': cnt_id_bytes,
                                    'facts': lic[cnt_id_bytes]},
                                   hashess={'id'})

    @classmethod
    def content_add_ctags(cls, cnt_id):
        """Run the ctags indexer on the content with hex id *cnt_id*."""
        cls.ctags_indexer.run([hash_to_bytes(cnt_id)],
                              'update-dups')

    @classmethod
    def content_get_ctags(cls, cnt_id):
        """Yield the indexed ctags of a content, converted for json."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        ctags = cls.idx_storage.content_ctags_get([cnt_id_bytes])
        for ctag in ctags:
            yield converters.from_swh(ctag, hashess={'id'})

    @classmethod
    def content_get_metadata(cls, cnt_id):
        """Return the metadata of a content, converted for json."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        metadata = next(cls.storage.content_get_metadata([cnt_id_bytes]))
        return converters.from_swh(metadata,
                                   hashess={'sha1', 'sha1_git', 'sha256',
                                            'blake2s256'})

    @classmethod
    def content_get(cls, cnt_id):
        """Return the raw content with hex id *cnt_id*, converted for json."""
        cnt_id_bytes = hash_to_bytes(cnt_id)
        cnt = next(cls.storage.content_get([cnt_id_bytes]))
        return converters.from_content(cnt)

    @classmethod
    def directory_ls(cls, dir_id):
        """List the entries of the directory with hex id *dir_id*."""
        cnt_id_bytes = hash_to_bytes(dir_id)
        dir_content = map(converters.from_directory_entry,
                          cls.storage.directory_ls(cnt_id_bytes))
        return list(dir_content)

    @classmethod
    def release_get(cls, rel_id):
        """Return the release with hex id *rel_id*, converted for json."""
        rel_id_bytes = hash_to_bytes(rel_id)
        rel_data = next(cls.storage.release_get([rel_id_bytes]))
        return converters.from_release(rel_data)

    @classmethod
    def revision_get(cls, rev_id):
        """Return the revision with hex id *rev_id*, converted for json."""
        rev_id_bytes = hash_to_bytes(rev_id)
        rev_data = next(cls.storage.revision_get([rev_id_bytes]))
        return converters.from_revision(rev_data)

    @classmethod
    def revision_log(cls, rev_id, limit=None):
        """Return up to *limit* revisions reachable from *rev_id*."""
        rev_id_bytes = hash_to_bytes(rev_id)
        return list(map(converters.from_revision,
                    cls.storage.revision_log([rev_id_bytes], limit=limit)))

    @classmethod
    def snapshot_get_latest(cls, origin_id):
        """Return the latest snapshot of an origin, converted for json."""
        snp = cls.storage.snapshot_get_latest(origin_id)
        return converters.from_snapshot(snp)

    @classmethod
    def origin_get(cls, origin_info):
        """Return the origin matching *origin_info*, converted for json."""
        origin = cls.storage.origin_get(origin_info)
        return converters.from_origin(origin)

    @classmethod
    def origin_visit_get(cls, origin_id):
        """List the visits of an origin, converted for json."""
        visits = cls.storage.origin_visit_get(origin_id)
        return list(map(converters.from_origin_visit, visits))

    @classmethod
    def origin_visit_get_by(cls, origin_id, visit_id):
        """Return a single origin visit, converted for json."""
        visit = cls.storage.origin_visit_get_by(origin_id, visit_id)
        return converters.from_origin_visit(visit)

    @classmethod
    def snapshot_get(cls, snapshot_id):
        """Return the snapshot with hex id *snapshot_id*, converted for json."""
        snp = cls.storage.snapshot_get(hash_to_bytes(snapshot_id))
        return converters.from_snapshot(snp)

    @classmethod
    def person_get(cls, person_id):
        """Return the person with id *person_id*, converted for json."""
        person = next(cls.storage.person_get([person_id]))
        return converters.from_person(person)

    def setUp(self):
        # Clear the Django cache between tests to avoid cross-test leakage
        cache.clear()