diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index cb7d0d1f..0c70e485 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,489 +1,571 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from distutils.util import strtobool from functools import partial from swh.web.common import service from swh.web.common.exc import BadInputExc from swh.web.common.origin_visits import get_origin_visits from swh.web.common.utils import reverse from swh.web.api.apidoc import api_doc, format_docstring from swh.web.api.apiurls import api_route from swh.web.api.views.utils import api_lookup +DOC_RETURN_ORIGIN = ''' + :>json string origin_visits_url: link to in order to get information + about the visits for that origin + :>json string url: the origin canonical url + :>json string type: the type of software origin (deprecated value; + types are now associated to visits instead of origins) + :>json number id: the origin unique identifier (deprecated value; + you should only refer to origins based on their URL) +''' + +DOC_RETURN_ORIGIN_ARRAY = \ + DOC_RETURN_ORIGIN.replace(':>json', ':>jsonarr') + +DOC_RETURN_ORIGIN_VISIT = ''' + :>json string date: ISO representation of the visit date (in UTC) + :>json str origin: the origin canonical url + :>json string origin_url: link to get information about the origin + :>jsonarr string snapshot: the snapshot identifier of the visit + :>jsonarr string snapshot_url: link to + :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get + information about the snapshot of the visit + :>json string status: status of the visit (either **full**, + **partial** or **ongoing**) + :>json number visit: the unique identifier of the visit +''' + +DOC_RETURN_ORIGIN_VISIT_ARRAY = \ + 
DOC_RETURN_ORIGIN_VISIT.replace(':>json', ':>jsonarr') + +DOC_RETURN_ORIGIN_VISIT_ARRAY += ''' + :>jsonarr number id: the unique identifier of the origin + :>jsonarr string origin_visit_url: link to + :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/` + in order to get information about the visit +''' + + def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = reverse( - 'api-1-origin-visits', url_args={'origin_id': origin['id']}) + 'api-1-origin-visits', url_args={'origin_url': origin['url']}) return o return origin def _enrich_origin_visit(origin_visit, *, - with_origin_url, with_origin_visit_url): + with_origin_link, with_origin_visit_link): ov = origin_visit.copy() - if with_origin_url: + if with_origin_link: ov['origin_url'] = reverse('api-1-origin', - url_args={'origin_id': ov['origin']}) - if with_origin_visit_url: + url_args={'origin_url': ov['origin']}) + if with_origin_visit_link: ov['origin_visit_url'] = reverse('api-1-origin-visit', - url_args={'origin_id': ov['origin'], + url_args={'origin_url': ov['origin'], 'visit_id': ov['visit']}) snapshot = ov['snapshot'] if snapshot: ov['snapshot_url'] = reverse('api-1-snapshot', url_args={'snapshot_id': snapshot}) else: ov['snapshot_url'] = None return ov @api_route(r'/origins/', 'api-1-origins') @api_doc('/origins/', noargs=True) -@format_docstring() +@format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origins(request): """ .. http:get:: /api/1/origins/ Get list of archived software origins. Origins are sorted by ids before returning them. 
:query int origin_from: The first origin id that will be included in returned results (default to 1) :query int origin_count: The maximum number of origins to return (default to 100, can not exceed 10000) - :>jsonarr number id: the origin unique identifier - :>jsonarr string origin_visits_url: link to in order to get information - about the visits for that origin - :>jsonarr string type: the type of software origin (possible values - are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` - or ``deposit``) - :>jsonarr string url: the origin canonical url + {return_origin_array} {common_headers} {resheader_link} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origins?origin_from=50000&origin_count=500` """ origin_from = int(request.query_params.get('origin_from', '1')) origin_count = int(request.query_params.get('origin_count', '100')) origin_count = min(origin_count, 10000) results = api_lookup( service.lookup_origins, origin_from, origin_count+1, enrich_fn=_enrich_origin) response = {'results': results, 'headers': {}} if len(results) > origin_count: origin_from = results.pop()['id'] response['headers']['link-next'] = reverse( 'api-1-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) return response -@api_route(r'/origin/(?P[0-9]+)/', 'api-1-origin') @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)/', 'api-1-origin') +@api_route(r'/origin/(?P.+)/get/', 'api-1-origin') +@api_route(r'/origin/(?P[0-9]+)/', 'api-1-origin') @api_doc('/origin/') -@format_docstring() +@format_docstring(return_origin=DOC_RETURN_ORIGIN) def api_origin(request, origin_id=None, origin_type=None, origin_url=None): """ + .. http:get:: /api/1/origin/(origin_url)/get/ + + Get information about a software origin. 
+ + :param string origin_url: the origin url + + {return_origin} + + {common_headers} + + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` + + :statuscode 200: no error + :statuscode 404: requested origin can not be found in the archive + + **Example:** + + .. parsed-literal:: + + :swh_web_api:`origin/git/url/https://github.com/python/cpython/` + .. http:get:: /api/1/origin/(origin_id)/ Get information about a software origin. :param int origin_id: a software origin identifier - :>json number id: the origin unique identifier - :>json string origin_visits_url: link to in order to get information - about the visits for that origin - :>json string type: the type of software origin (possible values are - ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or - ``deposit``) - :>json string url: the origin canonical url + {return_origin} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/1/` .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/ Get information about a software origin. + .. warning:: + + This endpoint is deprecated. You should use + :http:get:`/api/1/origin/(origin_url)/get/` instead. 
+ :param string origin_type: the origin type (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or ``deposit``) :param string origin_url: the origin url - :>json number id: the origin unique identifier - :>json string origin_visits_url: link to in order to get information - about the visits for that origin - :>json string type: the type of software origin - :>json string url: the origin canonical url + {return_origin} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/` """ ori_dict = { 'id': int(origin_id) if origin_id else None, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} - if 'id' in ori_dict: - error_msg = 'Origin with id %s not found.' % ori_dict['id'] - else: - error_msg = 'Origin with type %s and URL %s not found' % ( - ori_dict['type'], ori_dict['url']) + error_msg = 'Origin %s not found.' % \ + (ori_dict.get('id') or ori_dict['url']) return api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) @api_route(r'/origin/search/(?P.+)/', 'api-1-origin-search') @api_doc('/origin/search/') -@format_docstring() +@format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_search(request, url_pattern): """ .. http:get:: /api/1/origin/search/(url_pattern)/ Search for software origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. 
:param string url_pattern: a string pattern or a regular expression :query int offset: the number of found origins to skip before returning results :query int limit: the maximum number of found origins to return :query boolean regexp: if true, consider provided pattern as a regular expression and search origins whose urls match it :query boolean with_visit: if true, only return origins with at least one visit by Software heritage - :>jsonarr number id: the origin unique identifier - :>jsonarr string origin_visits_url: link to in order to get information - about the visits for that origin - :>jsonarr string type: the type of software origin - :>jsonarr string url: the origin canonical url + {return_origin_array} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/search/python/?limit=2` """ result = {} offset = int(request.query_params.get('offset', '0')) limit = int(request.query_params.get('limit', '70')) regexp = request.query_params.get('regexp', 'false') with_visit = request.query_params.get('with_visit', 'false') results = api_lookup(service.search_origin, url_pattern, offset, limit, bool(strtobool(regexp)), bool(strtobool(with_visit)), enrich_fn=_enrich_origin) nb_results = len(results) if nb_results == limit: query_params = {} query_params['offset'] = offset + limit query_params['limit'] = limit query_params['regexp'] = regexp result['headers'] = { 'link-next': reverse('api-1-origin-search', url_args={'url_pattern': url_pattern}, query_params=query_params) } result.update({ 'results': results }) return result @api_route(r'/origin/metadata-search/', 'api-1-origin-metadata-search') @api_doc('/origin/metadata-search/', noargs=True, need_params=True) -@format_docstring() +@format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY) def api_origin_metadata_search(request): """ .. 
http:get:: /api/1/origin/metadata-search/ Search for software origins whose metadata (expressed as a JSON-LD/CodeMeta dictionary) match the provided criteria. For now, only full-text search on this dictionary is supported. :query str fulltext: a string that will be matched against origin metadata; results are ranked and ordered starting with the best ones. :query int limit: the maximum number of found origins to return (bounded to 100) - :>jsonarr number origin_id: the origin unique identifier - :>jsonarr dict metadata: metadata of the origin (as a - JSON-LD/CodeMeta dictionary) - :>jsonarr string from_revision: the revision used to extract these - metadata (the current HEAD or one of the former HEADs) - :>jsonarr dict tool: the tool used to extract these metadata + {return_origin_array} {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error **Example:** .. parsed-literal:: :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe` """ fulltext = request.query_params.get('fulltext', None) limit = min(int(request.query_params.get('limit', '70')), 100) if not fulltext: content = '"fulltext" must be provided and non-empty.' raise BadInputExc(content) results = api_lookup(service.search_origin_metadata, fulltext, limit) return { 'results': results, } +@api_route(r'/origin/(?P.*)/visits/', 'api-1-origin-visits') @api_route(r'/origin/(?P[0-9]+)/visits/', 'api-1-origin-visits') @api_doc('/origin/visits/') -@format_docstring() -def api_origin_visits(request, origin_id): +@format_docstring( + return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY) +def api_origin_visits(request, origin_id=None, origin_url=None): """ + .. http:get:: /api/1/origin/(origin_url)/visits/ + + Get information about all visits of a software origin. + Visits are returned sorted in descending order according + to their date. 
+ + :param str origin_url: a software origin URL + :query int per_page: specify the number of visits to list, for + pagination purposes + :query int last_visit: visit to start listing from, for pagination + purposes + + {common_headers} + {resheader_link} + + {return_origin_visit_array} + + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` + + :statuscode 200: no error + :statuscode 404: requested origin can not be found in the archive + + **Example:** + + .. parsed-literal:: + + :swh_web_api:`origin/https://github.com/hylang/hy/visits/` + .. http:get:: /api/1/origin/(origin_id)/visits/ Get information about all visits of a software origin. Visits are returned sorted in descending order according to their date. :param int origin_id: a software origin identifier :query int per_page: specify the number of visits to list, for pagination purposes :query int last_visit: visit to start listing from, for pagination purposes {common_headers} {resheader_link} - :>jsonarr string date: ISO representation of the visit date (in UTC) - :>jsonarr number id: the unique identifier of the origin - :>jsonarr string origin_visit_url: link to - :http:get:`/api/1/origin/(origin_id)/visit/(visit_id)/` in order to - get information about the visit - :>jsonarr string snapshot: the snapshot identifier of the visit - :>jsonarr string snapshot_url: link to - :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get - information about the snapshot of the visit - :>jsonarr string status: status of the visit (either **full**, - **partial** or **ongoing**) - :>jsonarr number visit: the unique identifier of the visit + {return_origin_visit_array} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/1/visits/` """ result = {} - origin_id = int(origin_id) + if origin_url: + origin_query = {'url': origin_url} + notfound_msg = 'No origin {} found'.format(origin_url) + url_args_next = {'origin_url': origin_url} + else: + origin_query = {'id': int(origin_id)} + notfound_msg = 'No origin {} found'.format(origin_id) + url_args_next = {'origin_id': origin_id} per_page = int(request.query_params.get('per_page', '10')) last_visit = request.query_params.get('last_visit') if last_visit: last_visit = int(last_visit) def _lookup_origin_visits( - origin_id, last_visit=last_visit, per_page=per_page): - all_visits = get_origin_visits({'id': origin_id}) + origin_query, last_visit=last_visit, per_page=per_page): + all_visits = get_origin_visits(origin_query) all_visits.reverse() visits = [] if not last_visit: visits = all_visits[:per_page] else: for i, v in enumerate(all_visits): if v['visit'] == last_visit: visits = all_visits[i+1:i+1+per_page] break for v in visits: yield v - results = api_lookup(_lookup_origin_visits, origin_id, - notfound_msg='No origin {} found'.format(origin_id), + results = api_lookup(_lookup_origin_visits, origin_query, + notfound_msg=notfound_msg, enrich_fn=partial(_enrich_origin_visit, - with_origin_url=False, - with_origin_visit_url=True)) + with_origin_link=False, + with_origin_visit_link=True)) if results: nb_results = len(results) if nb_results == per_page: new_last_visit = results[-1]['visit'] query_params = {} query_params['last_visit'] = new_last_visit if request.query_params.get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('api-1-origin-visits', - url_args={'origin_id': origin_id}, + url_args=url_args_next, query_params=query_params) } result.update({ 'results': results }) return result +@api_route(r'/origin/(?P.*)/visit/(?P[0-9]+)/', + 'api-1-origin-visit') @api_route(r'/origin/(?P[0-9]+)/visit/(?P[0-9]+)/', 'api-1-origin-visit') @api_doc('/origin/visit/') 
-@format_docstring() -def api_origin_visit(request, origin_id, visit_id): +@format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT) +def api_origin_visit(request, visit_id, origin_url=None, origin_id=None): """ + .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/ + + Get information about a specific visit of a software origin. + + :param str origin_url: a software origin URL + :param int visit_id: a visit identifier + + {common_headers} + + {return_origin_visit} + + **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, + :http:method:`options` + + :statuscode 200: no error + :statuscode 404: requested origin or visit can not be found in the + archive + + **Example:** + + .. parsed-literal:: + + :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/` + .. http:get:: /api/1/origin/(origin_id)/visit/(visit_id)/ Get information about a specific visit of a software origin. :param int origin_id: a software origin identifier :param int visit_id: a visit identifier {common_headers} - :>json string date: ISO representation of the visit date (in UTC) - :>json number origin: the origin unique identifier - :>json string origin_url: link to get information about the origin - :>jsonarr string snapshot: the snapshot identifier of the visit - :>jsonarr string snapshot_url: link to - :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get - information about the snapshot of the visit - :>json string status: status of the visit (either **full**, - **partial** or **ongoing**) - :>json number visit: the unique identifier of the visit + {return_origin_visit} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin or visit can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/1500/visit/1/` """ + if not origin_url: + origin_url = service.lookup_origin({'id': int(origin_id)})['url'] return api_lookup( - service.lookup_origin_visit, int(origin_id), int(visit_id), + service.lookup_origin_visit, origin_url, int(visit_id), notfound_msg=('No visit {} for origin {} found' - .format(visit_id, origin_id)), + .format(visit_id, origin_url)), enrich_fn=partial(_enrich_origin_visit, - with_origin_url=True, - with_origin_visit_url=False)) + with_origin_link=True, + with_origin_visit_link=False)) @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)' '/intrinsic-metadata', 'api-origin-intrinsic-metadata') @api_doc('/origin/intrinsic-metadata/') @format_docstring() def api_origin_intrinsic_metadata(request, origin_type, origin_url): """ .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/intrinsic-metadata Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta dictionary). :param string origin_type: the origin type (possible values are ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or ``deposit``) :param string origin_url: the origin url :>json string ???: intrinsic metadata field of the origin {common_headers} **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`, :http:method:`options` :statuscode 200: no error :statuscode 404: requested origin can not be found in the archive **Example:** .. 
parsed-literal:: :swh_web_api:`origin/git/url/https://github.com/python/cpython/intrinsic-metadata` """ # noqa ori_dict = { 'type': origin_type, 'url': origin_url } - error_msg = 'Origin with type %s and URL %s not found' % ( - ori_dict['type'], ori_dict['url']) + error_msg = 'Origin with URL %s not found' % ori_dict['url'] return api_lookup( service.lookup_origin_intrinsic_metadata, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) diff --git a/swh/web/common/origin_visits.py b/swh/web/common/origin_visits.py index 906e8267..b3628ef4 100644 --- a/swh/web/common/origin_visits.py +++ b/swh/web/common/origin_visits.py @@ -1,177 +1,180 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import math from django.core.cache import cache from swh.web.common.exc import NotFoundExc from swh.web.common.utils import parse_timestamp def get_origin_visits(origin_info): """Function that returns the list of visits for a swh origin. That list is put in cache in order to speedup the navigation in the swh web browse ui. 
Args: origin_info (dict): dict describing the origin to fetch visits from Returns: list: A list of dict describing the origin visits with the following keys: * **date**: UTC visit date in ISO format, * **origin**: the origin id * **status**: the visit status, either **full**, **partial** or **ongoing** * **visit**: the visit id Raises: NotFoundExc: if the origin is not found """ from swh.web.common import service - cache_entry_id = 'origin_%s_visits' % origin_info['id'] + if 'url' in origin_info: + origin_url = origin_info['url'] + else: + origin_url = service.lookup_origin(origin_info)['url'] + + cache_entry_id = 'origin_visits_%s' % origin_url cache_entry = cache.get(cache_entry_id) if cache_entry: last_visit = cache_entry[-1]['visit'] - new_visits = list(service.lookup_origin_visits(origin_info['id'], + new_visits = list(service.lookup_origin_visits(origin_url, last_visit=last_visit)) if not new_visits: - last_snp = service.lookup_latest_origin_snapshot(origin_info['id']) + last_snp = service.lookup_latest_origin_snapshot(origin_url) if not last_snp or last_snp['id'] == cache_entry[-1]['snapshot']: return cache_entry origin_visits = [] per_page = service.MAX_LIMIT last_visit = None while 1: - visits = list(service.lookup_origin_visits(origin_info['id'], + visits = list(service.lookup_origin_visits(origin_url, last_visit=last_visit, per_page=per_page)) origin_visits += visits if len(visits) < per_page: break else: if not last_visit: last_visit = per_page else: last_visit += per_page def _visit_sort_key(visit): ts = parse_timestamp(visit['date']).timestamp() return ts + (float(visit['visit']) / 10e3) for v in origin_visits: if 'metadata' in v: del v['metadata'] origin_visits = [dict(t) for t in set([tuple(d.items()) for d in origin_visits])] origin_visits = sorted(origin_visits, key=lambda v: _visit_sort_key(v)) cache.set(cache_entry_id, origin_visits) return origin_visits def get_origin_visit(origin_info, visit_ts=None, visit_id=None, snapshot_id=None): 
"""Function that returns information about a visit for a given origin. The visit is retrieved from a provided timestamp. The closest visit from that timestamp is selected. Args: origin_info (dict): a dict filled with origin information (id, url, type) visit_ts (int or str): an ISO date string or Unix timestamp to parse Returns: A dict containing the visit info as described below:: {'origin': 2, 'date': '2017-10-08T11:54:25.582463+00:00', 'metadata': {}, 'visit': 25, 'status': 'full'} """ visits = get_origin_visits(origin_info) if not visits: - if 'type' in origin_info and 'url' in origin_info: + if 'url' in origin_info: message = ('No visit associated to origin with' - ' type %s and url %s!' % (origin_info['type'], - origin_info['url'])) + ' url %s!' % origin_info['url']) else: message = ('No visit associated to origin with' ' id %s!' % origin_info['id']) raise NotFoundExc(message) if snapshot_id: visit = [v for v in visits if v['snapshot'] == snapshot_id] if len(visit) == 0: if 'type' in origin_info and 'url' in origin_info: message = ('Visit for snapshot with id %s for origin with type' - ' %s and url %s not found!' % - (snapshot_id, origin_info['type'], - origin_info['url'])) + ' url %s not found!' % + (snapshot_id, origin_info['url'])) else: message = ('Visit for snapshot with id %s for origin with' ' id %s not found!' % (snapshot_id, origin_info['id'])) raise NotFoundExc(message) return visit[0] if visit_id: visit = [v for v in visits if v['visit'] == int(visit_id)] if len(visit) == 0: if 'type' in origin_info and 'url' in origin_info: - message = ('Visit with id %s for origin with type %s' + message = ('Visit with id %s for origin with' ' and url %s not found!' % - (visit_id, origin_info['type'], origin_info['url'])) + (visit_id, origin_info['url'])) else: message = ('Visit with id %s for origin with id %s' ' not found!' 
% (visit_id, origin_info['id'])) raise NotFoundExc(message) return visit[0] if not visit_ts: # returns the latest full visit when no timestamp is provided for v in reversed(visits): if v['status'] == 'full': return v return visits[-1] target_visit_ts = math.floor(parse_timestamp(visit_ts).timestamp()) # Find the visit with date closest to the target (in absolute value) (abs_time_delta, visit_idx) = min( ((math.floor(parse_timestamp(visit['date']).timestamp()), i) for (i, visit) in enumerate(visits)), key=lambda ts_and_i: abs(ts_and_i[0] - target_visit_ts)) if visit_idx is not None: visit = visits[visit_idx] # If multiple visits have the same date, select the one with # the largest id. while visit_idx < len(visits) - 1 and \ visit['date'] == visits[visit_idx+1]['date']: visit_idx = visit_idx + 1 visit = visits[visit_idx] return visit else: if 'type' in origin_info and 'url' in origin_info: - message = ('Visit with timestamp %s for origin with type %s ' + message = ('Visit with timestamp %s for origin with ' 'and url %s not found!' % - (visit_ts, origin_info['type'], origin_info['url'])) + (visit_ts, origin_info['url'])) else: message = ('Visit with timestamp %s for origin with id %s ' 'not found!' 
% (visit_ts, origin_info['id'])) raise NotFoundExc(message) diff --git a/swh/web/common/service.py b/swh/web/common/service.py index 132b20aa..01d24b7b 100644 --- a/swh/web/common/service.py +++ b/swh/web/common/service.py @@ -1,1081 +1,1079 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from collections import defaultdict from swh.model import hashutil from swh.storage.algos import revisions_walker from swh.web.common import converters from swh.web.common import query from swh.web.common.exc import NotFoundExc from swh.web.common.origin_visits import get_origin_visit from swh.web import config storage = config.storage() vault = config.vault() idx_storage = config.indexer_storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. 
Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Yields: ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = idx_storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': converters.from_content(found), 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = _first_element(storage.content_find({algo: hash})) return {'found': found is not None} def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = _first_element(storage.content_find({algo: hash})) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(idx_storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. 
Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(idx_storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(idx_storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lic = _first_element(idx_storage.content_fossology_license_get([sha1])) if not lic: return None return converters.from_swh({'id': sha1, 'facts': lic[sha1]}, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: - origin: origin's dict with keys either 'id' or - ('type' AND 'url') + origin: origin's dict with keys either 'id' or 'url' Returns: origin information as dict. """ origin_info = storage.origin_get(origin) if not origin_info: - if 'id' in origin and origin['id']: - msg = 'Origin with id %s not found!' % origin['id'] - else: - msg = 'Origin with type %s and url %s not found!' % \ - (origin['type'], origin['url']) + msg = 'Origin %s not found!' % \ + (origin.get('id') or origin['url']) raise NotFoundExc(msg) return converters.from_origin(origin_info) def lookup_origins(origin_from=1, origin_count=100): """Get list of archived software origins in a paginated way. 
Origins are sorted by id before returning them Args: origin_from (int): The minimum id of the origins to return origin_count (int): The maximum number of origins to return Yields: origins information as dicts """ origins = storage.origin_get_range(origin_from, origin_count) return map(converters.from_origin, origins) def search_origin(url_pattern, offset=0, limit=50, regexp=False, with_visit=False): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. Args: url_pattern: the string pattern to search for in origin urls offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin information as dict. """ origins = storage.origin_search(url_pattern, offset, limit, regexp, with_visit) return map(converters.from_origin, origins) def search_origin_metadata(fulltext, limit=50): """Search for origins whose metadata match a provided string pattern. Args: fulltext: the string pattern to search for in origin metadata offset: number of found origins to skip before returning results limit: the maximum number of found origins to return Returns: list of origin metadata as dict. """ matches = idx_storage.origin_intrinsic_metadata_search_fulltext( conjunction=[fulltext], limit=limit) results = [] for match in matches: match['from_revision'] = hashutil.hash_to_hex(match['from_revision']) result = converters.from_origin( storage.origin_get({'id': match.pop('id')})) result['metadata'] = match results.append(result) return results def lookup_origin_intrinsic_metadata(origin_dict): """Return intrinsic metadata for origin whose origin_id matches given origin_id. Args: origin_dict: origin's dict with keys ('type' AND 'url') Returns: origin metadata. """ origin_info = storage.origin_get(origin_dict) if not origin_info: msg = 'Origin with type %s and url %s not found!' 
def lookup_directory(sha1_git):
    """List the entries of the directory identified by sha1_git.

    Args:
        sha1_git: directory identifier, as an hexadecimal string

    Returns:
        An empty list for the empty directory, otherwise an iterator over
        the directory entries converted with
        ``converters.from_directory_entry``.

    Raises:
        NotFoundExc: if the storage reports the directory as missing.

    """
    # Identifier of the empty directory: answered without any storage query.
    if sha1_git == '4b825dc642cb6eb9a060e54bf8d69288fbee4904':
        return []

    directory_id = _to_sha1_bin(sha1_git)
    _check_directory_exists(sha1_git, directory_id)
    return map(converters.from_directory_entry,
               storage.directory_ls(directory_id))
def lookup_release_multiple(sha1_git_list):
    """Return information about the releases identified with their
    sha1_git identifiers.

    Args:
        sha1_git_list: iterable of release sha1_git identifiers, as
            hexadecimal strings

    Returns:
        Generator of releases converted with ``converters.from_release``
        (empty when the storage returns nothing).

    Note:
        Identifiers are converted lazily, so an error raised by
        ``_to_sha1_bin`` for a malformed identifier only surfaces once the
        storage consumes them.

    """
    release_ids = (_to_sha1_bin(hex_id) for hex_id in sha1_git_list)
    found = storage.release_get(release_ids) or []
    return (converters.from_release(release) for release in found)
def lookup_revision_multiple(sha1_git_list):
    """Return information about the revisions identified with their
    sha1_git identifiers.

    Args:
        sha1_git_list: iterable of revision sha1_git identifiers, as
            hexadecimal strings

    Returns:
        Generator of revisions converted with ``converters.from_revision``
        (empty when the storage returns nothing).

    Note:
        Identifiers are converted lazily, so an error raised by
        ``_to_sha1_bin`` for a malformed identifier only surfaces once the
        storage consumes them.

    """
    revision_ids = (_to_sha1_bin(hex_id) for hex_id in sha1_git_list)
    found = storage.revision_get(revision_ids) or []
    return (converters.from_revision(revision) for revision in found)
