Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py
index b9bb9b74..dab5b187 100644
--- a/swh/web/ui/backend.py
+++ b/swh/web/ui/backend.py
@@ -1,277 +1,291 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from . import main
def content_get(sha1_bin):
    """Fetch a single content from storage by its binary sha1.

    Args:
        sha1_bin: content's binary sha1.

    Returns:
        The first entry returned by storage's content_get for that sha1
        (documented upstream as a dict with 'sha1' and 'data' keys), or
        None when storage returns nothing.

    """
    storage = main.storage()
    found = storage.content_get([sha1_bin])
    if not found or len(found) < 1:
        return None
    return found[0]
def content_find(algo, hash_bin):
    """Look a content up in storage from one of its binary hashes.

    Args:
        algo: nature of the hash hash_bin.
        hash_bin: content's hash searched for.

    Returns:
        Whatever storage's content_find returns for {algo: hash_bin};
        the content's hashes if it exists, None otherwise.

    """
    storage = main.storage()
    return storage.content_find({algo: hash_bin})
def content_find_provenance(algo, hash_bin):
    """Find the provenance information of a content.

    Args:
        algo: nature of the hash hash_bin.
        hash_bin: content's hash corresponding to algo searched for.

    Returns:
        The result of storage's content_find_provenance for
        {algo: hash_bin}: the provenance information for that content,
        possibly empty (e.g. if the cache is not populated).

    """
    storage = main.storage()
    return storage.content_find_provenance({algo: hash_bin})
def content_missing_per_sha1(sha1list):
    """List the contents missing from storage, based on their sha1.

    Args:
        sha1list: iterable of sha1s to check for absence.

    Returns:
        An iterable of the sha1s missing from the storage.

    """
    storage = main.storage()
    return storage.content_missing_per_sha1(sha1list)
def directory_get(sha1_bin):
    """Retrieve information on one directory.

    Args:
        sha1_bin: directory's identifier.

    Returns:
        The first entry returned by storage's directory_get for that
        identifier, or None when storage returns nothing.

    """
    storage = main.storage()
    found = storage.directory_get([sha1_bin])
    if not found or len(found) < 1:
        return None
    return found[0]
def origin_get(origin):
    """Return information about the origin matching dict origin.

    Args:
        origin: origin's dict with keys either 'id' or
            ('type' AND 'url').

    Returns:
        Origin information as returned by storage's origin_get.

    """
    storage = main.storage()
    return storage.origin_get(origin)
def person_get(person_id):
    """Return information about the person with id person_id.

    Args:
        person_id: person's identifier.

    Returns:
        The first entry returned by storage's person_get for that
        identifier, or None when storage returns nothing.

    """
    storage = main.storage()
    found = storage.person_get([person_id])
    if not found or len(found) < 1:
        return None
    return found[0]
def directory_ls(sha1_git_bin, recursive=False):
    """List the entries of the directory with id sha1_git_bin.

    Args:
        sha1_git_bin: directory's identifier.
        recursive: optional recursive flag, defaults to False.

    Returns:
        The entries returned by storage's directory_ls, or an empty
        list when storage returns nothing.

    """
    storage = main.storage()
    entries = storage.directory_ls(sha1_git_bin, recursive)
    return entries if entries else []
def release_get(sha1_git_bin):
    """Return information about the release with sha1 sha1_git_bin.

    Args:
        sha1_git_bin: the release's sha1 as bytes.

    Returns:
        The first entry returned by storage's release_get for that
        sha1, None if nothing is found.

    """
    storage = main.storage()
    found = storage.release_get([sha1_git_bin])
    if not found or len(found) < 1:
        return None
    return found[0]
def revision_get(sha1_git_bin):
    """Return information about the revision with sha1 sha1_git_bin.

    Args:
        sha1_git_bin: the revision's sha1 as bytes.

    Returns:
        The first entry returned by storage's revision_get for that
        sha1, None if nothing is found.

    """
    storage = main.storage()
    found = storage.revision_get([sha1_git_bin])
    if not found or len(found) < 1:
        return None
    return found[0]
def revision_get_multiple(sha1_git_bin_list):
    """Return information about the revisions in sha1_git_bin_list.

    Args:
        sha1_git_bin_list: the revisions' sha1s as a list of bytes.

    Returns:
        The result of storage's revision_get for those sha1s if it is
        non-empty, an empty list otherwise.

    """
    storage = main.storage()
    found = storage.revision_get(sha1_git_bin_list)
    if not found or len(found) < 1:
        return []
    return found
def revision_log(sha1_git_bin, limit):
    """Return the revision log starting from revision sha1_git_bin.

    Args:
        sha1_git_bin: the revision's sha1 as bytes.
        limit: the maximum number of revisions returned.

    Returns:
        The result of storage's revision_log called with the single
        starting revision [sha1_git_bin] and limit.

    """
    storage = main.storage()
    return storage.revision_log([sha1_git_bin], limit)
def revision_log_by(origin_id, branch_name, ts, limit):
    """Return the log of the revision matching the timestamp ts, from
    origin origin_id, in branch branch_name.

    Args:
        origin_id: origin of the revision.
        branch_name: revision's branch.
        ts: revision's time frame.
        limit: forwarded to storage as the keyword argument limit.

    Returns:
        The result of storage's revision_log_by for those criteria.

    """
    storage = main.storage()
    return storage.revision_log_by(origin_id, branch_name, ts, limit=limit)
def stat_counters():
    """Return the stat counters for Software Heritage.

    Returns:
        The result of storage's stat_counters (documented upstream as a
        dict mapping textual labels to integer values).

    """
    storage = main.storage()
    return storage.stat_counters()
def lookup_origin_visits(origin_id):
    """Yield the visits of origin origin_id.

    Args:
        origin_id: origin to list visits for.

    Yields:
        Each item produced by storage's origin_visit_get for that
        origin (dictionaries of origin_visit).

    """
    visits = main.storage().origin_visit_get(origin_id)
    yield from visits
+def lookup_origin_visit(origin_id, visit_id):
+ """Return information about visit visit_id with origin origin_id.
+
+ Args:
+ origin_id: origin concerned by the visit
+ visit_id: the visit identifier to lookup
+
+ Returns:
+ The result of storage's origin_visit_get_by for that pair
+ (presumably the dict origin_visit concerned)
+
+ """
+ return main.storage().origin_visit_get_by(origin_id, visit_id)
+
+
def revision_get_by(origin_id, branch_name, timestamp):
    """Return the revision matching the criteria origin_id, branch_name
    and timestamp.

    Args:
        origin_id: origin of the revision.
        branch_name: revision's branch.
        timestamp: revision's time frame.

    Returns:
        The first entry returned by storage's revision_get_by (queried
        with limit=1), or None when storage returns nothing.

    """
    storage = main.storage()
    matches = storage.revision_get_by(origin_id,
                                      branch_name,
                                      timestamp=timestamp,
                                      limit=1)
    if matches:
        return matches[0]
    return None
def directory_entry_get_by_path(directory, path):
    """Return a directory entry by its path.

    Args:
        directory: identifier of the directory the path starts from.
        path: path to the entry as a string; leading and trailing
            os.path.sep are stripped before it is split on os.path.sep.

    Returns:
        The result of storage's directory_entry_get_by_path, called with
        the path segments encoded as utf-8 bytes.

    """
    separator = os.path.sep
    segments = path.strip(separator).split(separator)
    storage = main.storage()
    return storage.directory_entry_get_by_path(
        directory,
        [segment.encode('utf-8') for segment in segments])
def entity_get(uuid):
    """Retrieve the entity per its uuid.

    Args:
        uuid: entity's identifier, forwarded as is to storage.

    Returns:
        The result of storage's entity_get for that uuid.

    """
    storage = main.storage()
    return storage.entity_get(uuid)
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index 1b3f72d1..33cef982 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,630 +1,646 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, backend
from swh.web.ui.exc import NotFoundExc
def lookup_multiple_hashes(hashes):
    """Lookup the passed hashes through a single batched backend call.

    Args:
        hashes: a list of {filename: X, sha1: Y}, string X, hex sha1
            string Y.

    Returns:
        The same list, each element updated in place with
        elem['found'] = True if the hash is present in storage,
        elem['found'] = False if not.

    """
    hashlist = [hashutil.hex_to_hash(elem['sha1']) for elem in hashes]
    # A set gives O(1) membership tests; the former list made the
    # flagging loop quadratic in the number of hashes.
    missing = {hashutil.hash_to_hex(x)
               for x in backend.content_missing_per_sha1(hashlist)}
    for elem in hashes:
        elem['found'] = elem['sha1'] not in missing
    return hashes
def lookup_hash(q):
    """Check if the storage contains a given content checksum.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        Dict with key 'found' holding what the backend found for that
        hash (None if nothing) and key 'algo' holding the parsed
        algorithm.

    """
    algo, hash_bin = query.parse_hash(q)
    return {'found': backend.content_find(algo, hash_bin),
            'algo': algo}
def search_hash(q):
    """Check if the storage contains a given content checksum.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        Dict with key 'found' set to True or False, according to
        whether the backend found the checksum or not.

    """
    algo, hash_bin = query.parse_hash(q)
    content = backend.content_find(algo, hash_bin)
    is_present = content is not None
    return {'found': is_present}
def lookup_content_provenance(q):
    """Return provenance information from a specified content.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        A generator of provenance information, each item converted with
        converters.from_provenance, or None when the backend result is
        falsy.

    """
    algo, hash_bin = query.parse_hash(q)
    provenances = backend.content_find_provenance(algo, hash_bin)
    if provenances:
        return (converters.from_provenance(p) for p in provenances)
    return None
def lookup_origin(origin):
    """Return information about the origin matching dict origin.

    Args:
        origin: origin's dict with keys either 'id' or
            ('type' AND 'url')

    Returns:
        Origin information as returned by the backend.

    """
    origin_info = backend.origin_get(origin)
    return origin_info
def lookup_person(person_id):
    """Return information about the person with id person_id.

    Args:
        person_id: the person's identifier, as string

    Returns:
        The backend's person information, converted with
        converters.from_person.

    """
    return converters.from_person(backend.person_get(person_id))
def lookup_directory(sha1_git):
    """Return information about the directory with id sha1_git.

    Args:
        sha1_git: the directory's identifier, as string

    Returns:
        None if the backend does not know the directory; otherwise an
        iterator (map) over its entries, each converted with
        converters.from_directory_entry.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        sha1_git,
        ['sha1'],  # HACK: sha1_git really
        'Only sha1_git is supported.')

    if not backend.directory_get(sha1_git_bin):
        return None

    entries = backend.directory_ls(sha1_git_bin)
    return map(converters.from_directory_entry, entries)
def lookup_directory_with_path(directory_sha1_git, path_string):
    """Return directory information for the entry with path path_string
    w.r.t. the root directory pointed by directory_sha1_git.

    Args:
        directory_sha1_git: sha1_git corresponding to the directory
            to which we append paths to (hopefully) find the entry
        path_string: the relative path to the entry, starting from the
            directory pointed by directory_sha1_git

    Returns:
        The entry found, converted with converters.from_directory_entry.

    Raises:
        NotFoundExc if the directory entry is not found

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        directory_sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')

    entry = backend.directory_entry_get_by_path(sha1_git_bin, path_string)
    if entry:
        return converters.from_directory_entry(entry)

    raise NotFoundExc(('Directory entry with path %s from %s not found') %
                      (path_string, directory_sha1_git))
def lookup_release(release_sha1_git):
    """Return information about the release with sha1 release_sha1_git.

    Args:
        release_sha1_git: the release's sha1 as hexadecimal

    Returns:
        The backend's release information, converted with
        converters.from_release.

    Raises:
        Whatever query.parse_hash_with_algorithms_or_throws raises when
        the identifier provided is not of sha1 nature.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        release_sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')
    return converters.from_release(backend.release_get(sha1_git_bin))
def lookup_revision(rev_sha1_git):
    """Return information about the revision with sha1 rev_sha1_git.

    Args:
        rev_sha1_git: the revision's sha1 as hexadecimal

    Returns:
        The backend's revision information, converted with
        converters.from_revision.

    Raises:
        Whatever query.parse_hash_with_algorithms_or_throws raises when
        the identifier provided is not of sha1 nature.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        rev_sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')
    return converters.from_revision(backend.revision_get(sha1_git_bin))
def lookup_revision_multiple(sha1_git_list):
    """Return information about the revisions whose sha1s are in
    sha1_git_list.

    Args:
        sha1_git_list: the revisions' sha1s as hexadecimal strings

    Returns:
        A generator over the revisions returned by the backend, each
        converted with converters.from_revision.

    Raises:
        Whatever query.parse_hash_with_algorithms_or_throws raises when
        an identifier is not of sha1 nature (raised lazily, as the
        backend consumes the identifiers).

    """
    # Kept lazy on purpose: each identifier is only parsed when the
    # backend iterates over the generator.
    sha1_bin_list = (
        query.parse_hash_with_algorithms_or_throws(
            sha1_hex,
            ['sha1'],
            'Only sha1_git is supported.')[1]
        for sha1_hex in sha1_git_list)

    revisions = backend.revision_get_multiple(sha1_bin_list)
    return (converters.from_revision(rev) for rev in revisions)
def lookup_revision_message(rev_sha1_git):
    """Return the raw message of the revision with sha1 rev_sha1_git.

    Args:
        rev_sha1_git: the revision's sha1 as hexadecimal

    Returns:
        The revision message as dict {'message': <the_message>}

    Raises:
        NotFoundExc if the revision is not found, or if it has no
        message. Parsing errors from
        query.parse_hash_with_algorithms_or_throws propagate as well.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        rev_sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')

    revision = backend.revision_get(sha1_git_bin)
    if not revision:
        raise NotFoundExc('Revision with sha1_git %s not found.'
                          % rev_sha1_git)
    if 'message' not in revision:
        raise NotFoundExc('No message for revision with sha1_git %s.'
                          % rev_sha1_git)
    return {'message': revision['message']}
def lookup_revision_by(origin_id,
                       branch_name="refs/heads/master",
                       timestamp=None):
    """Lookup a revision by origin_id, branch_name and timestamp.

    If branch_name is not provided, 'refs/heads/master' is used.
    timestamp defaults to None (documented upstream as meaning the most
    recent revision).

    Args:
        origin_id: origin of the revision.
        branch_name: revision's branch.
        timestamp: revision's time frame.

    Returns:
        The revision found by the backend for those criteria, converted
        with converters.from_revision.

    """
    revision = backend.revision_get_by(origin_id, branch_name, timestamp)
    return converters.from_revision(revision)
def lookup_revision_log(rev_sha1_git, limit):
    """Return the log of the revision with sha1 rev_sha1_git.

    Args:
        rev_sha1_git: the revision's sha1 as hexadecimal
        limit: the maximum number of revisions returned

    Returns:
        An iterator (map) over the backend's log entries, each
        converted with converters.from_revision.

    Raises:
        Whatever query.parse_hash_with_algorithms_or_throws raises when
        the identifier provided is not of sha1 nature.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        rev_sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')

    log_entries = backend.revision_log(sha1_git_bin, limit)
    return map(converters.from_revision, log_entries)
def lookup_revision_log_by(origin_id, branch_name, timestamp, limit):
    """Return the log of the revision matching origin_id, branch_name
    and timestamp.

    Args:
        origin_id: origin of the revision
        branch_name: revision's branch
        timestamp: revision's time frame
        limit: the maximum number of revisions returned

    Returns:
        None when the backend returns nothing; otherwise an iterator
        (map) over the log entries, each converted with
        converters.from_revision.

    """
    log_entries = backend.revision_log_by(origin_id,
                                          branch_name,
                                          timestamp,
                                          limit)
    if log_entries:
        return map(converters.from_revision, log_entries)
    return None
