Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9123419
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
96 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py
index 99db425c0..d550564ab 100644
--- a/swh/web/ui/api.py
+++ b/swh/web/ui/api.py
@@ -1,281 +1,281 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import request, url_for
from flask.ext.api.decorators import set_renderers
from swh.web.ui.main import app
from swh.web.ui import service, renderers, utils
from swh.web.ui.exc import BadInputExc, NotFoundExc
@app.route('/browse/')
def api_browse_endpoints():
    """List the current endpoints starting with /browse.

    Returns:
        The application's url map filtered on the '/browse' prefix.

    """
    return utils.filter_endpoints(app.url_map, '/browse')
@app.route('/api/')
def api_main_endpoints():
    """List the current api endpoints starting with /api.

    Returns:
        The application's url map filtered on the '/api' prefix.

    """
    return utils.filter_endpoints(app.url_map, '/api')
@app.route('/api/1/')
def api_main_v1_endpoints():
    """List the current api v1 endpoints starting with /api/1.

    Returns:
        The application's url map filtered on the '/api/1' prefix.

    """
    return utils.filter_endpoints(app.url_map, '/api/1')
@app.route('/api/1/stat/counters/')
def api_stats():
    """Return statistics on SWH storage.

    Returns:
        The result of service.stat_counters() (SWH storage's statistics).

    """
    return service.stat_counters()
@app.route('/api/1/search/<string:q>/')
def api_search(q):
    """Search a content per hash.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        Dictionary with a single 'found' key holding the result of
        service.lookup_hash(q).

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        (presumably raised by the service layer; not visible here).

    """
    return {'found': service.lookup_hash(q)}
def _api_lookup(criteria, lookup_fn, error_msg_if_not_found):
"""Factorize function regarding the api to lookup for data."""
res = lookup_fn(criteria)
if not res:
raise NotFoundExc(error_msg_if_not_found)
return res
@app.route('/api/1/origin/<int:origin_id>/')
def api_origin(origin_id):
    """Return information about the origin identified by origin_id.

    Args:
        origin_id: the origin's identifier

    Returns:
        Information on the origin if found.

    Raises:
        NotFoundExc if the origin is not found.

    """
    not_found_msg = 'Origin with id %s not found.' % origin_id
    return _api_lookup(origin_id,
                       lookup_fn=service.lookup_origin,
                       error_msg_if_not_found=not_found_msg)
@app.route('/api/1/person/<int:person_id>/')
def api_person(person_id):
    """Return information about the person identified by person_id.

    Args:
        person_id: the person's identifier

    Returns:
        Information on the person if found.

    Raises:
        NotFoundExc if the person is not found.

    """
    not_found_msg = 'Person with id %s not found.' % person_id
    return _api_lookup(person_id,
                       lookup_fn=service.lookup_person,
                       error_msg_if_not_found=not_found_msg)
@app.route('/api/1/release/<string:sha1_git>/')
def api_release(sha1_git):
    """Return information about the release identified by sha1_git.

    Args:
        sha1_git: the release's hash

    Returns:
        Information on the release if found.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        (presumably raised by the service layer; not visible here).
        NotFoundExc if the release is not found.

    """
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_release,
        error_msg_if_not_found=(
            'Release with sha1_git %s not found.' % sha1_git))
@app.route('/api/1/revision/<string:sha1_git>/')
def api_revision(sha1_git):
    """Return information about the revision identified by sha1_git.

    Args:
        sha1_git: the revision's hash

    Returns:
        Information on the revision if found.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        (presumably raised by the service layer; not visible here).
        NotFoundExc if the revision is not found.

    """
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_revision,
        error_msg_if_not_found=(
            'Revision with sha1_git %s not found.' % sha1_git))
@app.route('/api/1/directory/<string:sha1_git>/')
def api_directory(sha1_git):
    """Return the entries of the directory identified by sha1_git.

    Args:
        sha1_git: the directory's sha1_git

    Returns:
        The directory's entries, as a list.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        (presumably raised by the service layer; not visible here).
        NotFoundExc if the lookup result is falsy.

    """
    entries = service.lookup_directory(sha1_git)
    if entries:
        return list(entries)
    raise NotFoundExc('Directory with sha1_git %s not found.' % sha1_git)
@app.route('/api/1/browse/<string:q>/')
def api_content_checksum_to_origin(q):
    """Return information on one origin of a content, if that content
    is found.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        Information on one possible origin for such content.

    Raises:
        BadInputExc in case of unknown algo_hash or bad hash
        (presumably raised by the service layer; not visible here).
        NotFoundExc if the content is not found.

    """
    if service.lookup_hash(q)['found']:
        return service.lookup_hash_origin(q)
    raise NotFoundExc('Content with %s not found.' % q)
@app.route('/api/1/content/<string:q>/raw/')
-@set_renderers(renderers.PlainRenderer)
+@set_renderers(renderers.BytesRenderer)
def api_content_raw(q):
"""Return content's raw data if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's raw data in application/octet-stream.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash
- NotFoundExc if the content is not found.
"""
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return content['data']
@app.route('/api/1/content/<string:q>/')
def api_content_with_details(q):
    """Return content information if the content is found.

    Args:
        q is of the form (algo_hash:)hash with algo_hash in
        (sha1, sha1_git, sha256).
        When algo_hash is not provided, 'hash' is considered sha1.

    Returns:
        Content's information, whose 'data' entry is replaced by the url
        of the raw content endpoint for the content's sha1.

    Raises:
        - BadInputExc in case of unknown algo_hash or bad hash
          (presumably raised by the service layer; not visible here).
        - NotFoundExc if the content is not found.

    """
    details = service.lookup_content(q)
    if not details:
        raise NotFoundExc('Content with %s not found.' % q)

    # expose a link to the raw endpoint rather than the data itself
    raw_url = url_for('api_content_raw', q=details['sha1'])
    details['data'] = raw_url
    return details
@app.route('/api/1/uploadnsearch/', methods=['POST'])
def api_uploadnsearch():
    """Upload the file's content in the post body request.
    Compute its hash and determine if it exists in the storage.

    Args:
        request.files filled with the filename's data to upload.

    Returns:
        The result of service.upload_and_search on the uploaded file
        (dictionary with 'sha1', 'filename' and 'found' keys according
        to the tests of this endpoint).

    Raises:
        BadInputExc when the form has no 'filename' entry.

    """
    uploaded = request.files.get('filename')
    if uploaded:
        return service.upload_and_search(uploaded)
    raise BadInputExc('Bad request, missing \'filename\' entry in form.')
diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py
index 01f10d617..b30cb1e75 100644
--- a/swh/web/ui/converters.py
+++ b/swh/web/ui/converters.py
@@ -1,173 +1,173 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core import hashutil
def fmap(f, data):
    """Apply f over data, preserving its container shape.

    Args:
        f: function of one argument
        data: a list, a dict or any other value

    Returns:
        - for a list, the list of f applied to each element
        - for a dict, a dict with the same keys and f applied to each value
        - otherwise, f(data)

    """
    if isinstance(data, dict):
        return {key: f(val) for key, val in data.items()}
    if isinstance(data, list):
        return list(map(f, data))
    return f(data)
def from_swh(dict_swh, hashess=(), bytess=(), blacklist=()):
    """Convert from an swh dictionary to something reasonably json
    serializable.

    Args:
        - dict_swh: the origin dictionary needed to be transformed
        - hashess: list/set of keys representing hashes values (sha1, sha256,
        sha1_git, etc...) as bytes. Those need to be transformed in
        hexadecimal string
        - bytess: list/set of keys representing bytes values which needs to
        be decoded
        - blacklist: list/set of keys to drop from the output

        Values which are dictionaries are converted recursively with the
        same key sets. The remaining keys are copied as is in the output.

    Returns:
        dictionary equivalent as dict_swh only with its keys `converted`.
        A falsy dict_swh (None, {}) is returned as is.

    """
    # The defaults are immutable tuples (not lists) so that no state can
    # ever be shared between calls; they are only used for membership tests.

    def convert_hashes_bytes(v):
        """v is supposedly a hash as bytes, returns it converted in hex.

        """
        if v and isinstance(v, bytes):
            return hashutil.hash_to_hex(v)
        return v

    def convert_bytes(v):
        """v is supposedly a bytes string, decode as utf-8.

        FIXME: Improve decoding policy.
        If not utf-8, break!

        """
        if v and isinstance(v, bytes):
            return v.decode('utf-8')
        return v

    if not dict_swh:
        return dict_swh

    new_dict = {}
    for key, value in dict_swh.items():
        # nested dictionaries take precedence over any key classification
        if isinstance(value, dict):
            new_dict[key] = from_swh(value, hashess, bytess, blacklist)
        elif key in hashess:
            new_dict[key] = fmap(convert_hashes_bytes, value)
        elif key in bytess:
            new_dict[key] = fmap(convert_bytes, value)
        elif key in blacklist:
            continue
        else:
            new_dict[key] = value

    return new_dict
def from_origin(origin):
    """Convert from an SWH origin to an origin dictionary.

    The 'revision' key is treated as a hash (converted to hexadecimal)
    and the 'path' key as bytes to decode.

    """
    hash_keys = {'revision'}
    bytes_keys = {'path'}
    return from_swh(origin, hashess=hash_keys, bytess=bytes_keys)