def _lookup_revision_id_by(origin_id, branch_name, timestamp):
    """Resolve the revision targeted by a snapshot branch of an origin visit.

    The visit is the one ``get_origin_visit`` returns for the origin and
    timestamp. The branch is looked up by exact name in that visit's
    snapshot; a branch of type 'alias' is followed once.

    Args:
        origin_id (int): identifier of the origin
        branch_name (str): name of the snapshot branch to resolve
        timestamp (str/int): origin visit time frame

    Returns:
        The target of the resolved branch (a revision identifier).

    Raises:
        NotFoundExc: if the visit has no snapshot, the branch does not
            exist, or it does not (directly or through one alias) target
            a revision.

    """
    def _get_snapshot_branch(snapshot_id, name):
        # Fetch a small window of branches starting at ``name`` and keep
        # the entry only when its name matches exactly.
        snapshot = lookup_snapshot(snapshot_id, branches_from=name,
                                   branches_count=10)
        return snapshot['branches'].get(name)

    visit = get_origin_visit({'id': origin_id}, visit_ts=timestamp)
    snapshot_id = visit['snapshot']

    # A visit may carry no snapshot: there is then nothing to look up.
    branch = (_get_snapshot_branch(snapshot_id, branch_name)
              if snapshot_id else None)
    if branch and branch['target_type'] == 'alias':
        # Only one level of aliasing is followed.
        branch = _get_snapshot_branch(snapshot_id, branch['target'])

    rev_id = None
    if branch and branch['target_type'] == 'revision':
        rev_id = branch['target']

    if not rev_id:
        raise NotFoundExc('Revision for origin %s and branch %s not found.'
                          % (origin_id, branch_name))
    return rev_id