def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git,
                                    limit=100):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    sha1_git_root is resolved through the lookup of a revision by
    origin_id, branch_name and ts. In other words, sha1_git is an
    ancestor of sha1_git_root.

    Args:
        origin_id: origin of the revision.
        branch_name: revision's branch.
        ts: revision's time frame.
        sha1_git: one of sha1_git_root's ancestors.
        limit: limit the lookup to that many revisions back
            (100 by default).

    Returns:
        Pair of (root_revision, revision): the converted root revision
        and the result of lookup_revision_with_context for sha1_git.

    Raises:
        NotFoundExc if the root revision is not found. Errors raised by
        lookup_revision_with_context propagate as well.

    """
    root = backend.revision_get_by(origin_id, branch_name, ts)
    if not root:
        raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
                          ', ts: %s) not found.' % (origin_id,
                                                    branch_name,
                                                    ts))

    converted_root = converters.from_revision(root)
    # The raw (non converted) root is passed along on purpose:
    # lookup_revision_with_context reads its binary 'id'.
    return (converted_root,
            lookup_revision_with_context(root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
    """Return information about revision sha1_git, limited to the
    sub-graph of all transitive parents of sha1_git_root.

    In other words, sha1_git is an ancestor of sha1_git_root.

    Args:
        sha1_git_root: latest revision. The type is either a sha1 (as an
            hex string) or a non converted dict.
        sha1_git: one of sha1_git_root's ancestors
        limit: limit the lookup to that many revisions back
            (100 by default)

    Returns:
        Information on sha1_git if it is an ancestor of sha1_git_root,
        including its children leading to sha1_git_root, converted with
        converters.from_revision.

    Raises:
        NotFoundExc if either revision is not found or if sha1_git is
        not found in the log of sha1_git_root. Parsing errors from
        query.parse_hash_with_algorithms_or_throws propagate as well.

    """
    _, target_bin = query.parse_hash_with_algorithms_or_throws(
        sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')

    revision = backend.revision_get(target_bin)
    if not revision:
        raise NotFoundExc('Revision %s not found' % sha1_git)

    if not isinstance(sha1_git_root, str):
        # Non converted revision dict: its id is already binary.
        root_bin = sha1_git_root['id']
    else:
        _, root_bin = query.parse_hash_with_algorithms_or_throws(
            sha1_git_root,
            ['sha1'],
            'Only sha1_git is supported.')
        if not backend.revision_get(root_bin):
            raise NotFoundExc('Revision root %s not found' % sha1_git_root)

    # Walk the root's log once, recording for each revision its parents
    # and, the other way around, the children of each parent.
    parents = {}
    children = defaultdict(list)
    for entry in backend.revision_log(root_bin, limit):
        entry_id = entry['id']
        parents[entry_id] = []
        for parent_id in entry['parents']:
            parents[entry_id].append(parent_id)
            children[parent_id].append(entry_id)

    target_id = revision['id']
    if target_id not in parents:
        raise NotFoundExc('Revision %s is not an ancestor of %s' %
                          (sha1_git, sha1_git_root))

    revision['children'] = children[target_id]
    return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
    """Return information on directory pointed by revision with sha1_git.

    If dir_path is not provided, display top level directory.
    Otherwise, display the directory pointed by dir_path (if it exists).

    Args:
        sha1_git: revision's hash.
        dir_path: optional directory pointed to by that revision.
        with_data: boolean that indicates to retrieve the raw data if the
            path resolves to a content. Default to False (for the api)

    Returns:
        A dict with keys 'type' ('dir' or 'file'), 'path' ('.' when
        dir_path is not provided), 'revision' (sha1_git) and 'content'
        (the converted directory entries or the converted content).

    Raises:
        NotFoundExc either if the revision is not found, the path
        referenced does not exist or, when with_data is set, the content
        or its raw data cannot be found.
        NotImplementedError in case of dir_path exists but do not
        reference a type 'dir' or 'file'.
        Parsing errors from query.parse_hash_with_algorithms_or_throws
        propagate as well.

    """
    _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
        sha1_git,
        ['sha1'],
        'Only sha1_git is supported.')

    revision = backend.revision_get(sha1_git_bin)
    if not revision:
        raise NotFoundExc('Revision %s not found' % sha1_git)

    dir_sha1_git_bin = revision['directory']

    if dir_path:
        entity = backend.directory_entry_get_by_path(dir_sha1_git_bin,
                                                     dir_path)
        if not entity:
            raise NotFoundExc(
                "Directory or File '%s' pointed to by revision %s not found"
                % (dir_path, sha1_git))
    else:
        entity = {'type': 'dir', 'target': dir_sha1_git_bin}

    path = dir_path if dir_path else '.'

    if entity['type'] == 'dir':
        directory_entries = backend.directory_ls(entity['target'])
        return {'type': 'dir',
                'path': path,
                'revision': sha1_git,
                'content': map(converters.from_directory_entry,
                               directory_entries)}
    elif entity['type'] == 'file':  # content
        content = backend.content_find('sha1_git', entity['target'])
        if with_data:
            # Both lookups may come back empty; report it as a not found
            # error instead of failing on a None subscript.
            if not content:
                raise NotFoundExc(
                    "Content '%s' pointed to by revision %s not found"
                    % (path, sha1_git))
            raw_content = backend.content_get(content['sha1'])
            if not raw_content:
                raise NotFoundExc(
                    "Raw data of content '%s' pointed to by revision %s "
                    "not found" % (path, sha1_git))
            content['data'] = raw_content['data']
        return {'type': 'file',
                'path': path,
                'revision': sha1_git,
                'content': converters.from_content(content)}
    else:
        raise NotImplementedError('Entity of type %s not implemented.'
                                  % entity['type'])
def lookup_content(q):
    """Lookup the content designed by q.

    Args:
        q: query string, parsed by query.parse_hash into an algorithm
            and a hash

    Returns:
        What the backend found for that hash, converted with
        converters.from_content.

    """
    algo, hash_bin = query.parse_hash(q)
    return converters.from_content(backend.content_find(algo, hash_bin))
def lookup_content_raw(q):
    """Lookup the raw content defined by q.

    Args:
        q: query string of the form <hash_algo:hash>

    Returns:
        None if the backend does not find the content; otherwise the
        result of the backend's content_get for the content's sha1,
        converted with converters.from_content (documented upstream as
        a dict with 'sha1' and 'data' keys).

    """
    algo, hash_bin = query.parse_hash(q)
    found = backend.content_find(algo, hash_bin)
    if found:
        raw_content = backend.content_get(found['sha1'])
        return converters.from_content(raw_content)
    return None
def stat_counters():
    """Return the stat counters for Software Heritage.

    Returns:
        The result of the backend's stat_counters (documented upstream
        as a dict mapping textual labels to integer values).

    """
    counters = backend.stat_counters()
    return counters
def lookup_origin_visits(origin_id):
"""Yield the visits of origin origin_id.
Args:
origin_id: origin to list visits for
Yields:
Each visit returned by the backend for that origin, converted with
converters.from_origin_visit
"""
- for visit in backend.lookup_origin_visits(origin_id):
+ visits = backend.lookup_origin_visits(origin_id)
+ for visit in visits:
yield converters.from_origin_visit(visit)
+def lookup_origin_visit(origin_id, visit_id):
+ """Return information about visit visit_id with origin origin_id.
+
+ Args:
+ origin_id: origin concerned by the visit
+ visit_id: the visit identifier to lookup
+
+ Returns:
+ The visit returned by the backend, converted with
+ converters.from_origin_visit
+
+ """
+ visit = backend.lookup_origin_visit(origin_id, visit_id)
+ return converters.from_origin_visit(visit)
+
+
def lookup_entity_by_uuid(uuid):
    """Return the entity's hierarchy from its uuid.

    Args:
        uuid: entity's identifier, validated through query.parse_uuid4.

    Returns:
        The result of the backend's entity_get for the parsed uuid
        (documented upstream as the list of hierarchy entities from the
        entity with uuid).

    """
    parsed_uuid = query.parse_uuid4(uuid)
    return backend.entity_get(parsed_uuid)
def lookup_revision_through(revision, limit=100):
    """Retrieve a revision from the criterion stored in revision dictionary.

    Args:
        revision: Dictionary of criterion to lookup the revision with.
            Here are the supported combination of possible values, tried
            in that order:
            - origin_id, branch_name, ts, sha1_git
            - origin_id, branch_name, ts
            - sha1_git_root, sha1_git
            - sha1_git
        limit: forwarded to the lookups working on a revision log
            (100 by default).

    Returns:
        The result of the lookup function matching the criterion. Note
        that the first combination delegates to
        lookup_revision_with_context_by, which returns a pair
        (root_revision, revision).

    Raises:
        NotImplementedError if no supported combination matches.

    """
    def has_keys(*keys):
        return all(key in revision for key in keys)

    if has_keys('origin_id', 'branch_name', 'ts', 'sha1_git'):
        return lookup_revision_with_context_by(revision['origin_id'],
                                               revision['branch_name'],
                                               revision['ts'],
                                               revision['sha1_git'],
                                               limit)

    if has_keys('origin_id', 'branch_name', 'ts'):
        return lookup_revision_by(revision['origin_id'],
                                  revision['branch_name'],
                                  revision['ts'])

    if has_keys('sha1_git_root', 'sha1_git'):
        return lookup_revision_with_context(revision['sha1_git_root'],
                                            revision['sha1_git'],
                                            limit)

    if has_keys('sha1_git'):
        return lookup_revision(revision['sha1_git'])

    # this should not happen
    raise NotImplementedError('Should not happen!')
def lookup_directory_through_revision(revision, path=None,
                                      limit=100, with_data=False):
    """Retrieve the directory information from the revision.

    Args:
        revision: dictionary of criterion representing a revision to lookup
        path: directory's path to lookup.
        limit: optional query parameter to limit the revisions log.
            (default to 100). For now, note that this limit could impede
            the transitivity conclusion about sha1_git not being an
            ancestor of.
        with_data: indicate to retrieve the content's raw data if path
            resolves to a content.

    Returns:
        Pair of (revision id, directory information): the id of the
        revision found and the result of lookup_directory_with_revision
        for that revision at path.

    Raises:
        NotFoundExc if no revision matches the criterion. Errors raised
        by the underlying lookups propagate as well.

    """
    rev = lookup_revision_through(revision, limit)

    # With the (origin_id, branch_name, ts, sha1_git) criterion, the
    # lookup goes through lookup_revision_with_context_by which returns a
    # pair (root_revision, revision): only the revision itself is wanted.
    if isinstance(rev, tuple):
        _, rev = rev

    if not rev:
        raise NotFoundExc('Revision with criterion %s not found!' % revision)

    return (rev['id'],
            lookup_directory_with_revision(rev['id'], path, with_data))
diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py
index 05fe58ea..f855c621 100644
--- a/swh/web/ui/tests/test_backend.py
+++ b/swh/web/ui/tests/test_backend.py
@@ -1,721 +1,754 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock
from swh.core import hashutil
from swh.web.ui import backend
from swh.web.ui.tests import test_app
class BackendTestCase(test_app.SWHApiTestCase):
+ def setUp(self):
+ # Stub origin visit (origin 1, visit 1, dated 2015-01-01 22:00 UTC)
+ # made available to the tests as self.origin_visit1.
+ self.origin_visit1 = {
+ 'date': datetime.datetime(
+ 2015, 1, 1, 22, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 1
+ }
@istest
def content_get_ko_not_found_1(self):
    # storage answering None means no content is returned
    # given
    queried_sha1 = hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f777')
    self.storage.content_get = MagicMock(return_value=None)

    # when
    result = backend.content_get(queried_sha1)

    # then
    self.assertIsNone(result)
    self.storage.content_get.assert_called_once_with([queried_sha1])
@istest
def content_get_ko_not_found_empty_result(self):
    # storage answering an empty list means no content is returned
    # given
    queried_sha1 = hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f19f')
    self.storage.content_get = MagicMock(return_value=[])

    # when
    result = backend.content_get(queried_sha1)

    # then
    self.assertIsNone(result)
    self.storage.content_get.assert_called_once_with([queried_sha1])
@istest
def content_get(self):
    # only the first content returned by storage is kept
    # given
    sha1_bin = hashutil.hex_to_hash(
        '123caf10e9535160d90e874b45aa426de762f19f')
    stub_contents = [{
        'sha1': sha1_bin,
        'data': b'binary data',
    },
        {}]
    self.storage.content_get = MagicMock(return_value=stub_contents)

    # when
    actual_content = backend.content_get(sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_content, stub_contents[0])
    self.storage.content_get.assert_called_once_with(
        [sha1_bin])
@istest
def content_find_ko_no_result(self):
    # storage finding nothing is passed through as None
    # given
    queried_hash = hashutil.hex_to_hash(
        '123caf10e9535160d90e874b45aa426de762f19f')
    self.storage.content_find = MagicMock(return_value=None)

    # when
    result = backend.content_find('sha1_git', queried_hash)

    # then
    self.assertIsNone(result)
    self.storage.content_find.assert_called_once_with(
        {'sha1_git': queried_hash})
@istest
def content_find(self):
    # storage's answer is passed through untouched
    # given
    sha1_bin = hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f19f')
    self.storage.content_find = MagicMock(return_value=(1, 2, 3))

    # when
    actual_content = backend.content_find('sha1', sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_content, (1, 2, 3))
    # check the function has been called with parameters
    self.storage.content_find.assert_called_with({'sha1': sha1_bin})
@istest
def content_find_provenance_ko_no_result(self):
    # an empty provenance generator from storage stays empty
    # given
    sha1_bin = hashutil.hex_to_hash(
        '123caf10e9535160d90e874b45aa426de762f19f')
    self.storage.content_find_provenance = MagicMock(
        return_value=(x for x in []))

    # when
    actual_lookup = backend.content_find_provenance('sha1_git', sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(list(actual_lookup), [])
    self.storage.content_find_provenance.assert_called_once_with(
        {'sha1_git': sha1_bin})
@istest
def content_find_provenance(self):
    # the provenance generator from storage is passed through
    # given
    sha1_bin = hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f19f')
    self.storage.content_find_provenance = MagicMock(
        return_value=(x for x in (1, 2, 3)))

    # when
    actual_content = backend.content_find_provenance('sha1', sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(list(actual_content), [1, 2, 3])
    # check the function has been called with parameters
    self.storage.content_find_provenance.assert_called_with(
        {'sha1': sha1_bin})
@istest
def content_missing_per_sha1_none(self):
    # storage reports no missing content
    # given
    sha1s_bin = [hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f19f'),
                 hashutil.hex_to_hash(
        '745bab676c8f3cec8016e0c39ea61cf57e518865'
        )]
    self.storage.content_missing_per_sha1 = MagicMock(return_value=[])

    # when
    actual_content = backend.content_missing_per_sha1(sha1s_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_content, [])
    self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin)
@istest
def content_missing_per_sha1_some(self):
    # storage reports one of the two contents as missing
    # given
    sha1s_bin = [hashutil.hex_to_hash(
        '456caf10e9535160d90e874b45aa426de762f19f'),
                 hashutil.hex_to_hash(
        '745bab676c8f3cec8016e0c39ea61cf57e518865'
        )]
    self.storage.content_missing_per_sha1 = MagicMock(return_value=[
        hashutil.hex_to_hash(
            '745bab676c8f3cec8016e0c39ea61cf57e518865'
        )])

    # when
    actual_content = backend.content_missing_per_sha1(sha1s_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_content, [hashutil.hex_to_hash(
        '745bab676c8f3cec8016e0c39ea61cf57e518865'
    )])
    self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin)
@istest
def origin_get_by_id(self):
    # origin looked up through its id
    # given
    stub_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp'}
    # independent copy, so the expectation cannot alias the stub
    expected_origin = dict(stub_origin)
    self.storage.origin_get = MagicMock(return_value=stub_origin)

    # when
    actual_origin = backend.origin_get({'id': 'origin-id'})

    # then
    self.assertEqual(actual_origin, expected_origin)
    self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def origin_get_by_type_url(self):
    # origin looked up through its type and url
    # given
    stub_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp'}
    # independent copy, so the expectation cannot alias the stub
    expected_origin = dict(stub_origin)
    self.storage.origin_get = MagicMock(return_value=stub_origin)

    # when
    actual_origin = backend.origin_get({'type': 'ftp',
                                        'url': 'ftp://some/url/to/origin'})

    # then
    self.assertEqual(actual_origin, expected_origin)
    self.storage.origin_get.assert_called_with(
        {'type': 'ftp',
         'url': 'ftp://some/url/to/origin'})
@istest
def person_get(self):
    # the first person returned by storage is kept
    # given
    stub_persons = [{'id': 'person-id', 'name': 'blah'}]
    self.storage.person_get = MagicMock(return_value=stub_persons)

    # when
    actual_person = backend.person_get('person-id')

    # then
    expected_person = {'id': 'person-id',
                       'name': 'blah'}
    self.assertEqual(actual_person, expected_person)
    self.storage.person_get.assert_called_with(['person-id'])
@istest
def directory_get_not_found(self):
    # storage answering None means no directory is returned
    # given
    sha1_bin = hashutil.hex_to_hash(
        '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
    self.storage.directory_get = MagicMock(return_value=None)

    # when
    actual_directory = backend.directory_get(sha1_bin)

    # then
    # assertIsNone replaces the deprecated assertEquals(..., None)
    self.assertIsNone(actual_directory)
    self.storage.directory_get.assert_called_with([sha1_bin])
@istest
def directory_get(self):
    # only the first directory returned by storage is kept
    # given
    sha1_bin = hashutil.hex_to_hash(
        '51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
    sha1_bin2 = hashutil.hex_to_hash(
        '62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
    stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'}
    stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'}
    self.storage.directory_get = MagicMock(return_value=[stub_dir,
                                                         stub_dir2])

    # when
    actual_directory = backend.directory_get(sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_directory, stub_dir)
    self.storage.directory_get.assert_called_with([sha1_bin])
@istest
def directory_ls_empty_result(self):
    # no entry from storage gives an empty list, recursive off by default
    # given
    sha1_bin = hashutil.hex_to_hash(
        '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
    self.storage.directory_ls = MagicMock(return_value=[])

    # when
    actual_directory = backend.directory_ls(sha1_bin)

    # then
    # assertEqual: the assertEquals alias is deprecated
    self.assertEqual(actual_directory, [])
    self.storage.directory_ls.assert_called_with(sha1_bin, False)
@istest
def directory_ls(self):
    # entries from storage are passed through, recursive flag forwarded
    # given
    dir_id = hashutil.hex_to_hash(
        '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
    entry = {
        'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
                                     '2ebda5'),
        'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
                                       'e76c98930e7e0afa4d2747d3bf96c926'),
        'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                         'c5b00a6d03'),
        'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                       'c5b00a6d03'),
        'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                                       'c5b00a6d03'),
        'name': b'bob',
        'type': 10,
    }
    stub_dir_entries = [entry]
    self.storage.directory_ls = MagicMock(
        return_value=stub_dir_entries)

    # when
    listed = backend.directory_ls(dir_id, recursive=True)

    # then
    self.assertIsNotNone(listed)
    self.assertEqual(list(listed), stub_dir_entries)
    self.storage.directory_ls.assert_called_with(dir_id, True)
@istest
def release_get_not_found(self):
    # storage answering an empty list means no release is returned
    # given
    release_id = hashutil.hex_to_hash(
        '65a55bbdf3629f916219feb3dcc7393ded1bc8db')
    self.storage.release_get = MagicMock(return_value=[])

    # when
    result = backend.release_get(release_id)

    # then
    self.assertIsNone(result)
    self.storage.release_get.assert_called_with([release_id])
@istest
def release_get(self):
    # the first release returned by storage is kept
    # given
    release_id = hashutil.hex_to_hash(
        '65a55bbdf3629f916219feb3dcc7393ded1bc8db')
    release_date = datetime.datetime(2015, 1, 1, 22, 0, 0,
                                     tzinfo=datetime.timezone.utc)
    stub_release = {
        'id': release_id,
        'target': None,
        'date': release_date,
        'name': b'v0.0.1',
        'message': b'synthetic release',
        'synthetic': True,
    }
    stub_releases = [stub_release]
    self.storage.release_get = MagicMock(return_value=stub_releases)

    # when
    result = backend.release_get(release_id)

    # then
    self.assertEqual(result, stub_releases[0])
    self.storage.release_get.assert_called_with([release_id])
@istest
def revision_get_by_not_found(self):
    # An empty answer from storage.revision_get_by must surface as None.
    # given
    lookup_mock = MagicMock(return_value=[])
    self.storage.revision_get_by = lookup_mock

    # when
    found = backend.revision_get_by(10, 'master', 'ts2')

    # then
    self.assertIsNone(found)
    lookup_mock.assert_called_with(10, 'master', timestamp='ts2', limit=1)
@istest
def revision_get_by(self):
    # The stubbed storage returns a one-element list; the test expects
    # backend.revision_get_by to hand back that single element.
    # given
    self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}])

    # when
    actual_revision = backend.revision_get_by(100, 'dev', 'ts')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_revision, {'id': 1})
    self.storage.revision_get_by.assert_called_with(100, 'dev',
                                                    timestamp='ts',
                                                    limit=1)
@istest
def revision_get_not_found(self):
    # An empty answer from storage.revision_get must surface as None.
    # given
    revision_id = hashutil.hex_to_hash(
        '18d8be353ed3480476f032475e7c233eff7371d5')
    storage_mock = MagicMock(return_value=[])
    self.storage.revision_get = storage_mock

    # when
    found = backend.revision_get(revision_id)

    # then
    self.assertIsNone(found)
    storage_mock.assert_called_with([revision_id])
@istest
def revision_get(self):
    # backend.revision_get must return the first revision handed back by
    # storage.revision_get, which is queried with a one-element id list.
    # given
    revision_id = hashutil.hex_to_hash(
        '18d8be353ed3480476f032475e7c233eff7371d5')
    revision = {
        'id': revision_id,
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {'name': b'bill & boule', 'email': b'bill@boule.org'},
        'committer': {'name': b'boule & bill', 'email': b'boule@bill.org'},
        'message': b'elegant fix for bug 31415957',
        'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'date_offset': 0,
        'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    storage_mock = MagicMock(return_value=[revision])
    self.storage.revision_get = storage_mock

    # when
    found = backend.revision_get(revision_id)

    # then
    self.assertEqual(found, revision)
    storage_mock.assert_called_with([revision_id])
@istest
def revision_get_multiple(self):
    # backend.revision_get_multiple must pass the whole id list to
    # storage.revision_get and return everything the storage answers.
    # given
    first_id = hashutil.hex_to_hash(
        '18d8be353ed3480476f032475e7c233eff7371d5')
    second_id = hashutil.hex_to_hash(
        'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')
    first_revision = {
        'id': first_id,
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {'name': b'bill & boule', 'email': b'bill@boule.org'},
        'committer': {'name': b'boule & bill', 'email': b'boule@bill.org'},
        'message': b'elegant fix for bug 31415957',
        'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'date_offset': 0,
        'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    second_revision = {
        'id': second_id,
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {'name': b'name', 'email': b'name@surname.org'},
        'committer': {'name': b'name', 'email': b'name@surname.org'},
        'message': b'ugly fix for bug 42',
        'date': datetime.datetime(2000, 1, 12, 5, 23, 54),
        'date_offset': 0,
        'committer_date': datetime.datetime(2000, 1, 12, 5, 23, 54),
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    expected_revisions = [first_revision, second_revision]
    self.storage.revision_get = MagicMock(return_value=expected_revisions)

    # when
    found = backend.revision_get_multiple([first_id, second_id])

    # then
    self.assertEqual(found, expected_revisions)
    self.storage.revision_get.assert_called_with([first_id, second_id])
@istest
def revision_get_multiple_none_found(self):
    # When the storage knows none of the ids, an empty list is expected.
    # given
    first_id = hashutil.hex_to_hash(
        '18d8be353ed3480476f032475e7c233eff7371d5')
    second_id = hashutil.hex_to_hash(
        'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')
    storage_mock = MagicMock(return_value=[])
    self.storage.revision_get = storage_mock

    # when
    found = backend.revision_get_multiple([first_id, second_id])

    # then
    self.assertEqual(found, [])
    storage_mock.assert_called_with([first_id, second_id])
@istest
def revision_log(self):
    # backend.revision_log must wrap the id in a list, forward the limit
    # positionally to storage.revision_log and yield the storage entries.
    # given
    revision_id = hashutil.hex_to_hash(
        '28d8be353ed3480476f032475e7c233eff7371d5')
    log_entry = {
        'id': revision_id,
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {'name': b'bill & boule', 'email': b'bill@boule.org'},
        'committer': {'name': b'boule & bill', 'email': b'boule@bill.org'},
        'message': b'elegant fix for bug 31415957',
        'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'date_offset': 0,
        'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    expected_log = [log_entry]
    self.storage.revision_log = MagicMock(return_value=expected_log)

    # when
    log = backend.revision_log(revision_id, limit=1)

    # then
    self.assertEqual(list(log), expected_log)
    self.storage.revision_log.assert_called_with([revision_id], 1)
@istest
def revision_log_by(self):
    # storage.revision_log_by is stubbed with a one-entry log; the test
    # expects backend.revision_log_by to return that log unchanged.
    # given
    sha1_bin = hashutil.hex_to_hash(
        '28d8be353ed3480476f032475e7c233eff7371d5')
    stub_revision_log = [{
        'id': sha1_bin,
        'directory': hashutil.hex_to_hash(
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
        'author': {
            'name': b'bill & boule',
            'email': b'bill@boule.org',
        },
        'committer': {
            'name': b'boule & bill',
            'email': b'boule@bill.org',
        },
        'message': b'elegant fix for bug 31415957',
        'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'date_offset': 0,
        'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }]
    self.storage.revision_log_by = MagicMock(
        return_value=stub_revision_log)

    # when
    actual_log = backend.revision_log_by(1, 'refs/heads/master',
                                         None, limit=1)

    # then
    self.assertEqual(actual_log, stub_revision_log)
    # Check the mock installed above, not storage.revision_log, which this
    # test never configures.
    # NOTE(review): tighten to assert_called_with(...) once the exact
    # arguments backend forwards to storage.revision_log_by are confirmed.
    self.assertTrue(self.storage.revision_log_by.called)
@istest
def revision_log_by_norev(self):
    # When storage.revision_log_by answers None, the test expects
    # backend.revision_log_by to return None as well.
    # given
    self.storage.revision_log_by = MagicMock(return_value=None)

    # when
    actual_log = backend.revision_log_by(1, 'refs/heads/master',
                                         None, limit=1)

    # then
    self.assertIsNone(actual_log)
    # Check the mock installed above, not storage.revision_log, which this
    # test never configures.
    # NOTE(review): tighten to assert_called_with(...) once the exact
    # arguments backend forwards to storage.revision_log_by are confirmed.
    self.assertTrue(self.storage.revision_log_by.called)
@istest
def stat_counters(self):
    # backend.stat_counters must return the storage counters untouched
    # and call storage.stat_counters without arguments.
    # given
    counters = {
        'content': 1770830,
        'directory': 211683,
        'directory_entry_dir': 209167,
        'directory_entry_file': 1807094,
        'directory_entry_rev': 0,
        'entity': 0,
        'entity_history': 0,
        'occurrence': 0,
        'occurrence_history': 19600,
        'origin': 1096,
        'person': 0,
        'release': 8584,
        'revision': 7792,
        'revision_history': 0,
        'skipped_content': 0,
    }
    counters_mock = MagicMock(return_value=counters)
    self.storage.stat_counters = counters_mock

    # when
    reported = backend.stat_counters()

    # then
    self.assertEqual(reported, counters)
    counters_mock.assert_called_with()
@istest
def lookup_origin_visits(self):
# Diff hunk: '-' lines are the previous body, '+' lines its replacement.
# storage.origin_visit_get is mocked; the test expects
# backend.lookup_origin_visits(5) to yield the stubbed visits and to
# query the mock with 5.
# given
- expected_dates = [{
- 'date': datetime.datetime(
- 2015, 1, 1, 22, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 1
- }, {
- 'date': datetime.datetime(
- 2013, 7, 1, 20, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 2
- }, {
- 'date': datetime.datetime(
- 2015, 1, 1, 21, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 3
- }]
- self.storage.origin_visit_get = MagicMock(return_value=expected_dates)
# The replacement uses self.origin_visit1 as the first stubbed visit in
# place of the inline dict.
+ expected_origin_visits = [
+ self.origin_visit1, {
+ 'date': datetime.datetime(
+ 2013, 7, 1, 20, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 2
+ }, {
+ 'date': datetime.datetime(
+ 2015, 1, 1, 21, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 3
+ }]
+ self.storage.origin_visit_get = MagicMock(
+ return_value=expected_origin_visits)
# when
- actual_dates = backend.lookup_origin_visits(5)
+ actual_origin_visits = backend.lookup_origin_visits(5)
# then
- self.assertEqual(list(actual_dates), expected_dates)
+ self.assertEqual(list(actual_origin_visits), expected_origin_visits)
self.storage.origin_visit_get.assert_called_with(5)
+ @istest
+ def lookup_origin_visit(self):
# Added by this diff. storage.origin_visit_get_by is mocked to return
# self.origin_visit1; the test expects backend.lookup_origin_visit(10, 1)
# to return it and to call the mock with (10, 1).
+ # given
+ self.storage.origin_visit_get_by = MagicMock(
+ return_value=self.origin_visit1)
+
+ # when
+ actual_origin_visit = backend.lookup_origin_visit(10, 1)
+
+ # then
+ self.assertEqual(actual_origin_visit, self.origin_visit1)
+
+ self.storage.origin_visit_get_by.assert_called_with(10, 1)
+
+ @istest
+ def lookup_origin_visit_None(self):
# Added by this diff. With storage.origin_visit_get_by mocked to return
# None, the test expects backend.lookup_origin_visit(1, 10) to return
# None and to call the mock with (1, 10).
+ # given
+ self.storage.origin_visit_get_by = MagicMock(
+ return_value=None)
+
+ # when
+ actual_origin_visit = backend.lookup_origin_visit(1, 10)
+
+ # then
+ self.assertIsNone(actual_origin_visit)
+
+ self.storage.origin_visit_get_by.assert_called_with(1, 10)
+
@istest
def directory_entry_get_by_path(self):
    # The test expects the textual path to reach the storage as a list
    # of bytes components and the stubbed entry to come back unchanged.
    # given
    stub_dir_entry = {'id': b'dir-id',
                      'type': 'dir',
                      'name': b'some/path/foo'}
    self.storage.directory_entry_get_by_path = MagicMock(
        return_value=stub_dir_entry)

    # when
    actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1',
                                                           'some/path/foo')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_dir_entry, stub_dir_entry)
    self.storage.directory_entry_get_by_path.assert_called_once_with(
        b'dir-sha1',
        [b'some', b'path', b'foo'])
@istest
def entity_get(self):
    # storage.entity_get is stubbed with two entities; the test expects
    # backend.entity_get to return them and to pass the uuid through.
    # given
    stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7',
                      'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'},
                     {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
                      'parent': None}]
    self.storage.entity_get = MagicMock(return_value=stub_entities)

    # when
    actual_entities = backend.entity_get(
        'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_entities, stub_entities)
    self.storage.entity_get.assert_called_once_with(
        'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index c03627da..365016a8 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1726 +1,1749 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
def setUp(self):
# Builds the sample values stored on self for the service tests.
# Hex digests are paired with a *_BIN counterpart obtained through
# hex_to_hash; identities and messages are paired with a bytes variant.
self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5'
self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE)
self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE)
self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE)
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID)
self.AUTHOR_ID_BIN = {
'name': b'author',
'email': b'author@company.org',
}
self.AUTHOR_ID = {
'name': 'author',
'email': 'author@company.org',
}
self.COMMITTER_ID_BIN = {
'name': b'committer',
'email': b'committer@corp.org',
}
self.COMMITTER_ID = {
'name': 'committer',
'email': 'committer@corp.org',
}
# Raw date: a timestamp plus offset/negative_utc fields; SAMPLE_DATE
# below is the ISO 8601 text for the same instant.
self.SAMPLE_DATE_RAW = {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc,
).timestamp(),
'offset': 0,
'negative_utc': False,
}
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00'
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957'
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957'
# Revision built from the textual samples; unlike SAMPLE_REVISION_RAW
# it also carries a 'merge' key.
self.SAMPLE_REVISION = {
'id': self.SHA1_SAMPLE,
'directory': self.DIRECTORY_ID,
'author': self.AUTHOR_ID,
'committer': self.COMMITTER_ID,
'message': self.SAMPLE_MESSAGE,
'date': self.SAMPLE_DATE,
'committer_date': self.SAMPLE_DATE,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
'merge': False
}
# Same revision built from the *_BIN / *_RAW samples.
self.SAMPLE_REVISION_RAW = {
'id': self.SHA1_SAMPLE_BIN,
'directory': self.DIRECTORY_ID_BIN,
'author': self.AUTHOR_ID_BIN,
'committer': self.COMMITTER_ID_BIN,
'message': self.SAMPLE_MESSAGE_BIN,
'date': self.SAMPLE_DATE_RAW,
'committer_date': self.SAMPLE_DATE_RAW,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
# NOTE(review): 'status' is 'absent' here but 'hidden' in
# SAMPLE_CONTENT_RAW below -- confirm the difference is intended.
self.SAMPLE_CONTENT = {
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
'length': 190,
'status': 'absent'
}
self.SAMPLE_CONTENT_RAW = {
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'length': 190,
'status': 'hidden'
}
# Added by this diff: a shared origin visit fixture and its date.
+ self.date_origin_visit1 = datetime.datetime(
+ 2015, 1, 1, 22, 0, 0,
+ tzinfo=datetime.timezone.utc)
+
+ self.origin_visit1 = {
+ 'date': self.date_origin_visit1,
+ 'origin': 1,
+ 'visit': 1
+ }
+
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_ball_missing(self, mock_backend):
    # The backend reports no missing sha1, so the test expects every
    # queried entry to come back flagged as found.
    # given
    mock_backend.content_missing_per_sha1 = MagicMock(return_value=[])

    # when
    actual_lookup = service.lookup_multiple_hashes(
        [{'filename': 'a',
          'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
         {'filename': 'b',
          'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_lookup, [
        {'filename': 'a',
         'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
         'found': True},
        {'filename': 'b',
         'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
         'found': True}
    ])
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_some_missing(self, mock_backend):
    # The backend reports the first sha1 as missing, so the test expects
    # only that entry to be flagged as not found.
    # given
    mock_backend.content_missing_per_sha1 = MagicMock(return_value=[
        hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    ])

    # when
    actual_lookup = service.lookup_multiple_hashes(
        [{'filename': 'a',
          'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
         {'filename': 'b',
          'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_lookup, [
        {'filename': 'a',
         'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
         'found': False},
        {'filename': 'b',
         'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
         'found': True}
    ])
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
    # With backend.content_find answering None, the test expects
    # 'found': None together with the algorithm named in the query.
    # given
    mock_backend.content_find = MagicMock(return_value=None)

    # when
    actual_lookup = service.lookup_hash(
        'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual({'found': None,
                      'algo': 'sha1_git'}, actual_lookup)

    # check the function has been called with parameters
    mock_backend.content_find.assert_called_with(
        'sha1_git',
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
    # The test expects the content returned by backend.content_find under
    # 'found', together with the algorithm named in the query.
    # given
    stub_content = {
        'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    }
    mock_backend.content_find = MagicMock(return_value=stub_content)

    # when
    actual_lookup = service.lookup_hash(
        'sha1:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual({'found': stub_content,
                      'algo': 'sha1'}, actual_lookup)
    mock_backend.content_find.assert_called_with(
        'sha1',
        hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
    )
@patch('swh.web.ui.service.backend')
@istest
def search_hash_does_not_exist(self, mock_backend):
    # With backend.content_find answering None, the test expects
    # service.search_hash to report 'found': False.
    # given
    mock_backend.content_find = MagicMock(return_value=None)

    # when
    actual_lookup = service.search_hash(
        'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual({'found': False}, actual_lookup)

    # check the function has been called with parameters
    mock_backend.content_find.assert_called_with(
        'sha1_git',
        hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def search_hash_exist(self, mock_backend):
    # With backend.content_find answering a content, the test expects
    # service.search_hash to report 'found': True.
    # given
    stub_content = {
        'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
    }
    mock_backend.content_find = MagicMock(return_value=stub_content)

    # when
    actual_lookup = service.search_hash(
        'sha1:456caf10e9535160d90e874b45aa426de762f19f')

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual({'found': True}, actual_lookup)
    mock_backend.content_find.assert_called_with(
        'sha1',
        hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
    )
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance(self, mock_backend):
    # The backend yields one provenance entry with binary hashes and a
    # bytes path; the test expects hex digests and a text path back.
    # given
    content_hex = '123caf10e9535160d90e874b45aa426de762f19f'
    revision_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    raw_provenance = {
        'content': hex_to_hash(content_hex),
        'revision': hex_to_hash(revision_hex),
        'origin': 100,
        'visit': 1,
        'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }
    mock_backend.content_find_provenance = MagicMock(
        return_value=(p for p in [raw_provenance]))
    expected = [{
        'content': content_hex,
        'revision': revision_hex,
        'origin': 100,
        'visit': 1,
        'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }]

    # when
    provenances = service.lookup_content_provenance(
        'sha1_git:' + content_hex)

    # then
    self.assertEqual(list(provenances), expected)
    mock_backend.content_find_provenance.assert_called_with(
        'sha1_git', hex_to_hash(content_hex))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance_not_found(self, mock_backend):
    # With the backend answering None, None is expected from the service.
    # given
    content_hex = '456caf10e9535160d90e874b45aa426de762f19f'
    provenance_mock = MagicMock(return_value=None)
    mock_backend.content_find_provenance = provenance_mock

    # when
    provenances = service.lookup_content_provenance(
        'sha1_git:' + content_hex)

    # then
    self.assertIsNone(provenances)
    provenance_mock.assert_called_with(
        'sha1_git', hex_to_hash(content_hex))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
    # service.stat_counters must return the backend counters untouched
    # and call backend.stat_counters without arguments.
    # given
    counters = {
        'content': 1770830,
        'directory': 211683,
        'directory_entry_dir': 209167,
        'directory_entry_file': 1807094,
        'directory_entry_rev': 0,
        'entity': 0,
        'entity_history': 0,
        'occurrence': 0,
        'occurrence_history': 19600,
        'origin': 1096,
        'person': 0,
        'release': 8584,
        'revision': 7792,
        'revision_history': 0,
        'skipped_content': 0,
    }
    counters_mock = MagicMock(return_value=counters)
    mock_backend.stat_counters = counters_mock

    # when
    reported = service.stat_counters()

    # then
    self.assertEqual(reported, counters)
    counters_mock.assert_called_with()
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visits(self, mock_backend):
# Diff hunk: '-' lines are the previous body, '+' lines its replacement.
# The backend stub returns visits whose 'date' is a datetime; the test
# expects service.lookup_origin_visits(6) to yield the same visits with
# 'date' replaced by that datetime's timestamp().
# given
- date_origin_visit1 = datetime.datetime(
- 2015, 1, 1, 22, 0, 0,
- tzinfo=datetime.timezone.utc)
-
date_origin_visit2 = datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc)
date_origin_visit3 = datetime.datetime(
2015, 1, 1, 21, 0, 0,
tzinfo=datetime.timezone.utc)
- stub_result = [{
- 'date': date_origin_visit1,
- 'origin': 1,
- 'visit': 1
- }, {
# The replacement uses self.origin_visit1 as the first stubbed visit.
+ stub_result = [self.origin_visit1, {
'date': date_origin_visit2,
'origin': 1,
'visit': 2
}, {
'date': date_origin_visit3,
'origin': 1,
'visit': 3
}]
mock_backend.lookup_origin_visits.return_value = stub_result
# when
- expected_dates = [{
- 'date': date_origin_visit1.timestamp(),
- 'origin': 1,
- 'visit': 1
+ expected_origin_visits = [{
+ 'date': self.origin_visit1['date'].timestamp(),
+ 'origin': self.origin_visit1['origin'],
+ 'visit': self.origin_visit1['visit']
}, {
'date': date_origin_visit2.timestamp(),
'origin': 1,
'visit': 2
}, {
'date': date_origin_visit3.timestamp(),
'origin': 1,
'visit': 3
}]
- actual_dates = service.lookup_origin_visits(6)
+ actual_origin_visits = service.lookup_origin_visits(6)
# then
- self.assertEqual(list(actual_dates), expected_dates)
+ self.assertEqual(list(actual_origin_visits), expected_origin_visits)
mock_backend.lookup_origin_visits.assert_called_once_with(6)
+ @patch('swh.web.ui.service.backend')
+ @istest
+ def lookup_origin_visit(self, mock_backend):
# Added by this diff. The backend stub returns self.origin_visit1; the
# test expects service.lookup_origin_visit(1, 1) to return the same
# visit with 'date' replaced by that datetime's timestamp(), and the
# backend to be called once with (1, 1).
+ # given
+ stub_result = self.origin_visit1
+ mock_backend.lookup_origin_visit.return_value = stub_result
+
+ expected_origin_visit = {
+ 'date': self.origin_visit1['date'].timestamp(),
+ 'origin': self.origin_visit1['origin'],
+ 'visit': self.origin_visit1['visit']
+ }
+
+ # when
+ actual_origin_visit = service.lookup_origin_visit(1, 1)
+
+ # then
+ self.assertEqual(actual_origin_visit, expected_origin_visit)
+
+ mock_backend.lookup_origin_visit.assert_called_once_with(1, 1)
+
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
    # The origin returned by backend.origin_get is expected back from
    # service.lookup_origin with the same keys and values.
    # given
    origin_mock = MagicMock(return_value={
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp'})
    mock_backend.origin_get = origin_mock
    expected_origin = {
        'id': 'origin-id',
        'lister': 'uuid-lister',
        'project': 'uuid-project',
        'url': 'ftp://some/url/to/origin',
        'type': 'ftp',
    }

    # when
    origin = service.lookup_origin({'id': 'origin-id'})

    # then
    self.assertEqual(origin, expected_origin)
    origin_mock.assert_called_with({'id': 'origin-id'})
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
                                                             mock_backend):
    # A string that is not a checksum must be rejected with BadInputExc
    # before the backend is consulted.
    # given
    mock_backend.release_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_release('not-a-sha1')

    # then
    # Checked after the with block: a statement placed behind the raising
    # call inside it would never execute.
    # NOTE(review): confirm the expected text against the message actually
    # raised by service.lookup_release.
    self.assertIn('invalid checksum', cm.exception.args[0])
    # Assert (rather than assign to) the mock's `called` flag.
    self.assertFalse(mock_backend.release_get.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
    # A 64-character hex string is passed where a sha1_git is required;
    # BadInputExc is expected before the backend is consulted.
    # given
    mock_backend.release_get = MagicMock()

    # when
    with self.assertRaises(BadInputExc) as cm:
        service.lookup_release(
            '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
            '1aea892abe')

    # then
    # Checked after the with block: a statement placed behind the raising
    # call inside it would never execute.
    # NOTE(review): confirm the expected text against the message actually
    # raised by service.lookup_release.
    self.assertIn('sha1_git supported', cm.exception.args[0])
    # Assert (rather than assign to) the mock's `called` flag.
    self.assertFalse(mock_backend.release_get.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_not_found(self, mock_backend):
    # NOTE(review): the call below goes to the patched backend itself, not
    # to a service function, so only the stub's return value is checked.
    # given
    sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    mock_backend.lookup_directory_with_path = MagicMock(return_value=None)

    # when
    result = mock_backend.lookup_directory_with_path(
        sha1_git, 'some/path/here')

    # then
    self.assertIsNone(result)
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_found(self, mock_backend):
    # NOTE(review): the call below goes to the patched backend itself, not
    # to a service function, so only the stub's return value is checked.
    # given
    expected_entry = {'id': 'dir-id',
                      'type': 'dir',
                      'name': 'some/path/foo'}
    sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    mock_backend.lookup_directory_with_path = MagicMock(
        return_value=expected_entry)

    # when
    result = mock_backend.lookup_directory_with_path(
        sha1_git, 'some/path/here')

    # then
    self.assertEqual(expected_entry, result)
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
    # The backend release carries a binary id, bytes name/message and a
    # raw date; the test expects a hex id, text fields and an ISO date.
    # given
    release_hex = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
    raw_date = {
        'timestamp': datetime.datetime(
            2015, 1, 1, 22, 0, 0,
            tzinfo=datetime.timezone.utc).timestamp(),
        'offset': 0,
        'negative_utc': True,
    }
    release_mock = MagicMock(return_value={
        'id': hex_to_hash(release_hex),
        'target': None,
        'date': raw_date,
        'name': b'v0.0.1',
        'message': b'synthetic release',
        'synthetic': True,
    })
    mock_backend.release_get = release_mock
    expected_release = {
        'id': release_hex,
        'target': None,
        'date': '2015-01-01T22:00:00-00:00',
        'name': 'v0.0.1',
        'message': 'synthetic release',
        'synthetic': True,
    }

    # when
    release = service.lookup_release(release_hex)

    # then
    self.assertEqual(release, expected_release)
    release_mock.assert_called_with(hex_to_hash(release_hex))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# sha1_git below is 64 hex characters long; the test expects
# service.lookup_revision_with_context to reject it with BadInputExc.
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
# NOTE(review): indentation is lost in this view -- confirm this check
# sits after the with block; inside it, behind the raising call, it
# would never run.
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# NOTE(review): both values equal those used in the ..._ko_not_a_sha1_1
# test (40-char root, 64-char sha1_git); presumably this variant was
# meant to make the root the invalid argument -- confirm.
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
# NOTE(review): indentation is lost in this view -- confirm this check
# sits after the with block; inside it, behind the raising call, it
# would never run.
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# backend.revision_get answers None, so the test expects NotFoundExc
# and a single backend lookup, made with the binary form of sha1_git.
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
# NOTE(review): indentation is lost in this view -- confirm this check
# sits after the with block; inside it, behind the raising call, it
# would never run.
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
# First backend lookup answers 'foo', the second one None; the calls
# asserted below are sha1_git first, then the root.
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
# NOTE(review): indentation is lost in this view -- confirm this check
# sits after the with block; inside it, behind the raising call, it
# would never run.
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
    # Revision 883 is looked up in the log of root revision 666. The
    # stubbed log lists 999 and 777 with 883 among their parents, and the
    # test expects exactly those two as 'children' of the result.
    # given
    sha1_git_root = '666'
    sha1_git = '883'

    sha1_git_root_bin = b'666'
    sha1_git_bin = b'883'

    sha1_git_root_dict = {
        'id': sha1_git_root_bin,
        'parents': [b'999'],
    }
    sha1_git_dict = {
        'id': sha1_git_bin,
        'parents': [],
        'directory': b'278',
    }

    stub_revisions = [
        sha1_git_root_dict,
        {
            'id': b'999',
            'parents': [b'777', b'883', b'888'],
        },
        {
            'id': b'777',
            'parents': [b'883'],
        },
        sha1_git_dict,
        {
            'id': b'888',
            'parents': [b'889'],
        },
        {
            'id': b'889',
            'parents': [],
        },
    ]

    # inputs ok
    mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
        ('sha1', sha1_git_bin),
        ('sha1', sha1_git_root_bin)
    ]

    # lookup revision first 883, then 666 (both exists)
    mock_backend.revision_get.side_effect = [
        sha1_git_dict,
        sha1_git_root_dict
    ]

    mock_backend.revision_log = MagicMock(
        return_value=stub_revisions)

    # when
    actual_revision = service.lookup_revision_with_context(
        sha1_git_root,
        sha1_git)

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_revision, {
        'id': hash_to_hex(sha1_git_bin),
        'parents': [],
        'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
        'directory': hash_to_hex(b'278'),
        'merge': False
    })

    mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
        [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
         call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])

    mock_backend.revision_log.assert_called_with(
        sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict(
        self, mock_query, mock_backend):
    # The root is passed as a dict holding its binary id instead of a hex
    # string; the test expects a single hash parse and a single backend
    # revision lookup, both for sha1_git only.
    # given
    sha1_git = '883'

    sha1_git_root_bin = b'666'
    sha1_git_bin = b'883'

    sha1_git_root_dict = {
        'id': sha1_git_root_bin,
        'parents': [b'999'],
    }

    sha1_git_dict = {
        'id': sha1_git_bin,
        'parents': [],
        'directory': b'278',
    }

    stub_revisions = [
        sha1_git_root_dict,
        {
            'id': b'999',
            'parents': [b'777', b'883', b'888'],
        },
        {
            'id': b'777',
            'parents': [b'883'],
        },
        sha1_git_dict,
        {
            'id': b'888',
            'parents': [b'889'],
        },
        {
            'id': b'889',
            'parents': [],
        },
    ]

    # inputs ok
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1', sha1_git_bin)

    # lookup only on sha1
    mock_backend.revision_get.return_value = sha1_git_dict

    mock_backend.revision_log.return_value = stub_revisions

    # when
    actual_revision = service.lookup_revision_with_context(
        {'id': sha1_git_root_bin},
        sha1_git)

    # then
    # assertEqual: the assertEquals alias is deprecated.
    self.assertEqual(actual_revision, {
        'id': hash_to_hex(sha1_git_bin),
        'parents': [],
        'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
        'directory': hash_to_hex(b'278'),
        'merge': False
    })

    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        sha1_git, ['sha1'], 'Only sha1_git is supported.')

    mock_backend.revision_get.assert_called_once_with(sha1_git_bin)

    mock_backend.revision_log.assert_called_with(
        sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_not_found(self,
                                                         mock_query,
                                                         mock_backend):
    # backend.revision_get answers None, so NotFoundExc is expected.
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
                                                                    b'123')
    mock_backend.revision_get.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_directory_with_revision('123')

    # then
    # Checked after the with block: a statement placed behind the raising
    # call inside it would never execute.
    # NOTE(review): confirm the expected text against the message actually
    # raised by the service.
    self.assertIn('Revision 123 not found', cm.exception.args[0])

    # The call parentheses must open on the same line as the method name;
    # split over two lines the assertion is never invoked.
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
        self,
        mock_query,
        mock_backend):
    # The revision exists but the backend finds no entry at the requested
    # path, so NotFoundExc is expected.
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
                                                                    b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_directory_with_revision(
            '123',
            'path/to/something/unknown')

    # then
    # Checked after the with block: a statement placed behind the raising
    # call inside it would never execute.
    # NOTE(review): confirm the expected text against the message actually
    # raised by the service.
    self.assertIn("Directory/File 'path/to/something/unknown' " +
                  "pointed to by revision 123 not found",
                  cm.exception.args[0])

    # The call parentheses must open on the same line as the method name;
    # split over two lines the assertion is never invoked.
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_type_not_implemented(
        self,
        mock_query,
        mock_backend):
    # The entry found at the path has type 'rev'; the test expects
    # NotImplementedError for that type.
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
                                                                    b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'rev',
        'name': b'some/path/to/rev',
        'target': b'456'
    }

    stub_content = {
        'id': b'12',
        'type': 'file'
    }

    mock_backend.content_get.return_value = stub_content

    # when
    with self.assertRaises(NotImplementedError) as cm:
        service.lookup_directory_with_revision(
            '123',
            'some/path/to/rev')

    # then
    # Checked after the with block: a statement placed behind the raising
    # call inside it would never execute.
    # NOTE(review): confirm the expected text against the message actually
    # raised by the service.
    self.assertIn("Entity of type 'rev' not implemented.",
                  cm.exception.args[0])

    # The call parentheses must open on the same line as the method name;
    # split over two lines the assertion is never invoked.
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
                                                         mock_query,
                                                         mock_backend):
    # Without a path, the test expects a 'dir' result whose content is
    # the backend listing of the revision's own directory.
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
                                                                    b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    stub_dir_entries = [{
        'id': b'123',
        'type': 'dir'
    }, {
        'id': b'456',
        'type': 'file'
    }]

    mock_backend.directory_ls.return_value = stub_dir_entries

    # when
    actual_directory_entries = service.lookup_directory_with_revision(
        '123')

    # then
    self.assertEqual(actual_directory_entries['type'], 'dir')
    self.assertEqual(list(actual_directory_entries['content']),
                     stub_dir_entries)

    # The call parentheses must open on the same line as the method name;
    # split over two lines the assertion is never invoked.
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_ls.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
                                                             mock_query,
                                                             mock_backend):
    # The path resolves to a 'dir' entry; the test expects that entry's
    # target to be listed and the revision and path echoed in the result.
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
                                                                    b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }

    stub_dir_entries = [{
        'id': b'12',
        'type': 'dir'
    }, {
        'id': b'34',
        'type': 'file'
    }]

    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'dir',
        'name': b'some/path',
        'target': b'456'
    }
    mock_backend.directory_ls.return_value = stub_dir_entries

    # when
    actual_directory_entries = service.lookup_directory_with_revision(
        '123',
        'some/path')

    # then
    self.assertEqual(actual_directory_entries['type'], 'dir')
    self.assertEqual(actual_directory_entries['revision'], '123')
    self.assertEqual(actual_directory_entries['path'], 'some/path')
    self.assertEqual(list(actual_directory_entries['content']),
                     stub_dir_entries)

    # The call parentheses must open on the same line as the method name;
    # split over two lines the assertion is never invoked.
    mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        dir_id,
        'some/path')
    mock_backend.directory_ls.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_without_data(
        self,
        mock_query,
        mock_backend):
    """A path resolving to a 'file' entry returns the content metadata."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }
    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'file',
        'name': b'some/path/to/file',
        'target': b'789'
    }
    stub_content = {
        'status': 'visible',
    }
    mock_backend.content_find.return_value = stub_content

    # when
    actual_content = service.lookup_directory_with_revision(
        '123',
        'some/path/to/file')

    # then
    self.assertEqual(actual_content, {'type': 'file',
                                      'revision': '123',
                                      'path': 'some/path/to/file',
                                      'content': stub_content})

    # The arguments used to sit on their own line, detached from the method
    # name, which made the assertion a silent no-op.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/file')
    mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_with_data(
        self,
        mock_query,
        mock_backend):
    """With with_data=True, a 'file' entry also carries the raw data."""
    # given
    parse_hash = mock_query.parse_hash_with_algorithms_or_throws
    parse_hash.return_value = ('sha1', b'123')
    dir_id = b'dir-id-as-sha1'
    mock_backend.revision_get.return_value = {
        'directory': dir_id,
    }
    mock_backend.directory_entry_get_by_path.return_value = {
        'type': 'file',
        'name': b'some/path/to/file',
        'target': b'789'
    }
    stub_content = {
        'status': 'visible',
        'sha1': b'content-sha1'
    }
    mock_backend.content_find.return_value = stub_content
    mock_backend.content_get.return_value = {
        'sha1': b'content-sha1',
        'data': b'some raw data'
    }
    expected_content = {
        'status': 'visible',
        'sha1': hash_to_hex(b'content-sha1'),
        'data': b'some raw data'
    }

    # when
    actual_content = service.lookup_directory_with_revision(
        '123',
        'some/path/to/file',
        with_data=True)

    # then
    self.assertEqual(actual_content, {'type': 'file',
                                      'revision': '123',
                                      'path': 'some/path/to/file',
                                      'content': expected_content})

    # The arguments used to sit on their own line, detached from the method
    # name, which made the assertion a silent no-op.
    parse_hash.assert_called_once_with(
        '123', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.revision_get.assert_called_once_with(b'123')
    mock_backend.directory_entry_get_by_path.assert_called_once_with(
        b'dir-id-as-sha1', 'some/path/to/file')
    mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
    mock_backend.content_get.assert_called_once_with(b'content-sha1')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
    """lookup_revision yields SAMPLE_REVISION for the raw sample revision."""
    # given
    raw_revision = self.SAMPLE_REVISION_RAW
    mock_backend.revision_get = MagicMock(return_value=raw_revision)

    # when
    found = service.lookup_revision(self.SHA1_SAMPLE)

    # then
    self.assertEqual(found, self.SAMPLE_REVISION)
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_invalid_msg(self, mock_backend):
    """An undecodable message yields message=None plus a failure flag."""
    # given
    raw_rev = self.SAMPLE_REVISION_RAW
    raw_rev['message'] = b'elegant fix for bug \xff'
    mock_backend.revision_get = MagicMock(return_value=raw_rev)

    wanted = self.SAMPLE_REVISION
    wanted.update({'message': None, 'message_decoding_failed': True})

    # when
    found = service.lookup_revision(self.SHA1_SAMPLE)

    # then
    self.assertEqual(found, wanted)
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_ok(self, mock_backend):
    """lookup_revision_message returns the revision's raw message."""
    # given
    mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW

    # when
    rv = service.lookup_revision_message(
        self.SHA1_SAMPLE)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv, {'message': self.SAMPLE_MESSAGE_BIN})
    mock_backend.revision_get.assert_called_with(
        self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_absent(self, mock_backend):
    """A revision lacking a 'message' key raises NotFoundExc."""
    # given
    raw_rev = self.SAMPLE_REVISION_RAW
    raw_rev.pop('message')
    mock_backend.revision_get.return_value = raw_rev
    expected_error = ('No message for revision '
                      'with sha1_git '
                      '18d8be353ed3480476f032475e7c233eff7371d5.')

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_message(self.SHA1_SAMPLE)

    # then
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
    self.assertEqual(cm.exception.args[0], expected_error)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_norev(self, mock_backend):
    """A missing revision makes lookup_revision_message raise NotFoundExc."""
    # given
    mock_backend.revision_get.return_value = None
    expected_error = ('Revision with sha1_git '
                      '18d8be353ed3480476f032475e7c233eff7371d5 '
                      'not found.')

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_message(self.SHA1_SAMPLE)

    # then
    mock_backend.revision_get.assert_called_with(self.SHA1_SAMPLE_BIN)
    self.assertEqual(cm.exception.args[0], expected_error)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple(self, mock_backend):
    """Several revisions are looked up at once and each one is converted."""
    # given
    sha1 = self.SHA1_SAMPLE
    sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
    # Same instant used for both the author date and the committer date.
    epoch_seconds = datetime.datetime(
        2000, 1, 12, 5, 23, 54,
        tzinfo=datetime.timezone.utc).timestamp()
    other_raw_revision = {
        'id': hex_to_hash(sha1_other),
        'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
        'author': {
            'name': b'name',
            'email': b'name@surname.org',
        },
        'committer': {
            'name': b'name',
            'email': b'name@surname.org',
        },
        'message': b'ugly fix for bug 42',
        'date': {
            'timestamp': epoch_seconds,
            'offset': 0,
            'negative_utc': False
        },
        'date_offset': 0,
        'committer_date': {
            'timestamp': epoch_seconds,
            'offset': 0,
            'negative_utc': False
        },
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
    }
    mock_backend.revision_get_multiple.return_value = [
        self.SAMPLE_REVISION_RAW,
        other_raw_revision,
    ]
    other_expected_revision = {
        'id': sha1_other,
        'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
        'author': {
            'name': 'name',
            'email': 'name@surname.org',
        },
        'committer': {
            'name': 'name',
            'email': 'name@surname.org',
        },
        'message': 'ugly fix for bug 42',
        'date': '2000-01-12T05:23:54+00:00',
        'date_offset': 0,
        'committer_date': '2000-01-12T05:23:54+00:00',
        'committer_date_offset': 0,
        'synthetic': False,
        'type': 'git',
        'parents': [],
        'metadata': [],
        'merge': False
    }

    # when
    found = service.lookup_revision_multiple([sha1, sha1_other])

    # then
    self.assertEqual(list(found),
                     [self.SAMPLE_REVISION, other_expected_revision])

    # The backend receives the identifiers in binary form, in order.
    requested_ids = mock_backend.revision_get_multiple.call_args[0][0]
    self.assertEqual(list(requested_ids),
                     [hex_to_hash(sha1), hex_to_hash(sha1_other)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple_none_found(self, mock_backend):
    """An empty backend answer produces an empty result."""
    # given
    sha1_first = self.SHA1_SAMPLE
    sha1_second = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
    mock_backend.revision_get_multiple.return_value = []

    # when
    found = service.lookup_revision_multiple([sha1_first, sha1_second])

    # then
    self.assertEqual(list(found), [])

    requested_ids = mock_backend.revision_get_multiple.call_args[0][0]
    self.assertEqual(list(requested_ids),
                     [hex_to_hash(self.SHA1_SAMPLE),
                      hex_to_hash(sha1_second)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
    """The revision log is fetched with the binary id and the limit."""
    # given
    rev_id = 'abcdbe353ed3480476f032475e7c233eff7371d5'
    mock_backend.revision_log = MagicMock(
        return_value=[self.SAMPLE_REVISION_RAW])

    # when
    log = service.lookup_revision_log(rev_id, limit=25)

    # then
    self.assertEqual(list(log), [self.SAMPLE_REVISION])
    mock_backend.revision_log.assert_called_with(
        hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 25)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by(self, mock_backend):
    """The log by origin/branch/timestamp is converted entry by entry."""
    # given
    raw_log = [self.SAMPLE_REVISION_RAW]
    mock_backend.revision_log_by = MagicMock(return_value=raw_log)

    # when
    log = service.lookup_revision_log_by(
        1, 'refs/heads/master', None, limit=100)

    # then
    self.assertEqual(list(log), [self.SAMPLE_REVISION])
    mock_backend.revision_log_by.assert_called_with(
        1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by_nolog(self, mock_backend):
    """When the backend has no log, lookup_revision_log_by returns None."""
    # given
    mock_backend.revision_log_by = MagicMock(return_value=None)

    # when
    res = service.lookup_revision_log_by(
        1, 'refs/heads/master', None, limit=100)

    # then
    # assertIsNone replaces the deprecated assertEquals(res, None).
    self.assertIsNone(res)
    mock_backend.revision_log_by.assert_called_with(
        1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
    """lookup_content_raw returns None when the content is not found."""
    # given
    mock_backend.content_find = MagicMock(return_value=None)
    query_string = 'sha1:18d8be353ed3480476f032475e7c233eff7371d5'

    # when
    found = service.lookup_content_raw(query_string)

    # then
    self.assertIsNone(found)
    mock_backend.content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
    """Raw data is fetched using the sha1 returned by content_find."""
    # given
    mock_backend.content_find = MagicMock(return_value={
        'sha1': self.SHA1_SAMPLE,
    })
    mock_backend.content_get = MagicMock(return_value={
        'data': b'binary data'})

    # when
    actual_content = service.lookup_content_raw(
        'sha256:%s' % self.SHA256_SAMPLE)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_content, {'data': b'binary data'})
    mock_backend.content_find.assert_called_once_with(
        'sha256', self.SHA256_SAMPLE_BIN)
    mock_backend.content_get.assert_called_once_with(
        self.SHA1_SAMPLE)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
    """lookup_content returns None when the backend finds nothing."""
    # given
    mock_backend.content_find = MagicMock(return_value=None)
    query_string = 'sha1:%s' % self.SHA1_SAMPLE

    # when
    found = service.lookup_content(query_string)

    # then
    self.assertIsNone(found)
    mock_backend.content_find.assert_called_with(
        'sha1', self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
    """A sha1 query returns SAMPLE_CONTENT for the raw sample content."""
    # given
    raw_content = self.SAMPLE_CONTENT_RAW
    mock_backend.content_find = MagicMock(return_value=raw_content)
    query_string = 'sha1:%s' % self.SHA1_SAMPLE

    # when
    found = service.lookup_content(query_string)

    # then
    self.assertEqual(found, self.SAMPLE_CONTENT)
    mock_backend.content_find.assert_called_with(
        'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
    """A sha256 query returns the converted content, status included."""
    # given
    raw_content = self.SAMPLE_CONTENT_RAW
    raw_content['status'] = 'visible'
    wanted = self.SAMPLE_CONTENT
    wanted['status'] = 'visible'
    mock_backend.content_find = MagicMock(return_value=raw_content)
    query_string = 'sha256:%s' % self.SHA256_SAMPLE

    # when
    found = service.lookup_content(query_string)

    # then
    self.assertEqual(found, wanted)
    mock_backend.content_find.assert_called_with(
        'sha256', self.SHA256_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
    """Bytes name and email of a person come back as text."""
    # given
    raw_person = {
        'id': 'person_id',
        'name': b'some_name',
        'email': b'some-email',
    }
    wanted = {
        'id': 'person_id',
        'name': 'some_name',
        'email': 'some-email',
    }
    mock_backend.person_get = MagicMock(return_value=raw_person)

    # when
    found = service.lookup_person('person_id')

    # then
    self.assertEqual(found, wanted)
    mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
    """An invalid directory id raises BadInputExc without any listing."""
    # given
    mock_backend.directory_ls = MagicMock()

    # when
    with self.assertRaises(BadInputExc):
        service.lookup_directory('directory_id')

    # then
    # The original assigned False to `.called`, which checked nothing.
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_not_found(self, mock_query, mock_backend):
    """lookup_directory returns None when the directory does not exist."""
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-id-bin')
    mock_backend.directory_get.return_value = None

    # when
    actual_dir = service.lookup_directory('directory_id')

    # then
    self.assertIsNone(actual_dir)

    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory_id', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_get.assert_called_with('directory-id-bin')
    # The original assigned False to `.called`, which checked nothing.
    self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory(self, mock_query, mock_backend):
    """Directory entries from the backend are converted for output."""
    # given
    mock_query.parse_hash_with_algorithms_or_throws.return_value = (
        'sha1',
        'directory-sha1-bin')
    # something that exists is all that matters here
    mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'}

    raw_entry = {
        'sha1': self.SHA1_SAMPLE_BIN,
        'sha256': self.SHA256_SAMPLE_BIN,
        'sha1_git': self.SHA1GIT_SAMPLE_BIN,
        'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
                              'c5b00a6d03'),
        'dir_id': self.DIRECTORY_ID_BIN,
        'name': b'bob',
        'type': 10,
    }
    wanted_entry = {
        'sha1': self.SHA1_SAMPLE,
        'sha256': self.SHA256_SAMPLE,
        'sha1_git': self.SHA1GIT_SAMPLE,
        'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'dir_id': self.DIRECTORY_ID,
        'name': 'bob',
        'type': 10,
    }
    mock_backend.directory_ls.return_value = [raw_entry]

    # when
    listing = list(service.lookup_directory('directory-sha1'))

    # then
    self.assertEqual(listing, [wanted_entry])

    mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
        'directory-sha1', ['sha1'], 'Only sha1_git is supported.')
    mock_backend.directory_ls.assert_called_with('directory-sha1-bin')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
    """lookup_revision_by returns None when the backend finds nothing."""
    # given
    mock_backend.revision_get_by.return_value = None

    # when
    actual_revisions = service.lookup_revision_by(1)

    # then
    self.assertIsNone(actual_revisions)

    # The original line *called* the mock rather than asserting on it, so
    # it verified nothing.
    # NOTE(review): the exact arguments depend on lookup_revision_by's
    # defaults, which are not visible here; tighten this to
    # assert_called_once_with(...) once they are confirmed.
    self.assertTrue(mock_backend.revision_get_by.called)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
    """lookup_revision_by converts the revision the backend returns."""
    # given
    stub_rev = self.SAMPLE_REVISION_RAW
    expected_rev = self.SAMPLE_REVISION
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)

    # The original line *called* the mock (with origin id 1 instead of the
    # 10 used above) rather than asserting on it, so it verified nothing.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nomerge(self, mock_backend):
    """A revision with a single parent has its parent id hex-encoded."""
    # given
    stub_rev = self.SAMPLE_REVISION_RAW
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')]

    expected_rev = self.SAMPLE_REVISION
    expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc']
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)

    # The original line *called* the mock (with origin id 1 instead of the
    # 10 used above) rather than asserting on it, so it verified nothing.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_merge(self, mock_backend):
    """A revision with two parents is expected to carry merge=True."""
    # given
    stub_rev = self.SAMPLE_REVISION_RAW
    stub_rev['parents'] = [
        hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'),
        hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc')
    ]

    expected_rev = self.SAMPLE_REVISION
    expected_rev['parents'] = [
        'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
        'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc'
    ]
    expected_rev['merge'] = True
    mock_backend.revision_get_by.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')

    # then
    self.assertEqual(actual_revision, expected_rev)

    # The original line *called* the mock (with origin id 1 instead of the
    # 10 used above) rather than asserting on it, so it verified nothing.
    mock_backend.revision_get_by.assert_called_once_with(
        10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
    """A missing root revision raises NotFoundExc naming the criteria."""
    # given
    mock_backend.revision_get_by.return_value = None
    origin_id, branch_name, ts = 1, 'master3', None

    # when
    with self.assertRaises(NotFoundExc) as cm:
        service.lookup_revision_with_context_by(origin_id, branch_name, ts,
                                                'sha1')

    # then
    expected_fragment = (
        'Revision with (origin_id: %s, branch_name: %s'
        ', ts: %s) not found.' % (origin_id, branch_name, ts))
    self.assertIn(expected_fragment, cm.exception.args[0])

    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)
@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
                                    mock_lookup_revision_with_context):
    """The root revision is resolved, then used for the context lookup."""
    # given
    stub_root_rev = {'id': 'root-rev-id'}
    mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}

    stub_rev = {'id': 'rev-found'}
    mock_lookup_revision_with_context.return_value = stub_rev

    # when
    origin_id = 1
    branch_name = 'master3'
    ts = None
    sha1_git = 'sha1'
    actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
        origin_id, branch_name, ts, sha1_git)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_root_rev, stub_root_rev)
    self.assertEqual(actual_rev, stub_rev)

    mock_backend.revision_get_by.assert_called_once_with(
        origin_id, branch_name, ts)
    # 100 is the limit forwarded when the caller does not supply one.
    mock_lookup_revision_with_context.assert_called_once_with(
        stub_root_rev, sha1_git, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_entity_by_uuid(self, mock_query, mock_backend):
    """The parsed uuid is passed to the backend; its result is returned."""
    # given
    uuid_test = 'correct-uuid'
    mock_query.parse_uuid4.return_value = uuid_test
    stub_entities = [{'uuid': uuid_test}]

    mock_backend.entity_get.return_value = stub_entities

    # when
    actual_entities = service.lookup_entity_by_uuid(uuid_test)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_entities, stub_entities)

    mock_query.parse_uuid4.assert_called_once_with(uuid_test)
    mock_backend.entity_get.assert_called_once_with(uuid_test)
@istest
def lookup_revision_through_ko_not_implemented(self):
    """Unknown lookup criteria raise NotImplementedError."""
    unknown_criteria = {
        'something-unknown': 10,
    }

    # then
    with self.assertRaises(NotImplementedError):
        service.lookup_revision_through(unknown_criteria)
@patch('swh.web.ui.service.lookup_revision_with_context_by')
@istest
def lookup_revision_through_with_context_by(self, mock_lookup):
    """origin_id/branch_name/ts/sha1_git go to the context-by lookup."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_through({
        'origin_id': 1,
        'branch_name': 'master',
        'ts': None,
        'sha1_git': 'sha1-git'
    }, limit=1000)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_revision, stub_rev)

    mock_lookup.assert_called_once_with(
        1, 'master', None, 'sha1-git', 1000)
@patch('swh.web.ui.service.lookup_revision_by')
@istest
def lookup_revision_through_with_revision_by(self, mock_lookup):
    """origin_id/branch_name/ts criteria go to lookup_revision_by."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_through({
        'origin_id': 2,
        'branch_name': 'master2',
        'ts': 'some-ts',
    }, limit=10)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_revision, stub_rev)

    # The limit is not part of the expected call for this lookup.
    mock_lookup.assert_called_once_with(
        2, 'master2', 'some-ts')