def from_release(release):
    """Convert from an SWH release to a json serializable release dictionary.

    Args:
        release: Dict describing the release. The keys converted are:
        - id, target: hashes as bytes
        - message, name, email: bytes values
        Any other key is left untouched.

    Returns:
        Release dictionary with the same keys as the input, only:
        - id, target are hexadecimal strings
        - message, name, email are decoded strings
        - remaining keys are left as is

    """
    hash_keys = {'id', 'target'}
    bytes_keys = {'message', 'name', 'email'}
    return from_swh(release, hashess=hash_keys, bytess=bytes_keys)
def from_revision(revision):
    """Convert from an SWH revision to a json serializable revision dictionary.

    Args:
        revision: Dict describing the revision. The keys converted are:
        - id, directory, parents: hashes as bytes (parents being a list)
        - name, email, message: bytes values
        Any other key is left untouched.

    Returns:
        Revision dictionary with the same keys as inputs, only:
        - id, directory, parents are in hexadecimal strings
        - name, email, message are decoded in string
        - remaining keys are left as is

    """
    hash_keys = {'id', 'directory', 'parents'}
    bytes_keys = {'name', 'email', 'message'}
    return from_swh(revision, hashess=hash_keys, bytess=bytes_keys)
def from_content(content):
"""Convert swh content to serializable content dictionary.
"""
return from_swh(content,
- hashess=set(['sha1', 'sha1_git', 'sha256']),
- bytess=set(['data']),
- blacklist=set(['status']))
+ hashess={'sha1', 'sha1_git', 'sha256'},
+ bytess={},
+ blacklist={'status'})
def from_person(person):
    """Convert swh person to serializable person dictionary.

    The 'name' and 'email' keys are decoded from bytes; no key is
    treated as a hash.

    """
    bytes_keys = {'name', 'email'}
    return from_swh(person, hashess=set(), bytess=bytes_keys)
def from_directory_entry(dir_entry):
    """Convert swh directory entry to serializable directory entry
    dictionary.

    The 'dir_id', 'sha1_git', 'sha1', 'sha256' and 'target' keys are
    treated as hashes (converted to hexadecimal) and the 'name' key as
    bytes to decode.

    """
    hash_keys = {'dir_id', 'sha1_git', 'sha1', 'sha256', 'target'}
    bytes_keys = {'name'}
    return from_swh(dir_entry, hashess=hash_keys, bytess=bytes_keys)
diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py
index 57669cab5..b86631254 100644
--- a/swh/web/ui/renderers.py
+++ b/swh/web/ui/renderers.py
@@ -1,142 +1,151 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import yaml
from flask import make_response, request
from flask.ext.api import renderers, parsers
from flask_api.mediatypes import MediaType
from swh.web.ui import utils
class SWHFilterEnricher():
    """Global filter on fields.

    """
    def filter_by_fields(self, data):
        """Filter data on the keys listed in the 'fields' request
        parameter (comma-separated), when that parameter is provided.

        Without such a parameter (or with an empty one), data is returned
        as is.

        """
        requested = request.args.get('fields')
        if not requested:
            return data

        wanted_keys = set(requested.split(','))
        return utils.filter_field_keys(data, wanted_keys)
-class PlainRenderer(renderers.BaseRenderer):
+class DoNothingRenderer(renderers.BaseRenderer):
+ def render(self, data, media_type, **options):
+ return data
+
+
+class PlainRenderer(DoNothingRenderer):
"""Renderer for plain/text, do nothing but send the data as is.
"""
media_type = 'text/plain'
- def render(self, data, media_type, **options):
- return data
+
+class BytesRenderer(DoNothingRenderer):
+ """Renderer for plain/text, do nothing but send the data as is.
+
+ """
+ media_type = 'application/octet-stream'
class YAMLRenderer(renderers.BaseRenderer, SWHFilterEnricher):
    """Renderer for application/yaml.
    Orchestrate from python data structure to yaml.

    """
    media_type = 'application/yaml'

    def render(self, data, media_type, **options):
        """Filter data on the requested fields, then dump it as yaml
        encoded with the renderer's charset.

        """
        filtered = self.filter_by_fields(data)
        return yaml.dump(filtered, encoding=self.charset)
class JSONPEnricher():
    """JSONP rendering.

    """
    @staticmethod
    def _wrap_jsonp(callback, data):
        """Wrap data inside a call to callback when callback is a plain
        (possibly dotted) javascript identifier; otherwise return data
        as is.

        The callback name comes straight from the query string, so it is
        validated before being echoed back: anything else than an
        identifier would let a client inject arbitrary script in the
        response.

        """
        import re  # local import: only needed when a callback is provided

        identifier = r'[A-Za-z_$][A-Za-z0-9_$]*'
        pattern = r'%s(?:\.%s)*' % (identifier, identifier)
        if callback and re.fullmatch(pattern, callback):
            return '%s(%s)' % (callback, data)
        return data

    def enrich_with_jsonp(self, data):
        """Defines a jsonp function that extracts a potential 'callback'
        request parameter holding the function name and wraps the data
        inside a call to such function

        e.g:
        GET /blah/foo/bar renders: {'output': 'wrapped'}
        GET /blah/foo/bar?callback=fn renders: fn({'output': 'wrapped'})

        A callback which is not a valid (dotted) javascript identifier is
        ignored and the data is rendered unwrapped.

        """
        return self._wrap_jsonp(request.args.get('callback'), data)
class SWHJSONRenderer(renderers.JSONRenderer,
                      SWHFilterEnricher,
                      JSONPEnricher):
    """Renderer for application/json.
    Serializes in json the data and returns it.

    Also deals with jsonp. If callback is found in request parameter,
    wrap the result as a function with name the value of the parameter
    query 'callback'.

    """
    def render(self, data, media_type, **options):
        """Filter data on the requested fields, serialize it through the
        parent json renderer, then apply the jsonp wrapping.

        """
        filtered = self.filter_by_fields(data)
        serialized = super().render(filtered, media_type, **options)
        return self.enrich_with_jsonp(serialized)
class SWHBrowsableAPIRenderer(renderers.BrowsableAPIRenderer):
    """SWH's browsable api renderer.

    Only overrides the template of the parent renderer.

    """
    template = "api.html"
# Dotted paths of the renderer classes.
# NOTE(review): presumably consumed by the flask api configuration
# elsewhere -- confirm. One entry is a parser (URLEncodedParser), not a
# renderer.
RENDERERS = [
    'swh.web.ui.renderers.SWHJSONRenderer',
    'swh.web.ui.renderers.SWHBrowsableAPIRenderer',
    'flask.ext.api.parsers.URLEncodedParser',
    'swh.web.ui.renderers.YAMLRenderer',
    'swh.web.ui.renderers.PlainRenderer',
]


# Instances used to build the media type lookup below. PlainRenderer is
# listed in RENDERERS but has no instance here.
RENDERERS_INSTANCE = [
    SWHJSONRenderer(),
    SWHBrowsableAPIRenderer(),
    parsers.URLEncodedParser(),
    YAMLRenderer(),
]


# Media type -> instance, used by error_response to pick a renderer from
# the request's Accept header.
RENDERERS_BY_TYPE = {
    r.media_type: r
    for r in RENDERERS_INSTANCE
}
def error_response(default_error_msg, error_code, error):
    """Private function to create a custom error response.

    Args:
        default_error_msg: initial body of the response (replaced by the
        rendered error)
        error_code: http status code of the response
        error: the error, rendered as {"error": str(error)}

    Returns:
        The response, rendered with the renderer matching the request's
        Accept header, or the json renderer when there is no match.

    """
    # if nothing is requested by client, use json
    fallback_type = 'application/json'
    requested_type = request.headers.get('Accept', fallback_type)

    fallback_renderer = RENDERERS_BY_TYPE[fallback_type]
    renderer = RENDERERS_BY_TYPE.get(requested_type, fallback_renderer)

    # for edge cases, use the elected renderer's media type
    content_type = renderer.media_type

    response = make_response(default_error_msg, error_code)
    response.headers['Content-Type'] = content_type

    payload = {"error": str(error)}
    response.data = renderer.render(payload,
                                    media_type=MediaType(content_type),
                                    status=error_code,
                                    headers={'Content-Type': content_type})
    return response
diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py
index 41ffd6121..0257bd96e 100644
--- a/swh/web/ui/tests/test_api.py
+++ b/swh/web/ui/tests/test_api.py
@@ -1,535 +1,535 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
class ApiTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.api.service')
@istest
def api_content_checksum_to_origin(self, mock_service):
mock_service.lookup_hash.return_value = {'found': True}
stub_origin = {
"lister": None,
"url": "rsync://ftp.gnu.org/old-gnu/webbase",
"type": "ftp",
"id": 2,
"project": None
}
mock_service.lookup_hash_origin.return_value = stub_origin
# when
rv = self.app.get(
'/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_hash.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_checksum_to_origin_sha_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.app.get(
'/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_hash.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_with_details(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'data': 'some content data',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data': '/api/1/content/'
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
    # A content lookup returning None must yield a 404 json error, without
    # any origin lookup.
    # given
    mock_service.lookup_content.return_value = None
    mock_service.lookup_hash_origin = MagicMock()

    # when
    rv = self.app.get(
        '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c/')

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
        '68b3be4735637006560c not found.'
    })

    mock_service.lookup_content.assert_called_once_with(
        'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c')
    # actually assert the origin lookup was not triggered (the previous
    # code assigned False to `called`, which checked nothing)
    self.assertFalse(mock_service.lookup_hash_origin.called)
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
    # A content lookup returning None must yield a 404 yaml error when
    # yaml is requested, without any origin lookup.
    # given
    mock_service.lookup_content.return_value = None
    mock_service.lookup_hash_origin = MagicMock()

    # when
    rv = self.app.get(
        '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c/',
        headers={'accept': 'application/yaml'})

    # then
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/yaml')

    # safe_load: the payload is plain data, no need for the full loader
    response_data = yaml.safe_load(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
        '68b3be4735637006560c not found.'
    })

    mock_service.lookup_content.assert_called_once_with(
        'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c')
    # actually assert the origin lookup was not triggered (the previous
    # code assigned False to `called`, which checked nothing)
    self.assertFalse(mock_service.lookup_hash_origin.called)
@patch('swh.web.ui.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': 'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-Type': 'text/plain'})
self.assertEquals(rv.status_code, 200)
- self.assertEquals(rv.mimetype, 'text/plain')
+ self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data.decode('utf-8'), stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_raw_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-Type': 'text/plain'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.lookup_hash.return_value = True
# when
rv = self.app.get('/api/1/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.api.service')
@istest
def api_search_as_yaml(self, mock_service):
    # The search endpoint must render its result as yaml when yaml is
    # requested.
    # given
    mock_service.lookup_hash.return_value = True

    # when
    rv = self.app.get('/api/1/search/sha1:halb/',
                      headers={'Accept': 'application/yaml'})

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/yaml')

    # safe_load: the payload is plain data, no need for the full loader
    response_data = yaml.safe_load(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {'found': True})

    mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = False
# when
rv = self.app.get('/api/1/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': False})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch(self, mock_request, mock_service):
# given
mock_request.files = {'filename': 'simple-filename'}
mock_service.upload_and_search.return_value = {
'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False,
}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False})
mock_service.upload_and_search.assert_called_once_with(
'simple-filename')
@patch('swh.web.ui.api.service')
@istest
def api_origin(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with(1234)
@patch('swh.web.ui.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with(4321)
@patch('swh.web.ui.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'revision': 'revision-sha1',
'author_name': 'author release name',
'author_email': 'author@email',
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_release)
@patch('swh.web.ui.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_revision)
@patch('swh.web.ui.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directory = [{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
}]
mock_service.lookup_directory.return_value = stub_directory
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_directory)
@patch('swh.web.ui.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py
index 649543424..ef8e720d4 100644
--- a/swh/web/ui/tests/test_converters.py
+++ b/swh/web/ui/tests/test_converters.py
@@ -1,362 +1,362 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import converters
class ConvertersTestCase(unittest.TestCase):
@istest
def from_swh(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'sharp-0.3.4.tgz',
'd': hashutil.hex_to_hash(
'b04caf10e9535160d90e874b45aa426de762f19f'),
'e': b'sharp.html/doc_002dS_005fISREG.html',
'g': [b'utf-8-to-decode', b'another-one'],
'h': 'something filtered',
'i': {'e': b'something'},
'j': {
'k': {
'l': [b'bytes thing', b'another thingy'],
'n': 'dont care either'
},
'm': 'dont care'
}
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'sharp-0.3.4.tgz',
'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
'e': 'sharp.html/doc_002dS_005fISREG.html',
'g': ['utf-8-to-decode', 'another-one'],
'i': {'e': 'something'},
'j': {
'k': {
'l': ['bytes thing', 'another thingy']
}
},
}
actual_output = converters.from_swh(some_input,
hashess={'d'},
bytess={'c', 'e', 'g', 'l'},
blacklist={'h', 'm', 'n'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
    # Values that are None or already str are expected to come back
    # unchanged even when their keys are listed for conversion.
    some_input = {
        'a': 'something',
        'b': None,
        'c': 'someone',
        'd': None,
    }

    expected_output = {
        'a': 'something',
        'b': None,
        'c': 'someone',
        'd': None,
    }

    actual_output = converters.from_swh(some_input,
                                        hashess={'a', 'b'},
                                        bytess={'c', 'd'})

    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(expected_output, actual_output)
@istest
def from_swh_empty(self):
    # An empty dict is expected to be converted to an empty dict.
    # when / then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual({}, converters.from_swh({}))
@istest
def from_swh_none(self):
    # A None input is expected to be passed through as None.
    # when
    converted = converters.from_swh(None)

    # then
    self.assertIsNone(converted)
@istest
def from_origin(self):
    # from_origin is expected to hex-encode the binary revision and to
    # decode the binary path, leaving the textual fields untouched.
    # given
    raw_origin = {
        'origin_type': 'ftp',
        'origin_url': 'rsync://ftp.gnu.org/gnu/octave',
        'branch': 'octave-3.4.0.tar.gz',
        'revision': bytes.fromhex(
            'b04caf10e9535160d90e874b45aa426de762f19f'),
        'path': b'octave-3.4.0/doc/interpreter/octave.html/'
                b'doc_002dS_005fISREG.html',
    }

    wanted_origin = {
        'origin_type': 'ftp',
        'origin_url': 'rsync://ftp.gnu.org/gnu/octave',
        'branch': 'octave-3.4.0.tar.gz',
        'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
        'path': 'octave-3.4.0/doc/interpreter/octave.html/'
                'doc_002dS_005fISREG.html',
    }

    # when
    converted_origin = converters.from_origin(raw_origin)

    # then
    self.assertEqual(converted_origin, wanted_origin)
@istest
def from_release(self):
    # from_release is expected to hex-encode the id and target hashes and
    # to decode the name, message and author fields of a release.
    release_input = {
        'id': hashutil.hex_to_hash(
            'aad23fa492a0c5fed0708a6703be875448c86884'),
        'target': hashutil.hex_to_hash(
            '5e46d564378afc44b31bb89f99d5675195fbdf67'),
        'target_type': 'revision',
        'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
                                  tzinfo=datetime.timezone.utc),
        'author': {
            'name': b'author name',
            'email': b'author@email',
        },
        'name': b'v0.0.1',
        'message': b'some comment on release',
        'synthetic': True,
    }

    # The original literal listed 'target_type' twice with the same value;
    # the redundant second entry is dropped (the resulting dict is equal).
    expected_release = {
        'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
        'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
        'target_type': 'revision',
        'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
                                  tzinfo=datetime.timezone.utc),
        'author': {
            'name': 'author name',
            'email': 'author@email',
        },
        'name': 'v0.0.1',
        'message': 'some comment on release',
        'synthetic': True,
    }

    # when
    actual_release = converters.from_release(release_input)

    # then
    self.assertEqual(actual_release, expected_release)
@istest
def from_release_no_revision(self):
    # A release whose target is None is expected to keep target None
    # while its other binary fields are converted.
    # given
    release_date = datetime.datetime(2016, 3, 2, 10, 0, 0,
                                     tzinfo=datetime.timezone.utc)
    raw_release = {
        'id': hashutil.hex_to_hash(
            'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
        'target': None,
        'date': release_date,
        'name': b'v0.1.1',
        'message': b'comment on release',
        'synthetic': False,
        'author': {
            'name': b'bob',
            'email': b'bob@alice.net',
        },
    }

    wanted_release = {
        'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
        'target': None,
        'date': release_date,
        'name': 'v0.1.1',
        'message': 'comment on release',
        'synthetic': False,
        'author': {
            'name': 'bob',
            'email': 'bob@alice.net',
        },
    }

    # when
    converted_release = converters.from_release(raw_release)

    # then
    self.assertEqual(converted_release, wanted_release)
@istest
def from_revision(self):
    # from_revision is expected to hex-encode the id, directory and parent
    # hashes, decode the message and person fields, and keep dates,
    # offsets, type, synthetic flag and metadata as they are.
    # given
    naive_date = datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None)

    raw_revision = {
        'id': hashutil.hex_to_hash(
            '18d8be353ed3480476f032475e7c233eff7371d5'),
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {
            'name': b'Software Heritage',
            'email': b'robot@softwareheritage.org',
        },
        'committer': {
            'name': b'Software Heritage',
            'email': b'robot@softwareheritage.org',
        },
        'message': b'synthetic revision message',
        'date': naive_date,
        'date_offset': 0,
        'committer_date': naive_date,
        'committer_date_offset': 0,
        'synthetic': True,
        'type': 'tar',
        'parents': [
            hashutil.hex_to_hash(
                '29d8be353ed3480476f032475e7c244eff7371d5'),
            hashutil.hex_to_hash(
                '30d8be353ed3480476f032475e7c244eff7371d5'),
        ],
        'metadata': {
            'original_artifact': [{
                'archive_type': 'tar',
                'name': 'webbase-5.7.0.tar.gz',
                'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                          '309d36484e7edf7bb912',
            }]
        },
    }

    wanted_revision = {
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author': {
            'name': 'Software Heritage',
            'email': 'robot@softwareheritage.org',
        },
        'committer': {
            'name': 'Software Heritage',
            'email': 'robot@softwareheritage.org',
        },
        'message': 'synthetic revision message',
        'date': naive_date,
        'date_offset': 0,
        'committer_date': naive_date,
        'committer_date_offset': 0,
        'parents': [
            '29d8be353ed3480476f032475e7c244eff7371d5',
            '30d8be353ed3480476f032475e7c244eff7371d5',
        ],
        'type': 'tar',
        'synthetic': True,
        'metadata': {
            'original_artifact': [{
                'archive_type': 'tar',
                'name': 'webbase-5.7.0.tar.gz',
                'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                          '309d36484e7edf7bb912',
            }]
        },
    }

    # when
    converted_revision = converters.from_revision(raw_revision)

    # then
    self.assertEqual(converted_revision, wanted_revision)
@istest
def from_content(self):
    # from_content is expected to hex-encode the three checksums, drop the
    # 'status' key and leave the raw 'data' as bytes.
    content_input = {
        'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
                                     '2ebda5'),
        'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
                                       'e76c98930e7e0afa4d2747d3bf96c926'),
        'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                         'c5b00a6d03'),
        'data': b'data in bytes',
        'length': 10,
        'status': 'visible',
    }

    # 'status' is filtered
    expected_content = {
        'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
        'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
                  '7d3bf96c926',
        'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        # Post-change expectation of the patch: data stays as bytes (the
        # removed '-' line expected the decoded str).
        'data': b'data in bytes',
        'length': 10,
    }

    # when
    actual_content = converters.from_content(content_input)

    # then
    self.assertEqual(actual_content, expected_content)
