diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py index 99db425c0..d550564ab 100644 --- a/swh/web/ui/api.py +++ b/swh/web/ui/api.py @@ -1,281 +1,281 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import request, url_for from flask.ext.api.decorators import set_renderers from swh.web.ui.main import app from swh.web.ui import service, renderers, utils from swh.web.ui.exc import BadInputExc, NotFoundExc @app.route('/browse/') def api_browse_endpoints(): """List the current api endpoints starting with /api or /api/. Returns: List of endpoints at /api """ return utils.filter_endpoints(app.url_map, '/browse') @app.route('/api/') def api_main_endpoints(): """List the current api endpoints starting with /api or /api/. Returns: List of endpoints at /api """ return utils.filter_endpoints(app.url_map, '/api') @app.route('/api/1/') def api_main_v1_endpoints(): """List the current api v1 endpoints starting with /api/1 or /api/1/. Returns: List of endpoints at /api/1 """ return utils.filter_endpoints(app.url_map, '/api/1') @app.route('/api/1/stat/counters/') def api_stats(): """Return statistics on SWH storage. Returns: SWH storage's statistics """ return service.stat_counters() @app.route('/api/1/search//') def api_search(q): """Search a content per hash. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Dictionary with 'found' key and the associated result. Raises: BadInputExc in case of unknown algo_hash or bad hash """ return {'found': service.lookup_hash(q)} def _api_lookup(criteria, lookup_fn, error_msg_if_not_found): """Factorize function regarding the api to lookup for data.""" res = lookup_fn(criteria) if not res: raise NotFoundExc(error_msg_if_not_found) return res @app.route('/api/1/origin//') def api_origin(origin_id): """Return information about origin with id origin_id. Args: origin_id: the origin's identifier Returns: Information on the origin if found. Raises: NotFoundExc if the origin is not found. """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person//') def api_person(person_id): """Return information about person with identifier person_id. Args: person_id: the person's identifier Returns: Information on the person if found. Raises: NotFoundExc if the person is not found. """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release//') def api_release(sha1_git): """Return information about release with id sha1_git. Args: sha1_git: the release's hash Returns: Information on the release if found. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the release is not found. """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg) @app.route('/api/1/revision//') def api_revision(sha1_git): """Return information about revision with id sha1_git. Args: sha1_git: the revision's hash Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the revision is not found. """ error_msg = 'Revision with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_revision, error_msg_if_not_found=error_msg) @app.route('/api/1/directory//') def api_directory(sha1_git): """Return information about release with id sha1_git. Args: Directory's sha1_git Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ directory_entries = service.lookup_directory(sha1_git) if not directory_entries: raise NotFoundExc('Directory with sha1_git %s not found.' % sha1_git) return list(directory_entries) @app.route('/api/1/browse//') def api_content_checksum_to_origin(q): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') -@set_renderers(renderers.PlainRenderer) +@set_renderers(renderers.BytesRenderer) def api_content_raw(q): """Return content's raw data if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's raw data in text/plain. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return content['data'] @app.route('/api/1/content//') def api_content_with_details(q): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ content = service.lookup_content(q) if not content: raise NotFoundExc('Content with %s not found.' % q) content['data'] = url_for('api_content_raw', q=content['sha1']) return content @app.route('/api/1/uploadnsearch/', methods=['POST']) def api_uploadnsearch(): """Upload the file's content in the post body request. Compute its hash and determine if it exists in the storage. Args: request.files filled with the filename's data to upload. Returns: Dictionary with 'sha1', 'filename' and 'found' predicate depending on whether we find it or not. Raises: BadInputExc in case of the form submitted is incorrect. """ file = request.files.get('filename') if not file: raise BadInputExc('Bad request, missing \'filename\' entry in form.') return service.upload_and_search(file) diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py index 01f10d617..b30cb1e75 100644 --- a/swh/web/ui/converters.py +++ b/swh/web/ui/converters.py @@ -1,173 +1,173 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core import hashutil def fmap(f, data): if isinstance(data, list): return [f(x) for x in data] if isinstance(data, dict): return {k: f(v) for (k, v) in data.items()} return f(data) def from_swh(dict_swh, hashess=[], bytess=[], blacklist=[]): """Convert from an swh dictionary to something reasonably json serializable. Args: - dict_swh: the origin dictionary needed to be transformed - hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string - bytess: list/set of keys representing bytes values which needs to be decoded The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys `converted`. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if v and isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if v and isinstance(v, bytes): return v.decode('utf-8') return v if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if isinstance(value, dict): new_dict[key] = from_swh(value, hashess, bytess, blacklist) elif key in hashess: new_dict[key] = fmap(convert_hashes_bytes, value) elif key in bytess: new_dict[key] = fmap(convert_bytes, value) elif key in blacklist: continue else: new_dict[key] = value return new_dict def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, hashess=set(['revision']), bytess=set(['path'])) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) - comment: release's comment message (bytes) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) Returns: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh(release, hashess=set(['id', 'target']), bytess=set(['message', 'name', 'email'])) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: Revision dictionary with the same keys as inputs, only: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email, message) - remaining keys are left as is """ return from_swh(revision, hashess=set(['id', 'directory', 'parents']), bytess=set(['name', 'email', 'message'])) def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, - hashess=set(['sha1', 'sha1_git', 'sha256']), - bytess=set(['data']), - blacklist=set(['status'])) + hashess={'sha1', 'sha1_git', 'sha256'}, + bytess={}, + blacklist={'status'}) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, hashess=set(), bytess=set(['name', 'email'])) def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess=set(['dir_id', 'sha1_git', 'sha1', 'sha256', 'target']), bytess=set(['name'])) diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py index 57669cab5..b86631254 100644 --- a/swh/web/ui/renderers.py +++ b/swh/web/ui/renderers.py @@ -1,142 +1,151 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import yaml from flask import make_response, request from flask.ext.api import renderers, parsers from flask_api.mediatypes import MediaType from swh.web.ui import utils class SWHFilterEnricher(): """Global filter on fields. """ def filter_by_fields(self, data): """Extract a request parameter 'fields' if it exists to permit the filtering on the data dict's keys. If such field is not provided, returns the data as is. """ fields = request.args.get('fields') if fields: fields = set(fields.split(',')) data = utils.filter_field_keys(data, fields) return data -class PlainRenderer(renderers.BaseRenderer): +class DoNothingRenderer(renderers.BaseRenderer): + def render(self, data, media_type, **options): + return data + + +class PlainRenderer(DoNothingRenderer): """Renderer for plain/text, do nothing but send the data as is. """ media_type = 'text/plain' - def render(self, data, media_type, **options): - return data + +class BytesRenderer(DoNothingRenderer): + """Renderer for plain/text, do nothing but send the data as is. + + """ + media_type = 'application/octet-stream' class YAMLRenderer(renderers.BaseRenderer, SWHFilterEnricher): """Renderer for application/yaml. Orchestrate from python data structure to yaml. """ media_type = 'application/yaml' def render(self, data, media_type, **options): data = self.filter_by_fields(data) return yaml.dump(data, encoding=self.charset) class JSONPEnricher(): """JSONP rendering. """ def enrich_with_jsonp(self, data): """Defines a jsonp function that extracts a potential 'callback' request parameter holding the function name and wraps the data inside a call to such function e.g: GET /blah/foo/bar renders: {'output': 'wrapped'} GET /blah/foo/bar?callback=fn renders: fn({'output': 'wrapped'}) """ jsonp = request.args.get('callback') if jsonp: return '%s(%s)' % (jsonp, data) return data class SWHJSONRenderer(renderers.JSONRenderer, SWHFilterEnricher, JSONPEnricher): """Renderer for application/json. Serializes in json the data and returns it. Also deals with jsonp. If callback is found in request parameter, wrap the result as a function with name the value of the parameter query 'callback'. """ def render(self, data, media_type, **options): data = self.filter_by_fields(data) res = super().render(data, media_type, **options) return self.enrich_with_jsonp(res) class SWHBrowsableAPIRenderer(renderers.BrowsableAPIRenderer): """SWH's browsable api renderer. """ template = "api.html" RENDERERS = [ 'swh.web.ui.renderers.SWHJSONRenderer', 'swh.web.ui.renderers.SWHBrowsableAPIRenderer', 'flask.ext.api.parsers.URLEncodedParser', 'swh.web.ui.renderers.YAMLRenderer', 'swh.web.ui.renderers.PlainRenderer', ] RENDERERS_INSTANCE = [ SWHJSONRenderer(), SWHBrowsableAPIRenderer(), parsers.URLEncodedParser(), YAMLRenderer(), ] RENDERERS_BY_TYPE = { r.media_type: r for r in RENDERERS_INSTANCE } def error_response(default_error_msg, error_code, error): """Private function to create a custom error response. """ # if nothing is requested by client, use json default_application_type = 'application/json' accept_type = request.headers.get('Accept', default_application_type) renderer = RENDERERS_BY_TYPE.get( accept_type, RENDERERS_BY_TYPE[default_application_type]) # for edge cases, use the elected renderer's media type accept_type = renderer.media_type response = make_response(default_error_msg, error_code) response.headers['Content-Type'] = accept_type response.data = renderer.render({"error": str(error)}, media_type=MediaType(accept_type), status=error_code, headers={'Content-Type': accept_type}) return response diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py index 41ffd6121..0257bd96e 100644 --- a/swh/web/ui/tests/test_api.py +++ b/swh/web/ui/tests/test_api.py @@ -1,535 +1,535 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app class ApiTestCase(test_app.SWHApiTestCase): @patch('swh.web.ui.api.service') @istest def api_content_checksum_to_origin(self, mock_service): mock_service.lookup_hash.return_value = {'found': True} stub_origin = { "lister": None, "url": "rsync://ftp.gnu.org/old-gnu/webbase", "type": "ftp", "id": 2, "project": None } mock_service.lookup_hash_origin.return_value = stub_origin # when rv = self.app.get( '/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_hash.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_hash_origin.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_checksum_to_origin_sha_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.app.get( '/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_hash.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_with_details(self, mock_service): # given mock_service.lookup_content.return_value = { 'data': 'some content data', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data': '/api/1/content/' '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': 'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-Type': 'text/plain'}) self.assertEquals(rv.status_code, 200) - self.assertEquals(rv.mimetype, 'text/plain') + self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data.decode('utf-8'), stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_raw_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-Type': 'text/plain'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_search(self, mock_service): # given mock_service.lookup_hash.return_value = True # when rv = self.app.get('/api/1/search/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:blah') @patch('swh.web.ui.api.service') @istest def api_search_as_yaml(self, mock_service): # given mock_service.lookup_hash.return_value = True # when rv = self.app.get('/api/1/search/sha1:halb/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_search_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = False # when rv = self.app.get('/api/1/search/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': False}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.request') @istest def api_uploadnsearch(self, mock_request, mock_service): # given mock_request.files = {'filename': 'simple-filename'} mock_service.upload_and_search.return_value = { 'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False, } # when rv = self.app.post('/api/1/uploadnsearch/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False}) mock_service.upload_and_search.assert_called_once_with( 'simple-filename') @patch('swh.web.ui.api.service') @istest def api_origin(self, mock_service): # given stub_origin = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } mock_service.lookup_origin.return_value = stub_origin # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_origin.assert_called_with(1234) @patch('swh.web.ui.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with(4321) @patch('swh.web.ui.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'revision': 'revision-sha1', 'author_name': 'author release name', 'author_email': 'author@email', } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_release) @patch('swh.web.ui.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' }) @patch('swh.web.ui.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_revision) @patch('swh.web.ui.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.api.service') @istest def api_directory(self, mock_service): # given stub_directory = [{ 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', }] mock_service.lookup_directory.return_value = stub_directory # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_directory) @patch('swh.web.ui.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py index 649543424..ef8e720d4 100644 --- a/swh/web/ui/tests/test_converters.py +++ b/swh/web/ui/tests/test_converters.py @@ -1,362 +1,362 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.core import hashutil from swh.web.ui import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hex_to_hash( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy'], 'n': 'dont care either' }, 'm': 'dont care' } } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy'] } }, } actual_output = converters.from_swh(some_input, hashess={'d'}, bytess={'c', 'e', 'g', 'l'}, blacklist={'h', 'm', 'n'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_origin(self): # given origin_input = { 'origin_type': 'ftp', 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', 'branch': 'octave-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa } expected_origin = { 'origin_type': 'ftp', 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', 'branch': 'octave-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hex_to_hash( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hex_to_hash( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'author': { 'name': b'author name', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'author': { 'name': 'author name', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hex_to_hash( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': datetime.datetime(2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': datetime.datetime(2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'committer_date_offset': 0, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'committer_date_offset': 0, 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content(self): content_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'data': b'data in bytes', 'length': 10, 'status': 'visible', } # 'status' is filtered expected_content = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', - 'data': 'data in bytes', + 'data': b'data in bytes', 'length': 10, } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) diff --git a/swh/web/ui/tests/test_renderers.py b/swh/web/ui/tests/test_renderers.py index 72075366e..3239c840e 100644 --- a/swh/web/ui/tests/test_renderers.py +++ b/swh/web/ui/tests/test_renderers.py @@ -1,190 +1,215 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from flask_api.mediatypes import MediaType from nose.tools import istest from unittest.mock import patch from swh.web.ui import renderers class RendererTestCase(unittest.TestCase): @patch('swh.web.ui.renderers.request') @istest def SWHFilterRenderer_do_nothing(self, mock_request): # given mock_request.args = {} swhFilterRenderer = renderers.SWHFilterEnricher() input_data = {'a': 'some-data'} # when actual_data = swhFilterRenderer.filter_by_fields(input_data) # then self.assertEquals(actual_data, input_data) @patch('swh.web.ui.renderers.utils') @patch('swh.web.ui.renderers.request') @istest def SWHFilterRenderer_do_filter(self, mock_request, mock_utils): # given mock_request.args = {'fields': 'a,c'} mock_utils.filter_field_keys.return_value = {'a': 'some-data'} swhFilterRenderer = renderers.SWHFilterEnricher() input_data = {'a': 'some-data', 'b': 'some-other-data'} # when actual_data = swhFilterRenderer.filter_by_fields(input_data) # then self.assertEquals(actual_data, {'a': 'some-data'}) mock_utils.filter_field_keys.assert_called_once_with(input_data, {'a', 'c'}) + @istest + def doNothingRenderer(self): + # given + doNothingRenderer = renderers.DoNothingRenderer() + input_data = 'some data' + + # when + actual_data = doNothingRenderer.render(input_data, 'some-media-type') + + # then + self.assertEqual(actual_data, input_data) # do nothing on data + @istest def plainRenderer(self): # given plainRenderer = renderers.PlainRenderer() input_data = 'some data' # when actual_data = plainRenderer.render(input_data, 'some-media-type') # then self.assertEqual(actual_data, input_data) # do nothing on data + @istest + def bytesRenderer(self): + # given + bytesRenderer = renderers.BytesRenderer() + input_data = b'some data' + + # when + actual_data = bytesRenderer.render(input_data, 'some-media-type') + + # then + self.assertEqual('application/octet-stream', bytesRenderer.media_type) + self.assertEqual(actual_data, input_data) # do nothing on data + @patch('swh.web.ui.renderers.request') @istest def yamlRenderer_without_filter(self, mock_request): # given mock_request.args = {} yamlRenderer = renderers.YAMLRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = input_data # when actual_data = yamlRenderer.render(input_data, 'application/yaml') # then self.assertEqual(yaml.load(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest def yamlRenderer(self, mock_request): # given mock_request.args = {'fields': 'type,target'} yamlRenderer = renderers.YAMLRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = {'target': 'sha1-dir', 'type': 'dir'} # when actual_data = yamlRenderer.render(input_data, 'application/yaml') # then self.assertEqual(yaml.load(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest def jsonRenderer_basic(self, mock_request): # given mock_request.args = {} jsonRenderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = input_data # when actual_data = jsonRenderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(json.loads(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest def jsonRenderer_basic_with_filter(self, mock_request): # given mock_request.args = {'fields': 'target'} jsonRenderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} expected_data = {'target': 'sha1-dir'} # when actual_data = jsonRenderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(json.loads(actual_data), expected_data) @patch('swh.web.ui.renderers.request') @istest def jsonRenderer_basic_with_filter_and_jsonp(self, mock_request): # given mock_request.args = {'fields': 'target', 'callback': 'jsonpfn'} jsonRenderer = renderers.SWHJSONRenderer() input_data = {'target': 'sha1-dir', 'type': 'dir', 'dir-id': 'dir-id-sha1-git'} # when actual_data = jsonRenderer.render(input_data, MediaType( 'application/json')) # then self.assertEqual(actual_data, 'jsonpfn({"target": "sha1-dir"})') @patch('swh.web.ui.renderers.request') @istest def jsonpEnricher_basic_with_filter_and_jsonp(self, mock_request): # given mock_request.args = {'callback': 'jsonpfn'} jsonpEnricher = renderers.JSONPEnricher() # when actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'}) # then self.assertEqual(actual_output, "jsonpfn({'output': 'test'})") @patch('swh.web.ui.renderers.request') @istest def jsonpEnricher_do_nothing(self, mock_request): # given mock_request.args = {} jsonpEnricher = renderers.JSONPEnricher() # when actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'}) # then self.assertEqual(actual_output, {'output': 'test'}) diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index fffddbbcc..ae089bef8 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,545 +1,545 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch from swh.core.hashutil import hex_to_hash from swh.web.ui import service from swh.web.ui.exc import BadInputExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): @istest def lookup_hash_does_not_exist(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters self.storage.content_find.assert_called_with({ 'sha1_git': hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f') }) @istest def lookup_hash_exist(self): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } self.storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) self.storage.content_find.assert_called_with({ 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), }) @istest def lookup_hash_origin(self): # given self.storage.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) self.storage.content_find_occurrence.assert_called_with( {'sha1_git': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @patch('swh.web.ui.service.hashutil') @istest def hash_and_search(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} self.storage.content_find = MagicMock(return_value={ 'sha1': bhash, 'sha1_git': bhash, }) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) @patch('swh.web.ui.service.hashutil') @istest def hash_and_search_not_found(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_hashutil.hash_to_hex = MagicMock( return_value='456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) mock_hashutil.hash_to_hex.assert_called_once_with(bhash) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search(self, mock_upload): mock_upload.save_in_upload_folder.return_value = ( '/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename') service.hash_and_search = MagicMock(side_effect=lambda filepath: {'sha1': 'blah', 'found': True}) mock_upload.cleanup.return_value = None file = MagicMock(filename='some-filename') # when actual_res = service.upload_and_search(file) # then self.assertEqual(actual_res, { 'filename': 'some-filename', 'sha1': 'blah', 'found': True}) mock_upload.save_in_upload_folder.assert_called_with(file) mock_upload.cleanup.assert_called_with('/tmp/dir') service.hash_and_search.assert_called_once_with( '/tmp/dir/path/some-filename') @istest def lookup_origin(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def lookup_release(self): # given self.storage.release_get = MagicMock(return_value=[{ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }]) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) self.storage.release_get.assert_called_with( [hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self): # given self.storage.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self): # given self.storage.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_revision(self): # given self.storage.revision_get = MagicMock(return_value=[{ 'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }]) # when actual_revision = service.lookup_revision( '18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_revision, { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) self.storage.revision_get.assert_called_with( [hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')]) @istest def lookup_content_raw_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_raw(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', }) self.storage.content_get = MagicMock(return_value=[{ 'data': b'binary data', }, {}]) # when actual_content = service.lookup_content_raw( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then - self.assertEquals(actual_content, {'data': 'binary data'}) + self.assertEquals(actual_content, {'data': b'binary data'}) self.storage.content_find.assert_called_once_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) self.storage.content_get.assert_called_once_with( ['18d8be353ed3480476f032475e7c233eff7371d5']) @istest def lookup_content_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha1(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 190, 'status': 'absent', }) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 190, }) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha256(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 360, 'status': 'visible', }) # when actual_content = service.lookup_content( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 360, }) self.storage.content_find.assert_called_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) @istest def lookup_person(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }]) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_person_not_found(self): # given self.storage.person_get = MagicMock(return_value=[]) # when actual_person = service.lookup_person('person_id') # then self.assertIsNone(actual_person) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_directory_bad_checksum(self): # given self.storage.directory_get = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then self.storage.directory_get.called = False @istest def lookup_directory_not_found(self): # given self.storage.directory_get = MagicMock(return_value=[]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNone(actual_directory) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')) @istest def lookup_directory(self): # given dir_entries_input = { 'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, } self.storage.directory_get = MagicMock( return_value=[dir_entries_input]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNotNone(actual_directory) self.assertEqual(actual_directory, [expected_dir_entries]) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')) diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py index 0cd69f061..cb5e92e60 100644 --- a/swh/web/ui/tests/test_views.py +++ b/swh/web/ui/tests/test_views.py @@ -1,388 +1,388 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest from swh.web.ui.tests import test_app from unittest.mock import patch, MagicMock from swh.web.ui.exc import BadInputExc class ViewTestCase(test_app.SWHViewTestCase): render_template = False @istest def info(self): # when rv = self.client.get('/about') self.assertEquals(rv.status_code, 200) self.assert_template_used('about.html') self.assertIn(b'About', rv.data) @istest def search_default(self): # when rv = self.client.get('/search') self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('q'), '') self.assertEqual(self.get_context_variable('message'), '') self.assert_template_used('search.html') @patch('swh.web.ui.views.service') @istest def search_content_found(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': True, 'algo': 'sha1' } # when rv = self.client.get('/search?q=sha1:123') self.assertEquals(rv.status_code, 200) self.assert_template_used('search.html') self.assertEqual(self.get_context_variable('q'), 'sha1:123') self.assertEqual(self.get_context_variable('message'), 'Content with hash sha1:123 found!') mock_service.lookup_hash.assert_called_once_with('sha1:123') @patch('swh.web.ui.views.service') @istest def search_content_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': False, 'algo': 'sha1' } # when rv = self.client.get('/search?q=sha1:456') self.assertEquals(rv.status_code, 200) self.assert_template_used('search.html') self.assertEqual(self.get_context_variable('q'), 'sha1:456') self.assertEqual(self.get_context_variable('message'), 'Content with hash sha1:456 not found!') mock_service.lookup_hash.assert_called_once_with('sha1:456') @patch('swh.web.ui.views.service') @istest def search_content_invalid_query(self, mock_service): # given mock_service.lookup_hash = MagicMock( side_effect=BadInputExc('Invalid query!') ) # when rv = self.client.get('/search?q=sha1:invalid-hash') self.assertEquals(rv.status_code, 200) self.assert_template_used('search.html') self.assertEqual(self.get_context_variable('q'), 'sha1:invalid-hash') self.assertEqual(self.get_context_variable('message'), 'Invalid query!') mock_service.lookup_hash.assert_called_once_with('sha1:invalid-hash') @patch('swh.web.ui.views.service') @istest def show_content(self, mock_service): # given stub_content_raw = { 'sha1': 'sha1-hash', - 'data': 'some-data' + 'data': b'some-data' } mock_service.lookup_content_raw.return_value = stub_content_raw # when rv = self.client.get('/browse/content/sha1:sha1-hash/raw') self.assertEquals(rv.status_code, 200) self.assert_template_used('display_content.html') self.assertEqual(self.get_context_variable('message'), 'Content sha1-hash') self.assertEqual(self.get_context_variable('content'), stub_content_raw) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-hash') @patch('swh.web.ui.views.service') @istest def show_content_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.client.get('/browse/content/sha1:sha1-unknown/raw') self.assertEquals(rv.status_code, 200) self.assert_template_used('display_content.html') self.assertEqual(self.get_context_variable('message'), 'Content with sha1:sha1-unknown not found.') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:sha1-unknown') @patch('swh.web.ui.views.service') @istest def show_content_invalid_hash(self, mock_service): # given mock_service.lookup_content_raw.side_effect = BadInputExc( 'Invalid hash') # when rv = self.client.get('/browse/content/sha2:sha1-invalid/raw') self.assertEquals(rv.status_code, 200) self.assert_template_used('display_content.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('content'), None) mock_service.lookup_content_raw.assert_called_once_with( 'sha2:sha1-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_bad_input(self, mock_utils, mock_service): # given mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/directory/sha2-invalid') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'sha2-invalid') @patch('swh.web.ui.views.service') @patch('swh.web.ui.utils') @istest def browse_directory_empty_result(self, mock_utils, mock_service): # given mock_service.lookup_directory.return_value = None # when rv = self.client.get('/browse/directory/some-sha1') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Directory some-sha1 not found.') self.assertEqual(self.get_context_variable('files'), []) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.utils') @istest def browse_directory(self, mock_utils, mock_service): # given stub_directory_ls = [ {'type': 'dir', 'target': '123', 'name': 'some-dir-name'}, {'type': 'file', 'sha1': '654', 'name': 'some-filename'}, {'type': 'dir', 'target': '987', 'name': 'some-other-dirname'} ] mock_service.lookup_directory.return_value = stub_directory_ls stub_directory_map = [ {'link': '/path/to/url/dir/123', 'name': 'some-dir-name'}, {'link': '/path/to/url/file/654', 'name': 'some-filename'}, {'link': '/path/to/url/dir/987', 'name': 'some-other-dirname'} ] mock_utils.prepare_directory_listing.return_value = stub_directory_map # when rv = self.client.get('/browse/directory/some-sha1') # then print(self.templates) self.assertEquals(rv.status_code, 200) self.assert_template_used('directory.html') self.assertEqual(self.get_context_variable('message'), 'Listing for directory some-sha1:') self.assertEqual(self.get_context_variable('files'), stub_directory_map) mock_service.lookup_directory.assert_called_once_with( 'some-sha1') mock_utils.prepare_directory_listing.assert_called_once_with( stub_directory_ls) @patch('swh.web.ui.views.service') @istest def content_with_origin_content_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.client.get('/browse/content/sha256:some-sha256') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual(self.get_context_variable('message'), 'Hash sha256:some-sha256 was not found.') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') @istest def content_with_origin_bad_input(self, mock_service): # given mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash') # when rv = self.client.get('/browse/content/sha256:some-sha256') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual( self.get_context_variable('message'), 'Invalid hash') mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.views.service') @istest def content_with_origin(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': True} mock_service.lookup_hash_origin.return_value = { 'origin_type': 'ftp', 'origin_url': '/some/url', 'revision': 'revision-hash', 'branch': 'master', 'path': '/path/to', } # when rv = self.client.get('/browse/content/sha256:some-sha256') # then self.assertEquals(rv.status_code, 200) self.assert_template_used('content.html') self.assertEqual( self.get_context_variable('message'), "The content with hash sha256:some-sha256 has been seen on " + "origin with type 'ftp'\n" + "at url '/some/url'. The revision was identified at " + "'revision-hash' on branch 'master'.\n" + "The file's path referenced was '/path/to'.") mock_service.lookup_hash.assert_called_once_with( 'sha256:some-sha256') mock_service.lookup_hash_origin.assert_called_once_with( 'sha256:some-sha256') @istest def uploadnsearch_get(self): # when rv = self.client.get('/uploadnsearch') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), '') self.assertEqual(self.get_context_variable('filename'), None) self.assertEqual(self.get_context_variable('found'), None) self.assert_template_used('upload_and_search.html') @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def uploadnsearch_post_not_found(self, mock_request, mock_service): # given mock_request.method = 'POST' mock_request.files = dict(filename='foobar') mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': 'blahhash', 'found': False} # when # the mock mock_request completes the post request rv = self.client.post('/uploadnsearch') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), 'The file foobar with hash blahhash has not been' ' found.') self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('found'), False) self.assertEqual(self.get_context_variable('sha1'), 'blahhash') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def uploadnsearch_post_found(self, mock_request, mock_service): # given mock_request.method = 'POST' mock_request.files = dict(filename='foobar') mock_service.upload_and_search.return_value = {'filename': 'foobar', 'sha1': 'hash-blah', 'found': True} # when # the mock mock_request completes the post request rv = self.client.post('/uploadnsearch') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), 'The file foobar with hash hash-blah has been found.') self.assertEqual(self.get_context_variable('filename'), 'foobar') self.assertEqual(self.get_context_variable('found'), True) self.assertEqual(self.get_context_variable('sha1'), 'hash-blah') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True @patch('swh.web.ui.views.service') @patch('swh.web.ui.views.request') @istest def uploadnsearch_post_bad_input(self, mock_request, mock_service): # given mock_request.method = 'POST' mock_request.files = dict(filename='foobar') mock_service.upload_and_search.side_effect = BadInputExc( 'Invalid hash') # when # the mock mock_request completes the post request rv = self.client.post('/uploadnsearch') # then self.assertEquals(rv.status_code, 200) self.assertEqual(self.get_context_variable('message'), 'Invalid hash') self.assert_template_used('upload_and_search.html') mock_service.upload_and_search.called = True diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py index b9c11ab7c..547649300 100644 --- a/swh/web/ui/views.py +++ b/swh/web/ui/views.py @@ -1,216 +1,218 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import render_template, flash, request from flask.ext.api.decorators import set_renderers from flask.ext.api.renderers import HTMLRenderer from swh.core.hashutil import ALGORITHMS from swh.web.ui import service, utils from swh.web.ui.exc import BadInputExc from swh.web.ui.main import app hash_filter_keys = ALGORITHMS @app.route('/') @set_renderers(HTMLRenderer) def homepage(): """Home page """ flash('This Web app is still work in progress, use at your own risk', 'warning') # return redirect(url_for('about')) return render_template('home.html') @app.route('/about') @set_renderers(HTMLRenderer) def about(): return render_template('about.html') @app.route('/search') @set_renderers(HTMLRenderer) def search(): """Search for hashes in swh-storage. """ q = request.args.get('q', '') env = {'q': q, 'message': ''} try: if q: r = service.lookup_hash(q) env['message'] = 'Content with hash %s%sfound!' % ( q, ' ' if r['found'] == True else ' not ' ) except BadInputExc as e: env['message'] = str(e) return render_template('search.html', **env) @app.route('/uploadnsearch', methods=['GET', 'POST']) @set_renderers(HTMLRenderer) def uploadnsearch(): """Upload and search for hashes in swh-storage. """ env = {'filename': None, 'message': '', 'found': None} if request.method == 'POST': file = request.files['filename'] try: uploaded_content = service.upload_and_search(file) filename = uploaded_content['filename'] sha1 = uploaded_content['sha1'] found = uploaded_content['found'] message = 'The file %s with hash %s has%sbeen found.' % ( filename, sha1, ' ' if found else ' not ') env.update({ 'filename': filename, 'sha1': sha1, 'found': found, 'message': message }) except BadInputExc as e: env['message'] = str(e) return render_template('upload_and_search.html', **env) def _origin_seen(q, data): """Given an origin, compute a message string with the right information. Args: origin: a dictionary with keys: - origin: a dictionary with type and url keys - occurrence: a dictionary with a validity range Returns: Message as a string """ origin_type = data['origin_type'] origin_url = data['origin_url'] revision = data['revision'] branch = data['branch'] path = data['path'] return """The content with hash %s has been seen on origin with type '%s' at url '%s'. The revision was identified at '%s' on branch '%s'. The file's path referenced was '%s'.""" % (q, origin_type, origin_url, revision, branch, path) @app.route('/browse/content/') @set_renderers(HTMLRenderer) def content_with_origin(q): """Show content information. Args: - q: query string of the form with `algo_hash` in sha1, sha1_git, sha256. This means that several different URLs (at least one per HASH_ALGO) will point to the same content sha: the sha with 'hash' format Returns: The content's information at for a given checksum. """ env = {'q': q} try: content = service.lookup_hash(q) if not content.get('found'): message = "Hash %s was not found." % q else: origin = service.lookup_hash_origin(q) message = _origin_seen(q, origin) except BadInputExc as e: # do not like it but do not duplicate code message = str(e) env['message'] = message return render_template('content.html', **env) @app.route('/browse/content//raw') @set_renderers(HTMLRenderer) def show_content(q): """Given a hash and a checksum, display the content's raw data. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ env = {} try: content = service.lookup_content_raw(q) if content: + # FIXME: will break if not utf-8 + content['data'] = content['data'].decode('utf-8') message = 'Content %s' % content['sha1'] else: message = 'Content with %s not found.' % q except BadInputExc as e: message = str(e) content = None env['message'] = message env['content'] = content return render_template('display_content.html', **env) @app.route('/browse/directory/') @set_renderers(HTMLRenderer) def browse_directory(sha1_git): """Show directory information. Args: - sha1_git: the directory's sha1 git identifier. Returns: The content's information at sha1_git """ env = {'sha1_git': sha1_git} try: directory_files = service.lookup_directory(sha1_git) if directory_files: message = "Listing for directory %s:" % sha1_git files = utils.prepare_directory_listing(directory_files) else: message = "Directory %s not found." % sha1_git files = [] except BadInputExc as e: # do not like it but do not duplicate code message = str(e) files = [] env['message'] = message env['files'] = files return render_template('directory.html', **env)