Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F11023740
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
75 KB
Subscribers
None
View Options
diff --git a/swh/web/browse/views/utils/snapshot_context.py b/swh/web/browse/views/utils/snapshot_context.py
index ad3f3247..4e6c4f33 100644
--- a/swh/web/browse/views/utils/snapshot_context.py
+++ b/swh/web/browse/views/utils/snapshot_context.py
@@ -1,936 +1,936 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
# Utility module implementing Django views for browsing the archive
# in a snapshot context.
# Its purpose is to factorize code for the views reachable from the
# /origin/.* and /snapshot/.* endpoints.
from django.shortcuts import render
from django.template.defaultfilters import filesizeformat
from django.utils.html import escape
import sentry_sdk
from swh.model.identifiers import snapshot_identifier
from swh.web.browse.utils import (
get_snapshot_context, get_directory_entries, gen_directory_link,
gen_revision_link, request_content, gen_content_link,
prepare_content_for_display, content_display_max_size,
format_log_entries, gen_revision_log_link, gen_release_link,
get_readme_to_display, get_swh_persistent_ids,
gen_snapshot_link, process_snapshot_branches
)
from swh.web.common import service, highlightjs
from swh.web.common.exc import (
handle_view_exception, NotFoundExc
)
from swh.web.common.utils import (
reverse, gen_path_info, format_utc_iso_date, swh_object_icons
)
_empty_snapshot_id = snapshot_identifier({'branches': {}})
def _get_branch(branches, branch_name, snapshot_id):
"""
Utility function to get a specific branch from a branches list.
Its purpose is to get the default HEAD branch as some software origin
(e.g those with svn type) does not have it. In that latter case, check
if there is a master branch instead and returns it.
"""
filtered_branches = [b for b in branches if b['name'] == branch_name]
if filtered_branches:
return filtered_branches[0]
elif branch_name == 'HEAD':
filtered_branches = [b for b in branches
if b['name'].endswith('master')]
if filtered_branches:
return filtered_branches[0]
elif branches:
return branches[0]
else:
# case where a large branches list has been truncated
snp = service.lookup_snapshot(snapshot_id,
branches_from=branch_name,
branches_count=1,
target_types=['revision', 'alias'])
snp_branch, _ = process_snapshot_branches(snp)
if snp_branch and snp_branch[0]['name'] == branch_name:
branches.append(snp_branch[0])
return snp_branch[0]
def _get_release(releases, release_name, snapshot_id):
"""
Utility function to get a specific release from a releases list.
Returns None if the release can not be found in the list.
"""
filtered_releases = [r for r in releases if r['name'] == release_name]
if filtered_releases:
return filtered_releases[0]
else:
# case where a large branches list has been truncated
for branch_name in (release_name, f'refs/tags/{release_name}'):
snp = service.lookup_snapshot(snapshot_id,
branches_from=branch_name,
branches_count=1,
target_types=['release'])
_, snp_release = process_snapshot_branches(snp)
if snp_release and snp_release[0]['name'] == release_name:
releases.append(snp_release[0])
return snp_release[0]
def _branch_not_found(branch_type, branch, snapshot_id, snapshot_sizes,
                      origin_info, timestamp, visit_id):
    """
    Utility function to raise an exception when a specified branch/release
    can not be found.

    The error message depends on how the snapshot was selected (by id,
    by visit id or by visit timestamp) and on whether the snapshot has
    any branch/release at all.
    """
    # resolve display names and the snapshot size entry to inspect
    if branch_type == 'branch':
        branch_type, branch_type_plural, target_type = (
            'Branch', 'branches', 'revision')
    else:
        branch_type, branch_type_plural, target_type = (
            'Release', 'releases', 'release')
    is_empty = snapshot_sizes[target_type] == 0
    if snapshot_id:
        if is_empty:
            msg = ('Snapshot with id %s has an empty list'
                   ' of %s!' % (snapshot_id, branch_type_plural))
        else:
            msg = ('%s %s for snapshot with id %s'
                   ' not found!' % (branch_type, branch, snapshot_id))
    elif visit_id:
        if is_empty:
            msg = ('Origin with url %s'
                   ' for visit with id %s has an empty list'
                   ' of %s!' % (origin_info['url'], visit_id,
                                branch_type_plural))
        else:
            msg = ('%s %s associated to visit with'
                   ' id %s for origin with url %s'
                   ' not found!' % (branch_type, branch, visit_id,
                                    origin_info['url']))
    else:
        if is_empty:
            msg = ('Origin with url %s'
                   ' for visit with timestamp %s has an empty list'
                   ' of %s!' % (origin_info['url'],
                                timestamp, branch_type_plural))
        else:
            msg = ('%s %s associated to visit with'
                   ' timestamp %s for origin with '
                   'url %s not found!' % (branch_type, branch, timestamp,
                                          origin_info['url']))
    raise NotFoundExc(escape(msg))
def _process_snapshot_request(request, snapshot_id=None,
                              origin_url=None, timestamp=None, path=None,
                              browse_context='directory'):
    """
    Utility function to perform common input request processing
    for snapshot context views.

    It resolves the snapshot context from the request parameters, then
    computes the root directory and revision to browse from the optional
    'revision', 'release' or 'branch' query parameters, and finally
    generates a browse URL for every branch and release in the context.
    """
    visit_id = request.GET.get('visit_id', None)
    snapshot_context = get_snapshot_context(snapshot_id,
                                            origin_url, timestamp, visit_id)
    swh_type = snapshot_context['swh_type']
    origin_info = snapshot_context['origin_info']
    branches = snapshot_context['branches']
    releases = snapshot_context['releases']
    url_args = snapshot_context['url_args']
    query_params = snapshot_context['query_params']
    # use the actual visit date as timestamp when visit info is available
    if snapshot_context['visit_info']:
        timestamp = format_utc_iso_date(snapshot_context['visit_info']['date'],
                                        '%Y-%m-%dT%H:%M:%SZ')
        snapshot_context['timestamp'] = format_utc_iso_date(
            snapshot_context['visit_info']['date'])
    browse_view_name = 'browse-' + swh_type + '-' + browse_context
    root_sha1_git = None
    revision_id = request.GET.get('revision', None)
    release_name = request.GET.get('release', None)
    release_id = None
    branch_name = None
    snapshot_sizes = snapshot_context['snapshot_sizes']
    snapshot_total_size = sum(snapshot_sizes.values())
    if snapshot_total_size and revision_id:
        # browsing a specific revision: inject it as a pseudo branch
        # into the context's branches list
        revision = service.lookup_revision(revision_id)
        root_sha1_git = revision['directory']
        branches.append({'name': revision_id,
                         'revision': revision_id,
                         'directory': root_sha1_git,
                         'url': None})
        branch_name = revision_id
        query_params['revision'] = revision_id
    elif snapshot_total_size and release_name:
        # browsing a specific release
        release = _get_release(releases, release_name,
                               snapshot_context['snapshot_id'])
        try:
            root_sha1_git = release['directory']
            revision_id = release['target']
            release_id = release['id']
            query_params['release'] = release_name
        except Exception as exc:
            # release is None when it could not be found in the snapshot
            sentry_sdk.capture_exception(exc)
            _branch_not_found('release', release_name, snapshot_id,
                              snapshot_sizes, origin_info, timestamp, visit_id)
    elif snapshot_total_size:
        # browsing a branch, defaulting to HEAD when none is requested
        branch_name = request.GET.get('branch', None)
        if branch_name:
            query_params['branch'] = branch_name
        branch = _get_branch(branches, branch_name or 'HEAD',
                             snapshot_context['snapshot_id'])
        try:
            branch_name = branch['name']
            revision_id = branch['revision']
            root_sha1_git = branch['directory']
        except Exception as exc:
            # branch is None when it could not be found in the snapshot
            sentry_sdk.capture_exception(exc)
            _branch_not_found('branch', branch_name, snapshot_id,
                              snapshot_sizes, origin_info, timestamp, visit_id)
    # compute a browse URL for each branch of the snapshot context
    for b in branches:
        branch_url_args = dict(url_args)
        branch_query_params = dict(query_params)
        if 'release' in branch_query_params:
            del branch_query_params['release']
        branch_query_params['branch'] = b['name']
        if path:
            b['path'] = path
            branch_url_args['path'] = path
        b['url'] = reverse(browse_view_name,
                           url_args=branch_url_args,
                           query_params=branch_query_params)
    # compute a browse URL for each release of the snapshot context
    for r in releases:
        release_url_args = dict(url_args)
        release_query_params = dict(query_params)
        if 'branch' in release_query_params:
            del release_query_params['branch']
        release_query_params['release'] = r['name']
        if path:
            r['path'] = path
            release_url_args['path'] = path
        r['url'] = reverse(browse_view_name,
                           url_args=release_url_args,
                           query_params=release_query_params)
    snapshot_context['query_params'] = query_params
    snapshot_context['root_sha1_git'] = root_sha1_git
    snapshot_context['revision_id'] = revision_id
    snapshot_context['branch'] = branch_name
    snapshot_context['release'] = release_name
    snapshot_context['release_id'] = release_id
    return snapshot_context