@istest
def from_person(self):
    # from_person is expected to decode the binary name and email and to
    # keep every other key as is.
    # given
    raw_person = {
        'id': 10,
        'anything': 'else',
        'name': b'bob',
        'email': b'bob@foo.alice',
    }

    wanted_person = {
        'id': 10,
        'anything': 'else',
        'name': 'bob',
        'email': 'bob@foo.alice',
    }

    # when
    converted_person = converters.from_person(raw_person)

    # then
    self.assertEqual(converted_person, wanted_person)
@istest
def from_directory_entries(self):
    # from_directory_entry is expected to hex-encode every checksum-like
    # field and to decode the binary entry name.
    # given
    sha1_hex = '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'
    sha256_hex = ('39007420ca5de7cb3cfc15196335507e'
                  'e76c98930e7e0afa4d2747d3bf96c926')
    sha1_git_hex = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'

    raw_entry = {
        'sha1': hashutil.hex_to_hash(sha1_hex),
        'sha256': hashutil.hex_to_hash(sha256_hex),
        'sha1_git': hashutil.hex_to_hash(sha1_git_hex),
        'target': hashutil.hex_to_hash(sha1_git_hex),
        'dir_id': hashutil.hex_to_hash(sha1_git_hex),
        'name': b'bob',
        'type': 10,
    }

    wanted_entry = {
        'sha1': sha1_hex,
        'sha256': sha256_hex,
        'sha1_git': sha1_git_hex,
        'target': sha1_git_hex,
        'dir_id': sha1_git_hex,
        'name': 'bob',
        'type': 10,
    }

    # when
    converted_entry = converters.from_directory_entry(raw_entry)

    # then
    self.assertEqual(converted_entry, wanted_entry)
diff --git a/swh/web/ui/tests/test_renderers.py b/swh/web/ui/tests/test_renderers.py
index 72075366e..3239c840e 100644
--- a/swh/web/ui/tests/test_renderers.py
+++ b/swh/web/ui/tests/test_renderers.py
@@ -1,190 +1,215 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from flask_api.mediatypes import MediaType
from nose.tools import istest
from unittest.mock import patch
from swh.web.ui import renderers
class RendererTestCase(unittest.TestCase):
@patch('swh.web.ui.renderers.request')
@istest
def SWHFilterRenderer_do_nothing(self, mock_request):
    # Without a 'fields' request argument, filter_by_fields is expected to
    # return the data unchanged.
    # given
    mock_request.args = {}

    swhFilterRenderer = renderers.SWHFilterEnricher()

    input_data = {'a': 'some-data'}

    # when
    actual_data = swhFilterRenderer.filter_by_fields(input_data)

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(actual_data, input_data)
@patch('swh.web.ui.renderers.utils')
@patch('swh.web.ui.renderers.request')
@istest
def SWHFilterRenderer_do_filter(self, mock_request, mock_utils):
    # With a 'fields' request argument, filter_by_fields is expected to
    # delegate to utils.filter_field_keys with the parsed set of fields.
    # given
    mock_request.args = {'fields': 'a,c'}
    mock_utils.filter_field_keys.return_value = {'a': 'some-data'}

    swhFilterRenderer = renderers.SWHFilterEnricher()

    input_data = {'a': 'some-data',
                  'b': 'some-other-data'}

    # when
    actual_data = swhFilterRenderer.filter_by_fields(input_data)

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(actual_data, {'a': 'some-data'})

    mock_utils.filter_field_keys.assert_called_once_with(input_data,
                                                         {'a', 'c'})
@istest
def doNothingRenderer(self):
    # DoNothingRenderer.render is expected to hand the data back as is.
    # (Diff '+' markers removed; the local no longer shadows the test's
    # own name.)
    # given
    renderer = renderers.DoNothingRenderer()
    input_data = 'some data'

    # when
    actual_data = renderer.render(input_data, 'some-media-type')

    # then
    self.assertEqual(actual_data, input_data)  # do nothing on data
+
@istest
def plainRenderer(self):
    # PlainRenderer.render is expected to return its input untouched.
    # given
    payload = 'some data'
    renderer = renderers.PlainRenderer()

    # when
    rendered = renderer.render(payload, 'some-media-type')

    # then
    self.assertEqual(rendered, payload)  # do nothing on data
@istest
def bytesRenderer(self):
    # BytesRenderer is expected to advertise application/octet-stream and
    # to return the bytes it is given unchanged.
    # (Diff '+' markers removed; the local no longer shadows the test's
    # own name.)
    # given
    renderer = renderers.BytesRenderer()
    input_data = b'some data'

    # when
    actual_data = renderer.render(input_data, 'some-media-type')

    # then
    self.assertEqual('application/octet-stream', renderer.media_type)
    self.assertEqual(actual_data, input_data)  # do nothing on data
