Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9339715
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
178 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py
index 4a3c263f..8a900b69 100644
--- a/swh/web/ui/api.py
+++ b/swh/web/ui/api.py
@@ -1,829 +1,869 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for, Response, redirect
from swh.web.ui import service, utils
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
def api_stats():
"""Return statistics on SWH storage.
Returns:
SWH storage's statistics.
"""
return service.stat_counters()
@app.route('/api/1/search/')
@app.route('/api/1/search/<string:q>/')
def api_search(q='sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5'):
"""Search a content per hash.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256).
Returns:
Dictionary with 'found' key and the associated result.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
Example:
GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/
"""
r = service.lookup_hash(q).get('found')
return {'found': True if r else False}
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
enriched_data = []
for e in res:
enriched_data.append(enrich_fn(e))
return enriched_data
return enrich_fn(res)
@app.route('/api/1/origin/')
@app.route('/api/1/origin/<int:origin_id>/')
def api_origin(origin_id=1):
"""Return information about origin with id origin_id.
Args:
origin_id: the origin's identifier.
Returns:
Information on the origin if found.
Raises:
NotFoundExc if the origin is not found.
Example:
GET /api/1/origin/1/
"""
return _api_lookup(
origin_id, lookup_fn=service.lookup_origin,
error_msg_if_not_found='Origin with id %s not found.' % origin_id)
@app.route('/api/1/person/')
@app.route('/api/1/person/<int:person_id>/')
def api_person(person_id=1):
"""Return information about person with identifier person_id.
Args:
person_id: the person's identifier.
Returns:
Information on the person if found.
Raises:
NotFoundExc if the person is not found.
Example:
GET /api/1/person/1/
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
def _enrich_release(release):
"""Enrich a release with link to the 'target' of 'type' revision.
"""
if 'target' in release and \
'target_type' in release and \
release['target_type'] == 'revision':
release['target_url'] = url_for('api_revision',
sha1_git=release['target'])
return release
@app.route('/api/1/release/')
@app.route('/api/1/release/<string:sha1_git>/')
def api_release(sha1_git='3c31de6fdc47031857fda10cfa4caf7044cadefb'):
"""Return information about release with id sha1_git.
Args:
sha1_git: the release's hash.
Returns:
Information on the release if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the release is not found.
Example:
GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_release)
def _enrich_revision_with_urls(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
"""
if not context:
context = revision['id']
revision['url'] = url_for('api_revision', sha1_git=revision['id'])
revision['history_url'] = url_for('api_revision_log',
sha1_git=revision['id'])
if 'directory' in revision:
revision['directory_url'] = url_for('api_directory',
sha1_git=revision['directory'])
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append(url_for('api_revision_history',
sha1_git_root=context,
sha1_git=parent))
revision['parent_urls'] = parents
if 'children' in revision:
children = []
for child in revision['children']:
children.append(url_for('api_revision_history',
sha1_git_root=context,
sha1_git=child))
revision['children_urls'] = children
return revision
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/<path:path>/')
def api_directory_through_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None,
path=None):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
path: Path to directory or file to display.
Returns:
Information on the directory or content pointed to by such revision.
Raises:
NotFoundExc if the revision is not found or the path pointed to
is not found.
"""
if ts:
ts = utils.parse_timestamp(ts)
revision = service.lookup_revision_by(origin_id, branch_name, ts)
if not revision:
raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts))
return _revision_directory(revision['id'], path, request.path)
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>/')
def api_history_through_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None,
sha1_git=None):
"""
Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of the revision root identified
by (origin_id, branch_name, ts).
Given sha1_git_root such root revision's identifier, in other words,
sha1_git is an ancestor of sha1_git_root.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
sha1_git: one of sha1_git_root's ancestors.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
limit = int(request.args.get('limit', '100'))
if ts:
ts = utils.parse_timestamp(ts)
rev_root, revision = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git, limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' "
"sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git,
rev_root['id'],
origin_id,
branch_name,
ts))
return _enrich_revision_with_urls(revision, context=rev_root['id'])
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>'
'/directory/<path:path>/')
def api_directory_through_revision_with_origin_history(
origin_id=1,
branch_name="refs/heads/master",
ts=None,
sha1_git=None,
path=None):
"""Return information about directory or content pointed to by the
revision defined as: revision sha1_git, limited to the sub-graph
of all transitive parents of sha1_git_root (being the identified
sha1 by looking up origin_id/branch_name/ts)
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
sha1_git: one of sha1_git_root's ancestors.
path: optional directory or content pointed to by that revision.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root or the path referenced does not exist.
"""
limit = int(request.args.get('limit', '100'))
if ts:
ts = utils.parse_timestamp(ts)
rev_root, revision = service.lookup_revision_with_context_by(origin_id,
branch_name,
ts,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' "
"sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git,
rev_root['id'],
origin_id,
branch_name,
ts))
return _revision_directory(revision['id'], path, request.path)
@app.route('/api/1/revision'
'/origin/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/')
def api_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None):
"""Instead of having to specify a (root) revision by SHA1_GIT, users
might want to specify a place and a time. In SWH a "place" is an
origin; a "time" is a timestamp at which some place has been
observed by SWH crawlers.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
_enrich_revision_with_urls,
branch_name,
ts)
@app.route('/api/1/revision/')
@app.route('/api/1/revision/<string:sha1_git>/')
def api_revision(sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'):
"""Return information about revision with id sha1_git.
Args:
sha1_git: the revision's hash.
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
Example:
GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e
"""
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_revision,
error_msg_if_not_found='Revision with sha1_git %s not'
' found.' % sha1_git,
enrich_fn=_enrich_revision_with_urls)
def _enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = url_for('api_content_with_details',
q='sha1_git:%s' % target)
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = url_for('api_directory',
sha1_git=target)
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
return directory
def _revision_directory(rev_sha1_git, dir_path, request_path):
"""Compute the revision rev_sha1_git's directory or content data.
"""
def enrich_directory_local(dir, context_url=request_path):
return _enrich_directory(dir, context_url)
result = service.lookup_directory_with_revision(rev_sha1_git, dir_path)
if not result:
raise NotFoundExc('Revision with sha1_git %s not'
' found.' % rev_sha1_git)
if result['type'] == 'dir': # dir_entries
return list(map(enrich_directory_local, result['content']))
else: # content
return _enrich_content(result['content'])
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
def api_directory_with_revision(
sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf',
dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist
Example:
GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/
"""
return _revision_directory(sha1_git, dir_path, request.path)
@app.route('/api/1/revision/<string:sha1_git_root>/history/<sha1_git>/')
def api_revision_history(sha1_git_root, sha1_git):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history.
sha1_git: one of sha1_git_root's ancestors.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
limit = int(request.args.get('limit', '100'))
if sha1_git == sha1_git_root:
return redirect(url_for('api_revision',
sha1_git=sha1_git,
limit=limit))
revision = service.lookup_revision_with_context(sha1_git_root,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
% (sha1_git, sha1_git_root))
return _enrich_revision_with_urls(revision, context=sha1_git_root)
@app.route('/api/1/revision/<string:sha1_git_root>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision/<string:sha1_git_root>'
'/history/<sha1_git>'
'/directory/<path:dir_path>/')
def api_directory_revision_history(sha1_git_root, sha1_git, dir_path=None):
"""Return information about directory pointed to by the revision
defined as: revision sha1_git, limited to the sub-graph of all
transitive parents of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history.
sha1_git: one of sha1_git_root's ancestors.
dir_path: optional directory pointed to by that revision.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root or the path referenced does not exist
"""
limit = int(request.args.get('limit', '100'))
if sha1_git == sha1_git_root:
return redirect(url_for('api_directory_with_revision',
sha1_git=sha1_git,
dir_path=dir_path),
code=301)
revision = service.lookup_revision_with_context(sha1_git_root,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
% (sha1_git, sha1_git_root))
return _revision_directory(revision['id'], dir_path, request.path)
@app.route('/api/1/revision/<string:sha1_git>/log/')
def api_revision_log(sha1_git):
"""Show all revisions (~git log) starting from sha1_git.
The first element returned is the given sha1_git.
Args:
sha1_git: the revision's hash.
limit: optional query parameter to limit the revisions log
(default to 100).
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
"""
limit = int(request.args.get('limit', '100'))
def lookup_revision_log_with_limit(s, limit=limit):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
return _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_revision_with_urls)
@app.route('/api/1/directory/')
@app.route('/api/1/directory/<string:sha1_git>/')
def api_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'):
"""Return information about release with id sha1_git.
Args:
sha1_git: Directory's sha1_git.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the content is not found.
Example:
GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179
"""
error_msg = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_directory,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_directory)
# @app.route('/api/1/browse/')
# @app.route('/api/1/browse/<string:q>/')
def api_content_checksum_to_origin(q='sha1_git:26ac0281bc74e9bd8a4a4aab1c7c7a'
'0c19d4436c'):
"""Return content information up to one of its origin if the content
is found.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256).
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the content is not found.
Example:
GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242
"""
found = service.lookup_hash(q)['found']
if not found:
raise NotFoundExc('Content with %s not found.' % q)
return service.lookup_hash_origin(q)
@app.route('/api/1/content/<string:q>/raw/')
def api_content_raw(q):
"""Return content's raw data if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's raw data in application/octet-stream.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash
- NotFoundExc if the content is not found.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return Response(generate(content), mimetype='application/octet-stream')
def _enrich_content(content):
"""Enrich content with 'data', a link to its raw content.
"""
content['data_url'] = url_for('api_content_raw', q=content['sha1'])
return content
@app.route('/api/1/content/')
@app.route('/api/1/content/<string:q>/')
def api_content_with_details(q='sha256:e2c76e40866bb6b28916387bdfc8649beceb'
'523015738ec6d4d540c7fe65232b'):
"""Return content information if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's information.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if the content is not found.
Example:
GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb
523015738ec6d4d540c7fe65232b
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_content)
+def _enrich_entity(entity):
+ """Enrich entity with
+
+ """
+ entity['uuid_url'] = url_for('api_entity_by_uuid',
+ uuid=entity['uuid'])
+ if 'parent' in entity and entity['parent']:
+ entity['parent_url'] = url_for('api_entity_by_uuid',
+ uuid=entity['parent'])
+ return entity
+
+
+@app.route('/api/1/entity/')
+@app.route('/api/1/entity/<string:uuid>/')
+def api_entity_by_uuid(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'):
+ """Return content information if content is found.
+
+ Args:
+ q is of the form (algo_hash:)hash with algo_hash in
+ (sha1, sha1_git, sha256).
+ When algo_hash is not provided, 'hash' is considered sha1.
+
+ Returns:
+ Content's information.
+
+ Raises:
+ - BadInputExc in case of unknown algo_hash or bad hash.
+ - NotFoundExc if the content is not found.
+
+ Example:
+ GET /api/1/entity/7c33636b-8f11-4bda-89d9-ba8b76a42cec/
+
+ """
+ return _api_lookup(
+ uuid,
+ lookup_fn=service.lookup_entity_by_uuid,
+ error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
+ enrich_fn=_enrich_entity)
+
+
@app.route('/api/1/uploadnsearch/', methods=['POST'])
def api_uploadnsearch():
"""Upload the file's content in the post body request.
Compute its hash and determine if it exists in the storage.
Args:
request.files filled with the filename's data to upload.
Returns:
Dictionary with 'sha1', 'filename' and 'found' predicate depending
on whether we find it or not.
Raises:
BadInputExc in case of the form submitted is incorrect.
"""
file = request.files.get('filename')
if not file:
raise BadInputExc("Bad request, missing 'filename' entry in form.")
return service.upload_and_search(file)
diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py
index 970df5ea..f6adabf2 100644
--- a/swh/web/ui/backend.py
+++ b/swh/web/ui/backend.py
@@ -1,188 +1,195 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.web.ui import main
def content_get(sha1_bin):
"""Lookup the content designed by {algo: hash_bin}.
Args:
sha1_bin: content's binary sha1.
Returns:
Content as dict with 'sha1' and 'data' keys.
data representing its raw data.
"""
contents = main.storage().content_get([sha1_bin])
if contents and len(contents) >= 1:
return contents[0]
return None
def content_find(algo, hash_bin):
"""Retrieve the content with binary hash hash_bin
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
A triplet (sha1, sha1_git, sha256) if the content exist
or None otherwise.
"""
return main.storage().content_find({algo: hash_bin})
def content_find_occurrence(algo, hash_bin):
"""Find the content's occurrence.
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
The occurrence of the content.
"""
return main.storage().content_find_occurrence({algo: hash_bin})
def origin_get(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id: origin's identifier
Returns:
Origin information as dict.
"""
return main.storage().origin_get({'id': origin_id})
def person_get(person_id):
"""Return information about the person with id person_id.
Args:
person_id: person's identifier.v
Returns:
Person information as dict.
"""
return main.storage().person_get([person_id])
def directory_get(sha1_git_bin, recursive=False):
"""Return information about the directory with id sha1_git.
Args:
sha1_git: directory's identifier.
recursive: Optional recursive flag default to False
Returns:
Directory information as dict.
"""
directory_entries = main.storage().directory_get(sha1_git_bin, recursive)
if not directory_entries:
return None
return directory_entries
def release_get(sha1_git_bin):
"""Return information about the release with sha1 sha1_git_bin.
Args:
sha1_git_bin: The release's sha1 as hexadecimal.
Returns:
Release information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().release_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_get(sha1_git_bin):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().revision_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_log(sha1_git_bin, limit=100):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
limit: the maximum number of revisions returned.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
return main.storage().revision_log(sha1_git_bin, limit)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return main.storage().stat_counters()
def revision_get_by(origin_id, branch_name, timestamp):
"""Return occurrence information matching the criterions origin_id,
branch_name, ts.
"""
res = main.storage().revision_get_by(origin_id,
branch_name,
timestamp=timestamp,
limit=1)
if not res:
return None
return res[0]
def directory_entry_get_by_path(directory, path):
"""Return a directory entry by its path.
"""
paths = path.strip(os.path.sep).split(os.path.sep)
return main.storage().directory_entry_get_by_path(
directory,
list(map(lambda p: p.encode('utf-8'), paths)))
+
+
+def entity_get(uuid):
+ """Retrieve the entity per its uuid.
+
+ """
+ return main.storage().entity_get(uuid)
diff --git a/swh/web/ui/query.py b/swh/web/ui/query.py
index 5de6f231..cb68825b 100644
--- a/swh/web/ui/query.py
+++ b/swh/web/ui/query.py
@@ -1,87 +1,111 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
+from uuid import UUID
+
from swh.core.hashutil import ALGORITHMS, hex_to_hash
from swh.web.ui.exc import BadInputExc
SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE)
SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE)
def parse_hash(q):
"""Detect the hash type of a user submitted query string.
Args:
query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM",
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
ValueError if the given query string does not correspond to a valid
hash value
"""
def guess_algo(q):
if SHA1_RE.match(q):
return 'sha1'
elif SHA256_RE.match(q):
return 'sha256'
else:
raise BadInputExc('Invalid checksum query string %s' % q)
def check_algo(algo, hex):
if (algo in {'sha1', 'sha1_git'} and not SHA1_RE.match(hex)) \
or (algo == 'sha256' and not SHA256_RE.match(hex)):
raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo))
parts = q.split(':')
if len(parts) > 2:
raise BadInputExc('Invalid checksum query string %s' % q)
elif len(parts) == 1:
parts = (guess_algo(q), q)
elif len(parts) == 2:
check_algo(parts[0], parts[1])
algo = parts[0]
if algo not in ALGORITHMS:
raise BadInputExc('Unknown hash algorithm %s' % algo)
return (algo, hex_to_hash(parts[1]))
def parse_hash_with_algorithms_or_throws(q, accepted_algo, error_msg):
"""Parse a query but only accepts accepted_algo.
Otherwise, raise the exception with message error_msg.
Args:
- q: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM"
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS.
- accepted_algo: array of strings representing the names of accepted
algorithms.
- error_msg: error message to raise as BadInputExc if the algo of
the query does not match.
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
BadInputExc when the inputs is invalid or does not
validate the accepted algorithms.
"""
algo, hash = parse_hash(q)
if algo not in accepted_algo:
raise BadInputExc(error_msg)
return (algo, hash)
+
+
+def parse_uuid4(uuid):
+ """Parse an uuid 4 from a string.
+
+ Args:
+ uuid: String representing an uuid.
+
+ Returns:
+ The uuid as is if everything is ok.
+
+ Raises:
+ BadInputExc: if the uuid is invalid.
+
+ """
+ try:
+ UUID(uuid, version=4)
+ except ValueError as e:
+ # not a valid hex code for a UUID
+ raise BadInputExc(str(e))
+
+ return uuid
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index d171b51f..5f8a10bc 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,412 +1,426 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, upload, backend
from swh.web.ui.exc import NotFoundExc
def hash_and_search(filepath):
"""Hash the filepath's content as sha1, then search in storage if
it exists.
Args:
Filepath of the file to hash and search.
Returns:
Tuple (hex sha1, found as True or false).
The found boolean, according to whether the sha1 of the file
is present or not.
"""
h = hashutil.hashfile(filepath)
c = backend.content_find('sha1', h['sha1'])
if c:
r = converters.from_content(c)
r['found'] = True
return r
else:
return {'sha1': hashutil.hash_to_hex(h['sha1']),
'found': False}
def upload_and_search(file):
"""Upload a file and compute its hash.
"""
tmpdir, filename, filepath = upload.save_in_upload_folder(file)
res = {'filename': filename}
try:
content = hash_and_search(filepath)
res.update(content)
return res
finally:
# clean up
if tmpdir:
upload.cleanup(tmpdir)
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def lookup_hash_origin(q):
"""Return information about the checksum contained in the query q.
Args: query string of the form <hash_algo:hash>
Returns:
origin as dictionary if found for the given content.
"""
algo, hash = query.parse_hash(q)
origin = backend.content_find_occurrence(algo, hash)
return converters.from_origin(origin)
def lookup_origin(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id as string
Returns:
origin information as dict.
"""
return backend.origin_get(origin_id)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
person = backend.person_get(person_id)
return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'], # HACK: sha1_git really
'Only sha1_git is supported.')
directory_entries = backend.directory_get(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
release_sha1_git,
['sha1'],
'Only sha1_git is supported.')
res = backend.release_get(sha1_git_bin)
return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
res = backend.revision_get(sha1_git_bin)
return converters.from_revision(res)
def lookup_revision_by(origin_id,
branch_name="refs/heads/master",
timestamp=None):
"""Lookup revisions by origin_id, branch_name and timestamp.
If:
- branch_name is not provided, lookup using 'refs/heads/master' as default.
- ts is not provided, use the most recent
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
Yields:
The revisions matching the criterions.
"""
res = backend.revision_get_by(origin_id, branch_name, timestamp)
return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit=100):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision_entries = backend.revision_log(sha1_git_bin, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git,
limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
sha1_git_root being resolved through the lookup of a revision by origin_id,
branch_name and ts.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
- sha1_git: one of sha1_git_root's ancestors.
- limit: limit the lookup to 100 revisions back.
Returns:
Pair of (root_revision, revision).
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
rev_root = backend.revision_get_by(origin_id, branch_name, ts)
if not rev_root:
raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts))
return (converters.from_revision(rev_root),
lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision. The type is either a sha1 (as an hex
string) or a non converted dict.
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
if isinstance(sha1_git_root, str):
_, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git_root,
['sha1'],
'Only sha1_git is supported.')
revision_root = backend.revision_get(sha1_git_root_bin)
if not revision_root:
raise NotFoundExc('Revision %s not found' % sha1_git_root)
else:
sha1_git_root_bin = sha1_git_root
revision_log = backend.revision_log(sha1_git_root_bin, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError in case of dir_path exists but do not reference a
type 'dir' or 'file'.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
entity = backend.directory_entry_get_by_path(dir_sha1_git_bin,
dir_path)
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = backend.directory_get(entity['target'])
return {'type': 'dir',
'content': map(converters.from_directory_entry,
directory_entries)}
elif entity['type'] == 'file': # content
content = backend.content_find('sha1_git', entity['target'])
return {'type': 'file',
'content': converters.from_content(content)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content designed by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
if not c:
return None
content = backend.content_get(c['sha1'])
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return backend.stat_counters()
+
+
+def lookup_entity_by_uuid(uuid):
+ """Return the entity's hierarchy from its uuid.
+
+ Args:
+ uuid: entity's identifier.
+
+ Returns:
+ List of hierarchy entities from the entity with uuid.
+
+ """
+ uuid = query.parse_uuid4(uuid)
+ return backend.entity_get(uuid)
diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py
index 975978e8..7f08c973 100644
--- a/swh/web/ui/tests/test_api.py
+++ b/swh/web/ui/tests/test_api.py
@@ -1,1856 +1,1939 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
from swh.web.ui import api, exc
-from swh.web.ui.exc import NotFoundExc
+from swh.web.ui.exc import NotFoundExc, BadInputExc
class ApiTestCase(test_app.SWHApiTestCase):
@istest
def generic_api_lookup_Nothing_is_found(self):
# given
def test_generic_lookup_fn(sha1, another_unused_arg):
assert another_unused_arg == 'unused arg'
assert sha1 == 'sha1'
return None
# when
with self.assertRaises(NotFoundExc) as cm:
api._api_lookup('sha1', test_generic_lookup_fn,
'This will be raised because None is returned.',
lambda x: x,
'unused arg')
self.assertIn('This will be raised because None is returned.',
cm.exception.args[0])
@istest
def generic_api_map_are_enriched_and_transformed_to_list(self):
# given
def test_generic_lookup_fn_1(criteria0, param0, param1):
assert criteria0 == 'something'
return map(lambda x: x + 1, [1, 2, 3])
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_1,
'This is not the error message you are looking for. Move along.',
lambda x: x * 2,
'some param 0',
'some param 1')
self.assertEqual(actual_result, [4, 6, 8])
@istest
def generic_api_list_are_enriched_too(self):
# given
def test_generic_lookup_fn_2(crit):
assert crit == 'something'
return ['a', 'b', 'c']
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_2,
'Not the error message you are looking for, it is. '
'Along, you move!',
lambda x: ''. join(['=', x, '=']))
self.assertEqual(actual_result, ['=a=', '=b=', '=c='])
@istest
def generic_api_generator_are_enriched_and_returned_as_list(self):
# given
def test_generic_lookup_fn_3(crit):
assert crit == 'crit'
return (i for i in [4, 5, 6])
# when
actual_result = api._api_lookup(
'crit',
test_generic_lookup_fn_3,
'Move!',
lambda x: x - 1)
self.assertEqual(actual_result, [3, 4, 5])
@istest
def generic_api_simple_data_are_enriched_and_returned_too(self):
# given
def test_generic_lookup_fn_4(crit):
assert crit == '123'
return {'a': 10}
def test_enrich_data(x):
x['a'] = x['a'] * 10
return x
# when
actual_result = api._api_lookup(
'123',
test_generic_lookup_fn_4,
'Nothing to do',
test_enrich_data)
self.assertEqual(actual_result, {'a': 100})
@patch('swh.web.ui.api.service')
# @istest
def api_content_checksum_to_origin(self, mock_service):
mock_service.lookup_hash.return_value = {'found': True}
stub_origin = {
"lister": None,
"url": "rsync://ftp.gnu.org/old-gnu/webbase",
"type": "ftp",
"id": 2,
"project": None
}
mock_service.lookup_hash_origin.return_value = stub_origin
# when
rv = self.app.get(
'/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_hash.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
# @istest
def api_content_checksum_to_origin_sha_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.app.get(
'/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_hash.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_with_details(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data_url': '/api/1/content/'
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/',
headers={'accept': 'application/yaml'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': b'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-type': 'application/octet-stream',
'Content-disposition': 'attachment'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_raw_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': {
'sha1': 'or something'
}
}
# when
rv = self.app.get('/api/1/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.api.service')
@istest
def api_search_as_yaml(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': {
'sha1': 'sha1 hash'
}
}
# when
rv = self.app.get('/api/1/search/sha1:halb/',
headers={'Accept': 'application/yaml'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {}
# when
rv = self.app.get('/api/1/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': False})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch_bad_input(self, mock_request, mock_service):
# given
mock_request.files = {}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': "Bad request, missing 'filename' entry in form."})
mock_service.upload_and_search.called = False
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch(self, mock_request, mock_service):
# given
mock_request.files = {'filename': 'simple-filename'}
mock_service.upload_and_search.return_value = {
'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False,
}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False})
mock_service.upload_and_search.assert_called_once_with(
'simple-filename')
@patch('swh.web.ui.api.service')
@istest
def api_origin(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with(1234)
@patch('swh.web.ui.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with(4321)
@patch('swh.web.ui.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
'target_url': '/api/1/revision/revision-sha1/',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.api.service')
@istest
def api_release_target_type_not_a_revision(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
'ff7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
'a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
],
'parent_urls': [
'/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5'
'/history/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revision)
mock_service.lookup_revision.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
mock_service.lookup_revision_by.return_value = None
rv = self.app.get('/api/1/revision/origin/123/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertIn('Revision with (origin_id: 123', response_data['error'])
self.assertIn('not found', response_data['error'])
mock_service.lookup_revision_by.assert_called_once_with(
123,
'refs/heads/master',
None)
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin(self, mock_service):
mock_revision = {
'id': '32',
'directory': '21',
'message': 'message 1',
'type': 'deb',
}
expected_revision = {
'id': '32',
'url': '/api/1/revision/32/',
'history_url': '/api/1/revision/32/log/',
'directory': '21',
'directory_url': '/api/1/directory/21/',
'message': 'message 1',
'type': 'deb',
}
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/heads/master',
None)
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
mock_revision = {
'id': '12',
'directory': '23',
'message': 'message 2',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '12',
'url': '/api/1/revision/12/',
'history_url': '/api/1/revision/12/log/',
'directory': '23',
'directory_url': '/api/1/directory/23/',
'message': 'message 2',
'type': 'tar',
}
rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
None)
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
mock_utils,
mock_service):
mock_revision = {
'id': '123',
'directory': '456',
'message': 'message 3',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '123',
'url': '/api/1/revision/123/',
'history_url': '/api/1/revision/123/log/',
'directory': '456',
'directory_url': '/api/1/directory/456/',
'message': 'message 3',
'type': 'tar',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs/origin/dev'
'/ts/1452591542/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with('1452591542')
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
self,
mock_utils,
mock_service):
mock_revisions = [{
'id': '999',
}]
mock_service.lookup_revision_by.return_value = mock_revisions
expected_revisions = [{
'id': '999',
'url': '/api/1/revision/999/',
'history_url': '/api/1/revision/999/log/',
}]
mock_utils.parse_timestamp.return_value = 'parsed-date'
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs%2Forigin%2Fdev'
'/ts/Today%20is%20'
'January%201,%202047%20at%208:21:00AM/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revisions)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with(
'Today is January 1, 2047 at 8:21:00AM')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_with_rev_not_found_0(
self, mock_service):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/1'
'/history/4563'
'/directory/',
'/api/1/revision'
'/origin/1'
'/history/4563'
'/directory/some-path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'4563',
'root-rev-id',
1,
'refs/heads/master',
None)})
mock_service.lookup_revision_with_context_by.assert_called(
1, 'refs/heads/master', None, '4563', 100)
# reset_mock
mock_service.reset_mock()
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_not_found_1( # noqa
self, mock_service):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213'
'/directory/',
'/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213'
'/directory/some/path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'213',
'root-rev-id',
10,
'origin/dev',
None)})
mock_service.lookup_revision_with_context_by.assert_called(
10, 'origin/dev', None, '213', 100)
# for next turn
mock_service.reset_mock()
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_found_2(
self, mock_service, mock_utils):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876'
'/directory/',
'/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876'
'/directory/some-path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00'
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'876',
'root-rev-id',
100,
'master',
'2012-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called(
100, 'master', '2012-11-23 00:00:00', '876', 100)
mock_utils.parse_timestamp.assert_called_once_with('2012-11-23')
# rest
mock_utils.reset_mock()
mock_service.reset_mock()
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_ok_but_sha1_rev_not(
self, mock_service, mock_utils):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git'
'/directory/?limit=1000',
'/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git'
'/directory/some/path/?limit=1000']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00'
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)."
% ('123-sha1-git',
'root-rev-id',
666,
'refs/master',
'2016-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called(
666, 'refs/master', '2016-11-23 00:00:00',
'123-sha1-git', 1000)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-23')
# reset_mock
mock_service.reset_mock()
mock_utils.reset_mock()
@patch('swh.web.ui.api._revision_directory')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_revision_with_origin_history(
self, mock_service, mock_utils, mock_revision_directory):
# given
stub_root_rev = {
'id': '45-sha1-git-root'
}
stub_revision = {
'id': 'revision-id',
}
mock_service.lookup_revision_with_context_by.return_value = (
stub_root_rev,
stub_revision)
stub_dir_content = [
{
'type': 'dir'
},
{
'type': 'file'
},
]
mock_revision_directory.return_value = stub_dir_content
mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00'
# when
url = '/api/1/revision' \
'/origin/999' \
'/branch/refs/dev' \
'/ts/2016-11-24' \
'/history/12-sha1-git' \
'/directory/some/content/'
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, stub_dir_content)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-24')
mock_service.lookup_revision_with_context_by(999,
'refs/dev',
'2016-11-24 00:00:00'
'12-sha1-git',
100)
mock_revision_directory.assert_called_once_with('revision-id',
'some/content',
url)
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_0(
self, mock_service):
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get('/api/1/revision'
'/origin/1'
'/history/4563/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('4563',
'root-rev-id',
1,
'refs/heads/master',
None)})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
1, 'refs/heads/master', None, '4563', 100)
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_1(
self, mock_service):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('213',
'root-rev-id',
10,
'origin/dev',
None)})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
10, 'origin/dev', None, '213', 100)
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_2(
self, mock_service, mock_utils):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('876',
'root-rev-id',
100,
'master',
'2012-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
100, 'master', '2012-11-23 00:00:00', '876', 100)
mock_utils.parse_timestamp.assert_called_once_with('2012-11-23')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_3(
self, mock_service, mock_utils):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_service.lookup_revision_with_context.return_value = None
mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git/?limit=1000')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('123-sha1-git',
'root-rev-id',
666,
'refs/master',
'2016-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
666, 'refs/master', '2016-11-23 00:00:00', '123-sha1-git', 1000)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-23')
mock_service.lookup_revision_with_context('456-sha1-git-root',
'123-sha1-git',
1000)
@patch('swh.web.ui.api._enrich_revision_with_urls')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision(
self, mock_service, mock_utils, mock_enrich):
# given
stub_root_rev = {
'id': '45-sha1-git-root'
}
stub_revision = {
'children': [],
}
mock_service.lookup_revision_with_context_by.return_value = (
stub_root_rev,
stub_revision)
mock_enrich.return_value = 'some-enriched-result'
mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/999'
'/branch/refs/dev'
'/ts/2016-11-24'
'/history/12-sha1-git/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, 'some-enriched-result')
mock_service.lookup_revision_with_context_by.assert_called_once_with(
999,
'refs/dev',
'2016-11-24 00:00:00',
'12-sha1-git',
100)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-24')
mock_enrich.assert_called_once_with(stub_revision,
context='45-sha1-git-root')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_revision_with_origin_rev_not_found(self,
mock_service,
mock_utils):
mock_service.lookup_revision_by.return_value = None
mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error': 'Revision with (origin_id: 10, branch_name: '
'refs/remote/origin/dev, ts: 2012-10-20 00:00:00) not found.'})
mock_service.lookup_revision_by.assert_called_once_with(
10,
'refs/remote/origin/dev',
'2012-10-20 00:00:00')
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api._revision_directory')
@istest
def api_directory_through_revision_with_origin(self,
mock_revision_dir,
mock_service):
mock_revision = {
'id': '998',
}
expected_res = [{
'id': '123'
}]
mock_revision_dir.return_value = expected_res
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/3/directory/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_res)
mock_service.lookup_revision_by.assert_called_once_with(
3,
'refs/heads/master',
None)
mock_revision_dir.assert_called_once_with(
'998',
None,
'/api/1/revision/origin/3/directory/')
@patch('swh.web.ui.api.service')
@istest
def api_revision_log(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5'
'/history/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/'
],
'type': 'tar',
'synthetic': True,
}]
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revisions)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 100)
@patch('swh.web.ui.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
# given
mock_service.lookup_revision_log.return_value = None
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
'e2a44e6/log/?limit=10')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git'
' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 10)
@patch('swh.web.ui.api.service')
@istest
def api_revision_history_not_found(self, mock_service):
# given
mock_service.lookup_revision_with_context.return_value = None
# then
rv = self.app.get('/api/1/revision/999/history/338/?limit=5')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_revision_with_context.assert_called_once_with(
'999', '338', 5)
@istest
def api_revision_history_sha1_same_so_redirect(self):
# when
rv = self.app.get('/api/1/revision/123/history/123?limit=10')
# then
self.assertEquals(rv.status_code, 301)
# Ideally we'd like to be able to check the resulting url path
# but does not work, this returns the current url
# also following the redirect would mean to yet mock again the
# destination url... So for now cannot test it
# self.assertEquals(rv.location,
# 'http://localhost/api/1/revision/123?limit=10')
@patch('swh.web.ui.api.service')
@istest
def api_revision_history(self, mock_service):
# for readability purposes, we use:
# - sha1 as 3 letters (url are way too long otherwise to respect pep8)
# - only keys with modification steps (all other keys are kept as is)
# given
stub_revision = {
'id': '883',
'children': ['777', '999'],
'parents': [],
'directory': '272'
}
mock_service.lookup_revision_with_context.return_value = stub_revision
# then
rv = self.app.get('/api/1/revision/666/history/883/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'id': '883',
'url': '/api/1/revision/883/',
'history_url': '/api/1/revision/883/log/',
'children': ['777', '999'],
'children_urls': ['/api/1/revision/666/history/777/',
'/api/1/revision/666/history/999/'],
'parents': [],
'parent_urls': [],
'directory': '272',
'directory_url': '/api/1/directory/272/'
})
mock_service.lookup_revision_with_context.assert_called_once_with(
'666', '883', 100)
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_not_found(self, mock_service):
# given
mock_service.lookup_directory_with_revision.return_value = None
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path/to/dir')
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_not_found_2(self, mock_service):
# given
mock_service.lookup_directory_with_revision.return_value = None
# then
rv = self.app.get('/api/1/revision/123/directory/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_directory_with_revision.assert_called_once_with(
'123', None)
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_ok_returns_dir_entries(self, mock_service):
stub_dir = {
'type': 'dir',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'name': 'somefile'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'name': 'to-subdir',
}
]
}
expected_dir = [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}]
# given
mock_service.lookup_directory_with_revision.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path')
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_ok_returns_content(self, mock_service):
stub_content = {
'type': 'file',
'content': {
'sha1_git': '789',
'sha1': '101',
}
}
expected_content = {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
# given
mock_service.lookup_directory_with_revision.return_value = stub_content
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_content)
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path')
@istest
def api_directory_revision_history_sha1_same_so_redirect(self):
# when
rv = self.app.get(
'/api/1/revision/123/history/123/directory/path/to/?limit=1')
# then
self.assertEquals(rv.status_code, 301)
# self.assertEquals(rv.location,
# 'http://localhost/api/1/revision/123/directory/path/to/')
@patch('swh.web.ui.api.service')
@istest
def api_directory_revision_history_ko_revision_not_found(self,
mock_service):
# given
mock_service.lookup_revision_with_context.return_value = None
# then
rv = self.app.get('/api/1/revision/456/history/987/directory/path/to/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': "Possibly sha1_git '987' is not " +
"an ancestor of sha1_git_root '456'"})
mock_service.lookup_revision_with_context.assert_called_once_with(
'456', '987', 100)
@patch('swh.web.ui.api.service')
@istest
def api_directory_revision_history(self,
mock_service):
# given
mock_service.lookup_revision_with_context.return_value = {
'id': 'rev-id'
}
stub_dir = {
'type': 'dir',
'content': [
{
'sha1_git': '879',
'type': 'file',
'target': '110',
'name': 'subfile'
},
{
'sha1_git': '213',
'type': 'dir',
'target': '546',
'name': 'subdir',
}
]
}
expected_dir = [
{
'sha1_git': '879',
'type': 'file',
'target': '110',
'target_url': '/api/1/content/sha1_git:110/',
'name': 'subfile',
'file_url': '/api/1/revision/354/history/867/directory/debian/'
'subfile/',
},
{
'sha1_git': '213',
'type': 'dir',
'target': '546',
'target_url': '/api/1/directory/546/',
'name': 'subdir',
'dir_url':
'/api/1/revision/354/history/867/directory/debian/subdir/'
}]
# given
mock_service.lookup_directory_with_revision.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/354'
'/history/867'
'/directory/debian/?limit=4')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_revision_with_context.assert_called_once_with(
'354', '867', 4)
mock_service.lookup_directory_with_revision('rev-id', 'debian')
@patch('swh.web.ui.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
}]
expected_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
'target_url':
'/api/1/directory/8be353ed3480476f032475e7c233eff737123456/',
}]
mock_service.lookup_directory.return_value = stub_directories
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_directories)
mock_service.lookup_directory.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid_not_found(self, mock_service):
+ # when
+ mock_service.lookup_entity_by_uuid.return_value = []
+
+ # when
+ rv = self.app.get('/api/1/entity/')
+
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, {
+ 'error':
+ "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " +
+ "found."})
+
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ '5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
+
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid_bad_request(self, mock_service):
+ # when
+ mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
+ 'bad input: uuid malformed!')
+
+ # when
+ rv = self.app.get('/api/1/entity/uuid malformed/')
+
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, {
+ 'error': 'bad input: uuid malformed!'})
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ 'uuid malformed')
+
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid(self, mock_service):
+ # when
+ stub_entities = [
+ {
+ 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
+ },
+ {
+ 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
+ }
+ ]
+ mock_service.lookup_entity_by_uuid.return_value = stub_entities
+
+ expected_entities = [
+ {
+ 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
+ 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-'
+ '785107f598e4/',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
+ 'd1ce2efc9fb2/'
+ },
+ {
+ 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
+ 'd1ce2efc9fb2/'
+ }
+ ]
+
+ # when
+ rv = self.app.get('/api/1/entity'
+ '/34bd6b1b-463f-43e5-a697-785107f598e4/')
+
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, expected_entities)
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ '34bd6b1b-463f-43e5-a697-785107f598e4')
+
class ApiUtils(unittest.TestCase):
@istest
def api_lookup_not_found(self):
# when
with self.assertRaises(exc.NotFoundExc) as e:
api._api_lookup('something',
lambda x: None,
'this is the error message raised as it is None')
self.assertEqual(e.exception.args[0],
'this is the error message raised as it is None')
@istest
def api_lookup_with_result(self):
# when
actual_result = api._api_lookup('something',
lambda x: x + '!',
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, 'something!')
@istest
def api_lookup_with_result_as_map(self):
# when
actual_result = api._api_lookup([1, 2, 3],
lambda x: map(lambda y: y+1, x),
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, [2, 3, 4])
diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py
index de085fbd..3bf07c68 100644
--- a/swh/web/ui/tests/test_backend.py
+++ b/swh/web/ui/tests/test_backend.py
@@ -1,429 +1,448 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock
from swh.core import hashutil
from swh.web.ui import backend
from swh.web.ui.tests import test_app
class BackendTestCase(test_app.SWHApiTestCase):
@istest
def content_get_ko_not_found_1(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f777')
self.storage.content_get = MagicMock(return_value=None)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get_ko_not_found_empty_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_get = MagicMock(return_value=[])
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
stub_contents = [{
'sha1': sha1_bin,
'data': b'binary data',
},
{}]
self.storage.content_get = MagicMock(return_value=stub_contents)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertEquals(actual_content, stub_contents[0])
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_find_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=(1, 2, 3))
# when
actual_content = backend.content_find('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find.assert_called_with({'sha1': sha1_bin})
@istest
def content_find_occurrence_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find_occurrence('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find_occurrence.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find_occurrence(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(
return_value=(1, 2, 3))
# when
actual_content = backend.content_find_occurrence('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find_occurrence.assert_called_with(
{'sha1': sha1_bin})
@istest
def origin_get(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = backend.origin_get('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def person_get(self):
# given
self.storage.person_get = MagicMock(return_value={
'id': 'person-id',
'name': 'blah'})
# when
actual_person = backend.person_get('person-id')
# then
self.assertEqual(actual_person, {'id': 'person-id',
'name': 'blah'})
self.storage.person_get.assert_called_with(['person-id'])
@istest
def directory_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
self.storage.directory_get = MagicMock(return_value=[])
# when
actual_directory = backend.directory_get(sha1_bin)
# then
self.assertIsNone(actual_directory)
self.storage.directory_get.assert_called_with(sha1_bin, False)
@istest
def directory_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
stub_dir_entries = [{
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
self.storage.directory_get = MagicMock(
return_value=stub_dir_entries)
actual_directory = backend.directory_get(sha1_bin, recursive=True)
# then
self.assertIsNotNone(actual_directory)
self.assertEqual(list(actual_directory), stub_dir_entries)
self.storage.directory_get.assert_called_with(sha1_bin, True)
@istest
def release_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
self.storage.release_get = MagicMock(return_value=[])
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertIsNone(actual_release)
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def release_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
stub_releases = [{
'id': sha1_bin,
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}]
self.storage.release_get = MagicMock(return_value=stub_releases)
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertEqual(actual_release, stub_releases[0])
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def revision_get_by_not_found(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get_by(10, 'master', 'ts2')
# then
self.assertIsNone(actual_revision)
self.storage.revision_get_by.assert_called_with(10, 'master',
timestamp='ts2',
limit=1)
@istest
def revision_get_by(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}])
# when
actual_revisions = backend.revision_get_by(100, 'dev', 'ts')
# then
self.assertEquals(actual_revisions, {'id': 1})
self.storage.revision_get_by.assert_called_with(100, 'dev',
timestamp='ts',
limit=1)
@istest
def revision_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
self.storage.revision_get = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertIsNone(actual_revision)
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
stub_revisions = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_get = MagicMock(return_value=stub_revisions)
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertEqual(actual_revision, stub_revisions[0])
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_log(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
stub_revision_log = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = backend.revision_log(sha1_bin)
# then
self.assertEqual(list(actual_revision), stub_revision_log)
self.storage.revision_log.assert_called_with(sha1_bin, 100)
@istest
def stat_counters(self):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
self.storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = backend.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
self.storage.stat_counters.assert_called_with()
@istest
def directory_entry_get_by_path(self):
# given
stub_dir_entry = {'id': b'dir-id',
'type': 'dir',
'name': b'some/path/foo'}
self.storage.directory_entry_get_by_path = MagicMock(
return_value=stub_dir_entry)
# when
actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1',
'some/path/foo')
self.assertEquals(actual_dir_entry, stub_dir_entry)
self.storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-sha1',
[b'some', b'path', b'foo'])
+
+ @istest
+ def entity_get(self):
+ # given
+ stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'},
+ {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'parent': None}]
+ self.storage.entity_get = MagicMock(return_value=stub_entities)
+
+ # when
+ actual_entities = backend.entity_get(
+ 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
+
+ # then
+ self.assertEquals(actual_entities, stub_entities)
+
+ self.storage.entity_get.assert_called_once_with(
+ 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
diff --git a/swh/web/ui/tests/test_query.py b/swh/web/ui/tests/test_query.py
index b4e2aa81..1053e2e2 100644
--- a/swh/web/ui/tests/test_query.py
+++ b/swh/web/ui/tests/test_query.py
@@ -1,125 +1,141 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from unittest.mock import patch
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import query
from swh.web.ui.exc import BadInputExc
class QueryTestCase(unittest.TestCase):
@istest
def parse_hash_malformed_query_with_more_than_2_parts(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654:other-stuff')
@istest
def parse_hash_guess_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash(h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2' \
'E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash(h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_algo_malformed_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('1234567890987654')
@istest
def parse_hash_check_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1:' + h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha1_git(self):
h = 'e1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1_git:' + h)
self.assertEquals(r, ('sha1_git', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash('sha256:' + h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_algo_malformed_sha1_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha1_git_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1_git:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha256_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha256:1234567890987654')
@istest
def parse_hash_check_algo_unknown_one(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha2:1234567890987654')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms_or_throws_bad_query(self, mock_hash):
# given
mock_hash.side_effect = BadInputExc('Error input')
# when
with self.assertRaises(BadInputExc) as cm:
query.parse_hash_with_algorithms_or_throws(
'sha1:blah',
['sha1'],
'useless error message for this use case')
self.assertIn('Error input', cm.exception.args[0])
mock_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms_or_throws_bad_algo(self, mock_hash):
# given
mock_hash.return_value = 'sha1', '123'
# when
with self.assertRaises(BadInputExc) as cm:
query.parse_hash_with_algorithms_or_throws(
'sha1:431',
['sha1_git'],
'Only sha1_git!')
self.assertIn('Only sha1_git!', cm.exception.args[0])
mock_hash.assert_called_once_with('sha1:431')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms(self, mock_hash):
# given
mock_hash.return_value = ('sha256', b'123')
# when
algo, sha = query.parse_hash_with_algorithms_or_throws(
'sha256:123',
['sha256', 'sha1_git'],
'useless error message for this use case')
self.assertEquals(algo, 'sha256')
self.assertEquals(sha, b'123')
mock_hash.assert_called_once_with('sha256:123')
+
+ @istest
+ def parse_uuid4(self):
+ # when
+ actual_uuid = query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42cec')
+
+ # then
+ self.assertEquals(actual_uuid, '7c33636b-8f11-4bda-89d9-ba8b76a42cec')
+
+ @istest
+ def parse_uuid4_ko(self):
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42')
+ self.assertIn('badly formed hexadecimal UUID string',
+ cm.exception.args[0])
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index 3f60e3b5..4d569fba 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1084 +1,1104 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_origin(self, mock_backend):
# given
mock_backend.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
mock_backend.content_find_occurrence.assert_called_with(
'sha1_git',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil, mock_backend):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_backend.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
mock_backend.content_find.assert_called_once_with('sha1', bhash)
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
# given
mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_backend.origin_get.assert_called_with('origin-id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
# given
mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
})
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_backend.release_get.assert_called_with(
hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# lookup revision first 883, then 666 (both exists)
mock_backend.revision_get.side_effect = [
sha1_git_dict,
sha1_git_root_dict
]
mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
})
mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
[call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_not_found(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'dir',
'name': b'some/path',
'target': b'456'
}
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
dir_id,
'some/path')
mock_backend.directory_get.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'content': stub_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ok_type_not_implemented(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
}
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
# given
mock_backend.revision_get = MagicMock(return_value={
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
mock_backend.revision_get.assert_called_with(
hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [{
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(list(actual_revision), [{
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
mock_backend.content_get = MagicMock(return_value={
'data': b'binary data'})
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEquals(actual_content, {'data': b'binary data'})
mock_backend.content_find.assert_called_once_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
mock_backend.content_get.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'hidden',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
'status': 'absent',
})
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
'status': 'visible',
})
mock_backend.content_find.assert_called_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
# given
mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
})
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
# given
mock_backend.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
mock_backend.directory_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory(self, mock_backend):
# given
stub_dir_entries = [{
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
expected_dir_entries = [{
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}]
mock_backend.directory_get = MagicMock(
return_value=stub_dir_entries)
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertEqual(list(actual_directory), expected_dir_entries)
mock_backend.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
actual_revisions = service.lookup_revision_by(1)
# then
self.assertIsNone(actual_revisions)
mock_backend.revision_get_by(1, 'master', None)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
# given
stub_rev = {
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'committer': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'message': b'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
expected_rev = {
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'committer': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'message': 'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by(1, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
origin_id = 1
branch_name = 'master3'
ts = None
service.lookup_revision_with_context_by(origin_id, branch_name, ts,
'sha1')
# then
self.assertIn(
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts), cm.exception.args[0])
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
mock_lookup_revision_with_context):
# given
stub_root_rev = {'id': 'root-rev-id'}
mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}
stub_rev = {'id': 'rev-found'}
mock_lookup_revision_with_context.return_value = stub_rev
# when
origin_id = 1
branch_name = 'master3'
ts = None
sha1_git = 'sha1'
actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git)
# then
self.assertEquals(actual_root_rev, stub_root_rev)
self.assertEquals(actual_rev, stub_rev)
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
mock_lookup_revision_with_context.assert_called_once_with(
stub_root_rev, sha1_git, 100)
+
+ @patch('swh.web.ui.service.backend')
+ @patch('swh.web.ui.service.query')
+ @istest
+ def lookup_entity_by_uuid(self, mock_query, mock_backend):
+ # given
+ uuid_test = 'correct-uuid'
+ mock_query.parse_uuid4.return_value = uuid_test
+ stub_entities = [{'uuid': uuid_test}]
+
+ mock_backend.entity_get.return_value = stub_entities
+
+ # when
+ actual_entities = service.lookup_entity_by_uuid(uuid_test)
+
+ # then
+ self.assertEquals(actual_entities, stub_entities)
+
+ mock_query.parse_uuid4.assert_called_once_with(uuid_test)
+ mock_backend.entity_get.assert_called_once_with(uuid_test)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Jul 4 2025, 9:52 AM (5 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3236801
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment