diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index d82f9095d..ab2d50b55 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,165 +1,171 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from unittest.mock import patch from nose.tools import istest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': sha1_git = kwds['sha1_git'] return '/path/to/url/dir' + '/' + sha1_git else: sha1_git = kwds['q'] return '/path/to/url/file' + '/' + sha1_git mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/path/to/url/dir/123', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/path/to/url/file/654', 'name': 'some-filename', 'type': 'file'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing(inputs) # then self.assertEquals(actual_outputs, expected_output) @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given inputSet = {1, 2} # when actual_res = utils.filter_field_keys(inputSet, {'a', '1'}) # then self.assertEqual(actual_res, inputSet) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) + + @istest + def person_to_string(self): + self.assertEqual(utils.person_to_string(dict(name='raboof', + email='foo@bar')), + 'raboof ') diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index df6263b20..b76b910dd 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,91 +1,98 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import flask def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def prepare_directory_listing(files): """Given a list of dictionary files, return a view ready dictionary. """ ls = [] for entry in files: new_entry = {'name': entry['name'], 'type': entry['type']} if entry['type'] == 'dir': new_entry['link'] = flask.url_for('browse_directory', sha1_git=entry['target']) else: new_entry['link'] = flask.url_for('show_content', q=entry['sha1']) ls.append(new_entry) return ls def filter_field_keys(obj, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - obj: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(obj, dict): filt_dict = {} for key, value in obj.items(): if key in field_keys: filt_dict[key] = value return filt_dict elif isinstance(obj, list): filt_list = [] for e in obj: filt_list.append(filter_field_keys(e, field_keys)) return filt_list return obj def fmap(f, data): if isinstance(data, list): return [f(x) for x in data] if isinstance(data, dict): return {k: f(v) for (k, v) in data.items()} return f(data) + + +def person_to_string(person): + """Map a person (person, committer, tagger, etc...) to a string. + + """ + return ''.join([person['name'], ' <', person['email'], '>']) diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py index 7864c002d..9ec5a5cb2 100644 --- a/swh/web/ui/views.py +++ b/swh/web/ui/views.py @@ -1,324 +1,321 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import render_template, flash, request, url_for from flask.ext.api.decorators import set_renderers from flask.ext.api.renderers import HTMLRenderer from swh.core.hashutil import ALGORITHMS from swh.web.ui import service, utils from swh.web.ui.exc import BadInputExc from swh.web.ui.main import app hash_filter_keys = ALGORITHMS @app.route('/') @set_renderers(HTMLRenderer) def homepage(): """Home page """ flash('This Web app is still work in progress, use at your own risk', 'warning') # return redirect(url_for('about')) return render_template('home.html') @app.route('/about/') @set_renderers(HTMLRenderer) def about(): return render_template('about.html') @app.route('/search/', methods=['GET', 'POST']) @set_renderers(HTMLRenderer) def search(): """Search for hashes in swh-storage. One form to submit either: - hash query to look up in swh storage - some file content to upload, compute its hash and look it up in swh storage - both Returns: dict representing data to look for in swh storage. The following keys are returned: - file: File submitted for upload - filename: Filename submitted for upload - q: Query on hash to look for - message: Message detailing if data has been found or not. """ env = {'filename': None, 'q': None, 'file': None} data = None q = env['q'] file = env['file'] if request.method == 'GET': data = request.args elif request.method == 'POST': data = request.data # could either be a query for sha1 hash q = data.get('q') # or hash and search a file file = request.files.get('filename') messages = [] if q: env['q'] = q try: r = service.lookup_hash(q) messages.append('Content with hash %s%sfound!' % ( q, ' ' if r.get('found') else ' not ')) except BadInputExc as e: messages.append(str(e)) if file: env['file'] = file try: uploaded_content = service.upload_and_search(file) filename = uploaded_content['filename'] sha1 = uploaded_content['sha1'] found = uploaded_content['found'] messages.append('File %s with hash %s%sfound!' % ( filename, sha1, ' ' if found else ' not ')) env.update({ 'filename': filename, 'sha1': sha1, }) except BadInputExc as e: messages.append(str(e)) env['q'] = q if q else '' env['messages'] = messages return render_template('upload_and_search.html', **env) def _origin_seen(q, data): """Given an origin, compute a message string with the right information. Args: origin: a dictionary with keys: - origin: a dictionary with type and url keys - occurrence: a dictionary with a validity range Returns: Message as a string """ origin_type = data['origin_type'] origin_url = data['origin_url'] revision = data['revision'] branch = data['branch'] path = data['path'] return """The content with hash %s has been seen on origin with type '%s' at url '%s'. The revision was identified at '%s' on branch '%s'. The file's path referenced was '%s'.""" % (q, origin_type, origin_url, revision, branch, path) # @app.route('/browse/content/') # @app.route('/browse/content//') @set_renderers(HTMLRenderer) def content_with_origin(q='sha1:4320781056e5a735a39de0b8c229aea224590052'): """Show content information. Args: - q: query string of the form with `algo_hash` in sha1, sha1_git, sha256. This means that several different URLs (at least one per HASH_ALGO) will point to the same content sha: the sha with 'hash' format Returns: The content's information at for a given checksum. """ env = {'q': q} try: content = service.lookup_hash(q) if not content.get('found'): message = "Hash %s was not found." % q else: origin = service.lookup_hash_origin(q) message = _origin_seen(q, origin) except BadInputExc as e: # do not like it but do not duplicate code message = str(e) env['message'] = message return render_template('content.html', **env) @app.route('/browse/content//raw/') @set_renderers(HTMLRenderer) def show_content(q): """Given a hash and a checksum, display the content's raw data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ env = {} try: content = service.lookup_content_raw(q) if content: # FIXME: will break if not utf-8 content['data'] = content['data'].decode('utf-8') message = 'Content %s' % content['sha1'] else: message = 'Content with %s not found.' % q except BadInputExc as e: message = str(e) content = None env['message'] = message env['content'] = content return render_template('display_content.html', **env) @app.route('/browse/directory/') @app.route('/browse/directory//') @set_renderers(HTMLRenderer) def browse_directory(sha1_git='828da2b80e41aa958b2c98526f4a1d2cc7d298b7'): """Show directory information. Args: - sha1_git: the directory's sha1 git identifier. Returns: The content's information at sha1_git """ env = {'sha1_git': sha1_git} try: directory_files = service.lookup_directory(sha1_git) if directory_files: message = "Listing for directory %s:" % sha1_git files = utils.prepare_directory_listing(directory_files) else: message = "Directory %s not found." % sha1_git files = [] except BadInputExc as e: # do not like it but do not duplicate code message = str(e) files = [] env['message'] = message env['files'] = files return render_template('directory.html', **env) @app.route('/browse/origin/') @app.route('/browse/origin//') @set_renderers(HTMLRenderer) def browse_origin(origin_id=1): """Browse origin with id id. """ env = {'origin_id': origin_id, 'origin': None} try: ori = service.lookup_origin(origin_id) if ori: env.update({'origin': ori}) else: env.update({'message': 'Origin %s not found!' % origin_id}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('origin.html', **env) @app.route('/browse/person/') @app.route('/browse/person//') @set_renderers(HTMLRenderer) def browse_person(person_id=1): """Browse person with id id. """ env = {'person_id': person_id, 'person': None} try: ori = service.lookup_person(person_id) if ori: env.update({'person': ori}) else: env.update({'message': 'Person %s not found!' % person_id}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('person.html', **env) @app.route('/browse/release/') @app.route('/browse/release//') @set_renderers(HTMLRenderer) def browse_release(sha1_git='1e951912027ea6873da6985b91e50c47f645ae1a'): """Browse release with sha1_git. """ env = {'sha1_git': sha1_git, 'release': None} try: rel = service.lookup_release(sha1_git) if rel: author = rel.get('author') if author: - rel['author'] = ''.join([author['name'], - ' <', author['email'], '>']) - rel['author'] = utils.person_to_string(author) target_type = rel.get('target_type') if target_type == 'revision': rel['target'] = url_for('browse_revision', sha1_git=rel['target']) env.update({'release': rel, 'keys': ['id', 'name', 'date', 'message', 'author', 'target', 'target_type']}) else: env.update({'message': 'Release %s not found!' % sha1_git}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('release.html', **env) @app.route('/browse/revision/') @app.route('/browse/revision//') @set_renderers(HTMLRenderer) def browse_revision(sha1_git='d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754'): pass