+
@patch('swh.web.ui.renderers.request')
@istest
def yamlRenderer_without_filter(self, mock_request):
    # Without a 'fields' argument, the YAML rendering is expected to
    # round-trip to the full input dict.
    # given
    mock_request.args = {}
    yamlRenderer = renderers.YAMLRenderer()

    input_data = {'target': 'sha1-dir',
                  'type': 'dir',
                  'dir-id': 'dir-id-sha1-git'}

    expected_data = input_data

    # when
    actual_data = yamlRenderer.render(input_data, 'application/yaml')

    # then
    # safe_load: yaml.load without an explicit Loader is deprecated (and
    # rejected by recent PyYAML); plain str/dict data needs nothing more.
    self.assertEqual(yaml.safe_load(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def yamlRenderer(self, mock_request):
    # With fields=type,target the YAML rendering is expected to contain
    # only those two keys.
    # given
    mock_request.args = {'fields': 'type,target'}
    yamlRenderer = renderers.YAMLRenderer()

    input_data = {'target': 'sha1-dir',
                  'type': 'dir',
                  'dir-id': 'dir-id-sha1-git'}

    expected_data = {'target': 'sha1-dir', 'type': 'dir'}

    # when
    actual_data = yamlRenderer.render(input_data, 'application/yaml')

    # then
    # safe_load: yaml.load without an explicit Loader is deprecated (and
    # rejected by recent PyYAML); plain str/dict data needs nothing more.
    self.assertEqual(yaml.safe_load(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic(self, mock_request):
    # Without request arguments, the JSON rendering is expected to decode
    # back to the full input dict.
    # given
    mock_request.args = {}
    payload = {'target': 'sha1-dir',
               'type': 'dir',
               'dir-id': 'dir-id-sha1-git'}
    media_type = MediaType('application/json')
    renderer = renderers.SWHJSONRenderer()

    # when
    rendered = renderer.render(payload, media_type)

    # then
    self.assertEqual(json.loads(rendered), payload)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic_with_filter(self, mock_request):
    # With fields=target the JSON rendering is expected to keep only the
    # 'target' key.
    # given
    mock_request.args = {'fields': 'target'}
    payload = {'target': 'sha1-dir',
               'type': 'dir',
               'dir-id': 'dir-id-sha1-git'}
    wanted = {'target': 'sha1-dir'}
    media_type = MediaType('application/json')
    renderer = renderers.SWHJSONRenderer()

    # when
    rendered = renderer.render(payload, media_type)

    # then
    self.assertEqual(json.loads(rendered), wanted)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic_with_filter_and_jsonp(self, mock_request):
    # With both 'fields' and 'callback' set, the filtered JSON is expected
    # to be wrapped in a call to the callback name.
    # given
    mock_request.args = {'fields': 'target',
                         'callback': 'jsonpfn'}
    payload = {'target': 'sha1-dir',
               'type': 'dir',
               'dir-id': 'dir-id-sha1-git'}
    media_type = MediaType('application/json')
    renderer = renderers.SWHJSONRenderer()

    # when
    rendered = renderer.render(payload, media_type)

    # then
    self.assertEqual(rendered, 'jsonpfn({"target": "sha1-dir"})')
@patch('swh.web.ui.renderers.request')
@istest
def jsonpEnricher_basic_with_filter_and_jsonp(self, mock_request):
    # With a 'callback' argument, enrich_with_jsonp is expected to wrap
    # the data in a call to that callback.
    # given
    mock_request.args = {'callback': 'jsonpfn'}
    enricher = renderers.JSONPEnricher()
    payload = {'output': 'test'}

    # when
    enriched = enricher.enrich_with_jsonp(payload)

    # then
    self.assertEqual(enriched, "jsonpfn({'output': 'test'})")
@patch('swh.web.ui.renderers.request')
@istest
def jsonpEnricher_do_nothing(self, mock_request):
    # Without a 'callback' argument, enrich_with_jsonp is expected to
    # return the data as is.
    # given
    mock_request.args = {}
    enricher = renderers.JSONPEnricher()
    payload = {'output': 'test'}

    # when
    enriched = enricher.enrich_with_jsonp(payload)

    # then
    self.assertEqual(enriched, {'output': 'test'})
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index fffddbbcc..ae089bef8 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,545 +1,545 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch
from swh.core.hashutil import hex_to_hash
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@istest
def lookup_hash_does_not_exist(self):
    # When the storage finds nothing, lookup_hash is expected to report
    # found=None together with the algorithm parsed from the query.
    # given
    self.storage.content_find = MagicMock(return_value=None)

    # when
    actual_lookup = service.lookup_hash(
        'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual({'found': None,
                      'algo': 'sha1_git'}, actual_lookup)

    # check the function has been called with parameters
    self.storage.content_find.assert_called_with({
        'sha1_git':
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')
    })
@istest
def lookup_hash_exist(self):
    # When the storage returns a content, lookup_hash is expected to
    # report it under 'found' together with the parsed algorithm.
    # given
    stub_content = {
        'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    }
    self.storage.content_find = MagicMock(return_value=stub_content)

    # when
    actual_lookup = service.lookup_hash(
        'sha1:456caf10e9535160d90e874b45aa426de762f19f')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual({'found': stub_content,
                      'algo': 'sha1'}, actual_lookup)

    self.storage.content_find.assert_called_with({
        'sha1':
        hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
    })
@istest
def lookup_hash_origin(self):
    # lookup_hash_origin is expected to query content_find_occurrence with
    # the parsed hash and to return the occurrence in converted form.
    # given
    stored_occurrence = {
        'origin_type': 'sftp',
        'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
        'branch': 'octavio-3.4.0.tar.gz',
        'revision': bytes.fromhex(
            'b04caf10e9535160d90e874b45aa426de762f19f'),
        'path': b'octavio-3.4.0/doc/interpreter/octave.html/'
                b'doc_002dS_005fISREG.html',
    }
    self.storage.content_find_occurrence = MagicMock(
        return_value=stored_occurrence)

    wanted_origin = {
        'origin_type': 'sftp',
        'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
        'branch': 'octavio-3.4.0.tar.gz',
        'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
        'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
                '_002dS_005fISREG.html',
    }

    # when
    found_origin = service.lookup_hash_origin(
        'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    self.assertEqual(found_origin, wanted_origin)

    self.storage.content_find_occurrence.assert_called_with(
        {'sha1_git':
         hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')})
@istest
def stat_counters(self):
    # service.stat_counters is expected to return the storage counters
    # unchanged and to call the storage without arguments.
    # given
    storage_stats = dict(
        content=1770830,
        directory=211683,
        directory_entry_dir=209167,
        directory_entry_file=1807094,
        directory_entry_rev=0,
        entity=0,
        entity_history=0,
        occurrence=0,
        occurrence_history=19600,
        origin=1096,
        person=0,
        release=8584,
        revision=7792,
        revision_history=0,
        skipped_content=0,
    )
    self.storage.stat_counters = MagicMock(return_value=storage_stats)

    # when
    returned_stats = service.stat_counters()

    # then
    self.assertEqual(returned_stats, storage_stats)

    self.storage.stat_counters.assert_called_with()
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil):
    # When the hashed file is known to the storage, hash_and_search is
    # expected to return the hex checksums plus found=True.
    # given
    sha1_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    bhash = hex_to_hash(sha1_hex)
    mock_hashutil.hashfile.return_value = {'sha1': bhash}
    self.storage.content_find = MagicMock(return_value={
        'sha1': bhash,
        'sha1_git': bhash,
    })

    # when
    search_result = service.hash_and_search('/some/path')

    # then
    wanted_result = {
        'sha1': sha1_hex,
        'sha1_git': sha1_hex,
        'found': True,
    }
    self.assertEqual(search_result, wanted_result)

    mock_hashutil.hashfile.assert_called_once_with('/some/path')
    self.storage.content_find.assert_called_once_with({'sha1': bhash})
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
    # When the storage does not know the hashed file, hash_and_search is
    # expected to return the hex sha1 plus found=False.
    # given
    sha1_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    bhash = hex_to_hash(sha1_hex)
    mock_hashutil.hashfile.return_value = {'sha1': bhash}
    mock_hashutil.hash_to_hex = MagicMock(return_value=sha1_hex)
    self.storage.content_find = MagicMock(return_value=None)

    # when
    search_result = service.hash_and_search('/some/path')

    # then
    wanted_result = {
        'sha1': sha1_hex,
        'found': False,
    }
    self.assertEqual(search_result, wanted_result)

    mock_hashutil.hashfile.assert_called_once_with('/some/path')
    self.storage.content_find.assert_called_once_with({'sha1': bhash})
    mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
    # upload_and_search is expected to save the upload, hash-and-search
    # the saved path, clean the temporary directory and return the search
    # result enriched with the filename.
    # given
    mock_upload.save_in_upload_folder.return_value = (
        '/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
    mock_upload.cleanup.return_value = None

    uploaded_file = MagicMock(filename='some-filename')

    # patch.object restores service.hash_and_search when the block exits;
    # the original assigned a MagicMock to the module attribute and never
    # restored it, leaking the stub into the other tests of the module.
    with patch.object(
            service, 'hash_and_search',
            side_effect=lambda filepath: {'sha1': 'blah',
                                          'found': True}
    ) as mock_hash_and_search:
        # when
        actual_res = service.upload_and_search(uploaded_file)

    # then
    self.assertEqual(actual_res, {
        'filename': 'some-filename',
        'sha1': 'blah',
        'found': True})

    mock_upload.save_in_upload_folder.assert_called_with(uploaded_file)
    mock_upload.cleanup.assert_called_with('/tmp/dir')
    mock_hash_and_search.assert_called_once_with(
        '/tmp/dir/path/some-filename')
@istest
def lookup_origin(self):
    # lookup_origin is expected to query origin_get with {'id': ...} and
    # to return the stored origin.
    # given
    stored_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp',
    }
    self.storage.origin_get = MagicMock(return_value=stored_origin)

    # when
    found_origin = service.lookup_origin('origin-id')

    # then
    wanted_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp',
    }
    self.assertEqual(found_origin, wanted_origin)

    self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def lookup_release(self):
    # lookup_release is expected to query release_get with the binary id
    # and to return the first release in converted form.
    # given
    release_hex = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    release_date = datetime.datetime(2015, 1, 1, 22, 0, 0,
                                     tzinfo=datetime.timezone.utc)
    stored_release = {
        'id': hex_to_hash(release_hex),
        'target': None,
        'date': release_date,
        'name': b'v0.0.1',
        'message': b'synthetic release',
        'synthetic': True,
    }
    self.storage.release_get = MagicMock(return_value=[stored_release])

    # when
    found_release = service.lookup_release(release_hex)

    # then
    wanted_release = {
        'id': release_hex,
        'target': None,
        'date': release_date,
        'name': 'v0.0.1',
        'message': 'synthetic release',
        'synthetic': True,
    }
    self.assertEqual(found_release, wanted_release)

    self.storage.release_get.assert_called_with(
        [hex_to_hash(release_hex)])
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self):
    # An identifier that is not a checksum is expected to raise
    # BadInputExc without the storage being queried.
    # given
    self.storage.release_get = MagicMock()

    with self.assertRaises(BadInputExc) as cm:
        # when
        service.lookup_release('not-a-sha1')
        # NOTE(review): placed after the raising call inside the `with`
        # block, this check cannot run; confirm the expected message
        # before moving it below the block.
        self.assertIn('invalid checksum', cm.exception.args[0])

    # then: actually assert it (the original assigned `called = False`,
    # which verified nothing).
    self.assertFalse(self.storage.release_get.called)
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self):
    # A 64-hex-digit identifier (not sha1-sized) is expected to raise
    # BadInputExc without the storage being queried.
    # given
    self.storage.release_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_release(
            '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
            '1aea892abe')
        # NOTE(review): placed after the raising call inside the `with`
        # block, this check cannot run; confirm the expected message
        # before moving it below the block.
        self.assertIn('sha1_git supported', cm.exception.args[0])

    # then: actually assert it (the original assigned `called = False`,
    # which verified nothing).
    self.assertFalse(self.storage.release_get.called)
@istest
def lookup_revision(self):
    # lookup_revision is expected to query revision_get with the binary
    # id and to return the first revision in converted form.
    # given
    revision_hex = '18d8be353ed3480476f032475e7c233eff7371d5'
    directory_hex = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
    commit_date = datetime.datetime(2000, 1, 17, 11, 23, 54)

    stored_revision = {
        'id': hex_to_hash(revision_hex),
        'directory': hex_to_hash(directory_hex),
        'author': {
            'name': b'bill & boule',
            'email': b'bill@boule.org',
        },
        'committer': {
            'name': b'boule & bill',
            'email': b'boule@bill.org',
        },
        'message': b'elegant fix for bug 31415957',
        'date': commit_date,
        'date_offset': 0,
        'committer_date': commit_date,
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    self.storage.revision_get = MagicMock(return_value=[stored_revision])

    # when
    found_revision = service.lookup_revision(revision_hex)

    # then
    wanted_revision = {
        'id': revision_hex,
        'directory': directory_hex,
        'author': {
            'name': 'bill & boule',
            'email': 'bill@boule.org',
        },
        'committer': {
            'name': 'boule & bill',
            'email': 'boule@bill.org',
        },
        'message': 'elegant fix for bug 31415957',
        'date': commit_date,
        'date_offset': 0,
        'committer_date': commit_date,
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    self.assertEqual(found_revision, wanted_revision)

    self.storage.revision_get.assert_called_with(
        [hex_to_hash(revision_hex)])
@istest
def lookup_content_raw_not_found(self):
    # When content_find returns None, lookup_content_raw is expected to
    # return None.
    # given
    sha1_hex = '18d8be353ed3480476f032475e7c233eff7371d5'
    self.storage.content_find = MagicMock(return_value=None)

    # when
    raw_content = service.lookup_content_raw('sha1:' + sha1_hex)

    # then
    self.assertIsNone(raw_content)

    self.storage.content_find.assert_called_with(
        {'sha1': hex_to_hash(sha1_hex)})
@istest
def lookup_content_raw(self):
    # lookup_content_raw is expected to resolve the content via
    # content_find, fetch it via content_get with the found sha1, and
    # return the raw data as bytes.
    # given
    self.storage.content_find = MagicMock(return_value={
        'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
    })
    self.storage.content_get = MagicMock(return_value=[{
        'data': b'binary data',
    }, {}])

    # when
    actual_content = service.lookup_content_raw(
        'sha256:39007420ca5de7cb3cfc15196335507e'
        'e76c98930e7e0afa4d2747d3bf96c926')

    # then
    # Post-change expectation of the patch (data stays bytes); assertEqual
    # replaces the deprecated assertEquals alias.
    self.assertEqual(actual_content, {'data': b'binary data'})

    self.storage.content_find.assert_called_once_with(
        {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
                               'e76c98930e7e0afa4d2747d3bf96c926')})
    self.storage.content_get.assert_called_once_with(
        ['18d8be353ed3480476f032475e7c233eff7371d5'])
@istest
def lookup_content_not_found(self):
    # When content_find returns None, lookup_content is expected to
    # return None.
    # given
    sha1_hex = '18d8be353ed3480476f032475e7c233eff7371d5'
    self.storage.content_find = MagicMock(return_value=None)

    # when
    found_content = service.lookup_content('sha1:' + sha1_hex)

    # then
    self.assertIsNone(found_content)

    self.storage.content_find.assert_called_with(
        {'sha1': hex_to_hash(sha1_hex)})
@istest
def lookup_content_with_sha1(self):
    # A sha1 query is expected to return the content with hex checksums
    # and without the 'status' key.
    # given
    sha1_hex = '18d8be353ed3480476f032475e7c233eff7371d5'
    sha256_hex = ('39007420ca5de7cb3cfc15196335507e'
                  'e76c98930e7e0afa4d2747d3bf96c926')
    sha1_git_hex = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'

    self.storage.content_find = MagicMock(return_value={
        'sha1': hex_to_hash(sha1_hex),
        'sha256': hex_to_hash(sha256_hex),
        'sha1_git': hex_to_hash(sha1_git_hex),
        'length': 190,
        'status': 'absent',
    })

    # when
    found_content = service.lookup_content('sha1:' + sha1_hex)

    # then
    wanted_content = {
        'sha1': sha1_hex,
        'sha256': sha256_hex,
        'sha1_git': sha1_git_hex,
        'length': 190,
    }
    self.assertEqual(found_content, wanted_content)

    self.storage.content_find.assert_called_with(
        {'sha1': hex_to_hash(sha1_hex)})
@istest
def lookup_content_with_sha256(self):
    # A sha256 query is expected to return the content with hex checksums
    # and without the 'status' key.
    # given
    sha1_hex = '18d8be353ed3480476f032475e7c233eff7371d5'
    sha256_hex = ('39007420ca5de7cb3cfc15196335507e'
                  'e76c98930e7e0afa4d2747d3bf96c926')
    sha1_git_hex = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'

    self.storage.content_find = MagicMock(return_value={
        'sha1': hex_to_hash(sha1_hex),
        'sha256': hex_to_hash(sha256_hex),
        'sha1_git': hex_to_hash(sha1_git_hex),
        'length': 360,
        'status': 'visible',
    })

    # when
    found_content = service.lookup_content('sha256:' + sha256_hex)

    # then
    wanted_content = {
        'sha1': sha1_hex,
        'sha256': sha256_hex,
        'sha1_git': sha1_git_hex,
        'length': 360,
    }
    self.assertEqual(found_content, wanted_content)

    self.storage.content_find.assert_called_with(
        {'sha256': hex_to_hash(sha256_hex)})
@istest
def lookup_person(self):
    # lookup_person is expected to query person_get with a one-element
    # list and to return the person with decoded name and email.
    # given
    stored_person = {
        'id': 'person_id',
        'name': b'some_name',
        'email': b'some-email',
    }
    self.storage.person_get = MagicMock(return_value=[stored_person])

    # when
    found_person = service.lookup_person('person_id')

    # then
    wanted_person = {
        'id': 'person_id',
        'name': 'some_name',
        'email': 'some-email',
    }
    self.assertEqual(found_person, wanted_person)

    self.storage.person_get.assert_called_with(['person_id'])
@istest
def lookup_person_not_found(self):
    # When person_get returns an empty list, lookup_person is expected to
    # return None.
    # given
    self.storage.person_get = MagicMock(return_value=[])

    # when
    found_person = service.lookup_person('person_id')

    # then
    self.assertIsNone(found_person)

    self.storage.person_get.assert_called_with(['person_id'])
@istest
def lookup_directory_bad_checksum(self):
    # An identifier that is not a checksum is expected to raise
    # BadInputExc without the storage being queried.
    # given
    self.storage.directory_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc):
        service.lookup_directory('directory_id')

    # then: actually assert it (the original assigned `called = False`,
    # which verified nothing).
    self.assertFalse(self.storage.directory_get.called)
@istest
def lookup_directory_not_found(self):
    # When directory_get returns an empty list, lookup_directory is
    # expected to return None.
    # given
    directory_hex = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
    self.storage.directory_get = MagicMock(return_value=[])

    # when
    found_directory = service.lookup_directory(directory_hex)

    # then
    self.assertIsNone(found_directory)

    self.storage.directory_get.assert_called_with(
        hex_to_hash(directory_hex))
@istest
def lookup_directory(self):
    # lookup_directory is expected to query directory_get with the binary
    # id and to return every entry in converted form.
    # given
    sha1_hex = '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5'
    sha256_hex = ('39007420ca5de7cb3cfc15196335507e'
                  'e76c98930e7e0afa4d2747d3bf96c926')
    sha1_git_hex = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'

    stored_entry = {
        'sha1': hex_to_hash(sha1_hex),
        'sha256': hex_to_hash(sha256_hex),
        'sha1_git': hex_to_hash(sha1_git_hex),
        'target': hex_to_hash(sha1_git_hex),
        'dir_id': hex_to_hash(sha1_git_hex),
        'name': b'bob',
        'type': 10,
    }

    wanted_entry = {
        'sha1': sha1_hex,
        'sha256': sha256_hex,
        'sha1_git': sha1_git_hex,
        'target': sha1_git_hex,
        'dir_id': sha1_git_hex,
        'name': 'bob',
        'type': 10,
    }

    self.storage.directory_get = MagicMock(return_value=[stored_entry])

    # when
    found_directory = service.lookup_directory(sha1_git_hex)

    # then
    self.assertIsNotNone(found_directory)
    self.assertEqual(found_directory, [wanted_entry])

    self.storage.directory_get.assert_called_with(
        hex_to_hash(sha1_git_hex))
diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py
index 0cd69f061..cb5e92e60 100644
--- a/swh/web/ui/tests/test_views.py
+++ b/swh/web/ui/tests/test_views.py
@@ -1,388 +1,388 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from nose.tools import istest
from swh.web.ui.tests import test_app
from unittest.mock import patch, MagicMock
from swh.web.ui.exc import BadInputExc
class ViewTestCase(test_app.SWHViewTestCase):
render_template = False
@istest
def info(self):
    # GET /about is expected to answer 200, use about.html and contain
    # the word 'About'.
    # when
    rv = self.client.get('/about')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('about.html')
    self.assertIn(b'About', rv.data)
@istest
def search_default(self):
    # GET /search without a query is expected to render search.html with
    # an empty query and an empty message.
    # when
    rv = self.client.get('/search')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(self.get_context_variable('q'), '')
    self.assertEqual(self.get_context_variable('message'), '')
    self.assert_template_used('search.html')
@patch('swh.web.ui.views.service')
@istest
def search_content_found(self, mock_service):
    # When lookup_hash reports the content as found, the search page is
    # expected to show the 'found' message for the query.
    # given
    mock_service.lookup_hash.return_value = {
        'found': True,
        'algo': 'sha1'
    }

    # when
    rv = self.client.get('/search?q=sha1:123')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('search.html')
    self.assertEqual(self.get_context_variable('q'), 'sha1:123')
    self.assertEqual(self.get_context_variable('message'),
                     'Content with hash sha1:123 found!')

    mock_service.lookup_hash.assert_called_once_with('sha1:123')
@patch('swh.web.ui.views.service')
@istest
def search_content_not_found(self, mock_service):
    # When lookup_hash reports the content as not found, the search page
    # is expected to show the 'not found' message for the query.
    # given
    mock_service.lookup_hash.return_value = {
        'found': False,
        'algo': 'sha1'
    }

    # when
    rv = self.client.get('/search?q=sha1:456')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('search.html')
    self.assertEqual(self.get_context_variable('q'), 'sha1:456')
    self.assertEqual(self.get_context_variable('message'),
                     'Content with hash sha1:456 not found!')

    mock_service.lookup_hash.assert_called_once_with('sha1:456')
@patch('swh.web.ui.views.service')
@istest
def search_content_invalid_query(self, mock_service):
    # When lookup_hash raises BadInputExc, the search page is expected to
    # still answer 200 and show the exception message.
    # given
    mock_service.lookup_hash = MagicMock(
        side_effect=BadInputExc('Invalid query!')
    )

    # when
    rv = self.client.get('/search?q=sha1:invalid-hash')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('search.html')
    self.assertEqual(self.get_context_variable('q'), 'sha1:invalid-hash')
    self.assertEqual(self.get_context_variable('message'),
                     'Invalid query!')

    mock_service.lookup_hash.assert_called_once_with('sha1:invalid-hash')
@patch('swh.web.ui.views.service')
@istest
def show_content(self, mock_service):
    # When lookup_content_raw returns a content, the raw-content page is
    # expected to render display_content.html with that content.
    # given
    stub_content_raw = {
        'sha1': 'sha1-hash',
        # Post-change value of the patch: raw data is bytes.
        'data': b'some-data'
    }
    mock_service.lookup_content_raw.return_value = stub_content_raw

    # when
    rv = self.client.get('/browse/content/sha1:sha1-hash/raw')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('display_content.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Content sha1-hash')
    self.assertEqual(self.get_context_variable('content'),
                     stub_content_raw)

    mock_service.lookup_content_raw.assert_called_once_with(
        'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def show_content_not_found(self, mock_service):
    # When lookup_content_raw returns None, the page is expected to show
    # the 'not found' message with no content.
    # given
    mock_service.lookup_content_raw.return_value = None

    # when
    rv = self.client.get('/browse/content/sha1:sha1-unknown/raw')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('display_content.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Content with sha1:sha1-unknown not found.')
    self.assertEqual(self.get_context_variable('content'), None)

    mock_service.lookup_content_raw.assert_called_once_with(
        'sha1:sha1-unknown')
@patch('swh.web.ui.views.service')
@istest
def show_content_invalid_hash(self, mock_service):
    # When lookup_content_raw raises BadInputExc, the page is expected to
    # show the exception message with no content.
    # given
    mock_service.lookup_content_raw.side_effect = BadInputExc(
        'Invalid hash')

    # when
    rv = self.client.get('/browse/content/sha2:sha1-invalid/raw')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('display_content.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Invalid hash')
    self.assertEqual(self.get_context_variable('content'), None)

    mock_service.lookup_content_raw.assert_called_once_with(
        'sha2:sha1-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_bad_input(self, mock_utils, mock_service):
    # When lookup_directory raises BadInputExc, the directory page is
    # expected to show the exception message and an empty file list.
    # given
    mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash')

    # when
    rv = self.client.get('/browse/directory/sha2-invalid')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('directory.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Invalid hash')
    self.assertEqual(self.get_context_variable('files'), [])

    mock_service.lookup_directory.assert_called_once_with(
        'sha2-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_empty_result(self, mock_utils, mock_service):
    # When lookup_directory returns None, the directory page is expected
    # to show the 'not found' message and an empty file list.
    # given
    mock_service.lookup_directory.return_value = None

    # when
    rv = self.client.get('/browse/directory/some-sha1')

    # then (assertEqual replaces the deprecated assertEquals alias)
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('directory.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Directory some-sha1 not found.')
    self.assertEqual(self.get_context_variable('files'), [])

    mock_service.lookup_directory.assert_called_once_with(
        'some-sha1')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.utils')
@istest
def browse_directory(self, mock_utils, mock_service):
    """The directory page lists the entries prepared by the utils module.
    """
    # given: a directory listing as returned by the service...
    stub_directory_ls = [
        {'type': 'dir',
         'target': '123',
         'name': 'some-dir-name'},
        {'type': 'file',
         'sha1': '654',
         'name': 'some-filename'},
        {'type': 'dir',
         'target': '987',
         'name': 'some-other-dirname'}
    ]
    mock_service.lookup_directory.return_value = stub_directory_ls

    # ...and its display-ready counterpart as returned by utils
    stub_directory_map = [
        {'link': '/path/to/url/dir/123',
         'name': 'some-dir-name'},
        {'link': '/path/to/url/file/654',
         'name': 'some-filename'},
        {'link': '/path/to/url/dir/987',
         'name': 'some-other-dirname'}
    ]
    mock_utils.prepare_directory_listing.return_value = stub_directory_map

    # when
    rv = self.client.get('/browse/directory/some-sha1')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('directory.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Listing for directory some-sha1:')
    self.assertEqual(self.get_context_variable('files'),
                     stub_directory_map)

    mock_service.lookup_directory.assert_called_once_with(
        'some-sha1')
    mock_utils.prepare_directory_listing.assert_called_once_with(
        stub_directory_ls)
@patch('swh.web.ui.views.service')
@istest
def content_with_origin_content_not_found(self, mock_service):
    """The content page reports an unknown hash and skips the origin lookup.
    """
    # given: the hash is not in the storage
    mock_service.lookup_hash.return_value = {'found': False}

    # when
    rv = self.client.get('/browse/content/sha256:some-sha256')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('content.html')
    self.assertEqual(self.get_context_variable('message'),
                     'Hash sha256:some-sha256 was not found.')

    mock_service.lookup_hash.assert_called_once_with(
        'sha256:some-sha256')
    # The origin must not be looked up for a content that was not found.
    # (Assigning to `.called` would silently verify nothing.)
    self.assertFalse(mock_service.lookup_hash_origin.called)
@patch('swh.web.ui.views.service')
@istest
def content_with_origin_bad_input(self, mock_service):
    """The content page displays the BadInputExc message and skips the
    origin lookup.
    """
    # given: the service rejects the query
    mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash')

    # when
    rv = self.client.get('/browse/content/sha256:some-sha256')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('content.html')
    self.assertEqual(
        self.get_context_variable('message'), 'Invalid hash')

    mock_service.lookup_hash.assert_called_once_with(
        'sha256:some-sha256')
    # The hash lookup failed, so the origin lookup must never be reached.
    # (Assigning to `.called` would silently verify nothing.)
    self.assertFalse(mock_service.lookup_hash_origin.called)
@patch('swh.web.ui.views.service')
@istest
def content_with_origin(self, mock_service):
    """The content page describes the origin of a content that was found.
    """
    # given: the hash is known and has an origin
    mock_service.lookup_hash.return_value = {'found': True}
    mock_service.lookup_hash_origin.return_value = {
        'origin_type': 'ftp',
        'origin_url': '/some/url',
        'revision': 'revision-hash',
        'branch': 'master',
        'path': '/path/to',
    }

    # when
    rv = self.client.get('/browse/content/sha256:some-sha256')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assert_template_used('content.html')
    self.assertEqual(
        self.get_context_variable('message'),
        "The content with hash sha256:some-sha256 has been seen on "
        "origin with type 'ftp'\n"
        "at url '/some/url'. The revision was identified at "
        "'revision-hash' on branch 'master'.\n"
        "The file's path referenced was '/path/to'.")

    mock_service.lookup_hash.assert_called_once_with(
        'sha256:some-sha256')
    mock_service.lookup_hash_origin.assert_called_once_with(
        'sha256:some-sha256')
@istest
def uploadnsearch_get(self):
    """A GET on the upload page renders the empty form.
    """
    # when
    rv = self.client.get('/uploadnsearch')

    # then: nothing was uploaded, so there is no message nor result
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(self.get_context_variable('message'), '')
    self.assertIsNone(self.get_context_variable('filename'))
    self.assertIsNone(self.get_context_variable('found'))
    self.assert_template_used('upload_and_search.html')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_not_found(self, mock_request, mock_service):
    """Uploading an unknown file reports that it has not been found.
    """
    # given: a POST carrying a file that the storage does not know
    mock_request.method = 'POST'
    mock_request.files = dict(filename='foobar')
    mock_service.upload_and_search.return_value = {'filename': 'foobar',
                                                   'sha1': 'blahhash',
                                                   'found': False}

    # when
    # the mock mock_request completes the post request
    rv = self.client.post('/uploadnsearch')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(self.get_context_variable('message'),
                     'The file foobar with hash blahhash has not been'
                     ' found.')
    self.assertEqual(self.get_context_variable('filename'), 'foobar')
    self.assertEqual(self.get_context_variable('found'), False)
    self.assertEqual(self.get_context_variable('sha1'), 'blahhash')
    self.assert_template_used('upload_and_search.html')

    # The view forwards request.files['filename'] to the service.
    # (Assigning to `.called` would silently verify nothing.)
    mock_service.upload_and_search.assert_called_once_with('foobar')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_found(self, mock_request, mock_service):
    """Uploading a known file reports that it has been found.
    """
    # given: a POST carrying a file that the storage knows
    mock_request.method = 'POST'
    mock_request.files = dict(filename='foobar')
    mock_service.upload_and_search.return_value = {'filename': 'foobar',
                                                   'sha1': 'hash-blah',
                                                   'found': True}

    # when
    # the mock mock_request completes the post request
    rv = self.client.post('/uploadnsearch')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(self.get_context_variable('message'),
                     'The file foobar with hash hash-blah has been found.')
    self.assertEqual(self.get_context_variable('filename'), 'foobar')
    self.assertEqual(self.get_context_variable('found'), True)
    self.assertEqual(self.get_context_variable('sha1'), 'hash-blah')
    self.assert_template_used('upload_and_search.html')

    # The view forwards request.files['filename'] to the service.
    # (Assigning to `.called` would silently verify nothing.)
    mock_service.upload_and_search.assert_called_once_with('foobar')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_bad_input(self, mock_request, mock_service):
    """A BadInputExc raised during the upload is displayed in the page.
    """
    # given: a POST for which the service raises
    mock_request.method = 'POST'
    mock_request.files = dict(filename='foobar')
    mock_service.upload_and_search.side_effect = BadInputExc(
        'Invalid hash')

    # when
    # the mock mock_request completes the post request
    rv = self.client.post('/uploadnsearch')

    # then
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(self.get_context_variable('message'), 'Invalid hash')
    self.assert_template_used('upload_and_search.html')

    # The view forwards request.files['filename'] to the service.
    # (Assigning to `.called` would silently verify nothing.)
    mock_service.upload_and_search.assert_called_once_with('foobar')
diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py
index b9c11ab7c..547649300 100644
--- a/swh/web/ui/views.py
+++ b/swh/web/ui/views.py
@@ -1,216 +1,218 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import render_template, flash, request
from flask.ext.api.decorators import set_renderers
from flask.ext.api.renderers import HTMLRenderer
from swh.core.hashutil import ALGORITHMS
from swh.web.ui import service, utils
from swh.web.ui.exc import BadInputExc
from swh.web.ui.main import app
# Module-level alias for ALGORITHMS as imported from swh.core.hashutil.
# NOTE(review): no use of this name is visible in this part of the file;
# presumably it lists the hash types accepted in queries -- confirm.
hash_filter_keys = ALGORITHMS
@app.route('/')
@set_renderers(HTMLRenderer)
def homepage():
    """Render the home page.

    A 'warning' flash message telling that the application is still work
    in progress is queued before the template is rendered.

    Returns:
        The rendered 'home.html' template.

    """
    warning = 'This Web app is still work in progress, use at your own risk'
    flash(warning, 'warning')
    return render_template('home.html')
@app.route('/about')
@set_renderers(HTMLRenderer)
def about():
    """Render the about page.

    Returns:
        The rendered 'about.html' template.

    """
    page = render_template('about.html')
    return page
@app.route('/search')
@set_renderers(HTMLRenderer)
def search():
    """Search for hashes in swh-storage.

    The query is read from the 'q' request argument. When it is empty, the
    search page is rendered with an empty message and no lookup is done.

    Returns:
        The rendered 'search.html' template, with the query 'q' and a
        'message' telling whether the content was found. When the service
        raises BadInputExc, its text is used as the message.

    """
    q = request.args.get('q', '')
    env = {'q': q, 'message': ''}

    try:
        if q:
            r = service.lookup_hash(q)
            # Plain truthiness, as the other views of this module do,
            # instead of comparing against True with ==.
            env['message'] = 'Content with hash %s%sfound!' % (
                q,
                ' ' if r['found'] else ' not '
            )
    except BadInputExc as e:
        env['message'] = str(e)

    return render_template('search.html', **env)
@app.route('/uploadnsearch', methods=['GET', 'POST'])
@set_renderers(HTMLRenderer)
def uploadnsearch():
    """Upload a file and search for its hash in swh-storage.

    On GET (or any method other than POST), the empty upload form is
    rendered. On POST, the uploaded 'filename' entry of the request is
    handed to the service and the outcome is reported.

    Returns:
        The rendered 'upload_and_search.html' template with 'filename',
        'found' and 'message' (plus 'sha1' after a successful upload).
        When the service raises BadInputExc, its text is used as message.

    """
    env = {'filename': None, 'message': '', 'found': None}

    # Nothing was uploaded: render the empty form.
    if request.method != 'POST':
        return render_template('upload_and_search.html', **env)

    upload = request.files['filename']
    try:
        result = service.upload_and_search(upload)
        filename, sha1, found = (
            result['filename'], result['sha1'], result['found'])
        negation = ' ' if found else ' not '
        env.update(
            filename=filename,
            sha1=sha1,
            found=found,
            message='The file %s with hash %s has%sbeen found.' % (
                filename, sha1, negation))
    except BadInputExc as e:
        env['message'] = str(e)

    return render_template('upload_and_search.html', **env)
def _origin_seen(q, data):
"""Given an origin, compute a message string with the right information.
Args:
origin: a dictionary with keys:
- origin: a dictionary with type and url keys
- occurrence: a dictionary with a validity range
Returns:
Message as a string
"""
origin_type = data['origin_type']
origin_url = data['origin_url']
revision = data['revision']
branch = data['branch']
path = data['path']
return """The content with hash %s has been seen on origin with type '%s'
at url '%s'. The revision was identified at '%s' on branch '%s'.
The file's path referenced was '%s'.""" % (q,
origin_type,
origin_url,
revision,
branch,
path)
@app.route('/browse/content/<string:q>')
@set_renderers(HTMLRenderer)
def content_with_origin(q):
    """Show where a content has been seen.

    Args:
        - q: query string of the form algo_hash:hash with
        `algo_hash` in sha1, sha1_git, sha256.

        Several different URLs (at least one per hash algorithm) can
        therefore point to the same content.

    Returns:
        The rendered 'content.html' template with the query 'q' and a
        'message': the origin information when the content is found, a
        'not found' notice otherwise, or the text of the BadInputExc
        raised by the service.

    """
    try:
        found = service.lookup_hash(q).get('found')
        if found:
            # Only look the origin up for contents that do exist.
            message = _origin_seen(q, service.lookup_hash_origin(q))
        else:
            message = "Hash %s was not found." % q
    except BadInputExc as e:
        message = str(e)

    return render_template('content.html', q=q, message=message)
@app.route('/browse/content/<string:q>/raw')
@set_renderers(HTMLRenderer)
def show_content(q):
    """Given a hash and a checksum, display the content's raw data.

    Args:
        q is of the form algo_hash:hash with algo_hash in
        (sha1, sha1_git, sha256)

    Returns:
        The rendered 'display_content.html' template with a 'message' and
        the 'content' (None when the service raised BadInputExc).

        No exception is propagated for bad input or unknown content: a
        BadInputExc raised by the service is displayed as the message, and
        a content which is not found is reported the same way.

    """
    env = {}
    try:
        content = service.lookup_content_raw(q)
        if content:
            # The data is decoded for display only. Undecodable bytes are
            # replaced with U+FFFD so that non utf-8 contents render
            # instead of failing with UnicodeDecodeError.
            content['data'] = content['data'].decode(
                'utf-8', errors='replace')
            message = 'Content %s' % content['sha1']
        else:
            message = 'Content with %s not found.' % q
    except BadInputExc as e:
        message = str(e)
        content = None

    env['message'] = message
    env['content'] = content
    return render_template('display_content.html', **env)
@app.route('/browse/directory/<string:sha1_git>')
@set_renderers(HTMLRenderer)
def browse_directory(sha1_git):
    """Show the listing of a directory.

    Args:
        - sha1_git: the directory's sha1 git identifier.

    Returns:
        The rendered 'directory.html' template with 'sha1_git', a
        'message' and the 'files' to display. 'files' is empty when the
        directory is not found or when the service raises BadInputExc (in
        which case the exception's text is used as the message).

    """
    files = []
    try:
        listing = service.lookup_directory(sha1_git)
        if listing:
            message = "Listing for directory %s:" % sha1_git
            files = utils.prepare_directory_listing(listing)
        else:
            message = "Directory %s not found." % sha1_git
    except BadInputExc as e:
        message = str(e)
        files = []

    return render_template('directory.html',
                           sha1_git=sha1_git,
                           message=message,
                           files=files)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 5:27 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3267921
Attached To
R65 Staging repository
Event Timeline
Log In to Comment