diff --git a/swh/web/ui/apidoc.py b/swh/web/ui/apidoc.py index d2cf08072..60a30f272 100644 --- a/swh/web/ui/apidoc.py +++ b/swh/web/ui/apidoc.py @@ -1,238 +1,248 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from functools import wraps +from enum import Enum from flask import request, render_template, url_for from flask import g from swh.web.ui.main import app -class argtypes(object): +class argtypes(Enum): """Class for centralizing argument type descriptions """ ts = 'timestamp' int = 'integer' path = 'path' sha1 = 'sha1' uuid = 'uuid' sha1_git = 'sha1_git' - octet_stream = 'octet stream' algo_and_hash = 'algo_hash:hash' -class rettypes(object): +class rettypes(Enum): """Class for centralizing return type descriptions """ + octet_stream = 'octet stream' list = 'list' dict = 'dict' -class excs(object): +class excs(Enum): """Class for centralizing exception type descriptions """ badinput = 'BadInputExc' notfound = 'NotFoundExc' class APIUrls(object): """ Class to manage API documentation URLs. * Indexes all routes documented using apidoc's decorators. * Tracks endpoint/request processing method relationships for use in generating related urls in API documentation Relies on the load_controllers logic in main.py for initialization. """ apidoc_routes = {} method_endpoints = {} @classmethod def get_app_endpoints(cls): return cls.apidoc_routes @classmethod def get_method_endpoints(cls, fname): if len(cls.method_endpoints) == 0: cls.method_endpoints = cls.group_routes_by_method() return cls.method_endpoints[fname] @classmethod def group_routes_by_method(cls): """ Group URL endpoints according to their processing method. Returns: A dict where keys are the processing method names, and values are the routes that are bound to the key method. """ endpoints = {} for rule in app.url_map.iter_rules(): rule_dict = {'rule': rule.rule, 'methods': rule.methods} if rule.endpoint not in endpoints: endpoints[rule.endpoint] = [rule_dict] else: endpoints[rule.endpoint].append(rule_dict) return endpoints @classmethod def index_add_route(cls, route, docstring): """ Add a route to the self-documenting API reference """ if route not in cls.apidoc_routes: cls.apidoc_routes[route] = docstring class route(object): """ Decorate an API method to register it in the API doc route index and create the corresponding Flask route. Caution: decorating a method with this requires to also decorate it __at least__ with @returns, or breaks the decorated endpoint Args: route: the documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested """ def __init__(self, route, noargs=False): self.route = route self.noargs = noargs def __call__(self, f): APIUrls.index_add_route(self.route, f.__doc__) @wraps(f) def doc_func(*args, **kwargs): return f(call_args=(args, kwargs), doc_route=self.route, noargs=self.noargs) if not self.noargs: app.add_url_rule(self.route, f.__name__, doc_func) return doc_func class arg(object): """ Decorate an API method to display an argument's information on the doc page specified by @route above. Args: name: the argument's name. MUST match the method argument's name to create the example request URL. default: the argument's default value - argtype: the argument's type (map, dict, list, tuple...) + argtype: the argument's type as an Enum value from apidoc.argtypes argdoc: the argument's documentation string """ def __init__(self, name, default, argtype, argdoc): self.doc_dict = { 'name': name, - 'type': argtype, + 'type': argtype.value, 'doc': argdoc, 'default': default } + self.req_args = ['call_args', 'doc_route'] + + def check_args(self, kwargs): + missing = [arg for arg in self.req_args if arg not in kwargs] + if len(missing) > 0: + message = 'Expected keyword args %s, missing %s.' % ( + ', '.join(self.req_args), + ', '.join(missing)) + raise SWHAPIDocException(message) def __call__(self, f): @wraps(f) def arg_fun(*args, **kwargs): if 'args' in kwargs: kwargs['args'].append(self.doc_dict) else: kwargs['args'] = [self.doc_dict] return f(*args, **kwargs) return arg_fun class raises(object): """ Decorate an API method to display information pertaining to an exception that can be raised by this method. Args: - exc: the exception name + exc: the exception name as an Enum value from apidoc.excs doc: the exception's documentation string """ def __init__(self, exc, doc): self.exc_dict = { - 'exc': exc, + 'exc': exc.value, 'doc': doc } def __call__(self, f): @wraps(f) def exc_fun(*args, **kwargs): if 'excs' in kwargs: kwargs['excs'].append(self.exc_dict) else: kwargs['excs'] = [self.exc_dict] return f(*args, **kwargs) return exc_fun class returns(object): """ Decorate an API method to display information about its return value. Caution: this MUST be the last decorator in the apidoc decorator stack, or the decorated endpoint breaks Args: - rettype: the return value's type (map, dict, list, tuple...) + rettype: the return value's type as an Enum value from apidoc.rettypes retdoc: the return value's documentation string """ def __init__(self, rettype=None, retdoc=None): self.return_dict = { - 'type': rettype, + 'type': rettype.value, 'doc': retdoc } def filter_api_url(self, endpoint, route_re, noargs): doc_methods = {'GET', 'HEAD', 'OPTIONS'} if re.match(route_re, endpoint['rule']): if endpoint['methods'] == doc_methods and not noargs: return False return True def __call__(self, f): @wraps(f) def ret_fun(*args, **kwargs): # Build documentation env = { 'docstring': f.__doc__, 'route': kwargs['doc_route'], 'return': self.return_dict } for arg in ['args', 'excs']: if arg in kwargs: env[arg] = kwargs[arg] route_re = re.compile('.*%s$' % kwargs['doc_route']) endpoint_list = APIUrls.get_method_endpoints(f.__name__) other_urls = [url for url in endpoint_list if self.filter_api_url(url, route_re, kwargs['noargs'])] env['urls'] = other_urls # Build example endpoint URL if 'args' in env: defaults = {arg['name']: arg['default'] for arg in env['args']} env['example'] = url_for(f.__name__, **defaults) # Prepare and send to mimetype selector if it's not a doc request if re.match(route_re, request.url) and not kwargs['noargs']: return app.response_class( render_template('apidoc.html', **env), content_type='text/html') cargs, ckwargs = kwargs['call_args'] g.doc_env = env # Store for response processing return f(*cargs, **ckwargs) return ret_fun diff --git a/swh/web/ui/tests/test_apidoc.py b/swh/web/ui/tests/test_apidoc.py index 2b57f83e5..b1c0d98c5 100644 --- a/swh/web/ui/tests/test_apidoc.py +++ b/swh/web/ui/tests/test_apidoc.py @@ -1,296 +1,299 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from unittest.mock import MagicMock, patch from nose.tools import istest from swh.web.ui import apidoc from swh.web.ui.tests import test_app class APIDocTestCase(test_app.SWHApidocTestCase): def setUp(self): self.arg_dict = { 'name': 'my_pretty_arg', 'default': 'some default value', - 'type': 'str', + 'type': apidoc.argtypes.sha1, 'doc': 'this arg does things' } - self.stub_excs = [{'exc': 'catastrophic_exception', + self.stub_excs = [{'exc': apidoc.excs.badinput, 'doc': 'My exception documentation'}] self.stub_args = [{'name': 'stub_arg', 'default': 'some_default'}] self.stub_rule_list = [ {'rule': 'some/route/with/args/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/doc/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/other/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}} ] self.stub_return = { - 'type': 'some_return_type', + 'type': apidoc.rettypes.dict.value, 'doc': 'a dict with amazing properties' } @patch('swh.web.ui.apidoc.APIUrls') @patch('swh.web.ui.apidoc.app') @istest def apidoc_route(self, mock_app, mock_api_urls): # given decorator = apidoc.route('/some/url/for/doc/') mock_fun = MagicMock(return_value=123) mock_fun.__doc__ = 'Some documentation' mock_fun.__name__ = 'some_fname' decorated = decorator.__call__(mock_fun) # when decorated('some', 'value', kws='and a kw') # then mock_fun.assert_called_once_with( call_args=(('some', 'value'), {'kws': 'and a kw'}), doc_route='/some/url/for/doc/', noargs=False ) mock_api_urls.index_add_route.assert_called_once_with( '/some/url/for/doc/', 'Some documentation') mock_app.add_url_rule.assert_called_once_with( '/some/url/for/doc/', 'some_fname', decorated) @istest def apidoc_arg_noprevious(self): # given decorator = apidoc.arg('my_pretty_arg', default='some default value', - argtype='str', + argtype=apidoc.argtypes.sha1, argdoc='this arg does things') mock_fun = MagicMock(return_value=123) decorated = decorator.__call__(mock_fun) + self.arg_dict['type'] = self.arg_dict['type'].value # when decorated(call_args=((), {}), doc_route='some/route/') # then mock_fun.assert_called_once_with( call_args=((), {}), doc_route='some/route/', args=[self.arg_dict] ) @istest def apidoc_arg_previous(self): # given decorator = apidoc.arg('my_other_arg', default='some other value', - argtype='str', + argtype=apidoc.argtypes.sha1, argdoc='this arg is optional') mock_fun = MagicMock(return_value=123) decorated = decorator.__call__(mock_fun) # when decorated(call_args=((), {}), doc_route='some/route/', args=[self.arg_dict]) # then mock_fun.assert_called_once_with( call_args=((), {}), doc_route='some/route/', args=[self.arg_dict, {'name': 'my_other_arg', 'default': 'some other value', - 'type': 'str', + 'type': apidoc.argtypes.sha1.value, 'doc': 'this arg is optional'}]) @istest def apidoc_raises_noprevious(self): # given - decorator = apidoc.raises(exc='catastrophic_exception', + decorator = apidoc.raises(exc=apidoc.excs.badinput, doc='My exception documentation') mock_fun = MagicMock(return_value=123) decorated = decorator.__call__(mock_fun) + self.stub_excs[0]['exc'] = self.stub_excs[0]['exc'].value # when decorated(call_args=((), {}), doc_route='some/route/') # then mock_fun.assert_called_once_with( call_args=((), {}), doc_route='some/route/', excs=self.stub_excs ) @istest def apidoc_raises_previous(self): # given - decorator = apidoc.raises(exc='cataclysmic_exception', + decorator = apidoc.raises(exc=apidoc.excs.notfound, doc='Another documentation') mock_fun = MagicMock(return_value=123) decorated = decorator.__call__(mock_fun) expected_excs = self.stub_excs + [{ - 'exc': 'cataclysmic_exception', + 'exc': apidoc.excs.notfound.value, 'doc': 'Another documentation'}] + expected_excs[0]['exc'] = expected_excs[0]['exc'].value # when decorated(call_args=((), {}), doc_route='some/route/', excs=self.stub_excs) # then mock_fun.assert_called_once_with( call_args=((), {}), doc_route='some/route/', excs=expected_excs) @patch('swh.web.ui.apidoc.render_template') @patch('swh.web.ui.apidoc.url_for') @patch('swh.web.ui.apidoc.APIUrls') @patch('swh.web.ui.apidoc.request') @istest def apidoc_returns_doc_call(self, mock_request, mock_api_urls, mock_url_for, mock_render): # given - decorator = apidoc.returns(rettype='some_return_type', + decorator = apidoc.returns(rettype=apidoc.rettypes.dict, retdoc='a dict with amazing properties') mock_fun = MagicMock(return_value=123) mock_fun.__name__ = 'some_fname' mock_fun.__doc__ = 'Some documentation' decorated = decorator.__call__(mock_fun) mock_api_urls.get_method_endpoints.return_value = self.stub_rule_list mock_request.url = 'http://my-domain.tld/some/doc/route/' mock_url_for.return_value = 'http://my-domain.tld/meaningful_route/' expected_env = { 'urls': [{'rule': 'some/route/with/args/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/other/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}], 'docstring': 'Some documentation', 'args': self.stub_args, 'excs': self.stub_excs, 'route': 'some/doc/route/', 'example': 'http://my-domain.tld/meaningful_route/', 'return': self.stub_return } # when decorated( docstring='Some documentation', call_args=(('some', 'args'), {'kw': 'kwargs'}), args=self.stub_args, excs=self.stub_excs, doc_route='some/doc/route/', noargs=False ) # then self.assertEqual(mock_fun.call_args_list, []) # function not called mock_render.assert_called_once_with( 'apidoc.html', **expected_env ) @patch('swh.web.ui.apidoc.g') @patch('swh.web.ui.apidoc.url_for') @patch('swh.web.ui.apidoc.APIUrls') @patch('swh.web.ui.apidoc.request') @istest def apidoc_returns_noargs(self, mock_request, mock_api_urls, mock_url_for, mock_g): # given - decorator = apidoc.returns(rettype='some_return_type', + decorator = apidoc.returns(rettype=apidoc.rettypes.dict, retdoc='a dict with amazing properties') mock_fun = MagicMock(return_value=123) mock_fun.__name__ = 'some_fname' mock_fun.__doc__ = 'Some documentation' decorated = decorator.__call__(mock_fun) mock_api_urls.get_method_endpoints.return_value = [ {'rule': 'some/doc/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}] mock_request.url = 'http://my-domain.tld/some/doc/route/' doc_dict = { 'urls': [ {'rule': 'some/doc/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}], 'docstring': 'Some documentation', 'route': 'some/doc/route/', - 'return': {'type': 'some_return_type', + 'return': {'type': apidoc.rettypes.dict.value, 'doc': 'a dict with amazing properties'} } # when decorated( call_args=((), {}), doc_route='some/doc/route/', noargs=True ) # then mock_fun.assert_called_once_with() self.assertEqual(mock_g.doc_env, doc_dict) @patch('swh.web.ui.apidoc.g') @patch('swh.web.ui.apidoc.url_for') @patch('swh.web.ui.apidoc.APIUrls') @patch('swh.web.ui.apidoc.request') @istest def apidoc_return_endpoint_call(self, mock_request, mock_api_urls, mock_url_for, mock_g): # given - decorator = apidoc.returns(rettype='some_return_type', + decorator = apidoc.returns(rettype=apidoc.rettypes.dict, retdoc='a dict with amazing properties') mock_fun = MagicMock(return_value=123) mock_fun.__name__ = 'some_fname' mock_fun.__doc__ = 'Some documentation' decorated = decorator.__call__(mock_fun) mock_api_urls.get_method_endpoints.return_value = self.stub_rule_list mock_request.url = 'http://my-domain.tld/some/arg/route/' mock_url_for.return_value = 'http://my-domain.tld/some/arg/route' doc_dict = { 'urls': [{'rule': 'some/route/with/args/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/other/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}], 'docstring': 'Some documentation', 'args': self.stub_args, 'excs': self.stub_excs, 'route': 'some/doc/route/', 'example': 'http://my-domain.tld/some/arg/route', 'return': self.stub_return } # when decorated( docstring='Some documentation', call_args=(('some', 'args'), {'kw': 'kwargs'}), args=self.stub_args, excs=self.stub_excs, noargs=False, doc_route='some/doc/route/', ) # then mock_fun.assert_called_once_with('some', 'args', kw='kwargs') self.assertEqual(mock_g.doc_env, doc_dict) diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py index 008b0ec25..bd088269c 100644 --- a/swh/web/ui/views/api.py +++ b/swh/web/ui/views/api.py @@ -1,739 +1,739 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for from swh.web.ui import service, utils, apidoc as doc from swh.web.ui.exc import NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') @doc.route('/api/1/stat/counters/', noargs=True) @doc.returns(rettype=doc.rettypes.dict, retdoc="A dictionary of SWH's most important statistics") def api_stats(): """Return statistics on SWH storage. """ return service.stat_counters() @app.route('/api/1/stat/visits//') @doc.route('/api/1/stat/visits/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc='The requested SWH origin identifier') @doc.returns(rettype=doc.rettypes.list, retdoc="""All instances of visits of the origin pointed by origin_id as POSIX time since epoch""") def api_origin_visits(origin_id): """Return a list of visit dates as POSIX timestamps for the given revision. """ date_gen = (item['date'] for item in service.stat_origin_visits(origin_id)) return sorted(date_gen) @app.route('/api/1/search/', methods=['POST']) @app.route('/api/1/search//') @doc.route('/api/1/search/') @doc.arg('q', default='sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH""") @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.returns(rettype=doc.rettypes.dict, retdoc="""A dict with keys: - search_res: a list of dicts corresponding to queried content with key 'found' to True if found, 'False' if not - search_stats: a dict containing number of files searched and percentage of files found """) def api_search(q=None): """Search a content per hash. This may take the form of: - a GET request with a single checksum - a POST request with many hashes, with the request body containing identifiers (typically filenames) as keys and corresponding hashes as values. """ response = {'search_res': None, 'search_stats': None} search_stats = {'nbfiles': 0, 'pct': 0} search_res = None # Single hash request route if q: r = service.search_hash(q) search_res = [{'filename': None, 'sha1': q, 'found': r['found']}] search_stats['nbfiles'] = 1 search_stats['pct'] = 100 if r['found'] else 0 # Post form submission with many hash requests elif request.method == 'POST': data = request.form queries = [] # Remove potential inputs with no associated value for k, v in data.items(): if v is not None: if k == 'q' and len(v) > 0: queries.append({'filename': None, 'sha1': v}) elif v != '': queries.append({'filename': k, 'sha1': v}) if len(queries) > 0: lookup = service.lookup_multiple_hashes(queries) result = [] for el in lookup: result.append({'filename': el['filename'], 'sha1': el['sha1'], 'found': el['found']}) search_res = result nbfound = len([x for x in lookup if x['found']]) search_stats['nbfiles'] = len(queries) search_stats['pct'] = (nbfound / len(queries))*100 response['search_res'] = search_res response['search_stats'] = search_stats return response def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): enriched_data = [] for e in res: enriched_data.append(enrich_fn(e)) return enriched_data return enrich_fn(res) @app.route('/api/1/origin//') @doc.route('/api/1/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The origin's SWH origin_id.") @doc.raises(exc=doc.excs.notfound, doc='Raised if origin_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the origin identified by origin_id') def api_origin(origin_id): """Return information about origin with id origin_id. """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person//') @doc.route('/api/1/person/') @doc.arg('person_id', default=1, argtype=doc.argtypes.int, argdoc="The person's SWH identifier") @doc.raises(exc=doc.excs.notfound, doc='Raised if person_id does not correspond to an origin in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the person identified by person_id') def api_person(person_id): """Return information about person with identifier person_id. """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release//') @doc.route('/api/1/release/') @doc.arg('sha1_git', default='8b137891791fe96927ad78e64b0aad7bded08bdc', argtype=doc.argtypes.sha1_git, argdoc="The release's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if the argument is not a sha1') @doc.raises(exc=doc.excs.notfound, doc='Raised if sha1_git does not correspond to a release in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the release identified by sha1_git') def api_release(sha1_git): """Return information about release with id sha1_git. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_release) def _revision_directory_by(revision, path, request_path, limit=100, with_data=False): """Compute the revision matching criterion's directory or content data. Args: revision: dictionary of criterions representing a revision to lookup path: directory's path to lookup request_path: request path which holds the original context to limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of with_data: indicate to retrieve the content's raw data if path resolves to a content. """ def enrich_directory_local(dir, context_url=request_path): return utils.enrich_directory(dir, context_url) rev_id, result = service.lookup_directory_through_revision( revision, path, limit=limit, with_data=with_data) content = result['content'] if result['type'] == 'dir': # dir_entries result['content'] = list(map(enrich_directory_local, content)) else: # content result['content'] = utils.enrich_content(content) return result @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') @doc.route('/api/1/revision/origin/directory/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's origin's SWH identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""Optional timestamp (default to the nearest time crawl of timestamp)""") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc='The path to the directory or file to display') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the passed criteria was not found""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision corresponding to the passed criteria""") def api_directory_through_revision_origin(origin_id, branch_name="refs/heads/master", ts=None, path=None, with_data=False): """Display directory or content information through a revision identified by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _revision_directory_by( { 'origin_id': origin_id, 'branch_name': branch_name, 'ts': ts }, path, request.path, with_data=with_data) @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') @doc.route('/api/1/revision/origin/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The queried revision's origin identifier in SWH") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="""The optional branch for the given origin (default to master)""") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="The time at which the queried revision should be constrained") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision identified by the given criteria""") def api_revision_with_origin(origin_id, branch_name="refs/heads/master", ts=None): """Display revision information through its identification by origin/branch/timestamp. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), utils.enrich_revision, branch_name, ts) @app.route('/api/1/revision//') @app.route('/api/1/revision//prev//') @doc.route('/api/1/revision/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier") @doc.arg('context', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the revision identified by sha1_git') def api_revision(sha1_git, context=None): """Return information about revision with id sha1_git. """ def _enrich_revision(revision, context=context): return utils.enrich_revision(revision, context) return _api_lookup( sha1_git, service.lookup_revision, 'Revision with sha1_git %s not found.' % sha1_git, _enrich_revision) @app.route('/api/1/revision//raw/') @doc.route('/api/1/revision/raw/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The queried revision's sha1_git identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') -@doc.returns(rettype=doc.argtypes.octet_stream, +@doc.returns(rettype=doc.rettypes.octet_stream, retdoc="""The message of the revision identified by sha1_git as a downloadable octet stream""") def api_revision_raw_message(sha1_git): """Return the raw data of the message of revision identified by sha1_git """ raw = service.lookup_revision_message(sha1_git) return app.response_class(raw['message'], headers={'Content-disposition': 'attachment;' 'filename=rev_%s_raw' % sha1_git}, mimetype='application/octet-stream') @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') @doc.route('/api/1/revision/directory/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc="The revision's sha1_git identifier.") @doc.arg('dir_path', default='.', argtype=doc.argtypes.path, argdoc='The path from the top level directory') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching sha1_git was not found in SWH , or if the path specified does not exist""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the directory pointed by revision id sha1-git and dir_path""") def api_revision_directory(sha1_git, dir_path=None, with_data=False): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). """ return _revision_directory_by( { 'sha1_git': sha1_git }, dir_path, request.path, with_data=with_data) @app.route('/api/1/revision//log/') @app.route('/api/1/revision//prev//log/') @doc.route('/api/1/revision/log/') @doc.arg('sha1_git', default='ec72c666fb345ea5f21359b7bc063710ce558e39', argtype=doc.argtypes.sha1_git, argdoc='The sha1_git of the revision queried') @doc.arg('prev_sha1s', default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a', argtype=doc.argtypes.path, argdoc='The navigation breadcrumbs -- use at your own risk!') @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git or prev_sha1s is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a revision matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The log data starting at the revision identified by sha1_git, completed with the navigation breadcrumbs, if any""") def api_revision_log(sha1_git, prev_sha1s=None): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git, or the first breadcrumb, if any. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} revisions = None next_revs_url = None def lookup_revision_log_with_limit(s, limit=limit+1): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git rev_get = _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) if len(rev_get) == limit+1: rev_backward = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: rev_backward = rev_get if not prev_sha1s: # no nav breadcrumbs, so we're done revisions = rev_backward else: rev_forward_ids = prev_sha1s.split('/') rev_forward = _api_lookup(rev_forward_ids, lookup_fn=service.lookup_revision_multiple, error_msg_if_not_found=error_msg, enrich_fn=utils.enrich_revision) revisions = rev_forward + rev_backward response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/revision' '/origin//log/') @app.route('/api/1/revision' '/origin/' '/branch//log/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//log/') @app.route('/api/1/revision' '/origin/' '/ts//log/') @doc.route('/api/1/revision/origin/log/') @doc.arg('origin_id', default=1, argtype=doc.argtypes.int, argdoc="The revision's SWH origin identifier") @doc.arg('branch_name', default='refs/heads/master', argtype=doc.argtypes.path, argdoc="The revision's branch name within the origin specified") @doc.arg('ts', default='2000-01-17T11:23:54+00:00', argtype=doc.argtypes.ts, argdoc="""A time or timestamp string to parse""") @doc.raises(exc=doc.excs.notfound, doc="""Raised if a revision matching the given criteria was not found in SWH""") @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the revision log starting at the revision matching the given criteria.""") def api_revision_log_by(origin_id, branch_name='refs/heads/master', ts=None): """Show all revisions (~git log) starting from the revision described by its origin_id, optional branch name and timestamp. The first element returned is the described revision. """ limit = app.config['conf']['max_log_revs'] response = {'revisions': None, 'next_revs_url': None} next_revs_url = None if ts: ts = utils.parse_timestamp(ts) def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1): return service.lookup_revision_log_by(o_id, br, ts, limit) error_msg = 'No revision matching origin %s ' % origin_id error_msg += ', branch name %s' % branch_name error_msg += (' and time stamp %s.' % ts) if ts else '.' rev_get = _api_lookup(origin_id, lookup_revision_log_by_with_limit, error_msg, utils.enrich_revision, branch_name, ts) if len(rev_get) == limit+1: revisions = rev_get[:-1] next_revs_url = url_for('api_revision_log', sha1_git=rev_get[-1]['id']) else: revisions = rev_get response['revisions'] = revisions response['next_revs_url'] = next_revs_url return response @app.route('/api/1/directory//') @app.route('/api/1/directory///') @doc.route('/api/1/directory/') @doc.arg('sha1_git', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.sha1_git, argdoc="The queried directory's corresponding sha1_git hash") @doc.arg('path', default='.', argtype=doc.argtypes.path, argdoc="A path relative to the queried directory's top level") @doc.raises(exc=doc.excs.badinput, doc='Raised if sha1_git is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a directory matching sha1_git was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata and contents of the release identified by sha1_git""") def api_directory(sha1_git, path=None): """Return information about release with id sha1_git. """ if path: error_msg_path = ('Entry with path %s relative to directory ' 'with sha1_git %s not found.') % (path, sha1_git) return _api_lookup( sha1_git, service.lookup_directory_with_path, error_msg_path, utils.enrich_directory, path) else: error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, service.lookup_directory, error_msg_nopath, utils.enrich_directory) # @app.route('/api/1/browse/') # @app.route('/api/1/browse//') def api_content_checksum_to_origin(q): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242 """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') @doc.route('/api/1/content/raw/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') -@doc.returns(rettype='octet stream', +@doc.returns(rettype=doc.rettypes.octet_stream, retdoc='The raw content data as an octet stream') def api_content_raw(q): """Return content's raw data if content is found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return app.response_class(generate(content), headers={'Content-disposition': 'attachment;' 'filename=content_%s_raw' % q}, mimetype='application/octet-stream') @app.route('/api/1/content//') @doc.route('/api/1/content/') @doc.arg('q', default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc', argtype=doc.argtypes.algo_and_hash, argdoc="""An algo_hash:hash string, where algo_hash is one of sha1, sha1_git or sha256 and hash is the hash to search for in SWH. Defaults to sha1 in the case of a missing algo_hash """) @doc.raises(exc=doc.excs.badinput, doc='Raised if q is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if a content matching q was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc="""The metadata of the content identified by q. If content decoding was successful, it also returns the data""") def api_content_metadata(q): """Return content information if content is found. """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=utils.enrich_content) @app.route('/api/1/entity//') @doc.route('/api/1/entity/') @doc.arg('uuid', default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba', argtype=doc.argtypes.uuid, argdoc="The entity's uuid identifier") @doc.raises(exc=doc.excs.badinput, doc='Raised if uuid is not well formed') @doc.raises(exc=doc.excs.notfound, doc='Raised if an entity matching uuid was not found in SWH') @doc.returns(rettype=doc.rettypes.dict, retdoc='The metadata of the entity identified by uuid') def api_entity_by_uuid(uuid): """Return content information if content is found. """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=utils.enrich_entity)