{% endif %}
{% endblock %}
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index 7a6ac5d87..4f6cdbdf2 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,1104 +1,1098 @@
# Copyright (C) 2015-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
# canned doc string snippets that are used in several doc strings
_doc_arg_content_id = """A "[hash_type:]hash" content identifier, where
hash_type is one of "sha1" (the default), "sha1_git", "sha256", and hash is
a checksum obtained with the hash_type hashing algorithm."""
_doc_arg_last_elt = 'element to start listing from, for pagination purposes'
_doc_arg_per_page = 'number of elements to list, for pagination purposes'
_doc_exc_bad_id = 'syntax error in the given identifier(s)'
_doc_exc_id_not_found = 'no object matching the given criteria could be found'
_doc_ret_revision_meta = 'metadata of the revision identified by sha1_git'
_doc_ret_revision_log = """list of dictionaries representing the metadata of
each revision found in the commit log heading to revision sha1_git.
For each commit at least the following information are returned:
author/committer, authoring/commit timestamps, revision id, commit message,
parent (i.e., immediately preceding) commits, "root" directory id."""
_doc_header_link = """indicates that a subsequent result page is available,
pointing to it"""
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""dictionary mapping object types to the amount of
corresponding objects currently available in the archive""")
def api_stats():
"""Get statistics about the content of the archive.
"""
return service.stat_counters()
@app.route('/api/1/origin//visits/')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_visit', default=None,
argtype=doc.argtypes.int,
doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
retdoc="""a list of dictionaries describing individual visits.
For each visit, its identifier, timestamp (as UNIX time), outcome,
and visit-specific URL for more information are given.""")
def api_origin_visits(origin_id):
"""Get information about all visits of a given software origin.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
last_visit = request.args.get('last_visit')
if last_visit:
last_visit = int(last_visit)
def _lookup_origin_visits(
origin_id, last_visit=last_visit, per_page=per_page):
return service.lookup_origin_visits(
origin_id, last_visit=last_visit, per_page=per_page)
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_visit_url'] = url_for('api_origin_visit',
origin_id=ov['origin'],
visit_id=ov['visit'])
return ov
r = _api_lookup(
origin_id,
_lookup_origin_visits,
error_msg_if_not_found='No origin %s found' % origin_id,
enrich_fn=_enrich_origin_visit)
if r:
l = len(r)
if l == per_page:
new_last_visit = r[-1]['visit']
params = {
'origin_id': origin_id,
'last_visit': new_last_visit
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_origin_visits', **params)
}
result.update({
'results': r
})
return result
@app.route('/api/1/origin//visit//')
@doc.route('/api/1/origin/visit/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.arg('visit_id',
default=1,
argtype=doc.argtypes.int,
argdoc="""visit identifier, relative to the origin identified by
origin_id""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""dictionary containing both metadata for the entire
visit (e.g., timestamp as UNIX time, visit outcome, etc.) and what
was at the software origin during the visit (i.e., a mapping from
branches to other archive objects""")
def api_origin_visit(origin_id, visit_id):
"""Get information about a specific visit of a software origin.
"""
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_url'] = url_for('api_origin', origin_id=ov['origin'])
if 'occurrences' in ov:
ov['occurrences'] = {
k: utils.enrich_object(v)
for k, v in ov['occurrences'].items()
}
return ov
return _api_lookup(
origin_id,
service.lookup_origin_visit,
'No visit %s for origin %s found' % (visit_id, origin_id),
_enrich_origin_visit,
visit_id)
@app.route('/api/1/content/symbol/', methods=['POST'])
@app.route('/api/1/content/symbol//')
@doc.route('/api/1/content/symbol/', tags=['upcoming'])
@doc.arg('q',
default='hello',
argtype=doc.argtypes.str,
argdoc="""An expression string to lookup in swh's raw content""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('last_sha1', default=None,
argtype=doc.argtypes.str,
doc=_doc_arg_last_elt)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.returns(rettype=doc.rettypes.list,
retdoc="""A list of dict whose content matches the expression.
Each dict has the following keys:
- id (bytes): identifier of the content
- name (text): symbol whose content match the expression
- kind (text): kind of the symbol that matched
- lang (text): Language for that entry
- line (int): Number line for the symbol
""")
def api_content_symbol(q=None):
"""Search content objects by `Ctags `_-style
symbol (e.g., function name, data type, method, ...).
"""
result = {}
last_sha1 = request.args.get('last_sha1', None)
per_page = int(request.args.get('per_page', '10'))
def lookup_exp(exp, last_sha1=last_sha1, per_page=per_page):
return service.lookup_expression(exp, last_sha1, per_page)
symbols = _api_lookup(
q,
lookup_fn=lookup_exp,
error_msg_if_not_found='No indexed raw content match expression \''
'%s\'.' % q,
enrich_fn=lambda x: utils.enrich_content(x, top_url=True))
if symbols:
l = len(symbols)
if l == per_page:
new_last_sha1 = symbols[-1]['sha1']
params = {
'q': q,
'last_sha1': new_last_sha1,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_content_symbol', **params),
}
result.update({
'results': symbols
})
return result
@app.route('/api/1/content/known/', methods=['POST'])
@app.route('/api/1/content/known//')
@doc.route('/api/1/content/known/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
- argtype=doc.argtypes.algo_and_hash,
- argdoc=_doc_arg_content_id)
-@doc.param('q', default=None,
- argtype=doc.argtypes.str,
- doc="""(POST request) An algo_hash:hash string, where algo_hash
- is one of sha1, sha1_git or sha256 and hash is the hash to
- search for in SWH""")
+ argtype=doc.argtypes.sha1,
+ argdoc='content identifier as a sha1 checksum')
+# @doc.param('q', default=None,
+# argtype=doc.argtypes.str,
+# doc="""(POST request) An algo_hash:hash string, where algo_hash
+# is one of sha1, sha1_git or sha256 and hash is the hash to
+# search for in SWH""")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.returns(rettype=doc.rettypes.dict,
- retdoc="""A dict with keys:
-
- - search_res: a list of dicts corresponding to queried content
- with key 'found' to True if found, 'False' if not
- - search_stats: a dict containing number of files searched and
- percentage of files found
- """)
+ retdoc="""a dictionary with results (found/not found for each given
+ identifier) and statistics about how many identifiers were
+ found""")
def api_check_content_known(q=None):
"""Check whether some content (AKA "blob") is present in the archive.
- **TODO** pending review
-
Lookup can be performed by various means:
- - a GET request with many hashes (limited to the size of parameter
- we can pass in url) a POST request with many hashes, with the
- - request body containing identifiers (typically filenames) as
- - keys and corresponding hashes as values.
+ - a GET request with one or several hashes, separated by ','
+ - a POST request with one or several hashes, passed as (multiple) values
+ for parameter 'q'
"""
-
response = {'search_res': None,
'search_stats': None}
search_stats = {'nbfiles': 0, 'pct': 0}
search_res = None
queries = []
# GET: Many hash separated values request
if q:
hashes = q.split(',')
for v in hashes:
queries.append({'filename': None, 'sha1': v})
# POST: Many hash requests in post form submission
elif request.method == 'POST':
data = request.form
# Remove potential inputs with no associated value
for k, v in data.items():
if v is not None:
if k == 'q' and len(v) > 0:
queries.append({'filename': None, 'sha1': v})
elif v != '':
queries.append({'filename': k, 'sha1': v})
if queries:
lookup = service.lookup_multiple_hashes(queries)
result = []
l = len(queries)
for el in lookup:
- result.append({'filename': el['filename'],
- 'sha1': el['sha1'],
- 'found': el['found']})
+ res_d = {'sha1': el['sha1'],
+ 'found': el['found']}
+ if el['filename']:
+ res_d['filename'] = el['filename']
+ result.append(res_d)
search_res = result
nbfound = len([x for x in lookup if x['found']])
search_stats['nbfiles'] = l
search_stats['pct'] = (nbfound / l) * 100
response['search_res'] = search_res
response['search_stats'] = search_stats
return response
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
return [enrich_fn(x) for x in res]
return enrich_fn(res)
@app.route('/api/1/origin//')
@app.route('/api/1/origin//url/')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='origin identifier (when looking up by ID)')
@doc.arg('origin_type',
default='git',
argtype=doc.argtypes.str,
argdoc='origin type (when looking up by type+URL)')
@doc.arg('origin_url',
default='https://github.com/hylang/hy',
argtype=doc.argtypes.path,
argdoc='origin URL (when looking up by type+URL')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the origin corresponding to the given
criteria""")
def api_origin(origin_id=None, origin_type=None, origin_url=None):
"""Get information about a software origin.
Software origins might be looked up by origin type and canonical URL (e.g.,
"git" + a "git clone" URL), or by their unique (but otherwise meaningless)
identifier.
"""
ori_dict = {
'id': origin_id,
'type': origin_type,
'url': origin_url
}
ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]}
if 'id' in ori_dict:
error_msg = 'Origin with id %s not found.' % ori_dict['id']
else:
error_msg = 'Origin with type %s and URL %s not found' % (
ori_dict['type'], ori_dict['url'])
def _enrich_origin(origin):
if 'id' in origin:
o = origin.copy()
o['origin_visits_url'] = url_for('api_origin_visits',
origin_id=o['id'])
return o
return origin
return _api_lookup(
ori_dict, lookup_fn=service.lookup_origin,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_origin)
@app.route('/api/1/person//')
@doc.route('/api/1/person/')
@doc.arg('person_id',
default=42,
argtype=doc.argtypes.int,
argdoc='person identifier')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
"""Get information about a person.
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release//')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
default='7045404f3d1c54e6473c71bbb716529fbad4be24',
argtype=doc.argtypes.sha1_git,
argdoc='release identifier')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
"""Get information about a release.
Releases are identified by SHA1 checksums, compatible with Git tag
identifiers. See ``release_identifier`` in our `data model module
`_
for details about how they are computed.
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
limit=100, with_data=False):
"""Compute the revision matching criterion's directory or content data.
Args:
revision: dictionary of criterions representing a revision to lookup
path: directory's path to lookup
request_path: request path which holds the original context to
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
"""
def enrich_directory_local(dir, context_url=request_path):
return utils.enrich_directory(dir, context_url)
rev_id, result = service.lookup_directory_through_revision(
revision, path, limit=limit, with_data=with_data)
content = result['content']
if result['type'] == 'dir': # dir_entries
result['content'] = list(map(enrich_directory_local, content))
else: # content
result['content'] = utils.enrich_content(content)
return result
@app.route('/api/1/revision'
'/origin/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/directory//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/directory//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts/'
'/directory//')
@doc.route('/api/1/revision/origin/directory/', tags=['hidden'])
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""Optional timestamp (default to the nearest time
crawl of timestamp)""")
@doc.arg('path',
default='Dockerfile',
argtype=doc.argtypes.path,
argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision corresponding to the
given criteria""")
def api_directory_through_revision_origin(origin_id,
branch_name="refs/heads/master",
ts=None,
path=None,
with_data=False):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _revision_directory_by(
{
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts
},
path,
request.path,
with_data=with_data)
@app.route('/api/1/revision'
'/origin//')
@app.route('/api/1/revision'
'/origin/'
'/branch//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts//')
@app.route('/api/1/revision'
'/origin/'
'/ts//')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='software origin identifier')
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""(optional) fully-qualified branch name, e.g.,
"refs/heads/master". Defaults to the master branch.""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""(optional) timestamp close to which the revision pointed by
the given branch should be looked up. Defaults to now.""")
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision_with_origin(origin_id,
branch_name="refs/heads/master",
ts=None):
"""Get information about a revision, searching for it based on software
origin, branch name, and/or visit timestamp.
This endpoint behaves like ``/revision``, but operates on the revision that
has been found at a given software origin, close to a given point in time,
pointed by a given branch.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
utils.enrich_revision,
branch_name,
ts)
@app.route('/api/1/revision//prev//')
@doc.route('/api/1/revision/prev/', tags=['hidden'])
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier")
@doc.arg('context',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the revision identified by sha1_git')
def api_revision_with_context(sha1_git, context):
"""Return information about revision with id sha1_git.
"""
def _enrich_revision(revision, context=context):
return utils.enrich_revision(revision, context)
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
_enrich_revision)
@app.route('/api/1/revision//')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
default='aafb16d69fd30ff58afdd69036a26047f3aebdc6',
argtype=doc.argtypes.sha1_git,
argdoc="revision identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_meta)
def api_revision(sha1_git):
"""Get information about a revision.
Revisions are identified by SHA1 checksums, compatible with Git commit
identifiers. See ``revision_identifier`` in our `data model module
`_
for details about how they are computed.
"""
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
utils.enrich_revision)
@app.route('/api/1/revision//raw/')
@doc.route('/api/1/revision/raw/', tags=['hidden'])
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
"""Return the raw data of the message of revision identified by sha1_git
"""
raw = service.lookup_revision_message(sha1_git)
return app.response_class(raw['message'],
headers={'Content-disposition': 'attachment;'
'filename=rev_%s_raw' % sha1_git},
mimetype='application/octet-stream')
@app.route('/api/1/revision//directory/')
@app.route('/api/1/revision//directory//')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc='revision identifier')
@doc.arg('dir_path',
default='Documentation/BUG-HUNTING',
argtype=doc.argtypes.path,
argdoc="""path relative to the root directory of revision identifier by
sha1_git""")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""either a list of directory entries with their metadata,
or the metadata of a single directory entry""")
def api_revision_directory(sha1_git,
dir_path=None,
with_data=False):
"""Get information about directory (entry) objects associated to revisions.
Each revision is associated to a single "root" directory. This endpoint
behaves like ``/directory/``, but operates on the root directory associated
to a given revision.
"""
return _revision_directory_by(
{
'sha1_git': sha1_git
},
dir_path,
request.path,
with_data=with_data)
@app.route('/api/1/revision//log/')
@app.route('/api/1/revision//prev//log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
default='37fc9e08d0c4b71807a4f1ecb06112e78d91c283',
argtype=doc.argtypes.sha1_git,
argdoc='revision identifier')
# @doc.arg('prev_sha1s',
# default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
# argtype=doc.argtypes.path,
# argdoc="""(Optional) Navigation breadcrumbs (descendant revisions
# previously visited). If multiple values, use / as delimiter. """)
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log(sha1_git, prev_sha1s=None):
"""Get a list of all revisions heading to a given one, i.e., show the
commit log.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
def lookup_revision_log_with_limit(s, limit=per_page+1):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
rev_get = _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
l = len(rev_get)
if l == per_page+1:
rev_backward = rev_get[:-1]
new_last_sha1 = rev_get[-1]['id']
params = {
'sha1_git': new_last_sha1,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_revision_log', **params)
}
else:
rev_backward = rev_get
if not prev_sha1s: # no nav breadcrumbs, so we're done
revisions = rev_backward
else:
rev_forward_ids = prev_sha1s.split('/')
rev_forward = _api_lookup(rev_forward_ids,
lookup_fn=service.lookup_revision_multiple,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
revisions = rev_forward + rev_backward
result.update({
'results': revisions
})
return result
@app.route('/api/1/revision'
'/origin//log/')
@app.route('/api/1/revision'
'/origin/'
'/branch//log/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts//log/')
@app.route('/api/1/revision'
'/origin/'
'/ts//log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""(Optional) The revision's branch name within the origin specified.
Defaults to 'refs/heads/master'.""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""(Optional) A time or timestamp string to parse""")
@doc.header('Link', doc=_doc_header_link)
@doc.param('per_page', default=10,
argtype=doc.argtypes.int,
doc=_doc_arg_per_page)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict, retdoc=_doc_ret_revision_log)
def api_revision_log_by(origin_id,
branch_name='refs/heads/master',
ts=None):
"""Show the commit log for a revision, searching for it based on software
origin, branch name, and/or visit timestamp.
This endpoint behaves like ``/log``, but operates on the revision that
has been found at a given software origin, close to a given point in time,
pointed by a given branch.
"""
result = {}
per_page = int(request.args.get('per_page', '10'))
if ts:
ts = utils.parse_timestamp(ts)
def lookup_revision_log_by_with_limit(o_id, br, ts, limit=per_page+1):
return service.lookup_revision_log_by(o_id, br, ts, limit)
error_msg = 'No revision matching origin %s ' % origin_id
error_msg += ', branch name %s' % branch_name
error_msg += (' and time stamp %s.' % ts) if ts else '.'
rev_get = _api_lookup(origin_id,
lookup_revision_log_by_with_limit,
error_msg,
utils.enrich_revision,
branch_name,
ts)
l = len(rev_get)
if l == per_page+1:
revisions = rev_get[:-1]
last_sha1_git = rev_get[-1]['id']
params = {
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts,
'sha1_git': last_sha1_git,
}
if request.args.get('per_page'):
params['per_page'] = per_page
result['headers'] = {
'link-next': url_for('api_revision_log_by', **params),
}
else:
revisions = rev_get
result.update({'results': revisions})
return result
@app.route('/api/1/directory//')
@app.route('/api/1/directory///')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
default='1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8',
argtype=doc.argtypes.sha1_git,
argdoc='directory identifier')
@doc.arg('path',
default='codec/demux',
argtype=doc.argtypes.path,
argdoc='path relative to directory identified by sha1_git')
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""either a list of directory entries with their metadata,
or the metadata of a single directory entry""")
def api_directory(sha1_git,
path=None):
"""Get information about directory or directory entry objects.
Directories are identified by SHA1 checksums, compatible with Git directory
identifiers. See ``directory_identifier`` in our `data model module
`_
for details about how they are computed.
When given only a directory identifier, this endpoint returns information
about the directory itself, returning its content (usually a list of
directory entries). When given a directory identifier and a path, this
endpoint returns information about the directory entry pointed by the
relative path, starting path resolution from the given directory.
"""
if path:
error_msg_path = ('Entry with path %s relative to directory '
'with sha1_git %s not found.') % (path, sha1_git)
return _api_lookup(
sha1_git,
service.lookup_directory_with_path,
error_msg_path,
utils.enrich_directory,
path)
else:
error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
service.lookup_directory,
error_msg_nopath,
utils.enrich_directory)
@app.route('/api/1/provenance//')
@doc.route('/api/1/provenance/', tags=['hidden'])
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""List of provenance information (dict) for the matched
content.""")
def api_content_provenance(q):
"""Return content's provenance information if any.
"""
def _enrich_revision(provenance):
p = provenance.copy()
p['revision_url'] = url_for('api_revision',
sha1_git=provenance['revision'])
p['content_url'] = url_for('api_content_metadata',
q='sha1_git:%s' % provenance['content'])
p['origin_url'] = url_for('api_origin',
origin_id=provenance['origin'])
p['origin_visits_url'] = url_for('api_origin_visits',
origin_id=provenance['origin'])
p['origin_visit_url'] = url_for('api_origin_visit',
origin_id=provenance['origin'],
visit_id=provenance['visit'])
return p
return _api_lookup(
q,
lookup_fn=service.lookup_content_provenance,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_revision)
@app.route('/api/1/filetype//')
@doc.route('/api/1/filetype/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Filetype information (dict) for the matched
content.""")
def api_content_filetype(q):
"""Get information about the detected MIME type of a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_filetype,
error_msg_if_not_found='No filetype information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/language//')
@doc.route('/api/1/language/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Language information (dict) for the matched
content.""")
def api_content_language(q):
"""Get information about the detected (programming) language of a content
object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_language,
error_msg_if_not_found='No language information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/license//')
@doc.route('/api/1/license/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""License information (dict) for the matched
content.""")
def api_content_license(q):
"""Get information about the detected license of a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_license,
error_msg_if_not_found='No license information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/ctags//')
@doc.route('/api/1/ctags/', tags=['upcoming'])
@doc.arg('q',
default='sha1:1fc6129a692e7a87b5450e2ba56e7669d0c5775d',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Ctags symbol (dict) for the matched
content.""")
def api_content_ctags(q):
"""Get information about all `Ctags `_-style
symbols defined in a content object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_ctags,
error_msg_if_not_found='No ctags symbol found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/content//raw/')
@doc.route('/api/1/content/raw/', tags=['upcoming'])
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc='The raw content data as an octet stream')
def api_content_raw(q):
"""Get the raw content of a content object (AKA "blob"), as a byte sequence.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return app.response_class(generate(content),
headers={'Content-disposition': 'attachment;'
'filename=content_%s_raw' % q},
mimetype='application/octet-stream')
@app.route('/api/1/content//')
@doc.route('/api/1/content/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc=_doc_arg_content_id)
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""known metadata for content identified by q""")
def api_content_metadata(q):
"""Get information about a content (AKA "blob") object.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=utils.enrich_content)
@app.route('/api/1/entity//')
@doc.route('/api/1/entity/', tags=['hidden'])
@doc.arg('uuid',
default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
argtype=doc.argtypes.uuid,
argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput, doc=_doc_exc_bad_id)
@doc.raises(exc=doc.excs.notfound, doc=_doc_exc_id_not_found)
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
"""Return content information if content is found.
"""
return _api_lookup(
uuid,
lookup_fn=service.lookup_entity_by_uuid,
error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
enrich_fn=utils.enrich_entity)