diff --git a/swh/web/api/apidoc.py b/swh/web/api/apidoc.py index 8f51818e..c08ed00f 100644 --- a/swh/web/api/apidoc.py +++ b/swh/web/api/apidoc.py @@ -1,305 +1,305 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from collections import defaultdict from functools import wraps from enum import Enum from rest_framework.decorators import api_view -from swh.web.api.utils import reverse +from swh.web.common.utils import reverse from swh.web.api.apiurls import APIUrls from swh.web.api.apiresponse import make_api_response, error_response class argtypes(Enum): # noqa: N801 """Class for centralizing argument type descriptions """ ts = 'timestamp' int = 'integer' str = 'string' path = 'path' sha1 = 'sha1' uuid = 'uuid' sha1_git = 'sha1_git' algo_and_hash = 'hash_type:hash' class rettypes(Enum): # noqa: N801 """Class for centralizing return type descriptions """ octet_stream = 'octet stream' list = 'list' dict = 'dict' class excs(Enum): # noqa: N801 """Class for centralizing exception type descriptions """ badinput = 'BadInputExc' notfound = 'NotFoundExc' class APIDocException(Exception): """ Custom exception to signal errors in the use of the APIDoc decorators """ class route(object): # noqa: N801 """Decorate an API method to register it in the API doc route index and create the corresponding Flask route. This decorator is responsible for bootstrapping the linking of subsequent decorators, as well as traversing the decorator stack to obtain the documentation data from it. Args: route: documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested. Default to False hidden: set to True to remove the endpoint from being listed in the /api endpoints. Default to False. tags: Further information on api endpoints. Two values are possibly expected: - hidden: remove the entry points from the listing - upcoming: display the entry point but it is not followable """ def __init__(self, route, noargs=False, tags=[], handle_response=False, api_version='1'): super().__init__() self.route = route self.urlpattern = '^' + api_version + route + '$' self.noargs = noargs self.tags = set(tags) self.handle_response = handle_response # @apidoc.route() Decorator call def __call__(self, f): # If the route is not hidden, add it to the index if 'hidden' not in self.tags: APIUrls.index_add_route(self.route, f.__doc__, tags=self.tags) # If the decorated route has arguments, we create a specific # documentation view if not self.noargs: @api_view(['GET', 'HEAD']) def doc_view(request): doc_data = self.get_doc_data(f) return make_api_response(request, None, doc_data) view_name = self.route[1:-1].replace('/', '-') APIUrls.index_add_url_pattern(self.urlpattern, doc_view, view_name) @wraps(f) def documented_view(request, **kwargs): doc_data = self.get_doc_data(f) try: rv = f(request, **kwargs) except Exception as exc: return error_response(request, exc, doc_data) if self.handle_response: return rv else: return make_api_response(request, rv, doc_data) return documented_view def filter_api_url(self, endpoint, route_re, noargs): doc_methods = {'GET', 'HEAD', 'OPTIONS'} if re.match(route_re, endpoint['rule']): if endpoint['methods'] == doc_methods and not noargs: return False return True def build_examples(self, urls, args): """Build example documentation. Args: f: function urls: information relative to url for that function args: information relative to arguments for that function Yields: example based on default parameter value if any """ s = set() r = [] for data_url in urls: url = data_url['rule'] defaults = {arg['name']: arg['default'] for arg in args if arg['name'] in url} if defaults and None not in defaults.values(): url = reverse(data_url['name'], kwargs=defaults) if url in s: continue s.add(url) r.append(url) return r def get_doc_data(self, f): """Build documentation data for the decorated function""" data = { 'route': self.route, 'noargs': self.noargs, } data.update(getattr(f, 'doc_data', {})) if not f.__doc__: raise APIDocException('Apidoc %s: expected a docstring' ' for function %s' % (self.__class__.__name__, f.__name__)) data['docstring'] = f.__doc__ route_re = re.compile('.*%s$' % data['route']) endpoint_list = APIUrls.get_method_endpoints(f) data['urls'] = [url for url in endpoint_list if self.filter_api_url(url, route_re, data['noargs'])] if 'args' in data: data['examples'] = self.build_examples(data['urls'], data['args']) data['heading'] = '%s Documentation' % data['route'] return data class DocData(object): """Base description of optional input/output setup for a route. """ destination = None def __init__(self): self.doc_data = {} def __call__(self, f): if not hasattr(f, 'doc_data'): f.doc_data = defaultdict(list) f.doc_data[self.destination].append(self.doc_data) return f class arg(DocData): # noqa: N801 """ Decorate an API method to display an argument's information on the doc page specified by @route above. Args: name: the argument's name. MUST match the method argument's name to create the example request URL. default: the argument's default value argtype: the argument's type as an Enum value from apidoc.argtypes argdoc: the argument's documentation string """ destination = 'args' def __init__(self, name, default, argtype, argdoc): super().__init__() self.doc_data = { 'name': name, 'type': argtype.value, 'doc': argdoc, 'default': default } class header(DocData): # noqa: N801 """ Decorate an API method to display header information the api can potentially return in the response. Args: name: the header name doc: the information about that header """ destination = 'headers' def __init__(self, name, doc): super().__init__() self.doc_data = { 'name': name, 'doc': doc, } class param(DocData): # noqa: N801 """Decorate an API method to display query parameter information the api can potentially accept. Args: name: parameter's name default: parameter's default value argtype: parameter's type as an Enum value from apidoc.argtypes doc: the information about that header """ destination = 'params' def __init__(self, name, default, argtype, doc): super().__init__() self.doc_data = { 'name': name, 'type': argtype.value, 'default': default, 'doc': doc, } class raises(DocData): # noqa: N801 """Decorate an API method to display information pertaining to an exception that can be raised by this method. Args: exc: the exception name as an Enum value from apidoc.excs doc: the exception's documentation string """ destination = 'excs' def __init__(self, exc, doc): super().__init__() self.doc_data = { 'exc': exc.value, 'doc': doc } class returns(DocData): # noqa: N801 """Decorate an API method to display information about its return value. Args: rettype: the return value's type as an Enum value from apidoc.rettypes retdoc: the return value's documentation string """ destination = 'returns' def __init__(self, rettype=None, retdoc=None): super().__init__() self.doc_data = { 'type': rettype.value, 'doc': retdoc } diff --git a/swh/web/api/apiresponse.py b/swh/web/api/apiresponse.py index b3ed9e28..03e97611 100644 --- a/swh/web/api/apiresponse.py +++ b/swh/web/api/apiresponse.py @@ -1,172 +1,173 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import json from rest_framework.response import Response from swh.storage.exc import StorageDBError, StorageAPIError +from swh.web.common.exc import NotFoundExc, ForbiddenExc +from swh.web.common.utils import shorten_path from swh.web.api import utils -from swh.web.api.exc import NotFoundExc, ForbiddenExc def compute_link_header(rv, options): """Add Link header in returned value results. Args: rv (dict): dictionary with keys: - headers: potential headers with 'link-next' and 'link-prev' keys - results: containing the result to return options (dict): the initial dict to update with result if any Returns: dict: dictionary with optional keys 'link-next' and 'link-prev' """ link_headers = [] if 'headers' not in rv: return {} rv_headers = rv['headers'] if 'link-next' in rv_headers: link_headers.append('<%s>; rel="next"' % ( rv_headers['link-next'])) if 'link-prev' in rv_headers: link_headers.append('<%s>; rel="previous"' % ( rv_headers['link-prev'])) if link_headers: link_header_str = ','.join(link_headers) headers = options.get('headers', {}) headers.update({ 'Link': link_header_str }) return headers return {} def filter_by_fields(request, data): """Extract a request parameter 'fields' if it exists to permit the filtering on he data dict's keys. If such field is not provided, returns the data as is. """ fields = utils.get_query_params(request).get('fields') if fields: fields = set(fields.split(',')) data = utils.filter_field_keys(data, fields) return data def transform(rv): """Transform an eventual returned value with multiple layer of information with only what's necessary. If the returned value rv contains the 'results' key, this is the associated value which is returned. Otherwise, return the initial dict without the potential 'headers' key. """ if 'results' in rv: return rv['results'] if 'headers' in rv: rv.pop('headers') return rv def make_api_response(request, data, doc_data={}, options={}): """Generates an API response based on the requested mimetype. Args: request: a DRF Request object data: raw data to return in the API response doc_data: documentation data for HTML response options: optionnal data that can be used to generate the response Returns: a DRF Response a object """ if data: options['headers'] = compute_link_header(data, options) data = transform(data) data = filter_by_fields(request, data) doc_env = doc_data headers = {} if 'headers' in options: doc_env['headers_data'] = options['headers'] headers = options['headers'] # get request status code doc_env['status_code'] = options.get('status', 200) response_args = {'status': doc_env['status_code'], 'headers': headers, 'content_type': request.accepted_media_type} # when requesting HTML, typically when browsing the API through its # documented views, we need to enrich the input data with documentation # related ones and inform DRF that we request HTML template rendering if request.accepted_media_type == 'text/html': if data: data = json.dumps(data, sort_keys=True, indent=4, separators=(',', ': ')) doc_env['response_data'] = data doc_env['request'] = request - doc_env['heading'] = utils.shorten_path(str(request.path)) + doc_env['heading'] = shorten_path(str(request.path)) response_args['data'] = doc_env response_args['template_name'] = 'apidoc.html' # otherwise simply return the raw data and let DRF picks # the correct renderer (JSON or YAML) else: response_args['data'] = data return Response(**response_args) def error_response(request, error, doc_data): """Private function to create a custom error response. Args: request: a DRF Request object error: the exception that caused the error doc_data: documentation data for HTML response """ error_code = 400 if isinstance(error, NotFoundExc): error_code = 404 elif isinstance(error, ForbiddenExc): error_code = 403 elif isinstance(error, StorageDBError): error_code = 503 elif isinstance(error, StorageAPIError): error_code = 503 error_opts = {'status': error_code} error_data = { 'exception': error.__class__.__name__, 'reason': str(error), } return make_api_response(request, error_data, doc_data, options=error_opts) diff --git a/swh/web/api/utils.py b/swh/web/api/utils.py index 7cded6f2..a3e1845c 100644 --- a/swh/web/api/utils.py +++ b/swh/web/api/utils.py @@ -1,450 +1,340 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re -import urllib -from django.core import urlresolvers -from django.http import QueryDict -from datetime import datetime, timezone -from dateutil import parser - -from .exc import BadInputExc - - -# override django reverse function in order to get -# the same result on debian jessie and stretch -# (see https://code.djangoproject.com/ticket/22223) -def reverse(viewname, args=None, kwargs=None, query_params=None, - current_app=None, urlconf=None): - """An override of django reverse function supporting multiple - django versions (from 1.7 to current) and query parameters. - - Args: - viewname: the name of the django view from which to compute - a url - args: list of url arguments ordered according to their position it - kwargs: dictionnary of url arguments indexed by their names - query_params: dictionnary of query parameters to append to the - reversed url - current_app: the name of the django app tighted to the view - urlconf: url configuration module - - Returns: - The url of the requested view with processed arguments and - query parameters - """ - - url = urllib.parse.unquote( - urlresolvers.reverse( - viewname, urlconf=urlconf, args=args, - kwargs=kwargs, current_app=current_app - ) - ) - if query_params and len(query_params) > 0: - query_dict = QueryDict('', mutable=True) - for k, v in query_params.items(): - query_dict[k] = v - url += ('?' + query_dict.urlencode()) - return url +from swh.web.common.utils import reverse, fmap def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out -def fmap(f, data): - """Map f to data at each level. - - This must keep the origin data structure type: - - map -> map - - dict -> dict - - list -> list - - None -> None - - Args: - f: function that expects one argument. - data: data to traverse to apply the f function. - list, map, dict or bare value. - - Returns: - The same data-structure with modified values by the f function. - - """ - if data is None: - return data - if isinstance(data, map): - return map(lambda y: fmap(f, y), (x for x in data)) - if isinstance(data, list): - return [fmap(f, x) for x in data] - if isinstance(data, dict): - return {k: fmap(f, v) for (k, v) in data.items()} - return f(data) - - def prepare_data_for_view(data, encoding='utf-8'): def prepare_data(s): # Note: can only be 'data' key with bytes of raw content if isinstance(s, bytes): try: return s.decode(encoding) except: return "Cannot decode the data bytes, try and set another " \ "encoding in the url (e.g. ?encoding=utf8) or " \ "download directly the " \ "content's raw data." if isinstance(s, str): return re.sub(r'/api/1/', r'/browse/', s) return s return fmap(prepare_data, data) def filter_field_keys(data, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - data: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(data, map): return map(lambda x: filter_field_keys(x, field_keys), data) if isinstance(data, list): return [filter_field_keys(x, field_keys) for x in data] if isinstance(data, dict): return {k: v for (k, v) in data.items() if k in field_keys} return data def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) -def parse_timestamp(timestamp): - """Given a time or timestamp (as string), parse the result as datetime. - - Returns: - a timezone-aware datetime representing the parsed value. - None if the parsing fails. - - Samples: - - 2016-01-12 - - 2016-01-12T09:19:12+0100 - - Today is January 1, 2047 at 8:21:00AM - - 1452591542 - - """ - if not timestamp: - return None - - try: - return parser.parse(timestamp, ignoretz=False, fuzzy=True) - except: - try: - return datetime.utcfromtimestamp(float(timestamp)).replace( - tzinfo=timezone.utc) - except (ValueError, OverflowError) as e: - raise BadInputExc(e) - - def enrich_object(object): """Enrich an object (revision, release) with link to the 'target' of type 'target_type'. Args: object: An object with target and target_type keys (e.g. release, revision) Returns: Object enriched with target_url pointing to the right swh.web.ui.api urls for the pointing object (revision, release, content, directory) """ obj = object.copy() if 'target' in obj and 'target_type' in obj: if obj['target_type'] == 'revision': obj['target_url'] = reverse('revision', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'release': obj['target_url'] = reverse('release', kwargs={'sha1_git': obj['target']}) elif obj['target_type'] == 'content': obj['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:' + obj['target']}) elif obj['target_type'] == 'directory': obj['target_url'] = reverse('directory', kwargs={'sha1_git': obj['target']}) if 'author' in obj: author = obj['author'] obj['author_url'] = reverse('person', kwargs={'person_id': author['id']}) return obj enrich_release = enrich_object def enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = \ reverse('content', kwargs={'q': 'sha1_git:%s' % target}) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = reverse('directory', kwargs={'sha1_git': target}) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def enrich_metadata_endpoint(content): """Enrich metadata endpoint with link to the upper metadata endpoint. """ c = content.copy() c['content_url'] = reverse('content', args=['sha1:%s' % c['id']]) return c def enrich_content(content, top_url=False): """Enrich content with links to: - data_url: its raw data - filetype_url: its filetype information """ for h in ['sha1', 'sha1_git', 'sha256']: if h in content: q = '%s:%s' % (h, content[h]) if top_url: content['content_url'] = reverse('content', kwargs={'q': q}) content['data_url'] = reverse('content-raw', kwargs={'q': q}) content['filetype_url'] = reverse('content-filetype', kwargs={'q': q}) content['language_url'] = reverse('content-language', kwargs={'q': q}) content['license_url'] = reverse('content-license', kwargs={'q': q}) break return content def enrich_entity(entity): """Enrich entity with """ if 'uuid' in entity: entity['uuid_url'] = reverse('entity', kwargs={'uuid': entity['uuid']}) if 'parent' in entity and entity['parent']: entity['parent_url'] = reverse('entity', kwargs={'uuid': entity['parent']}) return entity def _get_path_list(path_string): """Helper for enrich_revision: get a list of the sha1 id of the navigation breadcrumbs, ordered from the oldest to the most recent. Args: path_string: the path as a '/'-separated string Returns: The navigation context as a list of sha1 revision ids """ return path_string.split('/') def _get_revision_contexts(rev_id, context): """Helper for enrich_revision: retrieve for the revision id and potentially the navigation breadcrumbs the context to pass to parents and children of of the revision. Args: rev_id: the revision's sha1 id context: the current navigation context Returns: The context for parents, children and the url of the direct child as a tuple in that order. """ context_for_parents = None context_for_children = None url_direct_child = None if not context: return (rev_id, None, None) path_list = _get_path_list(context) context_for_parents = '%s/%s' % (context, rev_id) prev_for_children = path_list[:-1] if len(prev_for_children) > 0: context_for_children = '/'.join(prev_for_children) child_id = path_list[-1] # This commit is not the first commit in the path if context_for_children: url_direct_child = reverse('revision-context', kwargs={'sha1_git': child_id, 'context': context_for_children}) # This commit is the first commit in the path else: url_direct_child = reverse('revision', kwargs={'sha1_git': child_id}) return (context_for_parents, context_for_children, url_direct_child) def _make_child_url(rev_children, context): """Helper for enrich_revision: retrieve the list of urls corresponding to the children of the current revision according to the navigation breadcrumbs. Args: rev_children: a list of revision id context: the '/'-separated navigation breadcrumbs Returns: the list of the children urls according to the context """ children = [] for child in rev_children: if context and child != _get_path_list(context)[-1]: children.append(reverse('revision', kwargs={'sha1_git': child})) elif not context: children.append(reverse('revision', kwargs={'sha1_git': child})) return children def enrich_revision(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). Keep track of the navigation breadcrumbs if they are specified. Args: revision: the revision as a dict context: the navigation breadcrumbs as a /-separated string of revision sha1_git """ ctx_parents, ctx_children, url_direct_child = _get_revision_contexts( revision['id'], context) revision['url'] = reverse('revision', kwargs={'sha1_git': revision['id']}) revision['history_url'] = reverse('revision-log', kwargs={'sha1_git': revision['id']}) if context: revision['history_context_url'] = reverse( 'revision-log', kwargs={'sha1_git': revision['id'], 'prev_sha1s': context}) if 'author' in revision: author = revision['author'] revision['author_url'] = reverse('person', kwargs={'person_id': author['id']}) if 'committer' in revision: committer = revision['committer'] revision['committer_url'] = \ reverse('person', kwargs={'person_id': committer['id']}) if 'directory' in revision: revision['directory_url'] = \ reverse('directory', kwargs={'sha1_git': revision['directory']}) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append({ 'id': parent, 'url': reverse('revision', kwargs={'sha1_git': parent}) }) revision['parents'] = parents if 'children' in revision: children = _make_child_url(revision['children'], context) if url_direct_child: children.append(url_direct_child) revision['children_urls'] = children else: if url_direct_child: revision['children_urls'] = [url_direct_child] if 'message_decoding_failed' in revision: revision['message_url'] = reverse('revision-raw-message', kwargs={'sha1_git': revision['id']}) return revision -def shorten_path(path): - """Shorten the given path: for each hash present, only return the first - 8 characters followed by an ellipsis""" - - sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' - sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' - - ret = re.sub(sha256_re, r'\1...', path) - return re.sub(sha1_re, r'\1...', ret) - - def get_query_params(request): """Utility functions for retrieving query parameters from a DRF request object. Its purpose is to handle multiple versions of DRF.""" if hasattr(request, 'query_params'): # DRF >= 3.0 uses query_params attribute return request.query_params else: # while DRF < 3.0 uses QUERY_PARAMS attribute return request.QUERY_PARAMS diff --git a/swh/web/api/views/__init__.py b/swh/web/api/views/__init__.py index 65b67816..e48e38da 100644 --- a/swh/web/api/views/__init__.py +++ b/swh/web/api/views/__init__.py @@ -1,92 +1,92 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.conf.urls import url from rest_framework.response import Response from rest_framework.decorators import api_view from types import GeneratorType -from swh.web.api.exc import NotFoundExc +from swh.web.common.exc import NotFoundExc from swh.web.api.apiurls import APIUrls, api_route # canned doc string snippets that are used in several doc strings _doc_arg_content_id = """A "[hash_type:]hash" content identifier, where hash_type is one of "sha1" (the default), "sha1_git", "sha256", and hash is a checksum obtained with the hash_type hashing algorithm.""" _doc_arg_last_elt = 'element to start listing from, for pagination purposes' _doc_arg_per_page = 'number of elements to list, for pagination purposes' _doc_exc_bad_id = 'syntax error in the given identifier(s)' _doc_exc_id_not_found = 'no object matching the given criteria could be found' _doc_ret_revision_meta = 'metadata of the revision identified by sha1_git' _doc_ret_revision_log = """list of dictionaries representing the metadata of each revision found in the commit log heading to revision sha1_git. For each commit at least the following information are returned: author/committer, authoring/commit timestamps, revision id, commit message, parent (i.e., immediately preceding) commits, "root" directory id.""" _doc_header_link = """indicates that a subsequent result page is available, pointing to it""" def _api_lookup(lookup_fn, *args, notfound_msg='Object not found', enrich_fn=lambda x: x): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message notfound_msg. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - notfound_msg: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(*args) if not res: raise NotFoundExc(notfound_msg) if isinstance(res, (map, list, GeneratorType)): return [enrich_fn(x) for x in res] return enrich_fn(res) @api_view(['GET', 'HEAD']) def api_home(request): return Response({}, template_name='api.html') APIUrls.urlpatterns.append(url(r'^$', api_home, name='api_homepage')) @api_route(r'/', 'endpoints') def api_endpoints(request): """Display the list of opened api endpoints. """ routes = APIUrls.get_app_endpoints().copy() for route, doc in routes.items(): doc['doc_intro'] = doc['docstring'].split('\n\n')[0] # Return a list of routes with consistent ordering env = { 'doc_routes': sorted(routes.items()) } return Response(env, template_name="api-endpoints.html") diff --git a/swh/web/api/views/content.py b/swh/web/api/views/content.py index dcaecab7..0c20534b 100644 --- a/swh/web/api/views/content.py +++ b/swh/web/api/views/content.py @@ -1,339 +1,340 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import functools from django.http import HttpResponse -from swh.web.api.utils import reverse -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.common.utils import reverse +from swh.web.common.exc import NotFoundExc, ForbiddenExc from swh.web.api import apidoc as api_doc -from swh.web.api.exc import NotFoundExc, ForbiddenExc +from swh.web.api import utils from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_header_link, _doc_arg_last_elt, _doc_arg_per_page, _doc_exc_bad_id, _doc_arg_content_id ) @api_route(r'/content/(?P.+)/provenance/', 'content-provenance') @api_doc.route('/content/provenance/', tags=['hidden']) @api_doc.arg('q', default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""List of provenance information (dict) for the matched content.""") def api_content_provenance(request, q): """Return content's provenance information if any. """ def _enrich_revision(provenance): p = provenance.copy() p['revision_url'] = \ reverse('revision', kwargs={'sha1_git': provenance['revision']}) p['content_url'] = \ reverse('content', kwargs={'q': 'sha1_git:%s' % provenance['content']}) p['origin_url'] = \ reverse('origin', kwargs={'origin_id': provenance['origin']}) p['origin_visits_url'] = \ reverse('origin-visits', kwargs={'origin_id': provenance['origin']}) p['origin_visit_url'] = \ reverse('origin-visit', kwargs={'origin_id': provenance['origin'], 'visit_id': provenance['visit']}) return p return _api_lookup( service.lookup_content_provenance, q, notfound_msg='Content with {} not found.'.format(q), enrich_fn=_enrich_revision) @api_route(r'/content/(?P.+)/filetype/', 'content-filetype') @api_doc.route('/content/filetype/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Filetype information (dict) for the matched content.""") def api_content_filetype(request, q): """Get information about the detected MIME type of a content object. """ return _api_lookup( service.lookup_content_filetype, q, notfound_msg='No filetype information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/language/', 'content-language') @api_doc.route('/content/language/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Language information (dict) for the matched content.""") def api_content_language(request, q): """Get information about the detected (programming) language of a content object. """ return _api_lookup( service.lookup_content_language, q, notfound_msg='No language information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/license/', 'content-license') @api_doc.route('/content/license/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""License information (dict) for the matched content.""") def api_content_license(request, q): """Get information about the detected license of a content object. """ return _api_lookup( service.lookup_content_license, q, notfound_msg='No license information found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/ctags/', 'content-ctags') @api_doc.route('/content/ctags/', tags=['upcoming']) @api_doc.arg('q', default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""Ctags symbol (dict) for the matched content.""") def api_content_ctags(request, q): """Get information about all `Ctags `_-style symbols defined in a content object. """ return _api_lookup( service.lookup_content_ctags, q, notfound_msg='No ctags symbol found for content {}.'.format(q), enrich_fn=utils.enrich_metadata_endpoint) @api_route(r'/content/(?P.+)/raw/', 'content-raw') @api_doc.route('/content/raw/', handle_response=True) @api_doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.param('filename', default=None, argtype=api_doc.argtypes.str, doc='User\'s desired filename. If provided, the downloaded' ' content will get that filename.') @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(request, q): """Get the raw content of a content object (AKA "blob"), as a byte sequence. """ def generate(content): yield content['data'] content_raw = service.lookup_content_raw(q) if not content_raw: raise NotFoundExc('Content %s is not found.' % q) content_filetype = service.lookup_content_filetype(q) if not content_filetype: raise NotFoundExc('Content %s is not available for download.' % q) mimetype = content_filetype['mimetype'] if 'text/' not in mimetype: raise ForbiddenExc('Only textual content is available for download. ' 'Actual content mimetype is %s.' % mimetype) filename = utils.get_query_params(request).get('filename') if not filename: filename = 'content_%s_raw' % q.replace(':', '_') response = HttpResponse(generate(content_raw), content_type='application/octet-stream') response['Content-disposition'] = 'attachment; filename=%s' % filename return response @api_route(r'/content/symbol/(?P.+)/', 'content-symbol') @api_doc.route('/content/symbol/', tags=['upcoming']) @api_doc.arg('q', default='hello', argtype=api_doc.argtypes.str, argdoc="""An expression string to lookup in swh's raw content""") @api_doc.header('Link', doc=_doc_header_link) @api_doc.param('last_sha1', default=None, argtype=api_doc.argtypes.str, doc=_doc_arg_last_elt) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=_doc_arg_per_page) @api_doc.returns(rettype=api_doc.rettypes.list, retdoc="""A list of dict whose content matches the expression. Each dict has the following keys: - id (bytes): identifier of the content - name (text): symbol whose content match the expression - kind (text): kind of the symbol that matched - lang (text): Language for that entry - line (int): Number line for the symbol """) def api_content_symbol(request, q=None): """Search content objects by `Ctags `_-style symbol (e.g., function name, data type, method, ...). """ result = {} last_sha1 = utils.get_query_params(request).get('last_sha1', None) per_page = int(utils.get_query_params(request).get('per_page', '10')) def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page): return service.lookup_expression(exp, last_sha1, per_page) symbols = _api_lookup( lookup_exp, q, notfound_msg="No indexed raw content match expression '{}'.".format(q), enrich_fn=functools.partial(utils.enrich_content, top_url=True)) if symbols: l = len(symbols) if l == per_page: query_params = {} new_last_sha1 = symbols[-1]['sha1'] query_params['last_sha1'] = new_last_sha1 if utils.get_query_params(request).get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('content-symbol', kwargs={'q': q}, query_params=query_params) } result.update({ 'results': symbols }) return result @api_route(r'/content/known/search/', 'content-known', methods=['POST']) @api_route(r'/content/known/(?P(?!search).*)/', 'content-known') @api_doc.route('/content/known/', tags=['hidden']) @api_doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=api_doc.argtypes.sha1, argdoc='content identifier as a sha1 checksum') @api_doc.param('q', default=None, argtype=api_doc.argtypes.str, doc="""(POST request) An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""a dictionary with results (found/not found for each given identifier) and statistics about how many identifiers were found""") def api_check_content_known(request, q=None): """Check whether some content (AKA "blob") is present in the archive. Lookup can be performed by various means: - a GET request with one or several hashes, separated by ',' - a POST request with one or several hashes, passed as (multiple) values for parameter 'q' """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None queries = [] # GET: Many hash separated values request if q: hashes = q.split(',') for v in hashes: queries.append({'filename': None, 'sha1': v}) # POST: Many hash requests in post form submission elif request.method == 'POST': data = request.data if hasattr(request, 'data') else request.DATA # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if queries: lookup = service.lookup_multiple_hashes(queries) result = [] l = len(queries) for el in lookup: res_d = {'sha1': el['sha1'], 'found': el['found']} if 'filename' in el and el['filename']: res_d['filename'] = el['filename'] result.append(res_d) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = l search_stats['pct'] = (nbfound / l) * 100 response['search_res'] = search_res response['search_stats'] = search_stats return response @api_route(r'/content/(?P.+)/', 'content') @api_doc.route('/content/') @api_doc.arg('q', default='dc2830a9e72f23c1dfebef4413003221baa5fb62', argtype=api_doc.argtypes.algo_and_hash, argdoc=_doc_arg_content_id) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""known metadata for content identified by q""") def api_content_metadata(request, q): """Get information about a content (AKA "blob") object. """ return _api_lookup( service.lookup_content, q, notfound_msg='Content with {} not found.'.format(q), enrich_fn=utils.enrich_content) diff --git a/swh/web/api/views/directory.py b/swh/web/api/views/directory.py index bce90159..8ac87397 100644 --- a/swh/web/api/views/directory.py +++ b/swh/web/api/views/directory.py @@ -1,58 +1,59 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_exc_bad_id, ) @api_route(r'/directory/(?P[0-9a-f]+)/', 'directory') @api_route(r'/directory/(?P[0-9a-f]+)/(?P.+)/', 'directory') @api_doc.route('/directory/') @api_doc.arg('sha1_git', default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8', argtype=api_doc.argtypes.sha1_git, argdoc='directory identifier') @api_doc.arg('path', default='codec/demux', argtype=api_doc.argtypes.path, argdoc='path relative to directory identified by sha1_git') @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""either a list of directory entries with their metadata, or the metadata of a single directory entry""") def api_directory(request, sha1_git, path=None): """Get information about directory or directory entry objects. Directories are identified by SHA1 checksums, compatible with Git directory identifiers. See ``directory_identifier`` in our `data model module `_ for details about how they are computed. When given only a directory identifier, this endpoint returns information about the directory itself, returning its content (usually a list of directory entries). When given a directory identifier and a path, this endpoint returns information about the directory entry pointed by the relative path, starting path resolution from the given directory. """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( service.lookup_directory_with_path, sha1_git, path, notfound_msg=error_msg_path, enrich_fn=utils.enrich_directory) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( service.lookup_directory, sha1_git, notfound_msg=error_msg_nopath, enrich_fn=utils.enrich_directory) diff --git a/swh/web/api/views/entity.py b/swh/web/api/views/entity.py index 35f9e4d0..c98eaa73 100644 --- a/swh/web/api/views/entity.py +++ b/swh/web/api/views/entity.py @@ -1,32 +1,33 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_exc_bad_id ) @api_route(r'/entity/(?P.+)/', 'entity') @api_doc.route('/entity/', tags=['hidden']) @api_doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=api_doc.argtypes.uuid, argdoc="The entity's uuid identifier") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(request, uuid): """Return content information if content is found. """ return _api_lookup( service.lookup_entity_by_uuid, uuid, notfound_msg="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity) diff --git a/swh/web/api/views/origin.py b/swh/web/api/views/origin.py index f9ff4d72..f86ba27c 100644 --- a/swh/web/api/views/origin.py +++ b/swh/web/api/views/origin.py @@ -1,178 +1,179 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api.utils import reverse -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.common.utils import reverse +from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_header_link, _doc_arg_last_elt, _doc_arg_per_page ) @api_route(r'/origin/(?P[0-9]+)/', 'origin') @api_route(r'/origin/(?P[a-z]+)/url/(?P.+)', 'origin') @api_doc.route('/origin/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='origin identifier (when looking up by ID)') @api_doc.arg('origin_type', default='git', argtype=api_doc.argtypes.str, argdoc='origin type (when looking up by type+URL)') @api_doc.arg('origin_url', default='https://github.com/hylang/hy', argtype=api_doc.argtypes.path, argdoc='origin URL (when looking up by type+URL)') @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""The metadata of the origin corresponding to the given criteria""") def api_origin(request, origin_id=None, origin_type=None, origin_url=None): """Get information about a software origin. Software origins might be looked up by origin type and canonical URL (e.g., "git" + a "git clone" URL), or by their unique (but otherwise meaningless) identifier. """ ori_dict = { 'id': origin_id, 'type': origin_type, 'url': origin_url } ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]} if 'id' in ori_dict: error_msg = 'Origin with id %s not found.' % ori_dict['id'] else: error_msg = 'Origin with type %s and URL %s not found' % ( ori_dict['type'], ori_dict['url']) def _enrich_origin(origin): if 'id' in origin: o = origin.copy() o['origin_visits_url'] = \ reverse('origin-visits', kwargs={'origin_id': origin['id']}) return o return origin return _api_lookup( service.lookup_origin, ori_dict, notfound_msg=error_msg, enrich_fn=_enrich_origin) @api_route(r'/origin/(?P[0-9]+)/visits/', 'origin-visits') @api_doc.route('/origin/visits/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='software origin identifier') @api_doc.header('Link', doc=_doc_header_link) @api_doc.param('last_visit', default=None, argtype=api_doc.argtypes.int, doc=_doc_arg_last_elt) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=_doc_arg_per_page) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.list, retdoc="""a list of dictionaries describing individual visits. For each visit, its identifier, timestamp (as UNIX time), outcome, and visit-specific URL for more information are given.""") def api_origin_visits(request, origin_id): """Get information about all visits of a given software origin. """ result = {} per_page = int(utils.get_query_params(request).get('per_page', '10')) last_visit = utils.get_query_params(request).get('last_visit') if last_visit: last_visit = int(last_visit) def _lookup_origin_visits( origin_id, last_visit=last_visit, per_page=per_page): return service.lookup_origin_visits( origin_id, last_visit=last_visit, per_page=per_page) def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_visit_url'] = reverse('origin-visit', kwargs={'origin_id': origin_id, 'visit_id': ov['visit']}) return ov r = _api_lookup( _lookup_origin_visits, origin_id, notfound_msg='No origin {} found'.format(origin_id), enrich_fn=_enrich_origin_visit) if r: l = len(r) if l == per_page: new_last_visit = r[-1]['visit'] query_params = {} query_params['last_visit'] = new_last_visit if utils.get_query_params(request).get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('origin-visits', kwargs={'origin_id': origin_id}, query_params=query_params) } result.update({ 'results': r }) return result @api_route(r'/origin/(?P[0-9]+)/visit/(?P[0-9]+)/', 'origin-visit') @api_doc.route('/origin/visit/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='software origin identifier') @api_doc.arg('visit_id', default=1, argtype=api_doc.argtypes.int, argdoc="""visit identifier, relative to the origin identified by origin_id""") @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""dictionary containing both metadata for the entire visit (e.g., timestamp as UNIX time, visit outcome, etc.) and what was at the software origin during the visit (i.e., a mapping from branches to other archive objects)""") def api_origin_visit(request, origin_id, visit_id): """Get information about a specific visit of a software origin. """ def _enrich_origin_visit(origin_visit): ov = origin_visit.copy() ov['origin_url'] = reverse('origin', kwargs={'origin_id': ov['origin']}) if 'occurrences' in ov: ov['occurrences'] = { k: utils.enrich_object(v) for k, v in ov['occurrences'].items() } return ov return _api_lookup( service.lookup_origin_visit, origin_id, visit_id, notfound_msg=('No visit {} for origin {} found' .format(visit_id, origin_id)), enrich_fn=_enrich_origin_visit) diff --git a/swh/web/api/views/person.py b/swh/web/api/views/person.py index be4b228f..64e4fc5a 100644 --- a/swh/web/api/views/person.py +++ b/swh/web/api/views/person.py @@ -1,29 +1,29 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api import service +from swh.web.common import service from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, ) @api_route(r'/person/(?P[0-9]+)/', 'person') @api_doc.route('/person/') @api_doc.arg('person_id', default=42, argtype=api_doc.argtypes.int, argdoc='person identifier') @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc='The metadata of the person identified by person_id') def api_person(request, person_id): """Get information about a person. """ return _api_lookup( service.lookup_person, person_id, notfound_msg='Person with id {} not found.'.format(person_id)) diff --git a/swh/web/api/views/release.py b/swh/web/api/views/release.py index 53147436..4ef39aea 100644 --- a/swh/web/api/views/release.py +++ b/swh/web/api/views/release.py @@ -1,37 +1,38 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_exc_bad_id ) @api_route(r'/release/(?P[0-9a-f]+)/', 'release') @api_doc.route('/release/') @api_doc.arg('sha1_git', default='7045404f3d1c54e6473c71bbb716529fbad4be24', argtype=api_doc.argtypes.sha1_git, argdoc='release identifier') @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc='The metadata of the release identified by sha1_git') def api_release(request, sha1_git): """Get information about a release. Releases are identified by SHA1 checksums, compatible with Git tag identifiers. See ``release_identifier`` in our `data model module `_ for details about how they are computed. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( service.lookup_release, sha1_git, notfound_msg=error_msg, enrich_fn=utils.enrich_release) diff --git a/swh/web/api/views/revision.py b/swh/web/api/views/revision.py index d1df7f86..80c821f3 100644 --- a/swh/web/api/views/revision.py +++ b/swh/web/api/views/revision.py @@ -1,419 +1,421 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from django.http import HttpResponse -from swh.web.api.utils import reverse -from swh.web.api import service, utils +from swh.web.common import service +from swh.web.common.utils import reverse +from swh.web.common.utils import parse_timestamp +from swh.web.api import utils from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route from swh.web.api.views import ( _api_lookup, _doc_exc_id_not_found, _doc_header_link, _doc_arg_per_page, _doc_exc_bad_id, _doc_ret_revision_log, _doc_ret_revision_meta ) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/log/', 'revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)/log/', 'revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)' r'/ts/(?P.+)/log/', 'revision-origin-log') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)' r'/ts/(?P.+)/log/', 'revision-origin-log') @api_doc.route('/revision/origin/log/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc="The revision's SWH origin identifier") @api_doc.arg('branch_name', default='refs/heads/master', argtype=api_doc.argtypes.path, argdoc="""(Optional) The revision's branch name within the origin specified. Defaults to 'refs/heads/master'.""") @api_doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=api_doc.argtypes.ts, argdoc="""(Optional) A time or timestamp string to parse""") @api_doc.header('Link', doc=_doc_header_link) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=_doc_arg_per_page) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc=_doc_ret_revision_log) def api_revision_log_by(request, origin_id, branch_name='refs/heads/master', ts=None): """Show the commit log for a revision, searching for it based on software origin, branch name, and/or visit timestamp. This endpoint behaves like ``/log``, but operates on the revision that has been found at a given software origin, close to a given point in time, pointed by a given branch. """ result = {} per_page = int(utils.get_query_params(request).get('per_page', '10')) if ts: - ts = utils.parse_timestamp(ts) + ts = parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = _api_lookup( lookup_revision_log_by_with_limit, origin_id, branch_name, ts, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) l = len(rev_get) if l == per_page+1: revisions = rev_get[:-1] last_sha1_git = rev_get[-1]['id'] params = {k: v for k, v in {'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts, }.items() if v is not None} query_params = {} query_params['sha1_git'] = last_sha1_git if utils.get_query_params(request).get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('revision-origin-log', kwargs=params, query_params=query_params) } else: revisions = rev_get result.update({'results': revisions}) return result @api_route(r'/revision/origin/(?P[0-9]+)/directory/', 'revision-directory') @api_route(r'/revision/origin/(?P[0-9]+)/directory/(?P.+)/', 'revision-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/directory/', 'revision-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)/directory/', 'revision-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/directory/(?P.+)/', 'revision-directory') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)' r'/directory/(?P.+)/', 'revision-directory') @api_doc.route('/revision/origin/directory/', tags=['hidden']) @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @api_doc.arg('branch_name', default='refs/heads/master', argtype=api_doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @api_doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=api_doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @api_doc.arg('path', default='Dockerfile', argtype=api_doc.argtypes.path, argdoc='The path to the directory or file to display') @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the given criteria""") def api_directory_through_revision_origin(request, origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: - ts = utils.parse_timestamp(ts) + ts = parse_timestamp(ts) return _revision_directory_by({'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @api_route(r'/revision/origin/(?P[0-9]+)/', 'revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/', 'revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)' r'/branch/(?P.+)/ts/(?P.+)/', 'revision-origin') @api_route(r'/revision/origin/(?P[0-9]+)/ts/(?P.+)/', 'revision-origin') @api_doc.route('/revision/origin/') @api_doc.arg('origin_id', default=1, argtype=api_doc.argtypes.int, argdoc='software origin identifier') @api_doc.arg('branch_name', default='refs/heads/master', argtype=api_doc.argtypes.path, argdoc="""(optional) fully-qualified branch name, e.g., "refs/heads/master". Defaults to the master branch.""") @api_doc.arg('ts', default=None, argtype=api_doc.argtypes.ts, argdoc="""(optional) timestamp close to which the revision pointed by the given branch should be looked up. Defaults to now.""") @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc=_doc_ret_revision_meta) def api_revision_with_origin(request, origin_id, branch_name="refs/heads/master", ts=None): """Get information about a revision, searching for it based on software origin, branch name, and/or visit timestamp. This endpoint behaves like ``/revision``, but operates on the revision that has been found at a given software origin, close to a given point in time, pointed by a given branch. """ - ts = utils.parse_timestamp(ts) + ts = parse_timestamp(ts) return _api_lookup( service.lookup_revision_by, origin_id, branch_name, ts, notfound_msg=('Revision with (origin_id: {}, branch_name: {}' ', ts: {}) not found.'.format(origin_id, branch_name, ts)), enrich_fn=utils.enrich_revision) @api_route(r'/revision/(?P[0-9a-f]+)/prev/(?P[0-9a-f/]+)/', 'revision-context') @api_doc.route('/revision/prev/', tags=['hidden']) @api_doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=api_doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @api_doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=api_doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision_with_context(request, sha1_git, context): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( service.lookup_revision, sha1_git, notfound_msg='Revision with sha1_git %s not found.' % sha1_git, enrich_fn=_enrich_revision) @api_route(r'/revision/(?P[0-9a-f]+)/', 'revision') @api_doc.route('/revision/') @api_doc.arg('sha1_git', default='aafb16d69fd30ff58afdd69036a26047f3aebdc6', argtype=api_doc.argtypes.sha1_git, argdoc="revision identifier") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc=_doc_ret_revision_meta) def api_revision(request, sha1_git): """Get information about a revision. Revisions are identified by SHA1 checksums, compatible with Git commit identifiers. See ``revision_identifier`` in our `data model module `_ for details about how they are computed. """ return _api_lookup( service.lookup_revision, sha1_git, notfound_msg='Revision with sha1_git {} not found.'.format(sha1_git), enrich_fn=utils.enrich_revision) @api_route(r'/revision/(?P[0-9a-f]+)/raw/', 'revision-raw-message') @api_doc.route('/revision/raw/', tags=['hidden'], handle_response=True) @api_doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=api_doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(request, sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) response = HttpResponse(raw['message'], content_type='application/octet-stream') response['Content-disposition'] = \ 'attachment;filename=rev_%s_raw' % sha1_git return response @api_route(r'/revision/(?P[0-9a-f]+)/directory/', 'revision-directory') @api_route(r'/revision/(?P[0-9a-f]+)/directory/(?P.+)/', 'revision-directory') @api_doc.route('/revision/directory/') @api_doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=api_doc.argtypes.sha1_git, argdoc='revision identifier') @api_doc.arg('dir_path', default='Documentation/BUG-HUNTING', argtype=api_doc.argtypes.path, argdoc="""path relative to the root directory of revision identifier by sha1_git""") @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""either a list of directory entries with their metadata, or the metadata of a single directory entry""") def api_revision_directory(request, sha1_git, dir_path=None, with_data=False): """Get information about directory (entry) objects associated to revisions. Each revision is associated to a single "root" directory. This endpoint behaves like ``/directory/``, but operates on the root directory associated to a given revision. """ return _revision_directory_by({'sha1_git': sha1_git}, dir_path, request.path, with_data=with_data) @api_route(r'/revision/(?P[0-9a-f]+)/log/', 'revision-log') @api_route(r'/revision/(?P[0-9a-f]+)' r'/prev/(?P[0-9a-f/]+)/log/', 'revision-log') @api_doc.route('/revision/log/') @api_doc.arg('sha1_git', default='37fc9e08d0c4b71807a4f1ecb06112e78d91c283', argtype=api_doc.argtypes.sha1_git, argdoc='revision identifier') @api_doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=api_doc.argtypes.path, argdoc="""(Optional) Navigation breadcrumbs (descendant revisions previously visited). If multiple values, use / as delimiter. """) @api_doc.header('Link', doc=_doc_header_link) @api_doc.param('per_page', default=10, argtype=api_doc.argtypes.int, doc=_doc_arg_per_page) @api_doc.raises(exc=api_doc.excs.badinput, doc=_doc_exc_bad_id) @api_doc.raises(exc=api_doc.excs.notfound, doc=_doc_exc_id_not_found) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc=_doc_ret_revision_log) def api_revision_log(request, sha1_git, prev_sha1s=None): """Get a list of all revisions heading to a given one, i.e., show the commit log. """ result = {} per_page = int(utils.get_query_params(request).get('per_page', '10')) def lookup_revision_log_with_limit(s, limit=per_page+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = _api_lookup(lookup_revision_log_with_limit, sha1_git, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) l = len(rev_get) if l == per_page+1: rev_backward = rev_get[:-1] new_last_sha1 = rev_get[-1]['id'] query_params = {} if utils.get_query_params(request).get('per_page'): query_params['per_page'] = per_page result['headers'] = { 'link-next': reverse('revision-log', kwargs={'sha1_git': new_last_sha1}, query_params=query_params) } else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup( service.lookup_revision_multiple, rev_forward_ids, notfound_msg=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward result.update({ 'results': revisions }) return result diff --git a/swh/web/api/views/stat.py b/swh/web/api/views/stat.py index 8fb58549..e0a5e097 100644 --- a/swh/web/api/views/stat.py +++ b/swh/web/api/views/stat.py @@ -1,20 +1,20 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -from swh.web.api import service +from swh.web.common import service from swh.web.api import apidoc as api_doc from swh.web.api.apiurls import api_route @api_route(r'/stat/counters/', 'stat-counters') @api_doc.route('/stat/counters/', noargs=True) @api_doc.returns(rettype=api_doc.rettypes.dict, retdoc="""dictionary mapping object types to the amount of corresponding objects currently available in the archive""") def api_stats(request): """Get statistics about the content of the archive. """ return service.stat_counters() diff --git a/swh/web/api/converters.py b/swh/web/common/converters.py similarity index 100% rename from swh/web/api/converters.py rename to swh/web/common/converters.py diff --git a/swh/web/api/exc.py b/swh/web/common/exc.py similarity index 100% rename from swh/web/api/exc.py rename to swh/web/common/exc.py diff --git a/swh/web/api/query.py b/swh/web/common/query.py similarity index 98% rename from swh/web/api/query.py rename to swh/web/common/query.py index 7e95180b..26627ac9 100644 --- a/swh/web/api/query.py +++ b/swh/web/common/query.py @@ -1,111 +1,111 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from uuid import UUID from swh.model.hashutil import ALGORITHMS, hash_to_bytes -from swh.web.api.exc import BadInputExc +from swh.web.common.exc import BadInputExc SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE) SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE) def parse_hash(q): """Detect the hash type of a user submitted query string. Args: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM", where HASH_TYPE is optional, defaults to "sha1", and can be one of swh.model.hashutil.ALGORITHMS Returns: A pair (hash_algorithm, byte hash value) Raises: ValueError if the given query string does not correspond to a valid hash value """ def guess_algo(q): if SHA1_RE.match(q): return 'sha1' elif SHA256_RE.match(q): return 'sha256' else: raise BadInputExc('Invalid checksum query string %s' % q) def check_algo(algo, hex): if (algo in {'sha1', 'sha1_git'} and not SHA1_RE.match(hex)) \ or (algo == 'sha256' and not SHA256_RE.match(hex)): raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo)) parts = q.split(':') if len(parts) > 2: raise BadInputExc('Invalid checksum query string %s' % q) elif len(parts) == 1: parts = (guess_algo(q), q) elif len(parts) == 2: check_algo(parts[0], parts[1]) algo = parts[0] if algo not in ALGORITHMS: raise BadInputExc('Unknown hash algorithm %s' % algo) return (algo, hash_to_bytes(parts[1])) def parse_hash_with_algorithms_or_throws(q, accepted_algo, error_msg): """Parse a query but only accepts accepted_algo. Otherwise, raise the exception with message error_msg. Args: - q: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM" where HASH_TYPE is optional, defaults to "sha1", and can be one of swh.model.hashutil.ALGORITHMS. - accepted_algo: array of strings representing the names of accepted algorithms. - error_msg: error message to raise as BadInputExc if the algo of the query does not match. Returns: A pair (hash_algorithm, byte hash value) Raises: BadInputExc when the inputs is invalid or does not validate the accepted algorithms. """ algo, hash = parse_hash(q) if algo not in accepted_algo: raise BadInputExc(error_msg) return (algo, hash) def parse_uuid4(uuid): """Parse an uuid 4 from a string. Args: uuid: String representing an uuid. Returns: The uuid as is if everything is ok. Raises: BadInputExc: if the uuid is invalid. """ try: UUID(uuid, version=4) except ValueError as e: # not a valid hex code for a UUID raise BadInputExc(str(e)) return uuid diff --git a/swh/web/api/service.py b/swh/web/common/service.py similarity index 99% rename from swh/web/api/service.py rename to swh/web/common/service.py index 21957ae9..ff3d9941 100644 --- a/swh/web/api/service.py +++ b/swh/web/common/service.py @@ -1,815 +1,815 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import os from collections import defaultdict from swh.model import hashutil -from swh.web.api import converters -from swh.web.api import query -from swh.web.api.exc import NotFoundExc +from swh.web.common import converters +from swh.web.common import query +from swh.web.common.exc import NotFoundExc from swh.web import config storage = config.storage() MAX_LIMIT = 50 # Top limit the users can ask for def _first_element(l): """Returns the first element in the provided list or None if it is empty or None""" return next(iter(l or []), None) def lookup_multiple_hashes(hashes): """Lookup the passed hashes in a single DB connection, using batch processing. Args: An array of {filename: X, sha1: Y}, string X, hex sha1 string Y. Returns: The same array with elements updated with elem['found'] = true if the hash is present in storage, elem['found'] = false if not. """ hashlist = [hashutil.hash_to_bytes(elem['sha1']) for elem in hashes] content_missing = storage.content_missing_per_sha1(hashlist) missing = [hashutil.hash_to_hex(x) for x in content_missing] for x in hashes: x.update({'found': True}) for h in hashes: if h['sha1'] in missing: h['found'] = False return hashes def lookup_expression(expression, last_sha1, per_page): """Lookup expression in raw content. Args: expression (str): An expression to lookup through raw indexed content last_sha1 (str): Last sha1 seen per_page (int): Number of results per page Returns: List of ctags whose content match the expression """ limit = min(per_page, MAX_LIMIT) ctags = storage.content_ctags_search(expression, last_sha1=last_sha1, limit=limit) for ctag in ctags: ctag = converters.from_swh(ctag, hashess={'id'}) ctag['sha1'] = ctag['id'] ctag.pop('id') yield ctag def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found containing the hash info if the hash is present, None if not. """ algo, hash = query.parse_hash(q) found = storage.content_find({algo: hash}) return {'found': found, 'algo': algo} def search_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = storage.content_find({algo: hash}) return {'found': found is not None} def lookup_content_provenance(q): """Return provenance information from a specified content. Args: q: query string of the form Yields: provenance information (dict) list if the content is found. """ algo, hash = query.parse_hash(q) provenances = storage.content_find_provenance({algo: hash}) if not provenances: return None return (converters.from_provenance(p) for p in provenances) def _lookup_content_sha1(q): """Given a possible input, query for the content's sha1. Args: q: query string of the form Returns: binary sha1 if found or None """ algo, hash = query.parse_hash(q) if algo != 'sha1': hashes = storage.content_find({algo: hash}) if not hashes: return None return hashes['sha1'] return hash def lookup_content_ctags(q): """Return ctags information from a specified content. Args: q: query string of the form Yields: ctags information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None ctags = list(storage.content_ctags_get([sha1])) if not ctags: return None for ctag in ctags: yield converters.from_swh(ctag, hashess={'id'}) def lookup_content_filetype(q): """Return filetype information from a specified content. Args: q: query string of the form Yields: filetype information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None filetype = _first_element(list(storage.content_mimetype_get([sha1]))) if not filetype: return None return converters.from_filetype(filetype) def lookup_content_language(q): """Return language information from a specified content. Args: q: query string of the form Yields: language information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(list(storage.content_language_get([sha1]))) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_content_license(q): """Return license information from a specified content. Args: q: query string of the form Yields: license information (dict) list if the content is found. """ sha1 = _lookup_content_sha1(q) if not sha1: return None lang = _first_element(storage.content_fossology_license_get([sha1])) if not lang: return None return converters.from_swh(lang, hashess={'id'}) def lookup_origin(origin): """Return information about the origin matching dict origin. Args: origin: origin's dict with keys either 'id' or ('type' AND 'url') Returns: origin information as dict. """ return converters.from_origin(storage.origin_get(origin)) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = _first_element(storage.person_get([person_id])) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = _first_element(storage.directory_get([sha1_git_bin])) if not dir: return None directory_entries = storage.directory_ls(sha1_git_bin) or [] return map(converters.from_directory_entry, directory_entries) def lookup_directory_with_path(directory_sha1_git, path_string): """Return directory information for entry with path path_string w.r.t. root directory pointed by directory_sha1_git Args: - directory_sha1_git: sha1_git corresponding to the directory to which we append paths to (hopefully) find the entry - the relative path to the entry starting from the directory pointed by directory_sha1_git Raises: NotFoundExc if the directory entry is not found """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( directory_sha1_git, ['sha1'], 'Only sha1_git is supported.') paths = path_string.strip(os.path.sep).split(os.path.sep) queried_dir = storage.directory_entry_get_by_path( sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not queried_dir: raise NotFoundExc(('Directory entry with path %s from %s not found') % (path_string, directory_sha1_git)) return converters.from_directory_entry(queried_dir) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = _first_element(storage.release_get([sha1_git_bin])) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) return converters.from_revision(revision) def lookup_revision_multiple(sha1_git_list): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ def to_sha1_bin(sha1_hex): _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_hex, ['sha1'], 'Only sha1_git is supported.') return sha1_git_bin sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list) revisions = storage.revision_get(sha1_bin_list) or [] return (converters.from_revision(x) for x in revisions) def lookup_revision_message(rev_sha1_git): """Return the raw message of the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Decoded revision message as dict {'message': } Raises: ValueError if the identifier provided is not of sha1 nature. NotFoundExc if the revision is not found, or if it has no message """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision with sha1_git %s not found.' % rev_sha1_git) if 'message' not in revision: raise NotFoundExc('No message for revision with sha1_git %s.' % rev_sha1_git) res = {'message': revision['message']} return res def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = _first_element(storage.revision_get_by(origin_id, branch_name, timestamp=timestamp, limit=1)) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = storage.revision_log([sha1_git_bin], limit) return map(converters.from_revision, revision_entries) def lookup_revision_log_by(origin_id, branch_name, timestamp, limit): """Return information about the revision with sha1 revision_sha1_git. Args: origin_id: origin of the revision branch_name: revision's branch timestamp: revision's time frame limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: NotFoundExc if no revision corresponds to the criterion NotFoundExc if the corresponding revision has no log """ revision_entries = storage.revision_log_by(origin_id, branch_name, timestamp, limit=limit) if not revision_entries: return None return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = _first_element(storage.revision_get_by(origin_id, branch_name, timestamp=ts, limit=1)) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = _first_element(storage.revision_get([sha1_git_root_bin])) # noqa if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root['id'] revision_log = storage.revision_log([sha1_git_root_bin], limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. with_data: boolean that indicates to retrieve the raw data if the path resolves to a content. Default to False (for the api) Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = _first_element(storage.revision_get([sha1_git_bin])) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: paths = dir_path.strip(os.path.sep).split(os.path.sep) entity = storage.directory_entry_get_by_path( dir_sha1_git_bin, list(map(lambda p: p.encode('utf-8'), paths))) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = storage.directory_ls(entity['target']) or [] return {'type': 'dir', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = storage.content_find({'sha1_git': entity['target']}) if with_data: c = _first_element(storage.content_get([content['sha1']])) content['data'] = c['data'] return {'type': 'file', 'path': '.' if not dir_path else dir_path, 'revision': sha1_git, 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = storage.content_find({algo: hash}) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content defined by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = storage.content_find({algo: hash}) if not c: return None content = _first_element(storage.content_get([c['sha1']])) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return storage.stat_counters() def _lookup_origin_visits(origin_id, last_visit=None, limit=10): """Yields the origin origin_ids' visits. Args: origin_id (int): origin to list visits for last_visit (int): last visit to lookup from limit (int): Number of elements max to display Yields: Dictionaries of origin_visit for that origin """ limit = min(limit, MAX_LIMIT) yield from storage.origin_visit_get( origin_id, last_visit=last_visit, limit=limit) def lookup_origin_visits(origin_id, last_visit=None, per_page=10): """Yields the origin origin_ids' visits. Args: origin_id: origin to list visits for Yields: Dictionaries of origin_visit for that origin """ visits = _lookup_origin_visits(origin_id, last_visit=last_visit, limit=per_page) for visit in visits: yield converters.from_origin_visit(visit) def lookup_origin_visit(origin_id, visit_id): """Return information about visit visit_id with origin origin_id. Args: origin_id: origin concerned by the visit visit_id: the visit identifier to lookup Yields: The dict origin_visit concerned """ visit = storage.origin_visit_get_by(origin_id, visit_id) return converters.from_origin_visit(visit) def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) for entity in storage.entity_get(uuid): entity = converters.from_swh(entity, convert={'last_seen', 'uuid'}, convert_fn=lambda x: str(x)) yield entity def lookup_revision_through(revision, limit=100): """Retrieve a revision from the criterion stored in revision dictionary. Args: revision: Dictionary of criterion to lookup the revision with. Here are the supported combination of possible values: - origin_id, branch_name, ts, sha1_git - origin_id, branch_name, ts - sha1_git_root, sha1_git - sha1_git Returns: None if the revision is not found or the actual revision. """ if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context_by(revision['origin_id'], revision['branch_name'], revision['ts'], revision['sha1_git'], limit) if 'origin_id' in revision and \ 'branch_name' in revision and \ 'ts' in revision: return lookup_revision_by(revision['origin_id'], revision['branch_name'], revision['ts']) if 'sha1_git_root' in revision and \ 'sha1_git' in revision: return lookup_revision_with_context(revision['sha1_git_root'], revision['sha1_git'], limit) if 'sha1_git' in revision: return lookup_revision(revision['sha1_git']) # this should not happen raise NotImplementedError('Should not happen!') def lookup_directory_through_revision(revision, path=None, limit=100, with_data=False): """Retrieve the directory information from the revision. Args: revision: dictionary of criterion representing a revision to lookup path: directory's path to lookup. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of. with_data: indicate to retrieve the content's raw data if path resolves to a content. Returns: The directory pointing to by the revision criterions at path. """ rev = lookup_revision_through(revision, limit) if not rev: raise NotFoundExc('Revision with criterion %s not found!' % revision) return (rev['id'], lookup_directory_with_revision(rev['id'], path, with_data)) diff --git a/swh/web/common/swh_templatetags.py b/swh/web/common/swh_templatetags.py index 74fbaa5e..09ed6c09 100644 --- a/swh/web/common/swh_templatetags.py +++ b/swh/web/common/swh_templatetags.py @@ -1,90 +1,89 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import re from docutils.core import publish_parts from docutils.writers.html4css1 import Writer, HTMLTranslator from inspect import cleandoc from django import template from django.utils.safestring import mark_safe - from pygments import highlight from pygments.lexers import JsonLexer from pygments.formatters import HtmlFormatter register = template.Library() class NoHeaderHTMLTranslator(HTMLTranslator): """ Docutils translator subclass to customize the generation of HTML from reST-formatted docstrings """ def __init__(self, document): super().__init__(document) self.body_prefix = [] self.body_suffix = [] def visit_bullet_list(self, node): self.context.append((self.compact_simple, self.compact_p)) self.compact_p = None self.compact_simple = self.is_compactable(node) self.body.append(self.starttag(node, 'ul', CLASS='docstring')) DOCSTRING_WRITER = Writer() DOCSTRING_WRITER.translator_class = NoHeaderHTMLTranslator @register.filter def safe_docstring_display(docstring): """ Utility function to htmlize reST-formatted documentation in browsable api. """ docstring = cleandoc(docstring) return publish_parts(docstring, writer=DOCSTRING_WRITER)['html_body'] @register.filter def urlize_api_links(text): """Utility function for decorating api links in browsable api. Args: text: whose content matching links should be transformed into contextual API or Browse html links. Returns The text transformed if any link is found. The text as is otherwise. """ return re.sub(r'(/api/[^"<]*/|/browse/.*/)', r'\1', text) @register.filter def urlize_header_links(text): """Utility function for decorating headers links in browsable api. Args text: Text whose content contains Link header value Returns: The text transformed with html link if any link is found. The text as is otherwise. """ return re.sub(r'<(/api/.*|/browse/.*)>', r'<\1>', text) @register.filter def highlight_json(text): return mark_safe(highlight(text, JsonLexer(), HtmlFormatter())) diff --git a/swh/web/common/utils.py b/swh/web/common/utils.py new file mode 100644 index 00000000..265ae36f --- /dev/null +++ b/swh/web/common/utils.py @@ -0,0 +1,120 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import re + +from datetime import datetime, timezone +from dateutil import parser + +from swh.web.common.exc import BadInputExc + +import urllib + +from django.core import urlresolvers +from django.http import QueryDict + + +# override django reverse function in order to get +# the same result on debian jessie and stretch +# (see https://code.djangoproject.com/ticket/22223) +def reverse(viewname, args=None, kwargs=None, query_params=None, + current_app=None, urlconf=None): + """An override of django reverse function supporting multiple + django versions (from 1.7 to current) and query parameters. + + Args: + viewname: the name of the django view from which to compute + a url + args: list of url arguments ordered according to their position it + kwargs: dictionnary of url arguments indexed by their names + query_params: dictionnary of query parameters to append to the + reversed url + current_app: the name of the django app tighted to the view + urlconf: url configuration module + + Returns: + The url of the requested view with processed arguments and + query parameters + """ + + url = urllib.parse.unquote( + urlresolvers.reverse( + viewname, urlconf=urlconf, args=args, + kwargs=kwargs, current_app=current_app + ) + ) + if query_params and len(query_params) > 0: + query_dict = QueryDict('', mutable=True) + for k, v in query_params.items(): + query_dict[k] = v + url += ('?' + query_dict.urlencode()) + return url + + +def fmap(f, data): + """Map f to data at each level. + + This must keep the origin data structure type: + - map -> map + - dict -> dict + - list -> list + - None -> None + + Args: + f: function that expects one argument. + data: data to traverse to apply the f function. + list, map, dict or bare value. + + Returns: + The same data-structure with modified values by the f function. + + """ + if data is None: + return data + if isinstance(data, map): + return map(lambda y: fmap(f, y), (x for x in data)) + if isinstance(data, list): + return [fmap(f, x) for x in data] + if isinstance(data, dict): + return {k: fmap(f, v) for (k, v) in data.items()} + return f(data) + + +def parse_timestamp(timestamp): + """Given a time or timestamp (as string), parse the result as datetime. + + Returns: + a timezone-aware datetime representing the parsed value. + None if the parsing fails. + + Samples: + - 2016-01-12 + - 2016-01-12T09:19:12+0100 + - Today is January 1, 2047 at 8:21:00AM + - 1452591542 + + """ + if not timestamp: + return None + + try: + return parser.parse(timestamp, ignoretz=False, fuzzy=True) + except: + try: + return datetime.utcfromtimestamp(float(timestamp)).replace( + tzinfo=timezone.utc) + except (ValueError, OverflowError) as e: + raise BadInputExc(e) + + +def shorten_path(path): + """Shorten the given path: for each hash present, only return the first + 8 characters followed by an ellipsis""" + + sha256_re = r'([0-9a-f]{8})[0-9a-z]{56}' + sha1_re = r'([0-9a-f]{8})[0-9a-f]{32}' + + ret = re.sub(sha256_re, r'\1...', path) + return re.sub(sha1_re, r'\1...', ret) diff --git a/swh/web/tests/api/test_api_lookup.py b/swh/web/tests/api/test_api_lookup.py index e7d04c0c..8d35b869 100644 --- a/swh/web/tests/api/test_api_lookup.py +++ b/swh/web/tests/api/test_api_lookup.py @@ -1,126 +1,126 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from .swh_api_testcase import SWHApiTestCase -from swh.web.api.exc import NotFoundExc +from swh.web.common.exc import NotFoundExc from swh.web.api import views class ApiLookupTestCase(SWHApiTestCase): @istest def generic_api_lookup_nothing_is_found(self): # given def test_generic_lookup_fn(sha1, another_unused_arg): assert another_unused_arg == 'unused_arg' assert sha1 == 'sha1' return None # when with self.assertRaises(NotFoundExc) as cm: views._api_lookup( test_generic_lookup_fn, 'sha1', 'unused_arg', notfound_msg='This will be raised because None is returned.') self.assertIn('This will be raised because None is returned.', cm.exception.args[0]) @istest def generic_api_map_are_enriched_and_transformed_to_list(self): # given def test_generic_lookup_fn_1(criteria0, param0, param1): assert criteria0 == 'something' return map(lambda x: x + 1, [1, 2, 3]) # when actual_result = views._api_lookup( test_generic_lookup_fn_1, 'something', 'some param 0', 'some param 1', notfound_msg=('This is not the error message you are looking for. ' 'Move along.'), enrich_fn=lambda x: x * 2) self.assertEqual(actual_result, [4, 6, 8]) @istest def generic_api_list_are_enriched_too(self): # given def test_generic_lookup_fn_2(crit): assert crit == 'something' return ['a', 'b', 'c'] # when actual_result = views._api_lookup( test_generic_lookup_fn_2, 'something', notfound_msg=('Not the error message you are looking for, it is. ' 'Along, you move!'), enrich_fn=lambda x: ''. join(['=', x, '='])) self.assertEqual(actual_result, ['=a=', '=b=', '=c=']) @istest def generic_api_generator_are_enriched_and_returned_as_list(self): # given def test_generic_lookup_fn_3(crit): assert crit == 'crit' return (i for i in [4, 5, 6]) # when actual_result = views._api_lookup( test_generic_lookup_fn_3, 'crit', notfound_msg='Move!', enrich_fn=lambda x: x - 1) self.assertEqual(actual_result, [3, 4, 5]) @istest def generic_api_simple_data_are_enriched_and_returned_too(self): # given def test_generic_lookup_fn_4(crit): assert crit == '123' return {'a': 10} def test_enrich_data(x): x['a'] = x['a'] * 10 return x # when actual_result = views._api_lookup( test_generic_lookup_fn_4, '123', notfound_msg='Nothing to do', enrich_fn=test_enrich_data) self.assertEqual(actual_result, {'a': 100}) @istest def api_lookup_not_found(self): # when with self.assertRaises(NotFoundExc) as e: views._api_lookup( lambda x: None, 'something', notfound_msg='this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = views._api_lookup( lambda x: x + '!', 'something', notfound_msg='this is the error which won\'t be used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = views._api_lookup( lambda x: map(lambda y: y+1, x), [1, 2, 3], notfound_msg='this is the error which won\'t be used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/tests/api/test_apiresponse.py b/swh/web/tests/api/test_apiresponse.py index 84bb15c1..0d204ba5 100644 --- a/swh/web/tests/api/test_apiresponse.py +++ b/swh/web/tests/api/test_apiresponse.py @@ -1,176 +1,176 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest from rest_framework.test import APIRequestFactory from nose.tools import istest from unittest.mock import patch from swh.web.api.apiresponse import ( compute_link_header, transform, make_api_response, filter_by_fields ) api_request_factory = APIRequestFactory() class SWHComputeLinkHeaderTest(unittest.TestCase): @istest def compute_link_header(self): rv = { 'headers': {'link-next': 'foo', 'link-prev': 'bar'}, 'results': [1, 2, 3] } options = {} # when headers = compute_link_header( rv, options) self.assertEquals(headers, { 'Link': '; rel="next",; rel="previous"', }) @istest def compute_link_header_nothing_changed(self): rv = {} options = {} # when headers = compute_link_header( rv, options) self.assertEquals(headers, {}) @istest def compute_link_header_nothing_changed_2(self): rv = {'headers': {}} options = {} # when headers = compute_link_header( rv, options) self.assertEquals(headers, {}) class SWHTransformProcessorTest(unittest.TestCase): @istest def transform_only_return_results_1(self): rv = {'results': {'some-key': 'some-value'}} self.assertEquals(transform(rv), {'some-key': 'some-value'}) @istest def transform_only_return_results_2(self): rv = {'headers': {'something': 'do changes'}, 'results': {'some-key': 'some-value'}} self.assertEquals(transform(rv), {'some-key': 'some-value'}) @istest def transform_do_remove_headers(self): rv = {'headers': {'something': 'do changes'}, 'some-key': 'some-value'} self.assertEquals(transform(rv), {'some-key': 'some-value'}) @istest def transform_do_nothing(self): rv = {'some-key': 'some-value'} self.assertEquals(transform(rv), {'some-key': 'some-value'}) class RendererTestCase(unittest.TestCase): @patch('swh.web.api.apiresponse.json') @patch('swh.web.api.apiresponse.filter_by_fields') - @patch('swh.web.api.apiresponse.utils.shorten_path') + @patch('swh.web.api.apiresponse.shorten_path') @istest def swh_multi_response_mimetype(self, mock_shorten_path, mock_filter, mock_json): # given data = { 'data': [12, 34], 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' } mock_filter.return_value = data mock_shorten_path.return_value = 'my_short_path' accepted_response_formats = {'html': 'text/html', 'yaml': 'application/yaml', 'json': 'application/json'} for format in accepted_response_formats: request = api_request_factory.get('/api/test/path/') mime_type = accepted_response_formats[format] setattr(request, 'accepted_media_type', mime_type) if mime_type == 'text/html': expected_data = { 'response_data': json.dumps(data), 'request': request, 'headers_data': {}, 'heading': 'my_short_path', 'status_code': 200 } mock_json.dumps.return_value = json.dumps(data) else: expected_data = data # when rv = make_api_response(request, data) # then mock_filter.assert_called_with(request, data) self.assertEqual(rv.data, expected_data) self.assertEqual(rv.status_code, 200) if mime_type == 'text/html': self.assertEqual(rv.template_name, 'apidoc.html') @istest def swh_filter_renderer_do_nothing(self): # given input_data = {'a': 'some-data'} request = api_request_factory.get('/api/test/path/', data={}) setattr(request, 'query_params', request.GET) # when actual_data = filter_by_fields(request, input_data) # then self.assertEquals(actual_data, input_data) @patch('swh.web.api.apiresponse.utils.filter_field_keys') @istest def swh_filter_renderer_do_filter(self, mock_ffk): # given mock_ffk.return_value = {'a': 'some-data'} request = api_request_factory.get('/api/test/path/', data={'fields': 'a,c'}) setattr(request, 'query_params', request.GET) input_data = {'a': 'some-data', 'b': 'some-other-data'} # when actual_data = filter_by_fields(request, input_data) # then self.assertEquals(actual_data, {'a': 'some-data'}) mock_ffk.assert_called_once_with(input_data, {'a', 'c'}) diff --git a/swh/web/tests/api/test_utils.py b/swh/web/tests/api/test_utils.py index 3f9ed246..c5913059 100644 --- a/swh/web/tests/api/test_utils.py +++ b/swh/web/tests/api/test_utils.py @@ -1,941 +1,866 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information -import datetime -import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest, nottest from swh.web.api import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.maxDiff = None self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @istest def prepare_data_for_view_default_encoding(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs) # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view(self): self.maxDiff = None # given inputs = [ { 'data': b'some blah data' }, { 'data': 1, 'data_url': '/api/1/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' }] # when actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, [ { 'data': 'some blah data', }, { 'data': 1, 'data_url': '/browse/some/api/call', }, { 'blah': 'foobar', 'blah_url': '/some/non/changed/api/call' } ]) @istest def prepare_data_for_view_ko_cannot_decode(self): self.maxDiff = None # given inputs = { 'data': 'hé dude!'.encode('utf8'), } actual_result = utils.prepare_data_for_view(inputs, encoding='ascii') # then self.assertEquals(actual_result, { 'data': "Cannot decode the data bytes, try and set another " "encoding in the url (e.g. ?encoding=utf8) or " "download directly the " "content's raw data.", }) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_map(self): # when actual_res = utils.filter_field_keys( map(lambda x: {'i': x['i']+1, 'j': x['j']}, [{'i': 1, 'j': None}, {'i': 2, 'j': None}, {'i': 3, 'j': None}]), {'i'}) # then self.assertEqual(list(actual_res), [{'i': 2}, {'i': 3}, {'i': 4}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, None, 4], utils.fmap(lambda x: x+1, [1, 2, None, 3])) self.assertEquals([11, 12, 13], list(utils.fmap(lambda x: x+10, map(lambda x: x, [1, 2, 3])))) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) self.assertEquals({'a': [2, 6], 'b': 4}, utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2})) self.assertIsNone(utils.fmap(lambda x: x, None)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') - @istest - def parse_timestamp(self): - input_timestamps = [ - None, - '2016-01-12', - '2016-01-12T09:19:12+0100', - 'Today is January 1, 2047 at 8:21:00AM', - '1452591542', - ] - - output_dates = [ - None, - datetime.datetime(2016, 1, 12, 0, 0), - datetime.datetime(2016, 1, 12, 9, 19, 12, - tzinfo=dateutil.tz.tzoffset(None, 3600)), - datetime.datetime(2047, 1, 1, 8, 21), - datetime.datetime(2016, 1, 12, 9, 39, 2, - tzinfo=datetime.timezone.utc), - ] - - for ts, exp_date in zip(input_timestamps, output_dates): - self.assertEquals(utils.parse_timestamp(ts), exp_date) - @istest def enrich_release_0(self): # when actual_release = utils.enrich_release({}) # then self.assertEqual(actual_release, {}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_1(self, mock_django_reverse): # given def reverse_test_context(view_name, kwargs): if view_name == 'content': id = kwargs['q'] return '/api/1/content/%s/' % id elif view_name == 'person': id = kwargs['person_id'] return '/api/1/person/%s/' % id else: raise ValueError( 'This should not happened so fail if it does.') mock_django_reverse.side_effect = reverse_test_context # when actual_release = utils.enrich_release({ 'target': '123', 'target_type': 'content', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) # then self.assertEqual(actual_release, { 'target': '123', 'target_type': 'content', 'target_url': '/api/1/content/sha1_git:123/', 'author_url': '/api/1/person/100/', 'author': { 'id': 100, 'name': 'author release name', 'email': 'author@email', }, }) mock_django_reverse.assert_has_calls([ call('content', kwargs={'q': 'sha1_git:123'}), call('person', kwargs={'person_id': 100}) ]) @patch('swh.web.api.utils.reverse') @istest def enrich_release_2(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/dir/23/' # when actual_release = utils.enrich_release({'target': '23', 'target_type': 'directory'}) # then self.assertEqual(actual_release, { 'target': '23', 'target_type': 'directory', 'target_url': '/api/1/dir/23/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '23'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_3(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/3/' # when actual_release = utils.enrich_release({'target': '3', 'target_type': 'revision'}) # then self.assertEqual(actual_release, { 'target': '3', 'target_type': 'revision', 'target_url': '/api/1/rev/3/' }) mock_django_reverse.assert_called_once_with('revision', kwargs={'sha1_git': '3'}) @patch('swh.web.api.utils.reverse') @istest def enrich_release_4(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/1/rev/4/' # when actual_release = utils.enrich_release({'target': '4', 'target_type': 'release'}) # then self.assertEqual(actual_release, { 'target': '4', 'target_type': 'release', 'target_url': '/api/1/rev/4/' }) mock_django_reverse.assert_called_once_with('release', kwargs={'sha1_git': '4'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_no_type(self, mock_django_reverse): # when/then self.assertEqual(utils.enrich_directory({'id': 'dir-id'}), {'id': 'dir-id'}) # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'target': '123', }) # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'target': '123', 'target_url': '/api/content/sha1_git:123/', }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_file(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/content/sha1_git:123/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', }, context_url='/api/revision/revsha1/directory/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'file', 'name': 'hy', 'target': '789', 'target_url': '/api/content/sha1_git:123/', 'file_url': '/api/revision/revsha1/directory' '/prefix/path/hy/' }) mock_django_reverse.assert_called_once_with( 'content', kwargs={'q': 'sha1_git:789'}) @patch('swh.web.api.utils.reverse') @istest def enrich_directory_with_context_and_type_dir(self, mock_django_reverse): # given mock_django_reverse.return_value = '/api/directory/456/' # when actual_directory = utils.enrich_directory({ 'id': 'dir-id', 'type': 'dir', 'name': 'emacs-42', 'target_type': 'file', 'target': '456', }, context_url='/api/revision/origin/2/directory/some/prefix/path/') # then self.assertEqual(actual_directory, { 'id': 'dir-id', 'type': 'dir', 'target_type': 'file', 'name': 'emacs-42', 'target': '456', 'target_url': '/api/directory/456/', 'dir_url': '/api/revision/origin/2/directory' '/some/prefix/path/emacs-42/' }) mock_django_reverse.assert_called_once_with('directory', kwargs={'sha1_git': '456'}) @istest def enrich_content_without_hashes(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes(self, mock_django_reverse): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_django_reverse.side_effect = [ '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' } ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_django_reverse.assert_has_calls([ call('content-raw', kwargs={'q': '%s:blahblah' % h}), call('content-filetype', kwargs={'q': '%s:blahblah' % h}), call('content-language', kwargs={'q': '%s:blahblah' % h}), call('content-license', kwargs={'q': '%s:blahblah' % h}), ]) mock_django_reverse.reset() @patch('swh.web.api.utils.reverse') @istest def enrich_content_with_hashes_and_top_level_url(self, mock_django_reverse): for h in ['sha1', 'sha256', 'sha1_git']: # given mock_django_reverse.side_effect = [ '/api/content/%s:123/' % h, '/api/content/%s:123/raw/' % h, '/api/filetype/%s:123/' % h, '/api/language/%s:123/' % h, '/api/license/%s:123/' % h, ] # when enriched_content = utils.enrich_content( { 'id': '123', h: 'blahblah' }, top_url=True ) # then self.assertEqual( enriched_content, { 'id': '123', h: 'blahblah', 'content_url': '/api/content/%s:123/' % h, 'data_url': '/api/content/%s:123/raw/' % h, 'filetype_url': '/api/filetype/%s:123/' % h, 'language_url': '/api/language/%s:123/' % h, 'license_url': '/api/license/%s:123/' % h, } ) mock_django_reverse.assert_has_calls([ call('content', kwargs={'q': '%s:blahblah' % h}), call('content-raw', kwargs={'q': '%s:blahblah' % h}), call('content-filetype', kwargs={'q': '%s:blahblah' % h}), call('content-language', kwargs={'q': '%s:blahblah' % h}), call('content-license', kwargs={'q': '%s:blahblah' % h}), ]) mock_django_reverse.reset() @istest def enrich_entity_identity(self): # when/then self.assertEqual(utils.enrich_content({'id': '123'}), {'id': '123'}) @patch('swh.web.api.utils.reverse') @istest def enrich_entity_with_sha1(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): return '/api/entity/' + kwargs['uuid'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_entity = utils.enrich_entity({ 'uuid': 'uuid-1', 'parent': 'uuid-parent', 'name': 'something' }) # then self.assertEqual(actual_entity, { 'uuid': 'uuid-1', 'uuid_url': '/api/entity/uuid-1/', 'parent': 'uuid-parent', 'parent_url': '/api/entity/uuid-parent/', 'name': 'something', }) mock_django_reverse.assert_has_calls( [call('entity', kwargs={'uuid': 'uuid-1'}), call('entity', kwargs={'uuid': 'uuid-parent'})]) @nottest def _reverse_context_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-context': return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa elif view_name == 'revision-log': if 'prev_sha1s' in kwargs: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] @patch('swh.web.api.utils.reverse') @istest def enrich_revision_without_children_or_parent(self, mock_django_reverse): # given def reverse_test(view_name, kwargs): if view_name == 'revision': return '/api/revision/' + kwargs['sha1_git'] + '/' elif view_name == 'revision-log': return '/api/revision/' + kwargs['sha1_git'] + '/log/' elif view_name == 'directory': return '/api/directory/' + kwargs['sha1_git'] + '/' elif view_name == 'person': return '/api/person/' + kwargs['person_id'] + '/' mock_django_reverse.side_effect = reverse_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'directory': '123', 'author': {'id': '1'}, 'committer': {'id': '2'}, }) expected_revision = { 'id': 'rev-id', 'directory': '123', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'directory_url': '/api/directory/123/', 'author': {'id': '1'}, 'author_url': '/api/person/1/', 'committer': {'id': '2'}, 'committer_url': '/api/person/2/' } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('person', kwargs={'person_id': '1'}), call('person', kwargs={'person_id': '2'}), call('directory', kwargs={'sha1_git': '123'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_children_and_parent_no_dir(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_no_context(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456'], }) expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/'] } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_empty_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'parents': ['123'], 'children': ['456']}, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_context_some_prev_list(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_context_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev1-rev/prev0-rev/log/'), 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev0-rev/prev/prev1-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'parents': ['123'], 'children': ['456']}, context='prev1-rev/prev0-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision-context', kwargs={'context': 'prev1-rev', 'sha1_git': 'prev0-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'prev_sha1s': 'prev1-rev/prev0-rev', 'sha1_git': 'rev-id'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})]) @nottest def _reverse_rev_message_test(self, view_name, kwargs): if view_name == 'revision': return '/api/revision/%s/' % kwargs['sha1_git'] elif view_name == 'revision-log': if 'prev_sha1s' in kwargs and kwargs['prev_sha1s'] is not None: return '/api/revision/%s/prev/%s/log/' % (kwargs['sha1_git'], kwargs['prev_sha1s']) # noqa else: return '/api/revision/%s/log/' % kwargs['sha1_git'] elif view_name == 'revision-raw-message': return '/api/revision/' + kwargs['sha1_git'] + '/raw/' else: return '/api/revision/%s/prev/%s/' % (kwargs['sha1_git'], kwargs['context']) # noqa @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_no_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'})] ) @patch('swh.web.api.utils.reverse') @istest def enrich_revision_with_invalid_message(self, mock_django_reverse): # given mock_django_reverse.side_effect = self._reverse_rev_message_test # when actual_revision = utils.enrich_revision({ 'id': 'rev-id', 'message': None, 'message_decoding_failed': True, 'parents': ['123'], 'children': ['456'], }, context='prev-rev') expected_revision = { 'id': 'rev-id', 'url': '/api/revision/rev-id/', 'history_url': '/api/revision/rev-id/log/', 'history_context_url': ('/api/revision/rev-id/' 'prev/prev-rev/log/'), 'message': None, 'message_decoding_failed': True, 'message_url': '/api/revision/rev-id/raw/', 'parents': [{'id': '123', 'url': '/api/revision/123/'}], 'children': ['456'], 'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'], } # then self.assertEqual(actual_revision, expected_revision) mock_django_reverse.assert_has_calls( [call('revision', kwargs={'sha1_git': 'prev-rev'}), call('revision', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id'}), call('revision-log', kwargs={'sha1_git': 'rev-id', 'prev_sha1s': 'prev-rev'}), call('revision', kwargs={'sha1_git': '123'}), call('revision', kwargs={'sha1_git': '456'}), call('revision-raw-message', kwargs={'sha1_git': 'rev-id'})]) - - @istest - def shorten_path_noop(self): - noops = [ - '/api/', - '/browse/', - '/content/symbol/foobar/' - ] - - for noop in noops: - self.assertEqual( - utils.shorten_path(noop), - noop - ) - - @istest - def shorten_path_sha1(self): - sha1 = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6' - short_sha1 = sha1[:8] + '...' - - templates = [ - '/api/1/content/sha1:%s/', - '/api/1/content/sha1_git:%s/', - '/api/1/directory/%s/', - '/api/1/content/sha1:%s/ctags/', - ] - - for template in templates: - self.assertEqual( - utils.shorten_path(template % sha1), - template % short_sha1 - ) - - @istest - def shorten_path_sha256(self): - sha256 = ('aafb16d69fd30ff58afdd69036a26047' - '213add102934013a014dfca031c41aef') - short_sha256 = sha256[:8] + '...' - - templates = [ - '/api/1/content/sha256:%s/', - '/api/1/directory/%s/', - '/api/1/content/sha256:%s/filetype/', - ] - - for template in templates: - self.assertEqual( - utils.shorten_path(template % sha256), - template % short_sha256 - ) diff --git a/swh/web/tests/api/views/test_entity.py b/swh/web/tests/api/views/test_entity.py index ea8e2562..83d3b563 100644 --- a/swh/web/tests/api/views/test_entity.py +++ b/swh/web/tests/api/views/test_entity.py @@ -1,94 +1,94 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from unittest.mock import patch -from swh.web.api.exc import BadInputExc +from swh.web.common.exc import BadInputExc from ..swh_api_testcase import SWHApiTestCase class EntityApiTestCase(SWHApiTestCase): @patch('swh.web.api.views.entity.service') @istest def api_lookup_entity_by_uuid_not_found(self, mock_service): # when mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.client.get('/api/1/entity/' '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " + "found."}) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.api.views.entity.service') @istest def api_lookup_entity_by_uuid_bad_request(self, mock_service): # when mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'bad input: uuid malformed!') # when rv = self.client.get('/api/1/entity/uuid malformed/') self.assertEquals(rv.status_code, 400) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'BadInputExc', 'reason': 'bad input: uuid malformed!'}) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'uuid malformed') @patch('swh.web.api.views.entity.service') @istest def api_lookup_entity_by_uuid(self, mock_service): # when stub_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2' } ] mock_service.lookup_entity_by_uuid.return_value = stub_entities expected_entities = [ { 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4', 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-' '785107f598e4/', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' }, { 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-' 'd1ce2efc9fb2/' } ] # when rv = self.client.get('/api/1/entity' '/34bd6b1b-463f-43e5-a697-785107f598e4/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_entities) mock_service.lookup_entity_by_uuid.assert_called_once_with( '34bd6b1b-463f-43e5-a697-785107f598e4') diff --git a/swh/web/tests/api/views/test_revision.py b/swh/web/tests/api/views/test_revision.py index 802f59b9..05fc6896 100644 --- a/swh/web/tests/api/views/test_revision.py +++ b/swh/web/tests/api/views/test_revision.py @@ -1,911 +1,917 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from unittest.mock import patch -from swh.web.api.exc import NotFoundExc +from swh.web.common.exc import NotFoundExc from swh.web.api.views.revision import ( _revision_directory_by ) from ..swh_api_testcase import SWHApiTestCase class ReleaseApiTestCase(SWHApiTestCase): @patch('swh.web.api.views.revision.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [{ 'id': '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7', 'url': '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/' # noqa }], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.client.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(expected_revision, rv.data) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.api.views.revision.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.client.get('/api/1/revision/12345/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git 12345 not found.'}) @patch('swh.web.api.views.revision.service') @istest def api_revision_raw_ok(self, mock_service): # given stub_revision = {'message': 'synthetic revision message'} mock_service.lookup_revision_message.return_value = stub_revision # when rv = self.client.get('/api/1/revision/18d8be353ed3480476f032475e7c2' '33eff7371d5/raw/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/octet-stream') self.assertEquals(rv.content, b'synthetic revision message') mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.api.views.revision.service') @istest def api_revision_raw_ok_no_msg(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No message for revision') # when rv = self.client.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No message for revision'}) self.assertEquals mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.api.views.revision.service') @istest def api_revision_raw_ko_no_rev(self, mock_service): # given mock_service.lookup_revision_message.side_effect = NotFoundExc( 'No revision found') # when rv = self.client.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/raw/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'No revision found'}) mock_service.lookup_revision_message.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.api.views.revision.service') @istest def api_revision_with_origin_not_found(self, mock_service): mock_service.lookup_revision_by.return_value = None rv = self.client.get('/api/1/revision/origin/123/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertIn('Revision with (origin_id: 123', rv.data['reason']) self.assertIn('not found', rv.data['reason']) self.assertEqual('NotFoundExc', rv.data['exception']) mock_service.lookup_revision_by.assert_called_once_with( '123', 'refs/heads/master', None) @patch('swh.web.api.views.revision.service') @istest def api_revision_with_origin(self, mock_service): mock_revision = { 'id': '32', 'directory': '21', 'message': 'message 1', 'type': 'deb', } expected_revision = { 'id': '32', 'url': '/api/1/revision/32/', 'history_url': '/api/1/revision/32/log/', 'directory': '21', 'directory_url': '/api/1/directory/21/', 'message': 'message 1', 'type': 'deb', } mock_service.lookup_revision_by.return_value = mock_revision rv = self.client.get('/api/1/revision/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( '1', 'refs/heads/master', None) @patch('swh.web.api.views.revision.service') @istest def api_revision_with_origin_and_branch_name(self, mock_service): mock_revision = { 'id': '12', 'directory': '23', 'message': 'message 2', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '12', 'url': '/api/1/revision/12/', 'history_url': '/api/1/revision/12/log/', 'directory': '23', 'directory_url': '/api/1/directory/23/', 'message': 'message 2', 'type': 'tar', } rv = self.client.get('/api/1/revision/origin/1' '/branch/refs/origin/dev/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( '1', 'refs/origin/dev', None) + @patch('swh.web.api.views.revision.parse_timestamp') @patch('swh.web.api.views.revision.service') @patch('swh.web.api.views.revision.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp(self, mock_utils, - mock_service): + mock_service, + mock_parse_timestamp): # noqa mock_revision = { 'id': '123', 'directory': '456', 'message': 'message 3', 'type': 'tar', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '123', 'url': '/api/1/revision/123/', 'history_url': '/api/1/revision/123/log/', 'directory': '456', 'directory_url': '/api/1/directory/456/', 'message': 'message 3', 'type': 'tar', } - mock_utils.parse_timestamp.return_value = 'parsed-date' + mock_parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.client.get('/api/1/revision' '/origin/1' '/branch/refs/origin/dev' '/ts/1452591542/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( '1', 'refs/origin/dev', 'parsed-date') - mock_utils.parse_timestamp.assert_called_once_with('1452591542') + mock_parse_timestamp.assert_called_once_with('1452591542') mock_utils.enrich_revision.assert_called_once_with( mock_revision) + @patch('swh.web.api.views.revision.parse_timestamp') @patch('swh.web.api.views.revision.service') @patch('swh.web.api.views.revision.utils') @istest def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes( self, mock_utils, - mock_service): + mock_service, + mock_parse_timestamp): mock_revision = { 'id': '999', } mock_service.lookup_revision_by.return_value = mock_revision expected_revision = { 'id': '999', 'url': '/api/1/revision/999/', 'history_url': '/api/1/revision/999/log/', } - mock_utils.parse_timestamp.return_value = 'parsed-date' + mock_parse_timestamp.return_value = 'parsed-date' mock_utils.enrich_revision.return_value = expected_revision rv = self.client.get('/api/1/revision' '/origin/1' '/branch/refs%2Forigin%2Fdev' '/ts/Today%20is%20' 'January%201,%202047%20at%208:21:00AM/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_revision) mock_service.lookup_revision_by.assert_called_once_with( '1', 'refs/origin/dev', 'parsed-date') - mock_utils.parse_timestamp.assert_called_once_with( + mock_parse_timestamp.assert_called_once_with( 'Today is January 1, 2047 at 8:21:00AM') mock_utils.enrich_revision.assert_called_once_with( mock_revision) @patch('swh.web.api.views.revision.service') @istest def revision_directory_by_ko_raise(self, mock_service): # given mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa # when with self.assertRaises(NotFoundExc): _revision_directory_by( {'sha1_git': 'id'}, None, '/api/1/revision/sha1/directory/') # then mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'id'}, None, limit=100, with_data=False) @patch('swh.web.api.views.revision.service') @istest def revision_directory_by_type_dir(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) # when actual_dir_content = _revision_directory_by( {'sha1_git': 'blah-id'}, 'some/path', '/api/1/revision/sha1/directory/') # then self.assertEquals(actual_dir_content, { 'type': 'dir', 'revision': 'rev-id', 'path': 'some/path', 'content': [] }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'blah-id'}, 'some/path', limit=100, with_data=False) @patch('swh.web.api.views.revision.service') @istest def revision_directory_by_type_file(self, mock_service): # given mock_service.lookup_directory_through_revision.return_value = ( 'rev-id', { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) # when actual_dir_content = _revision_directory_by( {'sha1_git': 'sha1'}, 'some/path', '/api/1/revision/origin/2/directory/', limit=1000, with_data=True) # then self.assertEquals(actual_dir_content, { 'type': 'file', 'revision': 'rev-id', 'path': 'some/path', 'content': {'blah': 'blah'} }) mock_service.lookup_directory_through_revision.assert_called_once_with( {'sha1_git': 'sha1'}, 'some/path', limit=1000, with_data=True) + @patch('swh.web.api.views.revision.parse_timestamp') @patch('swh.web.api.views.revision._revision_directory_by') @patch('swh.web.api.views.revision.utils') @istest def api_directory_through_revision_origin_ko_not_found(self, mock_utils, - mock_rev_dir): + mock_rev_dir, + mock_parse_timestamp): # noqa mock_rev_dir.side_effect = NotFoundExc('not found') - mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00' + mock_parse_timestamp.return_value = '2012-10-20 00:00:00' rv = self.client.get('/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, { 'exception': 'NotFoundExc', 'reason': 'not found'}) mock_rev_dir.assert_called_once_with( {'origin_id': '10', 'branch_name': 'refs/remote/origin/dev', 'ts': '2012-10-20 00:00:00'}, None, '/api/1/revision' '/origin/10' '/branch/refs/remote/origin/dev' '/ts/2012-10-20' '/directory/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') @istest def api_directory_through_revision_origin(self, mock_revision_dir): expected_res = [{ 'id': '123' }] mock_revision_dir.return_value = expected_res rv = self.client.get('/api/1/revision/origin/3/directory/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEqual(rv.data, expected_res) mock_revision_dir.assert_called_once_with({ 'origin_id': '3', 'branch_name': 'refs/heads/master', 'ts': None}, None, '/api/1/revision/origin/3/directory/', with_data=False) @patch('swh.web.api.views.revision.service') @istest def api_revision_log(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345', 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/', # noqa }], 'type': 'tar', 'synthetic': True, }] # when rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_revisions) self.assertFalse(rv.has_header('Link')) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] # when rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/?per_page=25') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_revisions) self.assertEquals(rv['Link'], '; rel="next"') mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.client.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) self.assertFalse(rv.has_header('Link')) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 11) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_context(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revisions mock_service.lookup_revision_multiple.return_value = [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'], 'type': 'tar', 'synthetic': True, }] expected_revisions = [ { 'url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'history_url': '/api/1/revision/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/', 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory_url': '/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'author_name': 'Name Surname', 'author_email': 'name@surname.com', 'committer_name': 'Name Surname', 'committer_email': 'name@surname.com', 'message': 'amazing revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [{ 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', 'url': '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc/', # noqa }], 'type': 'tar', 'synthetic': True, }, { 'url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/log/', 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/' '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345', 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/', # noqa }], 'type': 'tar', 'synthetic': True, }] # when rv = self.client.get('/api/1/revision/18d8be353ed3480476f0' '32475e7c233eff7371d5/prev/21145781e2' '6ad1f978e/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(expected_revisions, rv.data) self.assertFalse(rv.has_header('Link')) mock_service.lookup_revision_log.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5', 11) mock_service.lookup_revision_multiple.assert_called_once_with( ['21145781e26ad1f978e']) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_by(self, mock_service): # given stub_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log_by.return_value = stub_revisions expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [{ 'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a4345', 'url': '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/' # noqa }], 'type': 'tar', 'synthetic': True, }] # when rv = self.client.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, expected_revisions) self.assertFalse(rv.has_header('Link')) mock_service.lookup_revision_log_by.assert_called_once_with( '1', 'refs/heads/master', None, 11) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_by_with_next(self, mock_service): # given stub_revisions = [] for i in range(27): stub_revisions.append({'id': i}) mock_service.lookup_revision_log_by.return_value = stub_revisions[:26] expected_revisions = [x for x in stub_revisions if x['id'] < 25] for e in expected_revisions: e['url'] = '/api/1/revision/%s/' % e['id'] e['history_url'] = '/api/1/revision/%s/log/' % e['id'] # when rv = self.client.get('/api/1/revision/origin/1/log/?per_page=25') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertIsNotNone(rv['Link']) self.assertEquals(rv.data, expected_revisions) mock_service.lookup_revision_log_by.assert_called_once_with( '1', 'refs/heads/master', None, 26) @patch('swh.web.api.views.revision.service') @istest def api_revision_log_by_norev(self, mock_service): # given mock_service.lookup_revision_log_by.side_effect = NotFoundExc( 'No revision') # when rv = self.client.get('/api/1/revision/origin/1/log/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertFalse(rv.has_header('Link')) self.assertEquals(rv.data, {'exception': 'NotFoundExc', 'reason': 'No revision'}) mock_service.lookup_revision_log_by.assert_called_once_with( '1', 'refs/heads/master', None, 11) @patch('swh.web.api.views.revision.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision.return_value = stub_revision # then rv = self.client.get('/api/1/revision/883/prev/999/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'history_context_url': '/api/1/revision/883/prev/999/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/777/', '/api/1/revision/999/'], 'parents': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision.assert_called_once_with('883') @patch('swh.web.api.views.revision._revision_directory_by') @istest def api_revision_directory_ko_not_found(self, mock_rev_dir): # given mock_rev_dir.side_effect = NotFoundExc('Not found') # then rv = self.client.get('/api/1/revision/999/directory/some/path/to/dir/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, { 'exception': 'NotFoundExc', 'reason': 'Not found'}) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path/to/dir', '/api/1/revision/999/directory/some/path/to/dir/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') @istest def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir): stub_dir = { 'type': 'dir', 'revision': '999', 'content': [ { 'sha1_git': '789', 'type': 'file', 'target': '101', 'target_url': '/api/1/content/sha1_git:101/', 'name': 'somefile', 'file_url': '/api/1/revision/999/directory/some/path/' 'somefile/' }, { 'sha1_git': '123', 'type': 'dir', 'target': '456', 'target_url': '/api/1/directory/456/', 'name': 'to-subdir', 'dir_url': '/api/1/revision/999/directory/some/path/' 'to-subdir/', }] } # given mock_rev_dir.return_value = stub_dir # then rv = self.client.get('/api/1/revision/999/directory/some/path/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, stub_dir) mock_rev_dir.assert_called_once_with( {'sha1_git': '999'}, 'some/path', '/api/1/revision/999/directory/some/path/', with_data=False) @patch('swh.web.api.views.revision._revision_directory_by') @istest def api_revision_directory_ok_returns_content(self, mock_rev_dir): stub_content = { 'type': 'file', 'revision': '999', 'content': { 'sha1_git': '789', 'sha1': '101', 'data_url': '/api/1/content/101/raw/', } } # given mock_rev_dir.return_value = stub_content # then url = '/api/1/revision/666/directory/some/other/path/' rv = self.client.get(url) self.assertEquals(rv.status_code, 200) self.assertEquals(rv['Content-Type'], 'application/json') self.assertEquals(rv.data, stub_content) mock_rev_dir.assert_called_once_with( {'sha1_git': '666'}, 'some/other/path', url, with_data=False) diff --git a/swh/web/tests/api/test_converters.py b/swh/web/tests/common/test_converters.py similarity index 99% rename from swh/web/tests/api/test_converters.py rename to swh/web/tests/common/test_converters.py index 537bbc6a..85aa99a8 100644 --- a/swh/web/tests/api/test_converters.py +++ b/swh/web/tests/common/test_converters.py @@ -1,743 +1,743 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.model import hashutil -from swh.web.api import converters +from swh.web.common import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hash_to_bytes( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy', b''], 'n': 'dont care either' }, 'm': 'dont care' }, 'o': 'something', 'p': b'foo', 'q': {'extra-headers': [['a', b'intact']]}, 'w': None, 'r': {'p': 'also intact', 'q': 'bar'}, 's': { 'timestamp': 42, 'offset': -420, 'negative_utc': None, }, 's1': { 'timestamp': {'seconds': 42, 'microseconds': 0}, 'offset': -420, 'negative_utc': None, }, 's2': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 't': None, 'u': None, 'v': None, 'x': None, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy', ''] } }, 'p': 'foo', 'q': {'extra-headers': [['a', 'intact']]}, 'w': {}, 'r': {'p': 'also intact', 'q': 'bar'}, 's': '1969-12-31T17:00:42-07:00', 's1': '1969-12-31T17:00:42-07:00', 's2': '2013-07-01T20:00:00+00:00', 'u': {}, 'v': [], 'x': None, } actual_output = converters.from_swh( some_input, hashess={'d', 'o', 'x'}, bytess={'c', 'e', 'g', 'l'}, dates={'s', 's1', 's2'}, blacklist={'h', 'm', 'n', 'o'}, removables_if_empty={'t'}, empty_dict={'u'}, empty_list={'v'}, convert={'p', 'q', 'w'}, convert_fn=converters.convert_revision_metadata) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_convert_invalid_utf8_bytes(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'a name \xff', 'd': b'an email \xff', } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'a name \\xff', 'd': 'an email \\xff', 'decoding_failures': ['c', 'd'] } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) for v in ['a', 'b', 'c', 'd']: self.assertEqual(expected_output[v], actual_output[v]) self.assertEqual(len(expected_output['decoding_failures']), len(actual_output['decoding_failures'])) for v in expected_output['decoding_failures']: self.assertTrue(v in actual_output['decoding_failures']) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_provenance(self): # given input_provenance = { 'origin': 10, 'visit': 1, 'content': hashutil.hash_to_bytes( '321caf10e9535160d90e874b45aa426de762f19f'), 'revision': hashutil.hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } expected_provenance = { 'origin': 10, 'visit': 1, 'content': '321caf10e9535160d90e874b45aa426de762f19f', 'revision': '123caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } # when actual_provenance = converters.from_provenance(input_provenance) # then self.assertEqual(actual_provenance, expected_provenance) @istest def from_origin(self): # given origin_input = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', 'project': None, 'lister': None, } expected_origin = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hash_to_bytes( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hash_to_bytes( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'author': { 'name': b'author name', 'fullname': b'Author Name author@email', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': '2015-01-01T22:00:00+00:00', 'author': { 'name': 'author name', 'fullname': 'Author Name author@email', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hash_to_bytes( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'fullname': b'Bob bob@alice.net', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': '2016-03-02T10:00:00-00:00', 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'fullname': 'Bob bob@alice.net', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'extra_headers': [['gpgsig', b'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'extra_headers': [['gpgsig', 'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_nomerge(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5') ] } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5' ], 'merge': False } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_noparents(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] } } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_invalid(self): revision_input = { 'id': hashutil.hash_to_bytes( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hash_to_bytes( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'invalid message \xff', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hash_to_bytes( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hash_to_bytes( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hash_to_bytes( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': None, 'message_decoding_failed': True, 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content_none(self): self.assertIsNone(converters.from_content(None)) @istest def from_content(self): content_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'blake2s256': hashutil.hash_to_bytes( '49007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'ctime': 'something-which-is-filtered-out', 'data': b'data in bytes', 'length': 10, 'status': 'hidden', } # 'status' is filtered expected_content = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'blake2s256': '49007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2' '747d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'data': b'data in bytes', 'length': 10, 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'fullname': b'bob bob@alice.net', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'fullname': 'bob bob@alice.net', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'sha256': hashutil.hash_to_bytes( '39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'target': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'dir_id': hashutil.hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'name': b'bob', 'type': 10, 'status': 'hidden', } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) @istest def from_filetype(self): content_filetype = { 'id': hashutil.hash_to_bytes( '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'), 'encoding': b'utf-8', 'mimetype': b'text/plain', } expected_content_filetype = { 'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'encoding': 'utf-8', 'mimetype': 'text/plain', } # when actual_content_filetype = converters.from_filetype(content_filetype) # then self.assertEqual(actual_content_filetype, expected_content_filetype) diff --git a/swh/web/tests/api/test_query.py b/swh/web/tests/common/test_query.py similarity index 95% rename from swh/web/tests/api/test_query.py rename to swh/web/tests/common/test_query.py index cebc2b17..26090c8b 100644 --- a/swh/web/tests/api/test_query.py +++ b/swh/web/tests/common/test_query.py @@ -1,142 +1,142 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from nose.tools import istest from swh.model import hashutil -from swh.web.api import query -from swh.web.api.exc import BadInputExc +from swh.web.common import query +from swh.web.common.exc import BadInputExc class QueryTestCase(unittest.TestCase): @istest def parse_hash_malformed_query_with_more_than_2_parts(self): with self.assertRaises(BadInputExc): query.parse_hash('sha1:1234567890987654:other-stuff') @istest def parse_hash_guess_sha1(self): h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15' r = query.parse_hash(h) self.assertEquals(r, ('sha1', hashutil.hash_to_bytes(h))) @istest def parse_hash_guess_sha256(self): h = '084C799CD551DD1D8D5C5F9A5D593B2' \ 'E931F5E36122ee5c793c1d08a19839cc0' r = query.parse_hash(h) self.assertEquals(r, ('sha256', hashutil.hash_to_bytes(h))) @istest def parse_hash_guess_algo_malformed_hash(self): with self.assertRaises(BadInputExc): query.parse_hash('1234567890987654') @istest def parse_hash_check_sha1(self): h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15' r = query.parse_hash('sha1:' + h) self.assertEquals(r, ('sha1', hashutil.hash_to_bytes(h))) @istest def parse_hash_check_sha1_git(self): h = 'e1d2d2f924e986ac86fdf7b36c94bcdf32beec15' r = query.parse_hash('sha1_git:' + h) self.assertEquals(r, ('sha1_git', hashutil.hash_to_bytes(h))) @istest def parse_hash_check_sha256(self): h = '084C799CD551DD1D8D5C5F9A5D593B2E931F5E36122ee5c793c1d08a19839cc0' r = query.parse_hash('sha256:' + h) self.assertEquals(r, ('sha256', hashutil.hash_to_bytes(h))) @istest def parse_hash_check_algo_malformed_sha1_hash(self): with self.assertRaises(BadInputExc): query.parse_hash('sha1:1234567890987654') @istest def parse_hash_check_algo_malformed_sha1_git_hash(self): with self.assertRaises(BadInputExc): query.parse_hash('sha1_git:1234567890987654') @istest def parse_hash_check_algo_malformed_sha256_hash(self): with self.assertRaises(BadInputExc): query.parse_hash('sha256:1234567890987654') @istest def parse_hash_check_algo_unknown_one(self): with self.assertRaises(BadInputExc): query.parse_hash('sha2:1234567890987654') - @patch('swh.web.api.query.parse_hash') + @patch('swh.web.common.query.parse_hash') @istest def parse_hash_with_algorithms_or_throws_bad_query(self, mock_hash): # given mock_hash.side_effect = BadInputExc('Error input') # when with self.assertRaises(BadInputExc) as cm: query.parse_hash_with_algorithms_or_throws( 'sha1:blah', ['sha1'], 'useless error message for this use case') self.assertIn('Error input', cm.exception.args[0]) mock_hash.assert_called_once_with('sha1:blah') - @patch('swh.web.api.query.parse_hash') + @patch('swh.web.common.query.parse_hash') @istest def parse_hash_with_algorithms_or_throws_bad_algo(self, mock_hash): # given mock_hash.return_value = 'sha1', '123' # when with self.assertRaises(BadInputExc) as cm: query.parse_hash_with_algorithms_or_throws( 'sha1:431', ['sha1_git'], 'Only sha1_git!') self.assertIn('Only sha1_git!', cm.exception.args[0]) mock_hash.assert_called_once_with('sha1:431') - @patch('swh.web.api.query.parse_hash') + @patch('swh.web.common.query.parse_hash') @istest def parse_hash_with_algorithms(self, mock_hash): # given mock_hash.return_value = ('sha256', b'123') # when algo, sha = query.parse_hash_with_algorithms_or_throws( 'sha256:123', ['sha256', 'sha1_git'], 'useless error message for this use case') self.assertEquals(algo, 'sha256') self.assertEquals(sha, b'123') mock_hash.assert_called_once_with('sha256:123') @istest def parse_uuid4(self): # when actual_uuid = query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42cec') # then self.assertEquals(actual_uuid, '7c33636b-8f11-4bda-89d9-ba8b76a42cec') @istest def parse_uuid4_ko(self): # when with self.assertRaises(BadInputExc) as cm: query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42') self.assertIn('badly formed hexadecimal UUID string', cm.exception.args[0]) diff --git a/swh/web/tests/api/test_service.py b/swh/web/tests/common/test_service.py similarity index 94% rename from swh/web/tests/api/test_service.py rename to swh/web/tests/common/test_service.py index 29a1c947..f356c415 100644 --- a/swh/web/tests/api/test_service.py +++ b/swh/web/tests/common/test_service.py @@ -1,2058 +1,2058 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime +import unittest from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.model.hashutil import hash_to_bytes, hash_to_hex -from .swh_api_testcase import SWHApiTestCase -from swh.web.api import service -from swh.web.api.exc import BadInputExc, NotFoundExc +from swh.web.common import service +from swh.web.common.exc import BadInputExc, NotFoundExc -class ServiceTestCase(SWHApiTestCase): +class ServiceTestCase(unittest.TestCase): def setUp(self): self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5' self.SHA1_SAMPLE_BIN = hash_to_bytes(self.SHA1_SAMPLE) self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') self.SHA256_SAMPLE_BIN = hash_to_bytes(self.SHA256_SAMPLE) self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' self.SHA1GIT_SAMPLE_BIN = hash_to_bytes(self.SHA1GIT_SAMPLE) self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6' self.DIRECTORY_ID_BIN = hash_to_bytes(self.DIRECTORY_ID) self.AUTHOR_ID_BIN = { 'name': b'author', 'email': b'author@company.org', } self.AUTHOR_ID = { 'name': 'author', 'email': 'author@company.org', } self.COMMITTER_ID_BIN = { 'name': b'committer', 'email': b'committer@corp.org', } self.COMMITTER_ID = { 'name': 'committer', 'email': 'committer@corp.org', } self.SAMPLE_DATE_RAW = { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc, ).timestamp(), 'offset': 0, 'negative_utc': False, } self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00' self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957' self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957' self.SAMPLE_REVISION = { 'id': self.SHA1_SAMPLE, 'directory': self.DIRECTORY_ID, 'author': self.AUTHOR_ID, 'committer': self.COMMITTER_ID, 'message': self.SAMPLE_MESSAGE, 'date': self.SAMPLE_DATE, 'committer_date': self.SAMPLE_DATE, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': {}, 'merge': False } self.SAMPLE_REVISION_RAW = { 'id': self.SHA1_SAMPLE_BIN, 'directory': self.DIRECTORY_ID_BIN, 'author': self.AUTHOR_ID_BIN, 'committer': self.COMMITTER_ID_BIN, 'message': self.SAMPLE_MESSAGE_BIN, 'date': self.SAMPLE_DATE_RAW, 'committer_date': self.SAMPLE_DATE_RAW, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } self.SAMPLE_CONTENT = { 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'length': 190, 'status': 'absent' } self.SAMPLE_CONTENT_RAW = { 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'length': 190, 'status': 'hidden' } self.date_origin_visit1 = datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc) self.origin_visit1 = { 'date': self.date_origin_visit1, 'origin': 1, 'visit': 1 } - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_multiple_hashes_ball_missing(self, mock_storage): # given mock_storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_multiple_hashes_some_missing(self, mock_storage): # given mock_storage.content_missing_per_sha1 = MagicMock(return_value=[ hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f') ]) # when actual_lookup = service.lookup_multiple_hashes( [{'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f'}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}]) # then self.assertEquals(actual_lookup, [ {'filename': 'a', 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False}, {'filename': 'b', 'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865', 'found': True} ]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_hash_does_not_exist(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_storage.content_find.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_hash_exist(self, mock_storage): # given stub_content = { 'sha1': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') } mock_storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')} ) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def search_hash_does_not_exist(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.search_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': False}, actual_lookup) # check the function has been called with parameters mock_storage.content_find.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def search_hash_exist(self, mock_storage): # given stub_content = { 'sha1': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') } mock_storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.search_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': True}, actual_lookup) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}, ) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_ctags(self, mock_storage): # given mock_storage.content_ctags_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'line': 100, 'name': 'hello', 'kind': 'function', 'tool_name': 'ctags', 'tool_version': 'some-version', }]) expected_ctags = [{ 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'line': 100, 'name': 'hello', 'kind': 'function', 'tool_name': 'ctags', 'tool_version': 'some-version', }] # when actual_ctags = list(service.lookup_content_ctags( 'sha1:123caf10e9535160d90e874b45aa426de762f19f')) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_ctags_no_hash(self, mock_storage): # given mock_storage.content_find.return_value = None mock_storage.content_ctags_get = MagicMock( return_value=None) # when actual_ctags = list(service.lookup_content_ctags( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')) # then self.assertEqual(actual_ctags, []) mock_storage.content_find.assert_called_once_with( {'sha1_git': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f')}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_filetype(self, mock_storage): # given mock_storage.content_mimetype_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'mimetype': b'text/x-c++', 'encoding': b'us-ascii', }]) expected_filetype = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'mimetype': 'text/x-c++', 'encoding': 'us-ascii', } # when actual_filetype = service.lookup_content_filetype( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_filetype, expected_filetype) mock_storage.content_mimetype_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_filetype_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_mimetype_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'mimetype': b'text/x-python', 'encoding': b'us-ascii', }] ) expected_filetype = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'mimetype': 'text/x-python', 'encoding': 'us-ascii', } # when actual_filetype = service.lookup_content_filetype( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_filetype, expected_filetype) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_mimetype_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_language(self, mock_storage): # given mock_storage.content_language_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'python', }]) expected_language = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'python', } # when actual_language = service.lookup_content_language( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_language, expected_language) mock_storage.content_language_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_language_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_language_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'haskell', }] ) expected_language = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'haskell', } # when actual_language = service.lookup_content_language( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_language, expected_language) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_language_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_expression(self, mock_storage): # given mock_storage.content_ctags_search = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'name': 'foobar', 'kind': 'variable', 'lang': 'C', 'line': 10 }]) expected_ctags = [{ 'sha1': '123caf10e9535160d90e874b45aa426de762f19f', 'name': 'foobar', 'kind': 'variable', 'lang': 'C', 'line': 10 }] # when actual_ctags = list(service.lookup_expression( 'foobar', last_sha1='hash', per_page=10)) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_search.assert_called_with( 'foobar', last_sha1='hash', limit=10) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_expression_no_result(self, mock_storage): # given mock_storage.content_ctags_search = MagicMock( return_value=[]) expected_ctags = [] # when actual_ctags = list(service.lookup_expression( 'barfoo', last_sha1='hash', per_page=10)) # then self.assertEqual(actual_ctags, expected_ctags) mock_storage.content_ctags_search.assert_called_with( 'barfoo', last_sha1='hash', limit=10) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_license(self, mock_storage): # given mock_storage.content_fossology_license_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'python', }]) expected_license = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'python', } # when actual_license = service.lookup_content_license( 'sha1:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_license, expected_license) mock_storage.content_fossology_license_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_license_2(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value={ 'sha1': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f') } ) mock_storage.content_fossology_license_get = MagicMock( return_value=[{ 'id': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'lang': 'haskell', }] ) expected_license = { 'id': '123caf10e9535160d90e874b45aa426de762f19f', 'lang': 'haskell', } # when actual_license = service.lookup_content_license( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_license, expected_license) mock_storage.content_find( 'sha1_git', hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f') ) mock_storage.content_fossology_license_get.assert_called_with( [hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_provenance(self, mock_storage): # given mock_storage.content_find_provenance = MagicMock( return_value=(p for p in [{ 'content': hash_to_bytes( '123caf10e9535160d90e874b45aa426de762f19f'), 'revision': hash_to_bytes( '456caf10e9535160d90e874b45aa426de762f19f'), 'origin': 100, 'visit': 1, 'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }])) expected_provenances = [{ 'content': '123caf10e9535160d90e874b45aa426de762f19f', 'revision': '456caf10e9535160d90e874b45aa426de762f19f', 'origin': 100, 'visit': 1, 'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html' }] # when actual_provenances = service.lookup_content_provenance( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(list(actual_provenances), expected_provenances) mock_storage.content_find_provenance.assert_called_with( {'sha1_git': hash_to_bytes('123caf10e9535160d90e874b45aa426de762f19f')}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_provenance_not_found(self, mock_storage): # given mock_storage.content_find_provenance = MagicMock(return_value=None) # when actual_provenances = service.lookup_content_provenance( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertIsNone(actual_provenances) mock_storage.content_find_provenance.assert_called_with( {'sha1_git': hash_to_bytes('456caf10e9535160d90e874b45aa426de762f19f')}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def stat_counters(self, mock_storage): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_storage.stat_counters.assert_called_with() - @patch('swh.web.api.service._lookup_origin_visits') + @patch('swh.web.common.service._lookup_origin_visits') @istest def lookup_origin_visits(self, mock_lookup_visits): # given date_origin_visit2 = datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc) date_origin_visit3 = datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc) stub_result = [self.origin_visit1, { 'date': date_origin_visit2, 'origin': 1, 'visit': 2, 'target': hash_to_bytes( '65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'branch': b'master', 'target_type': 'release', 'metadata': None, }, { 'date': date_origin_visit3, 'origin': 1, 'visit': 3 }] mock_lookup_visits.return_value = stub_result # when expected_origin_visits = [{ 'date': self.origin_visit1['date'].isoformat(), 'origin': self.origin_visit1['origin'], 'visit': self.origin_visit1['visit'] }, { 'date': date_origin_visit2.isoformat(), 'origin': 1, 'visit': 2, 'target': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'branch': 'master', 'target_type': 'release', 'metadata': {}, }, { 'date': date_origin_visit3.isoformat(), 'origin': 1, 'visit': 3 }] actual_origin_visits = service.lookup_origin_visits(6) # then self.assertEqual(list(actual_origin_visits), expected_origin_visits) mock_lookup_visits.assert_called_once_with( 6, last_visit=None, limit=10) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_origin_visit(self, mock_storage): # given stub_result = self.origin_visit1 mock_storage.origin_visit_get_by.return_value = stub_result expected_origin_visit = { 'date': self.origin_visit1['date'].isoformat(), 'origin': self.origin_visit1['origin'], 'visit': self.origin_visit1['visit'] } # when actual_origin_visit = service.lookup_origin_visit(1, 1) # then self.assertEqual(actual_origin_visit, expected_origin_visit) mock_storage.origin_visit_get_by.assert_called_once_with(1, 1) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_origin(self, mock_storage): # given mock_storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin({'id': 'origin-id'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_storage.origin_get.assert_called_with({'id': 'origin-id'}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_storage): # given mock_storage.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_storage.release_get.called = False - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_storage): # given mock_storage.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_storage.release_get.called = False - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_directory_with_path_not_found(self, mock_storage): # given mock_storage.lookup_directory_with_path = MagicMock(return_value=None) sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when actual_directory = mock_storage.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertIsNone(actual_directory) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_directory_with_path_found(self, mock_storage): # given sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' entry = {'id': 'dir-id', 'type': 'dir', 'name': 'some/path/foo'} mock_storage.lookup_directory_with_path = MagicMock(return_value=entry) # when actual_directory = mock_storage.lookup_directory_with_path( sha1_git, 'some/path/here') self.assertEqual(entry, actual_directory) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_release(self, mock_storage): # given mock_storage.release_get = MagicMock(return_value=[{ 'id': hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }]) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': '2015-01-01T22:00:00-00:00', 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_storage.release_get.assert_called_with( [hash_to_bytes('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_storage): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hash_to_bytes(sha1_git) mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_storage.revision_get.assert_called_once_with( [sha1_git_bin]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_storage): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hash_to_bytes(sha1_git_root) sha1_git_bin = hash_to_bytes(sha1_git) mock_storage.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_storage.revision_get.assert_has_calls([call([sha1_git_bin]), call([sha1_git_root_bin])]) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_storage): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_storage.revision_get.return_value = [ sha1_git_dict, sha1_git_root_dict ] mock_storage.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) mock_storage.revision_log.assert_called_with( [sha1_git_root_bin], 100) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict( self, mock_query, mock_storage): # given sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', sha1_git_bin) # lookup only on sha1 mock_storage.revision_get.return_value = [sha1_git_dict] mock_storage.revision_log.return_value = stub_revisions # when actual_revision = service.lookup_revision_with_context( {'id': sha1_git_root_bin}, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), 'merge': False }) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa sha1_git, ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([sha1_git_bin]) mock_storage.revision_log.assert_called_with( [sha1_git_root_bin], 100) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_revision_not_found(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_revision_with_path_to_nowhere( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( '123', 'path/to/something/unknown') self.assertIn("Directory/File 'path/to/something/unknown' " + "pointed to by revision 123 not found", cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'path', b'to', b'something', b'unknown']) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_ko_type_not_implemented( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'rev', 'name': b'some/path/to/rev', 'target': b'456' } stub_content = { 'id': b'12', 'type': 'file' } mock_storage.content_get.return_value = stub_content # when with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( '123', 'some/path/to/rev') self.assertIn("Entity of type 'rev' not implemented.", cm.exception.args[0]) # then mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'rev']) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_ls.assert_called_once_with(dir_id) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_dir(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'dir', 'name': b'some/path', 'target': b'456' } mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(actual_directory_entries['revision'], '123') self.assertEqual(actual_directory_entries['path'], 'some/path') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( dir_id, [b'some', b'path']) mock_storage.directory_ls.assert_called_once_with(b'456') - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_without_data( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', } mock_storage.content_find.return_value = stub_content # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file') # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': stub_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file_with_data( self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_storage.revision_get.return_value = [{ 'directory': dir_id, }] mock_storage.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', 'sha1': b'content-sha1' } mock_storage.content_find.return_value = stub_content mock_storage.content_get.return_value = [{ 'sha1': b'content-sha1', 'data': b'some raw data' }] expected_content = { 'status': 'visible', 'sha1': hash_to_hex(b'content-sha1'), 'data': b'some raw data' } # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file', with_data=True) # then self.assertEqual(actual_content, {'type': 'file', 'revision': '123', 'path': 'some/path/to/file', 'content': expected_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_storage.revision_get.assert_called_once_with([b'123']) mock_storage.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', [b'some', b'path', b'to', b'file']) mock_storage.content_find.assert_called_once_with({'sha1_git': b'789'}) mock_storage.content_get.assert_called_once_with([b'content-sha1']) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision(self, mock_storage): # given mock_storage.revision_get = MagicMock( return_value=[self.SAMPLE_REVISION_RAW]) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, self.SAMPLE_REVISION) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_invalid_msg(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['message'] = b'elegant fix for bug \xff' expected_revision = self.SAMPLE_REVISION expected_revision['message'] = None expected_revision['message_decoding_failed'] = True mock_storage.revision_get = MagicMock(return_value=[stub_rev]) # when actual_revision = service.lookup_revision( self.SHA1_SAMPLE) # then self.assertEqual(actual_revision, expected_revision) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_ok(self, mock_storage): # given mock_storage.revision_get.return_value = [self.SAMPLE_REVISION_RAW] # when rv = service.lookup_revision_message( self.SHA1_SAMPLE) # then self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN}) mock_storage.revision_get.assert_called_with( [self.SHA1_SAMPLE_BIN]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_absent(self, mock_storage): # given stub_revision = self.SAMPLE_REVISION_RAW del stub_revision['message'] mock_storage.revision_get.return_value = stub_revision # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_storage.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'No message for revision ' 'with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5.') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_msg_norev(self, mock_storage): # given mock_storage.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_message( self.SHA1_SAMPLE) # then mock_storage.revision_get.assert_called_with( self.SHA1_SAMPLE_BIN) self.assertEqual(cm.exception.args[0], 'Revision with sha1_git ' '18d8be353ed3480476f032475e7c233eff7371d5 ' 'not found.') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_multiple(self, mock_storage): # given sha1 = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' stub_revisions = [ self.SAMPLE_REVISION_RAW, { 'id': hash_to_bytes(sha1_other), 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'date_offset': 0, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 12, 5, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False }, 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] mock_storage.revision_get.return_value = stub_revisions # when actual_revisions = service.lookup_revision_multiple( [sha1, sha1_other]) # then self.assertEqual(list(actual_revisions), [ self.SAMPLE_REVISION, { 'id': sha1_other, 'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5', 'author': { 'name': 'name', 'email': 'name@surname.org', }, 'committer': { 'name': 'name', 'email': 'name@surname.org', }, 'message': 'ugly fix for bug 42', 'date': '2000-01-12T05:23:54+00:00', 'date_offset': 0, 'committer_date': '2000-01-12T05:23:54+00:00', 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': {}, 'merge': False } ]) self.assertEqual( list(mock_storage.revision_get.call_args[0][0]), [hash_to_bytes(sha1), hash_to_bytes(sha1_other)]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_multiple_none_found(self, mock_storage): # given sha1_bin = self.SHA1_SAMPLE sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc' mock_storage.revision_get.return_value = [] # then actual_revisions = service.lookup_revision_multiple( [sha1_bin, sha1_other]) self.assertEqual(list(actual_revisions), []) self.assertEqual( list(mock_storage.revision_get.call_args[0][0]), [hash_to_bytes(self.SHA1_SAMPLE), hash_to_bytes(sha1_other)]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_log(self, mock_storage): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_storage.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( 'abcdbe353ed3480476f032475e7c233eff7371d5', limit=25) # then self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION]) mock_storage.revision_log.assert_called_with( [hash_to_bytes('abcdbe353ed3480476f032475e7c233eff7371d5')], 25) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_log_by(self, mock_storage): # given stub_revision_log = [self.SAMPLE_REVISION_RAW] mock_storage.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEqual(list(actual_log), [self.SAMPLE_REVISION]) mock_storage.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, limit=100) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_log_by_nolog(self, mock_storage): # given mock_storage.revision_log_by = MagicMock(return_value=None) # when res = service.lookup_revision_log_by( 1, 'refs/heads/master', None, limit=100) # then self.assertEquals(res, None) mock_storage.revision_log_by.assert_called_with( 1, 'refs/heads/master', None, limit=100) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_raw_not_found(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_raw(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value={ 'sha1': self.SHA1_SAMPLE, }) mock_storage.content_get = MagicMock(return_value=[{ 'data': b'binary data'}]) # when actual_content = service.lookup_content_raw( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_storage.content_find.assert_called_once_with( {'sha256': self.SHA256_SAMPLE_BIN}) mock_storage.content_get.assert_called_once_with( [self.SHA1_SAMPLE]) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_not_found(self, mock_storage): # given mock_storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertIsNone(actual_content) mock_storage.content_find.assert_called_with( {'sha1': self.SHA1_SAMPLE_BIN}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_with_sha1(self, mock_storage): # given mock_storage.content_find = MagicMock( return_value=self.SAMPLE_CONTENT_RAW) # when actual_content = service.lookup_content( 'sha1:%s' % self.SHA1_SAMPLE) # then self.assertEqual(actual_content, self.SAMPLE_CONTENT) mock_storage.content_find.assert_called_with( {'sha1': hash_to_bytes(self.SHA1_SAMPLE)}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_content_with_sha256(self, mock_storage): # given stub_content = self.SAMPLE_CONTENT_RAW stub_content['status'] = 'visible' expected_content = self.SAMPLE_CONTENT expected_content['status'] = 'visible' mock_storage.content_find = MagicMock( return_value=stub_content) # when actual_content = service.lookup_content( 'sha256:%s' % self.SHA256_SAMPLE) # then self.assertEqual(actual_content, expected_content) mock_storage.content_find.assert_called_with( {'sha256': self.SHA256_SAMPLE_BIN}) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_person(self, mock_storage): # given mock_storage.person_get = MagicMock(return_value=[{ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }]) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_storage.person_get.assert_called_with(['person_id']) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_directory_bad_checksum(self, mock_storage): # given mock_storage.directory_ls = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_storage.directory_ls.called = False - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory_not_found(self, mock_query, mock_storage): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-id-bin') mock_storage.directory_get.return_value = None # when actual_dir = service.lookup_directory('directory_id') # then self.assertIsNone(actual_dir) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory_id', ['sha1'], 'Only sha1_git is supported.') mock_storage.directory_get.assert_called_with(['directory-id-bin']) mock_storage.directory_ls.called = False - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_directory(self, mock_query, mock_storage): mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-sha1-bin') # something that exists is all that matters here mock_storage.directory_get.return_value = {'id': b'directory-sha1-bin'} # given stub_dir_entries = [{ 'sha1': self.SHA1_SAMPLE_BIN, 'sha256': self.SHA256_SAMPLE_BIN, 'sha1_git': self.SHA1GIT_SAMPLE_BIN, 'target': hash_to_bytes( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'), 'dir_id': self.DIRECTORY_ID_BIN, 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ 'sha1': self.SHA1_SAMPLE, 'sha256': self.SHA256_SAMPLE, 'sha1_git': self.SHA1GIT_SAMPLE, 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': self.DIRECTORY_ID, 'name': 'bob', 'type': 10, }] mock_storage.directory_ls.return_value = stub_dir_entries # when actual_directory_ls = list(service.lookup_directory( 'directory-sha1')) # then self.assertEqual(actual_directory_ls, expected_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory-sha1', ['sha1'], 'Only sha1_git is supported.') mock_storage.directory_ls.assert_called_with( 'directory-sha1-bin') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_by_nothing_found(self, mock_storage): # given mock_storage.revision_get_by.return_value = None # when actual_revisions = service.lookup_revision_by(1) # then self.assertIsNone(actual_revisions) mock_storage.revision_get_by.assert_called_with(1, 'refs/heads/master', limit=1, timestamp=None) - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_by(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW expected_rev = self.SAMPLE_REVISION mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_by_nomerge(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hash_to_bytes('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'] mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_by_merge(self, mock_storage): # given stub_rev = self.SAMPLE_REVISION_RAW stub_rev['parents'] = [ hash_to_bytes('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'), hash_to_bytes('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc') ] expected_rev = self.SAMPLE_REVISION expected_rev['parents'] = [ 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', 'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc' ] expected_rev['merge'] = True mock_storage.revision_get_by.return_value = [stub_rev] # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_storage.revision_get_by.assert_called_with(10, 'master2', limit=1, timestamp='some-ts') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_by_ko(self, mock_storage): # given mock_storage.revision_get_by.return_value = None # when with self.assertRaises(NotFoundExc) as cm: origin_id = 1 branch_name = 'master3' ts = None service.lookup_revision_with_context_by(origin_id, branch_name, ts, 'sha1') # then self.assertIn( 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), cm.exception.args[0]) mock_storage.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) - @patch('swh.web.api.service.lookup_revision_with_context') - @patch('swh.web.api.service.storage') + @patch('swh.web.common.service.lookup_revision_with_context') + @patch('swh.web.common.service.storage') @istest def lookup_revision_with_context_by(self, mock_storage, mock_lookup_revision_with_context): # given stub_root_rev = {'id': 'root-rev-id'} mock_storage.revision_get_by.return_value = [{'id': 'root-rev-id'}] stub_rev = {'id': 'rev-found'} mock_lookup_revision_with_context.return_value = stub_rev # when origin_id = 1 branch_name = 'master3' ts = None sha1_git = 'sha1' actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git) # then self.assertEquals(actual_root_rev, stub_root_rev) self.assertEquals(actual_rev, stub_rev) mock_storage.revision_get_by.assert_called_once_with( origin_id, branch_name, limit=1, timestamp=ts) mock_lookup_revision_with_context.assert_called_once_with( stub_root_rev, sha1_git, 100) - @patch('swh.web.api.service.storage') - @patch('swh.web.api.service.query') + @patch('swh.web.common.service.storage') + @patch('swh.web.common.service.query') @istest def lookup_entity_by_uuid(self, mock_query, mock_storage): # given uuid_test = 'correct-uuid' mock_query.parse_uuid4.return_value = uuid_test stub_entities = [{'uuid': uuid_test}] mock_storage.entity_get.return_value = stub_entities # when actual_entities = list(service.lookup_entity_by_uuid(uuid_test)) # then self.assertEquals(actual_entities, stub_entities) mock_query.parse_uuid4.assert_called_once_with(uuid_test) mock_storage.entity_get.assert_called_once_with(uuid_test) @istest def lookup_revision_through_ko_not_implemented(self): # then with self.assertRaises(NotImplementedError): service.lookup_revision_through({ 'something-unknown': 10, }) - @patch('swh.web.api.service.lookup_revision_with_context_by') + @patch('swh.web.common.service.lookup_revision_with_context_by') @istest def lookup_revision_through_with_context_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 1, 'branch_name': 'master', 'ts': None, 'sha1_git': 'sha1-git' }, limit=1000) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 1, 'master', None, 'sha1-git', 1000) - @patch('swh.web.api.service.lookup_revision_by') + @patch('swh.web.common.service.lookup_revision_by') @istest def lookup_revision_through_with_revision_by(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'origin_id': 2, 'branch_name': 'master2', 'ts': 'some-ts', }, limit=10) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 2, 'master2', 'some-ts') - @patch('swh.web.api.service.lookup_revision_with_context') + @patch('swh.web.common.service.lookup_revision_with_context') @istest def lookup_revision_through_with_context(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git_root': 'some-sha1-root', 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1-root', 'some-sha1', 100) - @patch('swh.web.api.service.lookup_revision') + @patch('swh.web.common.service.lookup_revision') @istest def lookup_revision_through_with_revision(self, mock_lookup): # given stub_rev = {'id': 'rev'} mock_lookup.return_value = stub_rev # when actual_revision = service.lookup_revision_through({ 'sha1_git': 'some-sha1', }) # then self.assertEquals(actual_revision, stub_rev) mock_lookup.assert_called_once_with( 'some-sha1') - @patch('swh.web.api.service.lookup_revision_through') + @patch('swh.web.common.service.lookup_revision_through') @istest def lookup_directory_through_revision_ko_not_found( self, mock_lookup_rev): # given mock_lookup_rev.return_value = None # when with self.assertRaises(NotFoundExc): service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) - @patch('swh.web.api.service.lookup_revision_through') - @patch('swh.web.api.service.lookup_directory_with_revision') + @patch('swh.web.common.service.lookup_revision_through') + @patch('swh.web.common.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_data( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} mock_lookup_dir.return_value = {'type': 'dir', 'content': []} # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 100) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, {'type': 'dir', 'content': []}) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False) - @patch('swh.web.api.service.lookup_revision_through') - @patch('swh.web.api.service.lookup_directory_with_revision') + @patch('swh.web.common.service.lookup_revision_through') + @patch('swh.web.common.service.lookup_directory_with_revision') @istest def lookup_directory_through_revision_ok_with_content( self, mock_lookup_dir, mock_lookup_rev): # given mock_lookup_rev.return_value = {'id': 'rev-id'} stub_result = {'type': 'file', 'revision': 'rev-id', 'content': {'data': b'blah', 'sha1': 'sha1'}} mock_lookup_dir.return_value = stub_result # when rev_id, dir_result = service.lookup_directory_through_revision( {'id': 'rev'}, 'some/path', 10, with_data=True) # then self.assertEquals(rev_id, 'rev-id') self.assertEquals(dir_result, stub_result) mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10) mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True) diff --git a/swh/web/tests/common/test_templatetags.py b/swh/web/tests/common/test_templatetags.py index e466f121..29f36f85 100644 --- a/swh/web/tests/common/test_templatetags.py +++ b/swh/web/tests/common/test_templatetags.py @@ -1,66 +1,66 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from swh.web.common import swh_templatetags -class SWHApiTemplateTagsTest(unittest.TestCase): +class SWHTemplateTagsTest(unittest.TestCase): @istest def urlize_api_links_api(self): # update api link with html links content with links content = '{"url": "/api/1/abc/"}' expected_content = ('{"url": "/api/1/abc/"}') self.assertEquals(swh_templatetags.urlize_api_links(content), expected_content) @istest def urlize_api_links_browse(self): # update /browse link with html links content with links content = '{"url": "/browse/def/"}' expected_content = ('{"url": "' '/browse/def/"}') self.assertEquals(swh_templatetags.urlize_api_links(content), expected_content) @istest def urlize_header_links(self): # update api link with html links content with links content = """; rel="next" ; rel="prev" """ expected_content = """</api/1/abc/>; rel="next" </api/1/def/>; rel="prev" """ self.assertEquals(swh_templatetags.urlize_header_links(content), expected_content) @istest def safe_docstring_display(self): # update api link with html links content with links docstring = """This is my list header: - Here is item 1, with a continuation line right here - Here is item 2 Here is something that is not part of the list""" expected_docstring = """

This is my list header:

  • Here is item 1, with a continuation line right here
  • Here is item 2

Here is something that is not part of the list

""" self.assertEquals(swh_templatetags.safe_docstring_display(docstring), expected_docstring) diff --git a/swh/web/tests/common/test_utils.py b/swh/web/tests/common/test_utils.py new file mode 100644 index 00000000..5d7a645c --- /dev/null +++ b/swh/web/tests/common/test_utils.py @@ -0,0 +1,87 @@ +# Copyright (C) 2017 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import dateutil +import unittest + +from nose.tools import istest + +from swh.web.common import utils + + +class UtilsTestCase(unittest.TestCase): + @istest + def shorten_path_noop(self): + noops = [ + '/api/', + '/browse/', + '/content/symbol/foobar/' + ] + + for noop in noops: + self.assertEqual( + utils.shorten_path(noop), + noop + ) + + @istest + def shorten_path_sha1(self): + sha1 = 'aafb16d69fd30ff58afdd69036a26047f3aebdc6' + short_sha1 = sha1[:8] + '...' + + templates = [ + '/api/1/content/sha1:%s/', + '/api/1/content/sha1_git:%s/', + '/api/1/directory/%s/', + '/api/1/content/sha1:%s/ctags/', + ] + + for template in templates: + self.assertEqual( + utils.shorten_path(template % sha1), + template % short_sha1 + ) + + @istest + def shorten_path_sha256(self): + sha256 = ('aafb16d69fd30ff58afdd69036a26047' + '213add102934013a014dfca031c41aef') + short_sha256 = sha256[:8] + '...' + + templates = [ + '/api/1/content/sha256:%s/', + '/api/1/directory/%s/', + '/api/1/content/sha256:%s/filetype/', + ] + + for template in templates: + self.assertEqual( + utils.shorten_path(template % sha256), + template % short_sha256 + ) + + @istest + def parse_timestamp(self): + input_timestamps = [ + None, + '2016-01-12', + '2016-01-12T09:19:12+0100', + 'Today is January 1, 2047 at 8:21:00AM', + '1452591542', + ] + + output_dates = [ + None, + datetime.datetime(2016, 1, 12, 0, 0), + datetime.datetime(2016, 1, 12, 9, 19, 12, + tzinfo=dateutil.tz.tzoffset(None, 3600)), + datetime.datetime(2047, 1, 1, 8, 21), + datetime.datetime(2016, 1, 12, 9, 39, 2, + tzinfo=datetime.timezone.utc), + ] + + for ts, exp_date in zip(input_timestamps, output_dates): + self.assertEquals(utils.parse_timestamp(ts), exp_date)