diff --git a/swh/web/api/apiresponse.py b/swh/web/api/apiresponse.py
index 6daaa54c..cb39f51b 100644
--- a/swh/web/api/apiresponse.py
+++ b/swh/web/api/apiresponse.py
@@ -1,184 +1,190 @@
# Copyright (C) 2017-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import traceback
+from django.utils.html import escape
+
from rest_framework.response import Response
from swh.storage.exc import StorageDBError, StorageAPIError
from swh.web.api import utils
from swh.web.common.exc import NotFoundExc, ForbiddenExc
from swh.web.common.utils import shorten_path, gen_path_info
from swh.web.config import get_config
def compute_link_header(rv, options):
"""Add Link header in returned value results.
Args:
rv (dict): dictionary with keys:
- headers: potential headers with 'link-next' and 'link-prev'
keys
- results: containing the result to return
options (dict): the initial dict to update with result if any
Returns:
dict: dictionary with optional keys 'link-next' and 'link-prev'
"""
link_headers = []
if 'headers' not in rv:
return {}
rv_headers = rv['headers']
if 'link-next' in rv_headers:
link_headers.append('<%s>; rel="next"' % (
rv_headers['link-next']))
if 'link-prev' in rv_headers:
link_headers.append('<%s>; rel="previous"' % (
rv_headers['link-prev']))
if link_headers:
link_header_str = ','.join(link_headers)
headers = options.get('headers', {})
headers.update({
'Link': link_header_str
})
return headers
return {}
def filter_by_fields(request, data):
"""Extract a request parameter 'fields' if it exists to permit the filtering on
the data dict's keys.
If such field is not provided, returns the data as is.
"""
fields = request.query_params.get('fields')
if fields:
fields = set(fields.split(','))
data = utils.filter_field_keys(data, fields)
return data
def transform(rv):
"""Transform an eventual returned value with multiple layer of
information with only what's necessary.
If the returned value rv contains the 'results' key, this is the
associated value which is returned.
Otherwise, return the initial dict without the potential 'headers'
key.
"""
if 'results' in rv:
return rv['results']
if 'headers' in rv:
rv.pop('headers')
return rv
def make_api_response(request, data, doc_data={}, options={}):
"""Generates an API response based on the requested mimetype.
Args:
request: a DRF Request object
data: raw data to return in the API response
doc_data: documentation data for HTML response
options: optional data that can be used to generate the response
Returns:
a DRF Response a object
"""
if data:
options['headers'] = compute_link_header(data, options)
data = transform(data)
data = filter_by_fields(request, data)
doc_env = doc_data
headers = {}
if 'headers' in options:
doc_env['headers_data'] = options['headers']
headers = options['headers']
# get request status code
doc_env['status_code'] = options.get('status', 200)
response_args = {'status': doc_env['status_code'],
'headers': headers,
'content_type': request.accepted_media_type}
# when requesting HTML, typically when browsing the API through its
# documented views, we need to enrich the input data with documentation
# related ones and inform DRF that we request HTML template rendering
if request.accepted_media_type == 'text/html':
if data:
data = json.dumps(data, sort_keys=True,
indent=4,
separators=(',', ': '))
doc_env['response_data'] = data
doc_env['request'] = {
'path': request.path,
'method': request.method,
'absolute_uri': request.build_absolute_uri(),
}
doc_env['heading'] = shorten_path(str(request.path))
if 'route' in doc_env:
doc_env['endpoint_path'] = gen_path_info(doc_env['route'])
response_args['data'] = doc_env
response_args['template_name'] = 'api/apidoc.html'
# otherwise simply return the raw data and let DRF picks
# the correct renderer (JSON or YAML)
else:
response_args['data'] = data
return Response(**response_args)
def error_response(request, error, doc_data):
"""Private function to create a custom error response.
Args:
request: a DRF Request object
error: the exception that caused the error
doc_data: documentation data for HTML response
"""
error_code = 400
if isinstance(error, NotFoundExc):
error_code = 404
elif isinstance(error, ForbiddenExc):
error_code = 403
elif isinstance(error, StorageDBError):
error_code = 503
elif isinstance(error, StorageAPIError):
error_code = 503
error_opts = {'status': error_code}
error_data = {
'exception': error.__class__.__name__,
'reason': str(error),
}
+
+ if request.accepted_media_type == 'text/html':
+ error_data['reason'] = escape(error_data['reason'])
+
if get_config()['debug']:
error_data['traceback'] = traceback.format_exc()
return make_api_response(request, error_data, doc_data,
options=error_opts)
diff --git a/swh/web/assets/src/bundles/browse/origin-search.js b/swh/web/assets/src/bundles/browse/origin-search.js
index 91b3d6df..83e36358 100644
--- a/swh/web/assets/src/bundles/browse/origin-search.js
+++ b/swh/web/assets/src/bundles/browse/origin-search.js
@@ -1,238 +1,238 @@
/**
* Copyright (C) 2018 The Software Heritage developers
* See the AUTHORS file at the top-level directory of this distribution
* License: GNU Affero General Public License version 3, or any later version
* See top-level LICENSE file for more information
*/
import {heapsPermute} from 'utils/heaps-permute';
import {handleFetchError} from 'utils/functions';
let originPatterns;
let perPage = 100;
let limit = perPage * 2;
let offset = 0;
let currentData = null;
let inSearch = false;
function fixTableRowsStyle() {
setTimeout(() => {
$('#origin-search-results tbody tr').removeAttr('style');
});
}
function clearOriginSearchResultsTable() {
$('#origin-search-results tbody tr').remove();
}
function populateOriginSearchResultsTable(data, offset) {
let localOffset = offset % limit;
if (data.length > 0) {
$('#swh-origin-search-results').show();
$('#swh-no-result').hide();
clearOriginSearchResultsTable();
let table = $('#origin-search-results tbody');
for (let i = localOffset; i < localOffset + perPage && i < data.length; ++i) {
let elem = data[i];
let browseUrl = Urls.browse_origin(elem.url);
let tableRow = `
';
table.append(tableRow);
// get async latest visit snapshot and update visit status icon
let latestSnapshotUrl = Urls.browse_origin_latest_snapshot(elem.id);
fetch(latestSnapshotUrl)
.then(response => response.json())
.then(data => {
let originId = elem.id;
$(`#visit-status-origin-${originId}`).children().remove();
if (data) {
$(`#visit-status-origin-${originId}`).append('');
} else {
$(`#visit-status-origin-${originId}`).append('');
if ($('#swh-filter-empty-visits').prop('checked')) {
$(`#origin-${originId}`).remove();
}
}
});
}
fixTableRowsStyle();
} else {
$('#swh-origin-search-results').hide();
$('#swh-no-result').text('No origins matching the search criteria were found.');
$('#swh-no-result').show();
}
if (data.length - localOffset < perPage ||
(data.length < limit && (localOffset + perPage) === data.length)) {
$('#origins-next-results-button').addClass('disabled');
} else {
$('#origins-next-results-button').removeClass('disabled');
}
if (offset > 0) {
$('#origins-prev-results-button').removeClass('disabled');
} else {
$('#origins-prev-results-button').addClass('disabled');
}
inSearch = false;
setTimeout(() => {
window.scrollTo(0, 0);
});
}
function escapeStringRegexp(str) {
let matchOperatorsRe = /[|\\{}()[\]^$+*?.]/g;
return str.replace(matchOperatorsRe, '\\\\\\$&');
}
function searchOrigins(patterns, limit, searchOffset, offset) {
let baseSearchUrl;
let searchMetadata = $('#swh-search-origin-metadata').prop('checked');
if (searchMetadata) {
baseSearchUrl = Urls.api_origin_metadata_search() + `?fulltext=${patterns}`;
} else {
originPatterns = patterns;
let patternsArray = patterns.trim().replace(/\s+/g, ' ').split(' ');
for (let i = 0; i < patternsArray.length; ++i) {
patternsArray[i] = escapeStringRegexp(patternsArray[i]);
}
// url length must be less than 4096 for modern browsers
// assuming average word length, 6 is max patternArray.length
if (patternsArray.length < 7) {
let patternsPermut = [];
heapsPermute(patternsArray, p => patternsPermut.push(p.join('.*')));
let regex = patternsPermut.join('|');
baseSearchUrl = Urls.browse_origin_search(regex) + `?regexp=true`;
} else {
baseSearchUrl = Urls.browse_origin_search(patternsArray.join('.*')) + `?regexp=true`;
}
}
let withVisit = $('#swh-search-origins-with-visit').prop('checked');
let searchUrl = baseSearchUrl + `&limit=${limit}&offset=${searchOffset}&with_visit=${withVisit}`;
clearOriginSearchResultsTable();
$('.swh-loading').addClass('show');
fetch(searchUrl)
.then(handleFetchError)
.then(response => response.json())
.then(data => {
currentData = data;
$('.swh-loading').removeClass('show');
populateOriginSearchResultsTable(data, offset);
})
.catch(response => {
$('.swh-loading').removeClass('show');
inSearch = false;
$('#swh-origin-search-results').hide();
$('#swh-no-result').text(`Error ${response.status}: ${response.statusText}`);
$('#swh-no-result').show();
});
}
function doSearch() {
$('#swh-no-result').hide();
let patterns = $('#origins-url-patterns').val();
offset = 0;
inSearch = true;
// first try to resolve a swh persistent identifier
let resolvePidUrl = Urls.api_resolve_swh_pid(patterns);
fetch(resolvePidUrl)
.then(handleFetchError)
.then(response => response.json())
.then(data => {
// pid has been successfully resolved,
// so redirect to browse page
window.location = data.browse_url;
})
.catch(response => {
// pid resolving failed
if (patterns.startsWith('swh:')) {
// display a useful error message if the input
// looks like a swh pid
response.json().then(data => {
$('#swh-origin-search-results').hide();
$('.swh-search-pagination').hide();
$('#swh-no-result').text(data.reason);
$('#swh-no-result').show();
});
} else {
// otherwise, proceed with origins search
$('#swh-origin-search-results').show();
$('.swh-search-pagination').show();
searchOrigins(patterns, limit, offset, offset);
}
});
}
export function initOriginSearch() {
$(document).ready(() => {
$('#swh-search-origins').submit(event => {
event.preventDefault();
let patterns = $('#origins-url-patterns').val().trim();
let withVisit = $('#swh-search-origins-with-visit').prop('checked');
let withContent = $('#swh-filter-empty-visits').prop('checked');
let searchMetadata = $('#swh-search-origin-metadata').prop('checked');
let queryParameters = '?q=' + encodeURIComponent(patterns);
if (withVisit) {
queryParameters += '&with_visit';
}
if (withContent) {
queryParameters += '&with_content';
}
if (searchMetadata) {
queryParameters += '&search_metadata';
}
// Update the url, triggering page reload and effective search
window.location.search = queryParameters;
});
$('#origins-next-results-button').click(event => {
if ($('#origins-next-results-button').hasClass('disabled') || inSearch) {
return;
}
inSearch = true;
offset += perPage;
if (!currentData || (offset >= limit && offset % limit === 0)) {
searchOrigins(originPatterns, limit, offset, offset);
} else {
populateOriginSearchResultsTable(currentData, offset);
}
event.preventDefault();
});
$('#origins-prev-results-button').click(event => {
if ($('#origins-prev-results-button').hasClass('disabled') || inSearch) {
return;
}
inSearch = true;
offset -= perPage;
if (!currentData || (offset > 0 && (offset + perPage) % limit === 0)) {
searchOrigins(originPatterns, limit, (offset + perPage) - limit, offset);
} else {
populateOriginSearchResultsTable(currentData, offset);
}
event.preventDefault();
});
$(document).on('shown.bs.tab', 'a[data-toggle="tab"]', e => {
if (e.currentTarget.text.trim() === 'Search') {
fixTableRowsStyle();
}
});
let urlParams = new URLSearchParams(window.location.search);
let query = urlParams.get('q');
let withVisit = urlParams.has('with_visit');
let withContent = urlParams.has('with_content');
let searchMetadata = urlParams.has('search_metadata');
if (query) {
$('#origins-url-patterns').val(query);
$('#swh-search-origins-with-visit').prop('checked', withVisit);
$('#swh-filter-empty-visits').prop('checked', withContent);
$('#swh-search-origin-metadata').prop('checked', searchMetadata);
doSearch();
}
});
}
diff --git a/swh/web/browse/utils.py b/swh/web/browse/utils.py
index 1098cf17..c7a2a32e 100644
--- a/swh/web/browse/utils.py
+++ b/swh/web/browse/utils.py
@@ -1,1111 +1,1113 @@
# Copyright (C) 2017-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import base64
from collections import defaultdict
import magic
import pypandoc
import stat
import textwrap
from django.core.cache import cache
from django.utils.safestring import mark_safe
+from django.utils.html import escape
from importlib import reload
from swh.model.identifiers import persistent_identifier
from swh.web.common import highlightjs, service
from swh.web.common.exc import NotFoundExc, http_status_code_message
from swh.web.common.origin_visits import get_origin_visit
from swh.web.common.utils import (
reverse, format_utc_iso_date, get_swh_persistent_id,
swh_object_icons
)
from swh.web.config import get_config
def get_directory_entries(sha1_git):
"""Function that retrieves the content of a directory
from the archive.
The directories entries are first sorted in lexicographical order.
Sub-directories and regular files are then extracted.
Args:
sha1_git: sha1_git identifier of the directory
Returns:
A tuple whose first member corresponds to the sub-directories list
and second member the regular files list
Raises:
NotFoundExc if the directory is not found
"""
cache_entry_id = 'directory_entries_%s' % sha1_git
cache_entry = cache.get(cache_entry_id)
if cache_entry:
return cache_entry
entries = list(service.lookup_directory(sha1_git))
for e in entries:
e['perms'] = stat.filemode(e['perms'])
if e['type'] == 'rev':
# modify dir entry name to explicitly show it points
# to a revision
e['name'] = '%s @ %s' % (e['name'], e['target'][:7])
dirs = [e for e in entries if e['type'] in ('dir', 'rev')]
files = [e for e in entries if e['type'] == 'file']
dirs = sorted(dirs, key=lambda d: d['name'])
files = sorted(files, key=lambda f: f['name'])
cache.set(cache_entry_id, (dirs, files))
return dirs, files
def get_mimetype_and_encoding_for_content(content):
"""Function that returns the mime type and the encoding associated to
a content buffer using the magic module under the hood.
Args:
content (bytes): a content buffer
Returns:
A tuple (mimetype, encoding), for instance ('text/plain', 'us-ascii'),
associated to the provided content.
"""
while True:
try:
magic_result = magic.detect_from_content(content)
mime_type = magic_result.mime_type
encoding = magic_result.encoding
break
except Exception:
# workaround an issue with the magic module who can fail
# if detect_from_content is called multiple times in
# a short amount of time
reload(magic)
return mime_type, encoding
# maximum authorized content size in bytes for HTML display
# with code highlighting
content_display_max_size = get_config()['content_display_max_size']
snapshot_content_max_size = get_config()['snapshot_content_max_size']
def _reencode_content(mimetype, encoding, content_data):
# encode textual content to utf-8 if needed
if mimetype.startswith('text/'):
# probably a malformed UTF-8 content, re-encode it
# by replacing invalid chars with a substitution one
if encoding == 'unknown-8bit':
content_data = content_data.decode('utf-8', 'replace')\
.encode('utf-8')
elif 'ascii' not in encoding and encoding not in ['utf-8', 'binary']:
content_data = content_data.decode(encoding, 'replace')\
.encode('utf-8')
elif mimetype.startswith('application/octet-stream'):
# file may detect a text content as binary
# so try to decode it for display
encodings = ['us-ascii']
encodings += ['iso-8859-%s' % i for i in range(1, 17)]
for encoding in encodings:
try:
content_data = content_data.decode(encoding)\
.encode('utf-8')
except Exception:
pass
else:
# ensure display in content view
mimetype = 'text/plain'
break
return mimetype, content_data
def request_content(query_string, max_size=content_display_max_size,
raise_if_unavailable=True, reencode=True):
"""Function that retrieves a content from the archive.
Raw bytes content is first retrieved, then the content mime type.
If the mime type is not stored in the archive, it will be computed
using Python magic module.
Args:
query_string: a string of the form "[ALGO_HASH:]HASH" where
optional ALGO_HASH can be either ``sha1``, ``sha1_git``,
``sha256``, or ``blake2s256`` (default to ``sha1``) and HASH
the hexadecimal representation of the hash value
max_size: the maximum size for a content to retrieve (default to 1MB,
no size limit if None)
Returns:
A tuple whose first member corresponds to the content raw bytes
and second member the content mime type
Raises:
NotFoundExc if the content is not found
"""
content_data = service.lookup_content(query_string)
filetype = None
language = None
license = None
# requests to the indexer db may fail so properly handle
# those cases in order to avoid content display errors
try:
filetype = service.lookup_content_filetype(query_string)
language = service.lookup_content_language(query_string)
license = service.lookup_content_license(query_string)
except Exception:
pass
mimetype = 'unknown'
encoding = 'unknown'
if filetype:
mimetype = filetype['mimetype']
encoding = filetype['encoding']
# workaround when encountering corrupted data due to implicit
# conversion from bytea to text in the indexer db (see T818)
# TODO: Remove that code when all data have been correctly converted
if mimetype.startswith('\\'):
filetype = None
content_data['error_code'] = 200
content_data['error_message'] = ''
content_data['error_description'] = ''
if not max_size or content_data['length'] < max_size:
try:
content_raw = service.lookup_content_raw(query_string)
except Exception as e:
if raise_if_unavailable:
raise e
else:
content_data['raw_data'] = None
content_data['error_code'] = 404
content_data['error_description'] = \
'The bytes of the content are currently not available in the archive.' # noqa
content_data['error_message'] = \
http_status_code_message[content_data['error_code']]
else:
content_data['raw_data'] = content_raw['data']
if not filetype:
mimetype, encoding = \
get_mimetype_and_encoding_for_content(content_data['raw_data']) # noqa
if reencode:
mimetype, raw_data = _reencode_content(
mimetype, encoding, content_data['raw_data'])
content_data['raw_data'] = raw_data
else:
content_data['raw_data'] = None
content_data['mimetype'] = mimetype
content_data['encoding'] = encoding
if language:
content_data['language'] = language['lang']
else:
content_data['language'] = 'not detected'
if license:
content_data['licenses'] = ', '.join(license['facts'][0]['licenses'])
else:
content_data['licenses'] = 'not detected'
return content_data
_browsers_supported_image_mimes = set(['image/gif', 'image/png',
'image/jpeg', 'image/bmp',
'image/webp', 'image/svg',
'image/svg+xml'])
def prepare_content_for_display(content_data, mime_type, path):
"""Function that prepares a content for HTML display.
The function tries to associate a programming language to a
content in order to perform syntax highlighting client-side
using highlightjs. The language is determined using either
the content filename or its mime type.
If the mime type corresponds to an image format supported
by web browsers, the content will be encoded in base64
for displaying the image.
Args:
content_data (bytes): raw bytes of the content
mime_type (string): mime type of the content
path (string): path of the content including filename
Returns:
A dict containing the content bytes (possibly different from the one
provided as parameter if it is an image) under the key 'content_data
and the corresponding highlightjs language class under the
key 'language'.
"""
language = highlightjs.get_hljs_language_from_filename(path)
if not language:
language = highlightjs.get_hljs_language_from_mime_type(mime_type)
if not language:
language = 'nohighlight'
elif mime_type.startswith('application/'):
mime_type = mime_type.replace('application/', 'text/')
if mime_type.startswith('image/'):
if mime_type in _browsers_supported_image_mimes:
content_data = base64.b64encode(content_data)
else:
content_data = None
if mime_type.startswith('image/svg'):
mime_type = 'image/svg+xml'
return {'content_data': content_data,
'language': language,
'mimetype': mime_type}
def process_snapshot_branches(snapshot):
"""
Process a dictionary describing snapshot branches: extract those
targeting revisions and releases, put them in two different lists,
then sort those lists in lexicographical order of the branches' names.
Args:
snapshot_branches (dict): A dict describing the branches of a snapshot
as returned for instance by :func:`swh.web.common.service.lookup_snapshot`
Returns:
tuple: A tuple whose first member is the sorted list of branches
targeting revisions and second member the sorted list of branches
targeting releases
""" # noqa
snapshot_branches = snapshot['branches']
branches = {}
branch_aliases = {}
releases = {}
revision_to_branch = defaultdict(set)
revision_to_release = defaultdict(set)
release_to_branch = defaultdict(set)
for branch_name, target in snapshot_branches.items():
if not target:
# FIXME: display branches with an unknown target anyway
continue
target_id = target['target']
target_type = target['target_type']
if target_type == 'revision':
branches[branch_name] = {
'name': branch_name,
'revision': target_id,
}
revision_to_branch[target_id].add(branch_name)
elif target_type == 'release':
release_to_branch[target_id].add(branch_name)
elif target_type == 'alias':
branch_aliases[branch_name] = target_id
# FIXME: handle pointers to other object types
def _enrich_release_branch(branch, release):
releases[branch] = {
'name': release['name'],
'branch_name': branch,
'date': format_utc_iso_date(release['date']),
'id': release['id'],
'message': release['message'],
'target_type': release['target_type'],
'target': release['target'],
}
def _enrich_revision_branch(branch, revision):
branches[branch].update({
'revision': revision['id'],
'directory': revision['directory'],
'date': format_utc_iso_date(revision['date']),
'message': revision['message']
})
releases_info = service.lookup_release_multiple(
release_to_branch.keys()
)
for release in releases_info:
branches_to_update = release_to_branch[release['id']]
for branch in branches_to_update:
_enrich_release_branch(branch, release)
if release['target_type'] == 'revision':
revision_to_release[release['target']].update(
branches_to_update
)
revisions = service.lookup_revision_multiple(
set(revision_to_branch.keys()) | set(revision_to_release.keys())
)
for revision in revisions:
if not revision:
continue
for branch in revision_to_branch[revision['id']]:
_enrich_revision_branch(branch, revision)
for release in revision_to_release[revision['id']]:
releases[release]['directory'] = revision['directory']
for branch_alias, branch_target in branch_aliases.items():
if branch_target in branches:
branches[branch_alias] = dict(branches[branch_target])
else:
snp = service.lookup_snapshot(snapshot['id'],
branches_from=branch_target,
branches_count=1)
if snp and branch_target in snp['branches']:
target_type = snp['branches'][branch_target]['target_type']
target = snp['branches'][branch_target]['target']
if target_type == 'revision':
branches[branch_alias] = snp['branches'][branch_target]
revision = service.lookup_revision(target)
_enrich_revision_branch(branch_alias, revision)
elif target_type == 'release':
release = service.lookup_release(target)
_enrich_release_branch(branch_alias, release)
if branch_alias in branches:
branches[branch_alias]['name'] = branch_alias
ret_branches = list(sorted(branches.values(), key=lambda b: b['name']))
ret_releases = list(sorted(releases.values(), key=lambda b: b['name']))
return ret_branches, ret_releases
def get_snapshot_content(snapshot_id):
"""Returns the lists of branches and releases
associated to a swh snapshot.
That list is put in cache in order to speedup the navigation
in the swh-web/browse ui.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
snapshot_id (str): hexadecimal representation of the snapshot
identifier
Returns:
A tuple with two members. The first one is a list of dict describing
the snapshot branches. The second one is a list of dict describing the
snapshot releases.
Raises:
NotFoundExc if the snapshot does not exist
"""
cache_entry_id = 'swh_snapshot_%s' % snapshot_id
cache_entry = cache.get(cache_entry_id)
if cache_entry:
return cache_entry['branches'], cache_entry['releases']
branches = []
releases = []
if snapshot_id:
snapshot = service.lookup_snapshot(
snapshot_id, branches_count=snapshot_content_max_size)
branches, releases = process_snapshot_branches(snapshot)
cache.set(cache_entry_id, {
'branches': branches,
'releases': releases,
})
return branches, releases
def get_origin_visit_snapshot(origin_info, visit_ts=None, visit_id=None,
snapshot_id=None):
"""Returns the lists of branches and releases
associated to a swh origin for a given visit.
The visit is expressed by a timestamp. In the latter case,
the closest visit from the provided timestamp will be used.
If no visit parameter is provided, it returns the list of branches
found for the latest visit.
That list is put in cache in order to speedup the navigation
in the swh-web/browse ui.
.. warning:: At most 1000 branches contained in the snapshot
will be returned for performance reasons.
Args:
origin_info (dict): a dict filled with origin information
(id, url, type)
visit_ts (int or str): an ISO date string or Unix timestamp to parse
visit_id (int): optional visit id for disambiguation in case
several visits have the same timestamp
Returns:
A tuple with two members. The first one is a list of dict describing
the origin branches for the given visit.
The second one is a list of dict describing the origin releases
for the given visit.
Raises:
NotFoundExc if the origin or its visit are not found
"""
visit_info = get_origin_visit(origin_info, visit_ts, visit_id, snapshot_id)
return get_snapshot_content(visit_info['snapshot'])
def gen_link(url, link_text=None, link_attrs=None):
"""
Utility function for generating an HTML link to insert
in Django templates.
Args:
url (str): an url
link_text (str): optional text for the produced link,
if not provided the url will be used
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'link_text'
"""
attrs = ' '
if link_attrs:
for k, v in link_attrs.items():
attrs += '%s="%s" ' % (k, v)
if not link_text:
link_text = url
- link = '%s' % (attrs, url, link_text)
+ link = '%s' \
+ % (attrs, escape(url), escape(link_text))
return mark_safe(link)
def _snapshot_context_query_params(snapshot_context):
query_params = None
if snapshot_context and snapshot_context['origin_info']:
origin_info = snapshot_context['origin_info']
query_params = {'origin': origin_info['url']}
if 'timestamp' in snapshot_context['url_args']:
query_params['timestamp'] = \
snapshot_context['url_args']['timestamp']
if 'visit_id' in snapshot_context['query_params']:
query_params['visit_id'] = \
snapshot_context['query_params']['visit_id']
elif snapshot_context:
query_params = {'snapshot_id': snapshot_context['snapshot_id']}
return query_params
def gen_person_link(person_id, person_name, snapshot_context=None,
link_attrs=None):
"""
Utility function for generating a link to a person HTML view
to insert in Django templates.
Args:
person_id (int): a person id
person_name (str): the associated person name
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'person_name'
"""
query_params = _snapshot_context_query_params(snapshot_context)
person_url = reverse('browse-person', url_args={'person_id': person_id},
query_params=query_params)
return gen_link(person_url, person_name or 'None', link_attrs)
def gen_revision_url(revision_id, snapshot_context=None):
"""
Utility function for generating an url to a revision.
Args:
revision_id (str): a revision id
snapshot_context (dict): if provided, generate snapshot-dependent
browsing url
Returns:
str: The url to browse the revision
"""
query_params = _snapshot_context_query_params(snapshot_context)
return reverse('browse-revision',
url_args={'sha1_git': revision_id},
query_params=query_params)
def gen_revision_link(revision_id, shorten_id=False, snapshot_context=None,
link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a revision HTML view
to insert in Django templates.
Args:
revision_id (str): a revision id
shorten_id (boolean): whether to shorten the revision id to 7
characters for the link text
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
link_text (str): optional text for the generated link
(the revision id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
str: An HTML link in the form 'revision_id'
"""
if not revision_id:
return None
revision_url = gen_revision_url(revision_id, snapshot_context)
if shorten_id:
return gen_link(revision_url, revision_id[:7], link_attrs)
else:
if not link_text:
link_text = revision_id
return gen_link(revision_url, link_text, link_attrs)
def gen_directory_link(sha1_git, snapshot_context=None, link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a directory HTML view
to insert in Django templates.
Args:
sha1_git (str): directory identifier
link_text (str): optional text for the generated link
(the directory id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'link_text'
"""
if not sha1_git:
return None
query_params = _snapshot_context_query_params(snapshot_context)
directory_url = reverse('browse-directory',
url_args={'sha1_git': sha1_git},
query_params=query_params)
if not link_text:
link_text = sha1_git
return gen_link(directory_url, link_text, link_attrs)
def gen_snapshot_link(snapshot_id, snapshot_context=None, link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a snapshot HTML view
to insert in Django templates.
Args:
snapshot_id (str): snapshot identifier
link_text (str): optional text for the generated link
(the snapshot id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'link_text'
"""
query_params = _snapshot_context_query_params(snapshot_context)
snapshot_url = reverse('browse-snapshot',
url_args={'snapshot_id': snapshot_id},
query_params=query_params)
if not link_text:
link_text = snapshot_id
return gen_link(snapshot_url, link_text, link_attrs)
def gen_content_link(sha1_git, snapshot_context=None, link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a content HTML view
to insert in Django templates.
Args:
sha1_git (str): content identifier
link_text (str): optional text for the generated link
(the content sha1_git will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'link_text'
"""
if not sha1_git:
return None
query_params = _snapshot_context_query_params(snapshot_context)
content_url = reverse('browse-content',
url_args={'query_string': 'sha1_git:' + sha1_git},
query_params=query_params)
if not link_text:
link_text = sha1_git
return gen_link(content_url, link_text, link_attrs)
def get_revision_log_url(revision_id, snapshot_context=None):
"""
Utility function for getting the URL for a revision log HTML view
(possibly in the context of an origin).
Args:
revision_id (str): revision identifier the history heads to
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
Returns:
The revision log view URL
"""
query_params = {'revision': revision_id}
if snapshot_context and snapshot_context['origin_info']:
origin_info = snapshot_context['origin_info']
url_args = {'origin_url': origin_info['url']}
if 'timestamp' in snapshot_context['url_args']:
url_args['timestamp'] = \
snapshot_context['url_args']['timestamp']
if 'visit_id' in snapshot_context['query_params']:
query_params['visit_id'] = \
snapshot_context['query_params']['visit_id']
revision_log_url = reverse('browse-origin-log',
url_args=url_args,
query_params=query_params)
elif snapshot_context:
url_args = {'snapshot_id': snapshot_context['snapshot_id']}
revision_log_url = reverse('browse-snapshot-log',
url_args=url_args,
query_params=query_params)
else:
revision_log_url = reverse('browse-revision-log',
url_args={'sha1_git': revision_id})
return revision_log_url
def gen_revision_log_link(revision_id, snapshot_context=None,
link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a revision log HTML view
(possibly in the context of an origin) to insert in Django templates.
Args:
revision_id (str): revision identifier the history heads to
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
link_text (str): optional text to use for the generated link
(the revision id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form
'link_text'
"""
if not revision_id:
return None
revision_log_url = get_revision_log_url(revision_id, snapshot_context)
if not link_text:
link_text = revision_id
return gen_link(revision_log_url, link_text, link_attrs)
def gen_release_link(sha1_git, snapshot_context=None, link_text='Browse',
link_attrs={'class': 'btn btn-default btn-sm',
'role': 'button'}):
"""
Utility function for generating a link to a release HTML view
to insert in Django templates.
Args:
sha1_git (str): release identifier
link_text (str): optional text for the generated link
(the release id will be used by default)
link_attrs (dict): optional attributes (e.g. class)
to add to the link
Returns:
An HTML link in the form 'link_text'
"""
query_params = _snapshot_context_query_params(snapshot_context)
release_url = reverse('browse-release',
url_args={'sha1_git': sha1_git},
query_params=query_params)
if not link_text:
link_text = sha1_git
return gen_link(release_url, link_text, link_attrs)
def format_log_entries(revision_log, per_page, snapshot_context=None):
"""
Utility functions that process raw revision log data for HTML display.
Its purpose is to:
* add links to relevant browse views
* format date in human readable format
* truncate the message log
Args:
revision_log (list): raw revision log as returned by the swh-web api
per_page (int): number of log entries per page
snapshot_context (dict): if provided, generate snapshot-dependent
browsing link
"""
revision_log_data = []
for i, rev in enumerate(revision_log):
if i == per_page:
break
author_name = 'None'
author_fullname = 'None'
committer_fullname = 'None'
if rev['author']:
author_name = rev['author']['name'] or rev['author']['fullname']
author_fullname = rev['author']['fullname']
if rev['committer']:
committer_fullname = rev['committer']['fullname']
author_date = format_utc_iso_date(rev['date'])
committer_date = format_utc_iso_date(rev['committer_date'])
tooltip = 'revision %s\n' % rev['id']
tooltip += 'author: %s\n' % author_fullname
tooltip += 'author date: %s\n' % author_date
tooltip += 'committer: %s\n' % committer_fullname
tooltip += 'committer date: %s\n\n' % committer_date
if rev['message']:
tooltip += textwrap.indent(rev['message'], ' '*4)
revision_log_data.append({
'author': author_name,
'id': rev['id'][:7],
'message': rev['message'],
'date': author_date,
'commit_date': committer_date,
'url': gen_revision_url(rev['id'], snapshot_context),
'tooltip': tooltip
})
return revision_log_data
# list of origin types that can be found in the swh archive
# TODO: retrieve it dynamically in an efficient way instead
# of hardcoding it
_swh_origin_types = ['git', 'svn', 'deb', 'hg', 'ftp', 'deposit',
'pypi', 'npm']
def get_origin_info(origin_url, origin_type=None):
"""
Get info about a software origin.
Its main purpose is to automatically find an origin type
when it is not provided as parameter.
Args:
origin_url (str): complete url of a software origin
origin_type (str): optional origin type
Returns:
A dict with the following entries:
* type: the origin type
* url: the origin url
* id: the internal id of the origin
"""
if origin_type:
return service.lookup_origin({'type': origin_type,
'url': origin_url})
else:
for origin_type in _swh_origin_types:
try:
origin_info = service.lookup_origin({'type': origin_type,
'url': origin_url})
return origin_info
except Exception:
pass
- raise NotFoundExc('Origin with url %s not found!' % origin_url)
+ raise NotFoundExc('Origin with url %s not found!' % escape(origin_url))
def get_snapshot_context(snapshot_id=None, origin_type=None, origin_url=None,
timestamp=None, visit_id=None):
"""
Utility function to compute relevant information when navigating
the archive in a snapshot context. The snapshot is either
referenced by its id or it will be retrieved from an origin visit.
Args:
snapshot_id (str): hexadecimal representation of a snapshot identifier,
all other parameters will be ignored if it is provided
origin_type (str): the origin type (git, svn, deposit, ...)
origin_url (str): the origin_url (e.g. https://github.com/(user)/(repo)/)
timestamp (str): a datetime string for retrieving the closest
visit of the origin
visit_id (int): optional visit id for disambiguation in case
of several visits with the same timestamp
Returns:
A dict with the following entries:
* origin_info: dict containing origin information
* visit_info: dict containing visit information
* branches: the list of branches for the origin found
during the visit
* releases: the list of releases for the origin found
during the visit
* origin_browse_url: the url to browse the origin
* origin_branches_url: the url to browse the origin branches
* origin_releases_url': the url to browse the origin releases
* origin_visit_url: the url to browse the snapshot of the origin
found during the visit
* url_args: dict containing url arguments to use when browsing in
the context of the origin and its visit
Raises:
NotFoundExc: if no snapshot is found for the visit of an origin.
""" # noqa
origin_info = None
visit_info = None
url_args = None
query_params = {}
branches = []
releases = []
browse_url = None
visit_url = None
branches_url = None
releases_url = None
swh_type = 'snapshot'
if origin_url:
swh_type = 'origin'
origin_info = get_origin_info(origin_url, origin_type)
visit_info = get_origin_visit(origin_info, timestamp, visit_id,
snapshot_id)
fmt_date = format_utc_iso_date(visit_info['date'])
visit_info['fmt_date'] = fmt_date
snapshot_id = visit_info['snapshot']
if not snapshot_id:
raise NotFoundExc('No snapshot associated to the visit of origin '
- '%s on %s' % (origin_url, fmt_date))
+ '%s on %s' % (escape(origin_url), fmt_date))
# provided timestamp is not necessarily equals to the one
# of the retrieved visit, so get the exact one in order
# use it in the urls generated below
if timestamp:
timestamp = visit_info['date']
branches, releases = \
get_origin_visit_snapshot(origin_info, timestamp, visit_id,
snapshot_id)
url_args = {'origin_type': origin_type,
'origin_url': origin_info['url']}
query_params = {'visit_id': visit_id}
browse_url = reverse('browse-origin-visits',
url_args=url_args)
if timestamp:
url_args['timestamp'] = format_utc_iso_date(timestamp,
'%Y-%m-%dT%H:%M:%S')
visit_url = reverse('browse-origin-directory',
url_args=url_args,
query_params=query_params)
visit_info['url'] = visit_url
branches_url = reverse('browse-origin-branches',
url_args=url_args,
query_params=query_params)
releases_url = reverse('browse-origin-releases',
url_args=url_args,
query_params=query_params)
elif snapshot_id:
branches, releases = get_snapshot_content(snapshot_id)
url_args = {'snapshot_id': snapshot_id}
browse_url = reverse('browse-snapshot',
url_args=url_args)
branches_url = reverse('browse-snapshot-branches',
url_args=url_args)
releases_url = reverse('browse-snapshot-releases',
url_args=url_args)
releases = list(reversed(releases))
snapshot_size = service.lookup_snapshot_size(snapshot_id)
is_empty = sum(snapshot_size.values()) == 0
swh_snp_id = persistent_identifier('snapshot', snapshot_id)
return {
'swh_type': swh_type,
'swh_object_id': swh_snp_id,
'snapshot_id': snapshot_id,
'snapshot_size': snapshot_size,
'is_empty': is_empty,
'origin_info': origin_info,
# keep track if the origin type was provided as url argument
'origin_type': origin_type,
'visit_info': visit_info,
'branches': branches,
'releases': releases,
'branch': None,
'release': None,
'browse_url': browse_url,
'branches_url': branches_url,
'releases_url': releases_url,
'url_args': url_args,
'query_params': query_params
}
# list of common readme names ordered by preference
# (lower indices have higher priority)
_common_readme_names = [
"readme.markdown",
"readme.md",
"readme.rst",
"readme.txt",
"readme"
]
def get_readme_to_display(readmes):
"""
Process a list of readme files found in a directory
in order to find the adequate one to display.
Args:
readmes: a list of dict where keys are readme file names and values
are readme sha1s
Returns:
A tuple (readme_name, readme_sha1)
"""
readme_name = None
readme_url = None
readme_sha1 = None
readme_html = None
lc_readmes = {k.lower(): {'orig_name': k, 'sha1': v}
for k, v in readmes.items()}
# look for readme names according to the preference order
# defined by the _common_readme_names list
for common_readme_name in _common_readme_names:
if common_readme_name in lc_readmes:
readme_name = lc_readmes[common_readme_name]['orig_name']
readme_sha1 = lc_readmes[common_readme_name]['sha1']
readme_url = reverse('browse-content-raw',
url_args={'query_string': readme_sha1},
query_params={'reencode': 'true'})
break
# otherwise pick the first readme like file if any
if not readme_name and len(readmes.items()) > 0:
readme_name = next(iter(readmes))
readme_sha1 = readmes[readme_name]
readme_url = reverse('browse-content-raw',
url_args={'query_string': readme_sha1},
query_params={'reencode': 'true'})
# convert rst README to html server side as there is
# no viable solution to perform that task client side
if readme_name and readme_name.endswith('.rst'):
cache_entry_id = 'readme_%s' % readme_sha1
cache_entry = cache.get(cache_entry_id)
if cache_entry:
readme_html = cache_entry
else:
try:
rst_doc = request_content(readme_sha1)
readme_html = pypandoc.convert_text(rst_doc['raw_data'],
'html', format='rst')
cache.set(cache_entry_id, readme_html)
except Exception:
readme_html = 'Readme bytes are not available'
return readme_name, readme_url, readme_html
def get_swh_persistent_ids(swh_objects, snapshot_context=None):
"""
Returns a list of dict containing info related to persistent
identifiers of swh objects.
Args:
swh_objects (list): a list of dict with the following keys:
* type: swh object type (content/directory/release/revision/snapshot)
* id: swh object id
snapshot_context (dict): optional parameter describing the snapshot in which
the object has been found
Returns:
list: a list of dict with the following keys:
* object_type: the swh object type (content/directory/release/revision/snapshot)
* object_icon: the swh object icon to use in HTML views
* swh_id: the computed swh object persistent identifier
* swh_id_url: the url resolving the persistent identifier
* show_options: boolean indicating if the persistent id options must
be displayed in persistent ids HTML view
""" # noqa
swh_ids = []
for swh_object in swh_objects:
if not swh_object['id']:
continue
swh_id = get_swh_persistent_id(swh_object['type'], swh_object['id'])
show_options = swh_object['type'] == 'content' or \
(snapshot_context and snapshot_context['origin_info'] is not None)
object_icon = swh_object_icons[swh_object['type']]
swh_ids.append({
'object_type': swh_object['type'],
'object_icon': object_icon,
'swh_id': swh_id,
'swh_id_url': reverse('browse-swh-id',
url_args={'swh_id': swh_id}),
'show_options': show_options
})
return swh_ids
diff --git a/swh/web/browse/views/utils/snapshot_context.py b/swh/web/browse/views/utils/snapshot_context.py
index 911d5175..d6f21a64 100644
--- a/swh/web/browse/views/utils/snapshot_context.py
+++ b/swh/web/browse/views/utils/snapshot_context.py
@@ -1,912 +1,913 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Utility module implementing Django views for browsing the archive
# in a snapshot context.
# Its purpose is to factorize code for the views reachable from the
# /origin/.* and /snapshot/.* endpoints.
from django.shortcuts import render
from django.template.defaultfilters import filesizeformat
+from django.utils.html import escape
from swh.model.identifiers import snapshot_identifier
from swh.web.browse.utils import (
get_snapshot_context, get_directory_entries, gen_directory_link,
gen_revision_link, request_content, gen_content_link,
prepare_content_for_display, content_display_max_size,
format_log_entries, gen_revision_log_link, gen_link,
get_readme_to_display, get_swh_persistent_ids,
gen_snapshot_link, process_snapshot_branches
)
from swh.web.common import service
from swh.web.common.exc import (
handle_view_exception, NotFoundExc
)
from swh.web.common.utils import (
reverse, gen_path_info, format_utc_iso_date, swh_object_icons
)
_empty_snapshot_id = snapshot_identifier({'branches': {}})
def _get_branch(branches, branch_name, snapshot_id):
"""
Utility function to get a specific branch from a branches list.
Its purpose is to get the default HEAD branch as some software origin
(e.g those with svn type) does not have it. In that latter case, check
if there is a master branch instead and returns it.
"""
filtered_branches = \
[b for b in branches if b['name'].endswith(branch_name)]
if len(filtered_branches) > 0:
return filtered_branches[0]
elif branch_name == 'HEAD':
filtered_branches = \
[b for b in branches if b['name'].endswith('master')]
if len(filtered_branches) > 0:
return filtered_branches[0]
elif len(branches) > 0:
return branches[0]
else:
# case where a large branches list has been truncated
snp = service.lookup_snapshot(snapshot_id,
branches_from=branch_name,
branches_count=1,
target_types=['revision', 'alias'])
snp_branch, _ = process_snapshot_branches(snp)
if snp_branch:
branches.append(snp_branch[0])
return snp_branch[0]
return None
def _get_release(releases, release_name):
"""
Utility function to get a specific release from a releases list.
Returns None if the release can not be found in the list.
"""
filtered_releases = \
[r for r in releases if r['name'] == release_name]
if len(filtered_releases) > 0:
return filtered_releases[0]
else:
return None
def _branch_not_found(branch_type, branch, branches, snapshot_id=None,
origin_info=None, timestamp=None, visit_id=None):
"""
Utility function to raise an exception when a specified branch/release
can not be found.
"""
if branch_type == 'branch':
branch_type = 'Branch'
branch_type_plural = 'branches'
else:
branch_type = 'Release'
branch_type_plural = 'releases'
if snapshot_id and len(branches) == 0:
msg = 'Snapshot with id %s has an empty list' \
' of %s!' % (snapshot_id, branch_type_plural)
elif snapshot_id:
msg = '%s %s for snapshot with id %s' \
' not found!' % (branch_type, branch, snapshot_id)
elif visit_id and len(branches) == 0:
msg = 'Origin with type %s and url %s' \
' for visit with id %s has an empty list' \
' of %s!' % (origin_info['type'], origin_info['url'], visit_id,
branch_type_plural)
elif visit_id:
msg = '%s %s associated to visit with' \
' id %s for origin with type %s and url %s' \
' not found!' % (branch_type, branch, visit_id,
origin_info['type'], origin_info['url'])
elif len(branches) == 0:
msg = 'Origin with type %s and url %s' \
' for visit with timestamp %s has an empty list' \
' of %s!' % (origin_info['type'], origin_info['url'],
timestamp, branch_type_plural)
else:
msg = '%s %s associated to visit with' \
' timestamp %s for origin with type %s' \
' and url %s not found!' % (branch_type, branch, timestamp,
origin_info['type'],
origin_info['url'])
- raise NotFoundExc(msg)
+ raise NotFoundExc(escape(msg))
def _process_snapshot_request(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None, path=None,
browse_context='directory'):
"""
Utility function to perform common input request processing
for snapshot context views.
"""
visit_id = request.GET.get('visit_id', None)
snapshot_context = get_snapshot_context(snapshot_id, origin_type,
origin_url, timestamp, visit_id)
swh_type = snapshot_context['swh_type']
origin_info = snapshot_context['origin_info']
branches = snapshot_context['branches']
releases = snapshot_context['releases']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
if snapshot_context['visit_info']:
timestamp = format_utc_iso_date(snapshot_context['visit_info']['date'],
'%Y-%m-%dT%H:%M:%SZ')
snapshot_context['timestamp'] = \
format_utc_iso_date(snapshot_context['visit_info']['date'])
browse_view_name = 'browse-' + swh_type + '-' + browse_context
root_sha1_git = None
revision_id = request.GET.get('revision', None)
release_name = request.GET.get('release', None)
release_id = None
branch_name = None
snapshot_total_size = sum(snapshot_context['snapshot_size'].values())
if snapshot_total_size and revision_id:
revision = service.lookup_revision(revision_id)
root_sha1_git = revision['directory']
branches.append({'name': revision_id,
'revision': revision_id,
'directory': root_sha1_git,
'url': None})
branch_name = revision_id
query_params['revision'] = revision_id
elif snapshot_total_size and release_name:
release = _get_release(releases, release_name)
try:
root_sha1_git = release['directory']
revision_id = release['target']
release_id = release['id']
query_params['release'] = release_name
except Exception:
_branch_not_found("release", release_name, releases, snapshot_id,
origin_info, timestamp, visit_id)
elif snapshot_total_size:
branch_name = request.GET.get('branch', None)
if branch_name:
query_params['branch'] = branch_name
branch = _get_branch(branches, branch_name or 'HEAD',
snapshot_context['snapshot_id'])
try:
branch_name = branch['name']
revision_id = branch['revision']
root_sha1_git = branch['directory']
except Exception:
_branch_not_found("branch", branch_name, branches, snapshot_id,
origin_info, timestamp, visit_id)
for b in branches:
branch_url_args = dict(url_args)
branch_query_params = dict(query_params)
if 'release' in branch_query_params:
del branch_query_params['release']
branch_query_params['branch'] = b['name']
if path:
b['path'] = path
branch_url_args['path'] = path
b['url'] = reverse(browse_view_name,
url_args=branch_url_args,
query_params=branch_query_params)
for r in releases:
release_url_args = dict(url_args)
release_query_params = dict(query_params)
if 'branch' in release_query_params:
del release_query_params['branch']
release_query_params['release'] = r['name']
if path:
r['path'] = path
release_url_args['path'] = path
r['url'] = reverse(browse_view_name,
url_args=release_url_args,
query_params=release_query_params)
snapshot_context['query_params'] = query_params
snapshot_context['root_sha1_git'] = root_sha1_git
snapshot_context['revision_id'] = revision_id
snapshot_context['branch'] = branch_name
snapshot_context['release'] = release_name
snapshot_context['release_id'] = release_id
return snapshot_context
def browse_snapshot_directory(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None, path=None):
"""
Django view implementation for browsing a directory in a snapshot context.
"""
try:
snapshot_context = _process_snapshot_request(request, snapshot_id,
origin_type, origin_url,
timestamp, path,
browse_context='directory') # noqa
root_sha1_git = snapshot_context['root_sha1_git']
sha1_git = root_sha1_git
if root_sha1_git and path:
dir_info = service.lookup_directory_with_path(root_sha1_git, path)
sha1_git = dir_info['target']
dirs = []
files = []
if sha1_git:
dirs, files = get_directory_entries(sha1_git)
except Exception as exc:
return handle_view_exception(request, exc)
swh_type = snapshot_context['swh_type']
origin_info = snapshot_context['origin_info']
visit_info = snapshot_context['visit_info']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
revision_id = snapshot_context['revision_id']
snapshot_id = snapshot_context['snapshot_id']
path_info = gen_path_info(path)
browse_view_name = 'browse-' + swh_type + '-directory'
breadcrumbs = []
if root_sha1_git:
breadcrumbs.append({'name': root_sha1_git[:7],
'url': reverse(browse_view_name,
url_args=url_args,
query_params=query_params)})
for pi in path_info:
bc_url_args = dict(url_args)
bc_url_args['path'] = pi['path']
breadcrumbs.append({'name': pi['name'],
'url': reverse(browse_view_name,
url_args=bc_url_args,
query_params=query_params)})
path = '' if path is None else (path + '/')
for d in dirs:
if d['type'] == 'rev':
d['url'] = reverse('browse-revision',
url_args={'sha1_git': d['target']})
else:
bc_url_args = dict(url_args)
bc_url_args['path'] = path + d['name']
d['url'] = reverse(browse_view_name,
url_args=bc_url_args,
query_params=query_params)
sum_file_sizes = 0
readmes = {}
browse_view_name = 'browse-' + swh_type + '-content'
for f in files:
bc_url_args = dict(url_args)
bc_url_args['path'] = path + f['name']
f['url'] = reverse(browse_view_name,
url_args=bc_url_args,
query_params=query_params)
if f['length'] is not None:
sum_file_sizes += f['length']
f['length'] = filesizeformat(f['length'])
if f['name'].lower().startswith('readme'):
readmes[f['name']] = f['checksums']['sha1']
readme_name, readme_url, readme_html = get_readme_to_display(readmes)
browse_view_name = 'browse-' + swh_type + '-log'
history_url = None
if snapshot_id != _empty_snapshot_id:
history_url = reverse(browse_view_name,
url_args=url_args,
query_params=query_params)
nb_files = None
nb_dirs = None
dir_path = None
if root_sha1_git:
nb_files = len(files)
nb_dirs = len(dirs)
sum_file_sizes = filesizeformat(sum_file_sizes)
dir_path = '/' + path
browse_dir_link = gen_directory_link(sha1_git)
browse_rev_link = gen_revision_link(revision_id)
browse_snp_link = gen_snapshot_link(snapshot_id)
dir_metadata = {'directory': sha1_git,
'context-independent directory': browse_dir_link,
'number of regular files': nb_files,
'number of subdirectories': nb_dirs,
'sum of regular file sizes': sum_file_sizes,
'path': dir_path,
'revision': revision_id,
'context-independent revision': browse_rev_link,
'snapshot': snapshot_id,
'context-independent snapshot': browse_snp_link}
if origin_info:
dir_metadata['origin type'] = origin_info['type']
dir_metadata['origin url'] = origin_info['url']
dir_metadata['origin visit date'] = format_utc_iso_date(visit_info['date']) # noqa
vault_cooking = {
'directory_context': True,
'directory_id': sha1_git,
'revision_context': True,
'revision_id': revision_id
}
swh_objects = [{'type': 'directory',
'id': sha1_git},
{'type': 'revision',
'id': revision_id},
{'type': 'snapshot',
'id': snapshot_id}]
release_id = snapshot_context['release_id']
if release_id:
swh_objects.append({'type': 'release',
'id': release_id})
swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
dir_path = '/'.join([bc['name'] for bc in breadcrumbs]) + '/'
context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
if origin_info:
context_found = 'origin: %s' % origin_info['url']
heading = 'Directory - %s - %s - %s' %\
(dir_path, snapshot_context['branch'], context_found)
return render(request, 'browse/directory.html',
{'heading': heading,
'swh_object_name': 'Directory',
'swh_object_metadata': dir_metadata,
'dirs': dirs,
'files': files,
'breadcrumbs': breadcrumbs if root_sha1_git else [],
'top_right_link': {
'url': history_url,
'icon': swh_object_icons['revisions history'],
'text': 'History'
},
'readme_name': readme_name,
'readme_url': readme_url,
'readme_html': readme_html,
'snapshot_context': snapshot_context,
'vault_cooking': vault_cooking,
'show_actions_menu': True,
'swh_ids': swh_ids})
def browse_snapshot_content(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None, path=None):
"""
Django view implementation for browsing a content in a snapshot context.
"""
try:
snapshot_context = _process_snapshot_request(request, snapshot_id,
origin_type, origin_url,
timestamp, path,
browse_context='content')
root_sha1_git = snapshot_context['root_sha1_git']
sha1_git = None
query_string = None
content_data = None
split_path = path.split('/')
filename = split_path[-1]
filepath = path[:-len(filename)]
if root_sha1_git:
content_info = service.lookup_directory_with_path(root_sha1_git,
path)
sha1_git = content_info['target']
query_string = 'sha1_git:' + sha1_git
content_data = request_content(query_string,
raise_if_unavailable=False)
if filepath:
dir_info = service.lookup_directory_with_path(root_sha1_git,
filepath)
directory_id = dir_info['target']
else:
directory_id = root_sha1_git
except Exception as exc:
return handle_view_exception(request, exc)
swh_type = snapshot_context['swh_type']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
revision_id = snapshot_context['revision_id']
origin_info = snapshot_context['origin_info']
visit_info = snapshot_context['visit_info']
snapshot_id = snapshot_context['snapshot_id']
content = None
language = None
mimetype = None
if content_data and content_data['raw_data'] is not None:
content_display_data = prepare_content_for_display(
content_data['raw_data'], content_data['mimetype'], path)
content = content_display_data['content_data']
language = content_display_data['language']
mimetype = content_display_data['mimetype']
browse_view_name = 'browse-' + swh_type + '-directory'
breadcrumbs = []
path_info = gen_path_info(filepath)
if root_sha1_git:
breadcrumbs.append({'name': root_sha1_git[:7],
'url': reverse(browse_view_name,
url_args=url_args,
query_params=query_params)})
for pi in path_info:
bc_url_args = dict(url_args)
bc_url_args['path'] = pi['path']
breadcrumbs.append({'name': pi['name'],
'url': reverse(browse_view_name,
url_args=bc_url_args,
query_params=query_params)})
breadcrumbs.append({'name': filename,
'url': None})
browse_content_link = gen_content_link(sha1_git)
content_raw_url = None
if query_string:
content_raw_url = reverse('browse-content-raw',
url_args={'query_string': query_string},
query_params={'filename': filename})
browse_rev_link = gen_revision_link(revision_id)
browse_dir_link = gen_directory_link(directory_id)
content_metadata = {
'context-independent content': browse_content_link,
'path': None,
'filename': None,
'directory': directory_id,
'context-independent directory': browse_dir_link,
'revision': revision_id,
'context-independent revision': browse_rev_link,
'snapshot': snapshot_id
}
cnt_sha1_git = None
content_size = None
error_code = 200
error_description = ''
error_message = ''
if content_data:
content_metadata['sha1'] = \
content_data['checksums']['sha1']
content_metadata['sha1_git'] = \
content_data['checksums']['sha1_git']
content_metadata['sha256'] = \
content_data['checksums']['sha256']
content_metadata['blake2s256'] = \
content_data['checksums']['blake2s256']
content_metadata['mimetype'] = content_data['mimetype']
content_metadata['encoding'] = content_data['encoding']
content_metadata['size'] = filesizeformat(content_data['length'])
content_metadata['language'] = content_data['language']
content_metadata['licenses'] = content_data['licenses']
content_metadata['path'] = '/' + filepath
content_metadata['filename'] = filename
cnt_sha1_git = content_data['checksums']['sha1_git']
content_size = content_data['length']
error_code = content_data['error_code']
error_message = content_data['error_message']
error_description = content_data['error_description']
if origin_info:
content_metadata['origin type'] = origin_info['type']
content_metadata['origin url'] = origin_info['url']
content_metadata['origin visit date'] = format_utc_iso_date(visit_info['date']) # noqa
browse_snapshot_url = reverse('browse-snapshot-content',
url_args={'snapshot_id': snapshot_id,
'path': path},
query_params=request.GET)
browse_snapshot_link = gen_link(browse_snapshot_url)
content_metadata['context-independent snapshot'] = browse_snapshot_link
swh_objects = [{'type': 'content',
'id': cnt_sha1_git},
{'type': 'revision',
'id': revision_id},
{'type': 'snapshot',
'id': snapshot_id}]
release_id = snapshot_context['release_id']
if release_id:
swh_objects.append({'type': 'release',
'id': release_id})
swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
content_path = '/'.join([bc['name'] for bc in breadcrumbs])
context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
if origin_info:
context_found = 'origin: %s' % origin_info['url']
heading = 'Content - %s - %s - %s' %\
(content_path, snapshot_context['branch'], context_found)
return render(request, 'browse/content.html',
{'heading': heading,
'swh_object_name': 'Content',
'swh_object_metadata': content_metadata,
'content': content,
'content_size': content_size,
'max_content_size': content_display_max_size,
'mimetype': mimetype,
'language': language,
'breadcrumbs': breadcrumbs if root_sha1_git else [],
'top_right_link': {
'url': content_raw_url,
'icon': swh_object_icons['content'],
'text': 'Raw File'
},
'snapshot_context': snapshot_context,
'vault_cooking': None,
'show_actions_menu': True,
'swh_ids': swh_ids,
'error_code': error_code,
'error_message': error_message,
'error_description': error_description},
status=error_code)
PER_PAGE = 100
def browse_snapshot_log(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None):
"""
Django view implementation for browsing a revision history in a
snapshot context.
"""
try:
snapshot_context = _process_snapshot_request(request, snapshot_id,
origin_type, origin_url,
timestamp, browse_context='log') # noqa
revision_id = snapshot_context['revision_id']
per_page = int(request.GET.get('per_page', PER_PAGE))
offset = int(request.GET.get('offset', 0))
revs_ordering = request.GET.get('revs_ordering', 'committer_date')
session_key = 'rev_%s_log_ordering_%s' % (revision_id, revs_ordering)
rev_log_session = request.session.get(session_key, None)
rev_log = []
revs_walker_state = None
if rev_log_session:
rev_log = rev_log_session['rev_log']
revs_walker_state = rev_log_session['revs_walker_state']
if len(rev_log) < offset+per_page:
revs_walker = \
service.get_revisions_walker(revs_ordering,
revision_id,
max_revs=offset+per_page+1,
state=revs_walker_state)
rev_log += list(revs_walker)
revs_walker_state = revs_walker.export_state()
revision_log = rev_log[offset:offset+per_page]
request.session[session_key] = {
'rev_log': rev_log,
'revs_walker_state': revs_walker_state
}
except Exception as exc:
return handle_view_exception(request, exc)
swh_type = snapshot_context['swh_type']
origin_info = snapshot_context['origin_info']
visit_info = snapshot_context['visit_info']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
snapshot_id = snapshot_context['snapshot_id']
query_params['per_page'] = per_page
revs_ordering = request.GET.get('revs_ordering', '')
query_params['revs_ordering'] = revs_ordering
browse_view_name = 'browse-' + swh_type + '-log'
prev_log_url = None
if len(rev_log) > offset + per_page:
query_params['offset'] = offset + per_page
prev_log_url = reverse(browse_view_name,
url_args=url_args,
query_params=query_params)
next_log_url = None
if offset != 0:
query_params['offset'] = offset - per_page
next_log_url = reverse(browse_view_name,
url_args=url_args,
query_params=query_params)
revision_log_data = format_log_entries(revision_log, per_page,
snapshot_context)
browse_rev_link = gen_revision_link(revision_id)
browse_log_link = gen_revision_log_link(revision_id)
browse_snp_link = gen_snapshot_link(snapshot_id)
revision_metadata = {
'context-independent revision': browse_rev_link,
'context-independent revision history': browse_log_link,
'context-independent snapshot': browse_snp_link,
'snapshot': snapshot_id
}
if origin_info:
revision_metadata['origin type'] = origin_info['type']
revision_metadata['origin url'] = origin_info['url']
revision_metadata['origin visit date'] = format_utc_iso_date(visit_info['date']) # noqa
swh_objects = [{'type': 'revision',
'id': revision_id},
{'type': 'snapshot',
'id': snapshot_id}]
release_id = snapshot_context['release_id']
if release_id:
swh_objects.append({'type': 'release',
'id': release_id})
swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
if origin_info:
context_found = 'origin: %s' % origin_info['url']
heading = 'Revision history - %s - %s' %\
(snapshot_context['branch'], context_found)
return render(request, 'browse/revision-log.html',
{'heading': heading,
'swh_object_name': 'Revisions history',
'swh_object_metadata': revision_metadata,
'revision_log': revision_log_data,
'revs_ordering': revs_ordering,
'next_log_url': next_log_url,
'prev_log_url': prev_log_url,
'breadcrumbs': None,
'top_right_link': None,
'snapshot_context': snapshot_context,
'vault_cooking': None,
'show_actions_menu': True,
'swh_ids': swh_ids})
def browse_snapshot_branches(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None):
"""
Django view implementation for browsing a list of branches in a snapshot
context.
"""
try:
snapshot_context = _process_snapshot_request(request, snapshot_id,
origin_type, origin_url,
timestamp)
branches_bc = request.GET.get('branches_breadcrumbs', '')
branches_bc = \
branches_bc.split(',') if branches_bc else []
branches_from = branches_bc[-1] if branches_bc else ''
swh_type = snapshot_context['swh_type']
origin_info = snapshot_context['origin_info']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
browse_view_name = 'browse-' + swh_type + '-directory'
snapshot = \
service.lookup_snapshot(snapshot_context['snapshot_id'],
branches_from, PER_PAGE+1,
target_types=['revision', 'alias'])
displayed_branches, _ = process_snapshot_branches(snapshot)
except Exception as exc:
return handle_view_exception(request, exc)
for branch in displayed_branches:
if snapshot_id:
revision_url = reverse('browse-revision',
url_args={'sha1_git': branch['revision']},
query_params={'snapshot_id': snapshot_id})
else:
revision_url = reverse('browse-revision',
url_args={'sha1_git': branch['revision']},
query_params={'origin_type': origin_type,
'origin': origin_info['url']})
query_params['branch'] = branch['name']
directory_url = reverse(browse_view_name,
url_args=url_args,
query_params=query_params)
del query_params['branch']
branch['revision_url'] = revision_url
branch['directory_url'] = directory_url
browse_view_name = 'browse-' + swh_type + '-branches'
prev_branches_url = None
next_branches_url = None
if branches_bc:
query_params_prev = dict(query_params)
query_params_prev['branches_breadcrumbs'] = \
','.join(branches_bc[:-1])
prev_branches_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params_prev)
elif branches_from:
prev_branches_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params)
if len(displayed_branches) > PER_PAGE:
query_params_next = dict(query_params)
next_branch = displayed_branches[-1]['name']
del displayed_branches[-1]
branches_bc.append(next_branch)
query_params_next['branches_breadcrumbs'] = \
','.join(branches_bc)
next_branches_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params_next)
heading = 'Branches - '
if origin_info:
heading += 'origin: %s' % origin_info['url']
else:
heading += 'snapshot: %s' % snapshot_id
return render(request, 'browse/branches.html',
{'heading': heading,
'swh_object_name': 'Branches',
'swh_object_metadata': {},
'top_right_link': None,
'displayed_branches': displayed_branches,
'prev_branches_url': prev_branches_url,
'next_branches_url': next_branches_url,
'snapshot_context': snapshot_context})
def browse_snapshot_releases(request, snapshot_id=None, origin_type=None,
origin_url=None, timestamp=None):
"""
Django view implementation for browsing a list of releases in a snapshot
context.
"""
try:
snapshot_context = _process_snapshot_request(request, snapshot_id,
origin_type, origin_url,
timestamp)
rel_bc = request.GET.get('releases_breadcrumbs', '')
rel_bc = \
rel_bc.split(',') if rel_bc else []
rel_from = rel_bc[-1] if rel_bc else ''
swh_type = snapshot_context['swh_type']
origin_info = snapshot_context['origin_info']
url_args = snapshot_context['url_args']
query_params = snapshot_context['query_params']
snapshot = \
service.lookup_snapshot(snapshot_context['snapshot_id'],
rel_from, PER_PAGE+1,
target_types=['release', 'alias'])
_, displayed_releases = process_snapshot_branches(snapshot)
except Exception as exc:
return handle_view_exception(request, exc)
for release in displayed_releases:
if snapshot_id:
query_params_tgt = {'snapshot_id': snapshot_id}
else:
query_params_tgt = {'origin': origin_info['url']}
release_url = reverse('browse-release',
url_args={'sha1_git': release['id']},
query_params=query_params_tgt)
target_url = ''
if release['target_type'] == 'revision':
target_url = reverse('browse-revision',
url_args={'sha1_git': release['target']},
query_params=query_params_tgt)
elif release['target_type'] == 'directory':
target_url = reverse('browse-directory',
url_args={'sha1_git': release['target']},
query_params=query_params_tgt)
elif release['target_type'] == 'content':
target_url = reverse('browse-content',
url_args={'query_string': release['target']},
query_params=query_params_tgt)
elif release['target_type'] == 'release':
target_url = reverse('browse-release',
url_args={'sha1_git': release['target']},
query_params=query_params_tgt)
release['release_url'] = release_url
release['target_url'] = target_url
browse_view_name = 'browse-' + swh_type + '-releases'
prev_releases_url = None
next_releases_url = None
if rel_bc:
query_params_prev = dict(query_params)
query_params_prev['releases_breadcrumbs'] = \
','.join(rel_bc[:-1])
prev_releases_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params_prev)
elif rel_from:
prev_releases_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params)
if len(displayed_releases) > PER_PAGE:
query_params_next = dict(query_params)
next_rel = displayed_releases[-1]['branch_name']
del displayed_releases[-1]
rel_bc.append(next_rel)
query_params_next['releases_breadcrumbs'] = \
','.join(rel_bc)
next_releases_url = reverse(browse_view_name, url_args=url_args,
query_params=query_params_next)
heading = 'Releases - '
if origin_info:
heading += 'origin: %s' % origin_info['url']
else:
heading += 'snapshot: %s' % snapshot_id
return render(request, 'browse/releases.html',
{'heading': heading,
'top_panel_visible': False,
'top_panel_collapsible': False,
'swh_object_name': 'Releases',
'swh_object_metadata': {},
'top_right_link': None,
'displayed_releases': displayed_releases,
'prev_releases_url': prev_releases_url,
'next_releases_url': next_releases_url,
'snapshot_context': snapshot_context,
'vault_cooking': None,
'show_actions_menu': False})
diff --git a/swh/web/common/exc.py b/swh/web/common/exc.py
index 61fa81ca..9fd94202 100644
--- a/swh/web/common/exc.py
+++ b/swh/web/common/exc.py
@@ -1,122 +1,123 @@
# Copyright (C) 2015-2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import traceback
from django.http import HttpResponse
from django.shortcuts import render
from django.utils.safestring import mark_safe
+from django.utils.html import escape
from swh.web.config import get_config
class BadInputExc(ValueError):
"""Wrong request to the api.
Example: Asking a content with the wrong identifier format.
"""
pass
class NotFoundExc(Exception):
"""Good request to the api but no result were found.
Example: Asking a content with the right identifier format but
that content does not exist.
"""
pass
class ForbiddenExc(Exception):
"""Good request to the api, forbidden result to return due to enforce
policy.
Example: Asking for a raw content which exists but whose mimetype
is not text.
"""
pass
http_status_code_message = {
400: 'Bad Request',
401: 'Unauthorized',
403: 'Access Denied',
404: 'Resource not found',
500: 'Internal Server Error',
501: 'Not Implemented',
502: 'Bad Gateway',
503: 'Service unavailable'
}
def _generate_error_page(request, error_code, error_description):
return render(request, 'error.html',
{'error_code': error_code,
'error_message': http_status_code_message[error_code],
'error_description': mark_safe(error_description)},
status=error_code)
def swh_handle400(request):
"""
Custom Django HTTP error 400 handler for swh-web.
"""
error_description = 'The server cannot process the request to %s due to '\
'something that is perceived to be a client error.' %\
- request.META['PATH_INFO']
+ escape(request.META['PATH_INFO'])
return _generate_error_page(request, 400, error_description)
def swh_handle403(request):
"""
Custom Django HTTP error 403 handler for swh-web.
"""
error_description = 'The resource %s requires an authentication.' %\
- request.META['PATH_INFO']
+ escape(request.META['PATH_INFO'])
return _generate_error_page(request, 403, error_description)
def swh_handle404(request):
"""
Custom Django HTTP error 404 handler for swh-web.
"""
error_description = 'The resource %s could not be found on the server.' %\
- request.META['PATH_INFO']
+ escape(request.META['PATH_INFO'])
return _generate_error_page(request, 404, error_description)
def swh_handle500(request):
"""
Custom Django HTTP error 500 handler for swh-web.
"""
error_description = 'An unexpected condition was encountered when '\
'requesting resource %s.' %\
- request.META['PATH_INFO']
+ escape(request.META['PATH_INFO'])
return _generate_error_page(request, 500, error_description)
def handle_view_exception(request, exc, html_response=True):
"""
Function used to generate an error page when an exception
was raised inside a swh-web browse view.
"""
error_code = 500
error_description = '%s: %s' % (type(exc).__name__, str(exc))
if get_config()['debug']:
error_description = traceback.format_exc()
if isinstance(exc, BadInputExc):
error_code = 400
if isinstance(exc, ForbiddenExc):
error_code = 403
if isinstance(exc, NotFoundExc):
error_code = 404
if html_response:
return _generate_error_page(request, error_code, error_description)
else:
return HttpResponse(error_description, content_type='text/plain',
status=error_code)
diff --git a/swh/web/common/origin_save.py b/swh/web/common/origin_save.py
index ab8777d8..b9e853cb 100644
--- a/swh/web/common/origin_save.py
+++ b/swh/web/common/origin_save.py
@@ -1,400 +1,401 @@
# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from bisect import bisect_right
from datetime import datetime, timezone
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator
+from django.utils.html import escape
from swh.web import config
from swh.web.common import service
from swh.web.common.exc import BadInputExc, ForbiddenExc, NotFoundExc
from swh.web.common.models import (
SaveUnauthorizedOrigin, SaveAuthorizedOrigin, SaveOriginRequest,
SAVE_REQUEST_ACCEPTED, SAVE_REQUEST_REJECTED, SAVE_REQUEST_PENDING,
SAVE_TASK_NOT_YET_SCHEDULED, SAVE_TASK_SCHEDULED,
SAVE_TASK_SUCCEED, SAVE_TASK_FAILED, SAVE_TASK_RUNNING
)
from swh.web.common.origin_visits import get_origin_visits
from swh.web.common.utils import parse_timestamp
from swh.scheduler.utils import create_oneshot_task_dict
scheduler = config.scheduler()
def get_origin_save_authorized_urls():
"""
Get the list of origin url prefixes authorized to be
immediately loaded into the archive (whitelist).
Returns:
list: The list of authorized origin url prefix
"""
return [origin.url
for origin in SaveAuthorizedOrigin.objects.all()]
def get_origin_save_unauthorized_urls():
"""
Get the list of origin url prefixes forbidden to be
loaded into the archive (blacklist).
Returns:
list: the list of unauthorized origin url prefix
"""
return [origin.url
for origin in SaveUnauthorizedOrigin.objects.all()]
def can_save_origin(origin_url):
"""
Check if a software origin can be saved into the archive.
Based on the origin url, the save request will be either:
* immediately accepted if the url is whitelisted
* rejected if the url is blacklisted
* put in pending state for manual review otherwise
Args:
origin_url (str): the software origin url to check
Returns:
str: the origin save request status, either **accepted**,
**rejected** or **pending**
"""
# origin url may be blacklisted
for url_prefix in get_origin_save_unauthorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_REJECTED
# if the origin url is in the white list, it can be immediately saved
for url_prefix in get_origin_save_authorized_urls():
if origin_url.startswith(url_prefix):
return SAVE_REQUEST_ACCEPTED
# otherwise, the origin url needs to be manually verified
return SAVE_REQUEST_PENDING
# map origin type to scheduler task
# TODO: do not hardcode the task name here (T1157)
_origin_type_task = {
'git': 'load-git',
'hg': 'load-hg',
'svn': 'load-svn'
}
# map scheduler task status to origin save status
_save_task_status = {
'next_run_not_scheduled': SAVE_TASK_NOT_YET_SCHEDULED,
'next_run_scheduled': SAVE_TASK_SCHEDULED,
'completed': SAVE_TASK_SUCCEED,
'disabled': SAVE_TASK_FAILED
}
def get_savable_origin_types():
return sorted(list(_origin_type_task.keys()))
def _check_origin_type_savable(origin_type):
"""
Get the list of software origin types that can be loaded
through a save request.
Returns:
list: the list of saveable origin types
"""
allowed_origin_types = ', '.join(get_savable_origin_types())
if origin_type not in _origin_type_task:
raise BadInputExc('Origin of type %s can not be saved! '
'Allowed types are the following: %s' %
(origin_type, allowed_origin_types))
_validate_url = URLValidator(schemes=['http', 'https', 'svn', 'git'])
def _check_origin_url_valid(origin_url):
try:
_validate_url(origin_url)
except ValidationError:
raise BadInputExc('The provided origin url (%s) is not valid!' %
- origin_url)
+ escape(origin_url))
def _get_visit_info_for_save_request(save_request):
visit_date = None
visit_status = None
try:
origin = {'type': save_request.origin_type,
'url': save_request.origin_url}
origin_info = service.lookup_origin(origin)
origin_visits = get_origin_visits(origin_info)
visit_dates = [parse_timestamp(v['date'])
for v in origin_visits]
i = bisect_right(visit_dates, save_request.request_date)
if i != len(visit_dates):
visit_date = visit_dates[i]
visit_status = origin_visits[i]['status']
if origin_visits[i]['status'] == 'ongoing':
visit_date = None
except Exception:
pass
return visit_date, visit_status
def _check_visit_update_status(save_request, save_task_status):
visit_date, visit_status = _get_visit_info_for_save_request(save_request)
save_request.visit_date = visit_date
# visit has been performed, mark the saving task as succeed
if visit_date and visit_status is not None:
save_task_status = SAVE_TASK_SUCCEED
elif visit_status == 'ongoing':
save_task_status = SAVE_TASK_RUNNING
else:
time_now = datetime.now(tz=timezone.utc)
time_delta = time_now - save_request.request_date
# consider the task as failed if it is still in scheduled state
# 30 days after its submission
if time_delta.days > 30:
save_task_status = SAVE_TASK_FAILED
return visit_date, save_task_status
def _save_request_dict(save_request, task=None):
must_save = False
visit_date = save_request.visit_date
# save task still in scheduler db
if task:
save_task_status = _save_task_status[task['status']]
if save_task_status in (SAVE_TASK_FAILED, SAVE_TASK_SUCCEED) \
and not visit_date:
visit_date, _ = _get_visit_info_for_save_request(save_request)
save_request.visit_date = visit_date
must_save = True
# Ensure last origin visit is available in database
# before reporting the task execution as successful
if save_task_status == SAVE_TASK_SUCCEED and not visit_date:
save_task_status = SAVE_TASK_SCHEDULED
# Check tasks still marked as scheduled / not yet scheduled
if save_task_status in (SAVE_TASK_SCHEDULED,
SAVE_TASK_NOT_YET_SCHEDULED):
visit_date, save_task_status = _check_visit_update_status(
save_request, save_task_status)
# save task may have been archived
else:
save_task_status = save_request.loading_task_status
if save_task_status in (SAVE_TASK_SCHEDULED,
SAVE_TASK_NOT_YET_SCHEDULED):
visit_date, save_task_status = _check_visit_update_status(
save_request, save_task_status)
else:
save_task_status = save_request.loading_task_status
if save_request.loading_task_status != save_task_status:
save_request.loading_task_status = save_task_status
must_save = True
if must_save:
save_request.save()
return {'id': save_request.id,
'origin_type': save_request.origin_type,
'origin_url': save_request.origin_url,
'save_request_date': save_request.request_date.isoformat(),
'save_request_status': save_request.status,
'save_task_status': save_task_status,
'visit_date': visit_date.isoformat() if visit_date else None}
def create_save_origin_request(origin_type, origin_url):
"""
Create a loading task to save a software origin into the archive.
This function aims to create a software origin loading task
trough the use of the swh-scheduler component.
First, some checks are performed to see if the origin type and
url are valid but also if the the save request can be accepted.
If those checks passed, the loading task is then created.
Otherwise, the save request is put in pending or rejected state.
All the submitted save requests are logged into the swh-web
database to keep track of them.
Args:
origin_type (str): the type of origin to save (currently only
``git`` but ``svn`` and ``hg`` will soon be available)
origin_url (str): the url of the origin to save
Raises:
BadInputExc: the origin type or url is invalid
ForbiddenExc: the provided origin url is blacklisted
Returns:
dict: A dict describing the save request with the following keys:
* **origin_type**: the type of the origin to save
* **origin_url**: the url of the origin
* **save_request_date**: the date the request was submitted
* **save_request_status**: the request status, either **accepted**,
**rejected** or **pending**
* **save_task_status**: the origin loading task status, either
**not created**, **not yet scheduled**, **scheduled**,
**succeed** or **failed**
"""
_check_origin_type_savable(origin_type)
_check_origin_url_valid(origin_url)
save_request_status = can_save_origin(origin_url)
task = None
# if the origin save request is accepted, create a scheduler
# task to load it into the archive
if save_request_status == SAVE_REQUEST_ACCEPTED:
# create a task with high priority
kwargs = {'priority': 'high'}
# set task parameters according to the origin type
if origin_type == 'git':
kwargs['repo_url'] = origin_url
elif origin_type == 'hg':
kwargs['origin_url'] = origin_url
elif origin_type == 'svn':
kwargs['origin_url'] = origin_url
kwargs['svn_url'] = origin_url
sor = None
# get list of previously sumitted save requests
current_sors = \
list(SaveOriginRequest.objects.filter(origin_type=origin_type,
origin_url=origin_url))
can_create_task = False
# if no save requests previously submitted, create the scheduler task
if not current_sors:
can_create_task = True
else:
# get the latest submitted save request
sor = current_sors[0]
# if it was in pending state, we need to create the scheduler task
# and update the save request info in the database
if sor.status == SAVE_REQUEST_PENDING:
can_create_task = True
# a task has already been created to load the origin
elif sor.loading_task_id != -1:
# get the scheduler task and its status
tasks = scheduler.get_tasks([sor.loading_task_id])
task = tasks[0] if tasks else None
task_status = _save_request_dict(sor, task)['save_task_status']
# create a new scheduler task only if the previous one has been
# already executed
if task_status == SAVE_TASK_FAILED or \
task_status == SAVE_TASK_SUCCEED:
can_create_task = True
sor = None
else:
can_create_task = False
if can_create_task:
# effectively create the scheduler task
task_dict = create_oneshot_task_dict(
_origin_type_task[origin_type], **kwargs)
task = scheduler.create_tasks([task_dict])[0]
# pending save request has been accepted
if sor:
sor.status = SAVE_REQUEST_ACCEPTED
sor.loading_task_id = task['id']
sor.save()
else:
sor = SaveOriginRequest.objects.create(origin_type=origin_type,
origin_url=origin_url,
status=save_request_status, # noqa
loading_task_id=task['id']) # noqa
# save request must be manually reviewed for acceptation
elif save_request_status == SAVE_REQUEST_PENDING:
# check if there is already such a save request already submitted,
# no need to add it to the database in that case
try:
sor = SaveOriginRequest.objects.get(origin_type=origin_type,
origin_url=origin_url,
status=save_request_status)
# if not add it to the database
except ObjectDoesNotExist:
sor = SaveOriginRequest.objects.create(origin_type=origin_type,
origin_url=origin_url,
status=save_request_status)
# origin can not be saved as its url is blacklisted,
# log the request to the database anyway
else:
sor = SaveOriginRequest.objects.create(origin_type=origin_type,
origin_url=origin_url,
status=save_request_status)
if save_request_status == SAVE_REQUEST_REJECTED:
raise ForbiddenExc('The origin url is blacklisted and will not be '
'loaded into the archive.')
return _save_request_dict(sor, task)
def get_save_origin_requests_from_queryset(requests_queryset):
"""
Get all save requests from a SaveOriginRequest queryset.
Args:
requests_queryset (django.db.models.QuerySet): input
SaveOriginRequest queryset
Returns:
list: A list of save origin requests dict as described in
:func:`swh.web.common.origin_save.create_save_origin_request`
"""
task_ids = []
for sor in requests_queryset:
task_ids.append(sor.loading_task_id)
requests = []
if task_ids:
tasks = scheduler.get_tasks(task_ids)
tasks = {task['id']: task for task in tasks}
for sor in requests_queryset:
sr_dict = _save_request_dict(sor, tasks.get(sor.loading_task_id))
requests.append(sr_dict)
return requests
def get_save_origin_requests(origin_type, origin_url):
"""
Get all save requests for a given software origin.
Args:
origin_type (str): the type of the origin
origin_url (str): the url of the origin
Raises:
BadInputExc: the origin type or url is invalid
NotFoundExc: no save requests can be found for the given origin
Returns:
list: A list of save origin requests dict as described in
:func:`swh.web.common.origin_save.create_save_origin_request`
"""
_check_origin_type_savable(origin_type)
_check_origin_url_valid(origin_url)
sors = SaveOriginRequest.objects.filter(origin_type=origin_type,
origin_url=origin_url)
if sors.count() == 0:
raise NotFoundExc(('No save requests found for origin with type '
'%s and url %s.') % (origin_type, origin_url))
return get_save_origin_requests_from_queryset(sors)
diff --git a/swh/web/templates/includes/top-navigation.html b/swh/web/templates/includes/top-navigation.html
index c3daf6e0..aff46bfe 100644
--- a/swh/web/templates/includes/top-navigation.html
+++ b/swh/web/templates/includes/top-navigation.html
@@ -1,126 +1,126 @@
{% comment %}
Copyright (C) 2017-2018 The Software Heritage developers
See the AUTHORS file at the top-level directory of this distribution
License: GNU Affero General Public License version 3, or any later version
See top-level LICENSE file for more information
{% endcomment %}
{% load swh_templatetags %}
{% if snapshot_context %}
{% if snapshot_context.branch or snapshot_context.release %}
{% endfor %}
{% if snapshot_context.branches|length < snapshot_context.snapshot_size.revision %}
Branches list truncated to {{ snapshot_context.branches|length }} entries,
{{ snapshot_context.branches|length|mul:-1|add:snapshot_context.snapshot_size.revision }}
were omitted.
{% endif %}
{% if snapshot_context.releases %}
{% for r in snapshot_context.releases %}
{% if r.target_type == 'revision' %}
Releases list truncated to {{ snapshot_context.releases|length }} entries,
{{ snapshot_context.releases|length|mul:-1|add:snapshot_context.snapshot_size.release }}
were omitted.
{% endif %}
{% else %}
No releases to show
{% endif %}
{% if not snapshot_context or not snapshot_context.is_empty %}
{% include "includes/vault-create-tasks.html" %}
{% endif %}
{% include "includes/show-metadata.html" %}
{% include "includes/take-new-snapshot.html" %}