@patch('swh.web.ui.service.lookup_revision_with_context')
@istest
def lookup_revision_through_with_context(self, mock_lookup):
    """sha1_git_root/sha1_git criteria go to the context lookup."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_through({
        'sha1_git_root': 'some-sha1-root',
        'sha1_git': 'some-sha1',
    })

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_revision, stub_rev)

    # 100 is the limit forwarded when the caller does not supply one.
    mock_lookup.assert_called_once_with(
        'some-sha1-root', 'some-sha1', 100)
@patch('swh.web.ui.service.lookup_revision')
@istest
def lookup_revision_through_with_revision(self, mock_lookup):
    """A lone sha1_git criterion goes to lookup_revision."""
    # given
    stub_rev = {'id': 'rev'}
    mock_lookup.return_value = stub_rev

    # when
    actual_revision = service.lookup_revision_through({
        'sha1_git': 'some-sha1',
    })

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_revision, stub_rev)

    mock_lookup.assert_called_once_with(
        'some-sha1')
@patch('swh.web.ui.service.lookup_revision_through')
@istest
def lookup_directory_through_revision_ko_not_found(
        self, mock_lookup_rev):
    """A revision that cannot be resolved raises NotFoundExc."""
    # given
    mock_lookup_rev.return_value = None
    criteria = {'id': 'rev'}

    # when
    with self.assertRaises(NotFoundExc):
        service.lookup_directory_through_revision(
            criteria, 'some/path', 100)

    # then
    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_data(
        self, mock_lookup_dir, mock_lookup_rev):
    """The resolved revision id and the directory result are returned."""
    # given
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    mock_lookup_dir.return_value = {'type': 'dir',
                                    'content': []}

    # when
    rev_id, dir_result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 100)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rev_id, 'rev-id')
    self.assertEqual(dir_result, {'type': 'dir',
                                  'content': []})

    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
    # with_data was not supplied, so False is expected to be forwarded.
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_content(
        self, mock_lookup_dir, mock_lookup_rev):
    """with_data=True is forwarded to the directory lookup."""
    # given
    mock_lookup_rev.return_value = {'id': 'rev-id'}
    stub_result = {'type': 'file',
                   'revision': 'rev-id',
                   'content': {'data': b'blah',
                               'sha1': 'sha1'}}
    mock_lookup_dir.return_value = stub_result

    # when
    rev_id, dir_result = service.lookup_directory_through_revision(
        {'id': 'rev'}, 'some/path', 10, with_data=True)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rev_id, 'rev-id')
    self.assertEqual(dir_result, stub_result)

    mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10)
    mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)
diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py
index 945c5bb7..8b12c24d 100644
--- a/swh/web/ui/tests/views/test_api.py
+++ b/swh/web/ui/tests/views/test_api.py
@@ -1,1929 +1,1970 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
from swh.web.ui import exc
from swh.web.ui.views import api
from swh.web.ui.exc import NotFoundExc, BadInputExc
from swh.storage.exc import StorageDBError, StorageAPIError
class ApiTestCase(test_app.SWHApiTestCase):
@istest
def generic_api_lookup_nothing_is_found(self):
    """_api_lookup raises NotFoundExc when the lookup returns None."""
    # given
    def lookup_returning_nothing(sha1, another_unused_arg):
        assert another_unused_arg == 'unused arg'
        assert sha1 == 'sha1'
        return None

    not_found_message = 'This will be raised because None is returned.'

    # when
    with self.assertRaises(NotFoundExc) as cm:
        api._api_lookup('sha1', lookup_returning_nothing,
                        not_found_message,
                        lambda x: x,
                        'unused arg')

    # then
    self.assertIn('This will be raised because None is returned.',
                  cm.exception.args[0])
@istest
def generic_api_map_are_enriched_and_transformed_to_list(self):
    """A map result is enriched element-wise and returned as a list."""
    # given
    def lookup_returning_map(criteria0, param0, param1):
        assert criteria0 == 'something'
        return map(lambda x: x + 1, [1, 2, 3])

    def double(x):
        return x * 2

    # when
    enriched = api._api_lookup(
        'something',
        lookup_returning_map,
        'This is not the error message you are looking for. Move along.',
        double,
        'some param 0',
        'some param 1')

    # then
    self.assertEqual(enriched, [4, 6, 8])
@istest
def generic_api_list_are_enriched_too(self):
    """A list result is enriched element-wise."""
    # given
    def lookup_returning_list(crit):
        assert crit == 'something'
        return ['a', 'b', 'c']

    def surround(x):
        return ''.join(['=', x, '='])

    # when
    enriched = api._api_lookup(
        'something',
        lookup_returning_list,
        'Not the error message you are looking for, it is. '
        'Along, you move!',
        surround)

    # then
    self.assertEqual(enriched, ['=a=', '=b=', '=c='])
@istest
def generic_api_generator_are_enriched_and_returned_as_list(self):
    """A generator result is enriched and returned as a list."""
    # given
    def lookup_returning_generator(crit):
        assert crit == 'crit'
        return (item for item in [4, 5, 6])

    def decrement(x):
        return x - 1

    # when
    enriched = api._api_lookup(
        'crit',
        lookup_returning_generator,
        'Move!',
        decrement)

    # then
    self.assertEqual(enriched, [3, 4, 5])
@istest
def generic_api_simple_data_are_enriched_and_returned_too(self):
    """A single dict result is passed through the enrich function."""
    # given
    def lookup_returning_dict(crit):
        assert crit == '123'
        return {'a': 10}

    def times_ten(data):
        data['a'] = data['a'] * 10
        return data

    # when
    enriched = api._api_lookup(
        '123',
        lookup_returning_dict,
        'Nothing to do',
        times_ten)

    # then
    self.assertEqual(enriched, {'a': 100})
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance(self, mock_service):
    """The provenance endpoint returns entries enriched with *_url keys."""
    # given
    stub_provenances = [{
        'origin': 1,
        'visit': 2,
        'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
        'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }]
    mock_service.lookup_content_provenance.return_value = stub_provenances

    # when
    rv = self.app.get(
        '/api/1/provenance/'
        'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, [{
        'origin': 1,
        'visit': 2,
        'origin_url': '/api/1/origin/1/',
        'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
        'revision_url': '/api/1/revision/'
                        'b04caf10e9535160d90e874b45aa426de762f19f/',
        'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'content_url': '/api/1/content/'
        'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
        'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
    }])

    mock_service.lookup_content_provenance.assert_called_once_with(
        'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance_sha_not_found(self, mock_service):
    """A provenance lookup returning None gives a 404 JSON error."""
    # given
    mock_service.lookup_content_provenance.return_value = None

    # when
    rv = self.app.get(
        '/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
        '6c5b00a6d03 not found.'
    })
@patch('swh.web.ui.views.api.service')
@istest
def api_content_metadata(self, mock_service):
    """The content endpoint returns the metadata plus a data_url key."""
    # given
    mock_service.lookup_content.return_value = {
        'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
        'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
        'cde9b067a4f',
        'length': 17,
        'status': 'visible'
    }

    # when
    rv = self.app.get(
        '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'data_url': '/api/1/content/'
        '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
        'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
        'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
        'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
        'de9b067a4f',
        'length': 17,
        'status': 'visible'
    })

    mock_service.lookup_content.assert_called_once_with(
        'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
    """An unknown content gives a 404 JSON error and no provenance call."""
    # given
    mock_service.lookup_content.return_value = None
    mock_service.lookup_content_provenance = MagicMock()

    # when
    rv = self.app.get(
        '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
        '68b3be4735637006560c not found.'
    })

    mock_service.lookup_content.assert_called_once_with(
        'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c')
    # The original assigned False to `.called`, which checked nothing.
    self.assertFalse(mock_service.lookup_content_provenance.called)
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
    """An unknown content requested as YAML gives a 404 YAML error."""
    # given
    mock_service.lookup_content.return_value = None
    mock_service.lookup_content_provenance = MagicMock()

    # when
    rv = self.app.get(
        '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c/',
        headers={'accept': 'application/yaml'})

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/yaml')

    # safe_load: yaml.load without an explicit Loader is deprecated and
    # rejected by recent PyYAML; plain data needs no arbitrary-object loader.
    response_data = yaml.safe_load(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
        '68b3be4735637006560c not found.'
    })

    mock_service.lookup_content.assert_called_once_with(
        'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
        'be4735637006560c')
    # The original assigned False to `.called`, which checked nothing.
    self.assertFalse(mock_service.lookup_content_provenance.called)
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw_ko_not_found(self, mock_service):
    """The raw endpoint gives a 404 JSON error for an unknown content."""
    # given
    mock_service.lookup_content_raw.return_value = None

    # when
    rv = self.app.get(
        '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
        '/raw/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
        '6c5b00a6d03 not found.'
    })

    mock_service.lookup_content_raw.assert_called_once_with(
        'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw(self, mock_service):
    """The raw endpoint returns the content bytes as an octet-stream."""
    # given
    stub_content = {'data': b'some content data'}
    mock_service.lookup_content_raw.return_value = stub_content

    # when
    rv = self.app.get(
        '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
        '/raw/',
        headers={'Content-type': 'application/octet-stream',
                 'Content-disposition': 'attachment'})

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/octet-stream')
    self.assertEqual(rv.data, stub_content['data'])

    mock_service.lookup_content_raw.assert_called_once_with(
        'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_search(self, mock_service):
    """A found hash is reported with 100 pct in the search stats."""
    # given
    mock_service.search_hash.return_value = {'found': True}

    expected_result = {
        'search_stats': {'nbfiles': 1, 'pct': 100},
        'search_res': [{'filename': None,
                        'sha1': 'sha1:blah',
                        'found': True}]
    }

    # when
    rv = self.app.get('/api/1/content/search/sha1:blah/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_result)

    mock_service.search_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_as_yaml(self, mock_service):
    """The search result is served as YAML when the client asks for it."""
    # given
    mock_service.search_hash.return_value = {'found': True}

    expected_result = {
        'search_stats': {'nbfiles': 1, 'pct': 100},
        'search_res': [{'filename': None,
                        'sha1': 'sha1:halb',
                        'found': True}]
    }

    # when
    rv = self.app.get('/api/1/content/search/sha1:halb/',
                      headers={'Accept': 'application/yaml'})

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/yaml')

    # safe_load: yaml.load without an explicit Loader is deprecated and
    # rejected by recent PyYAML; plain data needs no arbitrary-object loader.
    response_data = yaml.safe_load(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_result)

    mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_not_found(self, mock_service):
    """A hash that is not found still gives a 200, with pct 0."""
    # given
    mock_service.search_hash.return_value = {'found': False}

    expected_result = {
        'search_stats': {'nbfiles': 1, 'pct': 0},
        'search_res': [{'filename': None,
                        'sha1': 'sha1:halb',
                        'found': False}]
    }

    # when
    rv = self.app.get('/api/1/content/search/sha1:halb/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')

    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_result)

    mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
    """A ValueError raised by the service yields a 400 JSON error."""
    # given
    mock_service.stat_counters.side_effect = ValueError(
        'voluntary error to check the bad request middleware.')

    # when
    rv = self.app.get('/api/1/stat/counters/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 400)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service):
    """A StorageDBError raised by the service yields a 503 JSON error.

    The error message is expected to be prefixed with
    'An unexpected error occurred in the backend: '.
    """
    # given
    mock_service.stat_counters.side_effect = StorageDBError(
        'SWH Storage exploded! Will be back online shortly!')

    # when
    rv = self.app.get('/api/1/stat/counters/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 503)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error':
        'An unexpected error occurred in the backend: '
        'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service):
    """A StorageAPIError raised by the service yields a 503 JSON error.

    The error message is expected to be prefixed with
    'An unexpected error occurred in the api backend: '.
    """
    # given
    mock_service.stat_counters.side_effect = StorageAPIError(
        'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
    )

    # when
    rv = self.app.get('/api/1/stat/counters/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 503)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error':
        'An unexpected error occurred in the api backend: '
        'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
    })
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters(self, mock_service):
    """The counters endpoint returns the service's counters as JSON."""
    # given
    stub_stats = {
        "content": 1770830,
        "directory": 211683,
        "directory_entry_dir": 209167,
        "directory_entry_file": 1807094,
        "directory_entry_rev": 0,
        "entity": 0,
        "entity_history": 0,
        "occurrence": 0,
        "occurrence_history": 19600,
        "origin": 1096,
        "person": 0,
        "release": 8584,
        "revision": 7792,
        "revision_history": 0,
        "skipped_content": 0
    }
    mock_service.stat_counters.return_value = stub_stats

    # when
    rv = self.app.get('/api/1/stat/counters/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_stats)

    mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_error(self, mock_service):
    """A ValueError from the visits lookup yields a 400 JSON error."""
    # given
    mock_service.lookup_origin_visits.side_effect = ValueError(
        'voluntary error to check the bad request middleware.')

    # when
    rv = self.app.get('/api/1/origin/2/visits/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 400)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_db(
        self, mock_service):
    """A StorageDBError from the visits lookup yields a 503 JSON error.

    The error message is expected to be prefixed with
    'An unexpected error occurred in the backend: '.
    """
    # given
    mock_service.lookup_origin_visits.side_effect = StorageDBError(
        'SWH Storage exploded! Will be back online shortly!')

    # when
    rv = self.app.get('/api/1/origin/2/visits/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 503)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error':
        'An unexpected error occurred in the backend: '
        'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_api(
        self, mock_service):
    """A StorageAPIError from the visits lookup yields a 503 JSON error.

    The error message is expected to be prefixed with
    'An unexpected error occurred in the api backend: '.
    """
    # given
    mock_service.lookup_origin_visits.side_effect = StorageAPIError(
        'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
    )

    # when
    rv = self.app.get('/api/1/origin/2/visits/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 503)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error':
        'An unexpected error occurred in the api backend: '
        'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
    })
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits(self, mock_service):
    """The visits endpoint returns the service's visit list as JSON.

    The origin id from the URL is expected to reach the service as the
    integer 2.
    """
    # given
    stub_visits = [
        {
            'date': 1104616800.0,
            'origin': 1,
            'visit': 1
        },
        {
            'date': 1293919200.0,
            'origin': 1,
            'visit': 2
        },
        {
            'date': 1420149600.0,
            'origin': 1,
            'visit': 3
        }
    ]
    mock_service.lookup_origin_visits.return_value = stub_visits

    # when
    rv = self.app.get('/api/1/origin/2/visits/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_visits)

    mock_service.lookup_origin_visits.assert_called_once_with(2)
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visit(self, mock_service):
    """The single-visit endpoint returns the service's visit as JSON.

    Both the origin id and the visit id from the URL are expected to
    reach the service as integers (10 and 100).
    """
    # given
    stub_origin_visit = {
        'date': 1104616800.0,
        'origin': 10,
        'visit': 100,
        'metadata': None,
        'status': 'full',
    }
    mock_service.lookup_origin_visit.return_value = stub_origin_visit

    # when
    rv = self.app.get('/api/1/origin/10/visits/100/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_origin_visit)

    mock_service.lookup_origin_visit.assert_called_once_with(10, 100)
+
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visit_not_found(self, mock_service):
    """A visit lookup returning None yields a 404 JSON error."""
    # given
    mock_service.lookup_origin_visit.return_value = None

    # when
    rv = self.app.get('/api/1/origin/1/visits/1000/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'No visit 1000 for origin 1 found'
    })

    mock_service.lookup_origin_visit.assert_called_once_with(1, 1000)
+
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_id(self, mock_service):
    """Looking up an origin by numeric id returns the stub as JSON.

    The service is expected to be queried with {'id': 1234}.
    """
    # given
    stub_origin = {
        'id': 1234,
        'lister': 'uuid-lister-0',
        'project': 'uuid-project-0',
        'url': 'ftp://some/url/to/origin/0',
        'type': 'ftp'
    }
    mock_service.lookup_origin.return_value = stub_origin

    # when
    rv = self.app.get('/api/1/origin/1234/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_origin)

    mock_service.lookup_origin.assert_called_with({'id': 1234})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_type_url(self, mock_service):
    """Looking up an origin by type and url returns the stub as JSON.

    The service is expected to be queried with a dict holding the 'url'
    and 'type' extracted from the request path.
    """
    # given
    stub_origin = {
        'id': 1234,
        'lister': 'uuid-lister-0',
        'project': 'uuid-project-0',
        'url': 'ftp://some/url/to/origin/0',
        'type': 'ftp'
    }
    mock_service.lookup_origin.return_value = stub_origin

    # when
    rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_origin)

    mock_service.lookup_origin.assert_called_with(
        {'url': 'ftp://some/url/to/origin/0',
         'type': 'ftp'})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_not_found(self, mock_service):
    """An origin lookup returning None yields a 404 JSON error."""
    # given
    mock_service.lookup_origin.return_value = None

    # when
    rv = self.app.get('/api/1/origin/4321/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Origin with id 4321 not found.'
    })

    mock_service.lookup_origin.assert_called_with({'id': 4321})
@patch('swh.web.ui.views.api.service')
@istest
def api_release(self, mock_service):
    """A release targeting a revision is returned with a 'target_url'.

    The expected payload is the stub plus a 'target_url' key pointing at
    the revision endpoint for the release's target.
    """
    # given
    stub_release = {
        'id': 'release-0',
        'target_type': 'revision',
        'target': 'revision-sha1',
        "date": "Mon, 10 Mar 1997 08:00:00 GMT",
        "synthetic": True,
        'author': {
            'name': 'author release name',
            'email': 'author@email',
        },
    }

    expected_release = {
        'id': 'release-0',
        'target_type': 'revision',
        'target': 'revision-sha1',
        'target_url': '/api/1/revision/revision-sha1/',
        "date": "Mon, 10 Mar 1997 08:00:00 GMT",
        "synthetic": True,
        'author': {
            'name': 'author release name',
            'email': 'author@email',
        },
    }

    mock_service.lookup_release.return_value = stub_release

    # when
    rv = self.app.get('/api/1/release/release-0/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_release)

    mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_target_type_not_a_revision(self, mock_service):
    """A release whose target is not a revision gets no 'target_url'.

    The expected payload is identical to the stub returned by the
    service.
    """
    # given
    stub_release = {
        'id': 'release-0',
        'target_type': 'other-stuff',
        'target': 'other-stuff-checksum',
        "date": "Mon, 10 Mar 1997 08:00:00 GMT",
        "synthetic": True,
        'author': {
            'name': 'author release name',
            'email': 'author@email',
        },
    }

    expected_release = {
        'id': 'release-0',
        'target_type': 'other-stuff',
        'target': 'other-stuff-checksum',
        "date": "Mon, 10 Mar 1997 08:00:00 GMT",
        "synthetic": True,
        'author': {
            'name': 'author release name',
            'email': 'author@email',
        },
    }

    mock_service.lookup_release.return_value = stub_release

    # when
    rv = self.app.get('/api/1/release/release-0/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_release)

    mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_not_found(self, mock_service):
    """A release lookup returning None yields a 404 JSON error."""
    # given
    mock_service.lookup_release.return_value = None

    # when
    rv = self.app.get('/api/1/release/release-0/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Release with sha1_git release-0 not found.'
    })
@patch('swh.web.ui.views.api.service')
@istest
def api_revision(self, mock_service):
    """A found revision is returned enriched with navigation urls.

    Compared with the stub, the expected payload additionally carries
    'url', 'history_url', 'directory_url' and 'parent_urls'.
    """
    # given
    stub_revision = {
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
        'type': 'tar',
        'synthetic': True,
        'metadata': {
            'original_artifact': [{
                'archive_type': 'tar',
                'name': 'webbase-5.7.0.tar.gz',
                'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                '309d36484e7edf7bb912'
            }]
        },
    }
    mock_service.lookup_revision.return_value = stub_revision

    expected_revision = {
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
        'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
        'ff7371d5/log/',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
        'a42b7e2a44e6/',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': [
            '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
        ],
        'parent_urls': [
            '/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
            '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
        ],
        'type': 'tar',
        'synthetic': True,
        'metadata': {
            'original_artifact': [{
                'archive_type': 'tar',
                'name': 'webbase-5.7.0.tar.gz',
                'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
                'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
                'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
                '309d36484e7edf7bb912'
            }]
        },
    }

    # when
    rv = self.app.get('/api/1/revision/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_not_found(self, mock_service):
    """A revision lookup returning None yields a 404 JSON error."""
    # given
    mock_service.lookup_revision.return_value = None

    # when
    rv = self.app.get('/api/1/revision/revision-0/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok(self, mock_service):
    """The raw revision endpoint returns the message as octet-stream."""
    # given
    stub_revision = {'message': 'synthetic revision message'}
    mock_service.lookup_revision_message.return_value = stub_revision

    # when
    rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2'
                      '33eff7371d5/raw/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/octet-stream')
    self.assertEqual(rv.data, b'synthetic revision message')

    mock_service.lookup_revision_message.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok_no_msg(self, mock_service):
    """A NotFoundExc for a missing message yields a 404 JSON error.

    The exception text is expected to be returned as the 'error' value.
    """
    # given
    mock_service.lookup_revision_message.side_effect = NotFoundExc(
        'No message for revision')

    # when
    rv = self.app.get('/api/1/revision/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

    # then
    # assertEqual replaces the deprecated assertEquals alias; the stray
    # bare `self.assertEquals` expression (a no-op) has been dropped.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'No message for revision'})

    mock_service.lookup_revision_message.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ko_no_rev(self, mock_service):
    """A NotFoundExc for a missing revision yields a 404 JSON error.

    The exception text is expected to be returned as the 'error' value.
    """
    # given
    mock_service.lookup_revision_message.side_effect = NotFoundExc(
        'No revision found')

    # when
    rv = self.app.get('/api/1/revision/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/raw/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'No revision found'})

    mock_service.lookup_revision_message.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
    """A revision-by-origin lookup returning None yields a 404.

    Only fragments of the error message are checked. The service is
    expected to be called with the origin id, the branch
    'refs/heads/master' and None as timestamp.
    """
    # given
    mock_service.lookup_revision_by.return_value = None

    # when
    rv = self.app.get('/api/1/revision/origin/123/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertIn('Revision with (origin_id: 123', response_data['error'])
    self.assertIn('not found', response_data['error'])

    mock_service.lookup_revision_by.assert_called_once_with(
        123,
        'refs/heads/master',
        None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin(self, mock_service):
    """A revision looked up by origin only is returned enriched.

    The expected payload is the stub plus 'url', 'history_url' and
    'directory_url'. The service is expected to be called with the
    branch 'refs/heads/master' and None as timestamp.
    """
    # given
    mock_revision = {
        'id': '32',
        'directory': '21',
        'message': 'message 1',
        'type': 'deb',
    }
    expected_revision = {
        'id': '32',
        'url': '/api/1/revision/32/',
        'history_url': '/api/1/revision/32/log/',
        'directory': '21',
        'directory_url': '/api/1/directory/21/',
        'message': 'message 1',
        'type': 'deb',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    # when
    rv = self.app.get('/api/1/revision/origin/1/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1,
        'refs/heads/master',
        None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
    """A revision looked up by origin and branch is returned enriched.

    The branch segment of the URL ('refs/origin/dev') is expected to be
    passed to the service, with None as timestamp.
    """
    # given
    mock_revision = {
        'id': '12',
        'directory': '23',
        'message': 'message 2',
        'type': 'tar',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '12',
        'url': '/api/1/revision/12/',
        'history_url': '/api/1/revision/12/log/',
        'directory': '23',
        'directory_url': '/api/1/directory/23/',
        'message': 'message 2',
        'type': 'tar',
    }

    # when
    rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1,
        'refs/origin/dev',
        None)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
                                                           mock_utils,
                                                           mock_service):
    """Revision lookup by origin, branch and timestamp.

    Both the service and the utils module are mocked: the timestamp
    from the URL must go through utils.parse_timestamp, its result must
    be handed to the service, and the revision returned by the service
    must go through utils.enrich_revision.
    """
    # given
    mock_revision = {
        'id': '123',
        'directory': '456',
        'message': 'message 3',
        'type': 'tar',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '123',
        'url': '/api/1/revision/123/',
        'history_url': '/api/1/revision/123/log/',
        'directory': '456',
        'directory_url': '/api/1/directory/456/',
        'message': 'message 3',
        'type': 'tar',
    }

    mock_utils.parse_timestamp.return_value = 'parsed-date'
    mock_utils.enrich_revision.return_value = expected_revision

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/1'
                      '/branch/refs/origin/dev'
                      '/ts/1452591542/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1,
        'refs/origin/dev',
        'parsed-date')
    mock_utils.parse_timestamp.assert_called_once_with('1452591542')
    mock_utils.enrich_revision.assert_called_once_with(
        mock_revision)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
        self,
        mock_utils,
        mock_service):
    """Revision lookup with percent-encoded branch and timestamp.

    The branch and timestamp URL segments are percent-encoded; the
    mocked collaborators are expected to receive the decoded values.
    """
    # given
    mock_revision = {
        'id': '999',
    }
    mock_service.lookup_revision_by.return_value = mock_revision

    expected_revision = {
        'id': '999',
        'url': '/api/1/revision/999/',
        'history_url': '/api/1/revision/999/log/',
    }

    mock_utils.parse_timestamp.return_value = 'parsed-date'
    mock_utils.enrich_revision.return_value = expected_revision

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/1'
                      '/branch/refs%2Forigin%2Fdev'
                      '/ts/Today%20is%20'
                      'January%201,%202047%20at%208:21:00AM/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_revision)

    mock_service.lookup_revision_by.assert_called_once_with(
        1,
        'refs/origin/dev',
        'parsed-date')
    mock_utils.parse_timestamp.assert_called_once_with(
        'Today is January 1, 2047 at 8:21:00AM')
    mock_utils.enrich_revision.assert_called_once_with(
        mock_revision)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_ko_raise(self, mock_service):
    """NotFoundExc raised by the service propagates out of the helper.

    api._revision_directory_by is called directly (no HTTP request) and
    the service lookup is expected to receive the default keyword
    arguments limit=100 and with_data=False.
    """
    # given
    lookup = mock_service.lookup_directory_through_revision
    lookup.side_effect = NotFoundExc('not')

    # when
    with self.assertRaises(NotFoundExc):
        api._revision_directory_by(
            {'sha1_git': 'id'},
            None,
            '/api/1/revision/sha1/directory/')

    # then
    lookup.assert_called_once_with(
        {'sha1_git': 'id'}, None,
        limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_dir(self, mock_service):
    """The helper returns the 'dir' entry produced by the service.

    The service stub returns a (revision id, entry) pair; the helper is
    expected to return the entry dict and to call the service with the
    default keyword arguments limit=100 and with_data=False.
    """
    # given
    mock_service.lookup_directory_through_revision.return_value = (
        'rev-id',
        {
            'type': 'dir',
            'revision': 'rev-id',
            'path': 'some/path',
            'content': []
        })

    # when
    actual_dir_content = api._revision_directory_by(
        {'sha1_git': 'blah-id'},
        'some/path', '/api/1/revision/sha1/directory/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_dir_content, {
        'type': 'dir',
        'revision': 'rev-id',
        'path': 'some/path',
        'content': []
    })

    mock_service.lookup_directory_through_revision.assert_called_once_with(
        {'sha1_git': 'blah-id'},
        'some/path', limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_file(self, mock_service):
    """The helper returns the 'file' entry produced by the service.

    Explicit limit=1000 and with_data=True keyword arguments are
    expected to be forwarded to the service unchanged.
    """
    # given
    mock_service.lookup_directory_through_revision.return_value = (
        'rev-id',
        {
            'type': 'file',
            'revision': 'rev-id',
            'path': 'some/path',
            'content': {'blah': 'blah'}
        })

    # when
    actual_dir_content = api._revision_directory_by(
        {'sha1_git': 'sha1'},
        'some/path',
        '/api/1/revision/origin/2/directory/',
        limit=1000, with_data=True)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(actual_dir_content, {
        'type': 'file',
        'revision': 'rev-id',
        'path': 'some/path',
        'content': {'blah': 'blah'}
    })

    mock_service.lookup_directory_through_revision.assert_called_once_with(
        {'sha1_git': 'sha1'},
        'some/path', limit=1000, with_data=True)
@patch('swh.web.ui.views.api.utils')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin_ko_not_found(self,
                                                       mock_rev_dir,
                                                       mock_utils):
    """NotFoundExc from the directory helper yields a 404 JSON error.

    The helper and the utils module are mocked; the helper is expected
    to receive the lookup criteria built from the URL (with the parsed
    timestamp), a None path, the request path and with_data=False.
    """
    # given
    mock_rev_dir.side_effect = NotFoundExc('not found')
    mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'

    # when
    rv = self.app.get('/api/1/revision'
                      '/origin/10'
                      '/branch/refs/remote/origin/dev'
                      '/ts/2012-10-20'
                      '/directory/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'not found'})

    mock_rev_dir.assert_called_once_with(
        {'origin_id': 10,
         'branch_name': 'refs/remote/origin/dev',
         'ts': '2012-10-20 00:00:00'}, None,
        '/api/1/revision'
        '/origin/10'
        '/branch/refs/remote/origin/dev'
        '/ts/2012-10-20'
        '/directory/',
        with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin(self,
                                          mock_revision_dir):
    """The directory-by-origin endpoint returns the helper's result.

    With only an origin in the URL, the helper is expected to receive
    the branch 'refs/heads/master', a None timestamp, a None path, the
    request path and with_data=False.
    """
    # given
    expected_res = [{
        'id': '123'
    }]
    mock_revision_dir.return_value = expected_res

    # when
    rv = self.app.get('/api/1/revision/origin/3/directory/')

    # then
    # assertEqual is used throughout instead of mixing it with the
    # deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_res)

    mock_revision_dir.assert_called_once_with({
        'origin_id': 3,
        'branch_name': 'refs/heads/master',
        'ts': None}, None, '/api/1/revision/origin/3/directory/',
        with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log(self, mock_service):
    """The revision log endpoint returns enriched revisions.

    With a single revision returned by the service, the response is
    expected to hold that revision enriched with urls and a
    'next_revs_url' of None. The service is expected to be asked for
    26 revisions.
    """
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]
    mock_service.lookup_revision_log.return_value = stub_revisions

    expected_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
        'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
        'f7371d5/log/',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
        '42b7e2a44e6/',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': [
            '7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
        ],
        'parent_urls': [
            '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
            '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
        ],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
                      'b7e2a44e6/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_with_next(self, mock_service):
    """A full page of log results produces a 'next_revs_url'.

    The service stub returns 26 revisions (ids 0..25); the response is
    expected to list ids 0..24 and to point 'next_revs_url' at the log
    of revision 25.
    """
    # given
    stub_revisions = [{'id': i} for i in range(27)]
    mock_service.lookup_revision_log.return_value = stub_revisions[:26]

    # NOTE: the dicts below are the very objects held by stub_revisions,
    # so adding the url keys here also changes what the stubbed service
    # returns for ids 0..24.
    expected_revisions = [x for x in stub_revisions if x['id'] < 25]
    for rev in expected_revisions:
        rev['url'] = '/api/1/revision/%s/' % rev['id']
        rev['history_url'] = '/api/1/revision/%s/log/' % rev['id']

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': '/api/1/revision/25/log/'
    }

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
                      'b7e2a44e6/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
    """A revision log lookup returning None yields a 404 JSON error."""
    # given
    mock_service.lookup_revision_log.return_value = None

    # when
    rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
                      'e2a44e6/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Revision with sha1_git'
        ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})

    mock_service.lookup_revision_log.assert_called_once_with(
        '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_context(self, mock_service):
    """Revision log requested with a '/prev/<rev>' context.

    The revisions returned by lookup_revision_multiple for the context
    are expected to come first in the response, followed by the log
    returned by lookup_revision_log; all of them enriched with urls.
    """
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]

    mock_service.lookup_revision_log.return_value = stub_revisions
    mock_service.lookup_revision_multiple.return_value = [{
        'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
        'author_name': 'Name Surname',
        'author_email': 'name@surname.com',
        'committer_name': 'Name Surname',
        'committer_email': 'name@surname.com',
        'message': 'amazing revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_revisions = [
        {
            'url': '/api/1/revision/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
            'history_url': '/api/1/revision/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/',
            'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory_url': '/api/1/directory/'
            '18d8be353ed3480476f032475e7c233eff7371d5/',
            'author_name': 'Name Surname',
            'author_email': 'name@surname.com',
            'committer_name': 'Name Surname',
            'committer_email': 'name@surname.com',
            'message': 'amazing revision message',
            'date_offset': 0,
            'committer_date_offset': 0,
            'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
            'parent_urls': [
                '/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
                '/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/'
            ],
            'type': 'tar',
            'synthetic': True,
        },
        {
            'url': '/api/1/revision/'
            '18d8be353ed3480476f032475e7c233eff7371d5/',
            'history_url': '/api/1/revision/'
            '18d8be353ed3480476f032475e7c233eff7371d5/log/',
            'id': '18d8be353ed3480476f032475e7c233eff7371d5',
            'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
            'directory_url': '/api/1/directory/'
            '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
            'author_name': 'Software Heritage',
            'author_email': 'robot@softwareheritage.org',
            'committer_name': 'Software Heritage',
            'committer_email': 'robot@softwareheritage.org',
            'message': 'synthetic revision message',
            'date_offset': 0,
            'committer_date_offset': 0,
            'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
            'parent_urls': [
                '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
                '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
            ],
            'type': 'tar',
            'synthetic': True,
        }]

    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/18d8be353ed3480476f0'
                      '32475e7c233eff7371d5/prev/prev-rev/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)

    mock_service.lookup_revision_log.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5', 26)
    mock_service.lookup_revision_multiple.assert_called_once_with(
        ['prev-rev'])
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by(self, mock_service):
    """The revision log by origin endpoint returns enriched revisions.

    With only an origin in the URL, the service is expected to be
    called with the branch 'refs/heads/master', a None timestamp and a
    limit of 26.
    """
    # given
    stub_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
        'type': 'tar',
        'synthetic': True,
    }]
    mock_service.lookup_revision_log_by.return_value = stub_revisions

    expected_revisions = [{
        'id': '18d8be353ed3480476f032475e7c233eff7371d5',
        'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
        'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
        'f7371d5/log/',
        'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
        'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
        '42b7e2a44e6/',
        'author_name': 'Software Heritage',
        'author_email': 'robot@softwareheritage.org',
        'committer_name': 'Software Heritage',
        'committer_email': 'robot@softwareheritage.org',
        'message': 'synthetic revision message',
        'date_offset': 0,
        'committer_date_offset': 0,
        'parents': [
            '7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
        ],
        'parent_urls': [
            '/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
            '/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
        ],
        'type': 'tar',
        'synthetic': True,
    }]

    expected_result = {
        'revisions': expected_revisions,
        'next_revs_url': None
    }

    # when
    rv = self.app.get('/api/1/revision/origin/1/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_result)

    mock_service.lookup_revision_log_by.assert_called_once_with(
        1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_with_next(self, mock_service):
    """Origin-based revision log: when the service returns 26 revisions,
    the first 25 are listed and the 26th becomes the next page link.
    """
    # given
    stub_revisions = []
    for i in range(27):
        stub_revisions.append({'id': i})
    mock_service.lookup_revision_log_by.return_value = stub_revisions[:26]

    # The expected entries are the very same dict objects as the stubs, so
    # the URL keys added here are also present in what the mock returns.
    expected_revisions = [x for x in stub_revisions if x['id'] < 25]
    for e in expected_revisions:
        e['url'] = '/api/1/revision/%s/' % e['id']
        e['history_url'] = '/api/1/revision/%s/log/' % e['id']
    expected_response = {
        'revisions': expected_revisions,
        'next_revs_url': '/api/1/revision/25/log/'
    }

    # when
    rv = self.app.get('/api/1/revision/origin/1/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_response)
    mock_service.lookup_revision_log_by.assert_called_once_with(
        1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_norev(self, mock_service):
    """Origin-based revision log: a NotFoundExc raised by the service is
    reported as a 404 JSON error.
    """
    # given
    mock_service.lookup_revision_log_by.side_effect = NotFoundExc(
        'No revision')

    # when
    rv = self.app.get('/api/1/revision/origin/1/log/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {'error': 'No revision'})
    mock_service.lookup_revision_log_by.assert_called_once_with(
        1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_history(self, mock_service):
    """Revision looked up with a navigation context is enriched with
    plain, history and history-context URLs.
    """
    # for readability purposes, we use:
    # - sha1 as 3 letters (url are way too long otherwise to respect pep8)
    # - only keys with modification steps (all other keys are kept as is)

    # given
    stub_revision = {
        'id': '883',
        'children': ['777', '999'],
        'parents': [],
        'directory': '272'
    }
    mock_service.lookup_revision.return_value = stub_revision

    # when
    rv = self.app.get('/api/1/revision/883/prev/999/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'id': '883',
        'url': '/api/1/revision/883/',
        'history_url': '/api/1/revision/883/log/',
        'history_context_url': '/api/1/revision/883/prev/999/log/',
        'children': ['777', '999'],
        'children_urls': ['/api/1/revision/777/',
                          '/api/1/revision/999/'],
        'parents': [],
        'parent_urls': [],
        'directory': '272',
        'directory_url': '/api/1/directory/272/'
    })
    mock_service.lookup_revision.assert_called_once_with('883')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ko_not_found(self, mock_rev_dir):
    """Directory through revision: a NotFoundExc from the helper is
    reported as a 404 JSON error.
    """
    # given
    mock_rev_dir.side_effect = NotFoundExc('Not found')

    # when
    rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Not found'})
    mock_rev_dir.assert_called_once_with(
        {'sha1_git': '999'},
        'some/path/to/dir',
        '/api/1/revision/999/directory/some/path/to/dir/',
        with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir):
    """Directory through revision: the helper's directory listing is
    returned unchanged as JSON.
    """
    # given
    stub_dir = {
        'type': 'dir',
        'revision': '999',
        'content': [
            {
                'sha1_git': '789',
                'type': 'file',
                'target': '101',
                'target_url': '/api/1/content/sha1_git:101/',
                'name': 'somefile',
                'file_url': '/api/1/revision/999/directory/some/path/'
                            'somefile/'
            },
            {
                'sha1_git': '123',
                'type': 'dir',
                'target': '456',
                'target_url': '/api/1/directory/456/',
                'name': 'to-subdir',
                'dir_url': '/api/1/revision/999/directory/some/path/'
                           'to-subdir/',
            }]
    }
    mock_rev_dir.return_value = stub_dir

    # when
    rv = self.app.get('/api/1/revision/999/directory/some/path/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_dir)
    mock_rev_dir.assert_called_once_with(
        {'sha1_git': '999'},
        'some/path',
        '/api/1/revision/999/directory/some/path/',
        with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_content(self, mock_rev_dir):
    """Directory through revision: when the helper resolves the path to a
    file, its content description is returned unchanged as JSON.
    """
    # given
    stub_content = {
        'type': 'file',
        'revision': '999',
        'content': {
            'sha1_git': '789',
            'sha1': '101',
            'data_url': '/api/1/content/101/raw/',
        }
    }
    mock_rev_dir.return_value = stub_content

    # when
    url = '/api/1/revision/666/directory/some/other/path/'
    rv = self.app.get(url)

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_content)
    mock_rev_dir.assert_called_once_with(
        {'sha1_git': '666'}, 'some/other/path', url, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_person(self, mock_service):
    """Person endpoint returns the person found by the service as JSON."""
    # given
    stub_person = {
        'id': '198003',
        'name': 'Software Heritage',
        'email': 'robot@softwareheritage.org',
    }
    mock_service.lookup_person.return_value = stub_person

    # when
    rv = self.app.get('/api/1/person/198003/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, stub_person)
@patch('swh.web.ui.views.api.service')
@istest
def api_person_not_found(self, mock_service):
    """Person endpoint answers 404 with an error message when the service
    returns None.
    """
    # given
    mock_service.lookup_person.return_value = None

    # when
    rv = self.app.get('/api/1/person/666/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory(self, mock_service):
    """Directory endpoint returns the entries found by the service, each
    enriched with a target_url matching its type.
    """
    # given
    stub_directories = [
        {
            'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
            'type': 'file',
            'target': '4568be353ed3480476f032475e7c233eff737123',
        },
        {
            'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
            'type': 'dir',
            'target': '8be353ed3480476f032475e7c233eff737123456',
        }]

    expected_directories = [
        {
            'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
            'type': 'file',
            'target': '4568be353ed3480476f032475e7c233eff737123',
            'target_url': '/api/1/content/'
            'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
        },
        {
            'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
            'type': 'dir',
            'target': '8be353ed3480476f032475e7c233eff737123456',
            'target_url':
            '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/',
        }]

    mock_service.lookup_directory.return_value = stub_directories

    # when
    rv = self.app.get('/api/1/directory/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_directories)
    mock_service.lookup_directory.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_not_found(self, mock_service):
    """Directory endpoint answers 404 with an error message when the
    service returns an empty list.
    """
    # given
    mock_service.lookup_directory.return_value = []

    # when
    rv = self.app.get('/api/1/directory/'
                      '66618d8be353ed3480476f032475e7c233eff737/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'Directory with sha1_git '
        '66618d8be353ed3480476f032475e7c233eff737 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_found(self, mock_service):
    """Directory endpoint with a path returns the entry found by the
    service for that path.
    """
    # given
    expected_dir = {
        'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
        'type': 'file',
        'name': 'bla',
        'target': '4568be353ed3480476f032475e7c233eff737123',
        'target_url': '/api/1/content/'
        'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
    }

    mock_service.lookup_directory_with_path.return_value = expected_dir

    # when
    rv = self.app.get('/api/1/directory/'
                      '18d8be353ed3480476f032475e7c233eff7371d5/bla/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_dir)
    mock_service.lookup_directory_with_path.assert_called_once_with(
        '18d8be353ed3480476f032475e7c233eff7371d5', 'bla')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_not_found(self, mock_service):
    """Directory endpoint with a path answers 404 when the service
    returns None; the error message names the path without its
    leading/trailing separators.
    """
    # given
    mock_service.lookup_directory_with_path.return_value = None
    path = 'some/path/to/dir/'

    # when
    rv = self.app.get(('/api/1/directory/'
                       '66618d8be353ed3480476f032475e7c233eff737/%s')
                      % path)
    path = path.strip('/')  # Path stripped of lead/trail separators

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': (('Entry with path %s relative to '
                   'directory with sha1_git '
                   '66618d8be353ed3480476f032475e7c233eff737 not found.')
                  % path)})
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_not_found(self, mock_service):
    """Entity endpoint answers 404 with an error message when the service
    returns an empty list.
    """
    # given
    mock_service.lookup_entity_by_uuid.return_value = []

    # when
    rv = self.app.get('/api/1/entity/'
                      '5f4d4c51-498a-4e28-88b3-b3e4e8396cba/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 404)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error':
        "Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " +
        "found."})
    mock_service.lookup_entity_by_uuid.assert_called_once_with(
        '5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_bad_request(self, mock_service):
    """Entity endpoint reports a BadInputExc raised by the service as a
    400 JSON error.
    """
    # given
    mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
        'bad input: uuid malformed!')

    # when
    rv = self.app.get('/api/1/entity/uuid malformed/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 400)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, {
        'error': 'bad input: uuid malformed!'})
    mock_service.lookup_entity_by_uuid.assert_called_once_with(
        'uuid malformed')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid(self, mock_service):
    """Entity endpoint returns the entities found by the service, each
    enriched with uuid_url (and parent_url when a parent is present).
    """
    # given
    stub_entities = [
        {
            'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
            'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
        },
        {
            'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
        }
    ]
    mock_service.lookup_entity_by_uuid.return_value = stub_entities

    expected_entities = [
        {
            'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
            'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-'
                        '785107f598e4/',
            'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
            'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
                          'd1ce2efc9fb2/'
        },
        {
            'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
            'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
                        'd1ce2efc9fb2/'
        }
    ]

    # when
    rv = self.app.get('/api/1/entity'
                      '/34bd6b1b-463f-43e5-a697-785107f598e4/')

    # then
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(rv.status_code, 200)
    self.assertEqual(rv.mimetype, 'application/json')
    response_data = json.loads(rv.data.decode('utf-8'))
    self.assertEqual(response_data, expected_entities)
    mock_service.lookup_entity_by_uuid.assert_called_once_with(
        '34bd6b1b-463f-43e5-a697-785107f598e4')
class ApiUtils(unittest.TestCase):
    """Unit tests for the api._api_lookup helper."""

    @istest
    def api_lookup_not_found(self):
        """A lookup yielding None raises NotFoundExc with the given text."""
        # when
        with self.assertRaises(exc.NotFoundExc) as caught:
            api._api_lookup(
                'something',
                lambda x: None,
                'this is the error message raised as it is None')

        # then
        raised_message = caught.exception.args[0]
        self.assertEqual(raised_message,
                         'this is the error message raised as it is None')

    @istest
    def api_lookup_with_result(self):
        """A plain lookup result is returned as is (identity enrichment)."""
        # when
        outcome = api._api_lookup(
            'something',
            lambda x: x + '!',
            'this is the error which won\'t be '
            'used here')

        # then
        self.assertEqual(outcome, 'something!')

    @istest
    def api_lookup_with_result_as_map(self):
        """A map object returned by the lookup is converted to a list."""
        # when
        outcome = api._api_lookup(
            [1, 2, 3],
            lambda x: map(lambda y: y+1, x),
            'this is the error which won\'t be '
            'used here')

        # then
        self.assertEqual(outcome, [2, 3, 4])
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index d3e8befa..954b6a99 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,780 +1,795 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="A dictionary of SWH's most important statistics")
def api_stats():
    """Expose the SWH storage counters.

    Returns:
        The value returned by service.stat_counters().

    """
    counters = service.stat_counters()
    return counters
# Endpoint listing the visits of an origin: origin_id is looked up through
# service.lookup_origin_visits via _api_lookup, with the not-found message
# 'No origin %s found'.
# In this diff hunk the optional visit_id route, argument and branch are
# removed from this function ('-' lines); the '+' lines are the new version.
# NOTE(review): the retained retdoc still mentions visit_id, and the
# docstring says "given revision" although an origin id is looked up --
# confirm and reword.
@app.route('/api/1/origin/<int:origin_id>/visits/')
-@app.route('/api/1/origin/<int:origin_id>/visits/<int:visit_id>')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='The requested SWH origin identifier')
-@doc.arg('visit_id',
- default=None,
- argtype=doc.argtypes.int,
- argdoc='The requested SWH origin visit identifier')
@doc.returns(rettype=doc.rettypes.list,
retdoc="""All instances of visits of the origin pointed by
- origin_id as POSIX time since epoch (if visit_id is not defined).
- Only the instance of visit pointed by origin_id and visit_id.""")
-def api_origin_visits(origin_id, visit_id=None):
+ origin_id as POSIX time since epoch (if visit_id is not defined)
+""")
+def api_origin_visits(origin_id):
"""Return a list of visit dates as POSIX timestamps for the
given revision.
"""
- if not visit_id:
- return _api_lookup(
- origin_id,
- service.lookup_origin_visits,
- error_msg_if_not_found='No origin %s found' % origin_id)
- # TODO
+ return _api_lookup(
+ origin_id,
+ service.lookup_origin_visits,
+ error_msg_if_not_found='No origin %s found' % origin_id)
+
+
# Endpoint added by this diff (every line carries a '+' marker): looks up
# one visit (visit_id) of one origin (origin_id). _api_lookup calls
# service.lookup_origin_visit(origin_id, visit_id), applies the identity
# function to the result, and raises NotFoundExc with the message
# 'No visit %s for origin %s found' when the lookup result is falsy.
# NOTE(review): rettype is declared as a list while the retdoc describes a
# single visit, and the function has no docstring -- confirm.
+@app.route('/api/1/origin/<int:origin_id>/visits/<int:visit_id>/')
+@doc.route('/api/1/origin/visits/id/')
+@doc.arg('origin_id',
+ default=1,
+ argtype=doc.argtypes.int,
+ argdoc='The requested SWH origin identifier')
+@doc.arg('visit_id',
+ default=None,
+ argtype=doc.argtypes.int,
+ argdoc='The requested SWH origin visit identifier')
+@doc.returns(rettype=doc.rettypes.list,
+ retdoc="""The single instance visit visit_id of the origin pointed
+ by origin_id as POSIX time since epoch""")
+def api_origin_visit(origin_id, visit_id):
+ return _api_lookup(
+ origin_id,
+ service.lookup_origin_visit,
+ 'No visit %s for origin %s found' % (visit_id, origin_id),
+ lambda x: x,
+ visit_id)
@app.route('/api/1/content/search/', methods=['POST'])
@app.route('/api/1/content/search/<string:q>/')
@doc.route('/api/1/content/search/')
@doc.arg('q',
         default='sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH""")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""A dict with keys:
- search_res: a list of dicts corresponding to queried content
with key 'found' to True if found, 'False' if not
- search_stats: a dict containing number of files searched and
percentage of files found
""")
def api_search(q=None):
    """Search a content per hash.

    This may take the form of:
    - a GET request with a single checksum
    - a POST request with many hashes, with the request body containing
      identifiers (typically filenames) as keys and corresponding hashes as
      values.

    Args:
        q: optional 'algo:hash' query taken from the URL.

    Returns:
        A dict with keys 'search_res' (list of dicts with keys 'filename',
        'sha1' and 'found', or None when nothing was queried) and
        'search_stats' (dict with keys 'nbfiles' and 'pct').

    """
    search_res = None
    search_stats = {'nbfiles': 0, 'pct': 0}

    if q:
        # Single hash request route
        found = service.search_hash(q)['found']
        search_res = [{'filename': None, 'sha1': q, 'found': found}]
        search_stats = {'nbfiles': 1, 'pct': 100 if found else 0}
    elif request.method == 'POST':
        # Post form submission with many hash requests; inputs with no
        # associated value are dropped. A field named 'q' carries a bare
        # hash, any other field name is reported as the filename.
        queries = [{'filename': None if k == 'q' else k, 'sha1': v}
                   for k, v in request.form.items() if v]

        if queries:
            # Materialise the lookup once: both the result list and the
            # statistics are derived from this single pass, so a lookup
            # returning a one-shot iterator is handled correctly.
            search_res = [{'filename': el['filename'],
                           'sha1': el['sha1'],
                           'found': el['found']}
                          for el in service.lookup_multiple_hashes(queries)]
            nbfound = sum(1 for el in search_res if el['found'])
            search_stats = {'nbfiles': len(queries),
                            'pct': (nbfound / len(queries))*100}

    return {'search_res': search_res, 'search_stats': search_stats}
def _api_lookup(criteria,
                lookup_fn,
                error_msg_if_not_found,
                enrich_fn=lambda x: x,
                *args):
    """Look something up in the backend, then enrich what was found.

    The lookup is performed as lookup_fn(criteria, *args). A falsy result
    raises NotFoundExc. A result that is a list, a map object or a
    generator is returned as a list with enrich_fn applied to each element;
    any other result is passed to enrich_fn as a whole.

    Args:
        - criteria: discriminating criteria to lookup (identifier, checksum)
        - lookup_fn: function expecting the criteria and the optional
        supplementary *args.
        - error_msg_if_not_found: message of the NotFoundExc raised when
        nothing matching the criteria is found.
        - enrich_fn: function used to enrich the result returned by
        lookup_fn. Defaults to the identity function.
        - *args: supplementary arguments to pass to lookup_fn.

    Raises:
        NotFoundExc or whatever `lookup_fn` raises.

    """
    found = lookup_fn(criteria, *args)
    if not found:
        raise NotFoundExc(error_msg_if_not_found)

    is_collection = isinstance(found, (map, list, GeneratorType))
    if not is_collection:
        return enrich_fn(found)
    return [enrich_fn(item) for item in found]
@app.route('/api/1/origin/<int:origin_id>/')
@app.route('/api/1/origin/<string:origin_type>/url/<path:origin_url>/')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The origin's SWH origin_id.")
@doc.arg('origin_type',
         default='git',
         argtype=doc.argtypes.str,
         argdoc="The origin's type (git, svn..)")
@doc.arg('origin_url',
         default='https://github.com/hylang/hy',
         argtype=doc.argtypes.path,
         argdoc="The origin's URL.")
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if origin_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the origin identified by origin_id')
def api_origin(origin_id=None, origin_type=None, origin_url=None):
    """Return information about the origin matching the passed criteria.

    The origin is designated either by its SWH-specific identifier, or by
    its type together with its URL. Only the criteria holding a truthy
    value are forwarded to service.lookup_origin.

    """
    candidates = (('id', origin_id),
                  ('type', origin_type),
                  ('url', origin_url))
    criteria = {}
    for key, value in candidates:
        if value:
            criteria[key] = value

    if 'id' not in criteria:
        error_msg = 'Origin with type %s and URL %s not found' % (
            criteria['type'], criteria['url'])
    else:
        error_msg = 'Origin with id %s not found.' % criteria['id']

    return _api_lookup(
        criteria, lookup_fn=service.lookup_origin,
        error_msg_if_not_found=error_msg)
@app.route('/api/1/person/<int:person_id>/')
@doc.route('/api/1/person/')
@doc.arg('person_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The person's SWH identifier")
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if person_id does not correspond to a person in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
    """Return information about person with identifier person_id.

    Args:
        person_id: the person's identifier, forwarded to
        service.lookup_person.

    Raises:
        NotFoundExc (through _api_lookup) if the lookup returns nothing.

    """
    return _api_lookup(
        person_id, lookup_fn=service.lookup_person,
        error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release/<string:sha1_git>/')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
         default='8b137891791fe96927ad78e64b0aad7bded08bdc',
         argtype=doc.argtypes.sha1_git,
         argdoc="The release's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if the argument is not a sha1')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if sha1_git does not correspond to a release in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
    """Look up the release identified by sha1_git and return it enriched
    by utils.enrich_release.

    """
    return _api_lookup(
        sha1_git,
        lookup_fn=service.lookup_release,
        error_msg_if_not_found=(
            'Release with sha1_git %s not found.' % sha1_git),
        enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
                           limit=100, with_data=False):
    """Compute the revision matching criterion's directory or content data.

    Args:
        revision: dictionary of criterions representing a revision to lookup
        path: directory's path to lookup
        request_path: path of the originating request; it is handed to
        utils.enrich_directory as the context URL of each directory entry.
        limit: forwarded to service.lookup_directory_through_revision
        (default to 100).
        with_data: indicate to retrieve the content's raw data if path
        resolves to a content.

    Returns:
        The result dict returned by
        service.lookup_directory_through_revision, with its 'content' key
        enriched: each entry through utils.enrich_directory when the result
        type is 'dir', the whole value through utils.enrich_content
        otherwise.

    """
    # The service also returns the revision identifier, unused here.
    _, result = service.lookup_directory_through_revision(
        revision, path, limit=limit, with_data=with_data)

    content = result['content']
    if result['type'] == 'dir':  # dir_entries
        result['content'] = [utils.enrich_directory(entry, request_path)
                             for entry in content]
    else:  # content
        result['content'] = utils.enrich_content(content)

    return result
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/directory/<path:path>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/directory/<path:path>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>'
           '/directory/<path:path>/')
@doc.route('/api/1/revision/origin/directory/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default
to master)""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""Optional timestamp (default to the nearest time
crawl of timestamp)""")
@doc.arg('path',
         default='.',
         argtype=doc.argtypes.path,
         argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the passed criteria was
not found""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision corresponding to the
passed criteria""")
def api_directory_through_revision_origin(origin_id,
                                          branch_name="refs/heads/master",
                                          ts=None,
                                          path=None,
                                          with_data=False):
    """Display directory or content information through a revision identified
    by origin/branch/timestamp.

    Args:
        origin_id: identifier of the revision's origin
        branch_name: branch within the origin
        ts: optional timestamp string, parsed with utils.parse_timestamp
        when provided
        path: optional path of the directory or file to display
        with_data: forwarded to _revision_directory_by

    Returns:
        The value returned by _revision_directory_by for the revision
        criteria, path and current request path.

    """
    if ts:
        ts = utils.parse_timestamp(ts)

    return _revision_directory_by(
        {
            'origin_id': origin_id,
            'branch_name': branch_name,
            'ts': ts
        },
        path,
        request.path,
        with_data=with_data)
@app.route('/api/1/revision'
           '/origin/<int:origin_id>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The queried revision's origin identifier in SWH")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="""The optional branch for the given origin (default
to master)""")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="The time at which the queried revision should be constrained")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching given criteria was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision identified by the given
criteria""")
def api_revision_with_origin(origin_id,
                             branch_name="refs/heads/master",
                             ts=None):
    """Display the revision designated by origin, branch and timestamp.

    The timestamp, when provided, is parsed with utils.parse_timestamp
    before the lookup; the revision found is enriched with
    utils.enrich_revision.

    """
    parsed_ts = utils.parse_timestamp(ts) if ts else ts

    not_found_msg = ('Revision with (origin_id: %s, branch_name: %s'
                     ', ts: %s) not found.' % (origin_id,
                                               branch_name,
                                               parsed_ts))
    return _api_lookup(
        origin_id,
        service.lookup_revision_by,
        not_found_msg,
        utils.enrich_revision,
        branch_name,
        parsed_ts)
@app.route('/api/1/revision/<string:sha1_git>/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:context>/')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier")
@doc.arg('context',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the revision identified by sha1_git')
def api_revision(sha1_git, context=None):
    """Look up the revision identified by sha1_git and return it enriched
    by utils.enrich_revision, which also receives the optional navigation
    context.

    """
    return _api_lookup(
        sha1_git,
        service.lookup_revision,
        'Revision with sha1_git %s not found.' % sha1_git,
        lambda revision: utils.enrich_revision(revision, context))
@app.route('/api/1/revision/<string:sha1_git>/raw/')
@doc.route('/api/1/revision/raw/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
    """Serve the message of the revision identified by sha1_git as an
    attachment of type application/octet-stream.

    """
    message = service.lookup_revision_message(sha1_git)['message']
    disposition = 'attachment;filename=rev_%s_raw' % sha1_git
    return app.response_class(message,
                              headers={'Content-disposition': disposition},
                              mimetype='application/octet-stream')
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc="The revision's sha1_git identifier.")
@doc.arg('dir_path',
         default='.',
         argtype=doc.argtypes.path,
         argdoc='The path from the top level directory')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching sha1_git was not found in SWH
, or if the path specified does not exist""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the directory pointed by revision id
sha1-git and dir_path""")
def api_revision_directory(sha1_git,
                           dir_path=None,
                           with_data=False):
    """Return information on the directory pointed by revision sha1_git.

    Without dir_path, the top level directory is displayed; otherwise the
    entry pointed by dir_path (if it exists). The work is delegated to
    _revision_directory_by with the current request path as context.

    """
    revision_criteria = {'sha1_git': sha1_git}
    return _revision_directory_by(
        revision_criteria,
        dir_path,
        request.path,
        with_data=with_data)
@app.route('/api/1/revision/<string:sha1_git>/log/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
         default='ec72c666fb345ea5f21359b7bc063710ce558e39',
         argtype=doc.argtypes.sha1_git,
         argdoc='The sha1_git of the revision queried')
@doc.arg('prev_sha1s',
         default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
         argtype=doc.argtypes.path,
         argdoc='The navigation breadcrumbs -- use at your own risk!')
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git or prev_sha1s is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The log data starting at the revision identified by
sha1_git, completed with the navigation breadcrumbs,
if any""")
def api_revision_log(sha1_git, prev_sha1s=None):
    """Show all revisions (~git log) starting from sha1_git.

    One revision more than the configured 'max_log_revs' is requested: when
    that extra revision comes back, it is left out of the listing and used
    to build the link to the next page. The revisions designated by the
    breadcrumbs (prev_sha1s, '/'-separated), if any, are put in front of
    the log.

    Returns:
        A dict with keys 'revisions' and 'next_revs_url'.

    """
    max_revs = app.config['conf']['max_log_revs']
    not_found_msg = 'Revision with sha1_git %s not found.' % sha1_git

    def _log_one_past_limit(rev_id):
        return service.lookup_revision_log(rev_id, max_revs + 1)

    fetched = _api_lookup(sha1_git,
                          lookup_fn=_log_one_past_limit,
                          error_msg_if_not_found=not_found_msg,
                          enrich_fn=utils.enrich_revision)

    next_page = None
    older_revs = fetched
    if len(fetched) == max_revs + 1:
        older_revs = fetched[:-1]
        next_page = url_for('api_revision_log',
                            sha1_git=fetched[-1]['id'])

    if prev_sha1s:
        breadcrumbs = _api_lookup(prev_sha1s.split('/'),
                                  lookup_fn=service.lookup_revision_multiple,
                                  error_msg_if_not_found=not_found_msg,
                                  enrich_fn=utils.enrich_revision)
        listed = breadcrumbs + older_revs
    else:  # no nav breadcrumbs, so we're done
        listed = older_revs

    return {'revisions': listed, 'next_revs_url': next_page}
@app.route('/api/1/revision'
           '/origin/<int:origin_id>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/branch/<path:branch_name>'
           '/ts/<string:ts>/log/')
@app.route('/api/1/revision'
           '/origin/<int:origin_id>'
           '/ts/<string:ts>/log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
         default=1,
         argtype=doc.argtypes.int,
         argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
         default='refs/heads/master',
         argtype=doc.argtypes.path,
         argdoc="The revision's branch name within the origin specified")
@doc.arg('ts',
         default='2000-01-17T11:23:54+00:00',
         argtype=doc.argtypes.ts,
         argdoc="""A time or timestamp string to parse""")
@doc.raises(exc=doc.excs.notfound,
            doc="""Raised if a revision matching the given criteria was not
found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
             retdoc="""The metadata of the revision log starting at the revision
matching the given criteria.""")
def api_revision_log_by(origin_id,
                        branch_name='refs/heads/master',
                        ts=None):
    """Show all revisions (~git log) starting from the revision
    described by its origin_id, optional branch name and timestamp.

    One revision more than the configured 'max_log_revs' is requested: when
    that extra revision comes back, it is left out of the listing and used
    to build the link to the next page.

    Returns:
        A dict with keys 'revisions' and 'next_revs_url'.

    """
    max_revs = app.config['conf']['max_log_revs']

    if ts:
        ts = utils.parse_timestamp(ts)

    def _log_by_one_past_limit(o_id, br, when):
        return service.lookup_revision_log_by(o_id, br, when, max_revs + 1)

    msg_parts = ['No revision matching origin %s ' % origin_id,
                 ', branch name %s' % branch_name,
                 (' and time stamp %s.' % ts) if ts else '.']
    not_found_msg = ''.join(msg_parts)

    fetched = _api_lookup(origin_id,
                          _log_by_one_past_limit,
                          not_found_msg,
                          utils.enrich_revision,
                          branch_name,
                          ts)

    next_page = None
    listed = fetched
    if len(fetched) == max_revs + 1:
        listed = fetched[:-1]
        next_page = url_for('api_revision_log',
                            sha1_git=fetched[-1]['id'])

    return {'revisions': listed, 'next_revs_url': next_page}
@app.route('/api/1/directory/<string:sha1_git>/')
@app.route('/api/1/directory/<string:sha1_git>/<path:path>/')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.sha1_git,
         argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
         default='.',
         argtype=doc.argtypes.path,
         argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc=('The metadata and contents of the directory identified '
                     'by\n'
                     'sha1_git'))
def api_directory(sha1_git,
                  path=None):
    """Return information about the directory with id sha1_git.

    Args:
        sha1_git: identifier of the directory to look up.
        path: optional path relative to that directory. When it is
            given (and non-empty) the entry at that path is looked up
            instead of the directory itself.

    Returns:
        The result of _api_lookup for the matching lookup function,
        with utils.enrich_directory passed as the enrichment function.

    """
    if path:
        # A sub-path was requested: resolve the entry below the directory.
        error_msg_path = ('Entry with path %s relative to directory '
                          'with sha1_git %s not found.') % (path, sha1_git)
        return _api_lookup(
            sha1_git,
            service.lookup_directory_with_path,
            error_msg_path,
            utils.enrich_directory,
            path)
    else:
        error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
        return _api_lookup(
            sha1_git,
            service.lookup_directory,
            error_msg_nopath,
            utils.enrich_directory)
@app.route('/api/1/provenance/<string:q>/')
@doc.route('/api/1/provenance/')
@doc.arg('q',
         default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=("The queried content's corresponding hash (supported hash\n"
                 "algorithms: sha1_git, sha1, sha256)"))
@doc.raises(exc=doc.excs.badinput,
            doc=('Raised if hash algorithm is incorrect or if the hash\n'
                 'value is badly formatted.'))
@doc.raises(exc=doc.excs.notfound,
            doc=('Raised if a content matching the hash was not found\n'
                 'in SWH'))
@doc.returns(rettype=doc.rettypes.dict,
             retdoc=('List of provenance information (dict) for the matched\n'
                     'content.'))
def api_content_provenance(q):
    """Return the provenance information of a content, if any.

    Args:
        q: the query string identifying the content, passed as is to
            service.lookup_content_provenance.

    Returns:
        The result of _api_lookup, using an enrichment function that
        adds 'revision_url', 'content_url' and 'origin_url' entries to
        a copy of a provenance dict.

    """
    def _add_urls(provenance):
        # Work on a copy so the looked-up record is left untouched.
        enriched = provenance.copy()

        revision_url = url_for('api_revision',
                               sha1_git=provenance['revision'])
        content_query = 'sha1_git:%s' % provenance['content']
        content_url = url_for('api_content_metadata', q=content_query)
        origin_url = url_for('api_origin',
                             origin_id=provenance['origin'])

        enriched['revision_url'] = revision_url
        enriched['content_url'] = content_url
        enriched['origin_url'] = origin_url
        return enriched

    not_found_msg = 'Content with %s not found.' % q
    return _api_lookup(
        q,
        lookup_fn=service.lookup_content_provenance,
        error_msg_if_not_found=not_found_msg,
        enrich_fn=_add_urls)
@app.route('/api/1/content/<string:q>/raw/')
@doc.route('/api/1/content/raw/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=('An algo_hash:hash string, where algo_hash is one of sha1,\n'
                 'sha1_git or sha256 and hash is the hash to search for in '
                 'SWH. Defaults\n'
                 'to sha1 in the case of a missing algo_hash\n'))
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
             retdoc='The raw content data as an octet stream')
def api_content_raw(q):
    """Return the raw data of a content as an attachment.

    Args:
        q: the query string identifying the content, passed as is to
            service.lookup_content_raw.

    Returns:
        An app.response_class instance with mimetype
        'application/octet-stream' whose body is the content's 'data'.

    Raises:
        NotFoundExc: if the lookup returns nothing.

    """
    found = service.lookup_content_raw(q)
    if not found:
        raise NotFoundExc('Content with %s not found.' % q)

    def _single_chunk(content):
        # The whole payload is handed over as one chunk.
        yield content['data']

    disposition = 'attachment;filename=content_%s_raw' % q
    return app.response_class(_single_chunk(found),
                              headers={'Content-disposition': disposition},
                              mimetype='application/octet-stream')
@app.route('/api/1/content/<string:q>/')
@doc.route('/api/1/content/')
@doc.arg('q',
         default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
         argtype=doc.argtypes.algo_and_hash,
         argdoc=('An algo_hash:hash string, where algo_hash is one of sha1,\n'
                 'sha1_git or sha256 and hash is the hash to search for in '
                 'SWH. Defaults\n'
                 'to sha1 in the case of a missing algo_hash\n'))
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc=('The metadata of the content identified by q. If '
                     'content\n'
                     'decoding was successful, it also returns the data'))
def api_content_metadata(q):
    """Return the information of the content matching q.

    Args:
        q: the query string identifying the content, passed as is to
            service.lookup_content.

    Returns:
        The result of _api_lookup, with utils.enrich_content passed as
        the enrichment function.

    """
    not_found_msg = 'Content with %s not found.' % q
    return _api_lookup(q,
                       lookup_fn=service.lookup_content,
                       error_msg_if_not_found=not_found_msg,
                       enrich_fn=utils.enrich_content)
@app.route('/api/1/entity/<string:uuid>/')
@doc.route('/api/1/entity/')
@doc.arg('uuid',
         default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
         argtype=doc.argtypes.uuid,
         argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput,
            doc='Raised if uuid is not well formed')
@doc.raises(exc=doc.excs.notfound,
            doc='Raised if an entity matching uuid was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
             retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
    """Return entity information if the entity is found.

    Args:
        uuid: the entity's uuid, passed as is to
            service.lookup_entity_by_uuid.

    Returns:
        The result of _api_lookup, with utils.enrich_entity passed as
        the enrichment function.

    """
    return _api_lookup(
        uuid,
        lookup_fn=service.lookup_entity_by_uuid,
        error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
        enrich_fn=utils.enrich_entity)

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 9:05 PM (4 w, 11 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3280252

Event Timeline