def browse_snapshot_directory(request, snapshot_id=None,
                              origin_url=None, timestamp=None, path=None):
    """
    Django view implementation for browsing a directory in a snapshot context.

    The directory to display is resolved from the root directory of the
    selected revision and the optional sub *path*; its entries,
    breadcrumbs, readme and metadata are then rendered with the
    'browse/directory.html' template.
    """
    try:
        snapshot_context = _process_snapshot_request(
            request, snapshot_id, origin_url,
            timestamp, path, browse_context='directory')
        root_sha1_git = snapshot_context['root_sha1_git']
        sha1_git = root_sha1_git
        # resolve the sub-directory to browse when a path is provided
        if root_sha1_git and path:
            dir_info = service.lookup_directory_with_path(root_sha1_git, path)
            sha1_git = dir_info['target']
        dirs = []
        files = []
        if sha1_git:
            dirs, files = get_directory_entries(sha1_git)
    except Exception as exc:
        return handle_view_exception(request, exc)
    swh_type = snapshot_context['swh_type']
    origin_info = snapshot_context['origin_info']
    visit_info = snapshot_context['visit_info']
    url_args = snapshot_context['url_args']
    query_params = snapshot_context['query_params']
    revision_id = snapshot_context['revision_id']
    snapshot_id = snapshot_context['snapshot_id']
    path_info = gen_path_info(path)
    browse_view_name = 'browse-' + swh_type + '-directory'
    # breadcrumbs: root directory first, then one entry per path component
    breadcrumbs = []
    if root_sha1_git:
        breadcrumbs.append({'name': root_sha1_git[:7],
                            'url': reverse(browse_view_name,
                                           url_args=url_args,
                                           query_params=query_params)})
    for pi in path_info:
        bc_url_args = dict(url_args)
        bc_url_args['path'] = pi['path']
        breadcrumbs.append({'name': pi['name'],
                            'url': reverse(browse_view_name,
                                           url_args=bc_url_args,
                                           query_params=query_params)})
    path = '' if path is None else (path + '/')
    # compute a browse URL for each directory entry; entries of type
    # 'rev' target a revision, so link them to the revision view instead
    for d in dirs:
        if d['type'] == 'rev':
            d['url'] = reverse('browse-revision',
                               url_args={'sha1_git': d['target']})
        else:
            bc_url_args = dict(url_args)
            bc_url_args['path'] = path + d['name']
            d['url'] = reverse(browse_view_name,
                               url_args=bc_url_args,
                               query_params=query_params)
    sum_file_sizes = 0
    readmes = {}
    browse_view_name = 'browse-' + swh_type + '-content'
    for f in files:
        bc_url_args = dict(url_args)
        bc_url_args['path'] = path + f['name']
        f['url'] = reverse(browse_view_name,
                           url_args=bc_url_args,
                           query_params=query_params)
        if f['length'] is not None:
            sum_file_sizes += f['length']
            f['length'] = filesizeformat(f['length'])
        # collect candidate readme files to render below the listing
        if f['name'].lower().startswith('readme'):
            readmes[f['name']] = f['checksums']['sha1']
    readme_name, readme_url, readme_html = get_readme_to_display(readmes)
    browse_view_name = 'browse-' + swh_type + '-log'
    history_url = None
    # an empty snapshot has no revision history to link to
    if snapshot_id != _empty_snapshot_id:
        history_url = reverse(browse_view_name,
                              url_args=url_args,
                              query_params=query_params)
    nb_files = None
    nb_dirs = None
    dir_path = None
    if root_sha1_git:
        nb_files = len(files)
        nb_dirs = len(dirs)
        sum_file_sizes = filesizeformat(sum_file_sizes)
        dir_path = '/' + path
    browse_dir_link = gen_directory_link(sha1_git)
    browse_rev_link = gen_revision_link(revision_id)
    browse_snp_link = gen_snapshot_link(snapshot_id)
    # metadata passed to the template as 'swh_object_metadata'
    dir_metadata = {"directory": sha1_git,
                    "context-independent directory": browse_dir_link,
                    "number of regular files": nb_files,
                    "number of subdirectories": nb_dirs,
                    "sum of regular file sizes": sum_file_sizes,
                    "path": dir_path,
                    "revision": revision_id,
                    "context-independent revision": browse_rev_link,
                    "snapshot": snapshot_id,
                    "context-independent snapshot": browse_snp_link}
    if origin_info:
        dir_metadata['origin url'] = origin_info['url']
        dir_metadata['origin visit date'] = format_utc_iso_date(
            visit_info['date'])
        dir_metadata['origin visit type'] = visit_info['type']
    vault_cooking = {
        'directory_context': True,
        'directory_id': sha1_git,
        'revision_context': True,
        'revision_id': revision_id
    }
    # objects for which persistent identifiers will be generated
    swh_objects = [{'type': 'directory',
                    'id': sha1_git},
                   {'type': 'revision',
                    'id': revision_id},
                   {'type': 'snapshot',
                    'id': snapshot_id}]
    release_id = snapshot_context['release_id']
    if release_id:
        swh_objects.append({'type': 'release',
                            'id': release_id})
        browse_rel_link = gen_release_link(release_id)
        dir_metadata['release'] = release_id
        dir_metadata['context-independent release'] = browse_rel_link
    swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
    dir_path = '/'.join([bc['name'] for bc in breadcrumbs]) + '/'
    context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
    if origin_info:
        context_found = 'origin: %s' % origin_info['url']
    heading = ('Directory - %s - %s - %s' %
               (dir_path, snapshot_context['branch'], context_found))
    top_right_link = None
    if not snapshot_context['is_empty']:
        top_right_link = {
            'url': history_url,
            'icon': swh_object_icons['revisions history'],
            'text': 'History'
        }
    return render(request, 'browse/directory.html',
                  {'heading': heading,
                   'swh_object_name': 'Directory',
                   'swh_object_metadata': dir_metadata,
                   'dirs': dirs,
                   'files': files,
                   'breadcrumbs': breadcrumbs if root_sha1_git else [],
                   'top_right_link': top_right_link,
                   'readme_name': readme_name,
                   'readme_url': readme_url,
                   'readme_html': readme_html,
                   'snapshot_context': snapshot_context,
                   'vault_cooking': vault_cooking,
                   'show_actions_menu': True,
                   'swh_ids': swh_ids})
