Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py
index 4a3c263f..8a900b69 100644
--- a/swh/web/ui/api.py
+++ b/swh/web/ui/api.py
@@ -1,829 +1,869 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for, Response, redirect
from swh.web.ui import service, utils
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
def api_stats():
"""Return statistics on SWH storage.
Returns:
SWH storage's statistics.
"""
return service.stat_counters()
@app.route('/api/1/search/')
@app.route('/api/1/search/<string:q>/')
def api_search(q='sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5'):
"""Search a content per hash.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256).
Returns:
Dictionary with 'found' key and the associated result.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
Example:
GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/
"""
r = service.lookup_hash(q).get('found')
return {'found': True if r else False}
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
enriched_data = []
for e in res:
enriched_data.append(enrich_fn(e))
return enriched_data
return enrich_fn(res)
@app.route('/api/1/origin/')
@app.route('/api/1/origin/<int:origin_id>/')
def api_origin(origin_id=1):
"""Return information about origin with id origin_id.
Args:
origin_id: the origin's identifier.
Returns:
Information on the origin if found.
Raises:
NotFoundExc if the origin is not found.
Example:
GET /api/1/origin/1/
"""
return _api_lookup(
origin_id, lookup_fn=service.lookup_origin,
error_msg_if_not_found='Origin with id %s not found.' % origin_id)
@app.route('/api/1/person/')
@app.route('/api/1/person/<int:person_id>/')
def api_person(person_id=1):
"""Return information about person with identifier person_id.
Args:
person_id: the person's identifier.
Returns:
Information on the person if found.
Raises:
NotFoundExc if the person is not found.
Example:
GET /api/1/person/1/
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
def _enrich_release(release):
"""Enrich a release with link to the 'target' of 'type' revision.
"""
if 'target' in release and \
'target_type' in release and \
release['target_type'] == 'revision':
release['target_url'] = url_for('api_revision',
sha1_git=release['target'])
return release
@app.route('/api/1/release/')
@app.route('/api/1/release/<string:sha1_git>/')
def api_release(sha1_git='3c31de6fdc47031857fda10cfa4caf7044cadefb'):
"""Return information about release with id sha1_git.
Args:
sha1_git: the release's hash.
Returns:
Information on the release if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the release is not found.
Example:
GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_release)
def _enrich_revision_with_urls(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
"""
if not context:
context = revision['id']
revision['url'] = url_for('api_revision', sha1_git=revision['id'])
revision['history_url'] = url_for('api_revision_log',
sha1_git=revision['id'])
if 'directory' in revision:
revision['directory_url'] = url_for('api_directory',
sha1_git=revision['directory'])
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append(url_for('api_revision_history',
sha1_git_root=context,
sha1_git=parent))
revision['parent_urls'] = parents
if 'children' in revision:
children = []
for child in revision['children']:
children.append(url_for('api_revision_history',
sha1_git_root=context,
sha1_git=child))
revision['children_urls'] = children
return revision
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/<path:path>/')
def api_directory_through_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None,
path=None):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
path: Path to directory or file to display.
Returns:
Information on the directory or content pointed to by such revision.
Raises:
NotFoundExc if the revision is not found or the path pointed to
is not found.
"""
if ts:
ts = utils.parse_timestamp(ts)
revision = service.lookup_revision_by(origin_id, branch_name, ts)
if not revision:
raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts))
return _revision_directory(revision['id'], path, request.path)
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>/')
def api_history_through_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None,
sha1_git=None):
"""
Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of the revision root identified
by (origin_id, branch_name, ts).
Given sha1_git_root such root revision's identifier, in other words,
sha1_git is an ancestor of sha1_git_root.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
sha1_git: one of sha1_git_root's ancestors.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
limit = int(request.args.get('limit', '100'))
if ts:
ts = utils.parse_timestamp(ts)
rev_root, revision = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git, limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' "
"sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git,
rev_root['id'],
origin_id,
branch_name,
ts))
return _enrich_revision_with_urls(revision, context=rev_root['id'])
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/history/<sha1_git>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/history/<sha1_git>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/history/<sha1_git>'
'/directory/<path:path>/')
def api_directory_through_revision_with_origin_history(
origin_id=1,
branch_name="refs/heads/master",
ts=None,
sha1_git=None,
path=None):
"""Return information about directory or content pointed to by the
revision defined as: revision sha1_git, limited to the sub-graph
of all transitive parents of sha1_git_root (being the identified
sha1 by looking up origin_id/branch_name/ts)
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
sha1_git: one of sha1_git_root's ancestors.
path: optional directory or content pointed to by that revision.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root or the path referenced does not exist.
"""
limit = int(request.args.get('limit', '100'))
if ts:
ts = utils.parse_timestamp(ts)
rev_root, revision = service.lookup_revision_with_context_by(origin_id,
branch_name,
ts,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s' "
"sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)." % (sha1_git,
rev_root['id'],
origin_id,
branch_name,
ts))
return _revision_directory(revision['id'], path, request.path)
@app.route('/api/1/revision'
'/origin/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/')
def api_revision_with_origin(origin_id=1,
branch_name="refs/heads/master",
ts=None):
"""Instead of having to specify a (root) revision by SHA1_GIT, users
might want to specify a place and a time. In SWH a "place" is an
origin; a "time" is a timestamp at which some place has been
observed by SWH crawlers.
Args:
origin_id: origin's identifier (default to 1).
branch_name: the optional branch for the given origin (default
to master).
timestamp: optional timestamp (default to the nearest time
crawl of timestamp).
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
_enrich_revision_with_urls,
branch_name,
ts)
@app.route('/api/1/revision/')
@app.route('/api/1/revision/<string:sha1_git>/')
def api_revision(sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'):
"""Return information about revision with id sha1_git.
Args:
sha1_git: the revision's hash.
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
Example:
GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e
"""
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_revision,
error_msg_if_not_found='Revision with sha1_git %s not'
' found.' % sha1_git,
enrich_fn=_enrich_revision_with_urls)
def _enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = url_for('api_content_with_details',
q='sha1_git:%s' % target)
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = url_for('api_directory',
sha1_git=target)
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
return directory
def _revision_directory(rev_sha1_git, dir_path, request_path):
"""Compute the revision rev_sha1_git's directory or content data.
"""
def enrich_directory_local(dir, context_url=request_path):
return _enrich_directory(dir, context_url)
result = service.lookup_directory_with_revision(rev_sha1_git, dir_path)
if not result:
raise NotFoundExc('Revision with sha1_git %s not'
' found.' % rev_sha1_git)
if result['type'] == 'dir': # dir_entries
return list(map(enrich_directory_local, result['content']))
else: # content
return _enrich_content(result['content'])
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
def api_directory_with_revision(
sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf',
dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist
Example:
GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/
"""
return _revision_directory(sha1_git, dir_path, request.path)
@app.route('/api/1/revision/<string:sha1_git_root>/history/<sha1_git>/')
def api_revision_history(sha1_git_root, sha1_git):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history.
sha1_git: one of sha1_git_root's ancestors.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
limit = int(request.args.get('limit', '100'))
if sha1_git == sha1_git_root:
return redirect(url_for('api_revision',
sha1_git=sha1_git,
limit=limit))
revision = service.lookup_revision_with_context(sha1_git_root,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
% (sha1_git, sha1_git_root))
return _enrich_revision_with_urls(revision, context=sha1_git_root)
@app.route('/api/1/revision/<string:sha1_git_root>'
'/history/<sha1_git>'
'/directory/')
@app.route('/api/1/revision/<string:sha1_git_root>'
'/history/<sha1_git>'
'/directory/<path:dir_path>/')
def api_directory_revision_history(sha1_git_root, sha1_git, dir_path=None):
"""Return information about directory pointed to by the revision
defined as: revision sha1_git, limited to the sub-graph of all
transitive parents of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history.
sha1_git: one of sha1_git_root's ancestors.
dir_path: optional directory pointed to by that revision.
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
sha1_git_root (even if it is).
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root or the path referenced does not exist
"""
limit = int(request.args.get('limit', '100'))
if sha1_git == sha1_git_root:
return redirect(url_for('api_directory_with_revision',
sha1_git=sha1_git,
dir_path=dir_path),
code=301)
revision = service.lookup_revision_with_context(sha1_git_root,
sha1_git,
limit)
if not revision:
raise NotFoundExc(
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
% (sha1_git, sha1_git_root))
return _revision_directory(revision['id'], dir_path, request.path)
@app.route('/api/1/revision/<string:sha1_git>/log/')
def api_revision_log(sha1_git):
"""Show all revisions (~git log) starting from sha1_git.
The first element returned is the given sha1_git.
Args:
sha1_git: the revision's hash.
limit: optional query parameter to limit the revisions log
(default to 100).
Returns:
Information on the revision if found.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the revision is not found.
"""
limit = int(request.args.get('limit', '100'))
def lookup_revision_log_with_limit(s, limit=limit):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
return _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_revision_with_urls)
@app.route('/api/1/directory/')
@app.route('/api/1/directory/<string:sha1_git>/')
def api_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'):
"""Return information about release with id sha1_git.
Args:
sha1_git: Directory's sha1_git.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the content is not found.
Example:
GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179
"""
error_msg = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_directory,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_directory)
# @app.route('/api/1/browse/')
# @app.route('/api/1/browse/<string:q>/')
def api_content_checksum_to_origin(q='sha1_git:26ac0281bc74e9bd8a4a4aab1c7c7a'
'0c19d4436c'):
"""Return content information up to one of its origin if the content
is found.
Args:
q is of the form algo_hash:hash with algo_hash in
(sha1, sha1_git, sha256).
Returns:
Information on one possible origin for such content.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc if the content is not found.
Example:
GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242
"""
found = service.lookup_hash(q)['found']
if not found:
raise NotFoundExc('Content with %s not found.' % q)
return service.lookup_hash_origin(q)
@app.route('/api/1/content/<string:q>/raw/')
def api_content_raw(q):
"""Return content's raw data if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's raw data in application/octet-stream.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash
- NotFoundExc if the content is not found.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return Response(generate(content), mimetype='application/octet-stream')
def _enrich_content(content):
"""Enrich content with 'data', a link to its raw content.
"""
content['data_url'] = url_for('api_content_raw', q=content['sha1'])
return content
@app.route('/api/1/content/')
@app.route('/api/1/content/<string:q>/')
def api_content_with_details(q='sha256:e2c76e40866bb6b28916387bdfc8649beceb'
'523015738ec6d4d540c7fe65232b'):
"""Return content information if content is found.
Args:
q is of the form (algo_hash:)hash with algo_hash in
(sha1, sha1_git, sha256).
When algo_hash is not provided, 'hash' is considered sha1.
Returns:
Content's information.
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if the content is not found.
Example:
GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb
523015738ec6d4d540c7fe65232b
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_content)
+def _enrich_entity(entity):
+ """Enrich entity with
+
+ """
+ entity['uuid_url'] = url_for('api_entity_by_uuid',
+ uuid=entity['uuid'])
+ if 'parent' in entity and entity['parent']:
+ entity['parent_url'] = url_for('api_entity_by_uuid',
+ uuid=entity['parent'])
+ return entity
+
+
+@app.route('/api/1/entity/')
+@app.route('/api/1/entity/<string:uuid>/')
+def api_entity_by_uuid(uuid='5f4d4c51-498a-4e28-88b3-b3e4e8396cba'):
+ """Return content information if content is found.
+
+ Args:
+ q is of the form (algo_hash:)hash with algo_hash in
+ (sha1, sha1_git, sha256).
+ When algo_hash is not provided, 'hash' is considered sha1.
+
+ Returns:
+ Content's information.
+
+ Raises:
+ - BadInputExc in case of unknown algo_hash or bad hash.
+ - NotFoundExc if the content is not found.
+
+ Example:
+ GET /api/1/entity/7c33636b-8f11-4bda-89d9-ba8b76a42cec/
+
+ """
+ return _api_lookup(
+ uuid,
+ lookup_fn=service.lookup_entity_by_uuid,
+ error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
+ enrich_fn=_enrich_entity)
+
+
@app.route('/api/1/uploadnsearch/', methods=['POST'])
def api_uploadnsearch():
"""Upload the file's content in the post body request.
Compute its hash and determine if it exists in the storage.
Args:
request.files filled with the filename's data to upload.
Returns:
Dictionary with 'sha1', 'filename' and 'found' predicate depending
on whether we find it or not.
Raises:
BadInputExc in case of the form submitted is incorrect.
"""
file = request.files.get('filename')
if not file:
raise BadInputExc("Bad request, missing 'filename' entry in form.")
return service.upload_and_search(file)
diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py
index 970df5ea..f6adabf2 100644
--- a/swh/web/ui/backend.py
+++ b/swh/web/ui/backend.py
@@ -1,188 +1,195 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.web.ui import main
def content_get(sha1_bin):
"""Lookup the content designed by {algo: hash_bin}.
Args:
sha1_bin: content's binary sha1.
Returns:
Content as dict with 'sha1' and 'data' keys.
data representing its raw data.
"""
contents = main.storage().content_get([sha1_bin])
if contents and len(contents) >= 1:
return contents[0]
return None
def content_find(algo, hash_bin):
"""Retrieve the content with binary hash hash_bin
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
A triplet (sha1, sha1_git, sha256) if the content exist
or None otherwise.
"""
return main.storage().content_find({algo: hash_bin})
def content_find_occurrence(algo, hash_bin):
"""Find the content's occurrence.
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
The occurrence of the content.
"""
return main.storage().content_find_occurrence({algo: hash_bin})
def origin_get(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id: origin's identifier
Returns:
Origin information as dict.
"""
return main.storage().origin_get({'id': origin_id})
def person_get(person_id):
"""Return information about the person with id person_id.
Args:
person_id: person's identifier.v
Returns:
Person information as dict.
"""
return main.storage().person_get([person_id])
def directory_get(sha1_git_bin, recursive=False):
"""Return information about the directory with id sha1_git.
Args:
sha1_git: directory's identifier.
recursive: Optional recursive flag default to False
Returns:
Directory information as dict.
"""
directory_entries = main.storage().directory_get(sha1_git_bin, recursive)
if not directory_entries:
return None
return directory_entries
def release_get(sha1_git_bin):
"""Return information about the release with sha1 sha1_git_bin.
Args:
sha1_git_bin: The release's sha1 as hexadecimal.
Returns:
Release information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().release_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_get(sha1_git_bin):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().revision_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_log(sha1_git_bin, limit=100):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
limit: the maximum number of revisions returned.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
return main.storage().revision_log(sha1_git_bin, limit)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return main.storage().stat_counters()
def revision_get_by(origin_id, branch_name, timestamp):
"""Return occurrence information matching the criterions origin_id,
branch_name, ts.
"""
res = main.storage().revision_get_by(origin_id,
branch_name,
timestamp=timestamp,
limit=1)
if not res:
return None
return res[0]
def directory_entry_get_by_path(directory, path):
"""Return a directory entry by its path.
"""
paths = path.strip(os.path.sep).split(os.path.sep)
return main.storage().directory_entry_get_by_path(
directory,
list(map(lambda p: p.encode('utf-8'), paths)))
+
+
+def entity_get(uuid):
+ """Retrieve the entity per its uuid.
+
+ """
+ return main.storage().entity_get(uuid)
diff --git a/swh/web/ui/query.py b/swh/web/ui/query.py
index 5de6f231..cb68825b 100644
--- a/swh/web/ui/query.py
+++ b/swh/web/ui/query.py
@@ -1,87 +1,111 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
+from uuid import UUID
+
from swh.core.hashutil import ALGORITHMS, hex_to_hash
from swh.web.ui.exc import BadInputExc
SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE)
SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE)
def parse_hash(q):
"""Detect the hash type of a user submitted query string.
Args:
query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM",
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
ValueError if the given query string does not correspond to a valid
hash value
"""
def guess_algo(q):
if SHA1_RE.match(q):
return 'sha1'
elif SHA256_RE.match(q):
return 'sha256'
else:
raise BadInputExc('Invalid checksum query string %s' % q)
def check_algo(algo, hex):
if (algo in {'sha1', 'sha1_git'} and not SHA1_RE.match(hex)) \
or (algo == 'sha256' and not SHA256_RE.match(hex)):
raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo))
parts = q.split(':')
if len(parts) > 2:
raise BadInputExc('Invalid checksum query string %s' % q)
elif len(parts) == 1:
parts = (guess_algo(q), q)
elif len(parts) == 2:
check_algo(parts[0], parts[1])
algo = parts[0]
if algo not in ALGORITHMS:
raise BadInputExc('Unknown hash algorithm %s' % algo)
return (algo, hex_to_hash(parts[1]))
def parse_hash_with_algorithms_or_throws(q, accepted_algo, error_msg):
"""Parse a query but only accepts accepted_algo.
Otherwise, raise the exception with message error_msg.
Args:
- q: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM"
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS.
- accepted_algo: array of strings representing the names of accepted
algorithms.
- error_msg: error message to raise as BadInputExc if the algo of
the query does not match.
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
BadInputExc when the inputs is invalid or does not
validate the accepted algorithms.
"""
algo, hash = parse_hash(q)
if algo not in accepted_algo:
raise BadInputExc(error_msg)
return (algo, hash)
+
+
+def parse_uuid4(uuid):
+ """Parse an uuid 4 from a string.
+
+ Args:
+ uuid: String representing an uuid.
+
+ Returns:
+ The uuid as is if everything is ok.
+
+ Raises:
+ BadInputExc: if the uuid is invalid.
+
+ """
+ try:
+ UUID(uuid, version=4)
+ except ValueError as e:
+ # not a valid hex code for a UUID
+ raise BadInputExc(str(e))
+
+ return uuid
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index d171b51f..5f8a10bc 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,412 +1,426 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, upload, backend
from swh.web.ui.exc import NotFoundExc
def hash_and_search(filepath):
"""Hash the filepath's content as sha1, then search in storage if
it exists.
Args:
Filepath of the file to hash and search.
Returns:
Tuple (hex sha1, found as True or false).
The found boolean, according to whether the sha1 of the file
is present or not.
"""
h = hashutil.hashfile(filepath)
c = backend.content_find('sha1', h['sha1'])
if c:
r = converters.from_content(c)
r['found'] = True
return r
else:
return {'sha1': hashutil.hash_to_hex(h['sha1']),
'found': False}
def upload_and_search(file):
"""Upload a file and compute its hash.
"""
tmpdir, filename, filepath = upload.save_in_upload_folder(file)
res = {'filename': filename}
try:
content = hash_and_search(filepath)
res.update(content)
return res
finally:
# clean up
if tmpdir:
upload.cleanup(tmpdir)
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def lookup_hash_origin(q):
"""Return information about the checksum contained in the query q.
Args: query string of the form <hash_algo:hash>
Returns:
origin as dictionary if found for the given content.
"""
algo, hash = query.parse_hash(q)
origin = backend.content_find_occurrence(algo, hash)
return converters.from_origin(origin)
def lookup_origin(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id as string
Returns:
origin information as dict.
"""
return backend.origin_get(origin_id)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
person = backend.person_get(person_id)
return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'], # HACK: sha1_git really
'Only sha1_git is supported.')
directory_entries = backend.directory_get(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
release_sha1_git,
['sha1'],
'Only sha1_git is supported.')
res = backend.release_get(sha1_git_bin)
return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
res = backend.revision_get(sha1_git_bin)
return converters.from_revision(res)
def lookup_revision_by(origin_id,
branch_name="refs/heads/master",
timestamp=None):
"""Lookup revisions by origin_id, branch_name and timestamp.
If:
- branch_name is not provided, lookup using 'refs/heads/master' as default.
- ts is not provided, use the most recent
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
Yields:
The revisions matching the criterions.
"""
res = backend.revision_get_by(origin_id, branch_name, timestamp)
return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit=100):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision_entries = backend.revision_log(sha1_git_bin, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git,
limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
sha1_git_root being resolved through the lookup of a revision by origin_id,
branch_name and ts.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
- sha1_git: one of sha1_git_root's ancestors.
- limit: limit the lookup to 100 revisions back.
Returns:
Pair of (root_revision, revision).
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
rev_root = backend.revision_get_by(origin_id, branch_name, ts)
if not rev_root:
raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts))
return (converters.from_revision(rev_root),
lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision. The type is either a sha1 (as an hex
string) or a non converted dict.
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
if isinstance(sha1_git_root, str):
_, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git_root,
['sha1'],
'Only sha1_git is supported.')
revision_root = backend.revision_get(sha1_git_root_bin)
if not revision_root:
raise NotFoundExc('Revision %s not found' % sha1_git_root)
else:
sha1_git_root_bin = sha1_git_root
revision_log = backend.revision_log(sha1_git_root_bin, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError in case of dir_path exists but do not reference a
type 'dir' or 'file'.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
entity = backend.directory_entry_get_by_path(dir_sha1_git_bin,
dir_path)
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = backend.directory_get(entity['target'])
return {'type': 'dir',
'content': map(converters.from_directory_entry,
directory_entries)}
elif entity['type'] == 'file': # content
content = backend.content_find('sha1_git', entity['target'])
return {'type': 'file',
'content': converters.from_content(content)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content designed by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
if not c:
return None
content = backend.content_get(c['sha1'])
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return backend.stat_counters()
+
+
+def lookup_entity_by_uuid(uuid):
+ """Return the entity's hierarchy from its uuid.
+
+ Args:
+ uuid: entity's identifier.
+
+ Returns:
+ List of hierarchy entities from the entity with uuid.
+
+ """
+ uuid = query.parse_uuid4(uuid)
+ return backend.entity_get(uuid)
diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py
index 975978e8..7f08c973 100644
--- a/swh/web/ui/tests/test_api.py
+++ b/swh/web/ui/tests/test_api.py
@@ -1,1856 +1,1939 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
from swh.web.ui import api, exc
-from swh.web.ui.exc import NotFoundExc
+from swh.web.ui.exc import NotFoundExc, BadInputExc
class ApiTestCase(test_app.SWHApiTestCase):
@istest
def generic_api_lookup_Nothing_is_found(self):
# given
def test_generic_lookup_fn(sha1, another_unused_arg):
assert another_unused_arg == 'unused arg'
assert sha1 == 'sha1'
return None
# when
with self.assertRaises(NotFoundExc) as cm:
api._api_lookup('sha1', test_generic_lookup_fn,
'This will be raised because None is returned.',
lambda x: x,
'unused arg')
self.assertIn('This will be raised because None is returned.',
cm.exception.args[0])
@istest
def generic_api_map_are_enriched_and_transformed_to_list(self):
# given
def test_generic_lookup_fn_1(criteria0, param0, param1):
assert criteria0 == 'something'
return map(lambda x: x + 1, [1, 2, 3])
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_1,
'This is not the error message you are looking for. Move along.',
lambda x: x * 2,
'some param 0',
'some param 1')
self.assertEqual(actual_result, [4, 6, 8])
@istest
def generic_api_list_are_enriched_too(self):
# given
def test_generic_lookup_fn_2(crit):
assert crit == 'something'
return ['a', 'b', 'c']
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_2,
'Not the error message you are looking for, it is. '
'Along, you move!',
lambda x: ''. join(['=', x, '=']))
self.assertEqual(actual_result, ['=a=', '=b=', '=c='])
@istest
def generic_api_generator_are_enriched_and_returned_as_list(self):
# given
def test_generic_lookup_fn_3(crit):
assert crit == 'crit'
return (i for i in [4, 5, 6])
# when
actual_result = api._api_lookup(
'crit',
test_generic_lookup_fn_3,
'Move!',
lambda x: x - 1)
self.assertEqual(actual_result, [3, 4, 5])
@istest
def generic_api_simple_data_are_enriched_and_returned_too(self):
# given
def test_generic_lookup_fn_4(crit):
assert crit == '123'
return {'a': 10}
def test_enrich_data(x):
x['a'] = x['a'] * 10
return x
# when
actual_result = api._api_lookup(
'123',
test_generic_lookup_fn_4,
'Nothing to do',
test_enrich_data)
self.assertEqual(actual_result, {'a': 100})
@patch('swh.web.ui.api.service')
# @istest
def api_content_checksum_to_origin(self, mock_service):
mock_service.lookup_hash.return_value = {'found': True}
stub_origin = {
"lister": None,
"url": "rsync://ftp.gnu.org/old-gnu/webbase",
"type": "ftp",
"id": 2,
"project": None
}
mock_service.lookup_hash_origin.return_value = stub_origin
# when
rv = self.app.get(
'/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_hash.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
mock_service.lookup_hash_origin.assert_called_once_with(
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
# @istest
def api_content_checksum_to_origin_sha_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {'found': False}
# when
rv = self.app.get(
'/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_hash.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_with_details(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data_url': '/api/1/content/'
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_hash_origin = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/',
headers={'accept': 'application/yaml'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_hash_origin.called = False
@patch('swh.web.ui.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': b'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-type': 'application/octet-stream',
'Content-disposition': 'attachment'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_content_raw_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': {
'sha1': 'or something'
}
}
# when
rv = self.app.get('/api/1/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.api.service')
@istest
def api_search_as_yaml(self, mock_service):
# given
mock_service.lookup_hash.return_value = {
'found': {
'sha1': 'sha1 hash'
}
}
# when
rv = self.app.get('/api/1/search/sha1:halb/',
headers={'Accept': 'application/yaml'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': True})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.lookup_hash.return_value = {}
# when
rv = self.app.get('/api/1/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'found': False})
mock_service.lookup_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch_bad_input(self, mock_request, mock_service):
# given
mock_request.files = {}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': "Bad request, missing 'filename' entry in form."})
mock_service.upload_and_search.called = False
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.request')
@istest
def api_uploadnsearch(self, mock_request, mock_service):
# given
mock_request.files = {'filename': 'simple-filename'}
mock_service.upload_and_search.return_value = {
'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False,
}
# when
rv = self.app.post('/api/1/uploadnsearch/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'filename': 'simple-filename',
'sha1': 'some-hex-sha1',
'found': False})
mock_service.upload_and_search.assert_called_once_with(
'simple-filename')
@patch('swh.web.ui.api.service')
@istest
def api_origin(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with(1234)
@patch('swh.web.ui.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with(4321)
@patch('swh.web.ui.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
'target_url': '/api/1/revision/revision-sha1/',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.api.service')
@istest
def api_release_target_type_not_a_revision(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
'ff7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
'a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
],
'parent_urls': [
'/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5'
'/history/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revision)
mock_service.lookup_revision.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
mock_service.lookup_revision_by.return_value = None
rv = self.app.get('/api/1/revision/origin/123/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertIn('Revision with (origin_id: 123', response_data['error'])
self.assertIn('not found', response_data['error'])
mock_service.lookup_revision_by.assert_called_once_with(
123,
'refs/heads/master',
None)
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin(self, mock_service):
mock_revision = {
'id': '32',
'directory': '21',
'message': 'message 1',
'type': 'deb',
}
expected_revision = {
'id': '32',
'url': '/api/1/revision/32/',
'history_url': '/api/1/revision/32/log/',
'directory': '21',
'directory_url': '/api/1/directory/21/',
'message': 'message 1',
'type': 'deb',
}
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/heads/master',
None)
@patch('swh.web.ui.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
mock_revision = {
'id': '12',
'directory': '23',
'message': 'message 2',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '12',
'url': '/api/1/revision/12/',
'history_url': '/api/1/revision/12/log/',
'directory': '23',
'directory_url': '/api/1/directory/23/',
'message': 'message 2',
'type': 'tar',
}
rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
None)
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
mock_utils,
mock_service):
mock_revision = {
'id': '123',
'directory': '456',
'message': 'message 3',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '123',
'url': '/api/1/revision/123/',
'history_url': '/api/1/revision/123/log/',
'directory': '456',
'directory_url': '/api/1/directory/456/',
'message': 'message 3',
'type': 'tar',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs/origin/dev'
'/ts/1452591542/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with('1452591542')
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
self,
mock_utils,
mock_service):
mock_revisions = [{
'id': '999',
}]
mock_service.lookup_revision_by.return_value = mock_revisions
expected_revisions = [{
'id': '999',
'url': '/api/1/revision/999/',
'history_url': '/api/1/revision/999/log/',
}]
mock_utils.parse_timestamp.return_value = 'parsed-date'
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs%2Forigin%2Fdev'
'/ts/Today%20is%20'
'January%201,%202047%20at%208:21:00AM/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revisions)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with(
'Today is January 1, 2047 at 8:21:00AM')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_with_rev_not_found_0(
self, mock_service):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/1'
'/history/4563'
'/directory/',
'/api/1/revision'
'/origin/1'
'/history/4563'
'/directory/some-path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'4563',
'root-rev-id',
1,
'refs/heads/master',
None)})
mock_service.lookup_revision_with_context_by.assert_called(
1, 'refs/heads/master', None, '4563', 100)
# reset_mock
mock_service.reset_mock()
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_not_found_1( # noqa
self, mock_service):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213'
'/directory/',
'/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213'
'/directory/some/path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'213',
'root-rev-id',
10,
'origin/dev',
None)})
mock_service.lookup_revision_with_context_by.assert_called(
10, 'origin/dev', None, '213', 100)
# for next turn
mock_service.reset_mock()
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_found_2(
self, mock_service, mock_utils):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876'
'/directory/',
'/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876'
'/directory/some-path/']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00'
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)." % (
'876',
'root-rev-id',
100,
'master',
'2012-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called(
100, 'master', '2012-11-23 00:00:00', '876', 100)
mock_utils.parse_timestamp.assert_called_once_with('2012-11-23')
# rest
mock_utils.reset_mock()
mock_service.reset_mock()
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_rev_with_origin_history_rev_ok_but_sha1_rev_not(
self, mock_service, mock_utils):
# test both endpoint (path is useless here, still the endpoint
# must be tested)
for url in ['/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git'
'/directory/?limit=1000',
'/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git'
'/directory/some/path/?limit=1000']:
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00'
# when
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root "
"'%s' sha1_git_root being the revision's identifier pointed to"
" by (origin_id: %s, branch_name: %s, ts: %s)."
% ('123-sha1-git',
'root-rev-id',
666,
'refs/master',
'2016-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called(
666, 'refs/master', '2016-11-23 00:00:00',
'123-sha1-git', 1000)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-23')
# reset_mock
mock_service.reset_mock()
mock_utils.reset_mock()
@patch('swh.web.ui.api._revision_directory')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_revision_with_origin_history(
self, mock_service, mock_utils, mock_revision_directory):
# given
stub_root_rev = {
'id': '45-sha1-git-root'
}
stub_revision = {
'id': 'revision-id',
}
mock_service.lookup_revision_with_context_by.return_value = (
stub_root_rev,
stub_revision)
stub_dir_content = [
{
'type': 'dir'
},
{
'type': 'file'
},
]
mock_revision_directory.return_value = stub_dir_content
mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00'
# when
url = '/api/1/revision' \
'/origin/999' \
'/branch/refs/dev' \
'/ts/2016-11-24' \
'/history/12-sha1-git' \
'/directory/some/content/'
rv = self.app.get(url)
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, stub_dir_content)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-24')
mock_service.lookup_revision_with_context_by(999,
'refs/dev',
'2016-11-24 00:00:00'
'12-sha1-git',
100)
mock_revision_directory.assert_called_once_with('revision-id',
'some/content',
url)
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_0(
self, mock_service):
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get('/api/1/revision'
'/origin/1'
'/history/4563/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('4563',
'root-rev-id',
1,
'refs/heads/master',
None)})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
1, 'refs/heads/master', None, '4563', 100)
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_1(
self, mock_service):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
# when
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/origin/dev'
'/history/213/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('213',
'root-rev-id',
10,
'origin/dev',
None)})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
10, 'origin/dev', None, '213', 100)
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_2(
self, mock_service, mock_utils):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_utils.parse_timestamp.return_value = '2012-11-23 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/100'
'/branch/master'
'/ts/2012-11-23'
'/history/876/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('876',
'root-rev-id',
100,
'master',
'2012-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
100, 'master', '2012-11-23 00:00:00', '876', 100)
mock_utils.parse_timestamp.assert_called_once_with('2012-11-23')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision_with_origin_rev_not_found_3(
self, mock_service, mock_utils):
# given
mock_service.lookup_revision_with_context_by.return_value = {
'id': 'root-rev-id'}, None
mock_service.lookup_revision_with_context.return_value = None
mock_utils.parse_timestamp.return_value = '2016-11-23 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/666'
'/branch/refs/master'
'/ts/2016-11-23'
'/history/123-sha1-git/?limit=1000')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error':
"Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'"
" sha1_git_root being the revision's identifier pointed to by "
"(origin_id: %s, branch_name: %s, ts: %s)."
% ('123-sha1-git',
'root-rev-id',
666,
'refs/master',
'2016-11-23 00:00:00')})
mock_service.lookup_revision_with_context_by.assert_called_once_with(
666, 'refs/master', '2016-11-23 00:00:00', '123-sha1-git', 1000)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-23')
mock_service.lookup_revision_with_context('456-sha1-git-root',
'123-sha1-git',
1000)
@patch('swh.web.ui.api._enrich_revision_with_urls')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_history_through_revision(
self, mock_service, mock_utils, mock_enrich):
# given
stub_root_rev = {
'id': '45-sha1-git-root'
}
stub_revision = {
'children': [],
}
mock_service.lookup_revision_with_context_by.return_value = (
stub_root_rev,
stub_revision)
mock_enrich.return_value = 'some-enriched-result'
mock_utils.parse_timestamp.return_value = '2016-11-24 00:00:00'
# when
rv = self.app.get('/api/1/revision'
'/origin/999'
'/branch/refs/dev'
'/ts/2016-11-24'
'/history/12-sha1-git/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, 'some-enriched-result')
mock_service.lookup_revision_with_context_by.assert_called_once_with(
999,
'refs/dev',
'2016-11-24 00:00:00',
'12-sha1-git',
100)
mock_utils.parse_timestamp.assert_called_once_with('2016-11-24')
mock_enrich.assert_called_once_with(stub_revision,
context='45-sha1-git-root')
@patch('swh.web.ui.api.utils')
@patch('swh.web.ui.api.service')
@istest
def api_directory_through_revision_with_origin_rev_not_found(self,
mock_service,
mock_utils):
mock_service.lookup_revision_by.return_value = None
mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error': 'Revision with (origin_id: 10, branch_name: '
'refs/remote/origin/dev, ts: 2012-10-20 00:00:00) not found.'})
mock_service.lookup_revision_by.assert_called_once_with(
10,
'refs/remote/origin/dev',
'2012-10-20 00:00:00')
@patch('swh.web.ui.api.service')
@patch('swh.web.ui.api._revision_directory')
@istest
def api_directory_through_revision_with_origin(self,
mock_revision_dir,
mock_service):
mock_revision = {
'id': '998',
}
expected_res = [{
'id': '123'
}]
mock_revision_dir.return_value = expected_res
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/3/directory/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_res)
mock_service.lookup_revision_by.assert_called_once_with(
3,
'refs/heads/master',
None)
mock_revision_dir.assert_called_once_with(
'998',
None,
'/api/1/revision/origin/3/directory/')
@patch('swh.web.ui.api.service')
@istest
def api_revision_log(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5'
'/history/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/'
],
'type': 'tar',
'synthetic': True,
}]
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revisions)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 100)
@patch('swh.web.ui.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
# given
mock_service.lookup_revision_log.return_value = None
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
'e2a44e6/log/?limit=10')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git'
' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 10)
@patch('swh.web.ui.api.service')
@istest
def api_revision_history_not_found(self, mock_service):
# given
mock_service.lookup_revision_with_context.return_value = None
# then
rv = self.app.get('/api/1/revision/999/history/338/?limit=5')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_revision_with_context.assert_called_once_with(
'999', '338', 5)
@istest
def api_revision_history_sha1_same_so_redirect(self):
# when
rv = self.app.get('/api/1/revision/123/history/123?limit=10')
# then
self.assertEquals(rv.status_code, 301)
# Ideally we'd like to be able to check the resulting url path
# but does not work, this returns the current url
# also following the redirect would mean to yet mock again the
# destination url... So for now cannot test it
# self.assertEquals(rv.location,
# 'http://localhost/api/1/revision/123?limit=10')
@patch('swh.web.ui.api.service')
@istest
def api_revision_history(self, mock_service):
# for readability purposes, we use:
# - sha1 as 3 letters (url are way too long otherwise to respect pep8)
# - only keys with modification steps (all other keys are kept as is)
# given
stub_revision = {
'id': '883',
'children': ['777', '999'],
'parents': [],
'directory': '272'
}
mock_service.lookup_revision_with_context.return_value = stub_revision
# then
rv = self.app.get('/api/1/revision/666/history/883/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'id': '883',
'url': '/api/1/revision/883/',
'history_url': '/api/1/revision/883/log/',
'children': ['777', '999'],
'children_urls': ['/api/1/revision/666/history/777/',
'/api/1/revision/666/history/999/'],
'parents': [],
'parent_urls': [],
'directory': '272',
'directory_url': '/api/1/directory/272/'
})
mock_service.lookup_revision_with_context.assert_called_once_with(
'666', '883', 100)
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_not_found(self, mock_service):
# given
mock_service.lookup_directory_with_revision.return_value = None
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path/to/dir')
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_not_found_2(self, mock_service):
# given
mock_service.lookup_directory_with_revision.return_value = None
# then
rv = self.app.get('/api/1/revision/123/directory/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
mock_service.lookup_directory_with_revision.assert_called_once_with(
'123', None)
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_ok_returns_dir_entries(self, mock_service):
stub_dir = {
'type': 'dir',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'name': 'somefile'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'name': 'to-subdir',
}
]
}
expected_dir = [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}]
# given
mock_service.lookup_directory_with_revision.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path')
@patch('swh.web.ui.api.service')
@istest
def api_directory_with_revision_ok_returns_content(self, mock_service):
stub_content = {
'type': 'file',
'content': {
'sha1_git': '789',
'sha1': '101',
}
}
expected_content = {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
# given
mock_service.lookup_directory_with_revision.return_value = stub_content
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_content)
mock_service.lookup_directory_with_revision.assert_called_once_with(
'999', 'some/path')
@istest
def api_directory_revision_history_sha1_same_so_redirect(self):
# when
rv = self.app.get(
'/api/1/revision/123/history/123/directory/path/to/?limit=1')
# then
self.assertEquals(rv.status_code, 301)
# self.assertEquals(rv.location,
# 'http://localhost/api/1/revision/123/directory/path/to/')
@patch('swh.web.ui.api.service')
@istest
def api_directory_revision_history_ko_revision_not_found(self,
mock_service):
# given
mock_service.lookup_revision_with_context.return_value = None
# then
rv = self.app.get('/api/1/revision/456/history/987/directory/path/to/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': "Possibly sha1_git '987' is not " +
"an ancestor of sha1_git_root '456'"})
mock_service.lookup_revision_with_context.assert_called_once_with(
'456', '987', 100)
@patch('swh.web.ui.api.service')
@istest
def api_directory_revision_history(self,
mock_service):
# given
mock_service.lookup_revision_with_context.return_value = {
'id': 'rev-id'
}
stub_dir = {
'type': 'dir',
'content': [
{
'sha1_git': '879',
'type': 'file',
'target': '110',
'name': 'subfile'
},
{
'sha1_git': '213',
'type': 'dir',
'target': '546',
'name': 'subdir',
}
]
}
expected_dir = [
{
'sha1_git': '879',
'type': 'file',
'target': '110',
'target_url': '/api/1/content/sha1_git:110/',
'name': 'subfile',
'file_url': '/api/1/revision/354/history/867/directory/debian/'
'subfile/',
},
{
'sha1_git': '213',
'type': 'dir',
'target': '546',
'target_url': '/api/1/directory/546/',
'name': 'subdir',
'dir_url':
'/api/1/revision/354/history/867/directory/debian/subdir/'
}]
# given
mock_service.lookup_directory_with_revision.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/354'
'/history/867'
'/directory/debian/?limit=4')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_revision_with_context.assert_called_once_with(
'354', '867', 4)
mock_service.lookup_directory_with_revision('rev-id', 'debian')
@patch('swh.web.ui.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
}]
expected_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
'target_url':
'/api/1/directory/8be353ed3480476f032475e7c233eff737123456/',
}]
mock_service.lookup_directory.return_value = stub_directories
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_directories)
mock_service.lookup_directory.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid_not_found(self, mock_service):
+ # when
+ mock_service.lookup_entity_by_uuid.return_value = []
+
+ # when
+ rv = self.app.get('/api/1/entity/')
+
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, {
+ 'error':
+ "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " +
+ "found."})
+
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ '5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
+
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid_bad_request(self, mock_service):
+ # when
+ mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
+ 'bad input: uuid malformed!')
+
+ # when
+ rv = self.app.get('/api/1/entity/uuid malformed/')
+
+ self.assertEquals(rv.status_code, 400)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, {
+ 'error': 'bad input: uuid malformed!'})
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ 'uuid malformed')
+
+ @patch('swh.web.ui.api.service')
+ @istest
+ def api_lookup_entity_by_uuid(self, mock_service):
+ # when
+ stub_entities = [
+ {
+ 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
+ },
+ {
+ 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
+ }
+ ]
+ mock_service.lookup_entity_by_uuid.return_value = stub_entities
+
+ expected_entities = [
+ {
+ 'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
+ 'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-'
+ '785107f598e4/',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
+ 'd1ce2efc9fb2/'
+ },
+ {
+ 'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
+ 'd1ce2efc9fb2/'
+ }
+ ]
+
+ # when
+ rv = self.app.get('/api/1/entity'
+ '/34bd6b1b-463f-43e5-a697-785107f598e4/')
+
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(rv.mimetype, 'application/json')
+
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, expected_entities)
+ mock_service.lookup_entity_by_uuid.assert_called_once_with(
+ '34bd6b1b-463f-43e5-a697-785107f598e4')
+
class ApiUtils(unittest.TestCase):
@istest
def api_lookup_not_found(self):
# when
with self.assertRaises(exc.NotFoundExc) as e:
api._api_lookup('something',
lambda x: None,
'this is the error message raised as it is None')
self.assertEqual(e.exception.args[0],
'this is the error message raised as it is None')
@istest
def api_lookup_with_result(self):
# when
actual_result = api._api_lookup('something',
lambda x: x + '!',
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, 'something!')
@istest
def api_lookup_with_result_as_map(self):
# when
actual_result = api._api_lookup([1, 2, 3],
lambda x: map(lambda y: y+1, x),
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, [2, 3, 4])
diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py
index de085fbd..3bf07c68 100644
--- a/swh/web/ui/tests/test_backend.py
+++ b/swh/web/ui/tests/test_backend.py
@@ -1,429 +1,448 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock
from swh.core import hashutil
from swh.web.ui import backend
from swh.web.ui.tests import test_app
class BackendTestCase(test_app.SWHApiTestCase):
@istest
def content_get_ko_not_found_1(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f777')
self.storage.content_get = MagicMock(return_value=None)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get_ko_not_found_empty_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_get = MagicMock(return_value=[])
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
stub_contents = [{
'sha1': sha1_bin,
'data': b'binary data',
},
{}]
self.storage.content_get = MagicMock(return_value=stub_contents)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertEquals(actual_content, stub_contents[0])
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_find_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=(1, 2, 3))
# when
actual_content = backend.content_find('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find.assert_called_with({'sha1': sha1_bin})
@istest
def content_find_occurrence_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find_occurrence('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find_occurrence.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find_occurrence(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(
return_value=(1, 2, 3))
# when
actual_content = backend.content_find_occurrence('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find_occurrence.assert_called_with(
{'sha1': sha1_bin})
@istest
def origin_get(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = backend.origin_get('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def person_get(self):
# given
self.storage.person_get = MagicMock(return_value={
'id': 'person-id',
'name': 'blah'})
# when
actual_person = backend.person_get('person-id')
# then
self.assertEqual(actual_person, {'id': 'person-id',
'name': 'blah'})
self.storage.person_get.assert_called_with(['person-id'])
@istest
def directory_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
self.storage.directory_get = MagicMock(return_value=[])
# when
actual_directory = backend.directory_get(sha1_bin)
# then
self.assertIsNone(actual_directory)
self.storage.directory_get.assert_called_with(sha1_bin, False)
@istest
def directory_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
stub_dir_entries = [{
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
self.storage.directory_get = MagicMock(
return_value=stub_dir_entries)
actual_directory = backend.directory_get(sha1_bin, recursive=True)
# then
self.assertIsNotNone(actual_directory)
self.assertEqual(list(actual_directory), stub_dir_entries)
self.storage.directory_get.assert_called_with(sha1_bin, True)
@istest
def release_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
self.storage.release_get = MagicMock(return_value=[])
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertIsNone(actual_release)
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def release_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
stub_releases = [{
'id': sha1_bin,
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}]
self.storage.release_get = MagicMock(return_value=stub_releases)
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertEqual(actual_release, stub_releases[0])
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def revision_get_by_not_found(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get_by(10, 'master', 'ts2')
# then
self.assertIsNone(actual_revision)
self.storage.revision_get_by.assert_called_with(10, 'master',
timestamp='ts2',
limit=1)
@istest
def revision_get_by(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}])
# when
actual_revisions = backend.revision_get_by(100, 'dev', 'ts')
# then
self.assertEquals(actual_revisions, {'id': 1})
self.storage.revision_get_by.assert_called_with(100, 'dev',
timestamp='ts',
limit=1)
@istest
def revision_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
self.storage.revision_get = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertIsNone(actual_revision)
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
stub_revisions = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_get = MagicMock(return_value=stub_revisions)
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertEqual(actual_revision, stub_revisions[0])
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_log(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
stub_revision_log = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = backend.revision_log(sha1_bin)
# then
self.assertEqual(list(actual_revision), stub_revision_log)
self.storage.revision_log.assert_called_with(sha1_bin, 100)
@istest
def stat_counters(self):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
self.storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = backend.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
self.storage.stat_counters.assert_called_with()
@istest
def directory_entry_get_by_path(self):
# given
stub_dir_entry = {'id': b'dir-id',
'type': 'dir',
'name': b'some/path/foo'}
self.storage.directory_entry_get_by_path = MagicMock(
return_value=stub_dir_entry)
# when
actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1',
'some/path/foo')
self.assertEquals(actual_dir_entry, stub_dir_entry)
self.storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-sha1',
[b'some', b'path', b'foo'])
+
+ @istest
+ def entity_get(self):
+ # given
+ stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7',
+ 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'},
+ {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
+ 'parent': None}]
+ self.storage.entity_get = MagicMock(return_value=stub_entities)
+
+ # when
+ actual_entities = backend.entity_get(
+ 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
+
+ # then
+ self.assertEquals(actual_entities, stub_entities)
+
+ self.storage.entity_get.assert_called_once_with(
+ 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
diff --git a/swh/web/ui/tests/test_query.py b/swh/web/ui/tests/test_query.py
index b4e2aa81..1053e2e2 100644
--- a/swh/web/ui/tests/test_query.py
+++ b/swh/web/ui/tests/test_query.py
@@ -1,125 +1,141 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
from unittest.mock import patch
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import query
from swh.web.ui.exc import BadInputExc
class QueryTestCase(unittest.TestCase):
@istest
def parse_hash_malformed_query_with_more_than_2_parts(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654:other-stuff')
@istest
def parse_hash_guess_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash(h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2' \
'E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash(h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_algo_malformed_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('1234567890987654')
@istest
def parse_hash_check_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1:' + h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha1_git(self):
h = 'e1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1_git:' + h)
self.assertEquals(r, ('sha1_git', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash('sha256:' + h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_algo_malformed_sha1_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha1_git_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1_git:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha256_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha256:1234567890987654')
@istest
def parse_hash_check_algo_unknown_one(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha2:1234567890987654')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms_or_throws_bad_query(self, mock_hash):
# given
mock_hash.side_effect = BadInputExc('Error input')
# when
with self.assertRaises(BadInputExc) as cm:
query.parse_hash_with_algorithms_or_throws(
'sha1:blah',
['sha1'],
'useless error message for this use case')
self.assertIn('Error input', cm.exception.args[0])
mock_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms_or_throws_bad_algo(self, mock_hash):
# given
mock_hash.return_value = 'sha1', '123'
# when
with self.assertRaises(BadInputExc) as cm:
query.parse_hash_with_algorithms_or_throws(
'sha1:431',
['sha1_git'],
'Only sha1_git!')
self.assertIn('Only sha1_git!', cm.exception.args[0])
mock_hash.assert_called_once_with('sha1:431')
@patch('swh.web.ui.query.parse_hash')
@istest
def parse_hash_with_algorithms(self, mock_hash):
# given
mock_hash.return_value = ('sha256', b'123')
# when
algo, sha = query.parse_hash_with_algorithms_or_throws(
'sha256:123',
['sha256', 'sha1_git'],
'useless error message for this use case')
self.assertEquals(algo, 'sha256')
self.assertEquals(sha, b'123')
mock_hash.assert_called_once_with('sha256:123')
+
+ @istest
+ def parse_uuid4(self):
+ # when
+ actual_uuid = query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42cec')
+
+ # then
+ self.assertEquals(actual_uuid, '7c33636b-8f11-4bda-89d9-ba8b76a42cec')
+
+ @istest
+ def parse_uuid4_ko(self):
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_uuid4('7c33636b-8f11-4bda-89d9-ba8b76a42')
+ self.assertIn('badly formed hexadecimal UUID string',
+ cm.exception.args[0])
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index 3f60e3b5..4d569fba 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1084 +1,1104 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_origin(self, mock_backend):
# given
mock_backend.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
mock_backend.content_find_occurrence.assert_called_with(
'sha1_git',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil, mock_backend):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_backend.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
mock_backend.content_find.assert_called_once_with('sha1', bhash)
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
# given
mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_backend.origin_get.assert_called_with('origin-id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
# given
mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
})
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_backend.release_get.assert_called_with(
hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# lookup revision first 883, then 666 (both exists)
mock_backend.revision_get.side_effect = [
sha1_git_dict,
sha1_git_root_dict
]
mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
})
mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
[call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_not_found(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'dir',
'name': b'some/path',
'target': b'456'
}
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
dir_id,
'some/path')
mock_backend.directory_get.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'content': stub_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ok_type_not_implemented(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
}
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
# given
mock_backend.revision_get = MagicMock(return_value={
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
mock_backend.revision_get.assert_called_with(
hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [{
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(list(actual_revision), [{
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
mock_backend.content_get = MagicMock(return_value={
'data': b'binary data'})
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEquals(actual_content, {'data': b'binary data'})
mock_backend.content_find.assert_called_once_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
mock_backend.content_get.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'hidden',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
'status': 'absent',
})
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
'status': 'visible',
})
mock_backend.content_find.assert_called_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
# given
mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
})
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
# given
mock_backend.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
mock_backend.directory_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory(self, mock_backend):
# given
stub_dir_entries = [{
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
expected_dir_entries = [{
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}]
mock_backend.directory_get = MagicMock(
return_value=stub_dir_entries)
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertEqual(list(actual_directory), expected_dir_entries)
mock_backend.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
actual_revisions = service.lookup_revision_by(1)
# then
self.assertIsNone(actual_revisions)
mock_backend.revision_get_by(1, 'master', None)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
# given
stub_rev = {
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'committer': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'message': b'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
expected_rev = {
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'committer': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'message': 'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by(1, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
origin_id = 1
branch_name = 'master3'
ts = None
service.lookup_revision_with_context_by(origin_id, branch_name, ts,
'sha1')
# then
self.assertIn(
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts), cm.exception.args[0])
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
mock_lookup_revision_with_context):
# given
stub_root_rev = {'id': 'root-rev-id'}
mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}
stub_rev = {'id': 'rev-found'}
mock_lookup_revision_with_context.return_value = stub_rev
# when
origin_id = 1
branch_name = 'master3'
ts = None
sha1_git = 'sha1'
actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git)
# then
self.assertEquals(actual_root_rev, stub_root_rev)
self.assertEquals(actual_rev, stub_rev)
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
mock_lookup_revision_with_context.assert_called_once_with(
stub_root_rev, sha1_git, 100)
+
+ @patch('swh.web.ui.service.backend')
+ @patch('swh.web.ui.service.query')
+ @istest
+ def lookup_entity_by_uuid(self, mock_query, mock_backend):
+ # given
+ uuid_test = 'correct-uuid'
+ mock_query.parse_uuid4.return_value = uuid_test
+ stub_entities = [{'uuid': uuid_test}]
+
+ mock_backend.entity_get.return_value = stub_entities
+
+ # when
+ actual_entities = service.lookup_entity_by_uuid(uuid_test)
+
+ # then
+ self.assertEquals(actual_entities, stub_entities)
+
+ mock_query.parse_uuid4.assert_called_once_with(uuid_test)
+ mock_backend.entity_get.assert_called_once_with(uuid_test)

File Metadata

Mime Type
text/x-diff
Expires
Jul 4 2025, 9:52 AM (5 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3236801

Event Timeline