diff --git a/pytest.ini b/pytest.ini
index 7e930e42..79f20fad 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,5 +1,7 @@
 [pytest]
 norecursedirs = docs node_modules .tox
 DJANGO_SETTINGS_MODULE = swh.web.settings.tests
 filterwarnings =
     ignore::django.utils.deprecation.RemovedInDjango20Warning
+markers =
+    origin_id: execute tests using an origin id (deselect with '-m "not origin_id"')
diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py
index 30029138..f536610c 100644
--- a/swh/web/api/views/origin.py
+++ b/swh/web/api/views/origin.py
@@ -1,595 +1,595 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from distutils.util import strtobool
 from functools import partial

 from swh.web.common import service
 from swh.web.common.exc import BadInputExc
 from swh.web.common.origin_visits import get_origin_visits
 from swh.web.common.utils import reverse
 from swh.web.api.apidoc import api_doc, format_docstring
 from swh.web.api.apiurls import api_route
 from swh.web.api.views.utils import api_lookup


 DOC_RETURN_ORIGIN = '''
        :>json string origin_visits_url: link to in order to get information
            about the visits for that origin
        :>json string url: the origin canonical url
        :>json string type: the type of software origin (deprecated value;
            types are now associated to visits instead of origins)
        :>json number id: the origin unique identifier (deprecated value;
            you should only refer to origins based on their URL)
 '''

 DOC_RETURN_ORIGIN_ARRAY = \
     DOC_RETURN_ORIGIN.replace(':>json', ':>jsonarr')

 DOC_RETURN_ORIGIN_VISIT = '''
        :>json string date: ISO representation of the visit date (in UTC)
        :>json str origin: the origin canonical url
        :>json string origin_url: link to get information about the origin
        :>jsonarr string snapshot: the snapshot identifier of the visit
        :>jsonarr string snapshot_url: link to
            :http:get:`/api/1/snapshot/(snapshot_id)/` in order to get
            information about the snapshot of the visit
        :>json string status: status of the visit (either **full**,
            **partial** or **ongoing**)
        :>json number visit: the unique identifier of the visit
 '''

 DOC_RETURN_ORIGIN_VISIT_ARRAY = \
     DOC_RETURN_ORIGIN_VISIT.replace(':>json', ':>jsonarr')

 DOC_RETURN_ORIGIN_VISIT_ARRAY += '''
        :>jsonarr number id: the unique identifier of the origin
        :>jsonarr string origin_visit_url: link to
            :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)/`
            in order to get information about the visit
 '''


 def _enrich_origin(origin):
-    if 'id' in origin:
+    if 'url' in origin:
         o = origin.copy()
         o['origin_visits_url'] = reverse(
             'api-1-origin-visits', url_args={'origin_url': origin['url']})
         return o

     return origin


 def _enrich_origin_visit(origin_visit, *,
                          with_origin_link, with_origin_visit_link):
     ov = origin_visit.copy()
     if with_origin_link:
         ov['origin_url'] = reverse('api-1-origin',
                                    url_args={'origin_url': ov['origin']})
     if with_origin_visit_link:
         ov['origin_visit_url'] = reverse('api-1-origin-visit',
                                          url_args={'origin_url': ov['origin'],
                                                    'visit_id': ov['visit']})
     snapshot = ov['snapshot']
     if snapshot:
         ov['snapshot_url'] = reverse('api-1-snapshot',
                                      url_args={'snapshot_id': snapshot})
     else:
         ov['snapshot_url'] = None
     return ov
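
Reviewer note: the `_enrich_origin` guard switch (keying on `'url'` rather than `'id'`) is the crux of this hunk, since origin dicts keep their `url` even once ids are phased out. A minimal standalone sketch of the guard, with a hypothetical `fake_reverse` standing in for the Django URL resolution used above:

def fake_reverse(view_name, url_args=None):
    # Hypothetical stand-in for swh.web.common.utils.reverse.
    return '/api/1/origin/%s/visits/' % url_args['origin_url']

def enrich_origin(origin):
    # Key on 'url', which every origin dict carries, instead of the
    # deprecated 'id'.
    if 'url' in origin:
        o = origin.copy()
        o['origin_visits_url'] = fake_reverse(
            'api-1-origin-visits', url_args={'origin_url': o['url']})
        return o
    return origin

print(enrich_origin({'url': 'https://github.com/python/cpython'}))
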
 @api_route(r'/origins/', 'api-1-origins')
 @api_doc('/origins/', noargs=True)
 @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY)
 def api_origins(request):
     """
     .. http:get:: /api/1/origins/

         Get list of archived software origins.

         Origins are sorted by ids before returning them.

         :query int origin_from: The first origin id that will be included
             in returned results (default to 1)
         :query int origin_count: The maximum number of origins to return
             (default to 100, can not exceed 10000)

         {return_origin_array}

         {common_headers}
         {resheader_link}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origins?origin_from=50000&origin_count=500`
     """
     origin_from = int(request.query_params.get('origin_from', '1'))
     origin_count = int(request.query_params.get('origin_count', '100'))
     origin_count = min(origin_count, 10000)
     results = api_lookup(
         service.lookup_origins, origin_from, origin_count+1,
         enrich_fn=_enrich_origin)
     response = {'results': results, 'headers': {}}
     if len(results) > origin_count:
         origin_from = results.pop()['id']
         response['headers']['link-next'] = reverse(
             'api-1-origins',
             query_params={'origin_from': origin_from,
                           'origin_count': origin_count})
     return response
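
For reference, the `link-next` header built above is sent back as a standard Link header, so clients can page through the listing without computing offsets themselves. A sketch with the `requests` library; the base URL is an assumption (any reachable swh-web instance would do):

import requests

url = 'https://archive.softwareheritage.org/api/1/origins/'
params = {'origin_count': 100}
for _ in range(3):  # fetch at most three pages in this demo
    rv = requests.get(url, params=params)
    rv.raise_for_status()
    for origin in rv.json():
        print(origin['url'])
    if 'next' not in rv.links:  # requests parses the Link header
        break
    url, params = rv.links['next']['url'], {}
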
 @api_route(r'/origin/(?P<origin_type>[a-z]+)/url/(?P<origin_url>.+)/',
            'api-1-origin')
 @api_route(r'/origin/(?P<origin_url>.+)/get/', 'api-1-origin')
 @api_route(r'/origin/(?P<origin_id>[0-9]+)/', 'api-1-origin')
 @api_doc('/origin/')
 @format_docstring(return_origin=DOC_RETURN_ORIGIN)
 def api_origin(request, origin_id=None, origin_type=None, origin_url=None):
     """
     .. http:get:: /api/1/origin/(origin_url)/get/

         Get information about a software origin.

         :param string origin_url: the origin url

         {return_origin}

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/git/url/https://github.com/python/cpython/`

     .. http:get:: /api/1/origin/(origin_id)/

         Get information about a software origin.

         .. warning::

             All endpoints using an ``origin_id`` or an ``origin_type`` are
             deprecated and will be removed in the near future. Only those
             using an ``origin_url`` will remain available.
             You should use :http:get:`/api/1/origin/(origin_url)/get/`
             instead.

         :param int origin_id: a software origin identifier

         {return_origin}

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/1/`

     .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/

         Get information about a software origin.

         .. warning::

             All endpoints using an ``origin_id`` or an ``origin_type`` are
             deprecated and will be removed in the near future. Only those
             using an ``origin_url`` will remain available.
             You should use :http:get:`/api/1/origin/(origin_url)/get/`
             instead.

         :param string origin_type: the origin type (possible values are
             ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or
             ``deposit``)
         :param string origin_url: the origin url

         {return_origin}

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/git/url/https://github.com/python/cpython/`
     """
     ori_dict = {
         'id': int(origin_id) if origin_id else None,
         'type': origin_type,
         'url': origin_url
     }
     ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]}

     error_msg = 'Origin %s not found.' % \
         (ori_dict.get('id') or ori_dict['url'])

     return api_lookup(
         service.lookup_origin, ori_dict,
         notfound_msg=error_msg,
         enrich_fn=_enrich_origin)


 @api_route(r'/origin/search/(?P<url_pattern>.+)/',
            'api-1-origin-search')
 @api_doc('/origin/search/')
 @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY)
 def api_origin_search(request, url_pattern):
     """
     .. http:get:: /api/1/origin/search/(url_pattern)/

         Search for software origins whose urls contain a provided string
         pattern or match a provided regular expression.
         The search is performed in a case insensitive way.

         :param string url_pattern: a string pattern or a regular expression
         :query int offset: the number of found origins to skip before
             returning results
         :query int limit: the maximum number of found origins to return
         :query boolean regexp: if true, consider provided pattern as a regular
             expression and search origins whose urls match it
         :query boolean with_visit: if true, only return origins with at least
             one visit by Software heritage

         {return_origin_array}

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/search/python/?limit=2`
     """
     result = {}
     offset = int(request.query_params.get('offset', '0'))
     limit = int(request.query_params.get('limit', '70'))
     regexp = request.query_params.get('regexp', 'false')
     with_visit = request.query_params.get('with_visit', 'false')

     results = api_lookup(service.search_origin, url_pattern, offset, limit,
                          bool(strtobool(regexp)),
                          bool(strtobool(with_visit)),
                          enrich_fn=_enrich_origin)

     nb_results = len(results)
     if nb_results == limit:
         query_params = {}
         query_params['offset'] = offset + limit
         query_params['limit'] = limit
         query_params['regexp'] = regexp

         result['headers'] = {
             'link-next': reverse('api-1-origin-search',
                                  url_args={'url_pattern': url_pattern},
                                  query_params=query_params)
         }

     result.update({
         'results': results
     })

     return result
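
A quick client-side sketch of the search endpoint documented above (the instance URL is an assumption; boolean query parameters are passed as the strings `strtobool` accepts):

import requests

rv = requests.get(
    'https://archive.softwareheritage.org/api/1/origin/search/python/',
    params={'limit': 2, 'with_visit': 'true'})
rv.raise_for_status()
for origin in rv.json():
    print(origin['url'])
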
 @api_route(r'/origin/metadata-search/',
            'api-1-origin-metadata-search')
 @api_doc('/origin/metadata-search/', noargs=True, need_params=True)
 @format_docstring(return_origin_array=DOC_RETURN_ORIGIN_ARRAY)
 def api_origin_metadata_search(request):
     """
     .. http:get:: /api/1/origin/metadata-search/

         Search for software origins whose metadata (expressed as a
         JSON-LD/CodeMeta dictionary) match the provided criteria.
         For now, only full-text search on this dictionary is supported.

         :query str fulltext: a string that will be matched against origin
             metadata; results are ranked and ordered starting with the best
             ones.
         :query int limit: the maximum number of found origins to return
             (bounded to 100)

         {return_origin_array}

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/metadata-search/?limit=2&fulltext=Jane%20Doe`
     """
     fulltext = request.query_params.get('fulltext', None)
     limit = min(int(request.query_params.get('limit', '70')), 100)

     if not fulltext:
         content = '"fulltext" must be provided and non-empty.'
         raise BadInputExc(content)

     results = api_lookup(service.search_origin_metadata, fulltext, limit)

     return {
         'results': results,
     }
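
Similarly, a hedged sketch of the full-text metadata search (same assumed instance; note the endpoint rejects an empty `fulltext`). The nested result shape below is inferred from the tests further down in this diff:

import requests

rv = requests.get(
    'https://archive.softwareheritage.org/api/1/origin/metadata-search/',
    params={'fulltext': 'Jane Doe', 'limit': 2})
rv.raise_for_status()
for result in rv.json():
    # each result nests the indexer output under 'metadata'
    print(result['url'], result['metadata'].get('metadata'))
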
 @api_route(r'/origin/(?P<origin_url>.*)/visits/', 'api-1-origin-visits')
 @api_route(r'/origin/(?P<origin_id>[0-9]+)/visits/', 'api-1-origin-visits')
 @api_doc('/origin/visits/')
 @format_docstring(
     return_origin_visit_array=DOC_RETURN_ORIGIN_VISIT_ARRAY)
 def api_origin_visits(request, origin_id=None, origin_url=None):
     """
     .. http:get:: /api/1/origin/(origin_url)/visits/

         Get information about all visits of a software origin.
         Visits are returned sorted in descending order according
         to their date.

         :param str origin_url: a software origin URL
         :query int per_page: specify the number of visits to list, for
             pagination purposes
         :query int last_visit: visit to start listing from, for pagination
             purposes

         {common_headers}
         {resheader_link}

         {return_origin_visit_array}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/https://github.com/hylang/hy/visits/`

     .. http:get:: /api/1/origin/(origin_id)/visits/

         Get information about all visits of a software origin.
         Visits are returned sorted in descending order according
         to their date.

         .. warning::

             All endpoints using an ``origin_id`` are deprecated and will be
             removed in the near future. Only those using an ``origin_url``
             will remain available.
             Use :http:get:`/api/1/origin/(origin_url)/visits/` instead.

         :param int origin_id: a software origin identifier
         :query int per_page: specify the number of visits to list, for
             pagination purposes
         :query int last_visit: visit to start listing from, for pagination
             purposes

         {common_headers}
         {resheader_link}

         {return_origin_visit_array}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/1/visits/`
     """
     result = {}
     if origin_url:
         origin_query = {'url': origin_url}
         notfound_msg = 'No origin {} found'.format(origin_url)
         url_args_next = {'origin_url': origin_url}
     else:
         origin_query = {'id': int(origin_id)}
         notfound_msg = 'No origin {} found'.format(origin_id)
         url_args_next = {'origin_id': origin_id}
     per_page = int(request.query_params.get('per_page', '10'))
     last_visit = request.query_params.get('last_visit')
     if last_visit:
         last_visit = int(last_visit)

     def _lookup_origin_visits(
             origin_query, last_visit=last_visit, per_page=per_page):
         all_visits = get_origin_visits(origin_query)
         all_visits.reverse()
         visits = []
         if not last_visit:
             visits = all_visits[:per_page]
         else:
             for i, v in enumerate(all_visits):
                 if v['visit'] == last_visit:
                     visits = all_visits[i+1:i+1+per_page]
                     break
         for v in visits:
             yield v

     results = api_lookup(_lookup_origin_visits, origin_query,
                          notfound_msg=notfound_msg,
                          enrich_fn=partial(_enrich_origin_visit,
                                            with_origin_link=False,
                                            with_origin_visit_link=True))

     if results:
         nb_results = len(results)
         if nb_results == per_page:
             new_last_visit = results[-1]['visit']
             query_params = {}
             query_params['last_visit'] = new_last_visit

             if request.query_params.get('per_page'):
                 query_params['per_page'] = per_page

             result['headers'] = {
                 'link-next': reverse('api-1-origin-visits',
                                      url_args=url_args_next,
                                      query_params=query_params)
             }

     result.update({
         'results': results
     })

     return result
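
The `last_visit` cursor above enables simple client-side paging. A sketch, assuming the instance accepts the origin URL embedded verbatim in the path (as the parsed-literal example does) and that `requests` can read the emitted Link header from `rv.links`:

import requests

url = ('https://archive.softwareheritage.org/api/1/origin/'
       'https://github.com/hylang/hy/visits/')
params = {'per_page': 2}
while url:
    rv = requests.get(url, params=params)
    rv.raise_for_status()
    for visit in rv.json():
        print(visit['visit'], visit['date'], visit['status'])
    url = rv.links.get('next', {}).get('url')  # None ends the loop
    params = {}
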
 @api_route(r'/origin/(?P<origin_url>.*)/visit/(?P<visit_id>[0-9]+)/',
            'api-1-origin-visit')
 @api_route(r'/origin/(?P<origin_id>[0-9]+)/visit/(?P<visit_id>[0-9]+)/',
            'api-1-origin-visit')
 @api_doc('/origin/visit/')
 @format_docstring(return_origin_visit=DOC_RETURN_ORIGIN_VISIT)
 def api_origin_visit(request, visit_id, origin_url=None, origin_id=None):
     """
     .. http:get:: /api/1/origin/(origin_url)/visit/(visit_id)/

         Get information about a specific visit of a software origin.

         :param str origin_url: a software origin URL
         :param int visit_id: a visit identifier

         {common_headers}

         {return_origin_visit}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin or visit can not be found in the
             archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/https://github.com/hylang/hy/visit/1/`

     .. http:get:: /api/1/origin/(origin_id)/visit/(visit_id)/

         Get information about a specific visit of a software origin.

         .. warning::

             All endpoints using an ``origin_id`` are deprecated and will be
             removed in the near future. Only those using an ``origin_url``
             will remain available.
             Use :http:get:`/api/1/origin/(origin_url)/visit/(visit_id)`
             instead.

         :param int origin_id: a software origin identifier
         :param int visit_id: a visit identifier

         {common_headers}

         {return_origin_visit}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin or visit can not be found in the
             archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/1500/visit/1/`
     """
     if not origin_url:
         origin_url = service.lookup_origin({'id': int(origin_id)})['url']

     return api_lookup(
         service.lookup_origin_visit, origin_url, int(visit_id),
         notfound_msg=('No visit {} for origin {} found'
                       .format(visit_id, origin_url)),
         enrich_fn=partial(_enrich_origin_visit,
                           with_origin_link=True,
                           with_origin_visit_link=False))


 @api_route(r'/origin/(?P<origin_type>[a-z]+)/url/(?P<origin_url>.+)'
            '/intrinsic-metadata', 'api-origin-intrinsic-metadata')
 @api_doc('/origin/intrinsic-metadata/')
 @format_docstring()
 def api_origin_intrinsic_metadata(request, origin_type, origin_url):
     """
     .. http:get:: /api/1/origin/(origin_type)/url/(origin_url)/intrinsic-metadata

         Get intrinsic metadata of a software origin (as a JSON-LD/CodeMeta
         dictionary).

         :param string origin_type: the origin type (possible values are
             ``git``, ``svn``, ``hg``, ``deb``, ``pypi``, ``npm``, ``ftp`` or
             ``deposit``)
         :param string origin_url: the origin url

         :>json string ???: intrinsic metadata field of the origin

         {common_headers}

         **Allowed HTTP Methods:** :http:method:`get`, :http:method:`head`,
         :http:method:`options`

         :statuscode 200: no error
         :statuscode 404: requested origin can not be found in the archive

         **Example:**

         .. parsed-literal::

             :swh_web_api:`origin/git/url/https://github.com/python/cpython/intrinsic-metadata`
     """  # noqa
     ori_dict = {
         'type': origin_type,
         'url': origin_url
     }

     error_msg = 'Origin with URL %s not found' % ori_dict['url']

     return api_lookup(
         service.lookup_origin_intrinsic_metadata, ori_dict,
         notfound_msg=error_msg,
         enrich_fn=_enrich_origin)
diff --git a/swh/web/common/service.py b/swh/web/common/service.py
index 01d24b7b..2f6e4183 100644
--- a/swh/web/common/service.py
+++ b/swh/web/common/service.py
@@ -1,1079 +1,1108 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 from collections import defaultdict

 from swh.model import hashutil

 from swh.storage.algos import revisions_walker

 from swh.web.common import converters
 from swh.web.common import query
 from swh.web.common.exc import NotFoundExc
 from swh.web.common.origin_visits import get_origin_visit
 from swh.web import config

 storage = config.storage()
 vault = config.vault()
 idx_storage = config.indexer_storage()


 MAX_LIMIT = 50  # Top limit the users can ask for


 def _first_element(l):
     """Returns the first element in the provided list or None
     if it is empty or None"""
     return next(iter(l or []), None)


 def lookup_multiple_hashes(hashes):
     """Lookup the passed hashes in a single DB connection, using batch
     processing.

     Args:
         An array of {filename: X, sha1: Y}, string X, hex sha1 string Y.

     Returns:
         The same array with elements updated with elem['found'] = true if
         the hash is present in storage, elem['found'] = false if not.

     """
     hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes]
     content_missing = storage.content_missing_per_sha1(hashlist)
     missing = [hashutil.hash_to_hex(x) for x in content_missing]
     for x in hashes:
         x.update({'found': True})
     for h in hashes:
         if h['sha1'] in missing:
             h['found'] = False
     return hashes
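
Aside on `lookup_multiple_hashes`: it issues one batched missing-hashes query and then flags each entry, rather than one storage round trip per hash. A standalone sketch of the same pattern, with `find_missing` as a hypothetical stand-in for `storage.content_missing_per_sha1`:

def mark_found(hashes, find_missing):
    # One batched query, then per-element flagging.
    missing = set(find_missing([h['sha1'] for h in hashes]))
    for h in hashes:
        h['found'] = h['sha1'] not in missing
    return hashes

demo = [{'filename': 'a', 'sha1': 'f' * 40},
        {'filename': 'b', 'sha1': '0' * 40}]
print(mark_found(demo, lambda sha1s: [sha1s[1]]))
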
 def lookup_expression(expression, last_sha1, per_page):
     """Lookup expression in raw content.

     Args:
         expression (str): An expression to lookup through raw indexed
             content
         last_sha1 (str): Last sha1 seen
         per_page (int): Number of results per page

     Yields:
         ctags whose content match the expression

     """
     limit = min(per_page, MAX_LIMIT)
     ctags = idx_storage.content_ctags_search(expression,
                                              last_sha1=last_sha1,
                                              limit=limit)
     for ctag in ctags:
         ctag = converters.from_swh(ctag, hashess={'id'})
         ctag['sha1'] = ctag['id']
         ctag.pop('id')
         yield ctag


 def lookup_hash(q):
     """Checks if the storage contains a given content checksum

     Args: query string of the form <hash_algo:hash>

     Returns: Dict with key found containing the hash info if the
         hash is present, None if not.

     """
     algo, hash = query.parse_hash(q)
     found = _first_element(storage.content_find({algo: hash}))
     return {'found': converters.from_content(found),
             'algo': algo}


 def search_hash(q):
     """Checks if the storage contains a given content checksum

     Args: query string of the form <hash_algo:hash>

     Returns: Dict with key found to True or False, according to
         whether the checksum is present or not

     """
     algo, hash = query.parse_hash(q)
     found = _first_element(storage.content_find({algo: hash}))
     return {'found': found is not None}


 def _lookup_content_sha1(q):
     """Given a possible input, query for the content's sha1.

     Args:
         q: query string of the form <hash_algo:hash>

     Returns:
         binary sha1 if found or None

     """
     algo, hash = query.parse_hash(q)
     if algo != 'sha1':
         hashes = _first_element(storage.content_find({algo: hash}))
         if not hashes:
             return None
         return hashes['sha1']
     return hash


 def lookup_content_ctags(q):
     """Return ctags information from a specified content.

     Args:
         q: query string of the form <hash_algo:hash>

     Yields:
         ctags information (dict) list if the content is found.

     """
     sha1 = _lookup_content_sha1(q)
     if not sha1:
         return None
     ctags = list(idx_storage.content_ctags_get([sha1]))
     if not ctags:
         return None

     for ctag in ctags:
         yield converters.from_swh(ctag, hashess={'id'})


 def lookup_content_filetype(q):
     """Return filetype information from a specified content.

     Args:
         q: query string of the form <hash_algo:hash>

     Yields:
         filetype information (dict) list if the content is found.

     """
     sha1 = _lookup_content_sha1(q)
     if not sha1:
         return None
     filetype = _first_element(list(idx_storage.content_mimetype_get([sha1])))
     if not filetype:
         return None
     return converters.from_filetype(filetype)


 def lookup_content_language(q):
     """Return language information from a specified content.

     Args:
         q: query string of the form <hash_algo:hash>

     Yields:
         language information (dict) list if the content is found.

     """
     sha1 = _lookup_content_sha1(q)
     if not sha1:
         return None
     lang = _first_element(list(idx_storage.content_language_get([sha1])))
     if not lang:
         return None
     return converters.from_swh(lang, hashess={'id'})


 def lookup_content_license(q):
     """Return license information from a specified content.

     Args:
         q: query string of the form <hash_algo:hash>

     Yields:
         license information (dict) list if the content is found.

     """
     sha1 = _lookup_content_sha1(q)
     if not sha1:
         return None
     lic = _first_element(idx_storage.content_fossology_license_get([sha1]))

     if not lic:
         return None
     return converters.from_swh({'id': sha1, 'facts': lic[sha1]},
                                hashess={'id'})


 def lookup_origin(origin):
     """Return information about the origin matching dict origin.

     Args:
         origin: origin's dict with keys either 'id' or 'url'

     Returns:
         origin information as dict.

     """
     origin_info = storage.origin_get(origin)
     if not origin_info:
         msg = 'Origin %s not found!' % \
             (origin.get('id') or origin['url'])
         raise NotFoundExc(msg)
     return converters.from_origin(origin_info)
 def lookup_origins(origin_from=1, origin_count=100):
     """Get list of archived software origins in a paginated way.

     Origins are sorted by id before returning them

     Args:
         origin_from (int): The minimum id of the origins to return
         origin_count (int): The maximum number of origins to return

     Yields:
         origins information as dicts

     """
     origins = storage.origin_get_range(origin_from, origin_count)
     return map(converters.from_origin, origins)


 def search_origin(url_pattern, offset=0, limit=50, regexp=False,
                   with_visit=False):
     """Search for origins whose urls contain a provided string pattern
     or match a provided regular expression.

     Args:
         url_pattern: the string pattern to search for in origin urls
         offset: number of found origins to skip before returning results
         limit: the maximum number of found origins to return

     Returns:
         list of origin information as dict.

     """
     origins = storage.origin_search(url_pattern, offset, limit, regexp,
                                     with_visit)
     return map(converters.from_origin, origins)


 def search_origin_metadata(fulltext, limit=50):
     """Search for origins whose metadata match a provided string pattern.

     Args:
         fulltext: the string pattern to search for in origin metadata
         offset: number of found origins to skip before returning results
         limit: the maximum number of found origins to return

     Returns:
         list of origin metadata as dict.

     """
     matches = idx_storage.origin_intrinsic_metadata_search_fulltext(
         conjunction=[fulltext], limit=limit)
     results = []
     for match in matches:
         match['from_revision'] = hashutil.hash_to_hex(match['from_revision'])
         result = converters.from_origin(
-            storage.origin_get({'id': match.pop('id')}))
+            storage.origin_get({'url': match.pop('origin_url')}))
         result['metadata'] = match
         results.append(result)
     return results


 def lookup_origin_intrinsic_metadata(origin_dict):
-    """Return intrinsic metadata for origin whose origin_id matches given
-    origin_id.
+    """Return intrinsic metadata for the origin matching the given
+    criteria.

     Args:
         origin_dict: origin's dict with keys ('type' AND 'url')

     Returns:
         origin metadata.

     """
     origin_info = storage.origin_get(origin_dict)
     if not origin_info:
         msg = 'Origin with type %s and url %s not found!' % \
             (origin_dict['type'], origin_dict['url'])
         raise NotFoundExc(msg)

-    origin_ids = [origin_info['id']]
+    origins = [origin_info['url']]
     match = _first_element(
-        idx_storage.origin_intrinsic_metadata_get(origin_ids))
+        idx_storage.origin_intrinsic_metadata_get(origins))
     result = {}
     if match:
         result = match['metadata']
     return result
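
The substantive change in the two hunks above is that the indexer storage is now keyed by origin URL rather than origin id. A standalone sketch of the new lookup shape, with `idx_get` as a hypothetical stand-in for `idx_storage.origin_intrinsic_metadata_get`:

def lookup_intrinsic_metadata(origin_info, idx_get):
    # Query the indexer by URL (new) instead of by id (old).
    matches = idx_get([origin_info['url']])
    return matches[0]['metadata'] if matches else {}

fake_index = {'https://example.org/repo': [{'metadata': {'author': 'Jane'}}]}
print(lookup_intrinsic_metadata(
    {'url': 'https://example.org/repo'},
    lambda urls: fake_index.get(urls[0], [])))
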
""" empty_dir_sha1 = '4b825dc642cb6eb9a060e54bf8d69288fbee4904' if sha1_git == empty_dir_sha1: return [] sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) directory_entries = storage.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ sha1_git_bin = _to_sha1_bin(sha1_git) _check_directory_exists(sha1_git, sha1_git_bin) paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_git_bin = _to_sha1_bin(release_sha1_git) release = _first_element(storage.release_get([sha1_git_bin])) if not release: raise NotFoundExc('Release with sha1_git %s not found.' % release_sha1_git) return converters.from_release(release) def lookup_release_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) releases = storage.release_get(sha1_bin_list) or [] return (converters.from_release(r) for r in releases) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if there is no revision with the provided sha1_git. """ sha1_git_bin = _to_sha1_bin(rev_sha1_git) revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revisions identified with their sha1_git identifiers. Args: sha1_git_list: A list of revision sha1_git identifiers Returns: Generator of revisions information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ sha1_bin_list = (_to_sha1_bin(sha1_git) for sha1_git in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(r) for r in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. 
 def lookup_revision_message(rev_sha1_git):
     """Return the raw message of the revision with sha1 revision_sha1_git.

     Args:
         revision_sha1_git: The revision's sha1 as hexadecimal

     Returns:
         Decoded revision message as dict {'message': <the_message>}

     Raises:
         ValueError if the identifier provided is not of sha1 nature.
         NotFoundExc if the revision is not found, or if it has no message

     """
     sha1_git_bin = _to_sha1_bin(rev_sha1_git)

     revision = _first_element(storage.revision_get([sha1_git_bin]))
     if not revision:
         raise NotFoundExc('Revision with sha1_git %s not found.'
                           % rev_sha1_git)
     if 'message' not in revision:
         raise NotFoundExc('No message for revision with sha1_git %s.'
                           % rev_sha1_git)
     res = {'message': revision['message']}
     return res


-def _lookup_revision_id_by(origin_id, branch_name, timestamp):
+def _lookup_revision_id_by(origin, branch_name, timestamp):
     def _get_snapshot_branch(snapshot, branch_name):
         snapshot = lookup_snapshot(visit['snapshot'],
                                    branches_from=branch_name,
                                    branches_count=10)
         branch = None
         if branch_name in snapshot['branches']:
             branch = snapshot['branches'][branch_name]
         return branch

-    visit = get_origin_visit({'id': origin_id}, visit_ts=timestamp)
+    if isinstance(origin, int):
+        origin = {'id': origin}
+    elif isinstance(origin, str):
+        origin = {'url': origin}
+    else:
+        raise TypeError('"origin" must be an int or a string.')
+
+    visit = get_origin_visit(origin, visit_ts=timestamp)
     branch = _get_snapshot_branch(visit['snapshot'], branch_name)
     rev_id = None
     if branch and branch['target_type'] == 'revision':
         rev_id = branch['target']
     elif branch and branch['target_type'] == 'alias':
         branch = _get_snapshot_branch(visit['snapshot'], branch['target'])
         if branch and branch['target_type'] == 'revision':
             rev_id = branch['target']

     if not rev_id:
         raise NotFoundExc('Revision for origin %s and branch %s not found.'
-                          % (origin_id, branch_name))
+                          % (origin.get('url') or origin['id'], branch_name))

     return rev_id


-def lookup_revision_by(origin_id,
+def lookup_revision_by(origin,
                        branch_name='HEAD',
                        timestamp=None):
-    """Lookup revision by origin id, snapshot branch name and visit timestamp.
+    """Lookup revision by origin, snapshot branch name and visit timestamp.

     If branch_name is not provided, lookup using 'HEAD' as default.
     If timestamp is not provided, use the most recent.

     Args:
-        origin_id (int): origin of the revision
+        origin (Union[int,str]): origin of the revision
         branch_name (str): snapshot branch name
         timestamp (str/int): origin visit time frame

     Returns:
         dict: The revision matching the criterions

     Raises:
         NotFoundExc if no revision corresponds to the criterion

     """
-    rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
+    rev_id = _lookup_revision_id_by(origin, branch_name, timestamp)
     return lookup_revision(rev_id)


 def lookup_revision_log(rev_sha1_git, limit):
     """Lookup revision log by revision id.

     Args:
         rev_sha1_git (str): The revision's sha1 as hexadecimal
         limit (int): the maximum number of revisions returned

     Returns:
         list: Revision log as list of revision dicts

     Raises:
         ValueError: if the identifier provided is not of sha1 nature.
         NotFoundExc: if there is no revision with the provided sha1_git.

     """
     lookup_revision(rev_sha1_git)
     sha1_git_bin = _to_sha1_bin(rev_sha1_git)
     revision_entries = storage.revision_log([sha1_git_bin], limit)
     return map(converters.from_revision, revision_entries)
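
The isinstance dispatch added to `_lookup_revision_id_by` is the pattern that lets every `lookup_revision*_by` helper accept both forms during the deprecation window. Extracted as a standalone sketch:

def normalize_origin(origin):
    # Deprecated integer ids and origin URLs both remain accepted.
    if isinstance(origin, int):
        return {'id': origin}
    elif isinstance(origin, str):
        return {'url': origin}
    raise TypeError('"origin" must be an int or a string.')

print(normalize_origin(42))
print(normalize_origin('https://github.com/python/cpython'))
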
-def lookup_revision_log_by(origin_id, branch_name, timestamp, limit):
-    """Lookup revision by origin id, snapshot branch name and visit timestamp.
+def lookup_revision_log_by(origin, branch_name, timestamp, limit):
+    """Lookup revision by origin, snapshot branch name and visit timestamp.

     Args:
-        origin_id (int): origin of the revision
+        origin (Union[int,str]): origin of the revision
         branch_name (str): snapshot branch
         timestamp (str/int): origin visit time frame
         limit (int): the maximum number of revisions returned

     Returns:
         list: Revision log as list of revision dicts

     Raises:
         NotFoundExc: if no revision corresponds to the criterion

     """
-    rev_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
+    rev_id = _lookup_revision_id_by(origin, branch_name, timestamp)
     return lookup_revision_log(rev_id, limit)


-def lookup_revision_with_context_by(origin_id, branch_name, timestamp,
+def lookup_revision_with_context_by(origin, branch_name, timestamp,
                                     sha1_git, limit=100):
     """Return information about revision sha1_git, limited to the
     sub-graph of all transitive parents of sha1_git_root.
-    sha1_git_root being resolved through the lookup of a revision by origin_id,
+    sha1_git_root being resolved through the lookup of a revision by origin,
     branch_name and ts.

     In other words, sha1_git is an ancestor of sha1_git_root.

     Args:
-        - origin_id: origin of the revision.
+        - origin: origin of the revision.
         - branch_name: revision's branch.
         - timestamp: revision's time frame.
         - sha1_git: one of sha1_git_root's ancestors.
         - limit: limit the lookup to 100 revisions back.

     Returns:
         Pair of (root_revision, revision).
         Information on sha1_git if it is an ancestor of sha1_git_root
         including children leading to sha1_git_root

     Raises:
         - BadInputExc in case of unknown algo_hash or bad hash.
         - NotFoundExc if either revision is not found or if sha1_git is not an
         ancestor of sha1_git_root.

     """
-    rev_root_id = _lookup_revision_id_by(origin_id, branch_name, timestamp)
+    rev_root_id = _lookup_revision_id_by(origin, branch_name, timestamp)

     rev_root_id_bin = hashutil.hash_to_bytes(rev_root_id)

     rev_root = _first_element(storage.revision_get([rev_root_id_bin]))

     return (converters.from_revision(rev_root),
             lookup_revision_with_context(rev_root, sha1_git, limit))
 def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
     """Return information about revision sha1_git, limited to the
     sub-graph of all transitive parents of sha1_git_root.

     In other words, sha1_git is an ancestor of sha1_git_root.

     Args:
         sha1_git_root: latest revision. The type is either a sha1 (as an hex
         string) or a non converted dict.
         sha1_git: one of sha1_git_root's ancestors
         limit: limit the lookup to 100 revisions back

     Returns:
         Information on sha1_git if it is an ancestor of sha1_git_root
         including children leading to sha1_git_root

     Raises:
         BadInputExc in case of unknown algo_hash or bad hash
         NotFoundExc if either revision is not found or if sha1_git is not an
         ancestor of sha1_git_root

     """
     sha1_git_bin = _to_sha1_bin(sha1_git)

     revision = _first_element(storage.revision_get([sha1_git_bin]))
     if not revision:
         raise NotFoundExc('Revision %s not found' % sha1_git)

     if isinstance(sha1_git_root, str):
         sha1_git_root_bin = _to_sha1_bin(sha1_git_root)

         revision_root = _first_element(storage.revision_get([sha1_git_root_bin]))  # noqa
         if not revision_root:
             raise NotFoundExc('Revision root %s not found' % sha1_git_root)
     else:
         sha1_git_root_bin = sha1_git_root['id']

     revision_log = storage.revision_log([sha1_git_root_bin], limit)

     parents = {}
     children = defaultdict(list)

     for rev in revision_log:
         rev_id = rev['id']
         parents[rev_id] = []
         for parent_id in rev['parents']:
             parents[rev_id].append(parent_id)
             children[parent_id].append(rev_id)

     if revision['id'] not in parents:
         raise NotFoundExc('Revision %s is not an ancestor of %s' %
                           (sha1_git, sha1_git_root))

     revision['children'] = children[revision['id']]

     return converters.from_revision(revision)


 def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
     """Return information on directory pointed by revision with sha1_git.
     If dir_path is not provided, display top level directory.
     Otherwise, display the directory pointed by dir_path (if it exists).

     Args:
         sha1_git: revision's hash.
         dir_path: optional directory pointed to by that revision.
         with_data: boolean that indicates to retrieve the raw data if the path
         resolves to a content. Default to False (for the api)

     Returns:
         Information on the directory pointed to by that revision.

     Raises:
         BadInputExc in case of unknown algo_hash or bad hash.
         NotFoundExc either if the revision is not found or the path referenced
         does not exist.
         NotImplementedError in case of dir_path exists but do not reference a
         type 'dir' or 'file'.

     """
     sha1_git_bin = _to_sha1_bin(sha1_git)

     revision = _first_element(storage.revision_get([sha1_git_bin]))
     if not revision:
         raise NotFoundExc('Revision %s not found' % sha1_git)

     dir_sha1_git_bin = revision['directory']

     if dir_path:
         paths = dir_path.strip(os.path.sep).split(os.path.sep)
         entity = storage.directory_entry_get_by_path(
             dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths)))

         if not entity:
             raise NotFoundExc(
                 "Directory or File '%s' pointed to by revision %s not found"
                 % (dir_path, sha1_git))
     else:
         entity = {'type': 'dir', 'target': dir_sha1_git_bin}

     if entity['type'] == 'dir':
         directory_entries = storage.directory_ls(entity['target']) or []
         return {'type': 'dir',
                 'path': '.' if not dir_path else dir_path,
                 'revision': sha1_git,
                 'content': list(map(converters.from_directory_entry,
                                     directory_entries))}
     elif entity['type'] == 'file':  # content
         content = _first_element(
             storage.content_find({'sha1_git': entity['target']}))
         if not content:
             raise NotFoundExc('Content not found for revision %s'
                               % sha1_git)
         if with_data:
             c = _first_element(storage.content_get([content['sha1']]))
             content['data'] = c['data']
         return {'type': 'file',
                 'path': '.' if not dir_path else dir_path,
                 'revision': sha1_git,
                 'content': converters.from_content(content)}
     elif entity['type'] == 'rev':  # revision
         revision = next(storage.revision_get([entity['target']]))
         return {'type': 'rev',
                 'path': '.' if not dir_path else dir_path,
                 'revision': sha1_git,
                 'content': converters.from_revision(revision)}
     else:
         raise NotImplementedError('Entity of type %s not implemented.'
                                   % entity['type'])
 def lookup_content(q):
     """Lookup the content designed by q.

     Args:
         q: The release's sha1 as hexadecimal

     Raises:
         NotFoundExc if the requested content is not found

     """
     algo, hash = query.parse_hash(q)
     c = _first_element(storage.content_find({algo: hash}))
     if not c:
         raise NotFoundExc('Content with %s checksum equals to %s not found!'
                           % (algo, hashutil.hash_to_hex(hash)))
     return converters.from_content(c)


 def lookup_content_raw(q):
     """Lookup the content defined by q.

     Args:
         q: query string of the form <hash_algo:hash>

     Returns:
         dict with 'sha1' and 'data' keys.
         data representing its raw data decoded.

     Raises:
         NotFoundExc if the requested content is not found or
         if the content bytes are not available in the storage

     """
     c = lookup_content(q)
     content_sha1_bytes = hashutil.hash_to_bytes(c['checksums']['sha1'])
     content = _first_element(storage.content_get([content_sha1_bytes]))
     if not content:
         algo, hash = query.parse_hash(q)
         raise NotFoundExc('Bytes of content with %s checksum equals to %s '
                           'are not available!'
                           % (algo, hashutil.hash_to_hex(hash)))
     return converters.from_content(content)


 def stat_counters():
     """Return the stat counters for Software Heritage

     Returns:
         A dict mapping textual labels to integer values.

     """
     return storage.stat_counters()


 def _lookup_origin_visits(origin_url, last_visit=None, limit=10):
-    """Yields the origin origin_ids' visits.
+    """Yields the given origin's visits.

     Args:
         origin_url (str): origin to list visits for
         last_visit (int): last visit to lookup from
         limit (int): Number of elements max to display

     Yields:
         Dictionaries of origin_visit for that origin

     """
     limit = min(limit, MAX_LIMIT)
     for visit in storage.origin_visit_get(
             origin_url, last_visit=last_visit, limit=limit):
         visit['origin'] = origin_url
         yield visit


-def lookup_origin_visits(origin_id, last_visit=None, per_page=10):
-    """Yields the origin origin_ids' visits.
+def lookup_origin_visits(origin, last_visit=None, per_page=10):
+    """Yields the given origin's visits.

     Args:
-        origin_id: origin to list visits for
+        origin: origin to list visits for

     Yields:
         Dictionaries of origin_visit for that origin

     """
-    visits = _lookup_origin_visits(origin_id, last_visit=last_visit,
+    visits = _lookup_origin_visits(origin, last_visit=last_visit,
                                    limit=per_page)
     for visit in visits:
         yield converters.from_origin_visit(visit)


 def lookup_origin_visit(origin_url, visit_id):
-    """Return information about visit visit_id with origin origin_id.
+    """Return information about visit visit_id of origin origin_url.

     Args:
         origin (str): origin concerned by the visit
         visit_id: the visit identifier to lookup

     Yields:
         The dict origin_visit concerned

     """
     visit = storage.origin_visit_get_by(origin_url, visit_id)
     if not visit:
         raise NotFoundExc('Origin %s or its visit '
                           'with id %s not found!' % (origin_url, visit_id))
     visit['origin'] = origin_url
     return converters.from_origin_visit(visit)
 def lookup_snapshot_size(snapshot_id):
     """Count the number of branches in the snapshot with the given id

     Args:
         snapshot_id (str): sha1 identifier of the snapshot

     Returns:
         dict: A dict whose keys are the target types of branches and
         values their corresponding amount

     """
     snapshot_id_bin = _to_sha1_bin(snapshot_id)
     snapshot_size = storage.snapshot_count_branches(snapshot_id_bin)
     if 'revision' not in snapshot_size:
         snapshot_size['revision'] = 0
     if 'release' not in snapshot_size:
         snapshot_size['release'] = 0
     return snapshot_size


 def lookup_snapshot(snapshot_id, branches_from='', branches_count=1000,
                     target_types=None):
     """Return information about a snapshot, aka the list of named
     branches found during a specific visit of an origin.

     Args:
         snapshot_id (str): sha1 identifier of the snapshot
         branches_from (str): optional parameter used to skip branches
             whose name is lesser than it before returning them
         branches_count (int): optional parameter used to restrain
             the amount of returned branches
         target_types (list): optional parameter used to filter the
             target types of branch to return (possible values that can be
             contained in that list are `'content', 'directory', 'revision',
             'release', 'snapshot', 'alias'`)

     Returns:
         A dict filled with the snapshot content.

     """
     snapshot_id_bin = _to_sha1_bin(snapshot_id)
     snapshot = storage.snapshot_get_branches(snapshot_id_bin,
                                              branches_from.encode(),
                                              branches_count, target_types)
     if not snapshot:
         raise NotFoundExc('Snapshot with id %s not found!' % snapshot_id)
     return converters.from_snapshot(snapshot)


-def lookup_latest_origin_snapshot(origin_id, allowed_statuses=None):
+def lookup_latest_origin_snapshot(origin, allowed_statuses=None):
     """Return information about the latest snapshot of an origin.

     .. warning:: At most 1000 branches contained in the snapshot
         will be returned for performance reasons.

     Args:
-        origin_id: integer identifier of the origin
+        origin: URL or integer identifier of the origin
         allowed_statuses: list of visit statuses considered
             to find the latest snapshot for the visit. For instance,
             ``allowed_statuses=['full']`` will only consider visits that
             have successfully run to completion.

     Returns:
         A dict filled with the snapshot content.

     """
-    snapshot = storage.snapshot_get_latest(origin_id, allowed_statuses)
+    snapshot = storage.snapshot_get_latest(origin, allowed_statuses)
     return converters.from_snapshot(snapshot)
""" - if 'origin_id' in revision and \ - 'branch_name' in revision and \ - 'ts' in revision and \ - 'sha1_git' in revision: + if ( + 'origin_url' in revision and + 'branch_name' in revision and + 'ts' in revision and + 'sha1_git' in revision): + return lookup_revision_with_context_by(revision['origin_url'], + revision['branch_name'], + revision['ts'], + revision['sha1_git'], + limit) + if ( + 'origin_id' in revision and + 'branch_name' in revision and + 'ts' in revision and + 'sha1_git' in revision): return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) - if 'origin_id' in revision and \ - 'branch_name' in revision and \ - 'ts' in revision: + if ( + 'origin_url' in revision and + 'branch_name' in revision and + 'ts' in revision): + return lookup_revision_by(revision['origin_url'], + revision['branch_name'], + revision['ts']) + if ( + 'origin_id' in revision and + 'branch_name' in revision and + 'ts' in revision): return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) - if 'sha1_git_root' in revision and \ - 'sha1_git' in revision: + if ( + 'sha1_git_root' in revision and + 'sha1_git' in revision): return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) def vault_cook(obj_type, obj_id, email=None): """Cook a vault bundle. """ return vault.cook(obj_type, obj_id, email=email) def vault_fetch(obj_type, obj_id): """Fetch a vault bundle. """ return vault.fetch(obj_type, obj_id) def vault_progress(obj_type, obj_id): """Get the current progress of a vault bundle. """ return vault.progress(obj_type, obj_id) def diff_revision(rev_id): """Get the list of file changes (insertion / deletion / modification / renaming) for a particular revision. """ rev_sha1_git_bin = _to_sha1_bin(rev_id) changes = storage.diff_revision(rev_sha1_git_bin, track_renaming=True) for change in changes: change['from'] = converters.from_directory_entry(change['from']) change['to'] = converters.from_directory_entry(change['to']) if change['from_path']: change['from_path'] = change['from_path'].decode('utf-8') if change['to_path']: change['to_path'] = change['to_path'].decode('utf-8') return changes class _RevisionsWalkerProxy(object): """ Proxy class wrapping a revisions walker iterator from swh-storage and performing needed conversions. 
""" def __init__(self, rev_walker_type, rev_start, *args, **kwargs): rev_start_bin = hashutil.hash_to_bytes(rev_start) self.revisions_walker = \ revisions_walker.get_revisions_walker(rev_walker_type, storage, rev_start_bin, *args, **kwargs) def export_state(self): return self.revisions_walker.export_state() def __next__(self): return converters.from_revision(next(self.revisions_walker)) def __iter__(self): return self def get_revisions_walker(rev_walker_type, rev_start, *args, **kwargs): """ Utility function to instantiate a revisions walker of a given type, see :mod:`swh.storage.algos.revisions_walker`. Args: rev_walker_type (str): the type of revisions walker to return, possible values are: ``committer_date``, ``dfs``, ``dfs_post``, ``bfs`` and ``path`` rev_start (str): hexadecimal representation of a revision identifier args (list): position arguments to pass to the revisions walker constructor kwargs (dict): keyword arguments to pass to the revisions walker constructor """ # first check if the provided revision is valid lookup_revision(rev_start) return _RevisionsWalkerProxy(rev_walker_type, rev_start, *args, **kwargs) diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py index bab2fd11..39db3f19 100644 --- a/swh/web/tests/api/views/test_origin.py +++ b/swh/web/tests/api/views/test_origin.py @@ -1,525 +1,533 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given +import pytest from rest_framework.test import APITestCase from unittest.mock import patch from swh.storage.exc import StorageDBError, StorageAPIError from swh.web.common.utils import reverse from swh.web.common.origin_visits import get_origin_visits from swh.web.tests.strategies import ( origin, new_origin, new_origins, visit_dates, new_snapshots ) from swh.web.tests.testcase import WebTestCase class OriginApiTestCase(WebTestCase, APITestCase): @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_error( self, mock_get_origin_visits, ): err_msg = 'voluntary error to check the bad request middleware.' mock_get_origin_visits.side_effect = ValueError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 400, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'ValueError', 'reason': err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_db( self, mock_get_origin_visits): err_msg = 'Storage exploded! Will be back online shortly!' mock_get_origin_visits.side_effect = StorageDBError(err_msg) url = reverse( 'api-1-origin-visits', url_args={'origin_url': 'http://foo'}) rv = self.client.get(url) self.assertEqual(rv.status_code, 503, rv.data) self.assertEqual(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'StorageDBError', 'reason': 'An unexpected error occurred in the backend: %s' % err_msg}) @patch('swh.web.api.views.origin.get_origin_visits') def test_api_lookup_origin_visits_raise_swh_storage_error_api( self, mock_get_origin_visits): err_msg = 'Storage API dropped dead! Will resurrect asap!' 
diff --git a/swh/web/tests/api/views/test_origin.py b/swh/web/tests/api/views/test_origin.py
index bab2fd11..39db3f19 100644
--- a/swh/web/tests/api/views/test_origin.py
+++ b/swh/web/tests/api/views/test_origin.py
@@ -1,525 +1,533 @@
 # Copyright (C) 2015-2019  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import random

 from hypothesis import given
+import pytest
 from rest_framework.test import APITestCase
 from unittest.mock import patch

 from swh.storage.exc import StorageDBError, StorageAPIError

 from swh.web.common.utils import reverse
 from swh.web.common.origin_visits import get_origin_visits
 from swh.web.tests.strategies import (
     origin, new_origin, new_origins, visit_dates, new_snapshots
 )
 from swh.web.tests.testcase import WebTestCase


 class OriginApiTestCase(WebTestCase, APITestCase):

     @patch('swh.web.api.views.origin.get_origin_visits')
     def test_api_lookup_origin_visits_raise_error(
         self, mock_get_origin_visits,
     ):
         err_msg = 'voluntary error to check the bad request middleware.'

         mock_get_origin_visits.side_effect = ValueError(err_msg)

         url = reverse(
             'api-1-origin-visits', url_args={'origin_url': 'http://foo'})
         rv = self.client.get(url)

         self.assertEqual(rv.status_code, 400, rv.data)
         self.assertEqual(rv['Content-Type'], 'application/json')
         self.assertEqual(rv.data, {
             'exception': 'ValueError',
             'reason': err_msg})

     @patch('swh.web.api.views.origin.get_origin_visits')
     def test_api_lookup_origin_visits_raise_swh_storage_error_db(
         self, mock_get_origin_visits):
         err_msg = 'Storage exploded! Will be back online shortly!'

         mock_get_origin_visits.side_effect = StorageDBError(err_msg)

         url = reverse(
             'api-1-origin-visits', url_args={'origin_url': 'http://foo'})
         rv = self.client.get(url)

         self.assertEqual(rv.status_code, 503, rv.data)
         self.assertEqual(rv['Content-Type'], 'application/json')
         self.assertEqual(rv.data, {
             'exception': 'StorageDBError',
             'reason':
             'An unexpected error occurred in the backend: %s' % err_msg})

     @patch('swh.web.api.views.origin.get_origin_visits')
     def test_api_lookup_origin_visits_raise_swh_storage_error_api(
         self, mock_get_origin_visits):
         err_msg = 'Storage API dropped dead! Will resurrect asap!'

         mock_get_origin_visits.side_effect = StorageAPIError(err_msg)

         url = reverse(
             'api-1-origin-visits', url_args={'origin_url': 'http://foo'})
         rv = self.client.get(url)

         self.assertEqual(rv.status_code, 503, rv.data)
         self.assertEqual(rv['Content-Type'], 'application/json')
         self.assertEqual(rv.data, {
             'exception': 'StorageAPIError',
             'reason':
             'An unexpected error occurred in the api backend: %s' % err_msg
         })

     @given(new_origin(), visit_dates(3), new_snapshots(3))
     def test_api_lookup_origin_visits(self, new_origin, visit_dates,
                                       new_snapshots):
         origin_id = self.storage.origin_add_one(new_origin)
         new_origin['id'] = origin_id
         for i, visit_date in enumerate(visit_dates):
             origin_visit = self.storage.origin_visit_add(origin_id,
                                                          visit_date)
             self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                       new_snapshots[i])

         all_visits = list(reversed(get_origin_visits(new_origin)))

         for last_visit, expected_visits in (
                 (None, all_visits[:2]),
                 (all_visits[1]['visit'], all_visits[2:4])):

             url = reverse('api-1-origin-visits',
                           url_args={'origin_url': new_origin['url']},
                           query_params={'per_page': 2,
                                         'last_visit': last_visit})

             rv = self.client.get(url)

             self.assertEqual(rv.status_code, 200, rv.data)
             self.assertEqual(rv['Content-Type'], 'application/json')

             for expected_visit in expected_visits:
                 origin_visit_url = reverse(
                     'api-1-origin-visit',
                     url_args={'origin_url': new_origin['url'],
                               'visit_id': expected_visit['visit']})
                 snapshot_url = reverse(
                     'api-1-snapshot',
                     url_args={'snapshot_id': expected_visit['snapshot']})
                 expected_visit['origin'] = new_origin['url']
                 expected_visit['origin_visit_url'] = origin_visit_url
                 expected_visit['snapshot_url'] = snapshot_url

             self.assertEqual(rv.data, expected_visits)

     @given(new_origin(), visit_dates(3), new_snapshots(3))
     def test_api_lookup_origin_visits_by_id(self, new_origin, visit_dates,
                                             new_snapshots):
         origin_id = self.storage.origin_add_one(new_origin)
         new_origin['id'] = origin_id
         for i, visit_date in enumerate(visit_dates):
             origin_visit = self.storage.origin_visit_add(origin_id,
                                                          visit_date)
             self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                       new_snapshots[i])

         all_visits = list(reversed(get_origin_visits(new_origin)))

         for last_visit, expected_visits in (
                 (None, all_visits[:2]),
                 (all_visits[1]['visit'], all_visits[2:4])):

             url = reverse('api-1-origin-visits',
                           url_args={'origin_url': new_origin['url']},
                           query_params={'per_page': 2,
                                         'last_visit': last_visit})

             rv = self.client.get(url)

             self.assertEqual(rv.status_code, 200, rv.data)
             self.assertEqual(rv['Content-Type'], 'application/json')

             for expected_visit in expected_visits:
                 origin_visit_url = reverse(
                     'api-1-origin-visit',
                     url_args={'origin_url': new_origin['url'],
                               'visit_id': expected_visit['visit']})
                 snapshot_url = reverse(
                     'api-1-snapshot',
                     url_args={'snapshot_id': expected_visit['snapshot']})
                 expected_visit['origin'] = new_origin['url']
                 expected_visit['origin_visit_url'] = origin_visit_url
                 expected_visit['snapshot_url'] = snapshot_url

             self.assertEqual(rv.data, expected_visits)

     @given(new_origin(), visit_dates(3), new_snapshots(3))
     def test_api_lookup_origin_visit(self, new_origin, visit_dates,
                                      new_snapshots):
         origin_id = self.storage.origin_add_one(new_origin)
         new_origin['id'] = origin_id
         for i, visit_date in enumerate(visit_dates):
             origin_visit = self.storage.origin_visit_add(origin_id,
                                                          visit_date)
             visit_id = origin_visit['visit']
             self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                       new_snapshots[i])
             url = reverse('api-1-origin-visit',
                           url_args={'origin_url': new_origin['url'],
                                     'visit_id': visit_id})

             rv = self.client.get(url)
             self.assertEqual(rv.status_code, 200, rv.data)
             self.assertEqual(rv['Content-Type'], 'application/json')

             expected_visit = self.origin_visit_get_by(origin_id, visit_id)

             origin_url = reverse('api-1-origin',
                                  url_args={'origin_url': new_origin['url']})
             snapshot_url = reverse(
                 'api-1-snapshot',
                 url_args={'snapshot_id': expected_visit['snapshot']})

             expected_visit['origin'] = new_origin['url']
             expected_visit['origin_url'] = origin_url
             expected_visit['snapshot_url'] = snapshot_url

             self.assertEqual(rv.data, expected_visit)
+    @pytest.mark.origin_id
     @given(new_origin(), visit_dates(3), new_snapshots(3))
     def test_api_lookup_origin_visit_by_id(self, new_origin, visit_dates,
                                            new_snapshots):
         origin_id = self.storage.origin_add_one(new_origin)
         new_origin['id'] = origin_id
         for i, visit_date in enumerate(visit_dates):
             origin_visit = self.storage.origin_visit_add(origin_id,
                                                          visit_date)
             visit_id = origin_visit['visit']
             self.storage.snapshot_add(origin_id, origin_visit['visit'],
                                       new_snapshots[i])
             url = reverse('api-1-origin-visit',
                           url_args={'origin_id': origin_id,
                                     'visit_id': visit_id})

             rv = self.client.get(url)
             self.assertEqual(rv.status_code, 200, rv.data)
             self.assertEqual(rv['Content-Type'], 'application/json')

             expected_visit = self.origin_visit_get_by(origin_id, visit_id)

             origin_url = reverse('api-1-origin',
                                  url_args={'origin_url': new_origin['url']})
             snapshot_url = reverse(
                 'api-1-snapshot',
                 url_args={'snapshot_id': expected_visit['snapshot']})

             expected_visit['origin'] = new_origin['url']
             expected_visit['origin_url'] = origin_url
             expected_visit['snapshot_url'] = snapshot_url

             self.assertEqual(rv.data, expected_visit)

     @given(origin())
     def test_api_lookup_origin_visit_not_found(self, origin):

         all_visits = list(reversed(get_origin_visits(origin)))

         max_visit_id = max([v['visit'] for v in all_visits])

         url = reverse('api-1-origin-visit',
                       url_args={'origin_url': origin['url'],
                                 'visit_id': max_visit_id + 1})

         rv = self.client.get(url)

         self.assertEqual(rv.status_code, 404, rv.data)
         self.assertEqual(rv['Content-Type'], 'application/json')
         self.assertEqual(rv.data, {
             'exception': 'NotFoundExc',
             'reason': 'Origin %s or its visit with id %s not found!' %
             (origin['url'], max_visit_id+1)
         })

+    @pytest.mark.origin_id
     @given(origin())
     def test_api_lookup_origin_visit_not_found_by_id(self, origin):

         all_visits = list(reversed(get_origin_visits(origin)))

         max_visit_id = max([v['visit'] for v in all_visits])

         url = reverse('api-1-origin-visit',
                       url_args={'origin_id': origin['id'],
                                 'visit_id': max_visit_id + 1})

         rv = self.client.get(url)

         self.assertEqual(rv.status_code, 404, rv.data)
         self.assertEqual(rv['Content-Type'], 'application/json')
         self.assertEqual(rv.data, {
             'exception': 'NotFoundExc',
             'reason': 'Origin %s or its visit with id %s not found!' %
             (origin['url'], max_visit_id+1)
         })
+    @pytest.mark.origin_id
    @given(origin())
    def test_api_origin_by_id(self, origin):
        url = reverse('api-1-origin', url_args={'origin_id': origin['id']})

        rv = self.client.get(url)

        expected_origin = self.origin_get(origin)

        origin_visits_url = reverse('api-1-origin-visits',
                                    url_args={'origin_url': origin['url']})

        expected_origin['origin_visits_url'] = origin_visits_url

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_origin)

    @given(origin())
    def test_api_origin_by_url(self, origin):
        url = reverse('api-1-origin',
                      url_args={'origin_url': origin['url']})

        rv = self.client.get(url)

        expected_origin = self.origin_get(origin)

        origin_visits_url = reverse('api-1-origin-visits',
                                    url_args={'origin_url': origin['url']})

        expected_origin['origin_visits_url'] = origin_visits_url

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_origin)

    @given(origin())
    def test_api_origin_by_type_url(self, origin):
        url = reverse('api-1-origin',
                      url_args={'origin_type': origin['type'],
                                'origin_url': origin['url']})

        rv = self.client.get(url)

        expected_origin = self.origin_get(origin)

        origin_visits_url = reverse('api-1-origin-visits',
                                    url_args={'origin_url': origin['url']})

        expected_origin['origin_visits_url'] = origin_visits_url

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_origin)

    @given(new_origin())
    def test_api_origin_not_found(self, new_origin):

        url = reverse('api-1-origin',
                      url_args={'origin_type': new_origin['type'],
                                'origin_url': new_origin['url']})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Origin %s not found!' % new_origin['url']
        })

    @given(origin())
    def test_api_origin_metadata_search(self, origin):
        with patch('swh.web.common.service.idx_storage') as mock_idx_storage:
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .side_effect = lambda conjunction, limit: [{
                    'from_revision': (
                        b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
                        b'\xf2U\xfa\x05B8'),
                    'metadata': {'author': 'Jane Doe'},
-                    'id': origin['id'],
+                    'origin_url': origin['url'],
                    'tool': {
                        'configuration': {
                            'context': ['NpmMapping', 'CodemetaMapping'],
                            'type': 'local'
                        },
                        'id': 3,
                        'name': 'swh-metadata-detector',
                        'version': '0.0.1'
                    }
                }]

            url = reverse('api-1-origin-metadata-search',
                          query_params={'fulltext': 'Jane Doe'})
            rv = self.client.get(url)

            self.assertEqual(rv.status_code, 200, rv.content)
            self.assertEqual(rv['Content-Type'], 'application/json')
            expected_data = [{
-                'id': origin['id'],
                'type': origin['type'],
                'url': origin['url'],
                'metadata': {
                    'metadata': {'author': 'Jane Doe'},
                    'from_revision': (
                        '7026b7c1a2af56521e951c01ed20f255fa054238'),
                    'tool': {
                        'configuration': {
                            'context': ['NpmMapping', 'CodemetaMapping'],
                            'type': 'local'
                        },
                        'id': 3,
                        'name': 'swh-metadata-detector',
                        'version': '0.0.1',
                    }
                }
            }]
+            actual_data = rv.data
+            for d in actual_data:
+                if 'id' in d:
+                    del d['id']
            self.assertEqual(rv.data, expected_data)
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .assert_called_with(conjunction=['Jane Doe'], limit=70)

    @given(origin())
    def test_api_origin_metadata_search_limit(self, origin):
        with patch('swh.web.common.service.idx_storage') as mock_idx_storage:
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .side_effect = lambda conjunction, limit: [{
                    'from_revision': (
                        b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
                        b'\xf2U\xfa\x05B8'),
                    'metadata': {'author': 'Jane Doe'},
-                    'id': origin['id'],
+                    'origin_url': origin['url'],
                    'tool': {
                        'configuration': {
                            'context': ['NpmMapping', 'CodemetaMapping'],
                            'type': 'local'
                        },
                        'id': 3,
                        'name': 'swh-metadata-detector',
                        'version': '0.0.1'
                    }
                }]

            url = reverse('api-1-origin-metadata-search',
                          query_params={'fulltext': 'Jane Doe'})
            rv = self.client.get(url)

            self.assertEqual(rv.status_code, 200, rv.content)
            self.assertEqual(rv['Content-Type'], 'application/json')
            self.assertEqual(len(rv.data), 1)
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .assert_called_with(conjunction=['Jane Doe'], limit=70)

            url = reverse('api-1-origin-metadata-search',
                          query_params={'fulltext': 'Jane Doe',
                                        'limit': 10})
            rv = self.client.get(url)

            self.assertEqual(rv.status_code, 200, rv.content)
            self.assertEqual(rv['Content-Type'], 'application/json')
            self.assertEqual(len(rv.data), 1)
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .assert_called_with(conjunction=['Jane Doe'], limit=10)

            url = reverse('api-1-origin-metadata-search',
                          query_params={'fulltext': 'Jane Doe',
                                        'limit': 987})
            rv = self.client.get(url)

            self.assertEqual(rv.status_code, 200, rv.content)
            self.assertEqual(rv['Content-Type'], 'application/json')
            self.assertEqual(len(rv.data), 1)
            mock_idx_storage.origin_intrinsic_metadata_search_fulltext \
                .assert_called_with(conjunction=['Jane Doe'], limit=100)

    @given(origin())
    def test_api_origin_intrinsic_metadata(self, origin):
        with patch('swh.web.common.service.idx_storage') as mock_idx_storage:
            mock_idx_storage.origin_intrinsic_metadata_get \
                .side_effect = lambda origin_ids: [{
                    'from_revision': (
                        b'p&\xb7\xc1\xa2\xafVR\x1e\x95\x1c\x01\xed '
                        b'\xf2U\xfa\x05B8'),
                    'metadata': {'author': 'Jane Doe'},
-                    'id': origin['id'],
+                    'origin_url': origin['url'],
                    'tool': {
                        'configuration': {
                            'context': ['NpmMapping', 'CodemetaMapping'],
                            'type': 'local'
                        },
                        'id': 3,
                        'name': 'swh-metadata-detector',
                        'version': '0.0.1'
                    }
                }]

            url = reverse('api-origin-intrinsic-metadata',
                          url_args={'origin_type': origin['type'],
                                    'origin_url': origin['url']})
            rv = self.client.get(url)

            mock_idx_storage.origin_intrinsic_metadata_get \
-                .assert_called_once_with([origin['id']])
+                .assert_called_once_with([origin['url']])
            self.assertEqual(rv.status_code, 200, rv.content)
            self.assertEqual(rv['Content-Type'], 'application/json')
            expected_data = {'author': 'Jane Doe'}
            self.assertEqual(rv.data, expected_data)

    @patch('swh.web.common.service.idx_storage')
    def test_api_origin_metadata_search_invalid(self, mock_idx_storage):

        url = reverse('api-1-origin-metadata-search')
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 400, rv.content)
        mock_idx_storage.assert_not_called()

+    @pytest.mark.origin_id
    @given(new_origins(10))
    def test_api_lookup_origins(self, new_origins):

        nb_origins = len(new_origins)

        expected_origins = self.storage.origin_add(new_origins)

        origin_from_idx = random.randint(1, nb_origins-1) - 1
        origin_from = expected_origins[origin_from_idx]['id']
        max_origin_id = expected_origins[-1]['id']
        origin_count = random.randint(1, max_origin_id - origin_from)

        url = reverse('api-1-origins',
                      query_params={'origin_from': origin_from,
                                    'origin_count': origin_count})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200, rv.data)

        start = origin_from_idx
        end = origin_from_idx + origin_count
        expected_origins = expected_origins[start:end]

        for expected_origin in expected_origins:
            expected_origin['origin_visits_url'] = reverse(
                'api-1-origin-visits',
                url_args={'origin_url': expected_origin['url']})

        self.assertEqual(rv.data, expected_origins)

        next_origin_id = expected_origins[-1]['id']+1
        if self.storage.origin_get({'id': next_origin_id}):
            self.assertIn('Link', rv)
            next_url = reverse(
                'api-1-origins',
                query_params={'origin_from': next_origin_id,
                              'origin_count': origin_count})
            self.assertIn(next_url, rv['Link'])
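Every test touching an origin-id based endpoint in the hunks above and below now carries the `origin_id` marker, so the whole family can be deselected in one go once those endpoints are removed. A small sketch of a runner doing that from Python, assuming the marker is registered in pytest.ini (the test path is illustrative):

import sys

import pytest

if __name__ == '__main__':
    # Run the suite while skipping the deprecated origin-id based tests.
    sys.exit(pytest.main(['-m', 'not origin_id', 'swh/web/tests']))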
diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py
index 8703732c..77f40203 100644
--- a/swh/web/tests/api/views/test_revision.py
+++ b/swh/web/tests/api/views/test_revision.py
@@ -1,539 +1,570 @@
# Copyright (C) 2015-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given
+import pytest
from rest_framework.test import APITestCase
from unittest.mock import patch

from swh.model.hashutil import hash_to_hex
from swh.web.common.exc import NotFoundExc
from swh.web.common.utils import reverse, parse_timestamp
from swh.web.tests.data import random_sha1
from swh.web.tests.strategies import (
    revision, new_revision, origin, origin_with_multiple_visits
)
from swh.web.tests.testcase import WebTestCase


class RevisionApiTestCase(WebTestCase, APITestCase):

    @given(revision())
    def test_api_revision(self, revision):
        url = reverse('api-1-revision', url_args={'sha1_git': revision})
        rv = self.client.get(url)

        expected_revision = self.revision_get(revision)

        self._enrich_revision(expected_revision)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_revision)

    def test_api_revision_not_found(self):
        unknown_revision_ = random_sha1()

        url = reverse('api-1-revision',
                      url_args={'sha1_git': unknown_revision_})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Revision with sha1_git %s not found.' %
            unknown_revision_})

    @given(revision())
    def test_api_revision_raw_ok(self, revision):
        url = reverse('api-1-revision-raw-message',
                      url_args={'sha1_git': revision})
        rv = self.client.get(url)

        expected_message = self.revision_get(revision)['message']

        self.assertEqual(rv.status_code, 200)
        self.assertEqual(rv['Content-Type'], 'application/octet-stream')
        self.assertEqual(rv.content, expected_message.encode())

    @given(new_revision())
    def test_api_revision_raw_ok_no_msg(self, new_revision):

        del new_revision['message']
        self.storage.revision_add([new_revision])

        new_revision_id = hash_to_hex(new_revision['id'])

        url = reverse('api-1-revision-raw-message',
                      url_args={'sha1_git': new_revision_id})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'No message for revision with sha1_git %s.' %
            new_revision_id})

    def test_api_revision_raw_ko_no_rev(self):
        unknown_revision_ = random_sha1()

        url = reverse('api-1-revision-raw-message',
                      url_args={'sha1_git': unknown_revision_})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Revision with sha1_git %s not found.' %
            unknown_revision_})

-    def test_api_revision_with_origin_not_found(self):
-        unknown_origin_id_ = random.randint(1000, 1000000)
+    @pytest.mark.origin_id
+    def test_api_revision_with_origin_id_not_found(self):
+        unknown_origin_id = random.randint(1000, 1000000)

        url = reverse('api-1-revision-origin',
-                      url_args={'origin_id': unknown_origin_id_})
+                      url_args={'origin_id': unknown_origin_id})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Origin %s not found!' %
-            unknown_origin_id_})
+            unknown_origin_id})

+    @pytest.mark.origin_id
    @given(origin())
-    def test_api_revision_with_origin(self, origin):
+    def test_api_revision_with_origin_id(self, origin):
        url = reverse('api-1-revision-origin',
                      url_args={'origin_id': origin['id']})
        rv = self.client.get(url)

-        snapshot = self.snapshot_get_latest(origin['id'])
+        snapshot = self.snapshot_get_latest(origin['url'])
        expected_revision = self.revision_get(
            snapshot['branches']['HEAD']['target'])

        self._enrich_revision(expected_revision)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_revision)

+    @pytest.mark.origin_id
    @given(origin())
-    def test_api_revision_with_origin_and_branch_name(self, origin):
+    def test_api_revision_with_origin_id_and_branch_name(self, origin):
-        snapshot = self.snapshot_get_latest(origin['id'])
+        snapshot = self.snapshot_get_latest(origin['url'])

        branch_name = random.choice(
            list(b for b in snapshot['branches'].keys()
                 if snapshot['branches'][b]['target_type'] == 'revision'))

        url = reverse('api-1-revision-origin',
                      url_args={'origin_id': origin['id'],
                                'branch_name': branch_name})

        rv = self.client.get(url)

        expected_revision = self.revision_get(
            snapshot['branches'][branch_name]['target'])

        self._enrich_revision(expected_revision)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_revision)

+    @pytest.mark.origin_id
    @given(origin_with_multiple_visits())
-    def test_api_revision_with_origin_and_branch_name_and_ts(self, origin):
+    def test_api_revision_with_origin_id_and_branch_name_and_ts(self, origin):
-        visit = random.choice(self.origin_visit_get(origin['id']))
+        visit = random.choice(self.origin_visit_get(origin['url']))

        snapshot = self.snapshot_get(visit['snapshot'])

        branch_name = random.choice(
            list(b for b in snapshot['branches'].keys()
                 if snapshot['branches'][b]['target_type'] == 'revision'))

        url = reverse('api-1-revision-origin',
                      url_args={'origin_id': origin['id'],
                                'branch_name': branch_name,
                                'ts': visit['date']})

        rv = self.client.get(url)

        expected_revision = self.revision_get(
            snapshot['branches'][branch_name]['target'])

        self._enrich_revision(expected_revision)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_revision)

+    @pytest.mark.origin_id
    @given(origin_with_multiple_visits())
-    def test_api_revision_with_origin_and_branch_name_and_ts_escapes(self,
-                                                                     origin):
-        visit = random.choice(self.origin_visit_get(origin['id']))
+    def test_api_revision_with_origin_id_and_branch_name_and_ts_escapes(
+            self, origin):
+        visit = random.choice(self.origin_visit_get(origin['url']))

        snapshot = self.snapshot_get(visit['snapshot'])

        branch_name = random.choice(
            list(b for b in snapshot['branches'].keys()
                 if snapshot['branches'][b]['target_type'] == 'revision'))

        date = parse_timestamp(visit['date'])

        formatted_date = date.strftime('Today is %B %d, %Y at %X')

        url = reverse('api-1-revision-origin',
                      url_args={'origin_id': origin['id'],
                                'branch_name': branch_name,
                                'ts': formatted_date})

        rv = self.client.get(url)

        expected_revision = self.revision_get(
            snapshot['branches'][branch_name]['target'])

        self._enrich_revision(expected_revision)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_revision)

-    def test_api_directory_through_revision_origin_ko(self):
+    @pytest.mark.origin_id
+    def test_api_directory_through_revision_origin_id_ko(self):
        unknown_origin_id_ = random.randint(1000, 1000000)

        url = reverse('api-1-revision-origin-directory',
                      url_args={'origin_id': unknown_origin_id_})
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Origin %s not found!' % unknown_origin_id_
        })

+    @pytest.mark.origin_id
    @given(origin())
-    def test_api_directory_through_revision_origin(self, origin):
+    def test_api_directory_through_revision_origin_id(self, origin):

        url = reverse('api-1-revision-origin-directory',
                      url_args={'origin_id': origin['id']})
        rv = self.client.get(url)

-        snapshot = self.snapshot_get_latest(origin['id'])
+        snapshot = self.snapshot_get_latest(origin['url'])
        revision_id = snapshot['branches']['HEAD']['target']
        revision = self.revision_get(revision_id)
        directory = self.directory_ls(revision['directory'])

        for entry in directory:
            if entry['type'] == 'dir':
                entry['target_url'] = reverse(
                    'api-1-directory',
                    url_args={'sha1_git': entry['target']}
                )
                entry['dir_url'] = reverse(
                    'api-1-revision-origin-directory',
                    url_args={'origin_id': origin['id'],
                              'path': entry['name']})
            elif entry['type'] == 'file':
                entry['target_url'] = reverse(
                    'api-1-content',
                    url_args={'q': 'sha1_git:%s' % entry['target']}
                )
                entry['file_url'] = reverse(
                    'api-1-revision-origin-directory',
                    url_args={'origin_id': origin['id'],
                              'path': entry['name']})
            elif entry['type'] == 'rev':
                entry['target_url'] = reverse(
                    'api-1-revision',
                    url_args={'sha1_git': entry['target']}
                )
                entry['rev_url'] = reverse(
                    'api-1-revision-origin-directory',
                    url_args={'origin_id': origin['id'],
                              'path': entry['name']})

        expected_result = {
            'content': directory,
            'path': '.',
            'revision': revision_id,
            'type': 'dir'
        }

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_result)

    @given(revision())
    def test_api_revision_log(self, revision):
        per_page = 10

        url = reverse('api-1-revision-log',
                      url_args={'sha1_git': revision},
                      query_params={'per_page': per_page})

        rv = self.client.get(url)

        expected_log = self.revision_log(revision, limit=per_page+1)
        expected_log = list(map(self._enrich_revision, expected_log))

        has_next = len(expected_log) > per_page

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data,
                         expected_log[:-1] if has_next else expected_log)

        if has_next:
            self.assertIn('Link', rv)
            next_log_url = reverse(
                'api-1-revision-log',
                url_args={'sha1_git': expected_log[-1]['id']},
                query_params={'per_page': per_page})
            self.assertIn(next_log_url, rv['Link'])

    def test_api_revision_log_not_found(self):
        unknown_revision_ = random_sha1()

        url = reverse('api-1-revision-log',
                      url_args={'sha1_git': unknown_revision_})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Revision with sha1_git %s not found.' %
            unknown_revision_})
        self.assertFalse(rv.has_header('Link'))

    @given(revision())
    def test_api_revision_log_context(self, revision):
        revisions = self.revision_log(revision, limit=4)

        prev_rev = revisions[0]['id']
        rev = revisions[-1]['id']

        per_page = 10

        url = reverse('api-1-revision-log',
                      url_args={'sha1_git': rev,
                                'prev_sha1s': prev_rev},
                      query_params={'per_page': per_page})

        rv = self.client.get(url)

        expected_log = self.revision_log(rev, limit=per_page)
        prev_revision = self.revision_get(prev_rev)
        expected_log.insert(0, prev_revision)
        expected_log = list(map(self._enrich_revision, expected_log))

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, expected_log)

+    @pytest.mark.origin_id
    @given(origin())
-    def test_api_revision_log_by(self, origin):
+    def test_api_revision_log_by_origin_id(self, origin):
        per_page = 10

        url = reverse('api-1-revision-origin-log',
                      url_args={'origin_id': origin['id']},
                      query_params={'per_page': per_page})

        rv = self.client.get(url)

-        snapshot = self.snapshot_get_latest(origin['id'])
+        snapshot = self.snapshot_get_latest(origin['url'])

        expected_log = self.revision_log(
            snapshot['branches']['HEAD']['target'], limit=per_page+1)
        expected_log = list(map(self._enrich_revision, expected_log))

        has_next = len(expected_log) > per_page

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data,
                         expected_log[:-1] if has_next else expected_log)

        if has_next:
            self.assertIn('Link', rv)
            next_log_url = reverse(
                'api-1-revision-origin-log',
                url_args={'origin_id': origin['id'],
                          'branch_name': 'HEAD'},
                query_params={'per_page': per_page,
                              'sha1_git': expected_log[-1]['id']})
            self.assertIn(next_log_url, rv['Link'])

+    @pytest.mark.origin_id
    @given(origin())
-    def test_api_revision_log_by_ko(self, origin):
+    def test_api_revision_log_by_origin_id_ko(self, origin):

        invalid_branch_name = 'foobar'

        url = reverse('api-1-revision-origin-log',
                      url_args={'origin_id': origin['id'],
                                'branch_name': invalid_branch_name})

        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertFalse(rv.has_header('Link'))
        self.assertEqual(
            rv.data,
            {'exception': 'NotFoundExc',
             'reason': 'Revision for origin %s and branch %s not found.' %
             (origin['id'], invalid_branch_name)})

    @patch('swh.web.api.views.revision._revision_directory_by')
    def test_api_revision_directory_ko_not_found(self, mock_rev_dir):
        # given
        mock_rev_dir.side_effect = NotFoundExc('Not found')

        # then
        rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/')

        self.assertEqual(rv.status_code, 404, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, {
            'exception': 'NotFoundExc',
            'reason': 'Not found'})

        mock_rev_dir.assert_called_once_with(
            {'sha1_git': '999'},
            'some/path/to/dir',
            '/api/1/revision/999/directory/some/path/to/dir/',
            with_data=False)

    @patch('swh.web.api.views.revision._revision_directory_by')
    def test_api_revision_directory_ok_returns_dir_entries(self,
                                                           mock_rev_dir):
        stub_dir = {
            'type': 'dir',
            'revision': '999',
            'content': [
                {
                    'sha1_git': '789',
                    'type': 'file',
                    'target': '101',
                    'target_url': '/api/1/content/sha1_git:101/',
                    'name': 'somefile',
                    'file_url': '/api/1/revision/999/directory/some/path/'
                                'somefile/'
                },
                {
                    'sha1_git': '123',
                    'type': 'dir',
                    'target': '456',
                    'target_url': '/api/1/directory/456/',
                    'name': 'to-subdir',
                    'dir_url': '/api/1/revision/999/directory/some/path/'
                               'to-subdir/',
                }]
        }

        # given
        mock_rev_dir.return_value = stub_dir

        # then
        rv = self.client.get('/api/1/revision/999/directory/some/path/')

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, stub_dir)

        mock_rev_dir.assert_called_once_with(
            {'sha1_git': '999'},
            'some/path',
            '/api/1/revision/999/directory/some/path/',
            with_data=False)

    @patch('swh.web.api.views.revision._revision_directory_by')
    def test_api_revision_directory_ok_returns_content(self, mock_rev_dir):
        stub_content = {
            'type': 'file',
            'revision': '999',
            'content': {
                'sha1_git': '789',
                'sha1': '101',
                'data_url': '/api/1/content/101/raw/',
            }
        }

        # given
        mock_rev_dir.return_value = stub_content

        # then
        url = '/api/1/revision/666/directory/some/other/path/'
        rv = self.client.get(url)

        self.assertEqual(rv.status_code, 200, rv.data)
        self.assertEqual(rv['Content-Type'], 'application/json')
        self.assertEqual(rv.data, stub_content)

        mock_rev_dir.assert_called_once_with(
            {'sha1_git': '666'}, 'some/other/path', url, with_data=False)

    def _enrich_revision(self, revision):
        author_url = reverse(
            'api-1-person',
            url_args={'person_id': revision['author']['id']})

        committer_url = reverse(
            'api-1-person',
            url_args={'person_id': revision['committer']['id']})

        directory_url = reverse(
            'api-1-directory',
            url_args={'sha1_git': revision['directory']})

        history_url = reverse('api-1-revision-log',
                              url_args={'sha1_git': revision['id']})

        parents_id_url = []
        for p in revision['parents']:
            parents_id_url.append({
                'id': p,
                'url': reverse('api-1-revision', url_args={'sha1_git': p})
            })

        revision_url = reverse('api-1-revision',
                               url_args={'sha1_git': revision['id']})

        revision['author_url'] = author_url
        revision['committer_url'] = committer_url
        revision['directory_url'] = directory_url
        revision['history_url'] = history_url
        revision['url'] = revision_url
        revision['parents'] = parents_id_url

        return revision

    @given(revision())
    def test_api_revision_uppercase(self, revision):
        url = reverse('api-1-revision-uppercase-checksum',
                      url_args={'sha1_git': revision.upper()})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 302)

        redirect_url = reverse('api-1-revision',
                               url_args={'sha1_git': revision})

        self.assertEqual(resp['location'], redirect_url)
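The revision-log tests above all assert the same paging convention: the backend is asked for `per_page + 1` entries, and the extra entry, when present, is only used to emit a `Link` header pointing at the next page. A compact sketch of that logic (names are illustrative, not part of the patch):

def page_with_link(fetch, per_page):
    # fetch(limit) returns up to `limit` log entries.
    entries = fetch(per_page + 1)
    has_next = len(entries) > per_page
    page = entries[:per_page] if has_next else entries
    # The dropped extra entry's id becomes the cursor for the next page.
    next_cursor = entries[-1]['id'] if has_next else None
    return page, next_cursor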
diff --git a/swh/web/tests/browse/test_utils.py b/swh/web/tests/browse/test_utils.py
index a9df615e..52e58a53 100644
--- a/swh/web/tests/browse/test_utils.py
+++ b/swh/web/tests/browse/test_utils.py
@@ -1,90 +1,90 @@
# Copyright (C) 2017-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from hypothesis import given

from swh.web.browse import utils
from swh.web.common.utils import reverse, format_utc_iso_date
from swh.web.tests.strategies import origin_with_multiple_visits
from swh.web.tests.testcase import WebTestCase


class SwhBrowseUtilsTestCase(WebTestCase):

    def test_get_mimetype_and_encoding_for_content(self):
        text = b'Hello world!'
        self.assertEqual(utils.get_mimetype_and_encoding_for_content(text),
                         ('text/plain', 'us-ascii'))

    @given(origin_with_multiple_visits())
    def test_get_origin_visit_snapshot_simple(self, origin):

-        visits = self.origin_visit_get(origin['id'])
+        visits = self.origin_visit_get(origin['url'])

        for visit in visits:

            snapshot = self.snapshot_get(visit['snapshot'])
            branches = []
            releases = []

            for branch in sorted(snapshot['branches'].keys()):
                branch_data = snapshot['branches'][branch]
                if branch_data['target_type'] == 'revision':
                    rev_data = self.revision_get(branch_data['target'])
                    branches.append({
                        'name': branch,
                        'revision': branch_data['target'],
                        'directory': rev_data['directory'],
                        'date': format_utc_iso_date(rev_data['date']),
                        'message': rev_data['message']
                    })
                elif branch_data['target_type'] == 'release':
                    rel_data = self.release_get(branch_data['target'])
                    rev_data = self.revision_get(rel_data['target'])
                    releases.append({
                        'name': rel_data['name'],
                        'branch_name': branch,
                        'date': format_utc_iso_date(rel_data['date']),
                        'id': rel_data['id'],
                        'message': rel_data['message'],
                        'target_type': rel_data['target_type'],
                        'target': rel_data['target'],
                        'directory': rev_data['directory']
                    })

            assert branches and releases, 'Incomplete test data.'

            origin_visit_branches = utils.get_origin_visit_snapshot(
                origin, visit_id=visit['visit'])

            self.assertEqual(origin_visit_branches, (branches, releases))

    def test_gen_link(self):
        self.assertEqual(
            utils.gen_link('https://www.softwareheritage.org/', 'swh'),
            '<a href="https://www.softwareheritage.org/">swh</a>')

    def test_gen_person_link(self):
        person_id = 8221896
        person_name = 'Antoine Lambert'
        person_url = reverse('browse-person',
                             url_args={'person_id': person_id})

        self.assertEqual(utils.gen_person_link(person_id, person_name,
                                               link_attrs=None),
                         '<a href="%s">%s</a>' % (person_url, person_name))

    def test_gen_revision_link(self):
        revision_id = '28a0bc4120d38a394499382ba21d6965a67a3703'
        revision_url = reverse('browse-revision',
                               url_args={'sha1_git': revision_id})

        self.assertEqual(utils.gen_revision_link(revision_id, link_text=None,
                                                 link_attrs=None),
                         '<a href="%s">%s</a>' % (revision_url, revision_id))
        self.assertEqual(
            utils.gen_revision_link(revision_id, shorten_id=True,
                                    link_attrs=None),
            '<a href="%s">%s</a>' % (revision_url, revision_id[:7]))
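test_get_origin_visit_snapshot_simple above rebuilds by hand the partition that `get_origin_visit_snapshot` is expected to produce: branch entries targeting revisions on one side, release entries on the other. A simplified sketch of that split, reduced to the fields the test actually compares (the revision/release lookups that fill in the remaining fields are elided):

def split_branches(snapshot):
    branches, releases = [], []
    for name in sorted(snapshot['branches']):
        data = snapshot['branches'][name]
        if data['target_type'] == 'revision':
            branches.append({'name': name, 'revision': data['target']})
        elif data['target_type'] == 'release':
            releases.append({'branch_name': name, 'id': data['target']})
    return branches, releases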
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 78df2e3b..bc66c206 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,901 +1,902 @@
# Copyright (C) 2017-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from unittest.mock import patch

from django.utils.html import escape
from hypothesis import given

from swh.model.hashutil import hash_to_bytes
from swh.web.browse.utils import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.utils import (
    reverse, gen_path_info, format_utc_iso_date,
    parse_timestamp, get_swh_persistent_id
)
from swh.web.tests.data import get_content
from swh.web.tests.strategies import (
    origin, origin_with_multiple_visits, new_origin,
    new_snapshot, visit_dates, revisions
)
from swh.web.tests.testcase import WebTestCase


class SwhBrowseOriginTest(WebTestCase):

    @given(origin_with_multiple_visits())
    def test_origin_visits_browse(self, origin):
        url = reverse('browse-origin-visits',
                      url_args={'origin_type': origin['type'],
                                'origin_url': origin['url']})
        resp = self.client.get(url)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('origin-visits.html')

        url = reverse('browse-origin-visits',
                      url_args={'origin_url': origin['url']})
        resp = self.client.get(url)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('origin-visits.html')

-        visits = self.origin_visit_get(origin['id'])
+        visits = self.origin_visit_get(origin['url'])

        for v in visits:
            vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ')
            browse_dir_url = reverse('browse-origin-directory',
                                     url_args={'origin_url': origin['url'],
                                               'timestamp': vdate})
            self.assertContains(resp, browse_dir_url)

    def origin_content_view_helper(self, origin_info, origin_visits,
                                   origin_branches, origin_releases,
                                   root_dir_sha1, content,
                                   visit_id=None, timestamp=None):
        content_path = '/'.join(content['path'].split('/')[1:])

        url_args = {'origin_type': origin_info['type'],
                    'origin_url': origin_info['url'],
                    'path': content_path}

        if not visit_id:
            visit_id = origin_visits[-1]['visit']

        query_params = {}

        if timestamp:
            url_args['timestamp'] = timestamp

        if visit_id:
            query_params['visit_id'] = visit_id

        url = reverse('browse-origin-content',
                      url_args=url_args,
                      query_params=query_params)

        resp = self.client.get(url)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('content.html')

        self.assertContains(resp, '<code class="%s">' %
                            content['hljs_language'])
        self.assertContains(resp, escape(content['data']))

        split_path = content_path.split('/')

        filename = split_path[-1]
        path = content_path.replace(filename, '')[:-1]

        path_info = gen_path_info(path)

        del url_args['path']

        if timestamp:
            url_args['timestamp'] = \
                format_utc_iso_date(parse_timestamp(timestamp).isoformat(),
                                    '%Y-%m-%dT%H:%M:%S')

        root_dir_url = reverse('browse-origin-directory',
                               url_args=url_args,
                               query_params=query_params)

        self.assertContains(resp, '<li class="swh-path">',
                            count=len(path_info)+1)

        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (root_dir_url, root_dir_sha1[:7]))

        for p in path_info:
            url_args['path'] = p['path']
            dir_url = reverse('browse-origin-directory',
                              url_args=url_args,
                              query_params=query_params)
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                (dir_url, p['name']))

        self.assertContains(resp, '<li>%s</li>' % filename)

        query_string = 'sha1_git:' + content['sha1_git']

        url_raw = reverse('browse-content-raw',
                          url_args={'query_string': query_string},
                          query_params={'filename': filename})
        self.assertContains(resp, url_raw)

        if 'path' in url_args:
            del url_args['path']

        origin_branches_url = reverse('browse-origin-branches',
                                      url_args=url_args,
                                      query_params=query_params)

        self.assertContains(resp, '<a href="%s">Branches (%s)</a>' %
                            (origin_branches_url, len(origin_branches)))

        origin_releases_url = reverse('browse-origin-releases',
                                      url_args=url_args,
                                      query_params=query_params)

        self.assertContains(resp, '<a href="%s">Releases (%s)</a>' %
                            (origin_releases_url, len(origin_releases)))

        self.assertContains(resp, '<li class="swh-branch">',
                            count=len(origin_branches))

        url_args['path'] = content_path

        for branch in origin_branches:
            query_params['branch'] = branch['name']
            root_dir_branch_url = reverse('browse-origin-content',
                                          url_args=url_args,
                                          query_params=query_params)

            self.assertContains(resp, '<a href="%s">' % root_dir_branch_url)

        self.assertContains(resp, '<li class="swh-release">',
                            count=len(origin_releases))

        query_params['branch'] = None
        for release in origin_releases:
            query_params['release'] = release['name']
            root_dir_release_url = reverse('browse-origin-content',
                                           url_args=url_args,
                                           query_params=query_params)

            self.assertContains(resp, '<a href="%s">' % root_dir_release_url)

        del url_args['origin_type']

        url = reverse('browse-origin-content',
                      url_args=url_args,
                      query_params=query_params)

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('content.html')

        swh_cnt_id = get_swh_persistent_id('content', content['sha1_git'])
        swh_cnt_id_url = reverse('browse-swh-id',
                                 url_args={'swh_id': swh_cnt_id})

        self.assertContains(resp, swh_cnt_id)
        self.assertContains(resp, swh_cnt_id_url)

        self.assertContains(resp, 'swh-take-new-snapshot')

    @given(origin_with_multiple_visits())
    def test_origin_content_view(self, origin):
-        origin_visits = self.origin_visit_get(origin['id'])
+        origin_visits = self.origin_visit_get(origin['url'])

        def _get_test_data(visit_idx):
            snapshot = self.snapshot_get(origin_visits[visit_idx]['snapshot'])
            head_rev_id = snapshot['branches']['HEAD']['target']
            head_rev = self.revision_get(head_rev_id)
            dir_content = self.directory_ls(head_rev['directory'])
            dir_files = [e for e in dir_content if e['type'] == 'file']
            dir_file = random.choice(dir_files)
            branches, releases = process_snapshot_branches(snapshot)
            return {
                'branches': branches,
                'releases': releases,
                'root_dir_sha1': head_rev['directory'],
                'content': get_content(dir_file['checksums']['sha1']),
                'visit': origin_visits[visit_idx]
            }

        test_data = _get_test_data(-1)

        self.origin_content_view_helper(origin,
                                        origin_visits,
                                        test_data['branches'],
                                        test_data['releases'],
                                        test_data['root_dir_sha1'],
                                        test_data['content'])

        self.origin_content_view_helper(origin,
                                        origin_visits,
                                        test_data['branches'],
                                        test_data['releases'],
                                        test_data['root_dir_sha1'],
                                        test_data['content'],
                                        timestamp=test_data['visit']['date'])

        visit_unix_ts = parse_timestamp(test_data['visit']['date']).timestamp()
        visit_unix_ts = int(visit_unix_ts)

        self.origin_content_view_helper(origin,
                                        origin_visits,
                                        test_data['branches'],
                                        test_data['releases'],
                                        test_data['root_dir_sha1'],
                                        test_data['content'],
                                        timestamp=visit_unix_ts)

        test_data = _get_test_data(0)

        self.origin_content_view_helper(origin,
                                        origin_visits,
                                        test_data['branches'],
                                        test_data['releases'],
                                        test_data['root_dir_sha1'],
                                        test_data['content'],
                                        visit_id=test_data['visit']['visit'])
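The breadcrumb assertions in the helper above lean on `gen_path_info`, which turns a directory path into cumulative (name, path) pairs, one per breadcrumb link. A rough sketch of the behaviour those assertions assume (an illustrative reimplementation, not the shipped helper):

def gen_path_info(path):
    # 'a/b/c' -> [{'name': 'a', 'path': 'a'},
    #            {'name': 'b', 'path': 'a/b'},
    #            {'name': 'c', 'path': 'a/b/c'}]
    parts = [p for p in path.strip('/').split('/') if p]
    return [{'name': name, 'path': '/'.join(parts[:i + 1])}
            for i, name in enumerate(parts)]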
    def origin_directory_view_helper(self, origin_info, origin_visits,
                                     origin_branches, origin_releases,
                                     root_directory_sha1, directory_entries,
                                     visit_id=None, timestamp=None,
                                     path=None):
        dirs = [e for e in directory_entries
                if e['type'] in ('dir', 'rev')]
        files = [e for e in directory_entries
                 if e['type'] == 'file']

        if not visit_id:
            visit_id = origin_visits[-1]['visit']

        url_args = {'origin_url': origin_info['url']}

        query_params = {}

        if timestamp:
            url_args['timestamp'] = timestamp
        else:
            query_params['visit_id'] = visit_id

        if path:
            url_args['path'] = path

        url = reverse('browse-origin-directory',
                      url_args=url_args,
                      query_params=query_params)

        resp = self.client.get(url)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('directory.html')

        self.assertContains(resp, '<td class="swh-directory">',
                            count=len(dirs))
        self.assertContains(resp, '<td class="swh-content">',
                            count=len(files))

        if timestamp:
            url_args['timestamp'] = \
                format_utc_iso_date(parse_timestamp(timestamp).isoformat(),
                                    '%Y-%m-%dT%H:%M:%S')

        for d in dirs:
            if d['type'] == 'rev':
                dir_url = reverse('browse-revision',
                                  url_args={'sha1_git': d['target']})
            else:
                dir_path = d['name']
                if path:
                    dir_path = "%s/%s" % (path, d['name'])
                dir_url_args = dict(url_args)
                dir_url_args['path'] = dir_path
                dir_url = reverse('browse-origin-directory',
                                  url_args=dir_url_args,
                                  query_params=query_params)
            self.assertContains(resp, dir_url)

        for f in files:
            file_path = f['name']
            if path:
                file_path = "%s/%s" % (path, f['name'])
            file_url_args = dict(url_args)
            file_url_args['path'] = file_path
            file_url = reverse('browse-origin-content',
                               url_args=file_url_args,
                               query_params=query_params)
            self.assertContains(resp, file_url)

        if 'path' in url_args:
            del url_args['path']

        root_dir_branch_url = \
            reverse('browse-origin-directory',
                    url_args=url_args,
                    query_params=query_params)

        nb_bc_paths = 1
        if path:
            nb_bc_paths = len(path.split('/')) + 1

        self.assertContains(resp, '<li class="swh-path">',
                            count=nb_bc_paths)
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (root_dir_branch_url,
                             root_directory_sha1[:7]))

        origin_branches_url = reverse('browse-origin-branches',
                                      url_args=url_args,
                                      query_params=query_params)

        self.assertContains(resp, '<a href="%s">Branches (%s)</a>' %
                            (origin_branches_url, len(origin_branches)))

        origin_releases_url = reverse('browse-origin-releases',
                                      url_args=url_args,
                                      query_params=query_params)

        nb_releases = len(origin_releases)
        if nb_releases > 0:
            self.assertContains(resp, '<a href="%s">Releases (%s)</a>' %
                                (origin_releases_url, nb_releases))

        if path:
            url_args['path'] = path

        self.assertContains(resp, '<li class="swh-branch">',
                            count=len(origin_branches))

        for branch in origin_branches:
            query_params['branch'] = branch['name']
            root_dir_branch_url = \
                reverse('browse-origin-directory',
                        url_args=url_args,
                        query_params=query_params)

            self.assertContains(resp, '<a href="%s">' % root_dir_branch_url)

        self.assertContains(resp, '<li class="swh-release">',
                            count=len(origin_releases))

        query_params['branch'] = None
        for release in origin_releases:
            query_params['release'] = release['name']
            root_dir_release_url = \
                reverse('browse-origin-directory',
                        url_args=url_args,
                        query_params=query_params)

            self.assertContains(resp, '<a href="%s">' % root_dir_release_url)

        self.assertContains(resp, 'vault-cook-directory')
        self.assertContains(resp, 'vault-cook-revision')

        swh_dir_id = get_swh_persistent_id('directory', directory_entries[0]['dir_id'])  # noqa
        swh_dir_id_url = reverse('browse-swh-id',
                                 url_args={'swh_id': swh_dir_id})

        self.assertContains(resp, swh_dir_id)
        self.assertContains(resp, swh_dir_id_url)

        self.assertContains(resp, 'swh-take-new-snapshot')

    @given(origin())
    def test_origin_root_directory_view(self, origin):
-        origin_visits = self.origin_visit_get(origin['id'])
+        origin_visits = self.origin_visit_get(origin['url'])

        visit = origin_visits[-1]
        snapshot = self.snapshot_get(visit['snapshot'])
        head_rev_id = snapshot['branches']['HEAD']['target']
        head_rev = self.revision_get(head_rev_id)
        root_dir_sha1 = head_rev['directory']
        dir_content = self.directory_ls(root_dir_sha1)
        branches, releases = process_snapshot_branches(snapshot)
        visit_unix_ts = parse_timestamp(visit['date']).timestamp()
        visit_unix_ts = int(visit_unix_ts)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          visit_id=visit['visit'])

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          timestamp=visit_unix_ts)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          timestamp=visit['date'])

        origin = dict(origin)
        del origin['type']

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          visit_id=visit['visit'])

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          timestamp=visit_unix_ts)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, dir_content,
                                          timestamp=visit['date'])

    @given(origin())
    def test_origin_sub_directory_view(self, origin):
-        origin_visits = self.origin_visit_get(origin['id'])
+        origin_visits = self.origin_visit_get(origin['url'])

        visit = origin_visits[-1]
        snapshot = self.snapshot_get(visit['snapshot'])
        head_rev_id = snapshot['branches']['HEAD']['target']
        head_rev = self.revision_get(head_rev_id)
        root_dir_sha1 = head_rev['directory']
        subdirs = [e for e in self.directory_ls(root_dir_sha1)
                   if e['type'] == 'dir']
        branches, releases = process_snapshot_branches(snapshot)
        visit_unix_ts = parse_timestamp(visit['date']).timestamp()
        visit_unix_ts = int(visit_unix_ts)

        if len(subdirs) == 0:
            return

        subdir = random.choice(subdirs)
        subdir_content = self.directory_ls(subdir['target'])
        subdir_path = subdir['name']

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          visit_id=visit['visit'])

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          timestamp=visit_unix_ts)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          timestamp=visit['date'])

        origin = dict(origin)
        del origin['type']

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          visit_id=visit['visit'])

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          timestamp=visit_unix_ts)

        self.origin_directory_view_helper(origin, origin_visits,
                                          branches, releases,
                                          root_dir_sha1, subdir_content,
                                          path=subdir_path,
                                          timestamp=visit['date'])

    def origin_branches_helper(self, origin_info, origin_snapshot):
        url_args = {'origin_type': origin_info['type'],
                    'origin_url': origin_info['url']}

        url = reverse('browse-origin-branches',
                      url_args=url_args)

        resp = self.client.get(url)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('branches.html')

        origin_branches = origin_snapshot[0]
        origin_releases = origin_snapshot[1]

        origin_branches_url = reverse('browse-origin-branches',
                                      url_args=url_args)

        self.assertContains(resp, '<a href="%s">Branches (%s)</a>' %
                            (origin_branches_url, len(origin_branches)))

        origin_releases_url = reverse('browse-origin-releases',
                                      url_args=url_args)

        nb_releases = len(origin_releases)
        if nb_releases > 0:
            self.assertContains(resp, '<a href="%s">Releases (%s)</a>' %
                                (origin_releases_url, nb_releases))

        for branch in origin_branches:
            browse_branch_url = reverse(
                'browse-origin-directory',
                url_args={'origin_type': origin_info['type'],
                          'origin_url': origin_info['url']},
                query_params={'branch': branch['name']})
            self.assertContains(resp, '<a href="%s">' %
                                escape(browse_branch_url))

            browse_revision_url = reverse(
                'browse-revision',
                url_args={'sha1_git': branch['revision']},
                query_params={'origin_type': origin_info['type'],
                              'origin': origin_info['url']})
            self.assertContains(resp, '<a href="%s">' %
                                escape(browse_revision_url))

    @given(origin())
    def test_origin_branches(self, origin):
-        origin_visits = self.origin_visit_get(origin['id'])
+        origin_visits = self.origin_visit_get(origin['url'])

        visit = origin_visits[-1]
        snapshot = self.snapshot_get(visit['snapshot'])
        snapshot_content = process_snapshot_branches(snapshot)

        self.origin_branches_helper(origin, snapshot_content)

        origin = dict(origin)
        origin['type'] = None

        self.origin_branches_helper(origin, snapshot_content)

    def origin_releases_helper(self, origin_info, origin_snapshot):
        url_args = {'origin_type': origin_info['type'],
                    'origin_url': origin_info['url']}

        url = reverse('browse-origin-releases',
                      url_args=url_args)

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('releases.html')

        origin_branches = origin_snapshot[0]
        origin_releases = origin_snapshot[1]

        origin_branches_url = reverse('browse-origin-branches',
                                      url_args=url_args)

        self.assertContains(resp, '<a href="%s">Branches (%s)</a>' %
                            (origin_branches_url, len(origin_branches)))

        origin_releases_url = reverse('browse-origin-releases',
                                      url_args=url_args)

        nb_releases = len(origin_releases)
        if nb_releases > 0:
            self.assertContains(resp, '<a href="%s">Releases (%s)</a>' %
                                (origin_releases_url, nb_releases))

        for release in origin_releases:
            browse_release_url = reverse(
                'browse-release',
                url_args={'sha1_git': release['id']},
                query_params={'origin': origin_info['url']})
            browse_revision_url = reverse(
                'browse-revision',
                url_args={'sha1_git': release['target']},
                query_params={'origin': origin_info['url']})

            self.assertContains(resp, '<a href="%s">' %
                                escape(browse_release_url))
            self.assertContains(resp, '<a href="%s">' %
                                escape(browse_revision_url))

    @given(origin())
    def test_origin_releases(self, origin):
-        origin_visits = self.origin_visit_get(origin['id'])
+        origin_visits = self.origin_visit_get(origin['url'])

        visit = origin_visits[-1]
        snapshot = self.snapshot_get(visit['snapshot'])
        snapshot_content = process_snapshot_branches(snapshot)

        self.origin_releases_helper(origin, snapshot_content)

        origin = dict(origin)
        origin['type'] = None

        self.origin_releases_helper(origin, snapshot_content)
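The next hunk feeds the directory view a snapshot in which one branch has a null target (a dangling branch) and checks that the page still renders. For reference, a snapshot branches mapping shaped like the one the test builds, with an illustrative hash:

from swh.model.hashutil import hash_to_bytes

# Illustrative revision id, not a real object from the archive.
rev = hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238')

snapshot_branches = {
    b'refs/heads/broken': None,  # the null branch under test
    b'refs/heads/master': {'target_type': 'revision', 'target': rev},
}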
    @given(new_origin(), new_snapshot(min_size=4, max_size=4),
           visit_dates(), revisions(min_size=3, max_size=3))
    def test_origin_snapshot_null_branch(self, new_origin, new_snapshot,
                                         visit_dates, revisions):
        snp_dict = new_snapshot.to_dict()
        new_origin = self.storage.origin_add([new_origin])[0]
        for i, branch in enumerate(snp_dict['branches'].keys()):
            if i == 0:
                snp_dict['branches'][branch] = None
            else:
                snp_dict['branches'][branch]['target_type'] = 'revision'
                snp_dict['branches'][branch]['target'] = hash_to_bytes(
                    revisions[i-1])
        self.storage.snapshot_add([snp_dict])
-        visit = self.storage.origin_visit_add(new_origin['id'], visit_dates[0])
-        self.storage.origin_visit_update(new_origin['id'], visit['visit'],
+        visit = self.storage.origin_visit_add(
+            new_origin['url'], visit_dates[0])
+        self.storage.origin_visit_update(new_origin['url'], visit['visit'],
                                         status='partial',
                                         snapshot=snp_dict['id'])

        url = reverse('browse-origin-directory',
                      url_args={'origin_url': new_origin['url']})
        rv = self.client.get(url)
        self.assertEqual(rv.status_code, 200)

    @patch('swh.web.browse.views.utils.snapshot_context.request_content')
    @patch('swh.web.common.origin_visits.get_origin_visits')
    @patch('swh.web.browse.utils.get_origin_visit_snapshot')
    @patch('swh.web.browse.utils.service')
    @patch('swh.web.browse.views.origin.service')
    @patch('swh.web.browse.views.utils.snapshot_context.service')
    @patch('swh.web.browse.views.origin.get_origin_info')
    def test_origin_request_errors(self, mock_get_origin_info,
                                   mock_snapshot_service,
                                   mock_origin_service,
                                   mock_utils_service,
                                   mock_get_origin_visit_snapshot,
                                   mock_get_origin_visits,
                                   mock_request_content):

        mock_get_origin_info.side_effect = \
            NotFoundExc('origin not found')
        url = reverse('browse-origin-visits',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'origin not found', status_code=404)

        mock_utils_service.lookup_origin.side_effect = None
        mock_utils_service.lookup_origin.return_value = {'type': 'foo',
                                                         'url': 'bar',
                                                         'id': 457}
        mock_get_origin_visits.return_value = []
        url = reverse('browse-origin-directory',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, "No visit", status_code=404)

        mock_get_origin_visits.return_value = [{'visit': 1}]
        mock_get_origin_visit_snapshot.side_effect = \
            NotFoundExc('visit not found')
        url = reverse('browse-origin-directory',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar'},
                      query_params={'visit_id': 2})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertRegex(resp.content.decode('utf-8'),
                         'Visit.*not found')

        mock_get_origin_visits.return_value = [{
            'date': '2015-09-26T09:30:52.373449+00:00',
            'metadata': {},
            'origin': 457,
            'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
            'status': 'full',
            'visit': 1
        }]
        mock_get_origin_visit_snapshot.side_effect = None
        mock_get_origin_visit_snapshot.return_value = (
            [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb',
              'name': 'HEAD',
              'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672',
              'date': '04 May 2017, 13:27 UTC',
              'message': ''}],
            []
        )
        mock_utils_service.lookup_snapshot_size.return_value = {
            'revision': 1,
            'release': 0
        }
        mock_utils_service.lookup_directory.side_effect = \
            NotFoundExc('Directory not found')
        url = reverse('browse-origin-directory',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Directory not found', status_code=404)

        with patch('swh.web.browse.views.utils.snapshot_context.'
                   'get_snapshot_context') as mock_get_snapshot_context:
            mock_get_snapshot_context.side_effect = \
                NotFoundExc('Snapshot not found')
            url = reverse('browse-origin-directory',
                          url_args={'origin_type': 'foo',
                                    'origin_url': 'bar'})
            resp = self.client.get(url)
            self.assertEqual(resp.status_code, 404)
            self.assertTemplateUsed('error.html')
            self.assertContains(resp, 'Snapshot not found', status_code=404)

        mock_origin_service.lookup_origin.side_effect = None
        mock_origin_service.lookup_origin.return_value = {'type': 'foo',
                                                          'url': 'bar',
                                                          'id': 457}
        mock_get_origin_visits.return_value = []
        url = reverse('browse-origin-content',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar',
                                'path': 'foo'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, "No visit", status_code=404)

        mock_get_origin_visits.return_value = [{'visit': 1}]
        mock_get_origin_visit_snapshot.side_effect = \
            NotFoundExc('visit not found')
        url = reverse('browse-origin-content',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar',
                                'path': 'foo'},
                      query_params={'visit_id': 2})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertRegex(resp.content.decode('utf-8'),
                         'Visit.*not found')

        mock_get_origin_visits.return_value = [{
            'date': '2015-09-26T09:30:52.373449+00:00',
            'metadata': {},
            'origin': 457,
            'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
            'status': 'full',
            'visit': 1
        }]
        mock_get_origin_visit_snapshot.side_effect = None
        mock_get_origin_visit_snapshot.return_value = ([], [])
        url = reverse('browse-origin-content',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar',
                                'path': 'baz'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertRegex(resp.content.decode('utf-8'),
                         'Origin.*has an empty list of branches')

        mock_get_origin_visit_snapshot.return_value = (
            [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb',
              'name': 'HEAD',
              'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672',
              'date': '04 May 2017, 13:27 UTC',
              'message': ''}],
            []
        )
        mock_snapshot_service.lookup_directory_with_path.return_value = \
            {'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1'}
        mock_request_content.side_effect = \
            NotFoundExc('Content not found')
        url = reverse('browse-origin-content',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar',
                                'path': 'baz'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'Content not found', status_code=404)

    @patch('swh.web.common.origin_visits.get_origin_visits')
    @patch('swh.web.browse.utils.get_origin_visit_snapshot')
    @patch('swh.web.browse.utils.service')
    def test_origin_empty_snapshot(self, mock_utils_service,
                                   mock_get_origin_visit_snapshot,
                                   mock_get_origin_visits):
        mock_get_origin_visits.return_value = [{
            'date': '2015-09-26T09:30:52.373449+00:00',
            'metadata': {},
            'origin': 457,
            'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
            'status': 'full',
            'visit': 1
        }]
        mock_get_origin_visit_snapshot.return_value = ([], [])
        mock_utils_service.lookup_snapshot_size.return_value = {
            'revision': 0,
            'release': 0
        }
        mock_utils_service.lookup_origin.return_value = {
            'id': 457,
            'type': 'git',
            'url': 'https://github.com/foo/bar'
        }
        url = reverse('browse-origin-directory',
                      url_args={'origin_type': 'foo',
                                'origin_url': 'bar'})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('content.html')
        self.assertRegex(resp.content.decode('utf-8'),
                         'snapshot.*is empty')
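The directory, release and revision tests around this point all check that the rendered page embeds a persistent identifier produced by `get_swh_persistent_id`, of the `swh:1:<type-code>:<hex>` form. A minimal sketch of that formatting (the mapping covers only the object types these tests use):

_TYPE_CODES = {'content': 'cnt', 'directory': 'dir', 'revision': 'rev',
               'release': 'rel', 'snapshot': 'snp'}


def swh_persistent_id(object_type, object_id):
    return 'swh:1:%s:%s' % (_TYPE_CODES[object_type], object_id)


assert swh_persistent_id('release', 'b' * 40) == 'swh:1:rel:' + 'b' * 40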
diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/tests/browse/views/test_release.py
index 3a115f6f..eddb6904 100644
--- a/swh/web/tests/browse/views/test_release.py
+++ b/swh/web/tests/browse/views/test_release.py
@@ -1,113 +1,113 @@
# Copyright (C) 2018-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from hypothesis import given

from swh.web.common.utils import (
    reverse, format_utc_iso_date, get_swh_persistent_id
)
from swh.web.tests.strategies import (
    release, origin_with_release, unknown_release
)
from swh.web.tests.testcase import WebTestCase


class SwhBrowseReleaseTest(WebTestCase):

    @given(release())
    def test_release_browse(self, release):
        url = reverse('browse-release',
                      url_args={'sha1_git': release})

        release_data = self.release_get(release)

        resp = self.client.get(url)

        self._release_browse_checks(resp, release_data)

    @given(origin_with_release())
    def test_release_browse_with_origin(self, origin):
-        snapshot = self.snapshot_get_latest(origin['id'])
+        snapshot = self.snapshot_get_latest(origin['url'])
        release = random.choice([b for b in snapshot['branches'].values()
                                 if b['target_type'] == 'release'])
        url = reverse('browse-release',
                      url_args={'sha1_git': release['target']},
                      query_params={'origin': origin['url']})

        release_data = self.release_get(release['target'])

        resp = self.client.get(url)

        self._release_browse_checks(resp, release_data, origin)

    @given(unknown_release())
    def test_release_browse_not_found(self, unknown_release):
        url = reverse('browse-release',
                      url_args={'sha1_git': unknown_release})
        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        err_msg = 'Release with sha1_git %s not found' % unknown_release
        self.assertContains(resp, err_msg, status_code=404)

    def _release_browse_checks(self, resp, release_data, origin_info=None):
        query_params = {}
        if origin_info:
            query_params['origin'] = origin_info['url']

        release_id = release_data['id']
        release_name = release_data['name']
        author_id = release_data['author']['id']
        author_name = release_data['author']['name']
        author_url = reverse('browse-person',
                             url_args={'person_id': author_id},
                             query_params=query_params)

        release_date = release_data['date']
        message = release_data['message']
        target_type = release_data['target_type']
        target = release_data['target']

        target_url = reverse('browse-revision',
                             url_args={'sha1_git': target},
                             query_params=query_params)
        message_lines = message.split('\n')

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('browse/release.html')
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (author_url, author_name))
        self.assertContains(resp, format_utc_iso_date(release_date))
        self.assertContains(resp, '<h6>%s</h6>%s' %
                            (message_lines[0] or 'None',
                             '\n'.join(message_lines[1:])))
        self.assertContains(resp, release_id)
        self.assertContains(resp, release_name)
        self.assertContains(resp, target_type)
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (target_url, target))

        swh_rel_id = get_swh_persistent_id('release', release_id)
        swh_rel_id_url = reverse('browse-swh-id',
                                 url_args={'swh_id': swh_rel_id})
        self.assertContains(resp, swh_rel_id)
        self.assertContains(resp, swh_rel_id_url)

    @given(release())
    def test_release_uppercase(self, release):
        url = reverse('browse-release-uppercase-checksum',
                      url_args={'sha1_git': release.upper()})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 302)

        redirect_url = reverse('browse-release',
                               url_args={'sha1_git': release})

        self.assertEqual(resp['location'], redirect_url)
diff --git a/swh/web/tests/browse/views/test_revision.py b/swh/web/tests/browse/views/test_revision.py
index b2d370e9..145299a5 100644
--- a/swh/web/tests/browse/views/test_revision.py
+++ b/swh/web/tests/browse/views/test_revision.py
@@ -1,256 +1,256 @@
# Copyright (C) 2017-2019  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

from django.utils.html import escape
from hypothesis import given

from swh.web.common.utils import (
    reverse, format_utc_iso_date, get_swh_persistent_id,
    parse_timestamp
)
from swh.web.tests.strategies import (
    origin, revision, unknown_revision, new_origin
)
from swh.web.tests.testcase import WebTestCase


class SwhBrowseRevisionTest(WebTestCase):

    @given(revision())
    def test_revision_browse(self, revision):
        url = reverse('browse-revision',
                      url_args={'sha1_git': revision})

        revision_data = self.revision_get(revision)

        author_id = revision_data['author']['id']
        author_name = revision_data['author']['name']
        committer_id = revision_data['committer']['id']
        committer_name = revision_data['committer']['name']
        dir_id = revision_data['directory']

        author_url = reverse('browse-person',
                             url_args={'person_id': author_id})
        committer_url = reverse('browse-person',
                                url_args={'person_id': committer_id})

        directory_url = reverse('browse-directory',
                                url_args={'sha1_git': dir_id})

        history_url = reverse('browse-revision-log',
                              url_args={'sha1_git': revision})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('browse/revision.html')
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (author_url, author_name))
        self.assertContains(resp, '<a href="%s">%s</a>' %
                            (committer_url, committer_name))
        self.assertContains(resp, directory_url)
        self.assertContains(resp, history_url)

        for parent in revision_data['parents']:
            parent_url = reverse('browse-revision',
                                 url_args={'sha1_git': parent})
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                (parent_url, parent))

        author_date = revision_data['date']
        committer_date = revision_data['committer_date']

        message_lines = revision_data['message'].split('\n')

        self.assertContains(resp, format_utc_iso_date(author_date))
        self.assertContains(resp, format_utc_iso_date(committer_date))
        self.assertContains(resp, escape(message_lines[0]))
        self.assertContains(resp, escape('\n'.join(message_lines[1:])))
        origin_revision_log_url = reverse('browse-origin-log',
                                          url_args={'origin_url': origin['url']}, # noqa
                                          query_params={'revision': revision})

        url = reverse('browse-revision',
                      url_args={'sha1_git': revision},
                      query_params={'origin': origin['url']})

        resp = self.client.get(url)

        self.assertContains(resp, origin_revision_log_url)

        for parent in revision_data['parents']:
            parent_url = reverse('browse-revision',
                                 url_args={'sha1_git': parent},
                                 query_params={'origin': origin['url']})
            self.assertContains(resp, '<a href="%s">%s</a>' %
                                      (parent_url, parent))

        self.assertContains(resp, 'vault-cook-directory')
        self.assertContains(resp, 'vault-cook-revision')

        swh_rev_id = get_swh_persistent_id('revision', revision)
        swh_rev_id_url = reverse('browse-swh-id',
                                 url_args={'swh_id': swh_rev_id})
        self.assertContains(resp, swh_rev_id)
        self.assertContains(resp, swh_rev_id_url)

        swh_dir_id = get_swh_persistent_id('directory', dir_id)
        swh_dir_id_url = reverse('browse-swh-id',
                                 url_args={'swh_id': swh_dir_id})
        self.assertContains(resp, swh_dir_id)
        self.assertContains(resp, swh_dir_id_url)

        self.assertContains(resp, 'swh-take-new-snapshot')

    @given(revision())
    def test_revision_log_browse(self, revision):
        per_page = 10

        revision_log = self.revision_log(revision)

        revision_log_sorted = \
            sorted(revision_log,
                   key=lambda rev: -parse_timestamp(
                       rev['committer_date']).timestamp())

        url = reverse('browse-revision-log',
                      url_args={'sha1_git': revision},
                      query_params={'per_page': per_page})

        resp = self.client.get(url)

        next_page_url = reverse('browse-revision-log',
                                url_args={'sha1_git': revision},
                                query_params={'offset': per_page,
                                              'per_page': per_page})

        nb_log_entries = per_page
        if len(revision_log_sorted) < per_page:
            nb_log_entries = len(revision_log_sorted)

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('browse/revision-log.html')
        self.assertContains(resp, '<a class="page-link">Newer</a>')

        if len(revision_log_sorted) > per_page:
            self.assertContains(resp, '<a class="page-link" href="%s">Older</a>' % # noqa
                                escape(next_page_url))

        for log in revision_log_sorted[:per_page]:
            revision_url = reverse('browse-revision',
                                   url_args={'sha1_git': log['id']})
            self.assertContains(resp, log['id'][:7])
            self.assertContains(resp, log['author']['name'])
            self.assertContains(resp, format_utc_iso_date(log['date']))
            self.assertContains(resp, escape(log['message']))
            self.assertContains(resp, format_utc_iso_date(log['committer_date'])) # noqa
            self.assertContains(resp, revision_url)

        if len(revision_log_sorted) <= per_page:
            return

        resp = self.client.get(next_page_url)

        prev_page_url = reverse('browse-revision-log',
                                url_args={'sha1_git': revision},
                                query_params={'per_page': per_page})
        next_page_url = reverse('browse-revision-log',
                                url_args={'sha1_git': revision},
                                query_params={'offset': 2 * per_page,
                                              'per_page': per_page})

        nb_log_entries = len(revision_log_sorted) - per_page
        if nb_log_entries > per_page:
            nb_log_entries = per_page

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('browse/revision-log.html')
        self.assertContains(resp, '<a class="page-link" href="%s">Newer</a>' %
                            escape(prev_page_url))

        if len(revision_log_sorted) > 2 * per_page:
            self.assertContains(resp, '<a class="page-link" href="%s">Older</a>' % # noqa
                                escape(next_page_url))

        if len(revision_log_sorted) <= 2 * per_page:
            return

        resp = self.client.get(next_page_url)

        prev_page_url = reverse('browse-revision-log',
                                url_args={'sha1_git': revision},
                                query_params={'offset': per_page,
                                              'per_page': per_page})
        next_page_url = reverse('browse-revision-log',
                                url_args={'sha1_git': revision},
                                query_params={'offset': 3 * per_page,
                                              'per_page': per_page})

        nb_log_entries = len(revision_log_sorted) - 2 * per_page
        if nb_log_entries > per_page:
            nb_log_entries = per_page

        self.assertEqual(resp.status_code, 200)
        self.assertTemplateUsed('browse/revision-log.html')
        self.assertContains(resp, '<a class="page-link" href="%s">Newer</a>' %
                            escape(prev_page_url))

        if len(revision_log_sorted) > 3 * per_page:
            self.assertContains(resp, '<a class="page-link" href="%s">Older</a>' % # noqa
                                escape(next_page_url))

    @given(revision(), unknown_revision(), new_origin())
    def test_revision_request_errors(self, revision, unknown_revision,
                                     new_origin):
        url = reverse('browse-revision',
                      url_args={'sha1_git': unknown_revision})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp,
                            'Revision with sha1_git %s not found' %
                            unknown_revision, status_code=404)

        url = reverse('browse-revision',
                      url_args={'sha1_git': revision},
                      query_params={'origin_type': new_origin['type'],
                                    'origin': new_origin['url']})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 404)
        self.assertTemplateUsed('error.html')
        self.assertContains(resp, 'the origin mentioned in your request'
                                  ' appears broken', status_code=404)

    @given(revision())
    def test_revision_uppercase(self, revision):
        url = reverse('browse-revision-uppercase-checksum',
                      url_args={'sha1_git': revision.upper()})

        resp = self.client.get(url)
        self.assertEqual(resp.status_code, 302)

        redirect_url = reverse('browse-revision',
                               url_args={'sha1_git': revision})

        self.assertEqual(resp['location'], redirect_url)

diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py
index 869120d5..b614db10 100644
--- a/swh/web/tests/common/test_service.py
+++ b/swh/web/tests/common/test_service.py
@@ -1,879 +1,886 @@
 # Copyright (C) 2015-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

import itertools
import pytest
import random

from collections import defaultdict

from hypothesis import given

from swh.model.hashutil import hash_to_bytes, hash_to_hex
from swh.model.from_disk import DentryPerms

from swh.web.common import service
from swh.web.common.exc import BadInputExc, NotFoundExc
from swh.web.tests.data import random_sha1, random_content
from swh.web.tests.strategies import (
    content, contents, unknown_contents, contents_with_ctags,
    origin, new_origin, visit_dates, directory, release, revision,
    unknown_revision, revisions, ancestor_revisions,
    non_ancestor_revisions, invalid_sha1, sha256,
    revision_with_submodules, empty_directory,
    new_revision, new_origins
)
from swh.web.tests.testcase import (
    WebTestCase, ctags_json_missing, fossology_missing
)


class ServiceTestCase(WebTestCase):

    @given(contents())
    def test_lookup_multiple_hashes_all_present(self, contents):
        input_data = []
        expected_output = []
        for cnt in contents:
            input_data.append({'sha1': cnt['sha1']})
            expected_output.append({'sha1': cnt['sha1'],
                                    'found': True})

        self.assertEqual(service.lookup_multiple_hashes(input_data),
                         expected_output)

    @given(contents(), unknown_contents())
    def test_lookup_multiple_hashes_some_missing(self, contents,
                                                 unknown_contents):
        input_contents = list(itertools.chain(contents, unknown_contents))
        random.shuffle(input_contents)

        input_data = []
        expected_output = []
        for cnt in input_contents:
            input_data.append({'sha1': cnt['sha1']})
            expected_output.append({'sha1': cnt['sha1'],
                                    'found': cnt in contents})

        self.assertEqual(service.lookup_multiple_hashes(input_data),
                         expected_output)

    def test_lookup_hash_does_not_exist(self):
        unknown_content_ = random_content()

        actual_lookup = service.lookup_hash('sha1_git:%s' %
                                            unknown_content_['sha1_git'])

        self.assertEqual(actual_lookup, {'found': None,
                                         'algo': 'sha1_git'})

    @given(content())
    def test_lookup_hash_exist(self, content):
        actual_lookup = service.lookup_hash('sha1:%s' % content['sha1'])

        content_metadata = self.content_get_metadata(content['sha1'])

        self.assertEqual({'found': content_metadata,
                          'algo': 'sha1'}, actual_lookup)

    def test_search_hash_does_not_exist(self):
        unknown_content_ = random_content()

        actual_lookup = service.search_hash('sha1_git:%s' %
                                            unknown_content_['sha1_git'])

        self.assertEqual({'found': False}, actual_lookup)

    @given(content())
    def test_search_hash_exist(self, content):
        actual_lookup = service.search_hash('sha1:%s' % content['sha1'])

        self.assertEqual({'found': True}, actual_lookup)

    @pytest.mark.skipif(ctags_json_missing,
                        reason="requires ctags with json output support")
    @given(contents_with_ctags())
    def test_lookup_content_ctags(self, contents_with_ctags):
        content_sha1 = random.choice(contents_with_ctags['sha1s'])

        self.content_add_ctags(content_sha1)

        actual_ctags = \
            list(service.lookup_content_ctags('sha1:%s' % content_sha1))

        expected_data = list(self.content_get_ctags(content_sha1))

        for ctag in expected_data:
            ctag['id'] = content_sha1

        self.assertEqual(actual_ctags, expected_data)

    def test_lookup_content_ctags_no_hash(self):
        unknown_content_ = random_content()

        actual_ctags = \
            list(service.lookup_content_ctags('sha1:%s' %
                                              unknown_content_['sha1']))

        self.assertEqual(actual_ctags, [])

    @given(content())
    def test_lookup_content_filetype(self, content):
        self.content_add_mimetype(content['sha1'])
        actual_filetype = service.lookup_content_filetype(content['sha1'])

        expected_filetype = self.content_get_mimetype(content['sha1'])
        self.assertEqual(actual_filetype, expected_filetype)

    @pytest.mark.xfail  # Language indexer is disabled.
    @given(content())
    def test_lookup_content_language(self, content):
        self.content_add_language(content['sha1'])
        actual_language = service.lookup_content_language(content['sha1'])

        expected_language = self.content_get_language(content['sha1'])
        self.assertEqual(actual_language, expected_language)

    @given(contents_with_ctags())
    def test_lookup_expression(self, contents_with_ctags):
        per_page = 10
        expected_ctags = []

        for content_sha1 in contents_with_ctags['sha1s']:
            if len(expected_ctags) == per_page:
                break
            self.content_add_ctags(content_sha1)
            for ctag in self.content_get_ctags(content_sha1):
                if len(expected_ctags) == per_page:
                    break
                if ctag['name'] == contents_with_ctags['symbol_name']:
                    del ctag['id']
                    ctag['sha1'] = content_sha1
                    expected_ctags.append(ctag)

        actual_ctags = \
            list(service.lookup_expression(contents_with_ctags['symbol_name'],
                                           last_sha1=None, per_page=10))

        self.assertEqual(actual_ctags, expected_ctags)

    def test_lookup_expression_no_result(self):
        expected_ctags = []

        actual_ctags = \
            list(service.lookup_expression('barfoo', last_sha1=None,
                                           per_page=10))
        self.assertEqual(actual_ctags, expected_ctags)

    @pytest.mark.skipif(fossology_missing,
                        reason="requires fossology-nomossa installed")
    @given(content())
    def test_lookup_content_license(self, content):
        self.content_add_license(content['sha1'])
        actual_license = service.lookup_content_license(content['sha1'])

        expected_license = self.content_get_license(content['sha1'])
        self.assertEqual(actual_license, expected_license)

    def test_stat_counters(self):
        actual_stats = service.stat_counters()
        self.assertEqual(actual_stats, self.storage.stat_counters())

    @given(new_origin(), visit_dates())
    def test_lookup_origin_visits(self, new_origin, visit_dates):
        origin_id = self.storage.origin_add_one(new_origin)
        for ts in visit_dates:
            self.storage.origin_visit_add(origin_id, ts)

        actual_origin_visits = list(
            service.lookup_origin_visits(origin_id, per_page=100))

        expected_visits = self.origin_visit_get(origin_id)

        self.assertEqual(actual_origin_visits, expected_visits)

    @given(new_origin(), visit_dates())
    def test_lookup_origin_visit(self, new_origin, visit_dates):
        origin_id = self.storage.origin_add_one(new_origin)
        visits = []
        for ts in visit_dates:
            visits.append(self.storage.origin_visit_add(origin_id, ts))

        visit = random.choice(visits)['visit']
        actual_origin_visit = service.lookup_origin_visit(origin_id, visit)

        expected_visit = dict(self.storage.origin_visit_get_by(origin_id,
                                                               visit))
        expected_visit['date'] = expected_visit['date'].isoformat()
        expected_visit['metadata'] = {}

        self.assertEqual(actual_origin_visit, expected_visit)

+    @pytest.mark.origin_id
    @given(new_origin())
-    def test_lookup_origin(self, new_origin):
+    def test_lookup_origin_by_id(self, new_origin):
        origin_id = self.storage.origin_add_one(new_origin)

        actual_origin = service.lookup_origin({'id': origin_id})
        expected_origin = self.storage.origin_get({'id': origin_id})

        self.assertEqual(actual_origin, expected_origin)

+    @given(new_origin())
+    def test_lookup_origin(self, new_origin):
+        self.storage.origin_add_one(new_origin)
+
        actual_origin = service.lookup_origin({'type': new_origin['type'],
                                               'url': new_origin['url']})
        expected_origin = self.storage.origin_get({'type': new_origin['type'],
                                                   'url': new_origin['url']})

        self.assertEqual(actual_origin, expected_origin)

    @given(invalid_sha1())
    def test_lookup_release_ko_id_checksum_not_a_sha1(self, invalid_sha1):
        with self.assertRaises(BadInputExc) as cm:
            service.lookup_release(invalid_sha1)
        self.assertIn('invalid checksum', cm.exception.args[0].lower())
    @given(sha256())
    def test_lookup_release_ko_id_checksum_too_long(self, sha256):
        with self.assertRaises(BadInputExc) as cm:
            service.lookup_release(sha256)
        self.assertEqual('Only sha1_git is supported.', cm.exception.args[0])

    @given(directory())
    def test_lookup_directory_with_path_not_found(self, directory):
        path = 'some/invalid/path/here'
        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_directory_with_path(directory, path)
        self.assertEqual('Directory entry with path %s from %s '
                         'not found' % (path, directory),
                         cm.exception.args[0])

    @given(directory())
    def test_lookup_directory_with_path_found(self, directory):
        directory_content = self.directory_ls(directory)
        directory_entry = random.choice(directory_content)
        path = directory_entry['name']
        actual_result = service.lookup_directory_with_path(directory, path)
        self.assertEqual(actual_result, directory_entry)

    @given(release())
    def test_lookup_release(self, release):
        actual_release = service.lookup_release(release)

        self.assertEqual(actual_release,
                         self.release_get(release))

    @given(revision(), invalid_sha1(), sha256())
    def test_lookup_revision_with_context_ko_not_a_sha1(self, revision,
                                                        invalid_sha1,
                                                        sha256):
        sha1_git_root = revision
        sha1_git = invalid_sha1

        with self.assertRaises(BadInputExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
        self.assertIn('Invalid checksum query string', cm.exception.args[0])

        sha1_git = sha256

        with self.assertRaises(BadInputExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
        self.assertIn('Only sha1_git is supported', cm.exception.args[0])

    @given(revision(), unknown_revision())
    def test_lookup_revision_with_context_ko_sha1_git_does_not_exist(
            self, revision, unknown_revision):
        sha1_git_root = revision
        sha1_git = unknown_revision

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
        self.assertIn('Revision %s not found' % sha1_git,
                      cm.exception.args[0])

    @given(revision(), unknown_revision())
    def test_lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
            self, revision, unknown_revision):
        sha1_git_root = unknown_revision
        sha1_git = revision

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
        self.assertIn('Revision root %s not found' % sha1_git_root,
                      cm.exception.args[0])

    @given(ancestor_revisions())
    def test_lookup_revision_with_context(self, ancestor_revisions):
        sha1_git = ancestor_revisions['sha1_git']
        root_sha1_git = ancestor_revisions['sha1_git_root']
        for sha1_git_root in (root_sha1_git,
                              {'id': hash_to_bytes(root_sha1_git)}):
            actual_revision = \
                service.lookup_revision_with_context(sha1_git_root, sha1_git)

            children = []
            for rev in self.revision_log(root_sha1_git):
                for p_rev in rev['parents']:
                    p_rev_hex = hash_to_hex(p_rev)
                    if p_rev_hex == sha1_git:
                        children.append(rev['id'])

            expected_revision = self.revision_get(sha1_git)
            expected_revision['children'] = children
            self.assertEqual(actual_revision, expected_revision)

    @given(non_ancestor_revisions())
    def test_lookup_revision_with_context_ko(self, non_ancestor_revisions):
        sha1_git = non_ancestor_revisions['sha1_git']
        root_sha1_git = non_ancestor_revisions['sha1_git_root']

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_with_context(root_sha1_git, sha1_git)
        self.assertIn('Revision %s is not an ancestor of %s' %
                      (sha1_git, root_sha1_git), cm.exception.args[0])

    def test_lookup_directory_with_revision_not_found(self):
        unknown_revision_ = random_sha1()

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_directory_with_revision(unknown_revision_)
        self.assertIn('Revision %s not found' % unknown_revision_,
                      cm.exception.args[0])

    def test_lookup_directory_with_revision_unknown_content(self):
        unknown_content_ = random_content()
        unknown_revision_ = random_sha1()
        unknown_directory_ = random_sha1()

        dir_path = 'README.md'
        # Create a revision that points to a directory
        # Which points to unknown content
        revision = {
            'author': {
                'name': b'abcd',
                'email': b'abcd@company.org',
                'fullname': b'abcd abcd'
            },
            'committer': {
                'email': b'aaaa@company.org',
                'fullname': b'aaaa aaa',
                'name': b'aaa'
            },
            'committer_date': {
                'negative_utc': False,
                'offset': 0,
                'timestamp': 1437511651
            },
            'date': {
                'negative_utc': False,
                'offset': 0,
                'timestamp': 1437511651
            },
            'message': b'bleh',
            'metadata': [],
            'parents': [],
            'synthetic': False,
            'type': 'file',
            'id': hash_to_bytes(unknown_revision_),
            'directory': hash_to_bytes(unknown_directory_)
        }
        # A directory that points to unknown content
        dir = {
            'id': hash_to_bytes(unknown_directory_),
            'entries': [{
                'name': bytes(dir_path.encode('utf-8')),
                'type': 'file',
                'target': hash_to_bytes(unknown_content_['sha1_git']),
                'perms': DentryPerms.content
            }]
        }
        # Add the directory and revision in mem
        self.storage.directory_add([dir])
        self.storage.revision_add([revision])

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_directory_with_revision(
                unknown_revision_, dir_path)
        self.assertIn('Content not found for revision %s' %
                      unknown_revision_,
                      cm.exception.args[0])

    @given(revision())
    def test_lookup_directory_with_revision_ko_path_to_nowhere(
            self, revision):
        invalid_path = 'path/to/something/unknown'
        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_directory_with_revision(revision, invalid_path)
        exception_text = cm.exception.args[0].lower()
        self.assertIn('directory or file', exception_text)
        self.assertIn(invalid_path, exception_text)
        self.assertIn('revision %s' % revision, exception_text)
        self.assertIn('not found', exception_text)

    @given(revision_with_submodules())
    def test_lookup_directory_with_revision_submodules(
            self, revision_with_submodules):
        rev_sha1_git = revision_with_submodules['rev_sha1_git']
        rev_dir_path = revision_with_submodules['rev_dir_rev_path']

        actual_data = service.lookup_directory_with_revision(
            rev_sha1_git, rev_dir_path)

        revision = self.revision_get(revision_with_submodules['rev_sha1_git'])
        directory = self.directory_ls(revision['directory'])
        rev_entry = next(e for e in directory if e['name'] == rev_dir_path)

        expected_data = {
            'content': self.revision_get(rev_entry['target']),
            'path': rev_dir_path,
            'revision': rev_sha1_git,
            'type': 'rev'
        }

        self.assertEqual(actual_data, expected_data)

    @given(revision())
    def test_lookup_directory_with_revision_without_path(self, revision):
        actual_directory_entries = \
            service.lookup_directory_with_revision(revision)

        revision_data = self.revision_get(revision)
        expected_directory_entries = \
            self.directory_ls(revision_data['directory'])

        self.assertEqual(actual_directory_entries['type'], 'dir')
        self.assertEqual(actual_directory_entries['content'],
                         expected_directory_entries)

    @given(revision())
    def test_lookup_directory_with_revision_with_path(self, revision):
        revision_data = self.revision_get(revision)
        dir_entries = [e for e in self.directory_ls(revision_data['directory'])
                       if e['type'] in ('file', 'dir')]
        expected_dir_entry = random.choice(dir_entries)

        actual_dir_entry = \
            service.lookup_directory_with_revision(revision,
                                                   expected_dir_entry['name'])

        self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type'])
        self.assertEqual(actual_dir_entry['revision'], revision)
        self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name'])
        if actual_dir_entry['type'] == 'file':
            del actual_dir_entry['content']['checksums']['blake2s256']
            for key in ('checksums', 'status', 'length'):
                self.assertEqual(actual_dir_entry['content'][key],
                                 expected_dir_entry[key])
        else:
            sub_dir_entries = self.directory_ls(expected_dir_entry['target'])
            self.assertEqual(actual_dir_entry['content'], sub_dir_entries)

    @given(revision())
    def test_lookup_directory_with_revision_with_path_to_file_and_data(
            self, revision):
        revision_data = self.revision_get(revision)
        dir_entries = [e for e in self.directory_ls(revision_data['directory'])
                       if e['type'] == 'file']
        expected_dir_entry = random.choice(dir_entries)
        expected_data = \
            self.content_get(expected_dir_entry['checksums']['sha1'])

        actual_dir_entry = \
            service.lookup_directory_with_revision(revision,
                                                   expected_dir_entry['name'],
                                                   with_data=True)

        self.assertEqual(actual_dir_entry['type'], expected_dir_entry['type'])
        self.assertEqual(actual_dir_entry['revision'], revision)
        self.assertEqual(actual_dir_entry['path'], expected_dir_entry['name'])
        del actual_dir_entry['content']['checksums']['blake2s256']
        for key in ('checksums', 'status', 'length'):
            self.assertEqual(actual_dir_entry['content'][key],
                             expected_dir_entry[key])
        self.assertEqual(actual_dir_entry['content']['data'],
                         expected_data['data'])

    @given(revision())
    def test_lookup_revision(self, revision):
        actual_revision = service.lookup_revision(revision)
        self.assertEqual(actual_revision, self.revision_get(revision))

    @given(new_revision())
    def test_lookup_revision_invalid_msg(self, new_revision):
        new_revision['message'] = b'elegant fix for bug \xff'
        self.storage.revision_add([new_revision])

        revision = service.lookup_revision(hash_to_hex(new_revision['id']))
        self.assertEqual(revision['message'], None)
        self.assertEqual(revision['message_decoding_failed'], True)

    @given(new_revision())
    def test_lookup_revision_msg_ok(self, new_revision):
        self.storage.revision_add([new_revision])

        revision_message = service.lookup_revision_message(
            hash_to_hex(new_revision['id']))

        self.assertEqual(revision_message,
                         {'message': new_revision['message']})

    @given(new_revision())
    def test_lookup_revision_msg_absent(self, new_revision):
        del new_revision['message']
        self.storage.revision_add([new_revision])

        new_revision_id = hash_to_hex(new_revision['id'])

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_message(new_revision_id)

        self.assertEqual(
            cm.exception.args[0],
            'No message for revision with sha1_git %s.' % new_revision_id
        )

    def test_lookup_revision_msg_no_rev(self):
        unknown_revision_ = random_sha1()

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_revision_message(unknown_revision_)

        self.assertEqual(
            cm.exception.args[0],
            'Revision with sha1_git %s not found.' % unknown_revision_
        )

    @given(revisions())
    def test_lookup_revision_multiple(self, revisions):
        actual_revisions = list(service.lookup_revision_multiple(revisions))

        expected_revisions = []
        for rev in revisions:
            expected_revisions.append(self.revision_get(rev))

        self.assertEqual(actual_revisions, expected_revisions)

    def test_lookup_revision_multiple_none_found(self):
        unknown_revisions_ = [random_sha1(),
                              random_sha1(),
                              random_sha1()]

        actual_revisions = \
            list(service.lookup_revision_multiple(unknown_revisions_))

        self.assertEqual(actual_revisions, [None] * len(unknown_revisions_))

    @given(revision())
    def test_lookup_revision_log(self, revision):
        actual_revision_log = \
            list(service.lookup_revision_log(revision, limit=25))
        expected_revision_log = self.revision_log(revision, limit=25)

        self.assertEqual(actual_revision_log, expected_revision_log)

    def _get_origin_branches(self, origin):
-        origin_visit = self.origin_visit_get(origin['id'])[-1]
+        origin_visit = self.origin_visit_get(origin['url'])[-1]
        snapshot = self.snapshot_get(origin_visit['snapshot'])
        branches = {k: v for (k, v) in snapshot['branches'].items()
                    if v['target_type'] == 'revision'}
        return branches

    @given(origin())
    def test_lookup_revision_log_by(self, origin):
        branches = self._get_origin_branches(origin)
        branch_name = random.choice(list(branches.keys()))

        actual_log = \
-            list(service.lookup_revision_log_by(origin['id'], branch_name,
+            list(service.lookup_revision_log_by(origin['url'], branch_name,
                                                None, limit=25))

        expected_log = \
            self.revision_log(branches[branch_name]['target'], limit=25)

        self.assertEqual(actual_log, expected_log)

    @given(origin())
    def test_lookup_revision_log_by_notfound(self, origin):
        with self.assertRaises(NotFoundExc):
            service.lookup_revision_log_by(
-                origin['id'], 'unknown_branch_name', None, limit=100)
+                origin['url'], 'unknown_branch_name', None, limit=100)

    def test_lookup_content_raw_not_found(self):
        unknown_content_ = random_content()

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_content_raw('sha1:' + unknown_content_['sha1'])
        self.assertIn(cm.exception.args[0],
                      'Content with %s checksum equals to %s not found!' %
                      ('sha1', unknown_content_['sha1']))

    @given(content())
    def test_lookup_content_raw(self, content):
        actual_content = service.lookup_content_raw(
            'sha256:%s' % content['sha256'])

        expected_content = self.content_get(content['sha1'])

        self.assertEqual(actual_content, expected_content)

    def test_lookup_content_not_found(self):
        unknown_content_ = random_content()

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_content('sha1:%s' % unknown_content_['sha1'])
        self.assertIn(cm.exception.args[0],
                      'Content with %s checksum equals to %s not found!' %
                      ('sha1', unknown_content_['sha1']))

    @given(content())
    def test_lookup_content_with_sha1(self, content):
        actual_content = service.lookup_content(
            'sha1:%s' % content['sha1'])

        expected_content = self.content_get_metadata(content['sha1'])

        self.assertEqual(actual_content, expected_content)

    @given(content())
    def test_lookup_content_with_sha256(self, content):
        actual_content = service.lookup_content(
            'sha256:%s' % content['sha256'])

        expected_content = self.content_get_metadata(content['sha1'])

        self.assertEqual(actual_content, expected_content)

    @given(revision())
    def test_lookup_person(self, revision):
        rev_data = self.revision_get(revision)
        actual_person = service.lookup_person(rev_data['author']['id'])
        self.assertEqual(actual_person, rev_data['author'])

    def test_lookup_directory_bad_checksum(self):
        with self.assertRaises(BadInputExc):
            service.lookup_directory('directory_id')

    def test_lookup_directory_not_found(self):
        unknown_directory_ = random_sha1()

        with self.assertRaises(NotFoundExc) as cm:
            service.lookup_directory(unknown_directory_)
        self.assertIn('Directory with sha1_git %s not found' %
                      unknown_directory_,
                      cm.exception.args[0])

    @given(directory())
    def test_lookup_directory(self, directory):
        actual_directory_ls = list(service.lookup_directory(directory))

        expected_directory_ls = self.directory_ls(directory)

        self.assertEqual(actual_directory_ls, expected_directory_ls)

    @given(empty_directory())
    def test_lookup_directory_empty(self, empty_directory):
        actual_directory_ls = list(service.lookup_directory(empty_directory))

        self.assertEqual(actual_directory_ls, [])

    @given(origin())
    def test_lookup_revision_by_nothing_found(self, origin):
        with self.assertRaises(NotFoundExc):
-            service.lookup_revision_by(origin['id'], 'invalid-branch-name')
+            service.lookup_revision_by(
+                origin['url'], 'invalid-branch-name')

    @given(origin())
    def test_lookup_revision_by(self, origin):
        branches = self._get_origin_branches(origin)
        branch_name = random.choice(list(branches.keys()))

        actual_revision = \
-            service.lookup_revision_by(origin['id'], branch_name, None)
+            service.lookup_revision_by(origin['url'], branch_name, None)

        expected_revision = \
            self.revision_get(branches[branch_name]['target'])

        self.assertEqual(actual_revision, expected_revision)

    @given(origin(), revision())
    def test_lookup_revision_with_context_by_ko(self, origin, revision):
        with self.assertRaises(NotFoundExc):
-            service.lookup_revision_with_context_by(origin['id'],
+            service.lookup_revision_with_context_by(origin['url'],
                                                    'invalid-branch-name',
                                                    None, revision)

    @given(origin())
    def test_lookup_revision_with_context_by(self, origin):
        branches = self._get_origin_branches(origin)
        branch_name = random.choice(list(branches.keys()))
        root_rev = branches[branch_name]['target']
        root_rev_log = self.revision_log(root_rev)

        children = defaultdict(list)

        for rev in root_rev_log:
            for rev_p in rev['parents']:
                children[rev_p].append(rev['id'])

        rev = root_rev_log[-1]['id']

        actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
-            origin['id'], branch_name, None, rev)
+            origin['url'], branch_name, None, rev)

        expected_root_rev = self.revision_get(root_rev)
        expected_rev = self.revision_get(rev)
        expected_rev['children'] = children[rev]

        self.assertEqual(actual_root_rev, expected_root_rev)
        self.assertEqual(actual_rev, expected_rev)

    def test_lookup_revision_through_ko_not_implemented(self):
        with self.assertRaises(NotImplementedError):
            service.lookup_revision_through({
                'something-unknown': 10,
            })

    @given(origin())
    def test_lookup_revision_through_with_context_by(self, origin):
        branches = self._get_origin_branches(origin)
        branch_name = random.choice(list(branches.keys()))
        root_rev = branches[branch_name]['target']
        root_rev_log = self.revision_log(root_rev)
        rev = root_rev_log[-1]['id']

        self.assertEqual(service.lookup_revision_through({
-            'origin_id': origin['id'],
+            'origin_url': origin['url'],
            'branch_name': branch_name,
            'ts': None,
            'sha1_git': rev
        }),
            service.lookup_revision_with_context_by(
-                origin['id'], branch_name, None, rev)
+                origin['url'], branch_name, None, rev)
        )

    @given(origin())
    def test_lookup_revision_through_with_revision_by(self, origin):
        branches = self._get_origin_branches(origin)
        branch_name = random.choice(list(branches.keys()))

        self.assertEqual(service.lookup_revision_through({
-            'origin_id': origin['id'],
+            'origin_url': origin['url'],
            'branch_name': branch_name,
            'ts': None,
        }),
            service.lookup_revision_by(
-                origin['id'], branch_name, None)
+                origin['url'], branch_name, None)
        )

    @given(ancestor_revisions())
    def test_lookup_revision_through_with_context(self, ancestor_revisions):
        sha1_git = ancestor_revisions['sha1_git']
        sha1_git_root = ancestor_revisions['sha1_git_root']

        self.assertEqual(service.lookup_revision_through({
            'sha1_git_root': sha1_git_root,
            'sha1_git': sha1_git,
        }),
            service.lookup_revision_with_context(sha1_git_root, sha1_git)
        )

    @given(revision())
    def test_lookup_revision_through_with_revision(self, revision):
        self.assertEqual(service.lookup_revision_through({
            'sha1_git': revision
        }),
            service.lookup_revision(revision)
        )

    @given(revision())
    def test_lookup_directory_through_revision_ko_not_found(self, revision):
        with self.assertRaises(NotFoundExc):
            service.lookup_directory_through_revision(
                {'sha1_git': revision}, 'some/invalid/path')

    @given(revision())
    def test_lookup_directory_through_revision_ok(self, revision):
        revision_data = self.revision_get(revision)
        dir_entries = [e for e in self.directory_ls(revision_data['directory'])
                       if e['type'] == 'file']
        dir_entry = random.choice(dir_entries)

        self.assertEqual(
            service.lookup_directory_through_revision({'sha1_git': revision},
                                                      dir_entry['name']),
            (revision,
             service.lookup_directory_with_revision(
                 revision, dir_entry['name']))
        )

    @given(revision())
    def test_lookup_directory_through_revision_ok_with_data(self, revision):
        revision_data = self.revision_get(revision)
        dir_entries = [e for e in self.directory_ls(revision_data['directory'])
                       if e['type'] == 'file']
        dir_entry = random.choice(dir_entries)

        self.assertEqual(
            service.lookup_directory_through_revision({'sha1_git': revision},
                                                      dir_entry['name'],
                                                      with_data=True),
            (revision,
             service.lookup_directory_with_revision(
                 revision, dir_entry['name'], with_data=True))
        )

+    @pytest.mark.origin_id
    @given(new_origins(20))
    def test_lookup_origins(self, new_origins):
        nb_origins = len(new_origins)
        expected_origins = self.storage.origin_add(new_origins)
        origin_from_idx = random.randint(1, nb_origins-1) - 1
        origin_from = expected_origins[origin_from_idx]['id']
        max_origin_idx = expected_origins[-1]['id']
        origin_count = random.randint(1, max_origin_idx - origin_from)
        actual_origins = list(service.lookup_origins(origin_from,
                                                     origin_count))
        expected_origins = list(self.storage.origin_get_range(origin_from,
                                                              origin_count))
        self.assertEqual(actual_origins, expected_origins)

diff --git a/swh/web/tests/data.py b/swh/web/tests/data.py
index 265070c1..90e041b3 100644
--- a/swh/web/tests/data.py
+++ b/swh/web/tests/data.py
@@ -1,322 +1,320 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

from copy import deepcopy
import os
import random
import time

from swh.indexer.fossology_license import FossologyLicenseIndexer
from swh.indexer.mimetype import MimetypeIndexer
from swh.indexer.ctags import CtagsIndexer
from swh.indexer.storage import get_indexer_storage
from swh.model.hashutil import hash_to_hex, hash_to_bytes, DEFAULT_ALGORITHMS
from swh.model.identifiers import directory_identifier
from swh.loader.git.from_disk import GitLoaderFromArchive
from swh.storage.algos.dir_iterators import dir_iterator
from swh.web import config
from swh.web.browse.utils import (
    get_mimetype_and_encoding_for_content, prepare_content_for_display
)
from swh.web.common import service

# Module used to initialize data that will be provided as tests input

# Configuration for git loader
_TEST_LOADER_CONFIG = {
    'storage': {
        'cls': 'memory',
        'args': {}
    },
    'send_contents': True,
    'send_directories': True,
    'send_revisions': True,
    'send_releases': True,
    'send_snapshot': True,
    'content_size_limit': 100 * 1024 * 1024,
    'content_packet_size': 10,
    'content_packet_size_bytes': 100 * 1024 * 1024,
    'directory_packet_size': 10,
    'revision_packet_size': 10,
    'release_packet_size': 10,
    'save_data': False,
}

# Base content indexer configuration
_TEST_INDEXER_BASE_CONFIG = {
    'storage': {
        'cls': 'memory',
        'args': {},
    },
    'objstorage': {
        'cls': 'memory',
        'args': {},
    },
    'indexer_storage': {
        'cls': 'memory',
        'args': {},
    }
}


def random_sha1():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(20)))


def random_sha256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_blake2s256():
    return hash_to_hex(bytes(random.randint(0, 255) for _ in range(32)))


def random_content():
    return {
        'sha1': random_sha1(),
        'sha1_git': random_sha1(),
        'sha256': random_sha256(),
        'blake2s256': random_blake2s256(),
    }


# MimetypeIndexer with custom configuration for tests
class _MimetypeIndexer(MimetypeIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'tools': {
                'name': 'file',
                'version': '1:5.30-1+deb9u1',
                'configuration': {
                    "type": "library",
                    "debian-package": "python3-magic"
                }
            }
        }


# FossologyLicenseIndexer with custom configuration for tests
class _FossologyLicenseIndexer(FossologyLicenseIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.fossology.license',
            'tools': {
                'name': 'nomos',
                'version': '3.1.0rc2-31-ga2cbb8c',
                'configuration': {
                    'command_line': 'nomossa <filepath>',
                },
            }
        }


# CtagsIndexer with custom configuration for tests
class _CtagsIndexer(CtagsIndexer):
    def parse_config_file(self, *args, **kwargs):
        return {
            **_TEST_INDEXER_BASE_CONFIG,
            'workdir': '/tmp/swh/indexer.ctags',
            'languages': {'c': 'c'},
            'tools': {
                'name': 'universal-ctags',
                'version': '~git7859817b',
                'configuration': {
                    'command_line': '''ctags --fields=+lnz --sort=no --links=no ''' # noqa
                                    '''--output-format=json <filepath>'''
                },
            }
        }


# Lightweight git repositories that will be loaded to generate
# input data for tests
_TEST_ORIGINS = [
    {
-        'id': 1,
        'type': 'git',
        'url': 'https://github.com/wcoder/highlightjs-line-numbers.js',
        'archives': ['highlightjs-line-numbers.js.zip',
                     'highlightjs-line-numbers.js_visit2.zip']
    },
    {
-        'id': 2,
        'type': 'git',
        'url': 'https://github.com/memononen/libtess2',
        'archives': ['libtess2.zip']
    },
    {
-        'id': 3,
        'type': 'git',
        'url': 'repo_with_submodules',
        'archives': ['repo_with_submodules.tgz']
    }
]

_contents = {}


# Tests data initialization
def _init_tests_data():
    # Load git repositories from archives
    loader = GitLoaderFromArchive(config=_TEST_LOADER_CONFIG)

    # Get reference to the memory storage
    storage = loader.storage

    for origin in _TEST_ORIGINS:
        nb_visits = len(origin['archives'])
        for i, archive in enumerate(origin['archives']):
            origin_repo_archive = \
                os.path.join(os.path.dirname(__file__),
                             'resources/repos/%s' % archive)
            loader.load(origin['url'], origin_repo_archive, None)
            if nb_visits > 1 and i != nb_visits - 1:
                time.sleep(1)
+        origin.update(storage.origin_get(origin))  # add an 'id' key if enabled

    contents = set()
    directories = set()
    revisions = set()
    releases = set()
    snapshots = set()
    persons = set()

    content_path = {}

    # Get all objects loaded into the test archive
    for origin in _TEST_ORIGINS:
-        snp = storage.snapshot_get_latest(origin['id'])
+        snp = storage.snapshot_get_latest(origin['url'])
        snapshots.add(hash_to_hex(snp['id']))
        for branch_name, branch_data in snp['branches'].items():
            if branch_data['target_type'] == 'revision':
                revisions.add(branch_data['target'])
            elif branch_data['target_type'] == 'release':
                release = next(storage.release_get([branch_data['target']]))
                revisions.add(release['target'])
                releases.add(hash_to_hex(branch_data['target']))
                persons.add(release['author']['id'])

    for rev_log in storage.revision_shortlog(set(revisions)):
        rev_id = rev_log[0]
        revisions.add(rev_id)

    for rev in storage.revision_get(revisions):
        dir_id = rev['directory']
        persons.add(rev['author']['id'])
        persons.add(rev['committer']['id'])
        directories.add(hash_to_hex(dir_id))
        for entry in dir_iterator(storage, dir_id):
            content_path[entry['sha1']] = '/'.join(
                [hash_to_hex(dir_id), entry['path'].decode('utf-8')])
            if entry['type'] == 'file':
                contents.add(entry['sha1'])
            elif entry['type'] == 'dir':
                directories.add(hash_to_hex(entry['target']))

    # Get all checksums for each content
    contents_metadata = storage.content_get_metadata(contents)
    contents = []
    for content_metadata in contents_metadata:
        contents.append({
            algo: hash_to_hex(content_metadata[algo])
            for algo in DEFAULT_ALGORITHMS
        })
        path = content_path[content_metadata['sha1']]
        cnt = next(storage.content_get([content_metadata['sha1']]))
        mimetype, encoding = get_mimetype_and_encoding_for_content(cnt['data'])
        content_display_data = prepare_content_for_display(
            cnt['data'], mimetype, path)
        contents[-1]['path'] = path
        contents[-1]['mimetype'] = mimetype
        contents[-1]['encoding'] = encoding
        contents[-1]['hljs_language'] = content_display_data['language']
        contents[-1]['data'] = content_display_data['content_data']
        _contents[contents[-1]['sha1']] = contents[-1]

    # Create indexer storage instance that will be shared by indexers
    idx_storage = get_indexer_storage('memory', {})

    # Add the empty directory to the test archive
    empty_dir_id = directory_identifier({'entries': []})
    empty_dir_id_bin = hash_to_bytes(empty_dir_id)
    storage.directory_add([{'id': empty_dir_id_bin, 'entries': []}])

    # Return tests data
    return {
        'storage': storage,
        'idx_storage': idx_storage,
        'origins': _TEST_ORIGINS,
        'contents': contents,
        'directories': list(directories),
        'persons': list(persons),
        'releases': list(releases),
        'revisions': list(map(hash_to_hex, revisions)),
        'snapshots': list(snapshots),
        'generated_checksums': set(),
    }


def _init_indexers(tests_data):
    # Instantiate content indexers that will be used in tests
    # and force them to use the memory storages
    indexers = {}
    for idx_name, idx_class in (('mimetype_indexer', _MimetypeIndexer),
                                ('license_indexer', _FossologyLicenseIndexer),
                                ('ctags_indexer', _CtagsIndexer)):
        idx = idx_class()
        idx.storage = tests_data['storage']
        idx.objstorage = tests_data['storage'].objstorage
        idx.idx_storage = tests_data['idx_storage']
        idx.register_tools(idx.config['tools'])
        indexers[idx_name] = idx
    return indexers


def get_content(content_sha1):
    return _contents.get(content_sha1)


_tests_data = None
_current_tests_data = None
_indexer_loggers = {}


def get_tests_data(reset=False):
    """
    Initialize tests data and return them in a dict.
    """
    global _tests_data, _current_tests_data
    if _tests_data is None:
        _tests_data = _init_tests_data()
        indexers = _init_indexers(_tests_data)
        for (name, idx) in indexers.items():
            # pytest makes the loggers use a temporary file; and deepcopy
            # requires serializability. So we remove them, and add them
            # back after the copy.
            _indexer_loggers[name] = idx.log
            del idx.log
        _tests_data.update(indexers)
    if reset or _current_tests_data is None:
        _current_tests_data = deepcopy(_tests_data)
        for (name, logger) in _indexer_loggers.items():
            _current_tests_data[name].log = logger
    return _current_tests_data


def override_storages(storage, idx_storage):
    """
    Helper function to replace the storages from which archive data
    are fetched.
    """
    swh_config = config.get_config()
    swh_config.update({'storage': storage})
    service.storage = storage
    swh_config.update({'indexer_storage': idx_storage})
    service.idx_storage = idx_storage

diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py
index 53dd4b84..e79874dc 100644
--- a/swh/web/tests/strategies.py
+++ b/swh/web/tests/strategies.py
@@ -1,536 +1,536 @@
 # Copyright (C) 2018-2019 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU Affero General Public License version 3, or any later version
 # See top-level LICENSE file for more information

import random

from collections import defaultdict
from datetime import datetime

from hypothesis import settings, assume
from hypothesis.strategies import (
    just, sampled_from, lists, composite, datetimes,
    integers, binary, text, characters
)

from swh.model.hashutil import hash_to_hex, hash_to_bytes
from swh.model.identifiers import directory_identifier
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.model.hypothesis_strategies import (
    origins as new_origin_strategy, snapshots as new_snapshot
)
from swh.web.tests.data import get_tests_data

# Module dedicated to the generation of input data for tests through
# the use of hypothesis.
# Some of these data are sampled from a test archive created and populated
# in the swh.web.tests.data module.

# Set the swh-web hypothesis profile if none has been explicitly set
hypothesis_default_settings = settings.get_profile('default')
if repr(settings()) == repr(hypothesis_default_settings):
    settings.load_profile('swh-web')

# The following strategies exploit the hypothesis capabilities


def _filter_checksum(cs):
    generated_checksums = get_tests_data()['generated_checksums']
    if not int.from_bytes(cs, byteorder='little') or \
            cs in generated_checksums:
        return False
    generated_checksums.add(cs)
    return True


def _known_swh_object(object_type):
    return sampled_from(get_tests_data()[object_type])


def sha1():
    """
    Hypothesis strategy returning a valid hexadecimal sha1 value.
    """
    return binary(
        min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex)


def invalid_sha1():
    """
    Hypothesis strategy returning an invalid sha1 representation.
""" return binary( min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex) def sha256(): """ Hypothesis strategy returning a valid hexadecimal sha256 value. """ return binary( min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex) def content(): """ Hypothesis strategy returning a random content ingested into the test archive. """ return _known_swh_object('contents') def contents(): """ Hypothesis strategy returning random contents ingested into the test archive. """ return lists(content(), min_size=2, max_size=8) def content_text(): """ Hypothesis strategy returning random textual contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/')) def content_text_non_utf8(): """ Hypothesis strategy returning random textual contents not encoded to UTF-8 ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['encoding'] not in ('utf-8', 'us-ascii')) def content_text_no_highlight(): """ Hypothesis strategy returning random textual contents with no detected programming language to highlight ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('text/') and c['hljs_language'] == 'nohighlight') def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('image/')) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { 'blake2S256': blake2s256_hex, 'sha1': sha1_hex, 'sha1_git': sha1_git_hex, 'sha256': sha256_hex } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: next(get_tests_data()['storage'].content_get( [hash_to_bytes(c['sha1'])])) is None) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object('directories') def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ storage = get_tests_data()['storage'] return directory().filter( lambda d: any([e['type'] == 'dir' for e in list(storage.directory_ls(hash_to_bytes(d)))])) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({'entries': []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: len(list(storage.directory_missing([hash_to_bytes(s)]))) > 0) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object('origins') def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. 
""" ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: - visits = list(tests_data['storage'].origin_visit_get(origin['id'])) + visits = list(tests_data['storage'].origin_visit_get(origin['url'])) if len(visits) > 1: ret.append(origin) return sampled_from(ret) def origin_with_release(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: - snapshot = tests_data['storage'].snapshot_get_latest(origin['id']) + snapshot = tests_data['storage'].snapshot_get_latest(origin['url']) if any([b['target_type'] == 'release' for b in snapshot['branches'].values()]): ret.append(origin) return sampled_from(ret) def unknown_origin_id(): """ Hypothesis strategy returning a random origin id not ingested into the test archive. """ return integers(min_value=1000000) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ storage = get_tests_data()['storage'] return new_origin_strategy().map(lambda origin: origin.to_dict()).filter( lambda origin: storage.origin_get([origin])[0] is None) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. """ min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists(new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items()))) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)), min_size=min_size, max_size=max_size, unique=True).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object('releases') def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].release_get([s])) is None) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object('revisions') def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: next(storage.revision_get([hash_to_bytes(s)])) is None) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw(text(min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255))) email = '%s@company.org' % name return { 'name': name.encode(), 'email': email.encode(), 'fullname': ('%s <%s>' % (name, email)).encode() } @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)).map( lambda d: int(d.timestamp()))) return { 'timestamp': timestamp, 'offset': 0, 'negative_utc': False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. 
""" return { 'id': draw(unknown_revision().map(hash_to_bytes)), 'directory': draw(sha1().map(hash_to_bytes)), 'author': draw(new_person()), 'committer': draw(new_person()), 'message': draw( text(min_size=20, max_size=100).map(lambda t: t.encode())), 'date': draw(new_swh_date()), 'committer_date': draw(new_swh_date()), 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object('snapshots') def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists(new_snapshot(min_size=2, max_size=10, only_objects=True) .map(lambda snp: snp.to_dict()), min_size=min_size, max_size=max_size) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ storage = get_tests_data()['storage'] return sha1().filter( lambda s: storage.snapshot_get(hash_to_bytes(s)) is None) def person(): """ Hypothesis strategy returning a random person ingested into the test archive. """ return _known_swh_object('persons') def unknown_person(): """ Hypothesis strategy returning a random person not ingested into the test archive. """ return integers(min_value=1000000) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data['storage'] origin = random.choice(tests_data['origins'][:-1]) - snapshot = storage.snapshot_get_latest(origin['id']) + snapshot = storage.snapshot_get_latest(origin['url']) head = snapshot['branches'][b'HEAD']['target'] return get_revisions_walker('dfs', storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev['parents']: children[rev_p].append(rev['id']) if not init_rev_found: master_revisions.append(rev) if not rev['parents']: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev['id']] return just({ 'sha1_git_root': hash_to_hex(root_rev['id']), 'sha1_git': hash_to_hex(ancestor_rev['id']), 'children': [hash_to_hex(r) for r in ancestor_child_revs] }) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. 
""" # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev['parents']) > 1: merge_revs.append(rev) for rev_p in rev['parents']: children[rev_p].append(rev['id']) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev['parents']): selected_revs = merge_rev['parents'] return just({ 'sha1_git_root': hash_to_hex(selected_revs[0]), 'sha1_git': hash_to_hex(selected_revs[1]) }) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. """ return just({ 'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351', '15554cf7608dde6bfefac7e3d525596343a85b6f', '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd', '30acd0b47fc25e159e27a980102ddb1c4bea0b95', '4f81f05aaea3efb981f9d90144f746d6b682285b', '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682', '59d08bafa6a749110dfb65ba43a61963d5a5bf9f', '7568285b2d7f31ae483ae71617bd3db873deaa2c', '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4', '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03', '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c', '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd', 'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b', 'e89e55a12def4cd54d5bff58378a3b5119878eb7', 'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e', 'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'], 'symbol_name': 'ABS' }) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just({ 'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234', 'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2', 'rev_dir_rev_path': 'libtess2' }) diff --git a/tox.ini b/tox.ini index a2aec2e1..85664054 100644 --- a/tox.ini +++ b/tox.ini @@ -1,26 +1,36 @@ [tox] envlist=flake8,py3 [testenv:py3] deps = .[testing] pytest-cov pytest-django commands = pytest --hypothesis-profile=swh-web-fast --cov {envsitepackagesdir}/swh/web --cov-branch {posargs} {envsitepackagesdir}/swh/web +[testenv:py3-no-origin-ids] +deps = + .[testing] + pytest-cov + pytest-django +setenv = + SWH_STORAGE_IN_MEMORY_ENABLE_ORIGIN_IDS=false +commands = + pytest --hypothesis-profile=swh-web-fast --cov {envsitepackagesdir}/swh/web --cov-branch {posargs} {envsitepackagesdir}/swh/web -m "not origin_id" + [testenv:py3-slow] deps = .[testing] pytest-cov pytest-django commands = pytest --hypothesis-profile=swh-web --cov {envsitepackagesdir}/swh/web --cov-branch {posargs} {envsitepackagesdir}/swh/web [testenv:flake8] skip_install = true deps = flake8 commands = {envpython} -m flake8 \ --exclude=.tox,.git,__pycache__,.eggs,*.egg,node_modules