""" lookup_revision(rev_sha1_git) sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Lookup revision by origin id, snapshot branch name and visit timestamp. Args: origin_id (int): origin of the revision branch_name (str): snapshot branch timestamp (str/int): origin visit time frame limit (int): the maximum number of revisions returned Returns: list: Revision log as list of revision dicts Raises: NotFoundExc: if no revision corresponds to the criterion """ rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp) return lookup_revision_log(rev_id, limit) def lookup_revision_with_context_by(origin_id, branch_name, timestamp, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. 
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision. The type is either a sha1 (as an
            hex string) or a non converted revision dict.
        sha1_git: one of sha1_git_root's ancestors
        limit: maximum number of revisions walked back from the root

    Returns:
        Information on sha1_git, including under 'children' the revisions
        of the walked log that have it as parent.

    Raises:
        NotFoundExc: if either revision is not found or if sha1_git is not
            part of the (limited) log of sha1_git_root.

    """
    sha1_git_bin = _to_sha1_bin(sha1_git)
    revision = _first_element(storage.revision_get([sha1_git_bin]))
    if not revision:
        raise NotFoundExc('Revision %s not found' % sha1_git)

    if isinstance(sha1_git_root, str):
        sha1_git_root_bin = _to_sha1_bin(sha1_git_root)
        revision_root = _first_element(
            storage.revision_get([sha1_git_root_bin]))
        if not revision_root:
            raise NotFoundExc('Revision root %s not found' % sha1_git_root)
        root_label = sha1_git_root
    else:
        sha1_git_root_bin = sha1_git_root['id']
        # Keep error messages readable: show the root's hexadecimal id
        # instead of dumping the whole raw revision dict.
        root_label = hashutil.hash_to_hex(sha1_git_root_bin)

    seen = set()
    children = defaultdict(list)
    for rev in storage.revision_log([sha1_git_root_bin], limit):
        rev_id = rev['id']
        seen.add(rev_id)
        for parent_id in rev['parents']:
            children[parent_id].append(rev_id)

    if revision['id'] not in seen:
        raise NotFoundExc('Revision %s is not an ancestor of %s' %
                          (sha1_git, root_label))

    revision['children'] = children[revision['id']]
    return converters.from_revision(revision)
lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ sha1_git_bin = _to_sha1_bin(sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': list(map(converters.from_directory_entry, directory_entries))} elif entity['type'] == 'file': # content content = _first_element( storage.content_find({'sha1_git': entity['target']})) if not content: raise NotFoundExc('Content not found for revision %s' % sha1_git) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] return {'type': 'file', 'path': '.' 
def lookup_content(q):
    """Lookup the content designated by q.

    Args:
        q: query string, split by ``query.parse_hash`` into a hash
            algorithm name and a hash value

    Returns:
        The content found, converted with ``converters.from_content``.

    Raises:
        NotFoundExc: if the requested content is not found

    """
    algo, checksum = query.parse_hash(q)
    content = _first_element(storage.content_find({algo: checksum}))
    if content:
        return converters.from_content(content)
    raise NotFoundExc('Content with %s checksum equals to %s not found!' %
                      (algo, hashutil.hash_to_hex(checksum)))
def lookup_origin_visits(origin_id, last_visit=None, per_page=10):
    """Yield the converted visits of an origin.

    Args:
        origin_id: origin to list visits for; forwarded as the
            ``origin_url`` argument of ``_lookup_origin_visits``
        last_visit: last visit to lookup from
        per_page: maximum number of visits requested

    Yields:
        Dictionaries of origin_visit for that origin, converted with
        ``converters.from_origin_visit``

    """
    yield from (
        converters.from_origin_visit(raw_visit)
        for raw_visit in _lookup_origin_visits(
            origin_id, last_visit=last_visit, limit=per_page))
def lookup_snapshot_size(snapshot_id):
    """Count the number of branches in the snapshot with the given id

    Args:
        snapshot_id (str): sha1 identifier of the snapshot

    Returns:
        dict: A dict whose keys are the target types of branches and
        values their corresponding amount; the 'revision' and 'release'
        keys are always present (0 when the storage did not report them).

    """
    counts = storage.snapshot_count_branches(_to_sha1_bin(snapshot_id))
    for target_type in ('revision', 'release'):
        counts.setdefault(target_type, 0)
    return counts
def lookup_revision_through(revision, limit=100):
    """Retrieve a revision from the criterion stored in revision dictionary.

    The lookup function is picked from the keys present in ``revision``;
    combinations are tried from the most to the least specific.

    Args:
        revision: Dictionary of criterion to lookup the revision with.
            Here are the supported combination of possible values:
            - origin_id, branch_name, ts, sha1_git
            - origin_id, branch_name, ts
            - sha1_git_root, sha1_git
            - sha1_git
        limit: forwarded to the lookups that walk a revision log

    Returns:
        Whatever the selected lookup function returns.

    Raises:
        NotImplementedError: if no supported combination of keys is present.

    """
    def has(*keys):
        return all(key in revision for key in keys)

    if has('origin_id', 'branch_name', 'ts', 'sha1_git'):
        return lookup_revision_with_context_by(
            revision['origin_id'], revision['branch_name'],
            revision['ts'], revision['sha1_git'], limit)

    if has('origin_id', 'branch_name', 'ts'):
        return lookup_revision_by(
            revision['origin_id'], revision['branch_name'], revision['ts'])

    if has('sha1_git_root', 'sha1_git'):
        return lookup_revision_with_context(
            revision['sha1_git_root'], revision['sha1_git'], limit)

    if has('sha1_git'):
        return lookup_revision(revision['sha1_git'])

    # this should not happen
    raise NotImplementedError('Should not happen!')
def diff_revision(rev_id):
    """Get the list of file changes (insertion / deletion / modification /
    renaming) for a particular revision.

    Args:
        rev_id: revision identifier, as an hexadecimal string

    Returns:
        The changes object returned by the storage (queried with renaming
        tracking enabled), whose items were updated in place: 'from' and
        'to' are converted with ``converters.from_directory_entry`` and
        non-empty 'from_path' / 'to_path' are decoded as UTF-8.

    """
    changes = storage.diff_revision(_to_sha1_bin(rev_id),
                                    track_renaming=True)
    for change in changes:
        for side in ('from', 'to'):
            change[side] = converters.from_directory_entry(change[side])
        for path_key in ('from_path', 'to_path'):
            if change[path_key]:
                change[path_key] = change[path_key].decode('utf-8')
    return changes
""" def __init__(self, rev_walker_type, rev_start, *args, **kwargs): rev_start_bin = hashutil.hash_to_bytes(rev_start) self.revisions_walker = \ revisions_walker.get_revisions_walker(rev_walker_type, storage, rev_start_bin, *args, **kwargs) def export_state(self): return self.revisions_walker.export_state() def __next__(self): return converters.from_revision(next(self.revisions_walker)) def __iter__(self): return self def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs): """ Utility function to instantiate a revisions walker of a given type, see :mod:`swh.storage.algos.revisions_walker`. Args: rev_walker_type (str): the type of revisions walker to return, possible values are: ``committer_date``, ``dfs``, ``dfs_post``, ``bfs`` and ``path`` rev_start (str): hexadecimal representation of a revision identifier args (list): position arguments to pass to the revisions walker constructor kwargs (dict): keyword arguments to pass to the revisions walker constructor """ # first check if the provided revision is valid lookup_revision(rev_start) return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index 8e2a61f2..bab2fd11 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,408 +1,525 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, new_origins, visit_dates, 
@patch('swh.web.api.views.origin.get_origin_visits')
def test_api_lookup_origin_visits_raise_error(
    self, mock_get_origin_visits,
):
    """A ValueError raised while listing visits is reported as a 400."""
    message = 'voluntary error to check the bad request middleware.'
    mock_get_origin_visits.side_effect = ValueError(message)

    response = self.client.get(reverse(
        'api-1-origin-visits', url_args={'origin_url': 'http://foo'}))

    expected_payload = {'exception': 'ValueError', 'reason': message}
    self.assertEqual(response.status_code, 400, response.data)
    self.assertEqual(response['Content-Type'], 'application/json')
    self.assertEqual(response.data, expected_payload)