def browse_snapshot_content(request, snapshot_id=None,
                            origin_url=None, timestamp=None, path=None,
                            selected_language=None):
    """
    Django view implementation for browsing a content in a snapshot context.

    The content to display is resolved by looking up *path* from the root
    directory of the selected revision; it is then rendered with the
    'browse/content.html' template, propagating any content retrieval
    error through the HTTP status code.
    """
    try:
        snapshot_context = _process_snapshot_request(request, snapshot_id,
                                                     origin_url,
                                                     timestamp, path,
                                                     browse_context='content')
        root_sha1_git = snapshot_context['root_sha1_git']
        sha1_git = None
        query_string = None
        content_data = None
        directory_id = None
        # split the path into the file name and its enclosing directory
        split_path = path.split('/')
        filename = split_path[-1]
        filepath = path[:-len(filename)]
        if root_sha1_git:
            content_info = service.lookup_directory_with_path(root_sha1_git,
                                                              path)
            sha1_git = content_info['target']
            query_string = 'sha1_git:' + sha1_git
            # do not raise on unavailable content: the error is reported
            # through error_code/error_message in the rendered response
            content_data = request_content(query_string,
                                           raise_if_unavailable=False)
            if filepath:
                dir_info = service.lookup_directory_with_path(root_sha1_git,
                                                              filepath)
                directory_id = dir_info['target']
            else:
                directory_id = root_sha1_git
    except Exception as exc:
        return handle_view_exception(request, exc)
    swh_type = snapshot_context['swh_type']
    url_args = snapshot_context['url_args']
    query_params = snapshot_context['query_params']
    revision_id = snapshot_context['revision_id']
    origin_info = snapshot_context['origin_info']
    visit_info = snapshot_context['visit_info']
    snapshot_id = snapshot_context['snapshot_id']
    content = None
    language = None
    mimetype = None
    if content_data and content_data['raw_data'] is not None:
        content_display_data = prepare_content_for_display(
            content_data['raw_data'], content_data['mimetype'], path)
        content = content_display_data['content_data']
        language = content_display_data['language']
        mimetype = content_display_data['mimetype']
    # Override language with user-selected language
    if selected_language is not None:
        language = selected_language
    available_languages = None
    if mimetype and 'text/' in mimetype:
        available_languages = highlightjs.get_supported_languages()
    browse_view_name = 'browse-' + swh_type + '-directory'
    # breadcrumbs: root directory, intermediate directories, then the file
    breadcrumbs = []
    path_info = gen_path_info(filepath)
    if root_sha1_git:
        breadcrumbs.append({'name': root_sha1_git[:7],
                            'url': reverse(browse_view_name,
                                           url_args=url_args,
                                           query_params=query_params)})
    for pi in path_info:
        bc_url_args = dict(url_args)
        bc_url_args['path'] = pi['path']
        breadcrumbs.append({'name': pi['name'],
                            'url': reverse(browse_view_name,
                                           url_args=bc_url_args,
                                           query_params=query_params)})
    breadcrumbs.append({'name': filename,
                        'url': None})
    browse_content_link = gen_content_link(sha1_git)
    content_raw_url = None
    if query_string:
        content_raw_url = reverse('browse-content-raw',
                                  url_args={'query_string': query_string},
                                  query_params={'filename': filename})
    browse_rev_link = gen_revision_link(revision_id)
    browse_dir_link = gen_directory_link(directory_id)
    # metadata passed to the template as 'swh_object_metadata'
    content_metadata = {
        'context-independent content': browse_content_link,
        'path': None,
        'filename': None,
        'directory': directory_id,
        'context-independent directory': browse_dir_link,
        'revision': revision_id,
        'context-independent revision': browse_rev_link,
        'snapshot': snapshot_id
    }
    cnt_sha1_git = None
    content_size = None
    error_code = 200
    error_description = ''
    error_message = ''
    if content_data:
        for checksum in content_data['checksums'].keys():
            content_metadata[checksum] = content_data['checksums'][checksum]
        content_metadata['mimetype'] = content_data['mimetype']
        content_metadata['encoding'] = content_data['encoding']
        content_metadata['size'] = filesizeformat(content_data['length'])
        content_metadata['language'] = content_data['language']
        content_metadata['licenses'] = content_data['licenses']
        content_metadata['path'] = '/' + filepath
        content_metadata['filename'] = filename
        cnt_sha1_git = content_data['checksums']['sha1_git']
        content_size = content_data['length']
        # propagate any retrieval error reported by request_content
        error_code = content_data['error_code']
        error_message = content_data['error_message']
        error_description = content_data['error_description']
    if origin_info:
        content_metadata['origin url'] = origin_info['url']
        content_metadata['origin visit date'] = format_utc_iso_date(
            visit_info['date'])
        content_metadata['origin visit type'] = visit_info['type']
        browse_snapshot_link = gen_snapshot_link(snapshot_id)
        content_metadata['context-independent snapshot'] = browse_snapshot_link
    # objects for which persistent identifiers will be generated
    swh_objects = [{'type': 'content',
                    'id': cnt_sha1_git},
                   {'type': 'directory',
                    'id': directory_id},
                   {'type': 'revision',
                    'id': revision_id},
                   {'type': 'snapshot',
                    'id': snapshot_id}]
    release_id = snapshot_context['release_id']
    if release_id:
        swh_objects.append({'type': 'release',
                            'id': release_id})
        browse_rel_link = gen_release_link(release_id)
        content_metadata['release'] = release_id
        content_metadata['context-independent release'] = browse_rel_link
    swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
    content_path = '/'.join([bc['name'] for bc in breadcrumbs])
    context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
    if origin_info:
        context_found = 'origin: %s' % origin_info['url']
    heading = ('Content - %s - %s - %s' %
               (content_path, snapshot_context['branch'], context_found))
    top_right_link = None
    if not snapshot_context['is_empty']:
        top_right_link = {
            'url': content_raw_url,
            'icon': swh_object_icons['content'],
            'text': 'Raw File'
        }
    return render(request, 'browse/content.html',
                  {'heading': heading,
                   'swh_object_name': 'Content',
                   'swh_object_metadata': content_metadata,
                   'content': content,
                   'content_size': content_size,
                   'max_content_size': content_display_max_size,
                   'mimetype': mimetype,
                   'language': language,
                   'available_languages': available_languages,
                   'breadcrumbs': breadcrumbs if root_sha1_git else [],
                   'top_right_link': top_right_link,
                   'snapshot_context': snapshot_context,
                   'vault_cooking': None,
                   'show_actions_menu': True,
                   'swh_ids': swh_ids,
                   'error_code': error_code,
                   'error_message': error_message,
                   'error_description': error_description},
                  status=error_code)
# Default number of log entries per page; also used (as PER_PAGE+1) as the
# fetch size for the branches and releases listing views below.
PER_PAGE = 100
def browse_snapshot_log(request, snapshot_id=None,
                        origin_url=None, timestamp=None):
    """
    Django view implementation for browsing a revision history in a
    snapshot context.

    The list of already-walked revision ids and the revisions walker state
    are cached in the Django session, so paginating does not walk the
    whole history from scratch on every request.
    """
    try:
        snapshot_context = _process_snapshot_request(
            request, snapshot_id, origin_url, timestamp, browse_context='log')
        revision_id = snapshot_context['revision_id']
        per_page = int(request.GET.get('per_page', PER_PAGE))
        offset = int(request.GET.get('offset', 0))
        revs_ordering = request.GET.get('revs_ordering', 'committer_date')
        # session cache key, specific to the browsed revision and ordering
        session_key = 'rev_%s_log_ordering_%s' % (revision_id, revs_ordering)
        rev_log_session = request.session.get(session_key, None)
        rev_log = []
        revs_walker_state = None
        if rev_log_session:
            rev_log = rev_log_session['rev_log']
            revs_walker_state = rev_log_session['revs_walker_state']
        # fetch more revision ids only when the cached list does not
        # already cover the requested page
        if len(rev_log) < offset+per_page:
            revs_walker = service.get_revisions_walker(
                revs_ordering, revision_id,
                max_revs=offset+per_page+1,
                state=revs_walker_state)
            rev_log += [rev['id'] for rev in revs_walker]
            revs_walker_state = revs_walker.export_state()
        revs = rev_log[offset:offset+per_page]
        revision_log = service.lookup_revision_multiple(revs)
        request.session[session_key] = {
            'rev_log': rev_log,
            'revs_walker_state': revs_walker_state
        }
    except Exception as exc:
        return handle_view_exception(request, exc)
    swh_type = snapshot_context['swh_type']
    origin_info = snapshot_context['origin_info']
    visit_info = snapshot_context['visit_info']
    url_args = snapshot_context['url_args']
    query_params = snapshot_context['query_params']
    snapshot_id = snapshot_context['snapshot_id']
    query_params['per_page'] = per_page
    revs_ordering = request.GET.get('revs_ordering', '')
    query_params['revs_ordering'] = revs_ordering
    browse_view_name = 'browse-' + swh_type + '-log'
    # pagination links, driven by the 'offset' query parameter
    prev_log_url = None
    if len(rev_log) > offset + per_page:
        query_params['offset'] = offset + per_page
        prev_log_url = reverse(browse_view_name,
                               url_args=url_args,
                               query_params=query_params)
    next_log_url = None
    if offset != 0:
        query_params['offset'] = offset - per_page
        next_log_url = reverse(browse_view_name,
                               url_args=url_args,
                               query_params=query_params)
    revision_log_data = format_log_entries(revision_log, per_page,
                                           snapshot_context)
    browse_rev_link = gen_revision_link(revision_id)
    browse_log_link = gen_revision_log_link(revision_id)
    browse_snp_link = gen_snapshot_link(snapshot_id)
    # metadata passed to the template as 'swh_object_metadata'
    revision_metadata = {
        'context-independent revision': browse_rev_link,
        'context-independent revision history': browse_log_link,
        'context-independent snapshot': browse_snp_link,
        'snapshot': snapshot_id
    }
    if origin_info:
        revision_metadata['origin url'] = origin_info['url']
        revision_metadata['origin visit date'] = format_utc_iso_date(
            visit_info['date'])
        revision_metadata['origin visit type'] = visit_info['type']
    # objects for which persistent identifiers will be generated
    swh_objects = [{'type': 'revision',
                    'id': revision_id},
                   {'type': 'snapshot',
                    'id': snapshot_id}]
    release_id = snapshot_context['release_id']
    if release_id:
        swh_objects.append({'type': 'release',
                            'id': release_id})
        browse_rel_link = gen_release_link(release_id)
        revision_metadata['release'] = release_id
        revision_metadata['context-independent release'] = browse_rel_link
    swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context)
    context_found = 'snapshot: %s' % snapshot_context['snapshot_id']
    if origin_info:
        context_found = 'origin: %s' % origin_info['url']
    heading = ('Revision history - %s - %s' %
               (snapshot_context['branch'], context_found))
    return render(request, 'browse/revision-log.html',
                  {'heading': heading,
                   'swh_object_name': 'Revisions history',
                   'swh_object_metadata': revision_metadata,
                   'revision_log': revision_log_data,
                   'revs_ordering': revs_ordering,
                   'next_log_url': next_log_url,
                   'prev_log_url': prev_log_url,
                   'breadcrumbs': None,
                   'top_right_link': None,
                   'snapshot_context': snapshot_context,
                   'vault_cooking': None,
                   'show_actions_menu': True,
                   'swh_ids': swh_ids})
