diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index 48c73da4..4bb06d73 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,430 +1,487 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, query, upload, backend from swh.web.ui.exc import NotFoundExc def hash_and_search(filepath): """Hash the filepath's content as sha1, then search in storage if it exists. Args: Filepath of the file to hash and search. Returns: Tuple (hex sha1, found as True or false). The found boolean, according to whether the sha1 of the file is present or not. """ h = hashutil.hashfile(filepath) c = backend.content_find('sha1', h['sha1']) if c: r = converters.from_content(c) r['found'] = True return r else: return {'sha1': hashutil.hash_to_hex(h['sha1']), 'found': False} def upload_and_search(file): """Upload a file and compute its hash. """ tmpdir, filename, filepath = upload.save_in_upload_folder(file) res = {'filename': filename} try: content = hash_and_search(filepath) res.update(content) return res finally: # clean up if tmpdir: upload.cleanup(tmpdir) def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ algo, hash = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found, 'algo': algo} def lookup_hash_origin(q): """Return information about the checksum contained in the query q. Args: query string of the form Returns: origin as dictionary if found for the given content. """ algo, hash = query.parse_hash(q) origin = backend.content_find_occurrence(algo, hash) return converters.from_origin(origin) def lookup_origin(origin_id): """Return information about the origin with id origin_id. Args: origin_id as string Returns: origin information as dict. """ return backend.origin_get(origin_id) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = backend.person_get(person_id) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], # HACK: sha1_git really 'Only sha1_git is supported.') dir = backend.directory_get(sha1_git_bin) if not dir: return None directory_entries = backend.directory_ls(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( release_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = backend.release_get(sha1_git_bin) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') res = backend.revision_get(sha1_git_bin) return converters.from_revision(res) def lookup_revision_by(origin_id, branch_name="refs/heads/master", timestamp=None): """Lookup revisions by origin_id, branch_name and timestamp. If: - branch_name is not provided, lookup using 'refs/heads/master' as default. - ts is not provided, use the most recent Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. Yields: The revisions matching the criterions. """ res = backend.revision_get_by(origin_id, branch_name, timestamp) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit=100): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( rev_sha1_git, ['sha1'], 'Only sha1_git is supported.') revision_entries = backend.revision_log(sha1_git_bin, limit) return map(converters.from_revision, revision_entries) def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. sha1_git_root being resolved through the lookup of a revision by origin_id, branch_name and ts. In other words, sha1_git is an ancestor of sha1_git_root. Args: - origin_id: origin of the revision. - branch_name: revision's branch. - timestamp: revision's time frame. - sha1_git: one of sha1_git_root's ancestors. - limit: limit the lookup to 100 revisions back. Returns: Pair of (root_revision, revision). Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ rev_root = backend.revision_get_by(origin_id, branch_name, ts) if not rev_root: raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts)) return (converters.from_revision(rev_root), lookup_revision_with_context(rev_root, sha1_git, limit)) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision. The type is either a sha1 (as an hex string) or a non converted dict. sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) if isinstance(sha1_git_root, str): _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws( sha1_git_root, ['sha1'], 'Only sha1_git is supported.') revision_root = backend.revision_get(sha1_git_root_bin) if not revision_root: raise NotFoundExc('Revision root %s not found' % sha1_git_root) else: sha1_git_root_bin = sha1_git_root revision_log = backend.revision_log(sha1_git_root_bin, limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) def lookup_directory_with_revision(sha1_git, dir_path=None): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist. NotImplementedError in case of dir_path exists but do not reference a type 'dir' or 'file'. """ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws( sha1_git, ['sha1'], 'Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: entity = backend.directory_entry_get_by_path(dir_sha1_git_bin, dir_path) if not entity: raise NotFoundExc( "Directory or File '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: entity = {'type': 'dir', 'target': dir_sha1_git_bin} if entity['type'] == 'dir': directory_entries = backend.directory_ls(entity['target']) return {'type': 'dir', 'content': map(converters.from_directory_entry, directory_entries)} elif entity['type'] == 'file': # content content = backend.content_find('sha1_git', entity['target']) return {'type': 'file', 'content': converters.from_content(content)} else: raise NotImplementedError('Entity of type %s not implemented.' % entity['type']) +def lookup_revision_through(revision, limit=100): + """Retrieve a revision from the criterion stored in revision dictionary. + + Args: + revision: Dictionary of criterion to lookup the revision with. + Here are the supported combination of possible values: + - origin_id, branch_name, ts, sha1_git + - origin_id, branch_name, ts + - sha1_git_root, sha1_git + - sha1_git + + Returns: + None if the revision is not found or the actual revision. + + """ + rev = None + if 'origin_id' in revision and \ + 'branch_name' in revision and \ + 'ts' in revision and \ + 'sha1_git' in revision: + rev = lookup_revision_with_context_by(revision['origin_id'], + revision['branch_name'], + revision['ts'], + revision['sha1_git'], + limit) + elif 'origin_id' in revision and \ + 'branch_name' in revision and \ + 'ts' in revision: + rev = lookup_revision_by(revision['origin_id'], + revision['branch_name'], + revision['ts']) + elif 'sha1_git_root' in revision and \ + 'sha1_git' in revision: + rev = lookup_revision_with_context(revision['sha1_git_root'], + revision['sha1_git'], + limit) + elif 'sha1_git' in revision: + rev = lookup_revision(revision['sha1_git']) + else: + # this should not happen + raise NotImplementedError('Should not happen!') + + return rev + + +def lookup_directory_through_revision(revision, path=None, limit=100): + """Retrieve the directory information from the revision. + + """ + rev = lookup_revision_through(revision, limit) + + if rev: + return rev['id'], lookup_directory_with_revision(rev['id'], path) + else: + return None, None + + def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content designed by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ algo, hash = query.parse_hash(q) c = backend.content_find(algo, hash) if not c: return None content = backend.content_get(c['sha1']) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return backend.stat_counters() def lookup_entity_by_uuid(uuid): """Return the entity's hierarchy from its uuid. Args: uuid: entity's identifier. Returns: List of hierarchy entities from the entity with uuid. """ uuid = query.parse_uuid4(uuid) return backend.entity_get(uuid) diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index 0033efc9..2d1e62e4 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,1134 +1,1172 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.core.hashutil import hex_to_hash, hash_to_hex from swh.web.ui import service from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): @patch('swh.web.ui.service.backend') @istest def lookup_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def lookup_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def lookup_hash_origin(self, mock_backend): # given mock_backend.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) mock_backend.content_find_occurrence.assert_called_with( 'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def stat_counters(self, mock_backend): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_backend.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_backend.stat_counters.assert_called_with() @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.hashutil') @istest def hash_and_search(self, mock_hashutil, mock_backend): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_backend.content_find = MagicMock(return_value={ 'sha1': bhash, 'sha1_git': bhash, }) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') mock_backend.content_find.assert_called_once_with('sha1', bhash) @patch('swh.web.ui.service.hashutil') @istest def hash_and_search_not_found(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_hashutil.hash_to_hex = MagicMock( return_value='456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) mock_hashutil.hash_to_hex.assert_called_once_with(bhash) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search(self, mock_upload): mock_upload.save_in_upload_folder.return_value = ( '/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename') service.hash_and_search = MagicMock(side_effect=lambda filepath: {'sha1': 'blah', 'found': True}) mock_upload.cleanup.return_value = None file = MagicMock(filename='some-filename') # when actual_res = service.upload_and_search(file) # then self.assertEqual(actual_res, { 'filename': 'some-filename', 'sha1': 'blah', 'found': True}) mock_upload.save_in_upload_folder.assert_called_with(file) mock_upload.cleanup.assert_called_with('/tmp/dir') service.hash_and_search.assert_called_once_with( '/tmp/dir/path/some-filename') @patch('swh.web.ui.service.backend') @istest def lookup_origin(self, mock_backend): # given mock_backend.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_backend.origin_get.assert_called_with('origin-id') @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release(self, mock_backend): # given mock_backend.release_get = MagicMock(return_value={ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_backend.release_get.assert_called_with( hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_called_once_with( sha1_git_bin) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hex_to_hash(sha1_git_root) sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_has_calls([call(sha1_git_bin), call(sha1_git_root_bin)]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_backend): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash_with_algorithms_or_throws.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_backend.revision_get.side_effect = [ sha1_git_dict, sha1_git_root_dict ] mock_backend.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), }) mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls( [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'), call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')]) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_ls.assert_called_once_with(dir_id) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_dir(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_backend.directory_entry_get_by_path.return_value = { 'type': 'dir', 'name': b'some/path', 'target': b'456' } mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(actual_directory_entries['type'], 'dir') self.assertEqual(list(actual_directory_entries['content']), stub_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( dir_id, 'some/path') mock_backend.directory_ls.assert_called_once_with(b'456') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_revision_with_path_to_file( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'file', 'name': b'some/path/to/file', 'target': b'789' } stub_content = { 'status': 'visible', } mock_backend.content_find.return_value = stub_content # when actual_content = service.lookup_directory_with_revision( '123', 'some/path/to/file') # then self.assertEqual(actual_content, {'type': 'file', 'content': stub_content}) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/file') mock_backend.content_find.assert_called_once_with('sha1_git', b'789') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ko_revision_with_path_to_nowhere( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_directory_with_revision( '123', 'path/to/something/unknown') self.assertIn("Directory/File 'path/to/something/unknown' " + "pointed to by revision 123 not found", cm.exception.args[0]) mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'path/to/something/unknown') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_with_revision_ok_type_not_implemented( self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1', b'123') dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } mock_backend.directory_entry_get_by_path.return_value = { 'type': 'rev', 'name': b'some/path/to/rev', 'target': b'456' } stub_content = { 'id': b'12', 'type': 'file' } mock_backend.content_get.return_value = stub_content # when with self.assertRaises(NotImplementedError) as cm: service.lookup_directory_with_revision( '123', 'some/path/to/rev') self.assertIn("Entity of type 'rev' not implemented.", cm.exception.args[0]) # then mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with ('123', ['sha1'], 'Only sha1_git is supported.') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_entry_get_by_path.assert_called_once_with( b'dir-id-as-sha1', 'some/path/to/rev') @patch('swh.web.ui.service.backend') @istest def lookup_revision(self, mock_backend): # given mock_backend.revision_get = MagicMock(return_value={ 'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) # when actual_revision = service.lookup_revision( '18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_revision, { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) mock_backend.revision_get.assert_called_with( hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log(self, mock_backend): # given stub_revision_log = [{ 'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] mock_backend.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( 'abcdbe353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(list(actual_revision), [{ 'id': '28d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }]) mock_backend.revision_log.assert_called_with( hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', }) mock_backend.content_get = MagicMock(return_value={ 'data': b'binary data'}) # when actual_content = service.lookup_content_raw( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_backend.content_find.assert_called_once_with( 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')) mock_backend.content_get.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.service.backend') @istest def lookup_content_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha1(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 190, 'status': 'hidden', }) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 190, 'status': 'absent', }) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha256(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 360, 'status': 'visible', }) # when actual_content = service.lookup_content( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 360, 'status': 'visible', }) mock_backend.content_find.assert_called_with( 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')) @patch('swh.web.ui.service.backend') @istest def lookup_person(self, mock_backend): # given mock_backend.person_get = MagicMock(return_value={ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_backend.person_get.assert_called_with('person_id') @patch('swh.web.ui.service.backend') @istest def lookup_directory_bad_checksum(self, mock_backend): # given mock_backend.directory_ls = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-id-bin') mock_backend.directory_get.return_value = None # when actual_dir = service.lookup_directory('directory_id') # then self.assertIsNone(actual_dir) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory_id', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_get.assert_called_with('directory-id-bin') mock_backend.directory_ls.called = False @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_directory(self, mock_query, mock_backend): mock_query.parse_hash_with_algorithms_or_throws.return_value = ( 'sha1', 'directory-sha1-bin') # something that exists is all that matters here mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'} # given stub_dir_entries = [{ 'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, }] mock_backend.directory_ls.return_value = stub_dir_entries # when actual_directory_ls = list(service.lookup_directory( 'directory-sha1')) # then self.assertEqual(actual_directory_ls, expected_dir_entries) mock_query.parse_hash_with_algorithms_or_throws.assert_called_with( 'directory-sha1', ['sha1'], 'Only sha1_git is supported.') mock_backend.directory_ls.assert_called_with( 'directory-sha1-bin') @patch('swh.web.ui.service.backend') @istest def lookup_revision_by_nothing_found(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when actual_revisions = service.lookup_revision_by(1) # then self.assertIsNone(actual_revisions) mock_backend.revision_get_by(1, 'master', None) @patch('swh.web.ui.service.backend') @istest def lookup_revision_by(self, mock_backend): # given stub_rev = { 'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'ynot', 'email': b'ynot@blah.org', }, 'committer': { 'name': b'ynot', 'email': b'ynot@blah.org', }, 'message': b'elegant solution 31415', 'date': datetime.datetime(2016, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54), 'committer_date_offset': 0, } expected_rev = { 'id': '28d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'ynot', 'email': 'ynot@blah.org', }, 'committer': { 'name': 'ynot', 'email': 'ynot@blah.org', }, 'message': 'elegant solution 31415', 'date': datetime.datetime(2016, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54), 'committer_date_offset': 0, } mock_backend.revision_get_by.return_value = stub_rev # when actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts') # then self.assertEquals(actual_revision, expected_rev) mock_backend.revision_get_by(1, 'master2', 'some-ts') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by_ko(self, mock_backend): # given mock_backend.revision_get_by.return_value = None # when with self.assertRaises(NotFoundExc) as cm: origin_id = 1 branch_name = 'master3' ts = None service.lookup_revision_with_context_by(origin_id, branch_name, ts, 'sha1') # then self.assertIn( 'Revision with (origin_id: %s, branch_name: %s' ', ts: %s) not found.' % (origin_id, branch_name, ts), cm.exception.args[0]) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) @patch('swh.web.ui.service.lookup_revision_with_context') @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_by(self, mock_backend, mock_lookup_revision_with_context): # given stub_root_rev = {'id': 'root-rev-id'} mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'} stub_rev = {'id': 'rev-found'} mock_lookup_revision_with_context.return_value = stub_rev # when origin_id = 1 branch_name = 'master3' ts = None sha1_git = 'sha1' actual_root_rev, actual_rev = service.lookup_revision_with_context_by( origin_id, branch_name, ts, sha1_git) # then self.assertEquals(actual_root_rev, stub_root_rev) self.assertEquals(actual_rev, stub_rev) mock_backend.revision_get_by.assert_called_once_with( origin_id, branch_name, ts) mock_lookup_revision_with_context.assert_called_once_with( stub_root_rev, sha1_git, 100) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_entity_by_uuid(self, mock_query, mock_backend): # given uuid_test = 'correct-uuid' mock_query.parse_uuid4.return_value = uuid_test stub_entities = [{'uuid': uuid_test}] mock_backend.entity_get.return_value = stub_entities # when actual_entities = service.lookup_entity_by_uuid(uuid_test) # then self.assertEquals(actual_entities, stub_entities) mock_query.parse_uuid4.assert_called_once_with(uuid_test) mock_backend.entity_get.assert_called_once_with(uuid_test) + + @patch('swh.web.ui.service.lookup_revision_through') + @istest + def lookup_directory_through_revision_empty(self, mock_lookup_rev): + # given + mock_lookup_rev.return_value = None + + # when + rev_id, actual_dir = service.lookup_directory_through_revision( + {'sha1_git': 'sha1'}, 'some/path', 100) + + # then + self.assertIsNone(rev_id) + self.assertIsNone(actual_dir) + + mock_lookup_rev.assert_called_once_with({'sha1_git': 'sha1'}, 100) + + @patch('swh.web.ui.service.lookup_revision_through') + @patch('swh.web.ui.service.lookup_directory_with_revision') + @istest + def lookup_directory_through_revision(self, + mock_lookup_dir, + mock_lookup_rev): + # given + mock_lookup_rev.return_value = {'id': 'rev-id'} + mock_lookup_dir.return_value = {'type': 'dir', 'content': []} + + # when + rev_id, actual_dir = service.lookup_directory_through_revision( + {'sha1_git': 'yet-another-sha1'}, 'some/path', 100) + + # then + self.assertEquals(rev_id, 'rev-id') + self.assertEquals(actual_dir, {'type': 'dir', 'content': []}) + + mock_lookup_rev.assert_called_once_with( + {'sha1_git': 'yet-another-sha1'}, 100) + mock_lookup_dir.assert_called_with('rev-id', 'some/path') diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py index 886f425d..8cd5032f 100644 --- a/swh/web/ui/tests/test_utils.py +++ b/swh/web/ui/tests/test_utils.py @@ -1,321 +1,383 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import dateutil import unittest from unittest.mock import patch, call from nose.tools import istest from swh.web.ui import utils class UtilsTestCase(unittest.TestCase): def setUp(self): self.url_map = [dict(rule='/other/', methods=set(['GET', 'POST', 'HEAD']), endpoint='foo'), dict(rule='/some/old/url/', methods=set(['GET', 'POST']), endpoint='blablafn'), dict(rule='/other/old/url/', methods=set(['GET', 'HEAD']), endpoint='bar'), dict(rule='/other', methods=set([]), endpoint=None), dict(rule='/other2', methods=set([]), endpoint=None)] @istest def filter_endpoints_1(self): # when actual_data = utils.filter_endpoints(self.url_map, '/some') # then self.assertEquals(actual_data, { '/some/old/url/': { 'methods': ['GET', 'POST'], 'endpoint': 'blablafn' } }) @istest def filter_endpoints_2(self): # when actual_data = utils.filter_endpoints(self.url_map, '/other', blacklist=['/other2']) # then # rules /other is skipped because its' exactly the prefix url # rules /other2 is skipped because it's blacklisted self.assertEquals(actual_data, { '/other/': { 'methods': ['GET', 'HEAD', 'POST'], 'endpoint': 'foo' }, '/other/old/url/': { 'methods': ['GET', 'HEAD'], 'endpoint': 'bar' } }) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': sha1_git = kwds['sha1_git'] return '/path/to/url/dir' + '/' + sha1_git else: sha1_git = kwds['q'] return '/path/to/url/file' + '/' + sha1_git mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/path/to/url/dir/123', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/path/to/url/file/654', 'name': 'some-filename', 'type': 'file'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing(inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_directory', sha1_git='123'), call('browse_content_data', q='654'), call('browse_directory', sha1_git='987'), ]) @patch('swh.web.ui.utils.flask') @istest def prepare_directory_listing_with_revision(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_revision_directory': sha1_git = kwds['sha1_git'] path = kwds['path'] return '/browse/revision/' + sha1_git + '/directory/' + path mock_flask.url_for.side_effect = mock_url_for inputs = [{'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'}] expected_output = [{'link': '/browse/revision/revsha1/directory/' 'some/path/some-dir-name', 'name': 'some-dir-name', 'type': 'dir'}, {'link': '/browse/revision/revsha1/directory/' 'some/path/some-filename', 'name': 'some-filename', 'type': 'file'}, {'link': '/browse/revision/revsha1/directory/' 'some/path/some-other-dirname', 'name': 'some-other-dirname', 'type': 'dir'}] # when actual_outputs = utils.prepare_directory_listing_with_revision( 'revsha1', 'some/path', inputs) # then self.assertEquals(actual_outputs, expected_output) mock_flask.url_for.assert_has_calls([ call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-dir-name'), call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-filename'), call('browse_revision_directory', sha1_git='revsha1', path='some/path/some-other-dirname'), ]) @patch('swh.web.ui.utils.flask') @istest def prepare_revision_view(self, mock_flask): # given def mock_url_for(url_key, **kwds): if url_key == 'browse_directory': return '/browse/directory/' + kwds['sha1_git'] elif url_key == 'browse_revision': return '/browse/revision/' + kwds['sha1_git'] mock_flask.url_for.side_effect = mock_url_for rev_input = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ 'b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } expected_rev = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '/browse/directory/' '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], 'children': [ '/browse/revision/b696e2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } # when actual_rev = utils.prepare_revision_view(rev_input) # then self.assertEquals(actual_rev, expected_rev) mock_flask.url_for.assert_has_calls([ call('browse_revision', sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_revision', sha1_git='b696e2b738bfa26326b3f1f40f0f1eda0c067ccf'), call('browse_directory', sha1_git='2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b'), ]) + @patch('swh.web.ui.utils.flask') + @istest + def prepare_directory_listing_with_revision_history(self, mock_flask): + # given + def mock_url_for(url_key, **kwds): + if url_key == 'browse_revision_history_directory': + sha1_git_root = kwds['sha1_git_root'] + sha1_git = kwds['sha1_git'] + path = kwds['path'] + return '/browse/revision/' + sha1_git_root + \ + '/history/' + sha1_git + '/directory/' + path + + mock_flask.url_for.side_effect = mock_url_for + + inputs = [{'type': 'dir', + 'target': '123', + 'name': 'some-dir-name'}, + {'type': 'file', + 'sha1': '654', + 'name': 'some-filename'}, + {'type': 'dir', + 'target': '987', + 'name': 'some-other-dirname'}] + + expected_output = [{'link': '/browse/revision/revsha1root/history/' + 'revsha1/directory/' + 'some/path/some-dir-name', + 'name': 'some-dir-name', + 'type': 'dir'}, + {'link': '/browse/revision/revsha1root/history/' + 'revsha1/directory/' + 'some/path/some-filename', + 'name': 'some-filename', + 'type': 'file'}, + {'link': '/browse/revision/revsha1root/history/' + 'revsha1/directory/' + 'some/path/some-other-dirname', + 'name': 'some-other-dirname', + 'type': 'dir'}] + + # when + actual_outputs = utils.prepare_directory_listing_with_revision_history( + 'revsha1root', 'revsha1', 'some/path', inputs) + + # then + self.assertEquals(actual_outputs, expected_output) + + mock_flask.url_for.assert_has_calls([ + call('browse_revision_history_directory', + sha1_git_root='revsha1root', + sha1_git='revsha1', + path='some/path/some-dir-name'), + call('browse_revision_history_directory', + sha1_git_root='revsha1root', + sha1_git='revsha1', + path='some/path/some-filename'), + call('browse_revision_history_directory', + sha1_git_root='revsha1root', + sha1_git='revsha1', + path='some/path/some-other-dirname'), + ]) + @istest def filter_field_keys_dict_unknown_keys(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory1', 'file2'}) # then self.assertEqual(actual_res, {}) @istest def filter_field_keys_dict(self): # when actual_res = utils.filter_field_keys( {'directory': 1, 'file': 2, 'link': 3}, {'directory', 'link'}) # then self.assertEqual(actual_res, {'directory': 1, 'link': 3}) @istest def filter_field_keys_list_unknown_keys(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'1': 1, '2': 2, 'link': 3}], {'d'}) # then self.assertEqual(actual_res, [{}, {}]) @istest def filter_field_keys_list(self): # when actual_res = utils.filter_field_keys( [{'directory': 1, 'file': 2, 'link': 3}, {'dir': 1, 'fil': 2, 'lin': 3}], {'directory', 'dir'}) # then self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}]) @istest def filter_field_keys_other(self): # given input_set = {1, 2} # when actual_res = utils.filter_field_keys(input_set, {'a', '1'}) # then self.assertEqual(actual_res, input_set) @istest def fmap(self): self.assertEquals([2, 3, 4], utils.fmap(lambda x: x+1, [1, 2, 3])) self.assertEquals({'a': 2, 'b': 4}, utils.fmap(lambda x: x*2, {'a': 1, 'b': 2})) self.assertEquals(100, utils.fmap(lambda x: x*10, 10)) @istest def person_to_string(self): self.assertEqual(utils.person_to_string(dict(name='raboof', email='foo@bar')), 'raboof ') @istest def parse_timestamp(self): input_timestamps = [ '2016-01-12', '2016-01-12T09:19:12+0100', 'Today is January 1, 2047 at 8:21:00AM', '1452591542', ] output_dates = [ datetime.datetime(2016, 1, 12, 0, 0), datetime.datetime(2016, 1, 12, 9, 19, 12, tzinfo=dateutil.tz.tzoffset(None, 3600)), datetime.datetime(2047, 1, 1, 8, 21), datetime.datetime(2016, 1, 12, 10, 39, 2), ] for ts, exp_date in zip(input_timestamps, output_dates): self.assertEquals(utils.parse_timestamp(ts), exp_date) diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py index 34f266e0..d562d3a8 100644 --- a/swh/web/ui/tests/test_views.py +++ b/swh/web/ui/tests/test_views.py @@ -1,1070 +1,1183 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from swh.web.ui.tests import test_app from unittest.mock import patch from swh.web.ui.exc import BadInputExc, NotFoundExc class FileMock(): def __init__(self, filename): self.filename = filename class ViewTestCase(test_app.SWHViewTestCase): render_template = False @patch('swh.web.ui.views.flask') @istest def homepage(self, mock_flask): # given mock_flask.flash.return_value = 'something' # when rv = self.client.get('/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('home.html') mock_flask.flash.assert_called_once_with( 'This Web app is still work in progress, use at your own risk', 'warning') @istest def info(self): # when rv = self.client.get('/about/') self.assertEquals(rv.status_code, 200) self.assert_template_used('about.html') self.assertIn(b'About', rv.data) @istest def search_default(self): # when rv = self.client.get('/search/') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), '') self.assertEqual(self.get_context_variable('messages'), []) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg') # when rv = self.client.get('/search/?q=sha1_git:789') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:789') self.assertEqual(self.get_context_variable('messages'), ['error msg']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:789') @patch('swh.web.ui.views.service') @istest def search_get_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.get('/search/?q=sha1:123') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:123') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:123 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:123') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': None} # when rv = self.client.get('/search/?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:456 not found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('error msg!') # when rv = self.client.post('/search/', data=dict(q='sha1_git:987')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1_git:987') self.assertEqual(self.get_context_variable('messages'), ['error msg!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1_git:987') @patch('swh.web.ui.views.service') @istest def search_post_query_hash_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} # when rv = self.client.post('/search/', data=dict(q='sha1:321')) self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), 'sha1:321') self.assertEqual(self.get_context_variable('messages'), ['Content with hash sha1:321 found!']) self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('file'), None) self.assert_template_used('upload_and_search.html') mock_service.lookup_hash.assert_called_once_with('sha1:321') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_bad_input(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.side_effect = BadInputExc( 'error bad input') # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ['error bad input']) self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_not_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': 'blahhash', 'found': False} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash blahhash not found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), 'blahhash') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def search_post_upload_and_hash_found(self, mock_request, mock_service): # given mock_request.data = {} mock_request.method = 'POST' mock_request.files = dict(filename=FileMock('foobar')) mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': '123456789', 'found': True} # when (mock_request completes the post request) rv = self.client.post('/search/') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('messages'), ["File foobar with hash 123456789 found!"]) self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('sha1'), '123456789') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @istest def browse_content_detail_not_found(self, mock_service): # given mock_service.lookup_content.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-hash not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail_bad_input(self, mock_service): # given mock_service.lookup_content.side_effect = BadInputExc('Bad input!') # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Bad input!') self.assertIsNone(self.get_context_variable('content')) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_detail(self, mock_service): # given stub_content = {'sha1': 'sha1_hash'} mock_service.lookup_content.return_value = stub_content # when rv = self.client.get('/browse/content/sha1:sha1-hash/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('content'), {'sha1': 'sha1_hash'}) mock_service.lookup_content.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data(self, mock_service): # given stub_content_raw = { 'sha1': 'sha1-hash', 'data': b'some-data' } mock_service.lookup_content_raw.return_value = stub_content_raw # when rv = self.client.get('/browse/content/sha1:sha1-hash/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content sha1-hash') self.assertEqual(self.get_context_variable('content'), stub_content_raw) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def browse_content_data_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-unknown/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-unknown not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-unknown') @patch('swh.web.ui.views.service') @istest def browse_content_data_invalid_hash(self, mock_service): # given mock_service.lookup_content_raw.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/content/sha2:sha1-invalid/raw/') self.assertEquals(rv.status_code, 200) self.assert_template_used('content-data.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha2:sha1-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_bad_input(self, mock_utils, mock_service): # given mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/directory/sha2-invalid/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'sha2-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_empty_result(self, mock_utils, mock_service): # given mock_service.lookup_directory.return_value = None # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Directory some-sha1 not found.') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.utils') @istest def browse_directory(self, mock_utils, mock_service): # given stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_service.lookup_directory.return_value = stub_directory_ls stub_directory_map = [ {'link': '/path/to/url/dir/123', 'name': 'some-dir-name'}, {'link': '/path/to/url/file/654', 'name': 'some-filename'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname'} ] mock_utils.prepare_directory_listing.return_value = stub_directory_map # when rv = self.client.get('/browse/directory/some-sha1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), stub_directory_map) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') mock_utils.prepare_directory_listing.assert_called_once_with( stub_directory_ls) @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_content_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual(self.get_context_variable('message'), 'Hash sha256:some-sha256 was not found.') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), 'Invalid hash') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') # @istest def browse_content_with_origin(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} mock_service.lookup_hash_origin.return_value = { 'origin_type': 'ftp', 'origin_url': '/some/url', 'revision': 'revision-hash', 'branch': 'master', 'path': '/path/to', } # when rv = self.client.get('/browse/content/sha256:some-sha256/origin/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content-with-origin.html') self.assertEqual( self.get_context_variable('message'), "The content with hash sha256:some-sha256 has been seen on " + "origin with type 'ftp'\n" + "at url '/some/url'. The revision was identified at " + "'revision-hash' on branch 'master'.\n" + "The file's path referenced was '/path/to'.") mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.assert_called_once_with( 'sha256:some-sha256') @patch('swh.web.ui.views.service') @istest def browse_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.client.get('/browse/origin/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Origin 1 not found!') mock_service.lookup_origin.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_origin_found(self, mock_service): # given mock_origin = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_origin.return_value = mock_origin # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) self.assertEqual(self.get_context_variable('origin'), mock_origin) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_origin_bad_input(self, mock_service): # given mock_service.lookup_origin.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/origin/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('origin.html') self.assertEqual(self.get_context_variable('origin_id'), 426) mock_service.lookup_origin.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.client.get('/browse/person/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 1) self.assertEqual( self.get_context_variable('message'), 'Person 1 not found!') mock_service.lookup_person.assert_called_once_with(1) @patch('swh.web.ui.views.service') @istest def browse_person_found(self, mock_service): # given mock_person = {'type': 'git', 'lister': None, 'project': None, 'url': 'rsync://some/url', 'id': 426} mock_service.lookup_person.return_value = mock_person # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) self.assertEqual(self.get_context_variable('person'), mock_person) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_person_bad_input(self, mock_service): # given mock_service.lookup_person.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/person/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('person.html') self.assertEqual(self.get_context_variable('person_id'), 426) mock_service.lookup_person.assert_called_once_with(426) @patch('swh.web.ui.views.service') @istest def browse_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.client.get('/browse/release/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Release 1 not found!') mock_service.lookup_release.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_release_bad_input(self, mock_service): # given mock_service.lookup_release.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_release(self, mock_service): # given mock_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": "d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754", "synthetic": False, "target_type": "revision", "author": { "email": "torvalds@linux-foundation.org", "name": "Linus Torvalds" }, "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } mock_service.lookup_release.return_value = mock_release expected_release = { "date": "Sun, 05 Jul 2015 18:02:06 GMT", "id": "1e951912027ea6873da6985b91e50c47f645ae1a", "target": '/browse/revision/d770e558e21961ad6cfdf0ff7df0' 'eb5d7d4f0754/', "synthetic": False, "target_type": "revision", "author": "Linus Torvalds ", "message": "Linux 4.2-rc1\n", "name": "v4.2-rc1" } # when rv = self.client.get('/browse/release/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('release.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('release'), expected_release) self.assertEqual(self.get_context_variable('keys'), [ 'id', 'name', 'date', 'message', 'author', 'target', 'target_type']) mock_service.lookup_release.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.client.get('/browse/revision/1/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual( self.get_context_variable('message'), 'Revision 1 not found!') mock_service.lookup_revision.assert_called_once_with('1') @patch('swh.web.ui.views.service') @istest def browse_revision_bad_input(self, mock_service): # given mock_service.lookup_revision.side_effect = BadInputExc('wrong input') # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') mock_service.lookup_revision.assert_called_once_with('426') @patch('swh.web.ui.views.service') @istest def browse_revision_found(self, mock_service): # given mock_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'metadata': None, 'type': 'git', 'author': { 'email': 'torvalds@linux-foundation.org', 'name': 'Linus Torvalds' }, 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'directory': '2a1dbabeed4dcf1f4a4c441993b2ffc9d972780b', 'parents': [ 'a585d2b738bfa26326b3f1f40f0f1eda0c067ccf' ], } mock_service.lookup_revision.return_value = mock_revision expected_revision = { 'id': 'd770e558e21961ad6cfdf0ff7df0eb5d7d4f0754', 'date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'committer': 'Linus Torvalds ', 'committer_date': 'Sun, 05 Jul 2015 18:01:52 GMT', 'type': 'git', 'author': 'Linus Torvalds ', 'message': 'Linux 4.2-rc1\n', 'synthetic': False, 'metadata': None, 'parents': [ '/browse/revision/a585d2b738bfa26326b3f1f40f0f1eda0c067ccf/' ], 'directory': '/browse/directory/2a1dbabeed4dcf1f4a4c441993b2f' 'fc9d972780b/', } # when rv = self.client.get('/browse/revision/426/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git'), '426') self.assertEqual(self.get_context_variable('revision'), expected_revision) self.assertEqual(self.get_context_variable('keys'), set(expected_revision.keys()) - set(['directory', 'parents', 'children'])) mock_service.lookup_revision.assert_called_once_with('426') @istest def browse_revision_history_same_sha1(self): # when rv = self.client.get('/browse/revision/10/history/10/') # then self.assertEquals(rv.status_code, 302) @patch('swh.web.ui.views.service') @istest def browse_revision_history_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = None # when rv = self.client.get('/browse/revision/1/history/2/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '1') self.assertEqual(self.get_context_variable('sha1_git'), '2') self.assertEqual( self.get_context_variable('message'), "Possibly sha1_git '2' is not an ancestor of sha1_git_root '1'") mock_service.lookup_revision_with_context.assert_called_once_with( '1', '2', 100) @patch('swh.web.ui.views.service') @istest def browse_revision_history_root_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.side_effect = NotFoundExc( 'Revision root 123 not found') # when rv = self.client.get('/browse/revision/123/history/456/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '123') self.assertEqual(self.get_context_variable('sha1_git'), '456') self.assertEqual( self.get_context_variable('message'), "Revision root 123 not found") mock_service.lookup_revision_with_context.assert_called_once_with( '123', '456', 100) @patch('swh.web.ui.views.service') @istest def browse_revision_history_root_bad_input(self, mock_service): # given mock_service.lookup_revision_with_context.side_effect = NotFoundExc( 'Input incorrect') # when rv = self.client.get('/browse/revision/321/history/654/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '321') self.assertEqual(self.get_context_variable('sha1_git'), '654') self.assertEqual( self.get_context_variable('message'), "Input incorrect") mock_service.lookup_revision_with_context.assert_called_once_with( '321', '654', 100) @patch('swh.web.ui.views.utils') @patch('swh.web.ui.views.service') @istest def browse_revision_history(self, mock_service, mock_utils): # given stub_revision = {'id': 'some-rev'} mock_service.lookup_revision_with_context.return_value = stub_revision expected_revision = {'id': 'some-rev-id'} mock_utils.prepare_revision_view.return_value = expected_revision # when rv = self.client.get('/browse/revision/426/history/789/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision.html') self.assertEqual(self.get_context_variable('sha1_git_root'), '426') - self.assertEqual(self.get_context_variable('sha1_git'), '789') + self.assertEqual(self.get_context_variable('sha1_git'), 'some-rev-id') self.assertEqual(self.get_context_variable('revision'), expected_revision) self.assertEqual(self.get_context_variable('keys'), set(expected_revision.keys()) - set(['directory', 'parents', 'children'])) mock_service.lookup_revision_with_context.assert_called_once_with( '426', '789', 100) mock_utils.prepare_revision_view.assert_called_once_with(stub_revision) @patch('swh.web.ui.views.service') @istest def browse_revision_directory_not_found(self, mock_service): # given - mock_service.lookup_directory_with_revision.side_effect = NotFoundExc( + mock_service.lookup_directory_through_revision.side_effect = NotFoundExc( # noqa 'Not found!') # when rv = self.client.get('/browse/revision/1/directory/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '1') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Not found!") - mock_service.lookup_directory_with_revision.assert_called_once_with( - '1', None) + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '1'}, None) @patch('swh.web.ui.views.service') @istest def browse_revision_directory_bad_input(self, mock_service): # given - mock_service.lookup_directory_with_revision.side_effect = BadInputExc( + mock_service.lookup_directory_through_revision.side_effect = BadInputExc( # noqa 'Bad input!') # when rv = self.client.get('/browse/revision/10/directory/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '10') self.assertEqual(self.get_context_variable('path'), '.') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), "Bad input!") - mock_service.lookup_directory_with_revision.assert_called_once_with( - '10', None) + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '10'}, None) @patch('swh.web.ui.views.service') @istest def browse_revision_directory_not_implemented(self, mock_service): # given - mock_service.lookup_directory_with_revision.side_effect = NotImplementedError( # noqa + mock_service.lookup_directory_through_revision.side_effect = NotImplementedError( # noqa 'Oops! Not implemented!') # when rv = self.client.get('/browse/revision/10/directory/path/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '10') self.assertEqual(self.get_context_variable('path'), 'path') self.assertIsNone(self.get_context_variable('result')) self.assertEqual( self.get_context_variable('message'), 'Oops! Not implemented!') - mock_service.lookup_directory_with_revision.assert_called_once_with( - '10', 'path') + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '10'}, 'path') - from nose.plugins.attrib import attr - - @attr('one') @patch('swh.web.ui.views.service') @istest def browse_revision_directory(self, mock_service): # given stub_result0 = {'type': 'dir', 'content': [{'id': 'some-result', 'type': 'file', 'name': 'blah'}]} - mock_service.lookup_directory_with_revision.return_value = stub_result0 + + mock_service.lookup_directory_through_revision.return_value = ( + '100', stub_result0) + stub_result1 = { 'type': 'dir', 'content': [ {'type': 'file', 'name': 'blah', 'link': '/browse/revision/100/directory/some/path/blah/'} ] } # when rv = self.client.get('/browse/revision/100/directory/some/path/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('revision-directory.html') self.assertEqual(self.get_context_variable('sha1_git'), '100') self.assertEqual(self.get_context_variable('path'), 'some/path') self.assertIsNone(self.get_context_variable('message')) self.assertEqual(self.get_context_variable('result'), stub_result1) - mock_service.lookup_directory_with_revision.assert_called_once_with( - '100', 'some/path') + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '100'}, 'some/path') + + @patch('swh.web.ui.views.service') + @istest + def browse_revision_history_directory_redirect(self, mock_service): + # when + rv = self.client.get('/browse/revision/1/history/1/directory/path/to/') + + # then + self.assertEquals(rv.status_code, 301) + + @patch('swh.web.ui.views.service') + @istest + def browse_revision_history_directory_not_found(self, mock_service): + # given + mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not found') # noqa + + # when + rv = self.client.get('/browse/revision/123/history/456/directory/' + '?limit=666') + + # then + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertEqual(self.get_context_variable('sha1_git_root'), '123') + self.assertEqual(self.get_context_variable('sha1_git'), '456') + self.assertEqual(self.get_context_variable('path'), '.') + self.assertEqual(self.get_context_variable('message'), 'not found') + self.assertIsNone(self.get_context_variable('result')) + + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '456', 'sha1_git_root': '123'}, None, 666) + + @patch('swh.web.ui.views.service') + @istest + def browse_revision_history_directory_bad_input(self, mock_service): + # given + mock_service.lookup_directory_through_revision.side_effect = BadInputExc('bad input') # noqa + + # when + rv = self.client.get('/browse/revision/123/history/456/directory/') + + # then + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertEqual(self.get_context_variable('sha1_git_root'), '123') + self.assertEqual(self.get_context_variable('sha1_git'), '456') + self.assertEqual(self.get_context_variable('path'), '.') + self.assertEqual(self.get_context_variable('message'), 'bad input') + self.assertIsNone(self.get_context_variable('result')) + + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '456', 'sha1_git_root': '123'}, None, 100) + + @patch('swh.web.ui.views.service') + @istest + def browse_revision_history_directory_not_implemented(self, mock_service): + # given + mock_service.lookup_directory_through_revision.side_effect = NotImplementedError('not yet') # noqa + + # when + rv = self.client.get('/browse/revision/123/history/456/directory/' + 'path/to/?limit=9000') + + # then + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertEqual(self.get_context_variable('sha1_git_root'), '123') + self.assertEqual(self.get_context_variable('sha1_git'), '456') + self.assertEqual(self.get_context_variable('path'), 'path/to') + self.assertEqual(self.get_context_variable('message'), 'not yet') + self.assertIsNone(self.get_context_variable('result')) + + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '456', 'sha1_git_root': '123'}, 'path/to', 9000) + + @patch('swh.web.ui.views.service') + @istest + def browse_revision_history_directory(self, mock_service): + # given + stub_result0 = {'type': 'dir', + 'content': [{'id': 'some-result', + 'type': 'file', + 'name': 'blah'}]} + + mock_service.lookup_directory_through_revision.return_value = ( + '1000', + stub_result0) + + stub_result1 = { + 'type': 'dir', + 'content': [ + {'type': 'file', + 'name': 'blah', + 'link': '/browse/revision/100/history/999/directory/' + 'path/to/blah/'} + ] + } + + # when + rv = self.client.get('/browse/revision/100/history/999/directory/' + 'path/to/') + + # then + self.assertEquals(rv.status_code, 200) + self.assert_template_used('revision-directory.html') + self.assertEqual(self.get_context_variable('sha1_git_root'), '100') + self.assertEqual(self.get_context_variable('sha1_git'), '1000') + self.assertEqual(self.get_context_variable('path'), 'path/to') + self.assertIsNone(self.get_context_variable('message')) + self.assertEqual(self.get_context_variable('result'), stub_result1) + + mock_service.lookup_directory_through_revision.assert_called_once_with( + {'sha1_git': '999', 'sha1_git_root': '100'}, 'path/to', 100) @patch('swh.web.ui.views.service') @istest def browse_entity_not_found(self, mock_service): # given mock_service.lookup_entity_by_uuid.return_value = [] # when rv = self.client.get('/browse/entity/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) self.assertEqual( self.get_context_variable('message'), "Entity '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not found!") mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-498a-4e28-88b3-b3e4e8396cba') @patch('swh.web.ui.views.service') @istest def browse_entity_bad_input(self, mock_service): # given mock_service.lookup_entity_by_uuid.side_effect = BadInputExc( 'wrong input') # when rv = self.client.get('/browse/entity/blah-blah-uuid/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), []) mock_service.lookup_entity_by_uuid.assert_called_once_with( 'blah-blah-uuid') @patch('swh.web.ui.views.service') @istest def browse_entity(self, mock_service): # given stub_entities = [ {'id': '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba'}] mock_service.lookup_entity_by_uuid.return_value = stub_entities # when rv = self.client.get('/browse/entity/' '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba/') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('entity.html') self.assertEqual(self.get_context_variable('entities'), stub_entities) self.assertIsNone(self.get_context_variable('message')) mock_service.lookup_entity_by_uuid.assert_called_once_with( '5f4d4c51-5a9b-4e28-88b3-b3e4e8396cba') diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py index 4ad1dcf9..6acba538 100644 --- a/swh/web/ui/utils.py +++ b/swh/web/ui/utils.py @@ -1,185 +1,218 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import flask from dateutil import parser def filter_endpoints(url_map, prefix_url_rule, blacklist=[]): """Filter endpoints by prefix url rule. Args: - url_map: Url Werkzeug.Map of rules - prefix_url_rule: prefix url string - blacklist: blacklist of some url Returns: Dictionary of url_rule with values methods and endpoint. The key is the url, the associated value is a dictionary of 'methods' (possible http methods) and 'endpoint' (python function) """ out = {} for r in url_map: rule = r['rule'] if rule == prefix_url_rule or rule in blacklist: continue if rule.startswith(prefix_url_rule): out[rule] = {'methods': sorted(map(str, r['methods'])), 'endpoint': r['endpoint']} return out def prepare_directory_listing(files): """Given a list of dictionary files, return a dictionary ready for view. Args: files: List of files to enrich Returns: List of enriched files with urls to other resources """ ls = [] for entry in files: new_entry = {'name': entry['name'], 'type': entry['type']} if entry['type'] == 'dir': new_entry['link'] = flask.url_for('browse_directory', sha1_git=entry['target']) else: new_entry['link'] = flask.url_for('browse_content_data', q=entry['sha1']) ls.append(new_entry) return ls def prepare_directory_listing_with_revision(rev_sha1_git, prefix_path, files): """Given a list of dictionary files, return a dictionary ready for view. Args: rev_sha1_git: The revision identifier prefix_path: the path to append (could be None) files: List of files to enrich Returns: List of enriched files with urls to other resources """ ls = [] for entry in files: new_entry = {'name': entry['name'], 'type': entry['type']} new_path = ''.join([ '' if not prefix_path else (prefix_path + '/'), entry['name']]) new_entry['link'] = flask.url_for('browse_revision_directory', sha1_git=rev_sha1_git, path=new_path) ls.append(new_entry) return ls +def prepare_directory_listing_with_revision_history(sha1_git_root, + sha1_git, + prefix_path, + files): + """Given a list of dictionary files, return a dictionary ready for view. + + Args: + rev_sha1_git: The revision identifier + prefix_path: the path to append (could be None) + files: List of files to enrich + + Returns: + List of enriched files with urls to other resources + + """ + ls = [] + for entry in files: + new_entry = {'name': entry['name'], + 'type': entry['type']} + new_path = ''.join([ + '' if not prefix_path + else (prefix_path + '/'), + entry['name']]) + + new_entry['link'] = flask.url_for('browse_revision_history_directory', + sha1_git_root=sha1_git_root, + sha1_git=sha1_git, + path=new_path) + ls.append(new_entry) + + return ls + + def prepare_revision_view(revision): """Given a revision, return a dictionary ready view. """ author = revision.get('author') if author: revision['author'] = person_to_string(author) committer = revision.get('committer') if committer: revision['committer'] = person_to_string(committer) revision['parents'] = list(map(lambda p: flask.url_for('browse_revision', sha1_git=p), revision.get('parents', []))) if 'children' in revision: revision['children'] = list(map(lambda child: flask.url_for( 'browse_revision', sha1_git=child), revision['children'])) directory = revision.get('directory') if directory: revision['directory'] = flask.url_for('browse_directory', sha1_git=revision['directory']) return revision def filter_field_keys(obj, field_keys): """Given an object instance (directory or list), and a csv field keys to filter on. Return the object instance with filtered keys. Note: Returns obj as is if it's an instance of types not in (dictionary, list) Args: - obj: one object (dictionary, list...) to filter. - field_keys: csv or set of keys to filter the object on Returns: obj filtered on field_keys """ if isinstance(obj, dict): filt_dict = {} for key, value in obj.items(): if key in field_keys: filt_dict[key] = value return filt_dict elif isinstance(obj, list): filt_list = [] for e in obj: filt_list.append(filter_field_keys(e, field_keys)) return filt_list return obj def fmap(f, data): if isinstance(data, list): return [f(x) for x in data] if isinstance(data, dict): return {k: f(v) for (k, v) in data.items()} return f(data) def person_to_string(person): """Map a person (person, committer, tagger, etc...) to a string. """ return ''.join([person['name'], ' <', person['email'], '>']) def parse_timestamp(timestamp): """Given a time or timestamp (as string), parse the result as datetime. Returns: datetime result of parsing values. Samples: - 2016-01-12 - 2016-01-12T09:19:12+0100 - Today is January 1, 2047 at 8:21:00AM - 1452591542 """ try: res = parser.parse(timestamp, ignoretz=False, fuzzy=True) except: res = datetime.datetime.fromtimestamp(float(timestamp)) return res diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py index bb8db35c..31450a95 100644 --- a/swh/web/ui/views.py +++ b/swh/web/ui/views.py @@ -1,477 +1,552 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import flask from flask import render_template, request, url_for, redirect from flask.ext.api.decorators import set_renderers from flask.ext.api.renderers import HTMLRenderer from swh.core.hashutil import ALGORITHMS from swh.web.ui import service, utils from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.main import app hash_filter_keys = ALGORITHMS @app.route('/') @set_renderers(HTMLRenderer) def homepage(): """Home page """ flask.flash('This Web app is still work in progress, use at your own risk', 'warning') return render_template('home.html') @app.route('/about/') @set_renderers(HTMLRenderer) def about(): return render_template('about.html') @app.route('/search/', methods=['GET', 'POST']) @set_renderers(HTMLRenderer) def search(): """Search for hashes in swh-storage. One form to submit either: - hash query to look up in swh storage - some file content to upload, compute its hash and look it up in swh storage - both Returns: dict representing data to look for in swh storage. The following keys are returned: - file: File submitted for upload - filename: Filename submitted for upload - q: Query on hash to look for - message: Message detailing if data has been found or not. """ env = {'filename': None, 'q': None, 'file': None} data = None q = env['q'] file = env['file'] if request.method == 'GET': data = request.args elif request.method == 'POST': data = request.data # or hash and search a file file = request.files.get('filename') # could either be a query for sha1 hash q = data.get('q') messages = [] if q: env['q'] = q try: r = service.lookup_hash(q) messages.append('Content with hash %s%sfound!' % ( q, ' ' if r.get('found') else ' not ')) except BadInputExc as e: messages.append(str(e)) if file and file.filename: env['file'] = file try: uploaded_content = service.upload_and_search(file) filename = uploaded_content['filename'] sha1 = uploaded_content['sha1'] found = uploaded_content['found'] messages.append('File %s with hash %s%sfound!' % ( filename, sha1, ' ' if found else ' not ')) env.update({ 'filename': filename, 'sha1': sha1, }) except BadInputExc as e: messages.append(str(e)) env['q'] = q if q else '' env['messages'] = messages return render_template('upload_and_search.html', **env) def _origin_seen(q, data): """Given an origin, compute a message string with the right information. Args: origin: a dictionary with keys: - origin: a dictionary with type and url keys - occurrence: a dictionary with a validity range Returns: Message as a string """ origin_type = data['origin_type'] origin_url = data['origin_url'] revision = data['revision'] branch = data['branch'] path = data['path'] return """The content with hash %s has been seen on origin with type '%s' at url '%s'. The revision was identified at '%s' on branch '%s'. The file's path referenced was '%s'.""" % (q, origin_type, origin_url, revision, branch, path) @app.route('/browse/content/') @app.route('/browse/content//') @set_renderers(HTMLRenderer) def browse_content_detail(q='5d448a06f02d9de748b6b0b9620cba1bed8480da'): """Given a hash and a checksum, display the content's meta-data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ env = {} message = None content = None try: content = service.lookup_content(q) if not content: message = 'Content with %s not found.' % q except BadInputExc as e: message = str(e) env['message'] = message env['content'] = content return render_template('content.html', **env) @app.route('/browse/content//raw/') @set_renderers(HTMLRenderer) def browse_content_data(q): """Given a hash and a checksum, display the content's raw data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ env = {} content = None try: content = service.lookup_content_raw(q) if content: # FIXME: will break if not utf-8 content['data'] = content['data'].decode('utf-8') message = 'Content %s' % content['sha1'] else: message = 'Content with %s not found.' % q except BadInputExc as e: message = str(e) env['message'] = message env['content'] = content return render_template('content-data.html', **env) # @app.route('/browse/content//origin/') @set_renderers(HTMLRenderer) def browse_content_with_origin( q='sha1:4320781056e5a735a39de0b8c229aea224590052'): """Show content information. Args: - q: query string of the form with `algo_hash` in sha1, sha1_git, sha256. This means that several different URLs (at least one per HASH_ALGO) will point to the same content sha: the sha with 'hash' format Returns: The content's information at for a given checksum. """ env = {'q': q} try: content = service.lookup_hash(q) if not content.get('found'): message = "Hash %s was not found." % q else: origin = service.lookup_hash_origin(q) message = _origin_seen(q, origin) except BadInputExc as e: # do not like it but do not duplicate code message = str(e) env['message'] = message return render_template('content-with-origin.html', **env) @app.route('/browse/directory/') @app.route('/browse/directory//') @set_renderers(HTMLRenderer) def browse_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'): """Show directory information. Args: - sha1_git: the directory's sha1 git identifier. Returns: The content's information at sha1_git """ env = {'sha1_git': sha1_git} files = [] try: directory_files = service.lookup_directory(sha1_git) if directory_files: message = "Listing for directory %s:" % sha1_git files = utils.prepare_directory_listing(directory_files) else: message = "Directory %s not found." % sha1_git except BadInputExc as e: # do not like it but do not duplicate code message = str(e) env['message'] = message env['files'] = files return render_template('directory.html', **env) @app.route('/browse/origin/') @app.route('/browse/origin//') @set_renderers(HTMLRenderer) def browse_origin(origin_id=1): """Browse origin with id id. """ env = {'origin_id': origin_id, 'origin': None} try: ori = service.lookup_origin(origin_id) if ori: env.update({'origin': ori}) else: env.update({'message': 'Origin %s not found!' % origin_id}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('origin.html', **env) @app.route('/browse/person/') @app.route('/browse/person//') @set_renderers(HTMLRenderer) def browse_person(person_id=1): """Browse person with id id. """ env = {'person_id': person_id, 'person': None} try: ori = service.lookup_person(person_id) if ori: env.update({'person': ori}) else: env.update({'message': 'Person %s not found!' % person_id}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('person.html', **env) @app.route('/browse/release/') @app.route('/browse/release//') @set_renderers(HTMLRenderer) def browse_release(sha1_git='1e951912027ea6873da6985b91e50c47f645ae1a'): """Browse release with sha1_git. """ env = {'sha1_git': sha1_git, 'release': None} try: rel = service.lookup_release(sha1_git) if rel: author = rel.get('author') if author: rel['author'] = utils.person_to_string(author) target_type = rel.get('target_type') if target_type == 'revision': rel['target'] = url_for('browse_revision', sha1_git=rel['target']) env.update({'release': rel, 'keys': ['id', 'name', 'date', 'message', 'author', 'target', 'target_type']}) else: env.update({'message': 'Release %s not found!' % sha1_git}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('release.html', **env) @app.route('/browse/revision/') @app.route('/browse/revision//') @set_renderers(HTMLRenderer) def browse_revision(sha1_git='d770e558e21961ad6cfdf0ff7df0eb5d7d4f0754'): """Browse revision with sha1_git. """ env = {'sha1_git': sha1_git, 'keys': [], 'revision': None} try: rev = service.lookup_revision(sha1_git) if rev: rev = utils.prepare_revision_view(rev) env.update({ 'revision': rev, 'keys': set(rev.keys()) - set(['directory', 'parents', 'children']) }) else: env.update({'message': 'Revision %s not found!' % sha1_git}) except BadInputExc as e: env.update({'message': str(e)}) return render_template('revision.html', **env) @app.route('/browse/revision//history//') @set_renderers(HTMLRenderer) def browse_revision_history(sha1_git_root, sha1_git): """Display information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. """ limit = int(request.args.get('limit', '100')) env = {'sha1_git_root': sha1_git_root, 'sha1_git': sha1_git, 'message': None, 'keys': [], 'revision': None} if sha1_git == sha1_git_root: return redirect(url_for('browse_revision', sha1_git=sha1_git, limit=limit)) try: revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if revision: revision = utils.prepare_revision_view(revision) env.update({ + 'sha1_git': revision['id'], 'revision': revision, 'keys': set(revision.keys()) - set(['directory', 'parents', 'children']) }) else: env['message'] = "Possibly sha1_git '%s' is not an ancestor " \ "of sha1_git_root '%s'" % (sha1_git, sha1_git_root) except (BadInputExc, NotFoundExc) as e: env['message'] = str(e) return render_template('revision.html', **env) @app.route('/browse/revision//directory/') @app.route('/browse/revision//directory//') @set_renderers(HTMLRenderer) def browse_revision_directory(sha1_git, path=None): """Browse directory from revision with sha1_git. """ env = { 'sha1_git': sha1_git, 'path': '.' if not path else path, 'message': None, 'result': None } try: - result = service.lookup_directory_with_revision(sha1_git, path) + rev_id, result = service.lookup_directory_through_revision({ + 'sha1_git': sha1_git + }, path) + if result['type'] == 'dir': # dir_entries result['content'] = utils.prepare_directory_listing_with_revision( sha1_git, path, result['content']) + + env['sha1_git'] = rev_id + env['result'] = result + except (BadInputExc, NotFoundExc, NotImplementedError) as e: + env['message'] = str(e) + + return render_template('revision-directory.html', **env) + + +@app.route('/browse/revision/' + '/history/' + '/directory/') +@app.route('/browse/revision/' + '/history/' + '/directory//') +@set_renderers(HTMLRenderer) +def browse_revision_history_directory(sha1_git_root, sha1_git, path=None): + """Return information about directory pointed to by the revision + defined as: revision sha1_git, limited to the sub-graph of all + transitive parents of sha1_git_root. + + Args: + sha1_git_root: latest revision of the browsed history. + sha1_git: one of sha1_git_root's ancestors. + path: optional directory pointed to by that revision. + limit: optional query parameter to limit the revisions log + (default to 100). For now, note that this limit could impede the + transitivity conclusion about sha1_git not being an ancestor of + sha1_git_root (even if it is). + + Returns: + Information on the directory pointed to by that revision. + + Raises: + BadInputExc in case of unknown algo_hash or bad hash. + NotFoundExc if either revision is not found or if sha1_git is not an + ancestor of sha1_git_root or the path referenced does not exist + + """ + limit = int(request.args.get('limit', '100')) + + env = { + 'sha1_git_root': sha1_git_root, + 'sha1_git': sha1_git, + 'path': '.' if not path else path, + 'message': None, + 'result': None + } + + if sha1_git == sha1_git_root: + return redirect(url_for('browse_revision_directory', + sha1_git=sha1_git, + path=path), + code=301) + + try: + rev_id, result = service.lookup_directory_through_revision({ + 'sha1_git_root': sha1_git_root, + 'sha1_git': sha1_git + }, path, limit) + + print(rev_id, result) + + if result['type'] == 'dir': # dir_entries + result['content'] = utils.prepare_directory_listing_with_revision_history( # noqa + sha1_git_root, + sha1_git, + path, + result['content']) + + env['sha1_git'] = rev_id env['result'] = result except (BadInputExc, NotFoundExc, NotImplementedError) as e: env['message'] = str(e) return render_template('revision-directory.html', **env) @app.route('/browse/entity/') @app.route('/browse/entity//') @set_renderers(HTMLRenderer) def browse_entity(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'): env = {'entities': [], 'message': None} entities = env['entities'] try: entities = service.lookup_entity_by_uuid(uuid) if not entities: env['message'] = "Entity '%s' not found!" % uuid except BadInputExc as e: env.update({'message': str(e)}) env['entities'] = entities return render_template('entity.html', **env)