@given(new_origin(), visit_dates(3), new_snapshots(3))
def test_api_lookup_origin_visits_by_id(self, new_origin, visit_dates,
                                        new_snapshots):
    """Paginated visits listing, addressing the origin by its id.

    Counterpart of ``test_api_lookup_origin_visits``: the request URL is
    built from the origin id, while the links and the 'origin' field of
    the response are still expected to use the origin url.
    """
    origin_id = self.storage.origin_add_one(new_origin)
    new_origin['id'] = origin_id
    for i, visit_date in enumerate(visit_dates):
        origin_visit = self.storage.origin_visit_add(origin_id, visit_date)
        self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                  new_snapshots[i])

    all_visits = list(reversed(get_origin_visits(new_origin)))

    for last_visit, expected_visits in (
            (None, all_visits[:2]),
            (all_visits[1]['visit'], all_visits[2:4])):

        # NOTE(review): assumes 'api-1-origin-visits' still accepts an
        # 'origin_id' argument, like the other *_by_id tests do for their
        # routes -- confirm against the api routes.
        url = reverse('api-1-origin-visits',
                      url_args={'origin_id': origin_id},
                      query_params={'per_page': 2,
                                    'last_visit': last_visit})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')

        for expected_visit in expected_visits:
            origin_visit_url = reverse(
                'api-1-origin-visit',
                url_args={'origin_url': new_origin['url'],
                          'visit_id': expected_visit['visit']})
            snapshot_url = reverse(
                'api-1-snapshot',
                url_args={'snapshot_id': expected_visit['snapshot']})
            expected_visit['origin'] = new_origin['url']
            expected_visit['origin_visit_url'] = origin_visit_url
            expected_visit['snapshot_url'] = snapshot_url

        self.assertEqual(rv.data, expected_visits)