def browse_snapshot_branches(request, snapshot_id=None,
                             origin_url=None, timestamp=None):
    """
    Django view implementation for browsing a list of branches in a snapshot
    context.

    Pagination is cursor based: the 'branches_breadcrumbs' query parameter
    accumulates the branch names each previous page started from.
    """
    try:
        snapshot_context = _process_snapshot_request(request, snapshot_id,
                                                     origin_url, timestamp)
        branches_bc = request.GET.get('branches_breadcrumbs', '')
        branches_bc = branches_bc.split(',') if branches_bc else []
        # the last breadcrumb is the name the current page starts from
        branches_from = branches_bc[-1] if branches_bc else ''
        swh_type = snapshot_context['swh_type']
        origin_info = snapshot_context['origin_info']
        url_args = snapshot_context['url_args']
        query_params = snapshot_context['query_params']
        browse_view_name = 'browse-' + swh_type + '-directory'
        snapshot = service.lookup_snapshot(snapshot_context['snapshot_id'],
                                           branches_from, PER_PAGE+1,
                                           target_types=['revision', 'alias'])
        displayed_branches, _ = process_snapshot_branches(snapshot)
    except Exception as exc:
        return handle_view_exception(request, exc)
    # compute revision and directory browse URLs for each branch
    for branch in displayed_branches:
        if snapshot_id:
            revision_url = reverse('browse-revision',
                                   url_args={'sha1_git': branch['revision']},
                                   query_params={'snapshot_id': snapshot_id})
        else:
            revision_url = reverse('browse-revision',
                                   url_args={'sha1_git': branch['revision']},
                                   query_params={'origin': origin_info['url']})
        query_params['branch'] = branch['name']
        directory_url = reverse(browse_view_name,
                                url_args=url_args,
                                query_params=query_params)
        del query_params['branch']
        branch['revision_url'] = revision_url
        branch['directory_url'] = directory_url
    browse_view_name = 'browse-' + swh_type + '-branches'
    prev_branches_url = None
    next_branches_url = None
    if branches_bc:
        query_params_prev = dict(query_params)
        query_params_prev['branches_breadcrumbs'] = ','.join(branches_bc[:-1])
        prev_branches_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params_prev)
    elif branches_from:
        prev_branches_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params)
    # use the snapshot lookup's pagination cursor to detect a next page
    if snapshot['next_branch'] is not None:
        query_params_next = dict(query_params)
        next_branch = displayed_branches[-1]['name']
        del displayed_branches[-1]
        branches_bc.append(next_branch)
        query_params_next['branches_breadcrumbs'] = ','.join(branches_bc)
        next_branches_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params_next)
    heading = 'Branches - '
    if origin_info:
        heading += 'origin: %s' % origin_info['url']
    else:
        heading += 'snapshot: %s' % snapshot_id
    return render(request, 'browse/branches.html',
                  {'heading': heading,
                   'swh_object_name': 'Branches',
                   'swh_object_metadata': {},
                   'top_right_link': None,
                   'displayed_branches': displayed_branches,
                   'prev_branches_url': prev_branches_url,
                   'next_branches_url': next_branches_url,
                   'snapshot_context': snapshot_context})
def browse_snapshot_releases(request, snapshot_id=None,
                             origin_url=None, timestamp=None):
    """
    Django view implementation for browsing a list of releases in a snapshot
    context.

    Pagination is cursor based: the 'releases_breadcrumbs' query parameter
    accumulates the branch names each previous page started from.
    """
    try:
        snapshot_context = _process_snapshot_request(request, snapshot_id,
                                                     origin_url, timestamp)
        rel_bc = request.GET.get('releases_breadcrumbs', '')
        rel_bc = rel_bc.split(',') if rel_bc else []
        # the last breadcrumb is the name the current page starts from
        rel_from = rel_bc[-1] if rel_bc else ''
        swh_type = snapshot_context['swh_type']
        origin_info = snapshot_context['origin_info']
        url_args = snapshot_context['url_args']
        query_params = snapshot_context['query_params']
        snapshot = service.lookup_snapshot(snapshot_context['snapshot_id'],
                                           rel_from, PER_PAGE+1,
                                           target_types=['release', 'alias'])
        _, displayed_releases = process_snapshot_branches(snapshot)
    except Exception as exc:
        return handle_view_exception(request, exc)
    # compute browse URLs for each release and for its target object
    for release in displayed_releases:
        if snapshot_id:
            query_params_tgt = {'snapshot_id': snapshot_id}
        else:
            query_params_tgt = {'origin': origin_info['url']}
        release_url = reverse('browse-release',
                              url_args={'sha1_git': release['id']},
                              query_params=query_params_tgt)
        target_url = ''
        if release['target_type'] == 'revision':
            target_url = reverse('browse-revision',
                                 url_args={'sha1_git': release['target']},
                                 query_params=query_params_tgt)
        elif release['target_type'] == 'directory':
            target_url = reverse('browse-directory',
                                 url_args={'sha1_git': release['target']},
                                 query_params=query_params_tgt)
        elif release['target_type'] == 'content':
            target_url = reverse('browse-content',
                                 url_args={'query_string': release['target']},
                                 query_params=query_params_tgt)
        elif release['target_type'] == 'release':
            target_url = reverse('browse-release',
                                 url_args={'sha1_git': release['target']},
                                 query_params=query_params_tgt)
        release['release_url'] = release_url
        release['target_url'] = target_url
    browse_view_name = 'browse-' + swh_type + '-releases'
    prev_releases_url = None
    next_releases_url = None
    if rel_bc:
        query_params_prev = dict(query_params)
        query_params_prev['releases_breadcrumbs'] = ','.join(rel_bc[:-1])
        prev_releases_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params_prev)
    elif rel_from:
        prev_releases_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params)
    # use the snapshot lookup's pagination cursor to detect a next page
    if snapshot['next_branch'] is not None:
        query_params_next = dict(query_params)
        next_rel = displayed_releases[-1]['branch_name']
        del displayed_releases[-1]
        rel_bc.append(next_rel)
        query_params_next['releases_breadcrumbs'] = ','.join(rel_bc)
        next_releases_url = reverse(browse_view_name, url_args=url_args,
                                    query_params=query_params_next)
    heading = 'Releases - '
    if origin_info:
        heading += 'origin: %s' % origin_info['url']
    else:
        heading += 'snapshot: %s' % snapshot_id
    return render(request, 'browse/releases.html',
                  {'heading': heading,
                   'top_panel_visible': False,
                   'top_panel_collapsible': False,
                   'swh_object_name': 'Releases',
                   'swh_object_metadata': {},
                   'top_right_link': None,
                   'displayed_releases': displayed_releases,
                   'prev_releases_url': prev_releases_url,
                   'next_releases_url': next_releases_url,
                   'snapshot_context': snapshot_context,
                   'vault_cooking': None,
                   'show_actions_menu': False})
diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py
index 99443168..e0166087 100644
--- a/swh/web/tests/browse/views/test_origin.py
+++ b/swh/web/tests/browse/views/test_origin.py
@@ -1,905 +1,950 @@
# Copyright (C) 2017-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import random
import re
+import string
import swh.web.browse.utils
from django.utils.html import escape
from hypothesis import given
from swh.model.hashutil import hash_to_bytes
from swh.model.model import Snapshot
from swh.web.browse.utils import process_snapshot_branches
from swh.web.common.exc import NotFoundExc
from swh.web.common.utils import (
reverse, gen_path_info, format_utc_iso_date,
parse_timestamp, get_swh_persistent_id
)
-from swh.web.tests.data import get_content
+from swh.web.tests.data import get_content, random_sha1
from swh.web.tests.django_asserts import assert_contains, assert_template_used
from swh.web.tests.strategies import (
origin, origin_with_multiple_visits, new_origin,
- new_snapshot, visit_dates, revisions, origin_with_releases
+ new_snapshot, visit_dates, revisions, origin_with_releases,
+ release as existing_release
)
@given(origin_with_multiple_visits())
def test_origin_visits_browse(client, archive_data, origin):
    """
    Check the origin visits view renders and links to the directory
    browse view of every visit.
    """
    url = reverse('browse-origin-visits',
                  url_args={'origin_url': origin['url']})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/origin-visits.html')
    visits = archive_data.origin_visit_get(origin['url'])
    for v in visits:
        vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ')
        browse_dir_url = reverse('browse-origin-directory',
                                 url_args={'origin_url': origin['url'],
                                           'timestamp': vdate})
        assert_contains(resp, browse_dir_url)
@given(origin_with_multiple_visits())
def test_origin_content_view(client, archive_data, origin):
    """
    Check the content view of an origin for its last and first visits,
    selecting the visit by default, by ISO date, by unix timestamp and
    by visit id.
    """
    origin_visits = archive_data.origin_visit_get(origin['url'])
    def _get_archive_data(visit_idx):
        # gather the expected test data (branches, releases, root
        # directory and a randomly picked file) for the given visit
        snapshot = archive_data.snapshot_get(
            origin_visits[visit_idx]['snapshot'])
        head_rev_id = archive_data.snapshot_get_head(snapshot)
        head_rev = archive_data.revision_get(head_rev_id)
        dir_content = archive_data.directory_ls(head_rev['directory'])
        dir_files = [e for e in dir_content if e['type'] == 'file']
        dir_file = random.choice(dir_files)
        branches, releases = process_snapshot_branches(snapshot)
        return {
            'branches': branches,
            'releases': releases,
            'root_dir_sha1': head_rev['directory'],
            'content': get_content(dir_file['checksums']['sha1']),
            'visit': origin_visits[visit_idx]
        }
    # last visit: default selection, then by ISO date, then by unix timestamp
    tdata = _get_archive_data(-1)
    _origin_content_view_test_helper(client, origin, origin_visits,
                                     tdata['branches'],
                                     tdata['releases'],
                                     tdata['root_dir_sha1'],
                                     tdata['content'])
    _origin_content_view_test_helper(client, origin, origin_visits,
                                     tdata['branches'],
                                     tdata['releases'],
                                     tdata['root_dir_sha1'],
                                     tdata['content'],
                                     timestamp=tdata['visit']['date'])
    visit_unix_ts = parse_timestamp(tdata['visit']['date']).timestamp()
    visit_unix_ts = int(visit_unix_ts)
    _origin_content_view_test_helper(client, origin, origin_visits,
                                     tdata['branches'],
                                     tdata['releases'],
                                     tdata['root_dir_sha1'],
                                     tdata['content'],
                                     timestamp=visit_unix_ts)
    # first visit: selection by visit id
    tdata = _get_archive_data(0)
    _origin_content_view_test_helper(client, origin, origin_visits,
                                     tdata['branches'],
                                     tdata['releases'],
                                     tdata['root_dir_sha1'],
                                     tdata['content'],
                                     visit_id=tdata['visit']['visit'])
@given(origin())
def test_origin_root_directory_view(client, archive_data, origin):
    """Browse the root directory of an origin's latest visit under every
    supported addressing scheme, with and without the origin 'type' key."""
    origin_visits = archive_data.origin_visit_get(origin['url'])
    last_visit = origin_visits[-1]
    snapshot = archive_data.snapshot_get(last_visit['snapshot'])
    head_rev = archive_data.revision_get(
        archive_data.snapshot_get_head(snapshot))
    root_dir_sha1 = head_rev['directory']
    dir_content = archive_data.directory_ls(root_dir_sha1)
    branches, releases = process_snapshot_branches(snapshot)
    visit_unix_ts = int(parse_timestamp(last_visit['date']).timestamp())

    # the same checks must hold whether or not the origin dict carries
    # a 'type' entry
    typeless = {k: v for k, v in origin.items() if k != 'type'}
    for origin_info in (origin, typeless):
        for extra_kwargs in ({},
                             {'visit_id': last_visit['visit']},
                             {'timestamp': visit_unix_ts},
                             {'timestamp': last_visit['date']}):
            _origin_directory_view_test_helper(client, origin_info,
                                               origin_visits, branches,
                                               releases, root_dir_sha1,
                                               dir_content, **extra_kwargs)
@given(origin())
def test_origin_sub_directory_view(client, archive_data, origin):
    """Browse a random sub-directory of an origin's latest visit under every
    supported addressing scheme, with and without the origin 'type' key."""
    origin_visits = archive_data.origin_visit_get(origin['url'])
    visit = origin_visits[-1]
    snapshot = archive_data.snapshot_get(visit['snapshot'])
    head_rev = archive_data.revision_get(
        archive_data.snapshot_get_head(snapshot))
    root_dir_sha1 = head_rev['directory']
    subdirs = [e for e in archive_data.directory_ls(root_dir_sha1)
               if e['type'] == 'dir']
    branches, releases = process_snapshot_branches(snapshot)
    visit_unix_ts = int(parse_timestamp(visit['date']).timestamp())
    if not subdirs:
        # nothing to browse below the root for that origin
        return
    subdir = random.choice(subdirs)
    subdir_content = archive_data.directory_ls(subdir['target'])
    subdir_path = subdir['name']

    typeless = {k: v for k, v in origin.items() if k != 'type'}
    for origin_info in (origin, typeless):
        for extra_kwargs in ({},
                             {'visit_id': visit['visit']},
                             {'timestamp': visit_unix_ts},
                             {'timestamp': visit['date']}):
            _origin_directory_view_test_helper(client, origin_info,
                                               origin_visits, branches,
                                               releases, root_dir_sha1,
                                               subdir_content,
                                               path=subdir_path,
                                               **extra_kwargs)
@given(origin())
def test_origin_branches(client, archive_data, origin):
    """The branches view must render with and without an origin type."""
    visits = archive_data.origin_visit_get(origin['url'])
    snapshot = archive_data.snapshot_get(visits[-1]['snapshot'])
    snapshot_content = process_snapshot_branches(snapshot)
    _origin_branches_test_helper(client, origin, snapshot_content)
    typeless_origin = dict(origin, type=None)
    _origin_branches_test_helper(client, typeless_origin, snapshot_content)
@given(origin())
def test_origin_releases(client, archive_data, origin):
    """The releases view must render with and without an origin type."""
    visits = archive_data.origin_visit_get(origin['url'])
    snapshot = archive_data.snapshot_get(visits[-1]['snapshot'])
    snapshot_content = process_snapshot_branches(snapshot)
    _origin_releases_test_helper(client, origin, snapshot_content)
    typeless_origin = dict(origin, type=None)
    _origin_releases_test_helper(client, typeless_origin, snapshot_content)
@given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(),
       revisions(min_size=3, max_size=3))
def test_origin_snapshot_null_branch(client, archive_data, new_origin,
                                     new_snapshot, visit_dates, revisions):
    """Browsing an origin whose snapshot has a dangling (None) first branch
    must still succeed."""
    snp_dict = new_snapshot.to_dict()
    new_origin = archive_data.origin_add([new_origin])[0]
    branch_names = list(snp_dict['branches'])
    # first branch is dangling, the remaining ones target revisions
    snp_dict['branches'][branch_names[0]] = None
    for name, rev in zip(branch_names[1:], revisions):
        snp_dict['branches'][name] = {
            'target_type': 'revision',
            'target': hash_to_bytes(rev),
        }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        new_origin['url'], visit_dates[0], type='git')
    archive_data.origin_visit_update(new_origin['url'], visit.visit,
                                     status='partial',
                                     snapshot=snp_dict['id'])
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': new_origin['url']})
    assert client.get(url).status_code == 200
@given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(),
       revisions(min_size=4, max_size=4))
def test_origin_snapshot_invalid_branch(client, archive_data, new_origin,
                                        new_snapshot, visit_dates, revisions):
    """Requesting a branch name absent from the snapshot must yield 404."""
    snp_dict = new_snapshot.to_dict()
    new_origin = archive_data.origin_add([new_origin])[0]
    # point every branch at one of the generated revisions
    for name, rev in zip(list(snp_dict['branches']), revisions):
        snp_dict['branches'][name] = {
            'target_type': 'revision',
            'target': hash_to_bytes(rev),
        }
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        new_origin['url'], visit_dates[0], type='git')
    archive_data.origin_visit_update(new_origin['url'], visit.visit,
                                     status='full',
                                     snapshot=snp_dict['id'])
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': new_origin['url']},
                  query_params={'branch': 'invalid_branch'})
    assert client.get(url).status_code == 404
