Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py
index 99db425c0..d550564ab 100644
--- a/swh/web/ui/api.py
+++ b/swh/web/ui/api.py
@@ -1,281 +1,281 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import request, url_for
from flask.ext.api.decorators import set_renderers
from swh.web.ui.main import app
from swh.web.ui import service, renderers, utils
from swh.web.ui.exc import BadInputExc, NotFoundExc
@app.route('/browse/')
def api_browse_endpoints():
"""List the current api endpoints starting with /api or /api/.
Returns:
List of endpoints at /api
"""
return utils.filter_endpoints(app.url_map, '/browse')
@app.route('/api/')
def api_main_endpoints():
"""List the current api endpoints starting with /api or /api/.
Returns:
List of endpoints at /api
"""
return utils.filter_endpoints(app.url_map, '/api')
@app.route('/api/1/')
def api_main_v1_endpoints():
"""List the current api v1 endpoints starting with /api/1 or /api/1/.
Returns:
List of endpoints at /api/1
"""
return utils.filter_endpoints(app.url_map, '/api/1')
@app.route('/api/1/stat/counters/')
def api_stats():
"""Return statistics on SWH storage.
Returns:
SWH storage's statistics
"""
return service.stat_counters()
@app.route('/api/1/search/<string:q>/')
def api_search(q):
"""Search a content per hash.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256)
Returns:
Dictionary with 'found' key and the associated result.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
"""
return {'found': service.lookup_hash(q)}
def _api_lookup(criteria, lookup_fn, error_msg_if_not_found):
"""Factorize function regarding the api to lookup for data."""
res = lookup_fn(criteria)
if not res:
raise NotFoundExc(error_msg_if_not_found)
return res
@app.route('/api/1/origin/<int:origin_id>/')
def api_origin(origin_id):
"""Return information about origin with id origin_id.
Args:
origin_id: the origin's identifier
Returns:
Information on the origin if found.
Raises:
NotFoundExc if the origin is not found.
"""
return _api_lookup(
origin_id, lookup_fn=service.lookup_origin,
error_msg_if_not_found='Origin with id %s not found.' % origin_id)
@app.route('/api/1/person/<int:person_id>/')
def api_person(person_id):
"""Return information about person with identifier person_id.
Args:
person_id: the person's identifier
Returns:
Information on the person if found.
Raises:
NotFoundExc if the person is not found.
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release/<string:sha1_git>/')
def api_release(sha1_git):
"""Return information about release with id sha1_git.
Args:
sha1_git: the release's hash
Returns:
Information on the release if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the release is not found.
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg)
@app.route('/api/1/revision/<string:sha1_git>/')
def api_revision(sha1_git):
"""Return information about revision with id sha1_git.
Args:
sha1_git: the revision's hash
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the revision is not found.
"""
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_revision,
error_msg_if_not_found=error_msg)
@app.route('/api/1/directory/<string:sha1_git>/')
def api_directory(sha1_git):
"""Return information about release with id sha1_git.
Args:
Directory's sha1_git
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the content is not found.
"""
directory_entries = service.lookup_directory(sha1_git)
if not directory_entries:
raise NotFoundExc('Directory with sha1_git %s not found.' % sha1_git)
return list(directory_entries)
@app.route('/api/1/browse/<string:q>/')
def api_content_checksum_to_origin(q):
"""Return content information up to one of its origin if the content
is found.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256)
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the content is not found.
"""
found = service.lookup_hash(q)['found']
if not found:
raise NotFoundExc('Content with %s not found.' % q)
return service.lookup_hash_origin(q)
@app.route('/api/1/content/<string:q>/raw/')
-@set_renderers(renderers.PlainRenderer)
+@set_renderers(renderers.BytesRenderer)
def api_content_raw(q):
"""Return content's raw data if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's raw data in text/plain.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash
- NotFoundExc if the content is not found.
"""
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return content['data']
@app.route('/api/1/content/<string:q>/')
def api_content_with_details(q):
"""Return content information if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's information.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash
- NotFoundExc if the content is not found.
"""
content = service.lookup_content(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
content['data'] = url_for('api_content_raw', q=content['sha1'])
return content
@app.route('/api/1/uploadnsearch/', methods=['POST'])
def api_uploadnsearch():
"""Upload the file's content in the post body request.
Compute its hash and determine if it exists in the storage.
Args:
request.files filled with the filename's data to upload.
Returns:
Dictionary with 'sha1', 'filename' and 'found' predicate depending
on whether we find it or not.
Raises:
BadInputExc in case of the form submitted is incorrect.
"""
file = request.files.get('filename')
if not file:
raise BadInputExc('Bad request, missing \'filename\' entry in form.')
return service.upload_and_search(file)
diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py
index 01f10d617..b30cb1e75 100644
--- a/swh/web/ui/converters.py
+++ b/swh/web/ui/converters.py
@@ -1,173 +1,173 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core import hashutil
def fmap(f, data):
if isinstance(data, list):
return [f(x) for x in data]
if isinstance(data, dict):
return {k: f(v) for (k, v) in data.items()}
return f(data)
def from_swh(dict_swh, hashess=[], bytess=[], blacklist=[]):
"""Convert from an swh dictionary to something reasonably json
serializable.
Args:
- dict_swh: the origin dictionary needed to be transformed
- hashess: list/set of keys representing hashes values (sha1, sha256,
sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal
string
- bytess: list/set of keys representing bytes values which needs to
be decoded
The remaining keys are copied as is in the output.
Returns:
dictionary equivalent as dict_swh only with its keys `converted`.
"""
def convert_hashes_bytes(v):
"""v is supposedly a hash as bytes, returns it converted in hex.
"""
if v and isinstance(v, bytes):
return hashutil.hash_to_hex(v)
return v
def convert_bytes(v):
"""v is supposedly a bytes string, decode as utf-8.
FIXME: Improve decoding policy.
If not utf-8, break!
"""
if v and isinstance(v, bytes):
return v.decode('utf-8')
return v
if not dict_swh:
return dict_swh
new_dict = {}
for key, value in dict_swh.items():
if isinstance(value, dict):
new_dict[key] = from_swh(value, hashess, bytess, blacklist)
elif key in hashess:
new_dict[key] = fmap(convert_hashes_bytes, value)
elif key in bytess:
new_dict[key] = fmap(convert_bytes, value)
elif key in blacklist:
continue
else:
new_dict[key] = value
return new_dict
def from_origin(origin):
"""Convert from an SWH origin to an origin dictionary.
"""
return from_swh(origin,
hashess=set(['revision']),
bytess=set(['path']))
def from_release(release):
"""Convert from an SWH release to a json serializable release dictionary.
Args:
release: Dict with the following keys
- id: identifier of the revision (sha1 in bytes)
- revision: identifier of the revision the release points to (sha1 in
bytes)
- comment: release's comment message (bytes)
- name: release's name (string)
- author: release's author identifier (swh's id)
- synthetic: the synthetic property (boolean)
Returns:
Release dictionary with the following keys:
- id: hexadecimal sha1 (string)
- revision: hexadecimal sha1 (string)
- comment: release's comment message (string)
- name: release's name (string)
- author: release's author identifier (swh's id)
- synthetic: the synthetic property (boolean)
"""
return from_swh(release,
hashess=set(['id', 'target']),
bytess=set(['message', 'name', 'email']))
def from_revision(revision):
"""Convert from an SWH revision to a json serializable revision dictionary.
Args:
revision: Dict with the following keys
- id: identifier of the revision (sha1 in bytes)
- directory: identifier of the directory the revision points to (sha1
in bytes)
- author_name, author_email: author's revision name and email
- committer_name, committer_email: committer's revision name and email
- message: revision's message
- date, date_offset: revision's author date
- committer_date, committer_date_offset: revision's commit date
- parents: list of parents for such revision
- synthetic: revision's property nature
- type: revision's type (git, tar or dsc at the moment)
- metadata: if the revision is synthetic, this can reference dynamic
properties.
Returns:
Revision dictionary with the same keys as inputs, only:
- sha1s are in hexadecimal strings (id, directory)
- bytes are decoded in string (author_name, committer_name,
author_email, committer_email, message)
- remaining keys are left as is
"""
return from_swh(revision,
hashess=set(['id', 'directory', 'parents']),
bytess=set(['name',
'email',
'message']))
def from_content(content):
"""Convert swh content to serializable content dictionary.
"""
return from_swh(content,
- hashess=set(['sha1', 'sha1_git', 'sha256']),
- bytess=set(['data']),
- blacklist=set(['status']))
+ hashess={'sha1', 'sha1_git', 'sha256'},
+ bytess={},
+ blacklist={'status'})
def from_person(person):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(person,
hashess=set(),
bytess=set(['name', 'email']))
def from_directory_entry(dir_entry):
"""Convert swh person to serializable person dictionary.
"""
return from_swh(dir_entry,
hashess=set(['dir_id',
'sha1_git',
'sha1',
'sha256',
'target']),
bytess=set(['name']))
diff --git a/swh/web/ui/renderers.py b/swh/web/ui/renderers.py
index 57669cab5..b86631254 100644
--- a/swh/web/ui/renderers.py
+++ b/swh/web/ui/renderers.py
@@ -1,142 +1,151 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import yaml
from flask import make_response, request
from flask.ext.api import renderers, parsers
from flask_api.mediatypes import MediaType
from swh.web.ui import utils
class SWHFilterEnricher():
"""Global filter on fields.
"""
def filter_by_fields(self, data):
"""Extract a request parameter 'fields' if it exists to permit the
filtering on the data dict's keys.
If such field is not provided, returns the data as is.
"""
fields = request.args.get('fields')
if fields:
fields = set(fields.split(','))
data = utils.filter_field_keys(data, fields)
return data
-class PlainRenderer(renderers.BaseRenderer):
+class DoNothingRenderer(renderers.BaseRenderer):
+ def render(self, data, media_type, **options):
+ return data
+
+
+class PlainRenderer(DoNothingRenderer):
"""Renderer for plain/text, do nothing but send the data as is.
"""
media_type = 'text/plain'
- def render(self, data, media_type, **options):
- return data
+
+class BytesRenderer(DoNothingRenderer):
+ """Renderer for plain/text, do nothing but send the data as is.
+
+ """
+ media_type = 'application/octet-stream'
class YAMLRenderer(renderers.BaseRenderer, SWHFilterEnricher):
"""Renderer for application/yaml.
Orchestrate from python data structure to yaml.
"""
media_type = 'application/yaml'
def render(self, data, media_type, **options):
data = self.filter_by_fields(data)
return yaml.dump(data, encoding=self.charset)
class JSONPEnricher():
"""JSONP rendering.
"""
def enrich_with_jsonp(self, data):
"""Defines a jsonp function that extracts a potential 'callback'
request parameter holding the function name and wraps the data
inside a call to such function
e.g:
GET /blah/foo/bar renders: {'output': 'wrapped'}
GET /blah/foo/bar?callback=fn renders: fn({'output': 'wrapped'})
"""
jsonp = request.args.get('callback')
if jsonp:
return '%s(%s)' % (jsonp, data)
return data
class SWHJSONRenderer(renderers.JSONRenderer,
SWHFilterEnricher,
JSONPEnricher):
"""Renderer for application/json.
Serializes in json the data and returns it.
Also deals with jsonp. If callback is found in request parameter,
wrap the result as a function with name the value of the parameter
query 'callback'.
"""
def render(self, data, media_type, **options):
data = self.filter_by_fields(data)
res = super().render(data, media_type, **options)
return self.enrich_with_jsonp(res)
class SWHBrowsableAPIRenderer(renderers.BrowsableAPIRenderer):
"""SWH's browsable api renderer.
"""
template = "api.html"
RENDERERS = [
'swh.web.ui.renderers.SWHJSONRenderer',
'swh.web.ui.renderers.SWHBrowsableAPIRenderer',
'flask.ext.api.parsers.URLEncodedParser',
'swh.web.ui.renderers.YAMLRenderer',
'swh.web.ui.renderers.PlainRenderer',
]
RENDERERS_INSTANCE = [
SWHJSONRenderer(),
SWHBrowsableAPIRenderer(),
parsers.URLEncodedParser(),
YAMLRenderer(),
]
RENDERERS_BY_TYPE = {
r.media_type: r
for r in RENDERERS_INSTANCE
}
def error_response(default_error_msg, error_code, error):
"""Private function to create a custom error response.
"""
# if nothing is requested by client, use json
default_application_type = 'application/json'
accept_type = request.headers.get('Accept', default_application_type)
renderer = RENDERERS_BY_TYPE.get(
accept_type,
RENDERERS_BY_TYPE[default_application_type])
# for edge cases, use the elected renderer's media type
accept_type = renderer.media_type
response = make_response(default_error_msg, error_code)
response.headers['Content-Type'] = accept_type
response.data = renderer.render({"error": str(error)},
media_type=MediaType(accept_type),
status=error_code,
headers={'Content-Type': accept_type})
return response
diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py
index 41ffd6121..0257bd96e 100644
--- a/swh/web/ui/tests/test_api.py
+++ b/swh/web/ui/tests/test_api.py
@@ -1,535 +1,535 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
class ApiTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.api.service')
@istest
def api_content_checksum_to_origin(self, mock_service):
mock_service.lookup_hash.return_value = {'found': True}
stub_origin = {
"lister": None,
"url": "rsync://ftp.gnu.org/old-gnu/webbase",
"type": "ftp",
"id": 2,
"project": None
}
mock_service.lookup_hash_origin.return_value = stub_origin
# when
rv = self.app.get(
'/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_hash.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_checksum_to_origin_sha_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.app.get(
'/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_hash.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_with_details(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'data': 'some content data',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data': '/api/1/content/'
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/',
headers={'accept': 'application/yaml'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': 'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-Type': 'text/plain'})
self.assertEquals(rv.status_code, 200)
- self.assertEquals(rv.mimetype, 'text/plain')
+ self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data.decode('utf-8'), stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_raw_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-Type': 'text/plain'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.lookup_hash.return_value = True
# when
rv = self.app.get('/api/1/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.api.service')
@istest
def api_search_as_yaml(self, mock_service):
# given
mock_service.lookup_hash.return_value = True
# when
rv = self.app.get('/api/1/search/sha1:halb/',
headers={'Accept': 'application/yaml'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = False
# when
rv = self.app.get('/api/1/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': False})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch(self, mock_request, mock_service):
# given
mock_request.files = {'filename': 'simple-filename'}
mock_service.upload_and_search.return_value = {
'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False,
}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False})
mock_service.upload_and_search.assert_called_once_with(
'simple-filename')
@patch('swh.web.ui.api.service')
@istest
def api_origin(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with(1234)
@patch('swh.web.ui.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with(4321)
@patch('swh.web.ui.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'revision': 'revision-sha1',
'author_name': 'author release name',
'author_email': 'author@email',
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_release)
@patch('swh.web.ui.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_revision)
@patch('swh.web.ui.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directory = [{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
}]
mock_service.lookup_directory.return_value = stub_directory
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_directory)
@patch('swh.web.ui.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py
index 649543424..ef8e720d4 100644
--- a/swh/web/ui/tests/test_converters.py
+++ b/swh/web/ui/tests/test_converters.py
@@ -1,362 +1,362 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import unittest
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import converters
class ConvertersTestCase(unittest.TestCase):
@istest
def from_swh(self):
some_input = {
'a': 'something',
'b': 'someone',
'c': b'sharp-0.3.4.tgz',
'd': hashutil.hex_to_hash(
'b04caf10e9535160d90e874b45aa426de762f19f'),
'e': b'sharp.html/doc_002dS_005fISREG.html',
'g': [b'utf-8-to-decode', b'another-one'],
'h': 'something filtered',
'i': {'e': b'something'},
'j': {
'k': {
'l': [b'bytes thing', b'another thingy'],
'n': 'dont care either'
},
'm': 'dont care'
}
}
expected_output = {
'a': 'something',
'b': 'someone',
'c': 'sharp-0.3.4.tgz',
'd': 'b04caf10e9535160d90e874b45aa426de762f19f',
'e': 'sharp.html/doc_002dS_005fISREG.html',
'g': ['utf-8-to-decode', 'another-one'],
'i': {'e': 'something'},
'j': {
'k': {
'l': ['bytes thing', 'another thingy']
}
},
}
actual_output = converters.from_swh(some_input,
hashess={'d'},
bytess={'c', 'e', 'g', 'l'},
blacklist={'h', 'm', 'n'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self):
some_input = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
}
expected_output = {
'a': 'something',
'b': None,
'c': 'someone',
'd': None,
}
actual_output = converters.from_swh(some_input,
hashess={'a', 'b'},
bytess={'c', 'd'})
self.assertEquals(expected_output, actual_output)
@istest
def from_swh_empty(self):
# when
self.assertEquals({}, converters.from_swh({}))
@istest
def from_swh_none(self):
# when
self.assertIsNone(converters.from_swh(None))
@istest
def from_origin(self):
# given
origin_input = {
'origin_type': 'ftp',
'origin_url': 'rsync://ftp.gnu.org/gnu/octave',
'branch': 'octave-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
}
expected_origin = {
'origin_type': 'ftp',
'origin_url': 'rsync://ftp.gnu.org/gnu/octave',
'branch': 'octave-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
}
# when
actual_origin = converters.from_origin(origin_input)
# then
self.assertEqual(actual_origin, expected_origin)
@istest
def from_release(self):
release_input = {
'id': hashutil.hex_to_hash(
'aad23fa492a0c5fed0708a6703be875448c86884'),
'target': hashutil.hex_to_hash(
'5e46d564378afc44b31bb89f99d5675195fbdf67'),
'target_type': 'revision',
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'author': {
'name': b'author name',
'email': b'author@email',
},
'name': b'v0.0.1',
'message': b'some comment on release',
'synthetic': True,
}
expected_release = {
'id': 'aad23fa492a0c5fed0708a6703be875448c86884',
'target': '5e46d564378afc44b31bb89f99d5675195fbdf67',
'target_type': 'revision',
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'author': {
'name': 'author name',
'email': 'author@email',
},
'name': 'v0.0.1',
'message': 'some comment on release',
'target_type': 'revision',
'synthetic': True,
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)
@istest
def from_release_no_revision(self):
release_input = {
'id': hashutil.hex_to_hash(
'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'),
'target': None,
'date': datetime.datetime(2016, 3, 2, 10, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.1.1',
'message': b'comment on release',
'synthetic': False,
'author': {
'name': b'bob',
'email': b'bob@alice.net',
},
}
expected_release = {
'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e',
'target': None,
'date': datetime.datetime(2016, 3, 2, 10, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.1.1',
'message': 'comment on release',
'synthetic': False,
'author': {
'name': 'bob',
'email': 'bob@alice.net',
},
}
# when
actual_release = converters.from_release(release_input)
# then
self.assertEqual(actual_release, expected_release)
@istest
def from_revision(self):
revision_input = {
'id': hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'Software Heritage',
'email': b'robot@softwareheritage.org',
},
'committer': {
'name': b'Software Heritage',
'email': b'robot@softwareheritage.org',
},
'message': b'synthetic revision message',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54,
tzinfo=None),
'committer_date_offset': 0,
'synthetic': True,
'type': 'tar',
'parents': [
hashutil.hex_to_hash(
'29d8be353ed3480476f032475e7c244eff7371d5'),
hashutil.hex_to_hash(
'30d8be353ed3480476f032475e7c244eff7371d5')
],
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912',
}]
},
}
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
},
'committer': {
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
},
'message': 'synthetic revision message',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54,
tzinfo=None),
'committer_date_offset': 0,
'parents': [
'29d8be353ed3480476f032475e7c244eff7371d5',
'30d8be353ed3480476f032475e7c244eff7371d5'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
# when
actual_revision = converters.from_revision(revision_input)
# then
self.assertEqual(actual_revision, expected_revision)
@istest
def from_content(self):
content_input = {
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'data': b'data in bytes',
'length': 10,
'status': 'visible',
}
# 'status' is filtered
expected_content = {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
- 'data': 'data in bytes',
+ 'data': b'data in bytes',
'length': 10,
}
# when
actual_content = converters.from_content(content_input)
# then
self.assertEqual(actual_content, expected_content)
@istest
def from_person(self):
person_input = {
'id': 10,
'anything': 'else',
'name': b'bob',
'email': b'bob@foo.alice',
}
expected_person = {
'id': 10,
'anything': 'else',
'name': 'bob',
'email': 'bob@foo.alice',
}
# when
actual_person = converters.from_person(person_input)
# then
self.assertEqual(actual_person, expected_person)
@istest
def from_directory_entries(self):
dir_entries_input = {
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}
expected_dir_entries = {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}
# when
actual_dir_entries = converters.from_directory_entry(dir_entries_input)
# then
self.assertEqual(actual_dir_entries, expected_dir_entries)
diff --git a/swh/web/ui/tests/test_renderers.py b/swh/web/ui/tests/test_renderers.py
index 72075366e..3239c840e 100644
--- a/swh/web/ui/tests/test_renderers.py
+++ b/swh/web/ui/tests/test_renderers.py
@@ -1,190 +1,215 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from flask_api.mediatypes import MediaType
from nose.tools import istest
from unittest.mock import patch
from swh.web.ui import renderers
class RendererTestCase(unittest.TestCase):
@patch('swh.web.ui.renderers.request')
@istest
def SWHFilterRenderer_do_nothing(self, mock_request):
# given
mock_request.args = {}
swhFilterRenderer = renderers.SWHFilterEnricher()
input_data = {'a': 'some-data'}
# when
actual_data = swhFilterRenderer.filter_by_fields(input_data)
# then
self.assertEquals(actual_data, input_data)
@patch('swh.web.ui.renderers.utils')
@patch('swh.web.ui.renderers.request')
@istest
def SWHFilterRenderer_do_filter(self, mock_request, mock_utils):
# given
mock_request.args = {'fields': 'a,c'}
mock_utils.filter_field_keys.return_value = {'a': 'some-data'}
swhFilterRenderer = renderers.SWHFilterEnricher()
input_data = {'a': 'some-data',
'b': 'some-other-data'}
# when
actual_data = swhFilterRenderer.filter_by_fields(input_data)
# then
self.assertEquals(actual_data, {'a': 'some-data'})
mock_utils.filter_field_keys.assert_called_once_with(input_data,
{'a', 'c'})
+ @istest
+ def doNothingRenderer(self):
+ # given
+ doNothingRenderer = renderers.DoNothingRenderer()
+ input_data = 'some data'
+
+ # when
+ actual_data = doNothingRenderer.render(input_data, 'some-media-type')
+
+ # then
+ self.assertEqual(actual_data, input_data) # do nothing on data
+
@istest
def plainRenderer(self):
# given
plainRenderer = renderers.PlainRenderer()
input_data = 'some data'
# when
actual_data = plainRenderer.render(input_data, 'some-media-type')
# then
self.assertEqual(actual_data, input_data) # do nothing on data
+ @istest
+ def bytesRenderer(self):
+ # given
+ bytesRenderer = renderers.BytesRenderer()
+ input_data = b'some data'
+
+ # when
+ actual_data = bytesRenderer.render(input_data, 'some-media-type')
+
+ # then
+ self.assertEqual('application/octet-stream', bytesRenderer.media_type)
+ self.assertEqual(actual_data, input_data) # do nothing on data
+
@patch('swh.web.ui.renderers.request')
@istest
def yamlRenderer_without_filter(self, mock_request):
# given
mock_request.args = {}
yamlRenderer = renderers.YAMLRenderer()
input_data = {'target': 'sha1-dir',
'type': 'dir',
'dir-id': 'dir-id-sha1-git'}
expected_data = input_data
# when
actual_data = yamlRenderer.render(input_data, 'application/yaml')
# then
self.assertEqual(yaml.load(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def yamlRenderer(self, mock_request):
# given
mock_request.args = {'fields': 'type,target'}
yamlRenderer = renderers.YAMLRenderer()
input_data = {'target': 'sha1-dir',
'type': 'dir',
'dir-id': 'dir-id-sha1-git'}
expected_data = {'target': 'sha1-dir', 'type': 'dir'}
# when
actual_data = yamlRenderer.render(input_data, 'application/yaml')
# then
self.assertEqual(yaml.load(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic(self, mock_request):
# given
mock_request.args = {}
jsonRenderer = renderers.SWHJSONRenderer()
input_data = {'target': 'sha1-dir',
'type': 'dir',
'dir-id': 'dir-id-sha1-git'}
expected_data = input_data
# when
actual_data = jsonRenderer.render(input_data, MediaType(
'application/json'))
# then
self.assertEqual(json.loads(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic_with_filter(self, mock_request):
# given
mock_request.args = {'fields': 'target'}
jsonRenderer = renderers.SWHJSONRenderer()
input_data = {'target': 'sha1-dir',
'type': 'dir',
'dir-id': 'dir-id-sha1-git'}
expected_data = {'target': 'sha1-dir'}
# when
actual_data = jsonRenderer.render(input_data, MediaType(
'application/json'))
# then
self.assertEqual(json.loads(actual_data), expected_data)
@patch('swh.web.ui.renderers.request')
@istest
def jsonRenderer_basic_with_filter_and_jsonp(self, mock_request):
# given
mock_request.args = {'fields': 'target',
'callback': 'jsonpfn'}
jsonRenderer = renderers.SWHJSONRenderer()
input_data = {'target': 'sha1-dir',
'type': 'dir',
'dir-id': 'dir-id-sha1-git'}
# when
actual_data = jsonRenderer.render(input_data, MediaType(
'application/json'))
# then
self.assertEqual(actual_data, 'jsonpfn({"target": "sha1-dir"})')
@patch('swh.web.ui.renderers.request')
@istest
def jsonpEnricher_basic_with_filter_and_jsonp(self, mock_request):
# given
mock_request.args = {'callback': 'jsonpfn'}
jsonpEnricher = renderers.JSONPEnricher()
# when
actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'})
# then
self.assertEqual(actual_output, "jsonpfn({'output': 'test'})")
@patch('swh.web.ui.renderers.request')
@istest
def jsonpEnricher_do_nothing(self, mock_request):
# given
mock_request.args = {}
jsonpEnricher = renderers.JSONPEnricher()
# when
actual_output = jsonpEnricher.enrich_with_jsonp({'output': 'test'})
# then
self.assertEqual(actual_output, {'output': 'test'})
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index fffddbbcc..ae089bef8 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,545 +1,545 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch
from swh.core.hashutil import hex_to_hash
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@istest
def lookup_hash_does_not_exist(self):
# given
self.storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
self.storage.content_find.assert_called_with({
'sha1_git':
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')
})
@istest
def lookup_hash_exist(self):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
self.storage.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
self.storage.content_find.assert_called_with({
'sha1':
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
})
@istest
def lookup_hash_origin(self):
# given
self.storage.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
self.storage.content_find_occurrence.assert_called_with(
{'sha1_git':
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')})
@istest
def stat_counters(self):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
self.storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
self.storage.stat_counters.assert_called_with()
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
self.storage.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
@istest
def lookup_origin(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def lookup_release(self):
# given
self.storage.release_get = MagicMock(return_value=[{
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}])
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
self.storage.release_get.assert_called_with(
[hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')])
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self):
# given
self.storage.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
self.storage.release_get.called = False
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self):
# given
self.storage.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
self.storage.release_get.called = False
@istest
def lookup_revision(self):
# given
self.storage.revision_get = MagicMock(return_value=[{
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
self.storage.revision_get.assert_called_with(
[hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')])
@istest
def lookup_content_raw_not_found(self):
# given
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
self.storage.content_find.assert_called_with(
{'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
@istest
def lookup_content_raw(self):
# given
self.storage.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
self.storage.content_get = MagicMock(return_value=[{
'data': b'binary data',
}, {}])
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
- self.assertEquals(actual_content, {'data': 'binary data'})
+ self.assertEquals(actual_content, {'data': b'binary data'})
self.storage.content_find.assert_called_once_with(
{'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')})
self.storage.content_get.assert_called_once_with(
['18d8be353ed3480476f032475e7c233eff7371d5'])
@istest
def lookup_content_not_found(self):
# given
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
self.storage.content_find.assert_called_with(
{'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
@istest
def lookup_content_with_sha1(self):
# given
self.storage.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'absent',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
})
self.storage.content_find.assert_called_with(
{'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
@istest
def lookup_content_with_sha256(self):
# given
self.storage.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
})
self.storage.content_find.assert_called_with(
{'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')})
@istest
def lookup_person(self):
# given
self.storage.person_get = MagicMock(return_value=[{
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
}])
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
self.storage.person_get.assert_called_with(['person_id'])
@istest
def lookup_person_not_found(self):
# given
self.storage.person_get = MagicMock(return_value=[])
# when
actual_person = service.lookup_person('person_id')
# then
self.assertIsNone(actual_person)
self.storage.person_get.assert_called_with(['person_id'])
@istest
def lookup_directory_bad_checksum(self):
# given
self.storage.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
self.storage.directory_get.called = False
@istest
def lookup_directory_not_found(self):
# given
self.storage.directory_get = MagicMock(return_value=[])
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertIsNone(actual_directory)
self.storage.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
@istest
def lookup_directory(self):
# given
dir_entries_input = {
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}
expected_dir_entries = {
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}
self.storage.directory_get = MagicMock(
return_value=[dir_entries_input])
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertIsNotNone(actual_directory)
self.assertEqual(actual_directory, [expected_dir_entries])
self.storage.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
diff --git a/swh/web/ui/tests/test_views.py b/swh/web/ui/tests/test_views.py
index 0cd69f061..cb5e92e60 100644
--- a/swh/web/ui/tests/test_views.py
+++ b/swh/web/ui/tests/test_views.py
@@ -1,388 +1,388 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from nose.tools import istest
from swh.web.ui.tests import test_app
from unittest.mock import patch, MagicMock
from swh.web.ui.exc import BadInputExc
class ViewTestCase(test_app.SWHViewTestCase):
render_template = False
@istest
def info(self):
# when
rv = self.client.get('/about')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('about.html')
self.assertIn(b'About', rv.data)
@istest
def search_default(self):
# when
rv = self.client.get('/search')
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('q'), '')
self.assertEqual(self.get_context_variable('message'), '')
self.assert_template_used('search.html')
@patch('swh.web.ui.views.service')
@istest
def search_content_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': True,
'algo': 'sha1'
}
# when
rv = self.client.get('/search?q=sha1:123')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('search.html')
self.assertEqual(self.get_context_variable('q'), 'sha1:123')
self.assertEqual(self.get_context_variable('message'),
'Content with hash sha1:123 found!')
mock_service.lookup_hash.assert_called_once_with('sha1:123')
@patch('swh.web.ui.views.service')
@istest
def search_content_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': False,
'algo': 'sha1'
}
# when
rv = self.client.get('/search?q=sha1:456')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('search.html')
self.assertEqual(self.get_context_variable('q'), 'sha1:456')
self.assertEqual(self.get_context_variable('message'),
'Content with hash sha1:456 not found!')
mock_service.lookup_hash.assert_called_once_with('sha1:456')
@patch('swh.web.ui.views.service')
@istest
def search_content_invalid_query(self, mock_service):
# given
mock_service.lookup_hash = MagicMock(
side_effect=BadInputExc('Invalid query!')
)
# when
rv = self.client.get('/search?q=sha1:invalid-hash')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('search.html')
self.assertEqual(self.get_context_variable('q'), 'sha1:invalid-hash')
self.assertEqual(self.get_context_variable('message'),
'Invalid query!')
mock_service.lookup_hash.assert_called_once_with('sha1:invalid-hash')
@patch('swh.web.ui.views.service')
@istest
def show_content(self, mock_service):
# given
stub_content_raw = {
'sha1': 'sha1-hash',
- 'data': 'some-data'
+ 'data': b'some-data'
}
mock_service.lookup_content_raw.return_value = stub_content_raw
# when
rv = self.client.get('/browse/content/sha1:sha1-hash/raw')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('display_content.html')
self.assertEqual(self.get_context_variable('message'),
'Content sha1-hash')
self.assertEqual(self.get_context_variable('content'),
stub_content_raw)
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:sha1-hash')
@patch('swh.web.ui.views.service')
@istest
def show_content_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.client.get('/browse/content/sha1:sha1-unknown/raw')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('display_content.html')
self.assertEqual(self.get_context_variable('message'),
'Content with sha1:sha1-unknown not found.')
self.assertEqual(self.get_context_variable('content'), None)
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:sha1-unknown')
@patch('swh.web.ui.views.service')
@istest
def show_content_invalid_hash(self, mock_service):
# given
mock_service.lookup_content_raw.side_effect = BadInputExc(
'Invalid hash')
# when
rv = self.client.get('/browse/content/sha2:sha1-invalid/raw')
self.assertEquals(rv.status_code, 200)
self.assert_template_used('display_content.html')
self.assertEqual(self.get_context_variable('message'),
'Invalid hash')
self.assertEqual(self.get_context_variable('content'), None)
mock_service.lookup_content_raw.assert_called_once_with(
'sha2:sha1-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_bad_input(self, mock_utils, mock_service):
# given
mock_service.lookup_directory.side_effect = BadInputExc('Invalid hash')
# when
rv = self.client.get('/browse/directory/sha2-invalid')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Invalid hash')
self.assertEqual(self.get_context_variable('files'), [])
mock_service.lookup_directory.assert_called_once_with(
'sha2-invalid')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.utils')
@istest
def browse_directory_empty_result(self, mock_utils, mock_service):
# given
mock_service.lookup_directory.return_value = None
# when
rv = self.client.get('/browse/directory/some-sha1')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Directory some-sha1 not found.')
self.assertEqual(self.get_context_variable('files'), [])
mock_service.lookup_directory.assert_called_once_with(
'some-sha1')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.utils')
@istest
def browse_directory(self, mock_utils, mock_service):
# given
stub_directory_ls = [
{'type': 'dir',
'target': '123',
'name': 'some-dir-name'},
{'type': 'file',
'sha1': '654',
'name': 'some-filename'},
{'type': 'dir',
'target': '987',
'name': 'some-other-dirname'}
]
mock_service.lookup_directory.return_value = stub_directory_ls
stub_directory_map = [
{'link': '/path/to/url/dir/123',
'name': 'some-dir-name'},
{'link': '/path/to/url/file/654',
'name': 'some-filename'},
{'link': '/path/to/url/dir/987',
'name': 'some-other-dirname'}
]
mock_utils.prepare_directory_listing.return_value = stub_directory_map
# when
rv = self.client.get('/browse/directory/some-sha1')
# then
print(self.templates)
self.assertEquals(rv.status_code, 200)
self.assert_template_used('directory.html')
self.assertEqual(self.get_context_variable('message'),
'Listing for directory some-sha1:')
self.assertEqual(self.get_context_variable('files'),
stub_directory_map)
mock_service.lookup_directory.assert_called_once_with(
'some-sha1')
mock_utils.prepare_directory_listing.assert_called_once_with(
stub_directory_ls)
@patch('swh.web.ui.views.service')
@istest
def content_with_origin_content_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.client.get('/browse/content/sha256:some-sha256')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertEqual(self.get_context_variable('message'),
'Hash sha256:some-sha256 was not found.')
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.views.service')
@istest
def content_with_origin_bad_input(self, mock_service):
# given
mock_service.lookup_hash.side_effect = BadInputExc('Invalid hash')
# when
rv = self.client.get('/browse/content/sha256:some-sha256')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertEqual(
self.get_context_variable('message'), 'Invalid hash')
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.views.service')
@istest
def content_with_origin(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': True}
mock_service.lookup_hash_origin.return_value = {
'origin_type': 'ftp',
'origin_url': '/some/url',
'revision': 'revision-hash',
'branch': 'master',
'path': '/path/to',
}
# when
rv = self.client.get('/browse/content/sha256:some-sha256')
# then
self.assertEquals(rv.status_code, 200)
self.assert_template_used('content.html')
self.assertEqual(
self.get_context_variable('message'),
"The content with hash sha256:some-sha256 has been seen on " +
"origin with type 'ftp'\n" +
"at url '/some/url'. The revision was identified at " +
"'revision-hash' on branch 'master'.\n" +
"The file's path referenced was '/path/to'.")
mock_service.lookup_hash.assert_called_once_with(
'sha256:some-sha256')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha256:some-sha256')
@istest
def uploadnsearch_get(self):
# when
rv = self.client.get('/uploadnsearch')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('message'), '')
self.assertEqual(self.get_context_variable('filename'), None)
self.assertEqual(self.get_context_variable('found'), None)
self.assert_template_used('upload_and_search.html')
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_not_found(self, mock_request, mock_service):
# given
mock_request.method = 'POST'
mock_request.files = dict(filename='foobar')
mock_service.upload_and_search.return_value = {'filename': 'foobar',
'sha1': 'blahhash',
'found': False}
# when
# the mock mock_request completes the post request
rv = self.client.post('/uploadnsearch')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('message'),
'The file foobar with hash blahhash has not been'
' found.')
self.assertEqual(self.get_context_variable('filename'), 'foobar')
self.assertEqual(self.get_context_variable('found'), False)
self.assertEqual(self.get_context_variable('sha1'), 'blahhash')
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_found(self, mock_request, mock_service):
# given
mock_request.method = 'POST'
mock_request.files = dict(filename='foobar')
mock_service.upload_and_search.return_value = {'filename': 'foobar',
'sha1': 'hash-blah',
'found': True}
# when
# the mock mock_request completes the post request
rv = self.client.post('/uploadnsearch')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('message'),
'The file foobar with hash hash-blah has been found.')
self.assertEqual(self.get_context_variable('filename'), 'foobar')
self.assertEqual(self.get_context_variable('found'), True)
self.assertEqual(self.get_context_variable('sha1'), 'hash-blah')
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
@patch('swh.web.ui.views.service')
@patch('swh.web.ui.views.request')
@istest
def uploadnsearch_post_bad_input(self, mock_request, mock_service):
# given
mock_request.method = 'POST'
mock_request.files = dict(filename='foobar')
mock_service.upload_and_search.side_effect = BadInputExc(
'Invalid hash')
# when
# the mock mock_request completes the post request
rv = self.client.post('/uploadnsearch')
# then
self.assertEquals(rv.status_code, 200)
self.assertEqual(self.get_context_variable('message'), 'Invalid hash')
self.assert_template_used('upload_and_search.html')
mock_service.upload_and_search.called = True
diff --git a/swh/web/ui/views.py b/swh/web/ui/views.py
index b9c11ab7c..547649300 100644
--- a/swh/web/ui/views.py
+++ b/swh/web/ui/views.py
@@ -1,216 +1,218 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from flask import render_template, flash, request
from flask.ext.api.decorators import set_renderers
from flask.ext.api.renderers import HTMLRenderer
from swh.core.hashutil import ALGORITHMS
from swh.web.ui import service, utils
from swh.web.ui.exc import BadInputExc
from swh.web.ui.main import app
hash_filter_keys = ALGORITHMS
@app.route('/')
@set_renderers(HTMLRenderer)
def homepage():
"""Home page
"""
flash('This Web app is still work in progress, use at your own risk',
'warning')
# return redirect(url_for('about'))
return render_template('home.html')
@app.route('/about')
@set_renderers(HTMLRenderer)
def about():
return render_template('about.html')
@app.route('/search')
@set_renderers(HTMLRenderer)
def search():
"""Search for hashes in swh-storage.
"""
q = request.args.get('q', '')
env = {'q': q, 'message': ''}
try:
if q:
r = service.lookup_hash(q)
env['message'] = 'Content with hash %s%sfound!' % (
q,
' ' if r['found'] == True else ' not '
)
except BadInputExc as e:
env['message'] = str(e)
return render_template('search.html', **env)
@app.route('/uploadnsearch', methods=['GET', 'POST'])
@set_renderers(HTMLRenderer)
def uploadnsearch():
"""Upload and search for hashes in swh-storage.
"""
env = {'filename': None, 'message': '', 'found': None}
if request.method == 'POST':
file = request.files['filename']
try:
uploaded_content = service.upload_and_search(file)
filename = uploaded_content['filename']
sha1 = uploaded_content['sha1']
found = uploaded_content['found']
message = 'The file %s with hash %s has%sbeen found.' % (
filename,
sha1,
' ' if found else ' not ')
env.update({
'filename': filename,
'sha1': sha1,
'found': found,
'message': message
})
except BadInputExc as e:
env['message'] = str(e)
return render_template('upload_and_search.html', **env)
def _origin_seen(q, data):
"""Given an origin, compute a message string with the right information.
Args:
origin: a dictionary with keys:
- origin: a dictionary with type and url keys
- occurrence: a dictionary with a validity range
Returns:
Message as a string
"""
origin_type = data['origin_type']
origin_url = data['origin_url']
revision = data['revision']
branch = data['branch']
path = data['path']
return """The content with hash %s has been seen on origin with type '%s'
at url '%s'. The revision was identified at '%s' on branch '%s'.
The file's path referenced was '%s'.""" % (q,
origin_type,
origin_url,
revision,
branch,
path)
@app.route('/browse/content/<string:q>')
@set_renderers(HTMLRenderer)
def content_with_origin(q):
"""Show content information.
Args:
- q: query string of the form <algo_hash:hash> with
`algo_hash` in sha1, sha1_git, sha256.
This means that several different URLs (at least one per
HASH_ALGO) will point to the same content sha: the sha with
'hash' format
Returns:
The content's information at for a given checksum.
"""
env = {'q': q}
try:
content = service.lookup_hash(q)
if not content.get('found'):
message = "Hash %s was not found." % q
else:
origin = service.lookup_hash_origin(q)
message = _origin_seen(q, origin)
except BadInputExc as e: # do not like it but do not duplicate code
message = str(e)
env['message'] = message
return render_template('content.html', **env)
@app.route('/browse/content/<string:q>/raw')
@set_renderers(HTMLRenderer)
def show_content(q):
"""Given a hash and a checksum, display the content's raw data.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256)
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if the content is not found.
"""
env = {}
try:
content = service.lookup_content_raw(q)
if content:
+ # FIXME: will break if not utf-8
+ content['data'] = content['data'].decode('utf-8')
message = 'Content %s' % content['sha1']
else:
message = 'Content with %s not found.' % q
except BadInputExc as e:
message = str(e)
content = None
env['message'] = message
env['content'] = content
return render_template('display_content.html', **env)
@app.route('/browse/directory/<string:sha1_git>')
@set_renderers(HTMLRenderer)
def browse_directory(sha1_git):
"""Show directory information.
Args:
- sha1_git: the directory's sha1 git identifier.
Returns:
The content's information at sha1_git
"""
env = {'sha1_git': sha1_git}
try:
directory_files = service.lookup_directory(sha1_git)
if directory_files:
message = "Listing for directory %s:" % sha1_git
files = utils.prepare_directory_listing(directory_files)
else:
message = "Directory %s not found." % sha1_git
files = []
except BadInputExc as e: # do not like it but do not duplicate code
message = str(e)
files = []
env['message'] = message
env['files'] = files
return render_template('directory.html', **env)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 5:27 PM (1 w, 5 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3267921

Event Timeline