diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py index a3e1845c..0bcaf527 100644 --- a/swh/web/api/utils.py +++ b/swh/web/api/utils.py @@ -1,340 +1,359 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from swh.web.common.utils import reverse, fmap +from swh.web.common.query import parse_hash def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def enrich_object(object): """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) Returns: Object enriched with target_url pointing to the right swh.web.ui.api urls for the pointing object (revision, release, content, directory) """ obj = object.copy() if 'target' in obj and 'target_type' in obj: if obj['target_type'] == 'revision': obj['target_url'] = reverse('revision', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'release': obj['target_url'] = reverse('release', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'content': obj['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:' + obj['target']}) elif obj['target_type'] == 'directory': obj['target_url'] = reverse('directory', kwargs={'sha1_git': obj['target']}) if 'author' in obj: author = obj['author'] obj['author_url'] = reverse('person', kwargs={'person_id': author['id']}) return obj enrich_release = enrich_object def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:%s' % target}) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = reverse('directory', kwargs={'sha1_git': target}) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def enrich_metadata_endpoint(content): """Enrich metadata endpoint with link to the upper metadata endpoint. """ c = content.copy() c['content_url'] = reverse('content', args=['sha1:%s' % c['id']]) return c -def enrich_content(content, top_url=False): +def enrich_content(content, top_url=False, query_string=None): """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information + - language_url: its programming language information + - license_url: its licensing information + + Args: + content: dict of data associated to a swh content object + top_url: whether or not to include the content url in + the enriched data + query_string: optional query string of type ':' + used when requesting the content, it acts as a hint + for picking the same hash method when computing + the url listed above + + Returns: + An enriched content dict filled with additional urls """ - for h in ['sha1', 'sha1_git', 'sha256']: - if h in content: - q = '%s:%s' % (h, content[h]) - if top_url: - content['content_url'] = reverse('content', kwargs={'q': q}) - content['data_url'] = reverse('content-raw', kwargs={'q': q}) - content['filetype_url'] = reverse('content-filetype', - kwargs={'q': q}) - content['language_url'] = reverse('content-language', - kwargs={'q': q}) - content['license_url'] = reverse('content-license', - kwargs={'q': q}) - break + checksums = content + if 'checksums' in content: + checksums = content['checksums'] + hash_algo = 'sha1' + if query_string: + hash_algo = parse_hash(query_string)[0] + if hash_algo in checksums: + q = '%s:%s' % (hash_algo, checksums[hash_algo]) + if top_url: + content['content_url'] = reverse('content', kwargs={'q': q}) + content['data_url'] = reverse('content-raw', kwargs={'q': q}) + content['filetype_url'] = reverse('content-filetype', + kwargs={'q': q}) + content['language_url'] = reverse('content-language', + kwargs={'q': q}) + content['license_url'] = reverse('content-license', + kwargs={'q': q}) return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = reverse('entity', kwargs={'uuid': entity['uuid']}) if 'parent' in entity and entity['parent']: entity['parent_url'] = reverse('entity', kwargs={'uuid': entity['parent']}) return entity def _get_path_list(path_string): """Helper for enrich_revision: get a list of the sha1 id of the navigation breadcrumbs, ordered from the oldest to the most recent. Args: path_string: the path as a '/'-separated string Returns: The navigation context as a list of sha1 revision ids """ return path_string.split('/') def _get_revision_contexts(rev_id, context): """Helper for enrich_revision: retrieve for the revision id and potentially the navigation breadcrumbs the context to pass to parents and children of of the revision. Args: rev_id: the revision's sha1 id context: the current navigation context Returns: The context for parents, children and the url of the direct child as a tuple in that order. """ context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = reverse('revision-context', kwargs={'sha1_git': child_id, 'context': context_for_children}) # This commit is the first commit in the path else: url_direct_child = reverse('revision', kwargs={'sha1_git': child_id}) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(reverse('revision', kwargs={'sha1_git': child})) elif not context: children.append(reverse('revision', kwargs={'sha1_git': child})) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = reverse('revision', kwargs={'sha1_git': revision['id']}) revision['history_url'] = reverse('revision-log', kwargs={'sha1_git': revision['id']}) if context: revision['history_context_url'] = reverse( 'revision-log', kwargs={'sha1_git': revision['id'], 'prev_sha1s': context}) if 'author' in revision: author = revision['author'] revision['author_url'] = reverse('person', kwargs={'person_id': author['id']}) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = \ reverse('person', kwargs={'person_id': committer['id']}) if 'directory' in revision: revision['directory_url'] = \ reverse('directory', kwargs={'sha1_git': revision['directory']}) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append({ 'id': parent, 'url': reverse('revision', kwargs={'sha1_git': parent}) }) revision['parents'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = reverse('revision-raw-message', kwargs={'sha1_git': revision['id']}) return revision def get_query_params(request): """Utility functions for retrieving query parameters from a DRF request object. Its purpose is to handle multiple versions of DRF.""" if hasattr(request, 'query_params'): # DRF >= 3.0 uses query_params attribute return request.query_params else: # while DRF < 3.0 uses QUERY_PARAMS attribute return request.QUERY_PARAMS diff --git a/swh/web/api/views/content.py b/swh/web/api/views/content.py index 0c20534b..2ceb501c 100644 --- a/swh/web/api/views/content.py +++ b/swh/web/api/views/content.py @@ -1,340 +1,340 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from django.http import HttpResponse from swh.web.common import service from swh.web.common.utils import reverse from swh.web.common.exc import NotFoundExc, ForbiddenExc from swh.web.api import apidoc as api_doc from swh.web.api import utils from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_header_link, _doc_arg_last_elt, _doc_arg_per_page, _doc_exc_bad_id, _doc_arg_content_id ) @api_route(r'/content/(?P.+)/provenance/', 'content-provenance') @api_doc.route('/content/provenance/', tags=['hidden']) @api_doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""List of provenance information (dict) for the matched content.""") def api_content_provenance(request, q): """Return content's provenance information if any. """ def _enrich_revision(provenance): p = provenance.copy() p['revision_url'] = \ reverse('revision', kwargs={'sha1_git': provenance['revision']}) p['content_url'] = \ reverse('content', kwargs={'q': 'sha1_git:%s' % provenance['content']}) p['origin_url'] = \ reverse('origin', kwargs={'origin_id': provenance['origin']}) p['origin_visits_url'] = \ reverse('origin-visits', kwargs={'origin_id': provenance['origin']}) p['origin_visit_url'] = \ reverse('origin-visit', kwargs={'origin_id': provenance['origin'], 'visit_id': provenance['visit']}) return p return _api_lookup( service.lookup_content_provenance, q, notfound_msg='Content with {} not found.'.format(q), enrich_fn=_enrich_revision) @api_route(r'/content/(?P.+)/filetype/', 'content-filetype') @api_doc.route('/content/filetype/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Filetype information (dict) for the matched content.""") def api_content_filetype(request, q): """Get information about the detected MIME type of a content object. """ return _api_lookup( service.lookup_content_filetype, q, notfound_msg='No filetype information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/language/', 'content-language') @api_doc.route('/content/language/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Language information (dict) for the matched content.""") def api_content_language(request, q): """Get information about the detected (programming) language of a content object. """ return _api_lookup( service.lookup_content_language, q, notfound_msg='No language information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/license/', 'content-license') @api_doc.route('/content/license/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""License information (dict) for the matched content.""") def api_content_license(request, q): """Get information about the detected license of a content object. """ return _api_lookup( service.lookup_content_license, q, notfound_msg='No license information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/ctags/', 'content-ctags') @api_doc.route('/content/ctags/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Ctags symbol (dict) for the matched content.""") def api_content_ctags(request, q): """Get information about all `Ctags `_-style symbols defined in a content object. """ return _api_lookup( service.lookup_content_ctags, q, notfound_msg='No ctags symbol found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/raw/', 'content-raw') @api_doc.route('/content/raw/', handle_response=True) @api_doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.param('filename', default=None, argtype=api_doc.argtypes.str, doc='User\'s desired filename. If provided, the downloaded' ' content will get that filename.') @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(request, q): """Get the raw content of a content object (AKA "blob"), as a byte sequence. """ def generate(content): yield content['data'] content_raw = service.lookup_content_raw(q) if not content_raw: raise NotFoundExc('Content %s is not found.' % q) content_filetype = service.lookup_content_filetype(q) if not content_filetype: raise NotFoundExc('Content %s is not available for download.' % q) mimetype = content_filetype['mimetype'] if 'text/' not in mimetype: raise ForbiddenExc('Only textual content is available for download. ' 'Actual content mimetype is %s.' % mimetype) filename = utils.get_query_params(request).get('filename') if not filename: filename = 'content_%s_raw' % q.replace(':', '_') response = HttpResponse(generate(content_raw), content_type='application/octet-stream') response['Content-disposition'] = 'attachment; filename=%s' % filename return response @api_route(r'/content/symbol/(?P.+)/', 'content-symbol') @api_doc.route('/content/symbol/', tags=['upcoming']) @api_doc.arg('q', default='hello', argtype=api_doc.argtypes.str, argdoc="""An expression string to lookup in swh's raw content""") @api_doc.header('Link', doc=_doc_header_link) @api_doc.param('last_sha1', default=None, argtype=api_doc.argtypes.str, doc=_doc_arg_last_elt) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=_doc_arg_per_page) @api_doc.returns(rettype=api_doc.rettypes.list, retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol """) def api_content_symbol(request, q=None): """Search content objects by `Ctags `_-style symbol (e.g., function name, data type, method, ...). """ result = {} last_sha1 = utils.get_query_params(request).get('last_sha1', None) per_page = int(utils.get_query_params(request).get('per_page', '10')) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): return service.lookup_expression(exp, last_sha1, per_page) symbols = _api_lookup( lookup_exp, q, notfound_msg="No indexed raw content match expression '{}'.".format(q), enrich_fn=functools.partial(utils.enrich_content, top_url=True)) if symbols: l = len(symbols) if l == per_page: query_params = {} new_last_sha1 = symbols[-1]['sha1'] query_params['last_sha1'] = new_last_sha1 if utils.get_query_params(request).get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('content-symbol', kwargs={'q': q}, query_params=query_params) } result.update({ 'results': symbols }) return result @api_route(r'/content/known/search/', 'content-known', methods=['POST']) @api_route(r'/content/known/(?P(?!search).*)/', 'content-known') @api_doc.route('/content/known/', tags=['hidden']) @api_doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=api_doc.argtypes.sha1, argdoc='content identifier as a sha1 checksum') @api_doc.param('q', default=None, argtype=api_doc.argtypes.str, doc="""(POST request) An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""a dictionary with results (found/not found for each given identifier) and statistics about how many identifiers were found""") def api_check_content_known(request, q=None): """Check whether some content (AKA "blob") is present in the archive. Lookup can be performed by various means: - a GET request with one or several hashes, separated by ',' - a POST request with one or several hashes, passed as (multiple) values for parameter 'q' """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(',') for v in hashes: queries.append({'filename': None, 'sha1': v}) # POST: Many hash requests in post form submission elif request.method == 'POST': data = request.data if hasattr(request, 'data') else request.DATA # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if queries: lookup = service.lookup_multiple_hashes(queries) result = [] l = len(queries) for el in lookup: res_d = {'sha1': el['sha1'], 'found': el['found']} if 'filename' in el and el['filename']: res_d['filename'] = el['filename'] result.append(res_d) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = l search_stats['pct'] = (nbfound / l) * 100 response['search_res'] = search_res response['search_stats'] = search_stats return response @api_route(r'/content/(?P.+)/', 'content') @api_doc.route('/content/') @api_doc.arg('q', default='dc2830a9e72f23c1dfebef4413003221baa5fb62', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""known metadata for content identified by q""") def api_content_metadata(request, q): """Get information about a content (AKA "blob") object. """ return _api_lookup( service.lookup_content, q, notfound_msg='Content with {} not found.'.format(q), - enrich_fn=utils.enrich_content) + enrich_fn=functools.partial(utils.enrich_content, query_string=q)) diff --git a/swh/web/common/converters.py b/swh/web/common/converters.py index 7b494b39..931fb606 100644 --- a/swh/web/common/converters.py +++ b/swh/web/common/converters.py @@ -1,325 +1,342 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import json from swh.model import hashutil from swh.core.utils import decode_with_escape from swh.web.api import utils +def _group_checksums(data): + """Groups checksums values computed from hash functions used in swh + and stored in data dict under a single entry 'checksums' + """ + if data: + checksums = {} + for hash in hashutil.ALGORITHMS: + if hash in data and data[hash]: + checksums[hash] = data[hash] + del data[hash] + if len(checksums) > 0: + data['checksums'] = checksums + + def from_swh(dict_swh, hashess={}, bytess={}, dates={}, blacklist={}, removables_if_empty={}, empty_dict={}, empty_list={}, convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: dict_swh: the origin dictionary needed to be transformed hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string bytess: list/set of keys representing bytes values which needs to be decoded blacklist: set of keys to filter out from the conversion convert: set of keys whose associated values need to be converted using convert_fn convert_fn: the conversion function to apply on the value of key in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys converted. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if isinstance(v, bytes): return v.decode('utf-8') return v def convert_date(v): """ Args: v (dict or datatime): either: - a dict with three keys: - timestamp (dict or integer timestamp) - offset - negative_utc - or, a datetime We convert it to a human-readable string """ if isinstance(v, datetime.datetime): return v.isoformat() tz = datetime.timezone(datetime.timedelta(minutes=v['offset'])) swh_timestamp = v['timestamp'] if isinstance(swh_timestamp, dict): date = datetime.datetime.fromtimestamp( swh_timestamp['seconds'], tz=tz) else: date = datetime.datetime.fromtimestamp( swh_timestamp, tz=tz) datestr = date.isoformat() if v['offset'] == 0 and v['negative_utc']: # remove the rightmost + and replace it with a - return '-'.join(datestr.rsplit('+', 1)) return datestr if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist or (key in removables_if_empty and not value): continue if key in dates: new_dict[key] = convert_date(value) elif key in convert: new_dict[key] = convert_fn(value) elif isinstance(value, dict): new_dict[key] = from_swh(value, hashess=hashess, bytess=bytess, dates=dates, blacklist=blacklist, removables_if_empty=removables_if_empty, empty_dict=empty_dict, empty_list=empty_list, convert=convert, convert_fn=convert_fn) elif key in hashess: new_dict[key] = utils.fmap(convert_hashes_bytes, value) elif key in bytess: try: new_dict[key] = utils.fmap(convert_bytes, value) except UnicodeDecodeError: if 'decoding_failures' not in new_dict: new_dict['decoding_failures'] = [key] else: new_dict['decoding_failures'].append(key) new_dict[key] = utils.fmap(decode_with_escape, value) elif key in empty_dict and not value: new_dict[key] = {} elif key in empty_list and not value: new_dict[key] = [] else: new_dict[key] = value + _group_checksums(new_dict) + return new_dict def from_provenance(provenance): """Convert from a provenance information to a provenance dictionary. Args: provenance (dict): Dictionary with the following keys: - content (sha1_git): the content's identifier - revision (sha1_git): the revision the content was seen - origin (int): the origin the content was seen - visit (int): the visit it occurred - path (bytes): the path the content was seen at """ return from_swh(provenance, hashess={'content', 'revision'}, bytess={'path'}) def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, removables_if_empty={'lister', 'project'}) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release (dict): dictionary with keys: - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) comment: release's comment message (bytes) name: release's name (string) author: release's author identifier (swh's id) synthetic: the synthetic property (boolean) Returns: dict: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh( release, hashess={'id', 'target'}, bytess={'message', 'name', 'fullname', 'email'}, dates={'date'}, ) class SWHMetadataEncoder(json.JSONEncoder): """Special json encoder for metadata field which can contain bytes encoded value. """ def default(self, obj): if isinstance(obj, bytes): return obj.decode('utf-8') # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) def convert_revision_metadata(metadata): """Convert json specific dict to a json serializable one. """ if not metadata: return {} return json.loads(json.dumps(metadata, cls=SWHMetadataEncoder)) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision (dict): dict with keys: - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: dict: Revision dictionary with the same keys as inputs, except: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email) Remaining keys are left as is """ revision = from_swh(revision, hashess={'id', 'directory', 'parents', 'children'}, bytess={'name', 'fullname', 'email'}, convert={'metadata'}, convert_fn=convert_revision_metadata, dates={'date', 'committer_date'}) if revision: if 'parents' in revision: revision['merge'] = len(revision['parents']) > 1 if 'message' in revision: try: revision['message'] = revision['message'].decode('utf-8') except UnicodeDecodeError: revision['message_decoding_failed'] = True revision['message'] = None return revision def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256', 'blake2s256'}, blacklist={'ctime'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, bytess={'name', 'fullname', 'email'}) def from_origin_visit(visit): """Convert swh origin_visit to serializable origin_visit dictionary. """ ov = from_swh(visit, hashess={'target'}, bytess={'branch'}, dates={'date'}, empty_dict={'metadata'}) if ov and 'occurrences' in ov: ov['occurrences'] = { decode_with_escape(k): v for k, v in ov['occurrences'].items() } return ov def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, - hashess={'dir_id', 'sha1_git', 'sha1', 'sha256', 'target'}, + hashess={'dir_id', 'sha1_git', 'sha1', 'sha256', + 'blake2s256', 'target'}, bytess={'name'}, removables_if_empty={ - 'sha1', 'sha1_git', 'sha256', 'status'}, + 'sha1', 'sha1_git', 'sha256', 'blake2s256', 'status'}, convert={'status'}, convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_filetype(content_entry): """Convert swh person to serializable person dictionary. """ return from_swh(content_entry, hashess={'id'}, bytess={'mimetype', 'encoding'}) diff --git a/swh/web/common/service.py b/swh/web/common/service.py index ff3d9941..b919ba90 100644 --- a/swh/web/common/service.py +++ b/swh/web/common/service.py @@ -1,815 +1,813 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from collections import defaultdict from swh.model import hashutil from swh.web.common import converters from swh.web.common import query from swh.web.common.exc import NotFoundExc from swh.web import config storage = config.storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Returns: List of ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = storage.content_find({algo: hash}) return {'found': found, 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = storage.content_find({algo: hash}) return {'found': found is not None} def lookup_content_provenance(q): """Return provenance information from a specified content. Args: q: query string of the form Yields: provenance information (dict) list if the content is found. """ algo, hash = query.parse_hash(q) provenances = storage.content_find_provenance({algo: hash}) if not provenances: return None return (converters.from_provenance(p) for p in provenances) def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = storage.content_find({algo: hash}) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(storage.content_fossology_license_get([sha1])) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: origin information as dict. """ return converters.from_origin(storage.origin_get(origin)) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = _first_element(storage.person_get([person_id])) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = _first_element(storage.directory_get([sha1_git_bin])) if not dir: return None directory_entries = storage.directory_ls(sha1_git_bin) or [] return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(directory_sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( directory_sha1_git, ['sha1'], 'Only sha1_git is supported.') paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, directory_sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = _first_element(storage.release_get([sha1_git_bin])) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ def to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], 'Only sha1_git is supported.') return sha1_git_bin sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(x) for x in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = _first_element(storage.revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1)) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Return information about the revision with sha1 revision_sha1_git. Args: origin_id: origin of the revision branch_name: revision's branch timestamp: revision's time frame limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: NotFoundExc if no revision corresponds to the criterion NotFoundExc if the corresponding revision has no log """ revision_entries = storage.revision_log_by(origin_id, branch_name, timestamp, limit=limit) if not revision_entries: return None return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = _first_element(storage.revision_get_by(origin_id, branch_name, timestamp=ts, limit=1)) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = storage.revision_log([sha1_git_root_bin], limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] - return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = storage.content_find({'sha1_git': entity['target']}) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] - return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = storage.content_find({algo: hash}) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = storage.content_find({algo: hash}) if not c: return None content = _first_element(storage.content_get([c['sha1']])) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return storage.stat_counters() def _lookup_origin_visits(origin_id, last_visit=None, limit=10): """Yields the origin origin_ids' visits. Args: origin_id (int): origin to list visits for last_visit (int): last visit to lookup from limit (int): Number of elements max to display Yields: Dictionaries of origin_visit for that origin """ limit = min(limit, MAX_LIMIT) yield from storage.origin_visit_get( origin_id, last_visit=last_visit, limit=limit) def lookup_origin_visits(origin_id, last_visit=None, per_page=10): """Yields the origin origin_ids' visits. Args: origin_id: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ visits = _lookup_origin_visits(origin_id, last_visit=last_visit, limit=per_page) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit(origin_id, visit_id): """Return information about visit visit_id with origin origin_id. Args: origin_id: origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = storage.origin_visit_get_by(origin_id, visit_id) return converters.from_origin_visit(visit) def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) for entity in storage.entity_get(uuid): entity = converters.from_swh(entity, convert={'last_seen', 'uuid'}, convert_fn=lambda x: str(x)) yield entity def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index c5913059..48340b99 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,866 +1,880 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.api import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.maxDiff = None self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] + self.sample_content_hashes = { + 'blake2s256': ('791e07fcea240ade6dccd0a9309141673' + 'c31242cae9c237cf3855e151abc78e9'), + 'sha1': 'dc2830a9e72f23c1dfebef4413003221baa5fb62', + 'sha1_git': 'fe95a46679d128ff167b7c55df5d02356c5a1ae1', + 'sha256': ('b5c7fe0536f44ef60c8780b6065d30bca74a5cd06' + 'd78a4a71ba1ad064770f0c9') + } @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @istest def prepare_data_for_view_default_encoding(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view_ko_cannot_decode(self): self.maxDiff = None # given inputs = { 'data': 'hé dude!'.encode('utf8'), } actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, { 'data': "Cannot decode the data bytes, try and set another " "encoding in the url (e.g. ?encoding=utf8) or " "download directly the " "content's raw data.", }) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_map(self): # when actual_res = utils.filter_field_keys( map(lambda x: {'i': x['i']+1, 'j': x['j']}, [{'i': 1, 'j': None}, {'i': 2, 'j': None}, {'i': 3, 'j': None}]), {'i'}) # then self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, None, 4], utils.fmap(lambda x: x+1, [1, 2, None, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) self.assertIsNone(utils.fmap(lambda x: x, None)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_1(self, mock_django_reverse): # given def reverse_test_context(view_name, kwargs): if view_name == 'content': id = kwargs['q'] return '/api/1/content/%s/' % id elif view_name == 'person': id = kwargs['person_id'] return '/api/1/person/%s/' % id else: raise ValueError( 'This should not happened so fail if it does.') mock_django_reverse.side_effect = reverse_test_context # when actual_release = utils.enrich_release({ 'target': '123', 'target_type': 'content', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/', 'author_url': '/api/1/person/100/', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) mock_django_reverse.assert_has_calls([ call('content', kwargs={'q': 'sha1_git:123'}), call('person', kwargs={'person_id': 100}) ]) @patch('swh.web.api.utils.reverse') @istest def enrich_release_2(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '23'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_3(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_django_reverse.assert_called_once_with('revision', kwargs={'sha1_git': '3'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_4(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_django_reverse.assert_called_once_with('release', kwargs={'sha1_git': '4'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_no_type(self, mock_django_reverse): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_file(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:789'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_dir(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '456'}) @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes(self, mock_django_reverse): - for h in ['sha1', 'sha256', 'sha1_git']: + + for algo, hash in self.sample_content_hashes.items(): + + query_string = '%s:%s' % (algo, hash) + # given mock_django_reverse.side_effect = [ - '/api/content/%s:123/raw/' % h, - '/api/filetype/%s:123/' % h, - '/api/language/%s:123/' % h, - '/api/license/%s:123/' % h, + '/api/content/%s/raw/' % query_string, + '/api/filetype/%s/' % query_string, + '/api/language/%s/' % query_string, + '/api/license/%s/' % query_string ] # when enriched_content = utils.enrich_content( { - 'id': '123', - h: 'blahblah' - } + algo: hash, + }, + query_string=query_string ) # then self.assertEqual( enriched_content, { - 'id': '123', - h: 'blahblah', - 'data_url': '/api/content/%s:123/raw/' % h, - 'filetype_url': '/api/filetype/%s:123/' % h, - 'language_url': '/api/language/%s:123/' % h, - 'license_url': '/api/license/%s:123/' % h, + algo: hash, + 'data_url': '/api/content/%s/raw/' % query_string, + 'filetype_url': '/api/filetype/%s/' % query_string, + 'language_url': '/api/language/%s/' % query_string, + 'license_url': '/api/license/%s/' % query_string, } ) mock_django_reverse.assert_has_calls([ - call('content-raw', kwargs={'q': '%s:blahblah' % h}), - call('content-filetype', kwargs={'q': '%s:blahblah' % h}), - call('content-language', kwargs={'q': '%s:blahblah' % h}), - call('content-license', kwargs={'q': '%s:blahblah' % h}), + call('content-raw', kwargs={'q': query_string}), + call('content-filetype', kwargs={'q': query_string}), + call('content-language', kwargs={'q': query_string}), + call('content-license', kwargs={'q': query_string}), ]) mock_django_reverse.reset() @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes_and_top_level_url(self, mock_django_reverse): - for h in ['sha1', 'sha256', 'sha1_git']: + + for algo, hash in self.sample_content_hashes.items(): + + query_string = '%s:%s' % (algo, hash) + # given mock_django_reverse.side_effect = [ - '/api/content/%s:123/' % h, - '/api/content/%s:123/raw/' % h, - '/api/filetype/%s:123/' % h, - '/api/language/%s:123/' % h, - '/api/license/%s:123/' % h, + '/api/content/%s/' % query_string, + '/api/content/%s/raw/' % query_string, + '/api/filetype/%s/' % query_string, + '/api/language/%s/' % query_string, + '/api/license/%s/' % query_string, ] # when enriched_content = utils.enrich_content( { - 'id': '123', - h: 'blahblah' + algo: hash }, - top_url=True + top_url=True, + query_string=query_string ) # then self.assertEqual( enriched_content, { - 'id': '123', - h: 'blahblah', - 'content_url': '/api/content/%s:123/' % h, - 'data_url': '/api/content/%s:123/raw/' % h, - 'filetype_url': '/api/filetype/%s:123/' % h, - 'language_url': '/api/language/%s:123/' % h, - 'license_url': '/api/license/%s:123/' % h, + algo: hash, + 'content_url': '/api/content/%s/' % query_string, + 'data_url': '/api/content/%s/raw/' % query_string, + 'filetype_url': '/api/filetype/%s/' % query_string, + 'language_url': '/api/language/%s/' % query_string, + 'license_url': '/api/license/%s/' % query_string, } ) mock_django_reverse.assert_has_calls([ - call('content', kwargs={'q': '%s:blahblah' % h}), - call('content-raw', kwargs={'q': '%s:blahblah' % h}), - call('content-filetype', kwargs={'q': '%s:blahblah' % h}), - call('content-language', kwargs={'q': '%s:blahblah' % h}), - call('content-license', kwargs={'q': '%s:blahblah' % h}), + call('content', kwargs={'q': query_string}), + call('content-raw', kwargs={'q': query_string}), + call('content-filetype', kwargs={'q': query_string}), + call('content-language', kwargs={'q': query_string}), + call('content-license', kwargs={'q': query_string}), ]) mock_django_reverse.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_entity_with_sha1(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): return '/api/entity/' + kwargs['uuid'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_django_reverse.assert_has_calls( [call('entity', kwargs={'uuid': 'uuid-1'}), call('entity', kwargs={'uuid': 'uuid-parent'})]) @nottest def _reverse_context_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-context': return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa elif view_name == 'revision-log': if 'prev_sha1s' in kwargs: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] @patch('swh.web.api.utils.reverse') @istest def enrich_revision_without_children_or_parent(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): if view_name == 'revision': return '/api/revision/' + kwargs['sha1_git'] + '/' elif view_name == 'revision-log': return '/api/revision/' + kwargs['sha1_git'] + '/log/' elif view_name == 'directory': return '/api/directory/' + kwargs['sha1_git'] + '/' elif view_name == 'person': return '/api/person/' + kwargs['person_id'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('person', kwargs={'person_id': '1'}), call('person', kwargs={'person_id': '2'}), call('directory', kwargs={'sha1_git': '123'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_no_context(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_empty_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_some_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision-context', kwargs={'context': 'prev1-rev', 'sha1_git': 'prev0-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'prev_sha1s': 'prev1-rev/prev0-rev', 'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @nottest def _reverse_rev_message_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-log': if 'prev_sha1s' in kwargs and kwargs['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] elif view_name == 'revision-raw-message': return '/api/revision/' + kwargs['sha1_git'] + '/raw/' else: return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_no_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})] ) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_invalid_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'}), call('revision-raw-message', kwargs={'sha1_git': 'rev-id'})]) diff --git a/swh/web/tests/api/views/test_content.py b/swh/web/tests/api/views/test_content.py index 518ee813..338fbe2c 100644 --- a/swh/web/tests/api/views/test_content.py +++ b/swh/web/tests/api/views/test_content.py @@ -1,722 +1,730 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from unittest.mock import patch, MagicMock from ..swh_api_testcase import SWHApiTestCase class ContentApiTestCase(SWHApiTestCase): @patch('swh.web.api.views.content.service') @istest def api_content_filetype(self, mock_service): stub_filetype = { 'accepted_media_type': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_filetype.return_value = stub_filetype # when rv = self.client.get( '/api/1/content/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/filetype/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'accepted_media_type': 'application/xml', 'encoding': 'ascii', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.api.views.content.service') @istest def api_content_filetype_sha_not_found(self, mock_service): # given mock_service.lookup_content_filetype.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/' 'filetype/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No filetype information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_language(self, mock_service): stub_language = { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', } mock_service.lookup_content_language.return_value = stub_language # when rv = self.client.get( '/api/1/content/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/language/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'lang': 'lisp', 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_language.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.api.views.content.service') @istest def api_content_language_sha_not_found(self, mock_service): # given mock_service.lookup_content_language.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/language/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No language information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_language.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_symbol(self, mock_service): stub_ctag = [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.client.get('/api/1/content/symbol/foo/?last_sha1=sha1') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, [{ 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/license/', 'language_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/language/', 'filetype_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/filetype/', }]) self.assertFalse('Link' in rv) mock_service.lookup_expression.assert_called_once_with( 'foo', 'sha1', 10) @patch('swh.web.api.views.content.service') @istest def api_content_symbol_2(self, mock_service): stub_ctag = [{ 'sha1': '12371b8614fcd89ccd17ca2b1d9e66c5b00a6456', 'name': 'foobar', 'kind': 'Haskell', 'line': 10, }, { 'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6678', 'name': 'foo', 'kind': 'Lisp', 'line': 10, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.client.get( '/api/1/content/symbol/foo/?last_sha1=prev-sha1&per_page=2') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, stub_ctag) self.assertTrue( rv['Link'] == '; rel="next"' or # noqa rv['Link'] == '; rel="next"' # noqa ) mock_service.lookup_expression.assert_called_once_with( 'foo', 'prev-sha1', 2) @patch('swh.web.api.views.content.service') # @istest def api_content_symbol_3(self, mock_service): stub_ctag = [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, }] mock_service.lookup_expression.return_value = stub_ctag # when rv = self.client.get('/api/1/content/symbol/foo/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, [{ 'sha1': '67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'foo', 'kind': 'variable', 'line': 100, 'content_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'data_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'license_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/license/', 'language_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/language/', 'filetype_url': '/api/1/content/' 'sha1:67891b8614fcd89ccd17ca2b1d9e66c5b00a6d03/filetype/', }]) self.assertFalse(rv.has_header('Link')) mock_service.lookup_expression.assert_called_once_with('foo', None, 10) @patch('swh.web.api.views.content.service') @istest def api_content_symbol_not_found(self, mock_service): # given mock_service.lookup_expression.return_value = [] # when rv = self.client.get('/api/1/content/symbol/bar/?last_sha1=hash') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No indexed raw content match expression \'bar\'.' }) self.assertFalse('Link' in rv) mock_service.lookup_expression.assert_called_once_with( 'bar', 'hash', 10) @patch('swh.web.api.views.content.service') @istest def api_content_ctags(self, mock_service): stub_ctags = { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [] } mock_service.lookup_content_ctags.return_value = stub_ctags # when rv = self.client.get( '/api/1/content/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/ctags/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'ctags': [], 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_ctags.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.api.views.content.service') @istest def api_content_license(self, mock_service): stub_license = { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', } mock_service.lookup_content_license.return_value = stub_license # when rv = self.client.get( '/api/1/content/' 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/license/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'licenses': ['No_license_found', 'Apache-2.0'], 'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'tool_name': 'nomos', 'content_url': '/api/1/content/' 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', }) mock_service.lookup_content_license.assert_called_once_with( 'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f') @patch('swh.web.api.views.content.service') @istest def api_content_license_sha_not_found(self, mock_service): # given mock_service.lookup_content_license.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/' 'license/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No license information found for content ' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.' }) mock_service.lookup_content_license.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_provenance(self, mock_service): stub_provenances = [{ 'origin': 1, 'visit': 2, 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] mock_service.lookup_content_provenance.return_value = stub_provenances # when rv = self.client.get( '/api/1/content/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/provenance/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, [{ 'origin': 1, 'visit': 2, 'origin_url': '/api/1/origin/1/', 'origin_visits_url': '/api/1/origin/1/visits/', 'origin_visit_url': '/api/1/origin/1/visit/2/', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'revision_url': '/api/1/revision/' 'b04caf10e9535160d90e874b45aa426de762f19f/', 'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'content_url': '/api/1/content/' 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/', 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }]) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_provenance_sha_not_found(self, mock_service): # given mock_service.lookup_content_provenance.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/' 'provenance/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_provenance.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest - def api_content_metadata(self, mock_service): + def test_api_content_metadata(self, mock_service): # given mock_service.lookup_content.return_value = { - 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', - 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', - 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' - 'cde9b067a4f', + 'checksums': { + 'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f' + 'cbabc1048edb44380f1d31d0aa', + 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', + 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be47' + '35637006560cde9b067a4f', + }, 'length': 17, 'status': 'visible' } # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { + 'checksums': { + 'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f' + 'cbabc1048edb44380f1d31d0aa', + 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', + 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be47' + '35637006560cde9b067a4f', + }, 'data_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'filetype_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/filetype/', 'language_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/language/', 'license_url': '/api/1/content/' 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/license/', - 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', - 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', - 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' - 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.client.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.api.views.content.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_content_provenance = MagicMock() # when rv = self.client.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', HTTP_ACCEPT='application/yaml') self.assertEquals(rv.status_code, 404) self.assertTrue('application/yaml' in rv['Content-Type']) self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_content_provenance.called = False @patch('swh.web.api.views.content.service') @istest def api_content_raw_ko_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 is not found.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_raw_text(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content mock_service.lookup_content_filetype.return_value = { 'mimetype': 'text/html' } # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/octet-stream') self.assertEquals( rv['Content-disposition'], 'attachment; filename=content_sha1_' '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03_raw') self.assertEquals( rv['Content-Type'], 'application/octet-stream') self.assertEquals(rv.content, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_raw_text_with_filename(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content mock_service.lookup_content_filetype.return_value = { 'mimetype': 'text/html' } # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/?filename=filename.txt') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/octet-stream') self.assertEquals( rv['Content-disposition'], 'attachment; filename=filename.txt') self.assertEquals( rv['Content-Type'], 'application/octet-stream') self.assertEquals(rv.content, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_raw_no_accepted_media_type_text_is_not_available_for_download( # noqa self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content mock_service.lookup_content_filetype.return_value = { 'mimetype': 'application/octet-stream' } # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 403) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'ForbiddenExc', 'reason': 'Only textual content is available for download. ' 'Actual content mimetype is application/octet-stream.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_content_raw_no_accepted_media_type_found_so_not_available_for_download( # noqa self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content mock_service.lookup_content_filetype.return_value = None # when rv = self.client.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Content sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03 ' 'is not available for download.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_content_filetype.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.api.views.content.service') @istest def api_check_content_known(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:blah'} ] expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 100}, 'search_res': [{'sha1': 'sha1:blah', 'found': True}] } # when rv = self.client.get('/api/1/content/known/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:blah'}]) @patch('swh.web.api.views.content.service') @istest def api_check_content_known_as_yaml(self, mock_service): # given mock_service.lookup_multiple_hashes.return_value = [ {'found': True, 'filename': None, 'sha1': 'sha1:halb'}, {'found': False, 'filename': None, 'sha1': 'sha1_git:hello'} ] expected_result = { 'search_stats': {'nbfiles': 2, 'pct': 50}, 'search_res': [{'sha1': 'sha1:halb', 'found': True}, {'sha1': 'sha1_git:hello', 'found': False}] } # when rv = self.client.get('/api/1/content/known/sha1:halb,sha1_git:hello/', HTTP_ACCEPT='application/yaml') self.assertEquals(rv.status_code, 200) self.assertTrue('application/yaml' in rv['Content-Type']) self.assertEquals(rv.data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}, {'filename': None, 'sha1': 'sha1_git:hello'}]) @patch('swh.web.api.views.content.service') @istest def api_check_content_known_post_as_yaml(self, mock_service): # given stub_result = [{'sha1': '7e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filepath', 'sha1': '8e62b1fe10c88a3eddbba930b156bee2956b2435', 'found': True}, {'filename': 'filename', 'sha1': '64025b5d1520c615061842a6ce6a456cad962a3f', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 3, 'pct': 2/3 * 100}, 'search_res': stub_result } # when rv = self.client.post( '/api/1/content/known/search/', data=dict( q='7e62b1fe10c88a3eddbba930b156bee2956b2435', filepath='8e62b1fe10c88a3eddbba930b156bee2956b2435', filename='64025b5d1520c615061842a6ce6a456cad962a3f'), HTTP_ACCEPT='application/yaml' ) self.assertEquals(rv.status_code, 200) self.assertTrue('application/yaml' in rv['Content-Type']) self.assertEquals(rv.data, expected_result) @patch('swh.web.api.views.content.service') @istest def api_check_content_known_not_found(self, mock_service): # given stub_result = [{'sha1': 'sha1:halb', 'found': False}] mock_service.lookup_multiple_hashes.return_value = stub_result expected_result = { 'search_stats': {'nbfiles': 1, 'pct': 0.0}, 'search_res': stub_result } # when rv = self.client.get('/api/1/content/known/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_result) mock_service.lookup_multiple_hashes.assert_called_once_with( [{'filename': None, 'sha1': 'sha1:halb'}]) diff --git a/swh/web/tests/common/test_converters.py b/swh/web/tests/common/test_converters.py index 85aa99a8..8bf3d5ed 100644 --- a/swh/web/tests/common/test_converters.py +++ b/swh/web/tests/common/test_converters.py @@ -1,743 +1,752 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.model import hashutil from swh.web.common import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hash_to_bytes( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy', b''], 'n': 'dont care either' }, 'm': 'dont care' }, 'o': 'something', 'p': b'foo', 'q': {'extra-headers': [['a', b'intact']]}, 'w': None, 'r': {'p': 'also intact', 'q': 'bar'}, 's': { 'timestamp': 42, 'offset': -420, 'negative_utc': None, }, 's1': { 'timestamp': {'seconds': 42, 'microseconds': 0}, 'offset': -420, 'negative_utc': None, }, 's2': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 't': None, 'u': None, 'v': None, 'x': None, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy', ''] } }, 'p': 'foo', 'q': {'extra-headers': [['a', 'intact']]}, 'w': {}, 'r': {'p': 'also intact', 'q': 'bar'}, 's': '1969-12-31T17:00:42-07:00', 's1': '1969-12-31T17:00:42-07:00', 's2': '2013-07-01T20:00:00+00:00', 'u': {}, 'v': [], 'x': None, } actual_output = converters.from_swh( some_input, hashess={'d', 'o', 'x'}, bytess={'c', 'e', 'g', 'l'}, dates={'s', 's1', 's2'}, blacklist={'h', 'm', 'n', 'o'}, removables_if_empty={'t'}, empty_dict={'u'}, empty_list={'v'}, convert={'p', 'q', 'w'}, convert_fn=converters.convert_revision_metadata) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_convert_invalid_utf8_bytes(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'a name \xff', 'd': b'an email \xff', } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'a name \\xff', 'd': 'an email \\xff', 'decoding_failures': ['c', 'd'] } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) for v in ['a', 'b', 'c', 'd']: self.assertEqual(expected_output[v], actual_output[v]) self.assertEqual(len(expected_output['decoding_failures']), len(actual_output['decoding_failures'])) for v in expected_output['decoding_failures']: self.assertTrue(v in actual_output['decoding_failures']) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_provenance(self): # given input_provenance = { 'origin': 10, 'visit': 1, 'content': hashutil.hash_to_bytes( '321caf10e9535160d90e874b45aa426de762f19f'), 'revision': hashutil.hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } expected_provenance = { 'origin': 10, 'visit': 1, 'content': '321caf10e9535160d90e874b45aa426de762f19f', 'revision': '123caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } # when actual_provenance = converters.from_provenance(input_provenance) # then self.assertEqual(actual_provenance, expected_provenance) @istest def from_origin(self): # given origin_input = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', 'project': None, 'lister': None, } expected_origin = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hash_to_bytes( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hash_to_bytes( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'author': { 'name': b'author name', 'fullname': b'Author Name author@email', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': '2015-01-01T22:00:00+00:00', 'author': { 'name': 'author name', 'fullname': 'Author Name author@email', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hash_to_bytes( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'fullname': b'Bob bob@alice.net', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': '2016-03-02T10:00:00-00:00', 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'fullname': 'Bob bob@alice.net', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'extra_headers': [['gpgsig', b'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'extra_headers': [['gpgsig', 'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_nomerge(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5') ] } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5' ], 'merge': False } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_noparents(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] } } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_invalid(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'invalid message \xff', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': None, 'message_decoding_failed': True, 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content_none(self): self.assertIsNone(converters.from_content(None)) @istest def from_content(self): content_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'blake2s256': hashutil.hash_to_bytes( '49007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'ctime': 'something-which-is-filtered-out', 'data': b'data in bytes', 'length': 10, 'status': 'hidden', } # 'status' is filtered expected_content = { - 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', - 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' - '7d3bf96c926', - 'blake2s256': '49007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2' - '747d3bf96c926', - 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + 'checksums': { + 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', + 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98' + '930e7e0afa4d2747d3bf96c926', + 'blake2s256': '49007420ca5de7cb3cfc15196335507ee7' + '6c98930e7e0afa4d2747d3bf96c926', + 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + }, 'data': b'data in bytes', 'length': 10, 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'fullname': b'bob bob@alice.net', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'fullname': 'bob bob@alice.net', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), + 'blake2s256': hashutil.hash_to_bytes( + '685395c5dc57cada459364f0946d3dd45bad5fcbab' + 'c1048edb44380f1d31d0aa'), 'target': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'dir_id': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'name': b'bob', 'type': 10, 'status': 'hidden', } expected_dir_entries = { - 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', - 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' - 'd3bf96c926', - 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + 'checksums': { + 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', + 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98' + '930e7e0afa4d2747d3bf96c926', + 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', + 'blake2s256': '685395c5dc57cada459364f0946d3dd45bad5f' + 'cbabc1048edb44380f1d31d0aa', + }, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) @istest def from_filetype(self): content_filetype = { 'id': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'encoding': b'utf-8', 'mimetype': b'text/plain', } expected_content_filetype = { 'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'encoding': 'utf-8', 'mimetype': 'text/plain', } # when actual_content_filetype = converters.from_filetype(content_filetype) # then self.assertEqual(actual_content_filetype, expected_content_filetype) diff --git a/swh/web/tests/common/test_service.py b/swh/web/tests/common/test_service.py index f356c415..5bbcb18d 100644 --- a/swh/web/tests/common/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,2058 +1,2071 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.model.hashutil import hash_to_bytes, hash_to_hex from swh.web.common import service from swh.web.common.exc import BadInputExc, NotFoundExc class ServiceTestCase(unittest.TestCase): def setUp(self): - self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5' + self.BLAKE2S256_SAMPLE = ('685395c5dc57cada459364f0946d3dd45b' + 'ad5fcbabc1048edb44380f1d31d0aa') + self.BLAKE2S256_SAMPLE_BIN = hash_to_bytes(self.BLAKE2S256_SAMPLE) + self.SHA1_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE) - self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e' - 'e76c98930e7e0afa4d2747d3bf96c926') + self.SHA256_SAMPLE = ('8abb0aa566452620ecce816eecdef4792d77a' + '293ad8ea82a4d5ecb4d36f7e560') self.SHA256_SAMPLE_BIN = hash_to_bytes(self.SHA256_SAMPLE) - self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' + self.SHA1GIT_SAMPLE = '25d1a2e8f32937b0f498a5ca87f823d8df013c01' self.SHA1GIT_SAMPLE_BIN = hash_to_bytes(self.SHA1GIT_SAMPLE) self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID) self.AUTHOR_ID_BIN = { 'name': b'author', 'email': b'author@company.org', } self.AUTHOR_ID = { 'name': 'author', 'email': 'author@company.org', } self.COMMITTER_ID_BIN = { 'name': b'committer', 'email': b'committer@corp.org', } self.COMMITTER_ID = { 'name': 'committer', 'email': 'committer@corp.org', } self.SAMPLE_DATE_RAW = { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc, ).timestamp(), 'offset': 0, 'negative_utc': False, } self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' self.SAMPLE_REVISION = { 'id': self.SHA1_SAMPLE, 'directory': self.DIRECTORY_ID, 'author': self.AUTHOR_ID, 'committer': self.COMMITTER_ID, 'message': self.SAMPLE_MESSAGE, 'date': self.SAMPLE_DATE, 'committer_date': self.SAMPLE_DATE, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': {}, 'merge': False } self.SAMPLE_REVISION_RAW = { 'id': self.SHA1_SAMPLE_BIN, 'directory': self.DIRECTORY_ID_BIN, 'author': self.AUTHOR_ID_BIN, 'committer': self.COMMITTER_ID_BIN, 'message': self.SAMPLE_MESSAGE_BIN, 'date': self.SAMPLE_DATE_RAW, 'committer_date': self.SAMPLE_DATE_RAW, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } self.SAMPLE_CONTENT = { - 'sha1': self.SHA1_SAMPLE, - 'sha256': self.SHA256_SAMPLE, - 'sha1_git': self.SHA1GIT_SAMPLE, + 'checksums': { + 'blake2s256': self.BLAKE2S256_SAMPLE, + 'sha1': self.SHA1_SAMPLE, + 'sha256': self.SHA256_SAMPLE, + 'sha1_git': self.SHA1GIT_SAMPLE, + }, 'length': 190, 'status': 'absent' } self.SAMPLE_CONTENT_RAW = { + 'blake2s256': self.BLAKE2S256_SAMPLE_BIN, 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'length': 190, 'status': 'hidden' } self.date_origin_visit1 = datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc) self.origin_visit1 = { 'date': self.date_origin_visit1, 'origin': 1, 'visit': 1 } @patch('swh.web.common.service.storage') @istest def lookup_multiple_hashes_ball_missing(self, mock_storage): # given mock_storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.common.service.storage') @istest def lookup_multiple_hashes_some_missing(self, mock_storage): # given mock_storage.content_missing_per_sha1 = MagicMock(return_value=[ hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f') ]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) @patch('swh.web.common.service.storage') @istest def lookup_hash_does_not_exist(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_storage.content_find.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) @patch('swh.web.common.service.storage') @istest def lookup_hash_exist(self, mock_storage): # given stub_content = { 'sha1': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') } mock_storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')} ) @patch('swh.web.common.service.storage') @istest def search_hash_does_not_exist(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.search_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': False}, actual_lookup) # check the function has been called with parameters mock_storage.content_find.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) @patch('swh.web.common.service.storage') @istest def search_hash_exist(self, mock_storage): # given stub_content = { 'sha1': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') } mock_storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.search_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': True}, actual_lookup) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}, ) @patch('swh.web.common.service.storage') @istest def lookup_content_ctags(self, mock_storage): # given mock_storage.content_ctags_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'line': 100, 'name': 'hello', 'kind': 'function', 'tool_name': 'ctags', 'tool_version': 'some-version', }]) expected_ctags = [{ 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'line': 100, 'name': 'hello', 'kind': 'function', 'tool_name': 'ctags', 'tool_version': 'some-version', }] # when actual_ctags = list(service.lookup_content_ctags( 'sha1:123caf10e9535160d90e874b45aa426de762f19f')) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_ctags_no_hash(self, mock_storage): # given mock_storage.content_find.return_value = None mock_storage.content_ctags_get = MagicMock( return_value=None) # when actual_ctags = list(service.lookup_content_ctags( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')) # then self.assertEqual(actual_ctags, []) mock_storage.content_find.assert_called_once_with( {'sha1_git': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f')}) @patch('swh.web.common.service.storage') @istest def lookup_content_filetype(self, mock_storage): # given mock_storage.content_mimetype_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'mimetype': b'text/x-c++', 'encoding': b'us-ascii', }]) expected_filetype = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'mimetype': 'text/x-c++', 'encoding': 'us-ascii', } # when actual_filetype = service.lookup_content_filetype( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_filetype, expected_filetype) mock_storage.content_mimetype_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_filetype_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_mimetype_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'mimetype': b'text/x-python', 'encoding': b'us-ascii', }] ) expected_filetype = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'mimetype': 'text/x-python', 'encoding': 'us-ascii', } # when actual_filetype = service.lookup_content_filetype( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_filetype, expected_filetype) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_mimetype_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_language(self, mock_storage): # given mock_storage.content_language_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'python', }]) expected_language = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'python', } # when actual_language = service.lookup_content_language( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_language, expected_language) mock_storage.content_language_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_language_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_language_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'haskell', }] ) expected_language = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'haskell', } # when actual_language = service.lookup_content_language( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_language, expected_language) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_language_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_expression(self, mock_storage): # given mock_storage.content_ctags_search = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'name': 'foobar', 'kind': 'variable', 'lang': 'C', 'line': 10 }]) expected_ctags = [{ 'sha1': '123caf10e9535160d90e874b45aa426de762f19f', 'name': 'foobar', 'kind': 'variable', 'lang': 'C', 'line': 10 }] # when actual_ctags = list(service.lookup_expression( 'foobar', last_sha1='hash', per_page=10)) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_search.assert_called_with( 'foobar', last_sha1='hash', limit=10) @patch('swh.web.common.service.storage') @istest def lookup_expression_no_result(self, mock_storage): # given mock_storage.content_ctags_search = MagicMock( return_value=[]) expected_ctags = [] # when actual_ctags = list(service.lookup_expression( 'barfoo', last_sha1='hash', per_page=10)) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_search.assert_called_with( 'barfoo', last_sha1='hash', limit=10) @patch('swh.web.common.service.storage') @istest def lookup_content_license(self, mock_storage): # given mock_storage.content_fossology_license_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'python', }]) expected_license = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'python', } # when actual_license = service.lookup_content_license( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_license, expected_license) mock_storage.content_fossology_license_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_license_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_fossology_license_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'haskell', }] ) expected_license = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'haskell', } # when actual_license = service.lookup_content_license( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_license, expected_license) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_fossology_license_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) @patch('swh.web.common.service.storage') @istest def lookup_content_provenance(self, mock_storage): # given mock_storage.content_find_provenance = MagicMock( return_value=(p for p in [{ 'content': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'revision': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f'), 'origin': 100, 'visit': 1, 'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }])) expected_provenances = [{ 'content': '123caf10e9535160d90e874b45aa426de762f19f', 'revision': '456caf10e9535160d90e874b45aa426de762f19f', 'origin': 100, 'visit': 1, 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] # when actual_provenances = service.lookup_content_provenance( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(list(actual_provenances), expected_provenances) mock_storage.content_find_provenance.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) @patch('swh.web.common.service.storage') @istest def lookup_content_provenance_not_found(self, mock_storage): # given mock_storage.content_find_provenance = MagicMock(return_value=None) # when actual_provenances = service.lookup_content_provenance( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertIsNone(actual_provenances) mock_storage.content_find_provenance.assert_called_with( {'sha1_git': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}) @patch('swh.web.common.service.storage') @istest def stat_counters(self, mock_storage): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_storage.stat_counters.assert_called_with() @patch('swh.web.common.service._lookup_origin_visits') @istest def lookup_origin_visits(self, mock_lookup_visits): # given date_origin_visit2 = datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc) date_origin_visit3 = datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc) stub_result = [self.origin_visit1, { 'date': date_origin_visit2, 'origin': 1, 'visit': 2, 'target': hash_to_bytes( '65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'branch': b'master', 'target_type': 'release', 'metadata': None, }, { 'date': date_origin_visit3, 'origin': 1, 'visit': 3 }] mock_lookup_visits.return_value = stub_result # when expected_origin_visits = [{ 'date': self.origin_visit1['date'].isoformat(), 'origin': self.origin_visit1['origin'], 'visit': self.origin_visit1['visit'] }, { 'date': date_origin_visit2.isoformat(), 'origin': 1, 'visit': 2, 'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'branch': 'master', 'target_type': 'release', 'metadata': {}, }, { 'date': date_origin_visit3.isoformat(), 'origin': 1, 'visit': 3 }] actual_origin_visits = service.lookup_origin_visits(6) # then self.assertEqual(list(actual_origin_visits), expected_origin_visits) mock_lookup_visits.assert_called_once_with( 6, last_visit=None, limit=10) @patch('swh.web.common.service.storage') @istest def lookup_origin_visit(self, mock_storage): # given stub_result = self.origin_visit1 mock_storage.origin_visit_get_by.return_value = stub_result expected_origin_visit = { 'date': self.origin_visit1['date'].isoformat(), 'origin': self.origin_visit1['origin'], 'visit': self.origin_visit1['visit'] } # when actual_origin_visit = service.lookup_origin_visit(1, 1) # then self.assertEqual(actual_origin_visit, expected_origin_visit) mock_storage.origin_visit_get_by.assert_called_once_with(1, 1) @patch('swh.web.common.service.storage') @istest def lookup_origin(self, mock_storage): # given mock_storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin({'id': 'origin-id'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_storage.origin_get.assert_called_with({'id': 'origin-id'}) @patch('swh.web.common.service.storage') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_storage): # given mock_storage.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_storage.release_get.called = False @patch('swh.web.common.service.storage') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_storage): # given mock_storage.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_storage.release_get.called = False @patch('swh.web.common.service.storage') @istest def lookup_directory_with_path_not_found(self, mock_storage): # given mock_storage.lookup_directory_with_path = MagicMock(return_value=None) sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when actual_directory = mock_storage.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertIsNone(actual_directory) @patch('swh.web.common.service.storage') @istest def lookup_directory_with_path_found(self, mock_storage): # given sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' entry = {'id': 'dir-id', 'type': 'dir', 'name': 'some/path/foo'} mock_storage.lookup_directory_with_path = MagicMock(return_value=entry) # when actual_directory = mock_storage.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertEqual(entry, actual_directory) @patch('swh.web.common.service.storage') @istest def lookup_release(self, mock_storage): # given mock_storage.release_get = MagicMock(return_value=[{ 'id': hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }]) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': '2015-01-01T22:00:00-00:00', 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_storage.release_get.assert_called_with( [hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_storage): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hash_to_bytes(sha1_git) mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_storage.revision_get.assert_called_once_with( [sha1_git_bin]) @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_storage): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hash_to_bytes(sha1_git_root) sha1_git_bin = hash_to_bytes(sha1_git) mock_storage.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_storage.revision_get.assert_has_calls([call([sha1_git_bin]), call([sha1_git_root_bin])]) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_storage): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_storage.revision_get.return_value = [ sha1_git_dict, sha1_git_root_dict ] mock_storage.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) mock_storage.revision_log.assert_called_with( [sha1_git_root_bin], 100) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict( self, mock_query, mock_storage): # given sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', sha1_git_bin) # lookup only on sha1 mock_storage.revision_get.return_value = [sha1_git_dict] mock_storage.revision_log.return_value = stub_revisions # when actual_revision = service.lookup_revision_with_context( {'id': sha1_git_root_bin}, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa sha1_git, ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([sha1_git_bin]) mock_storage.revision_log.assert_called_with( [sha1_git_root_bin], 100) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_revision_not_found(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_revision_with_path_to_nowhere( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( '123', 'path/to/something/unknown') self.assertIn("Directory/File 'path/to/something/unknown' " + "pointed to by revision 123 not found", cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'path', b'to', b'something', b'unknown']) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_type_not_implemented( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'rev', 'name': b'some/path/to/rev', 'target': b'456' } stub_content = { 'id': b'12', 'type': 'file' } mock_storage.content_get.return_value = stub_content # when with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( '123', 'some/path/to/rev') self.assertIn("Entity of type 'rev' not implemented.", cm.exception.args[0]) # then mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'rev']) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_ls.assert_called_once_with(dir_id) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_dir(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'dir', 'name': b'some/path', 'target': b'456' } mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['revision'], '123') self.assertEqual(actual_directory_entries['path'], 'some/path') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( dir_id, [b'some', b'path']) mock_storage.directory_ls.assert_called_once_with(b'456') @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_without_data( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', } mock_storage.content_find.return_value = stub_content # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file') # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': stub_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_with_data( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', 'sha1': b'content-sha1' } mock_storage.content_find.return_value = stub_content mock_storage.content_get.return_value = [{ 'sha1': b'content-sha1', 'data': b'some raw data' }] expected_content = { 'status': 'visible', - 'sha1': hash_to_hex(b'content-sha1'), + 'checksums': { + 'sha1': hash_to_hex(b'content-sha1'), + }, 'data': b'some raw data' } # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file', with_data=True) # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': expected_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) mock_storage.content_get.assert_called_once_with([b'content-sha1']) @patch('swh.web.common.service.storage') @istest def lookup_revision(self, mock_storage): # given mock_storage.revision_get = MagicMock( return_value=[self.SAMPLE_REVISION_RAW]) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, self.SAMPLE_REVISION) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) @patch('swh.web.common.service.storage') @istest def lookup_revision_invalid_msg(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['message'] = b'elegant fix for bug \xff' expected_revision = self.SAMPLE_REVISION expected_revision['message'] = None expected_revision['message_decoding_failed'] = True mock_storage.revision_get = MagicMock(return_value=[stub_rev]) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, expected_revision) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_ok(self, mock_storage): # given mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW] # when rv = service.lookup_revision_message( self.SHA1_SAMPLE) # then self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN}) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_absent(self, mock_storage): # given stub_revision = self.SAMPLE_REVISION_RAW del stub_revision['message'] mock_storage.revision_get.return_value = stub_revision # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_storage.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'No message for revision ' 'with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5.') @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_norev(self, mock_storage): # given mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_storage.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'Revision with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5 ' 'not found.') @patch('swh.web.common.service.storage') @istest def lookup_revision_multiple(self, mock_storage): # given sha1 = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' stub_revisions = [ self.SAMPLE_REVISION_RAW, { 'id': hash_to_bytes(sha1_other), 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'date_offset': 0, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] mock_storage.revision_get.return_value = stub_revisions # when actual_revisions = service.lookup_revision_multiple( [sha1, sha1_other]) # then self.assertEqual(list(actual_revisions), [ self.SAMPLE_REVISION, { 'id': sha1_other, 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': 'name', 'email': 'name@surname.org', }, 'committer': { 'name': 'name', 'email': 'name@surname.org', }, 'message': 'ugly fix for bug 42', 'date': '2000-01-12T05:23:54+00:00', 'date_offset': 0, 'committer_date': '2000-01-12T05:23:54+00:00', 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': {}, 'merge': False } ]) self.assertEqual( list(mock_storage.revision_get.call_args[0][0]), [hash_to_bytes(sha1), hash_to_bytes(sha1_other)]) @patch('swh.web.common.service.storage') @istest def lookup_revision_multiple_none_found(self, mock_storage): # given sha1_bin = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' mock_storage.revision_get.return_value = [] # then actual_revisions = service.lookup_revision_multiple( [sha1_bin, sha1_other]) self.assertEqual(list(actual_revisions), []) self.assertEqual( list(mock_storage.revision_get.call_args[0][0]), [hash_to_bytes(self.SHA1_SAMPLE), hash_to_bytes(sha1_other)]) @patch('swh.web.common.service.storage') @istest def lookup_revision_log(self, mock_storage): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_storage.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( 'abcdbe353ed3480476f032475e7c233eff7371d5', limit=25) # then self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION]) mock_storage.revision_log.assert_called_with( [hash_to_bytes('abcdbe353ed3480476f032475e7c233eff7371d5')], 25) @patch('swh.web.common.service.storage') @istest def lookup_revision_log_by(self, mock_storage): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_storage.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEqual(list(actual_log), [self.SAMPLE_REVISION]) mock_storage.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, limit=100) @patch('swh.web.common.service.storage') @istest def lookup_revision_log_by_nolog(self, mock_storage): # given mock_storage.revision_log_by = MagicMock(return_value=None) # when res = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEquals(res, None) mock_storage.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, limit=100) @patch('swh.web.common.service.storage') @istest def lookup_content_raw_not_found(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( - 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') + 'sha1:' + self.SHA1_SAMPLE) # then self.assertIsNone(actual_content) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) @patch('swh.web.common.service.storage') @istest def lookup_content_raw(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value={ 'sha1': self.SHA1_SAMPLE, }) mock_storage.content_get = MagicMock(return_value=[{ 'data': b'binary data'}]) # when actual_content = service.lookup_content_raw( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_storage.content_find.assert_called_once_with( {'sha256': self.SHA256_SAMPLE_BIN}) mock_storage.content_get.assert_called_once_with( [self.SHA1_SAMPLE]) @patch('swh.web.common.service.storage') @istest def lookup_content_not_found(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertIsNone(actual_content) mock_storage.content_find.assert_called_with( {'sha1': self.SHA1_SAMPLE_BIN}) @patch('swh.web.common.service.storage') @istest def lookup_content_with_sha1(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value=self.SAMPLE_CONTENT_RAW) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertEqual(actual_content, self.SAMPLE_CONTENT) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) @patch('swh.web.common.service.storage') @istest def lookup_content_with_sha256(self, mock_storage): # given stub_content = self.SAMPLE_CONTENT_RAW stub_content['status'] = 'visible' expected_content = self.SAMPLE_CONTENT expected_content['status'] = 'visible' mock_storage.content_find = MagicMock( return_value=stub_content) # when actual_content = service.lookup_content( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEqual(actual_content, expected_content) mock_storage.content_find.assert_called_with( {'sha256': self.SHA256_SAMPLE_BIN}) @patch('swh.web.common.service.storage') @istest def lookup_person(self, mock_storage): # given mock_storage.person_get = MagicMock(return_value=[{ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }]) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_storage.person_get.assert_called_with(['person_id']) @patch('swh.web.common.service.storage') @istest def lookup_directory_bad_checksum(self, mock_storage): # given mock_storage.directory_ls = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_storage.directory_ls.called = False @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory_not_found(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-id-bin') mock_storage.directory_get.return_value = None # when actual_dir = service.lookup_directory('directory_id') # then self.assertIsNone(actual_dir) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory_id', ['sha1'], 'Only sha1_git is supported.') mock_storage.directory_get.assert_called_with(['directory-id-bin']) mock_storage.directory_ls.called = False @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_directory(self, mock_query, mock_storage): mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-sha1-bin') # something that exists is all that matters here mock_storage.directory_get.return_value = {'id': b'directory-sha1-bin'} # given stub_dir_entries = [{ 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, + 'blake2s256': self.BLAKE2S256_SAMPLE_BIN, 'target': hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'dir_id': self.DIRECTORY_ID_BIN, 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ - 'sha1': self.SHA1_SAMPLE, - 'sha256': self.SHA256_SAMPLE, - 'sha1_git': self.SHA1GIT_SAMPLE, + 'checksums': { + 'sha1': self.SHA1_SAMPLE, + 'sha256': self.SHA256_SAMPLE, + 'sha1_git': self.SHA1GIT_SAMPLE, + 'blake2s256': self.BLAKE2S256_SAMPLE + }, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': self.DIRECTORY_ID, 'name': 'bob', 'type': 10, }] mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_ls = list(service.lookup_directory( 'directory-sha1')) # then self.assertEqual(actual_directory_ls, expected_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory-sha1', ['sha1'], 'Only sha1_git is supported.') mock_storage.directory_ls.assert_called_with( 'directory-sha1-bin') @patch('swh.web.common.service.storage') @istest def lookup_revision_by_nothing_found(self, mock_storage): # given mock_storage.revision_get_by.return_value = None # when actual_revisions = service.lookup_revision_by(1) # then self.assertIsNone(actual_revisions) mock_storage.revision_get_by.assert_called_with(1, 'refs/heads/master', limit=1, timestamp=None) @patch('swh.web.common.service.storage') @istest def lookup_revision_by(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW expected_rev = self.SAMPLE_REVISION mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') @patch('swh.web.common.service.storage') @istest def lookup_revision_by_nomerge(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hash_to_bytes('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'] mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') @patch('swh.web.common.service.storage') @istest def lookup_revision_by_merge(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hash_to_bytes('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'), hash_to_bytes('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc') ] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = [ 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', 'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc' ] expected_rev['merge'] = True mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_by_ko(self, mock_storage): # given mock_storage.revision_get_by.return_value = None # when with self.assertRaises(NotFoundExc) as cm: origin_id = 1 branch_name = 'master3' ts = None service.lookup_revision_with_context_by(origin_id, branch_name, ts, 'sha1') # then self.assertIn( 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), cm.exception.args[0]) mock_storage.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) @patch('swh.web.common.service.lookup_revision_with_context') @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_by(self, mock_storage, mock_lookup_revision_with_context): # given stub_root_rev = {'id': 'root-rev-id'} mock_storage.revision_get_by.return_value = [{'id': 'root-rev-id'}] stub_rev = {'id': 'rev-found'} mock_lookup_revision_with_context.return_value = stub_rev # when origin_id = 1 branch_name = 'master3' ts = None sha1_git = 'sha1' actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git) # then self.assertEquals(actual_root_rev, stub_root_rev) self.assertEquals(actual_rev, stub_rev) mock_storage.revision_get_by.assert_called_once_with( origin_id, branch_name, limit=1, timestamp=ts) mock_lookup_revision_with_context.assert_called_once_with( stub_root_rev, sha1_git, 100) @patch('swh.web.common.service.storage') @patch('swh.web.common.service.query') @istest def lookup_entity_by_uuid(self, mock_query, mock_storage): # given uuid_test = 'correct-uuid' mock_query.parse_uuid4.return_value = uuid_test stub_entities = [{'uuid': uuid_test}] mock_storage.entity_get.return_value = stub_entities # when actual_entities = list(service.lookup_entity_by_uuid(uuid_test)) # then self.assertEquals(actual_entities, stub_entities) mock_query.parse_uuid4.assert_called_once_with(uuid_test) mock_storage.entity_get.assert_called_once_with(uuid_test) @istest def lookup_revision_through_ko_not_implemented(self): # then with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) @patch('swh.web.common.service.lookup_revision_with_context_by') @istest def lookup_revision_through_with_context_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 1, 'branch_name': 'master', 'ts': None, 'sha1_git': 'sha1-git' }, limit=1000) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 1, 'master', None, 'sha1-git', 1000) @patch('swh.web.common.service.lookup_revision_by') @istest def lookup_revision_through_with_revision_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 2, 'branch_name': 'master2', 'ts': 'some-ts', }, limit=10) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 2, 'master2', 'some-ts') @patch('swh.web.common.service.lookup_revision_with_context') @istest def lookup_revision_through_with_context(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git_root': 'some-sha1-root', 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1-root', 'some-sha1', 100) @patch('swh.web.common.service.lookup_revision') @istest def lookup_revision_through_with_revision(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1') @patch('swh.web.common.service.lookup_revision_through') @istest def lookup_directory_through_revision_ko_not_found( self, mock_lookup_rev): # given mock_lookup_rev.return_value = None # when with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) @patch('swh.web.common.service.lookup_revision_through') @patch('swh.web.common.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_data( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} mock_lookup_dir.return_value = {'type': 'dir', 'content': []} # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, {'type': 'dir', 'content': []}) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False) @patch('swh.web.common.service.lookup_revision_through') @patch('swh.web.common.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_content( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} stub_result = {'type': 'file', 'revision': 'rev-id', 'content': {'data': b'blah', 'sha1': 'sha1'}} mock_lookup_dir.return_value = stub_result # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 10, with_data=True) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, stub_result) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)