def test_origin_request_errors(client, archive_data, mocker):
    """Exercise the error paths of the origin browse views.

    Each scenario below reconfigures the mocked service layers to simulate
    one failure mode, then checks the HTTP status, template and message
    rendered.  The mocks are reused across scenarios, so statement order
    matters: every scenario first resets/overrides the side effects set by
    the previous one.
    """
    # Patch the service modules and lookup helpers the views go through.
    mock_snapshot_service = mocker.patch(
        'swh.web.browse.views.utils.snapshot_context.service')
    mock_origin_service = mocker.patch('swh.web.browse.views.origin.service')
    mock_utils_service = mocker.patch('swh.web.browse.utils.service')
    mock_get_origin_visit_snapshot = mocker.patch(
        'swh.web.browse.utils.get_origin_visit_snapshot')
    mock_get_origin_visits = mocker.patch(
        'swh.web.common.origin_visits.get_origin_visits')
    mock_request_content = mocker.patch(
        'swh.web.browse.views.utils.snapshot_context.request_content')

    # Scenario: unknown origin -> 404 on the visits view
    mock_origin_service.lookup_origin.side_effect = NotFoundExc(
        'origin not found')
    url = reverse('browse-origin-visits',
                  url_args={'origin_url': 'bar'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, 'origin not found', status_code=404)

    # Scenario: known origin without any visit -> 404 on the directory view
    mock_origin_service.lookup_origin.side_effect = None
    mock_origin_service.lookup_origin.return_value = {'type': 'foo',
                                                      'url': 'bar',
                                                      'id': 457}
    mock_get_origin_visits.return_value = []
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': 'bar'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, "No visit", status_code=404)

    # Scenario: requested visit id does not exist -> 404
    mock_get_origin_visits.return_value = [{'visit': 1}]
    mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found')
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': 'bar'},
                  query_params={'visit_id': 2})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert re.search('Visit.*not found', resp.content.decode('utf-8'))

    # Scenario: valid visit but the root directory lookup fails -> 404
    mock_get_origin_visits.return_value = [{
        'date': '2015-09-26T09:30:52.373449+00:00',
        'metadata': {},
        'origin': 457,
        'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
        'status': 'full',
        'visit': 1
    }]
    mock_get_origin_visit_snapshot.side_effect = None
    mock_get_origin_visit_snapshot.return_value = (
        [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb',
          'name': 'HEAD',
          'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672',
          'date': '04 May 2017, 13:27 UTC',
          'message': ''}],
        []
    )
    mock_utils_service.lookup_snapshot_sizes.return_value = {
        'revision': 1,
        'release': 0
    }
    mock_lookup_directory = mock_utils_service.lookup_directory
    mock_lookup_directory.side_effect = NotFoundExc('Directory not found')
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': 'bar'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, 'Directory not found', status_code=404)

    # Scenario: origin without visit -> 404 on the content view
    mock_origin_service.lookup_origin.side_effect = None
    mock_origin_service.lookup_origin.return_value = {'type': 'foo',
                                                      'url': 'bar',
                                                      'id': 457}
    mock_get_origin_visits.return_value = []
    url = reverse('browse-origin-content',
                  url_args={'origin_url': 'bar',
                            'path': 'foo'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, "No visit", status_code=404)

    # Scenario: unknown visit id -> 404 on the content view
    mock_get_origin_visits.return_value = [{'visit': 1}]
    mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found')
    url = reverse('browse-origin-content',
                  url_args={'origin_url': 'bar',
                            'path': 'foo'},
                  query_params={'visit_id': 2})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert re.search('Visit.*not found', resp.content.decode('utf-8'))

    # Scenario: empty snapshot -> content view renders an "empty" page (200)
    mock_get_origin_visits.return_value = [{
        'date': '2015-09-26T09:30:52.373449+00:00',
        'metadata': {},
        'origin': 457,
        'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
        'status': 'full',
        'type': 'git',
        'visit': 1
    }]
    mock_get_origin_visit_snapshot.side_effect = None
    mock_get_origin_visit_snapshot.return_value = ([], [])
    mock_utils_service.lookup_snapshot_sizes.return_value = {
        'revision': 0,
        'release': 0
    }
    mock_utils_service.lookup_origin.return_value = {'type': 'foo',
                                                     'url': 'bar',
                                                     'id': 457}
    url = reverse('browse-origin-content',
                  url_args={'origin_url': 'bar',
                            'path': 'baz'})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/content.html')
    assert re.search('snapshot.*is empty', resp.content.decode('utf-8'))

    # Scenario: directory entry resolves but the content fetch fails -> 404
    mock_get_origin_visit_snapshot.return_value = (
        [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb',
          'name': 'HEAD',
          'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672',
          'date': '04 May 2017, 13:27 UTC',
          'message': ''}],
        []
    )
    mock_utils_service.lookup_snapshot_sizes.return_value = {
        'revision': 1,
        'release': 0
    }
    mock_snapshot_service.lookup_directory_with_path.return_value = {
        'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1'
    }
    mock_request_content.side_effect = NotFoundExc('Content not found')
    url = reverse('browse-origin-content',
                  url_args={'origin_url': 'bar',
                            'path': 'baz'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, 'Content not found', status_code=404)

    # Scenario: snapshot context resolution fails -> 404
    mock_get_snapshot_context = mocker.patch(
        'swh.web.browse.views.utils.snapshot_context.get_snapshot_context')
    mock_get_snapshot_context.side_effect = NotFoundExc('Snapshot not found')
    url = reverse('browse-origin-directory',
                  url_args={'origin_url': 'bar'})
    resp = client.get(url)
    assert resp.status_code == 404
    assert_template_used(resp, 'error.html')
    assert_contains(resp, 'Snapshot not found', status_code=404)
def test_origin_empty_snapshot(client, mocker):
    """Browsing an origin whose latest snapshot holds neither branches nor
    releases must render an 'empty snapshot' directory page with no
    history (swh-tr-link) link."""
    mock_utils_service = mocker.patch('swh.web.browse.utils.service')
    mock_visit_snapshot = mocker.patch(
        'swh.web.browse.utils.get_origin_visit_snapshot')
    mock_visits = mocker.patch(
        'swh.web.common.origin_visits.get_origin_visits')
    mock_visits.return_value = [{
        'date': '2015-09-26T09:30:52.373449+00:00',
        'metadata': {},
        'origin': 457,
        'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65',
        'status': 'full',
        'type': 'git',
        'visit': 1
    }]
    # empty snapshot: no branches, no releases
    mock_visit_snapshot.return_value = ([], [])
    mock_utils_service.lookup_snapshot_sizes.return_value = {
        'revision': 0,
        'release': 0
    }
    mock_utils_service.lookup_origin.return_value = {
        'id': 457,
        'url': 'https://github.com/foo/bar'
    }
    resp = client.get(reverse('browse-origin-directory',
                              url_args={'origin_url': 'bar'}))
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/directory.html')
    page = resp.content.decode('utf-8')
    assert re.search('snapshot.*is empty', page)
    assert not re.search('swh-tr-link', page)
@given(origin_with_releases())
def test_origin_release_browse(client, archive_data, origin):
    """Browsing a directory addressed by release name must display the
    release name and its target."""
    # shrink the chunk size so that
    # swh.web.browse.utils.get_snapshot_content only returns one branch
    saved_max_size = swh.web.browse.utils.snapshot_content_max_size
    swh.web.browse.utils.snapshot_content_max_size = 1
    try:
        snapshot = archive_data.snapshot_get_latest(origin['url'])
        release_branches = [b for b in snapshot['branches'].values()
                            if b['target_type'] == 'release']
        release = release_branches[-1]
        release_data = archive_data.release_get(release['target'])
        resp = client.get(reverse(
            'browse-origin-directory',
            url_args={'origin_url': origin['url']},
            query_params={'release': release_data['name']}))
        assert resp.status_code == 200
        assert_contains(resp, release_data['name'])
        assert_contains(resp, release['target'])
    finally:
        # always restore the module-level setting for other tests
        swh.web.browse.utils.snapshot_content_max_size = saved_max_size
@given(origin_with_releases())
def test_origin_release_browse_not_found(client, archive_data, origin):
    """Requesting a nonexistent release name must yield a 404 carrying an
    explicit error message."""
    invalid_release_name = 'swh-foo-bar'
    resp = client.get(reverse(
        'browse-origin-directory',
        url_args={'origin_url': origin['url']},
        query_params={'release': invalid_release_name}))
    assert resp.status_code == 404
    assert re.search(f'Release {invalid_release_name}.*not found',
                     resp.content.decode('utf-8'))
def _origin_content_view_test_helper(client, origin_info, origin_visits,
                                     origin_branches, origin_releases,
                                     root_dir_sha1, content,
                                     visit_id=None, timestamp=None):
    """Check the browse content view of an origin: rendered content,
    breadcrumbs, raw content link, branches/releases dropdowns and
    persistent identifier.

    Args:
        client: Django test client
        origin_info: dict describing the origin (must contain 'url')
        origin_visits: list of the origin's visits
        origin_branches: branches of the browsed snapshot
        origin_releases: releases of the browsed snapshot
        root_dir_sha1: sha1 of the root directory of the browsed revision
        content: dict describing the content to browse
        visit_id: optional visit id used to address the snapshot
        timestamp: optional visit date used to address the snapshot
    """
    content_path = '/'.join(content['path'].split('/')[1:])
    url_args = {'origin_url': origin_info['url'],
                'path': content_path}
    if not visit_id:
        # default to the latest visit
        visit_id = origin_visits[-1]['visit']
    query_params = {}
    if timestamp:
        url_args['timestamp'] = timestamp
    if visit_id:
        query_params['visit_id'] = visit_id
    url = reverse('browse-origin-content',
                  url_args=url_args,
                  query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/content.html')
    assert type(content['data']) == str
    assert_contains(resp, '<code class="%s">' %
                    content['hljs_language'])
    assert_contains(resp, escape(content['data']))
    # check the breadcrumbs navigation: one entry per ancestor directory
    # plus the root directory itself
    split_path = content_path.split('/')
    filename = split_path[-1]
    path = content_path.replace(filename, '')[:-1]
    path_info = gen_path_info(path)
    del url_args['path']
    if timestamp:
        url_args['timestamp'] = format_utc_iso_date(
            parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S')
    root_dir_url = reverse('browse-origin-directory',
                           url_args=url_args,
                           query_params=query_params)
    assert_contains(resp, '<li class="swh-path">',
                    count=len(path_info)+1)
    assert_contains(resp, '<a href="%s">%s</a>' %
                    (root_dir_url, root_dir_sha1[:7]))
    for p in path_info:
        url_args['path'] = p['path']
        dir_url = reverse('browse-origin-directory',
                          url_args=url_args,
                          query_params=query_params)
        assert_contains(resp, '<a href="%s">%s</a>' %
                        (dir_url, p['name']))
    assert_contains(resp, '<li>%s</li>' % filename)
    # check the link to the raw content
    query_string = 'sha1_git:' + content['sha1_git']
    url_raw = reverse('browse-content-raw',
                      url_args={'query_string': query_string},
                      query_params={'filename': filename})
    assert_contains(resp, url_raw)
    # drop the 'path' entry re-added by the breadcrumbs loop above before
    # reversing the branches/releases URLs, which take no path argument
    # (bugfix: the original checked the never-present key 'args')
    if 'path' in url_args:
        del url_args['path']
    origin_branches_url = reverse('browse-origin-branches',
                                  url_args=url_args,
                                  query_params=query_params)
    assert_contains(resp, '<a href="%s">Branches (%s)</a>' %
                    (origin_branches_url, len(origin_branches)))
    origin_releases_url = reverse('browse-origin-releases',
                                  url_args=url_args,
                                  query_params=query_params)
    assert_contains(resp, '<a href="%s">Releases (%s)</a>' %
                    (origin_releases_url, len(origin_releases)))
    assert_contains(resp, '<li class="swh-branch">',
                    count=len(origin_branches))
    # every branch / release must link back to the browsed content
    url_args['path'] = content_path
    for branch in origin_branches:
        query_params['branch'] = branch['name']
        root_dir_branch_url = reverse('browse-origin-content',
                                      url_args=url_args,
                                      query_params=query_params)
        assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
    assert_contains(resp, '<li class="swh-release">',
                    count=len(origin_releases))
    query_params['branch'] = None
    for release in origin_releases:
        query_params['release'] = release['name']
        root_dir_release_url = reverse('browse-origin-content',
                                       url_args=url_args,
                                       query_params=query_params)
        assert_contains(resp, '<a href="%s">' % root_dir_release_url)
    url = reverse('browse-origin-content',
                  url_args=url_args,
                  query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/content.html')
    # the content's SWH persistent identifier must be displayed and linked
    swh_cnt_id = get_swh_persistent_id('content', content['sha1_git'])
    swh_cnt_id_url = reverse('browse-swh-id',
                             url_args={'swh_id': swh_cnt_id})
    assert_contains(resp, swh_cnt_id)
    assert_contains(resp, swh_cnt_id_url)
    assert_contains(resp, 'swh-take-new-snapshot')
def _origin_directory_view_test_helper(client, origin_info, origin_visits,
                                       origin_branches, origin_releases,
                                       root_directory_sha1, directory_entries,
                                       visit_id=None, timestamp=None,
                                       path=None):
    """Check the browse directory view of an origin: entries listing,
    breadcrumbs, branches/releases dropdowns, vault cooking buttons and
    persistent identifier.

    Args:
        client: Django test client
        origin_info: dict describing the origin (must contain 'url')
        origin_visits: list of the origin's visits
        origin_branches: branches of the browsed snapshot
        origin_releases: releases of the browsed snapshot
        root_directory_sha1: sha1 of the root directory of the revision
        directory_entries: entries of the browsed directory
        visit_id: optional visit id used to address the snapshot
        timestamp: optional visit date used to address the snapshot
        path: optional sub-directory path to browse
    """
    dirs = [e for e in directory_entries
            if e['type'] in ('dir', 'rev')]
    files = [e for e in directory_entries
             if e['type'] == 'file']
    if not visit_id:
        # default to the latest visit
        visit_id = origin_visits[-1]['visit']
    url_args = {'origin_url': origin_info['url']}
    query_params = {}
    if timestamp:
        url_args['timestamp'] = timestamp
    else:
        query_params['visit_id'] = visit_id
    if path:
        url_args['path'] = path
    url = reverse('browse-origin-directory',
                  url_args=url_args,
                  query_params=query_params)
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/directory.html')
    # each sub-directory and file entry must be rendered exactly once
    # (a duplicated copy of the two assertions above was removed here)
    assert_contains(resp, '<td class="swh-directory">',
                    count=len(dirs))
    assert_contains(resp, '<td class="swh-content">',
                    count=len(files))
    if timestamp:
        url_args['timestamp'] = format_utc_iso_date(
            parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S')
    for d in dirs:
        if d['type'] == 'rev':
            # submodule-like entries link to the revision view
            dir_url = reverse('browse-revision',
                              url_args={'sha1_git': d['target']})
        else:
            dir_path = d['name']
            if path:
                dir_path = "%s/%s" % (path, d['name'])
            dir_url_args = dict(url_args)
            dir_url_args['path'] = dir_path
            dir_url = reverse('browse-origin-directory',
                              url_args=dir_url_args,
                              query_params=query_params)
        assert_contains(resp, dir_url)
    for f in files:
        file_path = f['name']
        if path:
            file_path = "%s/%s" % (path, f['name'])
        file_url_args = dict(url_args)
        file_url_args['path'] = file_path
        file_url = reverse('browse-origin-content',
                           url_args=file_url_args,
                           query_params=query_params)
        assert_contains(resp, file_url)
    if 'path' in url_args:
        del url_args['path']
    root_dir_branch_url = reverse('browse-origin-directory',
                                  url_args=url_args,
                                  query_params=query_params)
    # breadcrumbs: root plus one entry per path component
    nb_bc_paths = 1
    if path:
        nb_bc_paths = len(path.split('/')) + 1
    assert_contains(resp, '<li class="swh-path">', count=nb_bc_paths)
    assert_contains(resp, '<a href="%s">%s</a>' %
                    (root_dir_branch_url,
                     root_directory_sha1[:7]))
    origin_branches_url = reverse('browse-origin-branches',
                                  url_args=url_args,
                                  query_params=query_params)
    assert_contains(resp, '<a href="%s">Branches (%s)</a>' %
                    (origin_branches_url, len(origin_branches)))
    origin_releases_url = reverse('browse-origin-releases',
                                  url_args=url_args,
                                  query_params=query_params)
    nb_releases = len(origin_releases)
    if nb_releases > 0:
        assert_contains(resp, '<a href="%s">Releases (%s)</a>' %
                        (origin_releases_url, nb_releases))
    if path:
        url_args['path'] = path
    assert_contains(resp, '<li class="swh-branch">',
                    count=len(origin_branches))
    # every branch / release must link back to the browsed directory
    for branch in origin_branches:
        query_params['branch'] = branch['name']
        root_dir_branch_url = reverse('browse-origin-directory',
                                      url_args=url_args,
                                      query_params=query_params)
        assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
    assert_contains(resp, '<li class="swh-release">',
                    count=len(origin_releases))
    query_params['branch'] = None
    for release in origin_releases:
        query_params['release'] = release['name']
        root_dir_release_url = reverse('browse-origin-directory',
                                       url_args=url_args,
                                       query_params=query_params)
        assert_contains(resp, '<a href="%s">' % root_dir_release_url)
    assert_contains(resp, 'vault-cook-directory')
    assert_contains(resp, 'vault-cook-revision')
    # the directory's SWH persistent identifier must be displayed and linked
    swh_dir_id = get_swh_persistent_id(
        'directory', directory_entries[0]['dir_id'])
    swh_dir_id_url = reverse('browse-swh-id',
                             url_args={'swh_id': swh_dir_id})
    assert_contains(resp, swh_dir_id)
    assert_contains(resp, swh_dir_id_url)
    assert_contains(resp, 'swh-take-new-snapshot')
def _origin_branches_test_helper(client, origin_info, origin_snapshot):
    """Request the branches view of an origin and check every snapshot
    branch is listed with links to its directory and its revision."""
    branches, releases = origin_snapshot[0], origin_snapshot[1]
    url_args = {'origin_url': origin_info['url']}
    resp = client.get(reverse('browse-origin-branches', url_args=url_args))
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/branches.html')
    branches_url = reverse('browse-origin-branches', url_args=url_args)
    assert_contains(resp, '<a href="%s">Branches (%s)</a>' %
                    (branches_url, len(branches)))
    releases_url = reverse('browse-origin-releases', url_args=url_args)
    if len(releases) > 0:
        assert_contains(resp, '<a href="%s">Releases (%s)</a>' %
                        (releases_url, len(releases)))
    assert_contains(resp, '<tr class="swh-branch-entry',
                    count=len(branches))
    for branch in branches:
        directory_url = reverse(
            'browse-origin-directory',
            url_args={'origin_url': origin_info['url']},
            query_params={'branch': branch['name']})
        assert_contains(resp, '<a href="%s">' % escape(directory_url))
        revision_url = reverse(
            'browse-revision',
            url_args={'sha1_git': branch['revision']},
            query_params={'origin': origin_info['url']})
        assert_contains(resp, '<a href="%s">' % escape(revision_url))
def _origin_releases_test_helper(client, origin_info, origin_snapshot):
    """Request the releases view of an origin and check every snapshot
    release is listed with links to the release and its target revision."""
    branches, releases = origin_snapshot[0], origin_snapshot[1]
    url_args = {'origin_url': origin_info['url']}
    resp = client.get(reverse('browse-origin-releases', url_args=url_args))
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/releases.html')
    branches_url = reverse('browse-origin-branches', url_args=url_args)
    assert_contains(resp, '<a href="%s">Branches (%s)</a>' %
                    (branches_url, len(branches)))
    releases_url = reverse('browse-origin-releases', url_args=url_args)
    nb_releases = len(releases)
    if nb_releases > 0:
        assert_contains(resp, '<a href="%s">Releases (%s)</a>' %
                        (releases_url, nb_releases))
    assert_contains(resp, '<tr class="swh-release-entry',
                    count=nb_releases)
    for release in releases:
        release_url = reverse(
            'browse-release',
            url_args={'sha1_git': release['id']},
            query_params={'origin': origin_info['url']})
        revision_url = reverse(
            'browse-revision',
            url_args={'sha1_git': release['target']},
            query_params={'origin': origin_info['url']})
        assert_contains(resp, '<a href="%s">' % escape(release_url))
        assert_contains(resp, '<a href="%s">' % escape(revision_url))
@given(new_origin(), visit_dates(), revisions(min_size=10, max_size=10),
       existing_release())
def test_origin_branches_pagination_with_alias(client, archive_data, mocker,
                                               new_origin, visit_dates,
                                               revisions, existing_release):
    """
    When a snapshot contains a branch or a release alias, pagination links
    in the branches / releases view should be displayed.
    """
    # force pagination with a small page size; use floor division so that
    # PER_PAGE is patched with an int, not a float
    mocker.patch('swh.web.browse.views.utils.snapshot_context.PER_PAGE',
                 len(revisions) // 2)
    snp_dict = {'branches': {}, 'id': hash_to_bytes(random_sha1())}
    # one randomly named branch per generated revision
    for i in range(len(revisions)):
        branch = ''.join(random.choices(string.ascii_lowercase, k=8))
        snp_dict['branches'][branch.encode()] = {
            'target_type': 'revision',
            'target': hash_to_bytes(revisions[i]),
        }
    # plus a release and an alias branch pointing to it
    release = ''.join(random.choices(string.ascii_lowercase, k=8))
    snp_dict['branches'][b'RELEASE_ALIAS'] = {
        'target_type': 'alias',
        'target': release.encode()
    }
    snp_dict['branches'][release.encode()] = {
        'target_type': 'release',
        'target': hash_to_bytes(existing_release)
    }
    new_origin = archive_data.origin_add([new_origin])[0]
    archive_data.snapshot_add([Snapshot.from_dict(snp_dict)])
    visit = archive_data.origin_visit_add(
        new_origin['url'], visit_dates[0], type='git')
    archive_data.origin_visit_update(new_origin['url'], visit.visit,
                                     status='full',
                                     snapshot=snp_dict['id'])

    url = reverse('browse-origin-branches',
                  url_args={'origin_url': new_origin['url']})
    resp = client.get(url)
    assert resp.status_code == 200
    assert_template_used(resp, 'browse/branches.html')
    assert_contains(resp, '<ul class="pagination')
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Sep 18, 5:02 PM (1 d, 22 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3247072
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment