diff --git a/swh/web/browse/views/utils/snapshot_context.py b/swh/web/browse/views/utils/snapshot_context.py index f4083e41..d1c814ce 100644 --- a/swh/web/browse/views/utils/snapshot_context.py +++ b/swh/web/browse/views/utils/snapshot_context.py @@ -1,909 +1,925 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information # Utility module implementing Django views for browsing the archive # in a snapshot context. # Its purpose is to factorize code for the views reachable from the # /origin/.* and /snapshot/.* endpoints. from django.shortcuts import render from django.template.defaultfilters import filesizeformat from django.utils.html import escape from swh.model.identifiers import snapshot_identifier from swh.web.browse.utils import ( get_snapshot_context, get_directory_entries, gen_directory_link, gen_revision_link, request_content, gen_content_link, prepare_content_for_display, content_display_max_size, format_log_entries, gen_revision_log_link, gen_release_link, get_readme_to_display, get_swh_persistent_ids, gen_snapshot_link, process_snapshot_branches ) from swh.web.common import service, highlightjs from swh.web.common.exc import ( handle_view_exception, NotFoundExc ) from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, swh_object_icons ) _empty_snapshot_id = snapshot_identifier({'branches': {}}) def _get_branch(branches, branch_name, snapshot_id): """ Utility function to get a specific branch from a branches list. Its purpose is to get the default HEAD branch as some software origins (e.g. those with svn type) do not have it. In that case, check if there is a master branch instead and return it. """ filtered_branches = [b for b in branches if b['name'] == branch_name] if filtered_branches: return filtered_branches[0] elif branch_name == 'HEAD': filtered_branches = [b for b in branches if b['name'].endswith('master')] if filtered_branches: return filtered_branches[0] elif branches: return branches[0] else: # case where a large branches list has been truncated snp = service.lookup_snapshot(snapshot_id, branches_from=branch_name, branches_count=1, target_types=['revision', 'alias']) snp_branch, _ = process_snapshot_branches(snp) if snp_branch and snp_branch[0]['name'] == branch_name: branches.append(snp_branch[0]) return snp_branch[0] -def _get_release(releases, release_name): +def _get_release(releases, release_name, snapshot_id): """ Utility function to get a specific release from a releases list. Returns None if the release cannot be found in the list. 
""" filtered_releases = [r for r in releases if r['name'] == release_name] if filtered_releases: return filtered_releases[0] - - -def _branch_not_found(branch_type, branch, branches, snapshot_id=None, - origin_info=None, timestamp=None, visit_id=None): + else: + # case where a large branches list has been truncated + for branch_name in (release_name, f'refs/tags/{release_name}'): + snp = service.lookup_snapshot(snapshot_id, + branches_from=branch_name, + branches_count=1, + target_types=['release']) + _, snp_release = process_snapshot_branches(snp) + if snp_release and snp_release[0]['name'] == release_name: + releases.append(snp_release[0]) + return snp_release[0] + + +def _branch_not_found(branch_type, branch, snapshot_id, snapshot_size, + origin_info, timestamp, visit_id): """ Utility function to raise an exception when a specified branch/release can not be found. """ if branch_type == 'branch': branch_type = 'Branch' branch_type_plural = 'branches' + target_type = 'revision' else: branch_type = 'Release' branch_type_plural = 'releases' + target_type = 'release' - if snapshot_id and not branches: + if snapshot_id and snapshot_size[target_type] == 0: msg = ('Snapshot with id %s has an empty list' ' of %s!' % (snapshot_id, branch_type_plural)) elif snapshot_id: msg = ('%s %s for snapshot with id %s' ' not found!' % (branch_type, branch, snapshot_id)) - elif visit_id and not branches: + elif visit_id and snapshot_size[target_type] == 0: msg = ('Origin with url %s' ' for visit with id %s has an empty list' ' of %s!' % (origin_info['url'], visit_id, branch_type_plural)) elif visit_id: msg = ('%s %s associated to visit with' ' id %s for origin with url %s' ' not found!' % (branch_type, branch, visit_id, origin_info['url'])) - elif not branches: + elif snapshot_size[target_type] == 0: msg = ('Origin with url %s' ' for visit with timestamp %s has an empty list' ' of %s!' % (origin_info['url'], timestamp, branch_type_plural)) else: msg = ('%s %s associated to visit with' ' timestamp %s for origin with ' 'url %s not found!' % (branch_type, branch, timestamp, origin_info['url'])) raise NotFoundExc(escape(msg)) def _process_snapshot_request(request, snapshot_id=None, origin_url=None, timestamp=None, path=None, browse_context='directory'): """ Utility function to perform common input request processing for snapshot context views. 
""" visit_id = request.GET.get('visit_id', None) snapshot_context = get_snapshot_context(snapshot_id, origin_url, timestamp, visit_id) swh_type = snapshot_context['swh_type'] origin_info = snapshot_context['origin_info'] branches = snapshot_context['branches'] releases = snapshot_context['releases'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] if snapshot_context['visit_info']: timestamp = format_utc_iso_date(snapshot_context['visit_info']['date'], '%Y-%m-%dT%H:%M:%SZ') snapshot_context['timestamp'] = format_utc_iso_date( snapshot_context['visit_info']['date']) browse_view_name = 'browse-' + swh_type + '-' + browse_context root_sha1_git = None revision_id = request.GET.get('revision', None) release_name = request.GET.get('release', None) release_id = None branch_name = None - snapshot_total_size = sum(snapshot_context['snapshot_size'].values()) + snapshot_size = snapshot_context['snapshot_size'] + snapshot_total_size = sum(snapshot_size.values()) if snapshot_total_size and revision_id: revision = service.lookup_revision(revision_id) root_sha1_git = revision['directory'] branches.append({'name': revision_id, 'revision': revision_id, 'directory': root_sha1_git, 'url': None}) branch_name = revision_id query_params['revision'] = revision_id elif snapshot_total_size and release_name: - release = _get_release(releases, release_name) + release = _get_release(releases, release_name, + snapshot_context['snapshot_id']) try: root_sha1_git = release['directory'] revision_id = release['target'] release_id = release['id'] query_params['release'] = release_name except Exception: - _branch_not_found("release", release_name, releases, snapshot_id, - origin_info, timestamp, visit_id) + _branch_not_found('release', release_name, snapshot_id, + snapshot_size, origin_info, timestamp, visit_id) elif snapshot_total_size: branch_name = request.GET.get('branch', None) if branch_name: query_params['branch'] = branch_name branch = _get_branch(branches, branch_name or 'HEAD', snapshot_context['snapshot_id']) try: branch_name = branch['name'] revision_id = branch['revision'] root_sha1_git = branch['directory'] except Exception: - _branch_not_found("branch", branch_name, branches, snapshot_id, - origin_info, timestamp, visit_id) + _branch_not_found('branch', branch_name, snapshot_id, + snapshot_size, origin_info, timestamp, visit_id) for b in branches: branch_url_args = dict(url_args) branch_query_params = dict(query_params) if 'release' in branch_query_params: del branch_query_params['release'] branch_query_params['branch'] = b['name'] if path: b['path'] = path branch_url_args['path'] = path b['url'] = reverse(browse_view_name, url_args=branch_url_args, query_params=branch_query_params) for r in releases: release_url_args = dict(url_args) release_query_params = dict(query_params) if 'branch' in release_query_params: del release_query_params['branch'] release_query_params['release'] = r['name'] if path: r['path'] = path release_url_args['path'] = path r['url'] = reverse(browse_view_name, url_args=release_url_args, query_params=release_query_params) snapshot_context['query_params'] = query_params snapshot_context['root_sha1_git'] = root_sha1_git snapshot_context['revision_id'] = revision_id snapshot_context['branch'] = branch_name snapshot_context['release'] = release_name snapshot_context['release_id'] = release_id return snapshot_context def browse_snapshot_directory(request, snapshot_id=None, origin_url=None, timestamp=None, path=None): """ Django view implementation for 
browsing a directory in a snapshot context. """ try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp, path, browse_context='directory') root_sha1_git = snapshot_context['root_sha1_git'] sha1_git = root_sha1_git if root_sha1_git and path: dir_info = service.lookup_directory_with_path(root_sha1_git, path) sha1_git = dir_info['target'] dirs = [] files = [] if sha1_git: dirs, files = get_directory_entries(sha1_git) except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context['swh_type'] origin_info = snapshot_context['origin_info'] visit_info = snapshot_context['visit_info'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] revision_id = snapshot_context['revision_id'] snapshot_id = snapshot_context['snapshot_id'] path_info = gen_path_info(path) browse_view_name = 'browse-' + swh_type + '-directory' breadcrumbs = [] if root_sha1_git: breadcrumbs.append({'name': root_sha1_git[:7], 'url': reverse(browse_view_name, url_args=url_args, query_params=query_params)}) for pi in path_info: bc_url_args = dict(url_args) bc_url_args['path'] = pi['path'] breadcrumbs.append({'name': pi['name'], 'url': reverse(browse_view_name, url_args=bc_url_args, query_params=query_params)}) path = '' if path is None else (path + '/') for d in dirs: if d['type'] == 'rev': d['url'] = reverse('browse-revision', url_args={'sha1_git': d['target']}) else: bc_url_args = dict(url_args) bc_url_args['path'] = path + d['name'] d['url'] = reverse(browse_view_name, url_args=bc_url_args, query_params=query_params) sum_file_sizes = 0 readmes = {} browse_view_name = 'browse-' + swh_type + '-content' for f in files: bc_url_args = dict(url_args) bc_url_args['path'] = path + f['name'] f['url'] = reverse(browse_view_name, url_args=bc_url_args, query_params=query_params) if f['length'] is not None: sum_file_sizes += f['length'] f['length'] = filesizeformat(f['length']) if f['name'].lower().startswith('readme'): readmes[f['name']] = f['checksums']['sha1'] readme_name, readme_url, readme_html = get_readme_to_display(readmes) browse_view_name = 'browse-' + swh_type + '-log' history_url = None if snapshot_id != _empty_snapshot_id: history_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) nb_files = None nb_dirs = None dir_path = None if root_sha1_git: nb_files = len(files) nb_dirs = len(dirs) sum_file_sizes = filesizeformat(sum_file_sizes) dir_path = '/' + path browse_dir_link = gen_directory_link(sha1_git) browse_rev_link = gen_revision_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) dir_metadata = {"directory": sha1_git, "context-independent directory": browse_dir_link, "number of regular files": nb_files, "number of subdirectories": nb_dirs, "sum of regular file sizes": sum_file_sizes, "path": dir_path, "revision": revision_id, "context-independent revision": browse_rev_link, "snapshot": snapshot_id, "context-independent snapshot": browse_snp_link} if origin_info: dir_metadata['origin url'] = origin_info['url'] dir_metadata['origin visit date'] = format_utc_iso_date( visit_info['date']) dir_metadata['origin visit type'] = visit_info['type'] vault_cooking = { 'directory_context': True, 'directory_id': sha1_git, 'revision_context': True, 'revision_id': revision_id } swh_objects = [{'type': 'directory', 'id': sha1_git}, {'type': 'revision', 'id': revision_id}, {'type': 'snapshot', 'id': snapshot_id}] release_id = snapshot_context['release_id'] if release_id: swh_objects.append({'type': 
'release', 'id': release_id}) browse_rel_link = gen_release_link(release_id) dir_metadata['release'] = release_id dir_metadata['context-independent release'] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) dir_path = '/'.join([bc['name'] for bc in breadcrumbs]) + '/' context_found = 'snapshot: %s' % snapshot_context['snapshot_id'] if origin_info: context_found = 'origin: %s' % origin_info['url'] heading = ('Directory - %s - %s - %s' % (dir_path, snapshot_context['branch'], context_found)) return render(request, 'browse/directory.html', {'heading': heading, 'swh_object_name': 'Directory', 'swh_object_metadata': dir_metadata, 'dirs': dirs, 'files': files, 'breadcrumbs': breadcrumbs if root_sha1_git else [], 'top_right_link': { 'url': history_url, 'icon': swh_object_icons['revisions history'], 'text': 'History' }, 'readme_name': readme_name, 'readme_url': readme_url, 'readme_html': readme_html, 'snapshot_context': snapshot_context, 'vault_cooking': vault_cooking, 'show_actions_menu': True, 'swh_ids': swh_ids}) def browse_snapshot_content(request, snapshot_id=None, origin_url=None, timestamp=None, path=None, selected_language=None): """ Django view implementation for browsing a content in a snapshot context. """ try: snapshot_context = _process_snapshot_request(request, snapshot_id, origin_url, timestamp, path, browse_context='content') root_sha1_git = snapshot_context['root_sha1_git'] sha1_git = None query_string = None content_data = None + directory_id = None split_path = path.split('/') filename = split_path[-1] filepath = path[:-len(filename)] if root_sha1_git: content_info = service.lookup_directory_with_path(root_sha1_git, path) sha1_git = content_info['target'] query_string = 'sha1_git:' + sha1_git content_data = request_content(query_string, raise_if_unavailable=False) if filepath: dir_info = service.lookup_directory_with_path(root_sha1_git, filepath) directory_id = dir_info['target'] else: directory_id = root_sha1_git except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context['swh_type'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] revision_id = snapshot_context['revision_id'] origin_info = snapshot_context['origin_info'] visit_info = snapshot_context['visit_info'] snapshot_id = snapshot_context['snapshot_id'] content = None language = None mimetype = None if content_data and content_data['raw_data'] is not None: content_display_data = prepare_content_for_display( content_data['raw_data'], content_data['mimetype'], path) content = content_display_data['content_data'] language = content_display_data['language'] mimetype = content_display_data['mimetype'] # Override language with user-selected language if selected_language is not None: language = selected_language available_languages = None if mimetype and 'text/' in mimetype: available_languages = highlightjs.get_supported_languages() browse_view_name = 'browse-' + swh_type + '-directory' breadcrumbs = [] path_info = gen_path_info(filepath) if root_sha1_git: breadcrumbs.append({'name': root_sha1_git[:7], 'url': reverse(browse_view_name, url_args=url_args, query_params=query_params)}) for pi in path_info: bc_url_args = dict(url_args) bc_url_args['path'] = pi['path'] breadcrumbs.append({'name': pi['name'], 'url': reverse(browse_view_name, url_args=bc_url_args, query_params=query_params)}) breadcrumbs.append({'name': filename, 'url': None}) browse_content_link = gen_content_link(sha1_git) content_raw_url = None if 
query_string: content_raw_url = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params={'filename': filename}) browse_rev_link = gen_revision_link(revision_id) browse_dir_link = gen_directory_link(directory_id) content_metadata = { 'context-independent content': browse_content_link, 'path': None, 'filename': None, 'directory': directory_id, 'context-independent directory': browse_dir_link, 'revision': revision_id, 'context-independent revision': browse_rev_link, 'snapshot': snapshot_id } cnt_sha1_git = None content_size = None error_code = 200 error_description = '' error_message = '' if content_data: for checksum in content_data['checksums'].keys(): content_metadata[checksum] = content_data['checksums'][checksum] content_metadata['mimetype'] = content_data['mimetype'] content_metadata['encoding'] = content_data['encoding'] content_metadata['size'] = filesizeformat(content_data['length']) content_metadata['language'] = content_data['language'] content_metadata['licenses'] = content_data['licenses'] content_metadata['path'] = '/' + filepath content_metadata['filename'] = filename cnt_sha1_git = content_data['checksums']['sha1_git'] content_size = content_data['length'] error_code = content_data['error_code'] error_message = content_data['error_message'] error_description = content_data['error_description'] if origin_info: content_metadata['origin url'] = origin_info['url'] content_metadata['origin visit date'] = format_utc_iso_date( visit_info['date']) content_metadata['origin visit type'] = visit_info['type'] browse_snapshot_link = gen_snapshot_link(snapshot_id) content_metadata['context-independent snapshot'] = browse_snapshot_link swh_objects = [{'type': 'content', 'id': cnt_sha1_git}, {'type': 'directory', 'id': directory_id}, {'type': 'revision', 'id': revision_id}, {'type': 'snapshot', 'id': snapshot_id}] release_id = snapshot_context['release_id'] if release_id: swh_objects.append({'type': 'release', 'id': release_id}) browse_rel_link = gen_release_link(release_id) content_metadata['release'] = release_id content_metadata['context-independent release'] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) content_path = '/'.join([bc['name'] for bc in breadcrumbs]) context_found = 'snapshot: %s' % snapshot_context['snapshot_id'] if origin_info: context_found = 'origin: %s' % origin_info['url'] heading = ('Content - %s - %s - %s' % (content_path, snapshot_context['branch'], context_found)) return render(request, 'browse/content.html', {'heading': heading, 'swh_object_name': 'Content', 'swh_object_metadata': content_metadata, 'content': content, 'content_size': content_size, 'max_content_size': content_display_max_size, 'mimetype': mimetype, 'language': language, 'available_languages': available_languages, 'breadcrumbs': breadcrumbs if root_sha1_git else [], 'top_right_link': { 'url': content_raw_url, 'icon': swh_object_icons['content'], 'text': 'Raw File' }, 'snapshot_context': snapshot_context, 'vault_cooking': None, 'show_actions_menu': True, 'swh_ids': swh_ids, 'error_code': error_code, 'error_message': error_message, 'error_description': error_description}, status=error_code) PER_PAGE = 100 def browse_snapshot_log(request, snapshot_id=None, origin_url=None, timestamp=None): """ Django view implementation for browsing a revision history in a snapshot context. 
""" try: snapshot_context = _process_snapshot_request( request, snapshot_id, origin_url, timestamp, browse_context='log') revision_id = snapshot_context['revision_id'] per_page = int(request.GET.get('per_page', PER_PAGE)) offset = int(request.GET.get('offset', 0)) revs_ordering = request.GET.get('revs_ordering', 'committer_date') session_key = 'rev_%s_log_ordering_%s' % (revision_id, revs_ordering) rev_log_session = request.session.get(session_key, None) rev_log = [] revs_walker_state = None if rev_log_session: rev_log = rev_log_session['rev_log'] revs_walker_state = rev_log_session['revs_walker_state'] if len(rev_log) < offset+per_page: revs_walker = service.get_revisions_walker( revs_ordering, revision_id, max_revs=offset+per_page+1, state=revs_walker_state) rev_log += [rev['id'] for rev in revs_walker] revs_walker_state = revs_walker.export_state() revs = rev_log[offset:offset+per_page] revision_log = service.lookup_revision_multiple(revs) request.session[session_key] = { 'rev_log': rev_log, 'revs_walker_state': revs_walker_state } except Exception as exc: return handle_view_exception(request, exc) swh_type = snapshot_context['swh_type'] origin_info = snapshot_context['origin_info'] visit_info = snapshot_context['visit_info'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] snapshot_id = snapshot_context['snapshot_id'] query_params['per_page'] = per_page revs_ordering = request.GET.get('revs_ordering', '') query_params['revs_ordering'] = revs_ordering browse_view_name = 'browse-' + swh_type + '-log' prev_log_url = None if len(rev_log) > offset + per_page: query_params['offset'] = offset + per_page prev_log_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) next_log_url = None if offset != 0: query_params['offset'] = offset - per_page next_log_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) revision_log_data = format_log_entries(revision_log, per_page, snapshot_context) browse_rev_link = gen_revision_link(revision_id) browse_log_link = gen_revision_log_link(revision_id) browse_snp_link = gen_snapshot_link(snapshot_id) revision_metadata = { 'context-independent revision': browse_rev_link, 'context-independent revision history': browse_log_link, 'context-independent snapshot': browse_snp_link, 'snapshot': snapshot_id } if origin_info: revision_metadata['origin url'] = origin_info['url'] revision_metadata['origin visit date'] = format_utc_iso_date( visit_info['date']) revision_metadata['origin visit type'] = visit_info['type'] swh_objects = [{'type': 'revision', 'id': revision_id}, {'type': 'snapshot', 'id': snapshot_id}] release_id = snapshot_context['release_id'] if release_id: swh_objects.append({'type': 'release', 'id': release_id}) browse_rel_link = gen_release_link(release_id) revision_metadata['release'] = release_id revision_metadata['context-independent release'] = browse_rel_link swh_ids = get_swh_persistent_ids(swh_objects, snapshot_context) context_found = 'snapshot: %s' % snapshot_context['snapshot_id'] if origin_info: context_found = 'origin: %s' % origin_info['url'] heading = ('Revision history - %s - %s' % (snapshot_context['branch'], context_found)) return render(request, 'browse/revision-log.html', {'heading': heading, 'swh_object_name': 'Revisions history', 'swh_object_metadata': revision_metadata, 'revision_log': revision_log_data, 'revs_ordering': revs_ordering, 'next_log_url': next_log_url, 'prev_log_url': prev_log_url, 'breadcrumbs': None, 'top_right_link': None, 
'snapshot_context': snapshot_context, 'vault_cooking': None, 'show_actions_menu': True, 'swh_ids': swh_ids}) def browse_snapshot_branches(request, snapshot_id=None, origin_url=None, timestamp=None): """ Django view implementation for browsing a list of branches in a snapshot context. """ try: snapshot_context = _process_snapshot_request(request, snapshot_id, origin_url, timestamp) branches_bc = request.GET.get('branches_breadcrumbs', '') branches_bc = branches_bc.split(',') if branches_bc else [] branches_from = branches_bc[-1] if branches_bc else '' swh_type = snapshot_context['swh_type'] origin_info = snapshot_context['origin_info'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] browse_view_name = 'browse-' + swh_type + '-directory' snapshot = service.lookup_snapshot(snapshot_context['snapshot_id'], branches_from, PER_PAGE+1, target_types=['revision', 'alias']) displayed_branches, _ = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for branch in displayed_branches: if snapshot_id: revision_url = reverse('browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'snapshot_id': snapshot_id}) else: revision_url = reverse('browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'origin': origin_info['url']}) query_params['branch'] = branch['name'] directory_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) del query_params['branch'] branch['revision_url'] = revision_url branch['directory_url'] = directory_url browse_view_name = 'browse-' + swh_type + '-branches' prev_branches_url = None next_branches_url = None if branches_bc: query_params_prev = dict(query_params) query_params_prev['branches_breadcrumbs'] = ','.join(branches_bc[:-1]) prev_branches_url = reverse(browse_view_name, url_args=url_args, query_params=query_params_prev) elif branches_from: prev_branches_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) if len(displayed_branches) > PER_PAGE: query_params_next = dict(query_params) next_branch = displayed_branches[-1]['name'] del displayed_branches[-1] branches_bc.append(next_branch) query_params_next['branches_breadcrumbs'] = ','.join(branches_bc) next_branches_url = reverse(browse_view_name, url_args=url_args, query_params=query_params_next) heading = 'Branches - ' if origin_info: heading += 'origin: %s' % origin_info['url'] else: heading += 'snapshot: %s' % snapshot_id return render(request, 'browse/branches.html', {'heading': heading, 'swh_object_name': 'Branches', 'swh_object_metadata': {}, 'top_right_link': None, 'displayed_branches': displayed_branches, 'prev_branches_url': prev_branches_url, 'next_branches_url': next_branches_url, 'snapshot_context': snapshot_context}) def browse_snapshot_releases(request, snapshot_id=None, origin_url=None, timestamp=None): """ Django view implementation for browsing a list of releases in a snapshot context. 
""" try: snapshot_context = _process_snapshot_request(request, snapshot_id, origin_url, timestamp) rel_bc = request.GET.get('releases_breadcrumbs', '') rel_bc = rel_bc.split(',') if rel_bc else [] rel_from = rel_bc[-1] if rel_bc else '' swh_type = snapshot_context['swh_type'] origin_info = snapshot_context['origin_info'] url_args = snapshot_context['url_args'] query_params = snapshot_context['query_params'] snapshot = service.lookup_snapshot(snapshot_context['snapshot_id'], rel_from, PER_PAGE+1, target_types=['release', 'alias']) _, displayed_releases = process_snapshot_branches(snapshot) except Exception as exc: return handle_view_exception(request, exc) for release in displayed_releases: if snapshot_id: query_params_tgt = {'snapshot_id': snapshot_id} else: query_params_tgt = {'origin': origin_info['url']} release_url = reverse('browse-release', url_args={'sha1_git': release['id']}, query_params=query_params_tgt) target_url = '' if release['target_type'] == 'revision': target_url = reverse('browse-revision', url_args={'sha1_git': release['target']}, query_params=query_params_tgt) elif release['target_type'] == 'directory': target_url = reverse('browse-directory', url_args={'sha1_git': release['target']}, query_params=query_params_tgt) elif release['target_type'] == 'content': target_url = reverse('browse-content', url_args={'query_string': release['target']}, query_params=query_params_tgt) elif release['target_type'] == 'release': target_url = reverse('browse-release', url_args={'sha1_git': release['target']}, query_params=query_params_tgt) release['release_url'] = release_url release['target_url'] = target_url browse_view_name = 'browse-' + swh_type + '-releases' prev_releases_url = None next_releases_url = None if rel_bc: query_params_prev = dict(query_params) query_params_prev['releases_breadcrumbs'] = ','.join(rel_bc[:-1]) prev_releases_url = reverse(browse_view_name, url_args=url_args, query_params=query_params_prev) elif rel_from: prev_releases_url = reverse(browse_view_name, url_args=url_args, query_params=query_params) if len(displayed_releases) > PER_PAGE: query_params_next = dict(query_params) next_rel = displayed_releases[-1]['branch_name'] del displayed_releases[-1] rel_bc.append(next_rel) query_params_next['releases_breadcrumbs'] = ','.join(rel_bc) next_releases_url = reverse(browse_view_name, url_args=url_args, query_params=query_params_next) heading = 'Releases - ' if origin_info: heading += 'origin: %s' % origin_info['url'] else: heading += 'snapshot: %s' % snapshot_id return render(request, 'browse/releases.html', {'heading': heading, 'top_panel_visible': False, 'top_panel_collapsible': False, 'swh_object_name': 'Releases', 'swh_object_metadata': {}, 'top_right_link': None, 'displayed_releases': displayed_releases, 'prev_releases_url': prev_releases_url, 'next_releases_url': next_releases_url, 'snapshot_context': snapshot_context, 'vault_cooking': None, 'show_actions_menu': False}) diff --git a/swh/web/tests/browse/views/test_origin.py b/swh/web/tests/browse/views/test_origin.py index cbb4a3a9..8c5c3b40 100644 --- a/swh/web/tests/browse/views/test_origin.py +++ b/swh/web/tests/browse/views/test_origin.py @@ -1,851 +1,900 @@ # Copyright (C) 2017-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random import re +import swh.web.browse.utils + from django.utils.html import escape from 
hypothesis import given from swh.model.hashutil import hash_to_bytes from swh.web.browse.utils import process_snapshot_branches from swh.web.common.exc import NotFoundExc from swh.web.common.utils import ( reverse, gen_path_info, format_utc_iso_date, parse_timestamp, get_swh_persistent_id ) from swh.web.tests.data import get_content from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( origin, origin_with_multiple_visits, new_origin, - new_snapshot, visit_dates, revisions + new_snapshot, visit_dates, revisions, origin_with_releases ) @given(origin_with_multiple_visits()) def test_origin_visits_browse(client, archive_data, origin): url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used('origin-visits.html') url = reverse('browse-origin-visits', url_args={'origin_url': origin['url']}) resp = client.get(url) assert resp.status_code == 200 assert_template_used('origin-visits.html') visits = archive_data.origin_visit_get(origin['url']) for v in visits: vdate = format_utc_iso_date(v['date'], '%Y-%m-%dT%H:%M:%SZ') browse_dir_url = reverse('browse-origin-directory', url_args={'origin_url': origin['url'], 'timestamp': vdate}) assert_contains(resp, browse_dir_url) @given(origin_with_multiple_visits()) def test_origin_content_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) def _get_archive_data(visit_idx): snapshot = archive_data.snapshot_get( origin_visits[visit_idx]['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) dir_content = archive_data.directory_ls(head_rev['directory']) dir_files = [e for e in dir_content if e['type'] == 'file'] dir_file = random.choice(dir_files) branches, releases = process_snapshot_branches(snapshot) return { 'branches': branches, 'releases': releases, 'root_dir_sha1': head_rev['directory'], 'content': get_content(dir_file['checksums']['sha1']), 'visit': origin_visits[visit_idx] } tdata = _get_archive_data(-1) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content']) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=tdata['visit']['date']) visit_unix_ts = parse_timestamp(tdata['visit']['date']).timestamp() visit_unix_ts = int(visit_unix_ts) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], timestamp=visit_unix_ts) tdata = _get_archive_data(0) _origin_content_view_test_helper(client, origin, origin_visits, tdata['branches'], tdata['releases'], tdata['root_dir_sha1'], tdata['content'], visit_id=tdata['visit']['visit']) @given(origin()) def test_origin_root_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] dir_content = archive_data.directory_ls(root_dir_sha1) branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) 
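# the root directory view must render identically whether the visit is addressed implicitly (latest visit), by visit id, by Unix timestamp or by ISO8601 date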
_origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, dir_content, timestamp=visit['date']) @given(origin()) def test_origin_sub_directory_view(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) head_rev_id = archive_data.snapshot_get_head(snapshot) head_rev = archive_data.revision_get(head_rev_id) root_dir_sha1 = head_rev['directory'] subdirs = [e for e in archive_data.directory_ls(root_dir_sha1) if e['type'] == 'dir'] branches, releases = process_snapshot_branches(snapshot) visit_unix_ts = parse_timestamp(visit['date']).timestamp() visit_unix_ts = int(visit_unix_ts) if len(subdirs) == 0: return subdir = random.choice(subdirs) subdir_content = archive_data.directory_ls(subdir['target']) subdir_path = subdir['name'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) origin = dict(origin) del origin['type'] _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, visit_id=visit['visit']) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit_unix_ts) _origin_directory_view_test_helper(client, origin, origin_visits, branches, releases, root_dir_sha1, subdir_content, path=subdir_path, timestamp=visit['date']) @given(origin()) def test_origin_branches(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_branches_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_branches_test_helper(client, origin, 
snapshot_content) @given(origin()) def test_origin_releases(client, archive_data, origin): origin_visits = archive_data.origin_visit_get(origin['url']) visit = origin_visits[-1] snapshot = archive_data.snapshot_get(visit['snapshot']) snapshot_content = process_snapshot_branches(snapshot) _origin_releases_test_helper(client, origin, snapshot_content) origin = dict(origin) origin['type'] = None _origin_releases_test_helper(client, origin, snapshot_content) @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=3, max_size=3)) def test_origin_snapshot_null_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): if i == 0: snp_dict['branches'][branch] = None else: snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i-1]), } archive_data.snapshot_add([snp_dict]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') archive_data.origin_visit_update(new_origin['url'], visit['visit'], status='partial', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}) rv = client.get(url) assert rv.status_code == 200 @given(new_origin(), new_snapshot(min_size=4, max_size=4), visit_dates(), revisions(min_size=4, max_size=4)) def test_origin_snapshot_invalid_branch(client, archive_data, new_origin, new_snapshot, visit_dates, revisions): snp_dict = new_snapshot.to_dict() new_origin = archive_data.origin_add([new_origin])[0] for i, branch in enumerate(snp_dict['branches'].keys()): snp_dict['branches'][branch] = { 'target_type': 'revision', 'target': hash_to_bytes(revisions[i]), } archive_data.snapshot_add([snp_dict]) visit = archive_data.origin_visit_add( new_origin['url'], visit_dates[0], type='git') archive_data.origin_visit_update(new_origin['url'], visit['visit'], status='full', snapshot=snp_dict['id']) url = reverse('browse-origin-directory', url_args={'origin_url': new_origin['url']}, query_params={'branch': 'invalid_branch'}) rv = client.get(url) assert rv.status_code == 404 def test_origin_request_errors(client, archive_data, mocker): mock_snapshot_service = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.service') mock_origin_service = mocker.patch('swh.web.browse.views.origin.service') mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_request_content = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.request_content') mock_origin_service.lookup_origin.side_effect = NotFoundExc( 'origin not found') url = reverse('browse-origin-visits', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, 'origin not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 
1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) mock_utils_service.lookup_snapshot_size.return_value = { 'revision': 1, 'release': 0 } mock_lookup_directory = mock_utils_service.lookup_directory mock_lookup_directory.side_effect = NotFoundExc('Directory not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, 'Directory not found', status_code=404) mock_origin_service.lookup_origin.side_effect = None mock_origin_service.lookup_origin.return_value = {'type': 'foo', 'url': 'bar', 'id': 457} mock_get_origin_visits.return_value = [] url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, "No visit", status_code=404) mock_get_origin_visits.return_value = [{'visit': 1}] mock_get_origin_visit_snapshot.side_effect = NotFoundExc('visit not found') url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'foo'}, query_params={'visit_id': 2}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert re.search('Visit.*not found', resp.content.decode('utf-8')) mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', + 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.side_effect = None mock_get_origin_visit_snapshot.return_value = ([], []) + mock_utils_service.lookup_snapshot_size.return_value = { + 'revision': 0, + 'release': 0 + } + mock_utils_service.lookup_origin.return_value = {'type': 'foo', + 'url': 'bar', + 'id': 457} url = reverse('browse-origin-content', url_args={'origin_url': 'bar', 'path': 'baz'}) resp = client.get(url) - assert resp.status_code == 404 + assert resp.status_code == 200 assert_template_used('error.html') - assert re.search('Origin.*has an empty list of branches', - resp.content.decode('utf-8')) + assert re.search('snapshot.*is empty', resp.content.decode('utf-8')) mock_get_origin_visit_snapshot.return_value = ( [{'directory': 'ae59ceecf46367e8e4ad800e231fc76adc3afffb', 'name': 'HEAD', 'revision': '7bc08e1aa0b08cb23e18715a32aa38517ad34672', 'date': '04 May 2017, 13:27 UTC', 'message': ''}], [] ) + mock_utils_service.lookup_snapshot_size.return_value = { + 'revision': 1, + 'release': 0 + } mock_snapshot_service.lookup_directory_with_path.return_value = { 'target': '5ecd9f37b7a2d2e9980d201acd6286116f2ba1f1' } mock_request_content.side_effect = NotFoundExc('Content not found') url = reverse('browse-origin-content', url_args={'origin_url': 
'bar', 'path': 'baz'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, 'Content not found', status_code=404) mock_get_snapshot_context = mocker.patch( 'swh.web.browse.views.utils.snapshot_context.get_snapshot_context') mock_get_snapshot_context.side_effect = NotFoundExc('Snapshot not found') url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') assert_contains(resp, 'Snapshot not found', status_code=404) def test_origin_empty_snapshot(client, mocker): mock_utils_service = mocker.patch('swh.web.browse.utils.service') mock_get_origin_visit_snapshot = mocker.patch( 'swh.web.browse.utils.get_origin_visit_snapshot') mock_get_origin_visits = mocker.patch( 'swh.web.common.origin_visits.get_origin_visits') mock_get_origin_visits.return_value = [{ 'date': '2015-09-26T09:30:52.373449+00:00', 'metadata': {}, 'origin': 457, 'snapshot': 'bdaf9ac436488a8c6cda927a0f44e172934d3f65', 'status': 'full', 'type': 'git', 'visit': 1 }] mock_get_origin_visit_snapshot.return_value = ([], []) mock_utils_service.lookup_snapshot_size.return_value = { 'revision': 0, 'release': 0 } mock_utils_service.lookup_origin.return_value = { 'id': 457, 'url': 'https://github.com/foo/bar' } url = reverse('browse-origin-directory', url_args={'origin_url': 'bar'}) resp = client.get(url) assert resp.status_code == 200 assert_template_used('content.html') assert re.search('snapshot.*is empty', resp.content.decode('utf-8')) +@given(origin_with_releases()) +def test_origin_release_browse(client, archive_data, origin): + # for swh.web.browse.utils.get_snapshot_content to only return one branch + snapshot_max_size = swh.web.browse.utils.snapshot_content_max_size + swh.web.browse.utils.snapshot_content_max_size = 1 + try: + snapshot = archive_data.snapshot_get_latest(origin['url']) + release = [b for b in snapshot['branches'].values() + if b['target_type'] == 'release'][-1] + release_data = archive_data.release_get(release['target']) + url = reverse('browse-origin-directory', + url_args={'origin_url': origin['url']}, + query_params={'release': release_data['name']}) + + resp = client.get(url) + assert resp.status_code == 200 + assert_contains(resp, release_data['name']) + assert_contains(resp, release['target']) + finally: + swh.web.browse.utils.snapshot_content_max_size = snapshot_max_size + + +@given(origin_with_releases()) +def test_origin_release_browse_not_found(client, archive_data, origin): + + invalid_release_name = 'swh-foo-bar' + url = reverse('browse-origin-directory', + url_args={'origin_url': origin['url']}, + query_params={'release': invalid_release_name}) + + resp = client.get(url) + assert resp.status_code == 404 + assert re.search(f'Release {invalid_release_name}.*not found', + resp.content.decode('utf-8')) + + def _origin_content_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_dir_sha1, content, visit_id=None, timestamp=None): content_path = '/'.join(content['path'].split('/')[1:]) url_args = {'origin_url': origin_info['url'], 'path': content_path} if not visit_id: visit_id = origin_visits[-1]['visit'] query_params = {} if timestamp: url_args['timestamp'] = timestamp if visit_id: query_params['visit_id'] = visit_id url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used('content.html') assert_contains(resp, 
'<code class="%s">' % content['hljs_language']) assert_contains(resp, escape(content['data'])) split_path = content_path.split('/') filename = split_path[-1] path = content_path.replace(filename, '')[:-1] path_info = gen_path_info(path) del url_args['path'] if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') root_dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<li class="swh-path">', count=len(path_info)+1)
assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_url, root_dir_sha1[:7])) for p in path_info: url_args['path'] = p['path'] dir_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">%s</a>' % (dir_url, p['name']))
assert_contains(resp, '<li class="swh-path">%s</li>' % filename)
query_string = 'sha1_git:' + content['sha1_git'] url_raw = reverse('browse-content-raw', url_args={'query_string': query_string}, query_params={'filename': filename}) assert_contains(resp, url_raw) if 'path' in url_args: del url_args['path'] origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, len(origin_releases)))
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches)) url_args['path'] = content_path for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
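# the releases listed in the switcher dropdown must link to the content view as well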
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_release_url) url = reverse('browse-origin-content', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used('content.html') swh_cnt_id = get_swh_persistent_id('content', content['sha1_git']) swh_cnt_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_cnt_id}) assert_contains(resp, swh_cnt_id) assert_contains(resp, swh_cnt_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_directory_view_test_helper(client, origin_info, origin_visits, origin_branches, origin_releases, root_directory_sha1, directory_entries, visit_id=None, timestamp=None, path=None): dirs = [e for e in directory_entries if e['type'] in ('dir', 'rev')] files = [e for e in directory_entries if e['type'] == 'file'] if not visit_id: visit_id = origin_visits[-1]['visit'] url_args = {'origin_url': origin_info['url']} query_params = {} if timestamp: url_args['timestamp'] = timestamp else: query_params['visit_id'] = visit_id if path: url_args['path'] = path url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) resp = client.get(url) assert resp.status_code == 200 assert_template_used('directory.html') assert resp.status_code == 200 assert_template_used('directory.html') assert_contains(resp, '<td class="swh-directory">', count=len(dirs)) assert_contains(resp, '<td class="swh-content">', count=len(files)) if timestamp: url_args['timestamp'] = format_utc_iso_date( parse_timestamp(timestamp).isoformat(), '%Y-%m-%dT%H:%M:%S') for d in dirs: if d['type'] == 'rev': dir_url = reverse('browse-revision', url_args={'sha1_git': d['target']}) else: dir_path = d['name'] if path: dir_path = "%s/%s" % (path, d['name']) dir_url_args = dict(url_args) dir_url_args['path'] = dir_path dir_url = reverse('browse-origin-directory', url_args=dir_url_args, query_params=query_params) assert_contains(resp, dir_url) for f in files: file_path = f['name'] if path: file_path = "%s/%s" % (path, f['name']) file_url_args = dict(url_args) file_url_args['path'] = file_path file_url = reverse('browse-origin-content', url_args=file_url_args, query_params=query_params) assert_contains(resp, file_url) if 'path' in url_args: del url_args['path'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) nb_bc_paths = 1 if path: nb_bc_paths = len(path.split('/')) + 1
assert_contains(resp, '<li class="swh-path">', count=nb_bc_paths) assert_contains(resp, '<a href="%s">%s</a>' % (root_dir_branch_url, root_directory_sha1[:7])) origin_branches_url = reverse('browse-origin-branches', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args, query_params=query_params) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) if path: url_args['path'] = path
assert_contains(resp, '<li class="swh-branch">', count=len(origin_branches)) for branch in origin_branches: query_params['branch'] = branch['name'] root_dir_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_branch_url)
assert_contains(resp, '<li class="swh-release">', count=len(origin_releases)) query_params['branch'] = None for release in origin_releases: query_params['release'] = release['name'] root_dir_release_url = reverse('browse-origin-directory', url_args=url_args, query_params=query_params) assert_contains(resp, '<a href="%s">' % root_dir_release_url) assert_contains(resp, 'vault-cook-directory') assert_contains(resp, 'vault-cook-revision') swh_dir_id = get_swh_persistent_id('directory', directory_entries[0]['dir_id']) # noqa swh_dir_id_url = reverse('browse-swh-id', url_args={'swh_id': swh_dir_id}) assert_contains(resp, swh_dir_id) assert_contains(resp, swh_dir_id_url) assert_contains(resp, 'swh-take-new-snapshot') def _origin_branches_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-branches', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used('branches.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) assert_contains(resp, '<tr class="swh-branch-entry', count=len(origin_branches)) # each branch row links to both its directory view and its head revision for branch in origin_branches: browse_branch_url = reverse('browse-origin-directory', url_args=url_args, query_params={'branch': branch['name']}) assert_contains(resp, '<a href="%s">' % escape(browse_branch_url)) browse_revision_url = reverse( 'browse-revision', url_args={'sha1_git': branch['revision']}, query_params={'origin': origin_info['url']}) assert_contains(resp, '<a href="%s">' % escape(browse_revision_url)) def _origin_releases_test_helper(client, origin_info, origin_snapshot): url_args = {'origin_url': origin_info['url']} url = reverse('browse-origin-releases', url_args=url_args) resp = client.get(url) assert resp.status_code == 200 assert_template_used('releases.html') origin_branches = origin_snapshot[0] origin_releases = origin_snapshot[1] origin_branches_url = reverse('browse-origin-branches', url_args=url_args) assert_contains(resp, '<a href="%s">Branches (%s)</a>' % (origin_branches_url, len(origin_branches))) origin_releases_url = reverse('browse-origin-releases', url_args=url_args) nb_releases = len(origin_releases) if nb_releases > 0: assert_contains(resp, '<a href="%s">Releases (%s)</a>' % (origin_releases_url, nb_releases)) assert_contains(resp, '<tr class="swh-release-entry', count=nb_releases) for release in origin_releases: browse_release_url = reverse('browse-release', url_args={'sha1_git': release['id']}, query_params={'origin': origin_info['url']}) browse_revision_url = reverse('browse-revision', url_args={'sha1_git': release['target']}, query_params={'origin': origin_info['url']}) assert_contains(resp, '<a href="%s">' % escape(browse_release_url)) assert_contains(resp, '<a href="%s">' % escape(browse_revision_url)) diff --git a/swh/web/tests/browse/views/test_release.py b/swh/web/tests/browse/views/test_release.py index d124a1cf..13871e56 100644 --- a/swh/web/tests/browse/views/test_release.py +++ b/swh/web/tests/browse/views/test_release.py @@ -1,104 +1,104 @@ # Copyright (C) 2018-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import random from hypothesis import given from swh.web.common.utils import ( reverse, format_utc_iso_date, get_swh_persistent_id ) from swh.web.tests.django_asserts import assert_contains, assert_template_used from swh.web.tests.strategies import ( - release, origin_with_release, unknown_release + release, origin_with_releases, unknown_release ) @given(release()) def test_release_browse(client, archive_data, release): url = reverse('browse-release', url_args={'sha1_git': release}) release_data = archive_data.release_get(release) resp = client.get(url) _release_browse_checks(resp, release_data)
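# providing the 'origin' query parameter makes the release view resolve its snapshot context from that origin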
-@given(origin_with_release()) +@given(origin_with_releases()) def test_release_browse_with_origin(client, archive_data, origin): snapshot = archive_data.snapshot_get_latest(origin['url']) release = random.choice([b for b in snapshot['branches'].values() if b['target_type'] == 'release']) url = reverse('browse-release', url_args={'sha1_git': release['target']}, query_params={'origin': origin['url']}) release_data = archive_data.release_get(release['target']) resp = client.get(url) _release_browse_checks(resp, release_data, origin) @given(unknown_release()) def test_release_browse_not_found(client, archive_data, unknown_release): url = reverse('browse-release', url_args={'sha1_git': unknown_release}) resp = client.get(url) assert resp.status_code == 404 assert_template_used('error.html') err_msg = 'Release with sha1_git %s not found' % unknown_release assert_contains(resp, err_msg, status_code=404) @given(release()) def test_release_uppercase(client, release): url = reverse('browse-release-uppercase-checksum', url_args={'sha1_git': release.upper()}) resp = client.get(url) assert resp.status_code == 302 redirect_url = reverse('browse-release', url_args={'sha1_git': release}) assert resp['location'] == redirect_url def _release_browse_checks(resp, release_data, origin_info=None): query_params = {} if origin_info: query_params['origin'] = origin_info['url'] release_id = release_data['id'] release_name = release_data['name'] author_name = release_data['author']['name'] release_date = release_data['date'] message = release_data['message'] target_type = release_data['target_type'] target = release_data['target'] target_url = reverse('browse-revision', url_args={'sha1_git': target}, query_params=query_params) message_lines = message.split('\n') assert resp.status_code == 200 assert_template_used('browse/release.html') assert_contains(resp, author_name) assert_contains(resp, format_utc_iso_date(release_date)) assert_contains(resp, '
    assert_contains(resp, '''
    %s
    %s''' % (message_lines[0] or 'None',
             '\n'.join(message_lines[1:])))
    assert_contains(resp, release_id)
    assert_contains(resp, release_name)
    assert_contains(resp, target_type)
    assert_contains(resp, '<a href="%s">%s</a>' % (target_url, target))

    swh_rel_id = get_swh_persistent_id('release', release_id)
    swh_rel_id_url = reverse('browse-swh-id',
                             url_args={'swh_id': swh_rel_id})
    assert_contains(resp, swh_rel_id)
    assert_contains(resp, swh_rel_id_url)
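The persistent identifier checks above rely on the SWH PID syntax ('swh:1:rel:' followed by the hexadecimal sha1_git). A minimal sketch of that shape, using an arbitrary release hash purely for illustration:

from swh.web.common.utils import get_swh_persistent_id

# arbitrary release hash, for illustration only
release_id = 'b9eecb53cf4c9f38a29e45b4e7f41ba4a4ad8be2'
assert get_swh_persistent_id('release', release_id) == \
    'swh:1:rel:b9eecb53cf4c9f38a29e45b4e7f41ba4a4ad8be2'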
diff --git a/swh/web/tests/strategies.py b/swh/web/tests/strategies.py
index 3171e1ae..f374b335 100644
--- a/swh/web/tests/strategies.py
+++ b/swh/web/tests/strategies.py
@@ -1,533 +1,533 @@
# Copyright (C) 2018-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information

import random

from collections import defaultdict
from datetime import datetime

from hypothesis import settings, assume
from hypothesis.strategies import (
    just, sampled_from, lists, composite, datetimes,
    binary, text, characters
)

from swh.model.hashutil import hash_to_hex, hash_to_bytes
from swh.model.identifiers import directory_identifier
from swh.storage.algos.revisions_walker import get_revisions_walker
from swh.model.hypothesis_strategies import (
    origins as new_origin_strategy, snapshots as new_snapshot
)
from swh.web.tests.data import get_tests_data

# Module dedicated to the generation of input data for tests through
# the use of hypothesis.
# Some of these data are sampled from a test archive created and populated
# in the swh.web.tests.data module.

# Set the swh-web hypothesis profile if none has been explicitly set
hypothesis_default_settings = settings.get_profile('default')
if repr(settings()) == repr(hypothesis_default_settings):
    settings.load_profile('swh-web')


# The following strategies exploit the hypothesis capabilities


def _filter_checksum(cs):
    generated_checksums = get_tests_data()['generated_checksums']
    if not int.from_bytes(cs, byteorder='little') or \
            cs in generated_checksums:
        return False
    generated_checksums.add(cs)
    return True


def _known_swh_object(object_type):
    return sampled_from(get_tests_data()[object_type])


def sha1():
    """
    Hypothesis strategy returning a valid hexadecimal sha1 value.
    """
    return binary(
        min_size=20, max_size=20).filter(_filter_checksum).map(hash_to_hex)


def invalid_sha1():
    """
    Hypothesis strategy returning an invalid sha1 representation.
    """
    return binary(
        min_size=50, max_size=50).filter(_filter_checksum).map(hash_to_hex)


def sha256():
    """
    Hypothesis strategy returning a valid hexadecimal sha256 value.
    """
    return binary(
        min_size=32, max_size=32).filter(_filter_checksum).map(hash_to_hex)


def content():
    """
    Hypothesis strategy returning a random content ingested
    into the test archive.
    """
    return _known_swh_object('contents')


def contents():
    """
    Hypothesis strategy returning random contents ingested
    into the test archive.
    """
    return lists(content(), min_size=2, max_size=8)


def content_text():
    """
    Hypothesis strategy returning random textual contents ingested
    into the test archive.
    """
    return content().filter(lambda c: c['mimetype'].startswith('text/'))


def content_text_non_utf8():
    """
    Hypothesis strategy returning random textual contents not encoded
    to UTF-8 ingested into the test archive.
    """
    return content().filter(lambda c: c['mimetype'].startswith('text/') and
                            c['encoding'] not in ('utf-8', 'us-ascii'))


def content_text_no_highlight():
    """
    Hypothesis strategy returning random textual contents with no detected
    programming language to highlight ingested into the test archive.
""" return content().filter(lambda c: c['mimetype'].startswith('text/') and c['hljs_language'] == 'nohighlight') def content_image_type(): """ Hypothesis strategy returning random image contents ingested into the test archive. """ return content().filter(lambda c: c['mimetype'].startswith('image/')) def content_utf8_detected_as_binary(): """ Hypothesis strategy returning random textual contents detected as binary by libmagic while they are valid UTF-8 encoded files. """ def utf8_binary_detected(content): if content['encoding'] != 'binary': return False try: content['data'].decode('utf-8') except Exception: return False else: return True return content().filter(utf8_binary_detected) @composite def new_content(draw): blake2s256_hex = draw(sha256()) sha1_hex = draw(sha1()) sha1_git_hex = draw(sha1()) sha256_hex = draw(sha256()) assume(sha1_hex != sha1_git_hex) assume(blake2s256_hex != sha256_hex) return { 'blake2S256': blake2s256_hex, 'sha1': sha1_hex, 'sha1_git': sha1_git_hex, 'sha256': sha256_hex } def unknown_content(): """ Hypothesis strategy returning a random content not ingested into the test archive. """ return new_content().filter( lambda c: next(get_tests_data()['storage'].content_get( [hash_to_bytes(c['sha1'])])) is None) def unknown_contents(): """ Hypothesis strategy returning random contents not ingested into the test archive. """ return lists(unknown_content(), min_size=2, max_size=8) def directory(): """ Hypothesis strategy returning a random directory ingested into the test archive. """ return _known_swh_object('directories') def directory_with_subdirs(): """ Hypothesis strategy returning a random directory containing sub directories ingested into the test archive. """ return directory().filter( lambda d: any([e['type'] == 'dir' for e in list( get_tests_data()['storage'].directory_ls(hash_to_bytes(d)))])) def empty_directory(): """ Hypothesis strategy returning the empty directory ingested into the test archive. """ return just(directory_identifier({'entries': []})) def unknown_directory(): """ Hypothesis strategy returning a random directory not ingested into the test archive. """ return sha1().filter( lambda s: len(list(get_tests_data()['storage'].directory_missing( [hash_to_bytes(s)]))) > 0) def origin(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ return _known_swh_object('origins') def origin_with_multiple_visits(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: visits = list(tests_data['storage'].origin_visit_get(origin['url'])) if len(visits) > 1: ret.append(origin) return sampled_from(ret) -def origin_with_release(): +def origin_with_releases(): """ Hypothesis strategy returning a random origin ingested into the test archive. """ ret = [] tests_data = get_tests_data() for origin in tests_data['origins']: snapshot = tests_data['storage'].snapshot_get_latest(origin['url']) if any([b['target_type'] == 'release' for b in snapshot['branches'].values()]): ret.append(origin) return sampled_from(ret) def new_origin(): """ Hypothesis strategy returning a random origin not ingested into the test archive. """ return new_origin_strategy().map(lambda origin: origin.to_dict()).filter( lambda origin: get_tests_data()['storage'].origin_get( [origin])[0] is None) def new_origins(nb_origins=None): """ Hypothesis strategy returning random origins not ingested into the test archive. 
""" min_size = nb_origins if nb_origins is not None else 2 max_size = nb_origins if nb_origins is not None else 8 size = random.randint(min_size, max_size) return lists(new_origin(), min_size=size, max_size=size, unique_by=lambda o: tuple(sorted(o.items()))) def visit_dates(nb_dates=None): """ Hypothesis strategy returning a list of visit dates. """ min_size = nb_dates if nb_dates else 2 max_size = nb_dates if nb_dates else 8 return lists(datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)), min_size=min_size, max_size=max_size, unique=True).map(sorted) def release(): """ Hypothesis strategy returning a random release ingested into the test archive. """ return _known_swh_object('releases') def unknown_release(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].release_get([s])) is None) def revision(): """ Hypothesis strategy returning a random revision ingested into the test archive. """ return _known_swh_object('revisions') def unknown_revision(): """ Hypothesis strategy returning a random revision not ingested into the test archive. """ return sha1().filter( lambda s: next(get_tests_data()['storage'].revision_get( [hash_to_bytes(s)])) is None) @composite def new_person(draw): """ Hypothesis strategy returning random raw swh person data. """ name = draw(text(min_size=5, max_size=30, alphabet=characters(min_codepoint=0, max_codepoint=255))) email = '%s@company.org' % name return { 'name': name.encode(), 'email': email.encode(), 'fullname': ('%s <%s>' % (name, email)).encode() } @composite def new_swh_date(draw): """ Hypothesis strategy returning random raw swh date data. """ timestamp = draw( datetimes(min_value=datetime(2015, 1, 1, 0, 0), max_value=datetime(2018, 12, 31, 0, 0)).map( lambda d: int(d.timestamp()))) return { 'timestamp': timestamp, 'offset': 0, 'negative_utc': False, } @composite def new_revision(draw): """ Hypothesis strategy returning random raw swh revision data not ingested into the test archive. """ return { 'id': draw(unknown_revision().map(hash_to_bytes)), 'directory': draw(sha1().map(hash_to_bytes)), 'author': draw(new_person()), 'committer': draw(new_person()), 'message': draw( text(min_size=20, max_size=100).map(lambda t: t.encode())), 'date': draw(new_swh_date()), 'committer_date': draw(new_swh_date()), 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } def revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions ingested into the test archive. """ return lists(revision(), min_size=min_size, max_size=max_size) def unknown_revisions(min_size=2, max_size=8): """ Hypothesis strategy returning random revisions not ingested into the test archive. """ return lists(unknown_revision(), min_size=min_size, max_size=max_size) def snapshot(): """ Hypothesis strategy returning a random snapshot ingested into the test archive. """ return _known_swh_object('snapshots') def new_snapshots(nb_snapshots=None): min_size = nb_snapshots if nb_snapshots else 2 max_size = nb_snapshots if nb_snapshots else 8 return lists(new_snapshot(min_size=2, max_size=10, only_objects=True) .map(lambda snp: snp.to_dict()), min_size=min_size, max_size=max_size) def unknown_snapshot(): """ Hypothesis strategy returning a random revision not ingested into the test archive. 
""" return sha1().filter( lambda s: get_tests_data()['storage'].snapshot_get( hash_to_bytes(s)) is None) def _get_origin_dfs_revisions_walker(): tests_data = get_tests_data() storage = tests_data['storage'] origin = random.choice(tests_data['origins'][:-1]) snapshot = storage.snapshot_get_latest(origin['url']) if snapshot['branches'][b'HEAD']['target_type'] == 'alias': target = snapshot['branches'][b'HEAD']['target'] head = snapshot['branches'][target]['target'] else: head = snapshot['branches'][b'HEAD']['target'] return get_revisions_walker('dfs', storage, head) def ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with an ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() master_revisions = [] children = defaultdict(list) init_rev_found = False # get revisions only authored in the master branch for rev in revisions_walker: for rev_p in rev['parents']: children[rev_p].append(rev['id']) if not init_rev_found: master_revisions.append(rev) if not rev['parents']: init_rev_found = True # head revision root_rev = master_revisions[0] # pick a random revision, different from head, only authored # in the master branch ancestor_rev_idx = random.choice(list(range(1, len(master_revisions)-1))) ancestor_rev = master_revisions[ancestor_rev_idx] ancestor_child_revs = children[ancestor_rev['id']] return just({ 'sha1_git_root': hash_to_hex(root_rev['id']), 'sha1_git': hash_to_hex(ancestor_rev['id']), 'children': [hash_to_hex(r) for r in ancestor_child_revs] }) def non_ancestor_revisions(): """ Hypothesis strategy returning a pair of revisions ingested into the test archive with no ancestor relation. """ # get a dfs revisions walker for one of the origins # loaded into the test archive revisions_walker = _get_origin_dfs_revisions_walker() merge_revs = [] children = defaultdict(list) # get all merge revisions for rev in revisions_walker: if len(rev['parents']) > 1: merge_revs.append(rev) for rev_p in rev['parents']: children[rev_p].append(rev['id']) # find a merge revisions whose parents have a unique child revision random.shuffle(merge_revs) selected_revs = None for merge_rev in merge_revs: if all(len(children[rev_p]) == 1 for rev_p in merge_rev['parents']): selected_revs = merge_rev['parents'] return just({ 'sha1_git_root': hash_to_hex(selected_revs[0]), 'sha1_git': hash_to_hex(selected_revs[1]) }) # The following strategies returns data specific to some tests # that can not be generated and thus are hardcoded. def contents_with_ctags(): """ Hypothesis strategy returning contents ingested into the test archive. Those contents are ctags compatible, that is running ctags on those lay results. 
""" return just({ 'sha1s': ['0ab37c02043ebff946c1937523f60aadd0844351', '15554cf7608dde6bfefac7e3d525596343a85b6f', '2ce837f1489bdfb8faf3ebcc7e72421b5bea83bd', '30acd0b47fc25e159e27a980102ddb1c4bea0b95', '4f81f05aaea3efb981f9d90144f746d6b682285b', '5153aa4b6e4455a62525bc4de38ed0ff6e7dd682', '59d08bafa6a749110dfb65ba43a61963d5a5bf9f', '7568285b2d7f31ae483ae71617bd3db873deaa2c', '7ed3ee8e94ac52ba983dd7690bdc9ab7618247b4', '8ed7ef2e7ff9ed845e10259d08e4145f1b3b5b03', '9b3557f1ab4111c8607a4f2ea3c1e53c6992916c', '9c20da07ed14dc4fcd3ca2b055af99b2598d8bdd', 'c20ceebd6ec6f7a19b5c3aebc512a12fbdc9234b', 'e89e55a12def4cd54d5bff58378a3b5119878eb7', 'e8c0654fe2d75ecd7e0b01bee8a8fc60a130097e', 'eb6595e559a1d34a2b41e8d4835e0e4f98a5d2b5'], 'symbol_name': 'ABS' }) def revision_with_submodules(): """ Hypothesis strategy returning a revision that is known to point to a directory with revision entries (aka git submodule) """ return just({ 'rev_sha1_git': 'ffcb69001f3f6745dfd5b48f72ab6addb560e234', 'rev_dir_sha1_git': 'd92a21446387fa28410e5a74379c934298f39ae2', 'rev_dir_rev_path': 'libtess2' })