visit_id) + + origin_url = reverse('api-1-origin', + url_args={'origin_url': new_origin['url']}) + snapshot_url = reverse( + 'api-1-snapshot', + url_args={'snapshot_id': expected_visit['snapshot']}) + + expected_visit['origin'] = new_origin['url'] + expected_visit['origin_url'] = origin_url + expected_visit['snapshot_url'] = snapshot_url + + self.assertEqual(rv.data, expected_visit) + + @given(new_origin(), visit_dates(3), new_snapshots(3)) + def test_api_lookup_origin_visit_by_id(self, new_origin, visit_dates, + new_snapshots): + origin_id = self.storage.origin_add_one(new_origin) new_origin['id'] = origin_id for i, visit_date in enumerate(visit_dates): origin_visit = self.storage.origin_visit_add(origin_id, visit_date) visit_id = origin_visit['visit'] self.storage.snapshot_add(origin_id, origin_visit['visit'], new_snapshots[i]) url = reverse('api-1-origin-visit', url_args={'origin_id': origin_id, 'visit_id': visit_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') expected_visit = self.origin_visit_get_by(origin_id, visit_id) origin_url = reverse('api-1-origin', - url_args={'origin_id': origin_id}) + url_args={'origin_url': new_origin['url']}) snapshot_url = reverse( 'api-1-snapshot', url_args={'snapshot_id': expected_visit['snapshot']}) + expected_visit['origin'] = new_origin['url'] expected_visit['origin_url'] = origin_url expected_visit['snapshot_url'] = snapshot_url self.assertEqual(rv.data, expected_visit) @given(origin()) def test_api_lookup_origin_visit_not_found(self, origin): all_visits = list(reversed(get_origin_visits(origin))) max_visit_id = max([v['visit'] for v in all_visits]) + url = reverse('api-1-origin-visit', + url_args={'origin_url': origin['url'], + 'visit_id': max_visit_id + 1}) + + rv = self.client.get(url) + + self.assertEqual(rv.status_code, 404, rv.data) + self.assertEqual(rv['Content-Type'], 'application/json') + self.assertEqual(rv.data, { + 
@given(origin())
def test_api_origin_by_url(self, origin):
    """An origin requested by url is returned with its visits link."""
    response = self.client.get(
        reverse('api-1-origin', url_args={'origin_url': origin['url']}))

    expected = self.origin_get(origin)
    expected['origin_visits_url'] = reverse(
        'api-1-origin-visits', url_args={'origin_url': origin['url']})

    self.assertEqual(response.status_code, 200, response.data)
    self.assertEqual(response['Content-Type'], 'application/json')
    self.assertEqual(response.data, expected)
url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) expected_origin = self.origin_get(origin) origin_visits_url = reverse('api-1-origin-visits', - url_args={'origin_id': origin['id']}) + url_args={'origin_url': origin['url']}) expected_origin['origin_visits_url'] = origin_visits_url self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_origin) @given(new_origin()) def test_api_origin_not_found(self, new_origin): url = reverse('api-1-origin', url_args={'origin_type': new_origin['type'], 'origin_url': new_origin['url']}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Origin with type %s and url %s not found!' % - (new_origin['type'], new_origin['url']) + 'reason': 'Origin %s not found!' % new_origin['url'] }) @given(origin()) def test_api_origin_metadata_search(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = [{ 'id': origin['id'], 'type': origin['type'], 'url': origin['url'], 'metadata': { 'metadata': {'author': 'Jane Doe'}, 'from_revision': ( '7026b7c1a2af56521e951c01ed20f255fa054238'), 'tool': { 'configuration': { 
'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1', } } }] self.assertEqual(rv.data, expected_data) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) @given(origin()) def test_api_origin_metadata_search_limit(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .side_effect = lambda conjunction, limit: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=70) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 10}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=10) url = reverse('api-1-origin-metadata-search', query_params={'fulltext': 'Jane Doe', 'limit': 987}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(len(rv.data), 1) mock_idx_storage.origin_intrinsic_metadata_search_fulltext \ .assert_called_with(conjunction=['Jane Doe'], limit=100) @given(origin()) def 
test_api_origin_intrinsic_metadata(self, origin): with patch('swh.web.common.service.idx_storage') as mock_idx_storage: mock_idx_storage.origin_intrinsic_metadata_get \ .side_effect = lambda origin_ids: [{ 'from_revision': ( b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed ' b'\xf2U\xfa\x05B8'), 'metadata': {'author': 'Jane Doe'}, 'id': origin['id'], 'tool': { 'configuration': { 'context': ['NpmMapping', 'CodemetaMapping'], 'type': 'local' }, 'id': 3, 'name': 'swh-metadata-detector', 'version': '0.0.1' } }] url = reverse('api-origin-intrinsic-metadata', url_args={'origin_type': origin['type'], 'origin_url': origin['url']}) rv = self.client.get(url) mock_idx_storage.origin_intrinsic_metadata_get \ .assert_called_once_with([origin['id']]) self.assertEqual(rv.status_code, 200, rv.content) self.assertEqual(rv['Content-Type'], 'application/json') expected_data = {'author': 'Jane Doe'} self.assertEqual(rv.data, expected_data) @patch('swh.web.common.service.idx_storage') def test_api_origin_metadata_search_invalid(self, mock_idx_storage): url = reverse('api-1-origin-metadata-search') rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.content) mock_idx_storage.assert_not_called() @given(new_origins(10)) def test_api_lookup_origins(self, new_origins): nb_origins = len(new_origins) expected_origins = self.storage.origin_add(new_origins) origin_from_idx = random.randint(1, nb_origins-1) - 1 origin_from = expected_origins[origin_from_idx]['id'] max_origin_id = expected_origins[-1]['id'] origin_count = random.randint(1, max_origin_id - origin_from) url = reverse('api-1-origins', query_params={'origin_from': origin_from, 'origin_count': origin_count}) rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) start = origin_from_idx end = origin_from_idx + origin_count expected_origins = expected_origins[start:end] for expected_origin in expected_origins: expected_origin['origin_visits_url'] = reverse( 'api-1-origin-visits', - url_args={'origin_id': 
expected_origin['id']}) + url_args={'origin_url': expected_origin['url']}) self.assertEqual(rv.data, expected_origins) next_origin_id = expected_origins[-1]['id']+1 if self.storage.origin_get({'id': next_origin_id}): self.assertIn('Link', rv) next_url = reverse('api-1-origins', query_params={'origin_from': next_origin_id, 'origin_count': origin_count}) self.assertIn(next_url, rv['Link']) diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index fb45b282..8703732c 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,539 +1,539 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from rest_framework.test import APITestCase from unittest.mock import patch from swh.model.hashutil import hash_to_hex from swh.web.common.exc import NotFoundExc from swh.web.common.utils import reverse, parse_timestamp from swh.web.tests.data import random_sha1 from swh.web.tests.strategies import ( revision, new_revision, origin, origin_with_multiple_visits ) from swh.web.tests.testcase import WebTestCase class RevisionApiTestCase(WebTestCase, APITestCase): @given(revision()) def test_api_revision(self, revision): url = reverse('api-1-revision', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_revision = self.revision_get(revision) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) def test_api_revision_not_found(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, 
rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) @given(revision()) def test_api_revision_raw_ok(self, revision): url = reverse('api-1-revision-raw-message', url_args={'sha1_git': revision}) rv = self.client.get(url) expected_message = self.revision_get(revision)['message'] self.assertEqual(rv.status_code, 200) self.assertEqual(rv['Content-Type'], 'application/octet-stream') self.assertEqual(rv.content, expected_message.encode()) @given(new_revision()) def test_api_revision_raw_ok_no_msg(self, new_revision): del new_revision['message'] self.storage.revision_add([new_revision]) new_revision_id = hash_to_hex(new_revision['id']) url = reverse('api-1-revision-raw-message', url_args={'sha1_git': new_revision_id}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No message for revision with sha1_git %s.' % new_revision_id}) def test_api_revision_raw_ko_no_rev(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision-raw-message', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) def test_api_revision_with_origin_not_found(self): unknown_origin_id_ = random.randint(1000, 1000000) url = reverse('api-1-revision-origin', url_args={'origin_id': unknown_origin_id_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Origin with id %s not found!' % + 'reason': 'Origin %s not found!' 
% unknown_origin_id_}) @given(origin()) def test_api_revision_with_origin(self, origin): url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) expected_revision = self.revision_get( snapshot['branches']['HEAD']['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin()) def test_api_revision_with_origin_and_branch_name(self, origin): snapshot = self.snapshot_get_latest(origin['id']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin_with_multiple_visits()) def test_api_revision_with_origin_and_branch_name_and_ts(self, origin): visit = random.choice(self.origin_visit_get(origin['id'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': visit['date']}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) @given(origin_with_multiple_visits()) def 
test_api_revision_with_origin_and_branch_name_and_ts_escapes(self, origin): visit = random.choice(self.origin_visit_get(origin['id'])) snapshot = self.snapshot_get(visit['snapshot']) branch_name = random.choice( list(b for b in snapshot['branches'].keys() if snapshot['branches'][b]['target_type'] == 'revision')) date = parse_timestamp(visit['date']) formatted_date = date.strftime('Today is %B %d, %Y at %X') url = reverse('api-1-revision-origin', url_args={'origin_id': origin['id'], 'branch_name': branch_name, 'ts': formatted_date}) rv = self.client.get(url) expected_revision = self.revision_get( snapshot['branches'][branch_name]['target']) self._enrich_revision(expected_revision) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) def test_api_directory_through_revision_origin_ko(self): unknown_origin_id_ = random.randint(1000, 1000000) url = reverse('api-1-revision-origin-directory', url_args={'origin_id': unknown_origin_id_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', - 'reason': 'Origin with id %s not found!' % + 'reason': 'Origin %s not found!' 
% unknown_origin_id_ }) @given(origin()) def test_api_directory_through_revision_origin(self, origin): url = reverse('api-1-revision-origin-directory', url_args={'origin_id': origin['id']}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) revision_id = snapshot['branches']['HEAD']['target'] revision = self.revision_get(revision_id) directory = self.directory_ls(revision['directory']) for entry in directory: if entry['type'] == 'dir': entry['target_url'] = reverse( 'api-1-directory', url_args={'sha1_git': entry['target']} ) entry['dir_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'file': entry['target_url'] = reverse( 'api-1-content', url_args={'q': 'sha1_git:%s' % entry['target']} ) entry['file_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) elif entry['type'] == 'rev': entry['target_url'] = reverse( 'api-1-revision', url_args={'sha1_git': entry['target']} ) entry['rev_url'] = reverse( 'api-1-revision-origin-directory', url_args={'origin_id': origin['id'], 'path': entry['name']}) expected_result = { 'content': directory, 'path': '.', 'revision': revision_id, 'type': 'dir' } self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_result) @given(revision()) def test_api_revision_log(self, revision): per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': revision}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = self.revision_log(revision, limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: 
self.assertIn('Link', rv) next_log_url = reverse( 'api-1-revision-log', url_args={'sha1_git': expected_log[-1]['id']}, query_params={'per_page': per_page}) self.assertIn(next_log_url, rv['Link']) def test_api_revision_log_not_found(self): unknown_revision_ = random_sha1() url = reverse('api-1-revision-log', url_args={'sha1_git': unknown_revision_}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git %s not found.' % unknown_revision_}) self.assertFalse(rv.has_header('Link')) @given(revision()) def test_api_revision_log_context(self, revision): revisions = self.revision_log(revision, limit=4) prev_rev = revisions[0]['id'] rev = revisions[-1]['id'] per_page = 10 url = reverse('api-1-revision-log', url_args={'sha1_git': rev, 'prev_sha1s': prev_rev}, query_params={'per_page': per_page}) rv = self.client.get(url) expected_log = self.revision_log(rev, limit=per_page) prev_revision = self.revision_get(prev_rev) expected_log.insert(0, prev_revision) expected_log = list(map(self._enrich_revision, expected_log)) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log) @given(origin()) def test_api_revision_log_by(self, origin): per_page = 10 url = reverse('api-1-revision-origin-log', url_args={'origin_id': origin['id']}, query_params={'per_page': per_page}) rv = self.client.get(url) snapshot = self.snapshot_get_latest(origin['id']) expected_log = self.revision_log( snapshot['branches']['HEAD']['target'], limit=per_page+1) expected_log = list(map(self._enrich_revision, expected_log)) has_next = len(expected_log) > per_page self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_log[:-1] if has_next else expected_log) if has_next: 
self.assertIn('Link', rv) next_log_url = reverse( 'api-1-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': 'HEAD'}, query_params={'per_page': per_page, 'sha1_git': expected_log[-1]['id']}) self.assertIn(next_log_url, rv['Link']) @given(origin()) def test_api_revision_log_by_ko(self, origin): invalid_branch_name = 'foobar' url = reverse('api-1-revision-origin-log', url_args={'origin_id': origin['id'], 'branch_name': invalid_branch_name}) rv = self.client.get(url) self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertFalse(rv.has_header('Link')) self.assertEqual( rv.data, {'exception': 'NotFoundExc', 'reason': 'Revision for origin %s and branch %s not found.' % (origin['id'], invalid_branch_name)}) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEqual(rv.status_code, 404, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = 
stub_dir # then rv = self.client.get('/api/1/revision/999/directory/some/path/') self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') def test_api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.client.get(url) self.assertEqual(rv.status_code, 200, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) def _enrich_revision(self, revision): author_url = reverse( 'api-1-person', url_args={'person_id': revision['author']['id']}) committer_url = reverse( 'api-1-person', url_args={'person_id': revision['committer']['id']}) directory_url = reverse( 'api-1-directory', url_args={'sha1_git': revision['directory']}) history_url = reverse('api-1-revision-log', url_args={'sha1_git': revision['id']}) parents_id_url = [] for p in revision['parents']: parents_id_url.append({ 'id': p, 'url': reverse('api-1-revision', url_args={'sha1_git': p}) }) revision_url = reverse('api-1-revision', url_args={'sha1_git': revision['id']}) revision['author_url'] = author_url revision['committer_url'] = committer_url revision['directory_url'] = directory_url revision['history_url'] = history_url revision['url'] = revision_url revision['parents'] = parents_id_url return revision @given(revision()) def test_api_revision_uppercase(self, revision): url = reverse('api-1-revision-uppercase-checksum', 
url_args={'sha1_git': revision.upper()}) resp = self.client.get(url) self.assertEqual(resp.status_code, 302) redirect_url = reverse('api-1-revision', url_args={'sha1_git': revision}) self.assertEqual(resp['location'], redirect_url) diff --git a/swh/web/tests/common/test_origin_visits.py b/swh/web/tests/common/test_origin_visits.py index 0194dfba..39df41e6 100644 --- a/swh/web/tests/common/test_origin_visits.py +++ b/swh/web/tests/common/test_origin_visits.py @@ -1,115 +1,114 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import patch from swh.web.common.exc import NotFoundExc from swh.web.common.origin_visits import ( get_origin_visits, get_origin_visit ) from swh.web.tests.testcase import WebTestCase class OriginVisitsTestCase(WebTestCase): @patch('swh.web.common.service') def test_get_origin_visits(self, mock_service): mock_service.MAX_LIMIT = 2 def _lookup_origin_visits(*args, **kwargs): if kwargs['last_visit'] is None: return [{'visit': 1, 'date': '2017-05-06T00:59:10+00:00', 'metadata': {}}, {'visit': 2, 'date': '2017-08-06T00:59:10+00:00', 'metadata': {}} ] else: return [{'visit': 3, 'date': '2017-09-06T00:59:10+00:00', 'metadata': {}} ] mock_service.lookup_origin_visits.side_effect = _lookup_origin_visits origin_info = { 'id': 1, 'type': 'git', 'url': 'https://github.com/foo/bar', } origin_visits = get_origin_visits(origin_info) self.assertEqual(len(origin_visits), 3) @patch('swh.web.common.origin_visits.get_origin_visits') def test_get_origin_visit(self, mock_origin_visits): origin_info = { 'id': 2, 'type': 'git', 'url': 'https://github.com/foo/bar', } visits = \ [{'status': 'full', 'date': '2015-07-09T21:09:24+00:00', 'visit': 1, 'origin': origin_info['id']}, {'status': 'full', 'date': '2016-02-23T18:05:23.312045+00:00', 'visit': 2, 
'origin': origin_info['id']}, {'status': 'full', 'date': '2016-03-28T01:35:06.554111+00:00', 'visit': 3, 'origin': origin_info['id']}, {'status': 'full', 'date': '2016-06-18T01:22:24.808485+00:00', 'visit': 4, 'origin': origin_info['id']}, {'status': 'full', 'date': '2016-08-14T12:10:00.536702+00:00', 'visit': 5, 'origin': origin_info['id']}] mock_origin_visits.return_value = visits visit_id = 12 with self.assertRaises(NotFoundExc) as cm: visit = get_origin_visit(origin_info, visit_id=visit_id) exception_text = cm.exception.args[0] self.assertIn('Visit with id %s' % visit_id, exception_text) - self.assertIn('type %s' % origin_info['type'], exception_text) self.assertIn('url %s' % origin_info['url'], exception_text) visit = get_origin_visit(origin_info, visit_id=2) self.assertEqual(visit, visits[1]) visit = get_origin_visit( origin_info, visit_ts='2016-02-23T18:05:23.312045+00:00') self.assertEqual(visit, visits[1]) visit = get_origin_visit( origin_info, visit_ts='2016-02-20') self.assertEqual(visit, visits[1]) visit = get_origin_visit( origin_info, visit_ts='2016-06-18T01:22') self.assertEqual(visit, visits[3]) visit = get_origin_visit( origin_info, visit_ts='2016-06-18 01:22') self.assertEqual(visit, visits[3]) visit = get_origin_visit( origin_info, visit_ts=1466208000) self.assertEqual(visit, visits[3]) visit = get_origin_visit( origin_info, visit_ts='2014-01-01') self.assertEqual(visit, visits[0]) visit = get_origin_visit( origin_info, visit_ts='2018-01-01') self.assertEqual(visit, visits[-1])