diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py index 0a5e902b..74700f8c 100644 --- a/swh/web/ui/api.py +++ b/swh/web/ui/api.py @@ -1,870 +1,870 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from types import GeneratorType from flask import request, url_for, Response, redirect from swh.web.ui import service, utils from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') def api_stats(): """Return statistics on SWH storage. Returns: SWH storage's statistics. """ return service.stat_counters() @app.route('/api/1/search/') @app.route('/api/1/search//') def api_search(q='sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5'): """Search a content per hash. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Dictionary with 'found' key and the associated result. Raises: BadInputExc in case of unknown algo_hash or bad hash. Example: GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/ """ r = service.lookup_hash(q).get('found') return {'found': True if r else False} def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x, *args): """Capture a redundant behavior of: - looking up the backend with a criteria (be it an identifier or checksum) passed to the function lookup_fn - if nothing is found, raise an NotFoundExc exception with error message error_msg_if_not_found. - Otherwise if something is returned: - either as list, map or generator, map the enrich_fn function to it and return the resulting data structure as list. - either as dict and pass to enrich_fn and return the dict enriched. Args: - criteria: discriminating criteria to lookup - lookup_fn: function expects one criteria and optional supplementary *args. - error_msg_if_not_found: if nothing matching the criteria is found, raise NotFoundExc with this error message. - enrich_fn: Function to use to enrich the result returned by lookup_fn. Default to the identity function if not provided. - *args: supplementary arguments to pass to lookup_fn. Raises: NotFoundExp or whatever `lookup_fn` raises. """ res = lookup_fn(criteria, *args) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list, GeneratorType)): enriched_data = [] for e in res: enriched_data.append(enrich_fn(e)) return enriched_data return enrich_fn(res) @app.route('/api/1/origin/') @app.route('/api/1/origin//') def api_origin(origin_id=1): """Return information about origin with id origin_id. Args: origin_id: the origin's identifier. Returns: Information on the origin if found. Raises: NotFoundExc if the origin is not found. Example: GET /api/1/origin/1/ """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person/') @app.route('/api/1/person//') def api_person(person_id=1): """Return information about person with identifier person_id. Args: person_id: the person's identifier. Returns: Information on the person if found. Raises: NotFoundExc if the person is not found. Example: GET /api/1/person/1/ """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) def _enrich_release(release): """Enrich a release with link to the 'target' of 'type' revision. """ if 'target' in release and \ 'target_type' in release and \ release['target_type'] == 'revision': release['target_url'] = url_for('api_revision', sha1_git=release['target']) return release @app.route('/api/1/release/') @app.route('/api/1/release//') -def api_release(sha1_git='3c31de6fdc47031857fda10cfa4caf7044cadefb'): +def api_release(sha1_git='1e951912027ea6873da6985b91e50c47f645ae1a'): """Return information about release with id sha1_git. Args: sha1_git: the release's hash. Returns: Information on the release if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the release is not found. Example: GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414 """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=_enrich_release) def _enrich_revision_with_urls(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). """ if not context: context = revision['id'] revision['url'] = url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = url_for('api_revision_log', sha1_git=revision['id']) if 'directory' in revision: revision['directory_url'] = url_for('api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=parent)) revision['parent_urls'] = parents if 'children' in revision: children = [] for child in revision['children']: children.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=child)) revision['children_urls'] = children return revision @app.route('/api/1/revision' '/origin/' '/directory/') @app.route('/api/1/revision' '/origin/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/directory//') def api_directory_through_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None, path=None): """Display directory or content information through a revision identified by origin/branch/timestamp. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). path: Path to directory or file to display. Returns: Information on the directory or content pointed to by such revision. Raises: NotFoundExc if the revision is not found or the path pointed to is not found. """ if ts: ts = utils.parse_timestamp(ts) revision = service.lookup_revision_by(origin_id, branch_name, ts) if not revision: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return _revision_directory(revision['id'], path, request.path) @app.route('/api/1/revision' '/origin/' '/history//') @app.route('/api/1/revision' '/origin/' '/branch/' '/history//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history//') def api_history_through_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None, sha1_git=None): """ Return information about revision sha1_git, limited to the sub-graph of all transitive parents of the revision root identified by (origin_id, branch_name, ts). Given sha1_git_root such root revision's identifier, in other words, sha1_git is an ancestor of sha1_git_root. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ limit = int(request.args.get('limit', '100')) if ts: ts = utils.parse_timestamp(ts) rev_root, revision = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' " "sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git, rev_root['id'], origin_id, branch_name, ts)) return _enrich_revision_with_urls(revision, context=rev_root['id']) @app.route('/api/1/revision' '/origin/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/history/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/history/' '/directory//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory/') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts/' '/history/' '/directory//') def api_directory_through_revision_with_origin_history( origin_id=1, branch_name="refs/heads/master", ts=None, sha1_git=None, path=None): """Return information about directory or content pointed to by the revision defined as: revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root (being the identified sha1 by looking up origin_id/branch_name/ts) Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). sha1_git: one of sha1_git_root's ancestors. path: optional directory or content pointed to by that revision. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root or the path referenced does not exist. """ limit = int(request.args.get('limit', '100')) if ts: ts = utils.parse_timestamp(ts) rev_root, revision = service.lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' " "sha1_git_root being the revision's identifier pointed to by " "(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git, rev_root['id'], origin_id, branch_name, ts)) return _revision_directory(revision['id'], path, request.path) @app.route('/api/1/revision' '/origin/') @app.route('/api/1/revision' '/origin//') @app.route('/api/1/revision' '/origin/' '/branch//') @app.route('/api/1/revision' '/origin/' '/branch/' '/ts//') @app.route('/api/1/revision' '/origin/' '/ts//') def api_revision_with_origin(origin_id=1, branch_name="refs/heads/master", ts=None): """Instead of having to specify a (root) revision by SHA1_GIT, users might want to specify a place and a time. In SWH a "place" is an origin; a "time" is a timestamp at which some place has been observed by SWH crawlers. Args: origin_id: origin's identifier (default to 1). branch_name: the optional branch for the given origin (default to master). timestamp: optional timestamp (default to the nearest time crawl of timestamp). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ if ts: ts = utils.parse_timestamp(ts) return _api_lookup( origin_id, service.lookup_revision_by, 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), _enrich_revision_with_urls, branch_name, ts) @app.route('/api/1/revision/') @app.route('/api/1/revision//') def api_revision(sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'): """Return information about revision with id sha1_git. Args: sha1_git: the revision's hash. Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e """ return _api_lookup( sha1_git, lookup_fn=service.lookup_revision, error_msg_if_not_found='Revision with sha1_git %s not' ' found.' % sha1_git, enrich_fn=_enrich_revision_with_urls) def _enrich_directory(directory, context_url=None): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = url_for('api_content_with_details', q='sha1_git:%s' % target) if context_url: directory['file_url'] = context_url + directory['name'] + '/' else: directory['target_url'] = url_for('api_directory', sha1_git=target) if context_url: directory['dir_url'] = context_url + directory['name'] + '/' return directory def _revision_directory(rev_sha1_git, dir_path, request_path): """Compute the revision rev_sha1_git's directory or content data. """ def enrich_directory_local(dir, context_url=request_path): return _enrich_directory(dir, context_url) result = service.lookup_directory_with_revision(rev_sha1_git, dir_path) if not result: raise NotFoundExc('Revision with sha1_git %s not' ' found.' % rev_sha1_git) if result['type'] == 'dir': # dir_entries return list(map(enrich_directory_local, result['content'])) else: # content return _enrich_content(result['content']) @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory//') def api_directory_with_revision( sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf', dir_path=None): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/ """ return _revision_directory(sha1_git, dir_path, request.path) @app.route('/api/1/revision//history//') def api_revision_history(sha1_git_root, sha1_git): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ limit = int(request.args.get('limit', '100')) if sha1_git == sha1_git_root: return redirect(url_for('api_revision', sha1_git=sha1_git, limit=limit)) revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" % (sha1_git, sha1_git_root)) return _enrich_revision_with_urls(revision, context=sha1_git_root) @app.route('/api/1/revision/' '/history/' '/directory/') @app.route('/api/1/revision/' '/history/' '/directory//') def api_directory_revision_history(sha1_git_root, sha1_git, dir_path=None): """Return information about directory pointed to by the revision defined as: revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. dir_path: optional directory pointed to by that revision. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root or the path referenced does not exist """ limit = int(request.args.get('limit', '100')) if sha1_git == sha1_git_root: return redirect(url_for('api_directory_with_revision', sha1_git=sha1_git, dir_path=dir_path), code=301) revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" % (sha1_git, sha1_git_root)) return _revision_directory(revision['id'], dir_path, request.path) @app.route('/api/1/revision//log/') def api_revision_log(sha1_git): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git. Args: sha1_git: the revision's hash. limit: optional query parameter to limit the revisions log (default to 100). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ limit = int(request.args.get('limit', '100')) def lookup_revision_log_with_limit(s, limit=limit): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git return _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=_enrich_revision_with_urls) @app.route('/api/1/directory/') @app.route('/api/1/directory//') def api_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'): """Return information about release with id sha1_git. Args: sha1_git: Directory's sha1_git. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179 """ error_msg = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_directory, error_msg_if_not_found=error_msg, enrich_fn=_enrich_directory) # @app.route('/api/1/browse/') # @app.route('/api/1/browse//') def api_content_checksum_to_origin(q='sha1_git:26ac0281bc74e9bd8a4a4aab1c7c7a' '0c19d4436c'): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242 """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') def api_content_raw(q): """Return content's raw data if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's raw data in application/octet-stream. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return Response(generate(content), mimetype='application/octet-stream') def _enrich_content(content): """Enrich content with 'data', a link to its raw content. """ content['data_url'] = url_for('api_content_raw', q=content['sha1']) return content @app.route('/api/1/content/') @app.route('/api/1/content//') def api_content_with_details(q='sha256:e2c76e40866bb6b28916387bdfc8649beceb' '523015738ec6d4d540c7fe65232b'): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb 523015738ec6d4d540c7fe65232b """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=_enrich_content) def _enrich_entity(entity): """Enrich entity with """ entity['uuid_url'] = url_for('api_entity_by_uuid', uuid=entity['uuid']) if 'parent' in entity and entity['parent']: entity['parent_url'] = url_for('api_entity_by_uuid', uuid=entity['parent']) return entity @app.route('/api/1/entity/') @app.route('/api/1/entity//') def api_entity_by_uuid(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: - GET /api/1/entity/7c33636b-8f11-4bda-89d9-ba8b76a42cec/ - GET /api/1/entity/5f4d4c51-498a-4e28-88b3-b3e4e8396cba/ """ return _api_lookup( uuid, lookup_fn=service.lookup_entity_by_uuid, error_msg_if_not_found="Entity with uuid '%s' not found." % uuid, enrich_fn=_enrich_entity) @app.route('/api/1/uploadnsearch/', methods=['POST']) def api_uploadnsearch(): """Upload the file's content in the post body request. Compute its hash and determine if it exists in the storage. Args: request.files filled with the filename's data to upload. Returns: Dictionary with 'sha1', 'filename' and 'found' predicate depending on whether we find it or not. Raises: BadInputExc in case of the form submitted is incorrect. """ file = request.files.get('filename') if not file: raise BadInputExc("Bad request, missing 'filename' entry in form.") return service.upload_and_search(file) diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py index 87a01533..3b8ab229 100644 --- a/swh/web/ui/tests/test_views.py +++ b/swh/web/ui/tests/test_views.py @@ -1,787 +1,787 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from swh.web.ui.tests import test_app from unittest.mock import patch from swh.web.ui.exc import BadInputExc class FileMock(): def __init__(self, filename): self.filename = filename class ViewTestCase(test_app.SWHViewTestCase): render_template = False @istest def info(self): # when rv = self.client.get('/about/') self.assertEquals(rv.status_code, 200) self.assert_template_used('about.html') self.assertIn(b'About', rv.data) @istest def search_default(self): # when rv = self.client.get('/search/') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), '') self.assertEqual(self.get_context_variable('messages'), []) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg') # when rv = self.client.get('/search/?q=sha1_git:789') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:789') self.assertEqual(self.get_context_variable('messages'), ['error msg']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:789') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.get('/search/?q=sha1:123') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:123') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:123 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:123') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg!') # when rv = self.client.post('/search/', data=dict(q='sha1_git:987')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:987') self.assertEqual(self.get_context_variable('messages'), ['error msg!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:987') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.post('/search/', data=dict(q='sha1:321')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:321') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:321 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:321') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_bad_input(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.side_effect = BadInputExc( 'error bad input') # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ['error bad input']) self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_not_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': 'blahhash', 'found': False} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash blahhash not found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), 'blahhash') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': '123456789', 'found': True} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash 123456789 found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), '123456789') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @istest def browse_content_detail_not_found(self, mock_service): # given mock_service.lookup_content.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-hash not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail_bad_input(self, mock_service): # given mock_service.lookup_content.side_effect = BadInputExc('Bad input!') # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Bad input!') self.assertIsNone(self.get_context_variable('content')) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail(self, mock_service): # given stub_content = {'sha1': 'sha1_hash'} mock_service.lookup_content.return_value = stub_content # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('content'), {'sha1': 'sha1_hash'}) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data(self, mock_service): # given stub_content_raw = { 'sha1': 'sha1-hash', 'data': b'some-data' } mock_service.lookup_content_raw.return_value = stub_content_raw # when rv = self.client.get('/browse/content/sha1:sha1-hash/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content sha1-hash') self.assertEqual(self.get_context_variable('content'), stub_content_raw) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-unknown/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-unknown not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-unknown') @patch('swh.web.ui.views.service') @istest def browse_content_data_invalid_hash(self, mock_service): # given mock_service.lookup_content_raw.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/content/sha2:sha1-invalid/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha2:sha1-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_bad_input(self, mock_utils, mock_service): # given mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/directory/sha2-invalid/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'sha2-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_empty_result(self, mock_utils, mock_service): # given mock_service.lookup_directory.return_value = None # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Directory some-sha1 not found.') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.utils') @istest def browse_directory(self, mock_utils, mock_service): # given stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_service.lookup_directory.return_value = stub_directory_ls stub_directory_map = [ {'link': '/path/to/url/dir/123', 'name': 'some-dir-name'}, {'link': '/path/to/url/file/654', 'name': 'some-filename'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname'} ] mock_utils.prepare_directory_listing.return_value = stub_directory_map # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), stub_directory_map) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') mock_utils.prepare_directory_listing.assert_called_once_with( stub_directory_ls) @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_content_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual(self.get_context_variable('message'), 'Hash sha256:some-sha256 was not found.') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), 'Invalid hash') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} mock_service.lookup_hash_origin.return_value = { 'origin_type': 'ftp', 'origin_url': '/some/url', 'revision': 'revision-hash', 'branch': 'master', 'path': '/path/to', } # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), "The content with hash sha256:some-sha256 has been seen on " + "origin with type 'ftp'\n" + "at url '/some/url'. The revision was identified at " + "'revision-hash' on branch 'master'.\n" + "The file's path referenced was '/path/to'.") mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.assert_called_once_with( 'sha256:some-sha256') @patch('swh.web.ui.views.service') @istest def browse_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.client.get('/browse/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Origin 1 not found!') mock_service.lookup_origin.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_origin_found(self, mock_service): # given mock_origin = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_origin.return_value = mock_origin # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) self.assertEqual(self.get_context_variable('origin'), mock_origin) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_origin_bad_input(self, mock_service): # given mock_service.lookup_origin.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.client.get('/browse/person/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Person 1 not found!') mock_service.lookup_person.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_person_found(self, mock_service): # given mock_person = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_person.return_value = mock_person # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) self.assertEqual(self.get_context_variable('person'), mock_person) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_bad_input(self, mock_service): # given mock_service.lookup_person.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.client.get('/browse/release/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Release 1 not found!') mock_service.lookup_release.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_release_bad_input(self, mock_service): # given mock_service.lookup_release.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest - def browse_release_found(self, mock_service): + def browse_release(self, mock_service): # given mock_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": "d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754", "synthetic": False, "target_type": "revision", "author": { "email": "torvalds@linux-foundation.org", "name": "Linus Torvalds" }, "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } mock_service.lookup_release.return_value = mock_release expected_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": '/browse/revision/d770e558e21961ad6cfdf0ff7df0' 'eb5d7d4f0754/', "synthetic": False, "target_type": "revision", "author": "Linus Torvalds ", "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('release'), expected_release) self.assertEqual(self.get_context_variable('keys'), [ 'id', 'name', 'date', 'message', 'author', 'target', 'target_type']) mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.client.get('/browse/revision/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Revision 1 not found!') mock_service.lookup_revision.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_revision_bad_input(self, mock_service): # given mock_service.lookup_revision.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_revision.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_found(self, mock_service): # given mock_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'metadata': None, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } mock_service.lookup_revision.return_value = mock_revision expected_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'metadata': None, 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], 'directory': '/browse/directory/2a1dbabeed4dcf1f4a4c441993b2f' 'fc9d972780b/', } # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('revision'), expected_revision) self.assertEqual(self.get_context_variable('keys'), ['id', 'message', 'date', 'author', 'committer', 'committer_date', 'synthetic']) mock_service.lookup_revision.assert_called_once_with('426')