diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py
index b9bb9b74..dab5b187 100644
--- a/swh/web/ui/backend.py
+++ b/swh/web/ui/backend.py
@@ -1,277 +1,291 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from . import main
def content_get(sha1_bin):
"""Lookup the content designed by {algo: hash_bin}.
Args:
sha1_bin: content's binary sha1.
Returns:
Content as a dict with 'sha1' and 'data' keys, 'data' being the
raw content bytes.
"""
contents = main.storage().content_get([sha1_bin])
if contents and len(contents) >= 1:
return contents[0]
return None
def content_find(algo, hash_bin):
"""Retrieve the content with binary hash hash_bin
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
A triplet (sha1, sha1_git, sha256) if the content exists
or None otherwise.
"""
return main.storage().content_find({algo: hash_bin})
def content_find_provenance(algo, hash_bin):
"""Find the content's provenance information.
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash corresponding to algo searched for.
Yields:
Provenance information (dicts) for that content, if any
(this can be empty if the cache is not populated)
"""
return main.storage().content_find_provenance({algo: hash_bin})
def content_missing_per_sha1(sha1list):
"""List content missing from storage based on sha1
Args:
sha1list: Iterable of sha1s to check for absence
Returns:
an iterable of sha1s missing from the storage
"""
return main.storage().content_missing_per_sha1(sha1list)
def directory_get(sha1_bin):
"""Retrieve information on one directory.
Args:
sha1_bin: Directory's identifier
Returns:
The directory's information.
"""
res = main.storage().directory_get([sha1_bin])
if res and len(res) >= 1:
return res[0]
def origin_get(origin):
"""Return information about the origin matching dict origin.
Args:
origin: origin's dict with keys either 'id' or
('type' AND 'url')
Returns:
Origin information as dict.
"""
return main.storage().origin_get(origin)
def person_get(person_id):
"""Return information about the person with id person_id.
Args:
person_id: person's identifier.
Returns:
Person information as dict.
"""
res = main.storage().person_get([person_id])
if res and len(res) >= 1:
return res[0]
def directory_ls(sha1_git_bin, recursive=False):
"""Return information about the directory with id sha1_git.
Args:
sha1_git: directory's identifier.
recursive: optional recursive flag (defaults to False)
Returns:
Directory information as dict.
"""
directory_entries = main.storage().directory_ls(sha1_git_bin, recursive)
if not directory_entries:
return []
return directory_entries
def release_get(sha1_git_bin):
"""Return information about the release with sha1 sha1_git_bin.
Args:
sha1_git_bin: The release's sha1 as bytes.
Returns:
Release information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().release_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_get(sha1_git_bin):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as bytes.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().revision_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_get_multiple(sha1_git_bin_list):
"""Return information about the revisions in sha1_git_bin_list
Args:
sha1_git_bin_list: The revisions' sha1s as a list of bytes.
Returns:
Revisions' information as an iterable of dicts if any found,
an empty list otherwise
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().revision_get(sha1_git_bin_list)
if res and len(res) >= 1:
return res
return []
def revision_log(sha1_git_bin, limit):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as bytes.
limit: the maximum number of revisions returned.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
return main.storage().revision_log([sha1_git_bin], limit)
def revision_log_by(origin_id, branch_name, ts, limit):
"""Return information about the revision matching the timestamp
ts, from origin origin_id, in branch branch_name.
Args:
origin_id: origin of the revision
- branch_name: revision's branch.
- timestamp: revision's time frame.
Returns:
Information for the revision matching the criteria.
"""
return main.storage().revision_log_by(origin_id,
branch_name,
ts,
limit=limit)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return main.storage().stat_counters()
def lookup_origin_visits(origin_id):
"""Yields the origin origin_ids' visits.
Args:
origin_id: origin to list visits for
Yields:
Dictionaries of origin_visit for that origin
"""
yield from main.storage().origin_visit_get(origin_id)
+def lookup_origin_visit(origin_id, visit_id):
+ """Return information about visit visit_id with origin origin_id.
+
+ Args:
+ origin_id: origin concerned by the visit
+ visit_id: the visit identifier to lookup
+
+ Returns:
+ The origin_visit dict concerned, or None if not found
+
+ """
+ return main.storage().origin_visit_get_by(origin_id, visit_id)
+
+
def revision_get_by(origin_id, branch_name, timestamp):
"""Return occurrence information matching the criterions origin_id,
branch_name, ts.
"""
res = main.storage().revision_get_by(origin_id,
branch_name,
timestamp=timestamp,
limit=1)
if not res:
return None
return res[0]
def directory_entry_get_by_path(directory, path):
"""Return a directory entry by its path.
"""
paths = path.strip(os.path.sep).split(os.path.sep)
return main.storage().directory_entry_get_by_path(
directory,
list(map(lambda p: p.encode('utf-8'), paths)))
def entity_get(uuid):
"""Retrieve the entity per its uuid.
"""
return main.storage().entity_get(uuid)
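Aside (not part of the patch): the new backend.lookup_origin_visit added above is a thin pass-through to the storage layer. The standalone sketch below illustrates that delegation pattern; the stubbed storage object and its return value are assumptions for the example, and only the origin_visit_get_by(origin_id, visit_id) call is taken from the diff itself.

# Sketch of the delegation pattern used by the new backend helper.
# The storage stub below is an assumption for illustration; the real
# backend obtains its storage through main.storage().
from unittest.mock import MagicMock

storage = MagicMock()
storage.origin_visit_get_by.return_value = {'origin': 1, 'visit': 1}

def lookup_origin_visit(origin_id, visit_id):
    # Mirrors the patched backend.lookup_origin_visit: return the raw
    # origin_visit dict from storage (None when the visit is unknown).
    return storage.origin_visit_get_by(origin_id, visit_id)

print(lookup_origin_visit(1, 1))  # -> {'origin': 1, 'visit': 1}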
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index 1b3f72d1..33cef982 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,630 +1,646 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, backend
from swh.web.ui.exc import NotFoundExc
def lookup_multiple_hashes(hashes):
"""Lookup the passed hashes in a single DB connection, using batch
processing.
Args:
An array of {filename: X, sha1: Y}, string X, hex sha1 string Y.
Returns:
The same array with elements updated with elem['found'] = True if
the hash is present in storage, elem['found'] = False if not.
"""
hashlist = [hashutil.hex_to_hash(elem['sha1']) for elem in hashes]
content_missing = backend.content_missing_per_sha1(hashlist)
missing = [hashutil.hash_to_hex(x) for x in content_missing]
for x in hashes:
x.update({'found': True})
for h in hashes:
if h['sha1'] in missing:
h['found'] = False
return hashes
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found containing the hash info if the
hash is present, None if not.
"""
algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def search_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found is not None}
def lookup_content_provenance(q):
"""Return provenance information from a specified content.
Args:
q: query string of the form <hash_algo:hash>
Yields:
provenance information (dict) list if the content is found.
"""
algo, hash = query.parse_hash(q)
provenances = backend.content_find_provenance(algo, hash)
if not provenances:
return None
return (converters.from_provenance(p) for p in provenances)
def lookup_origin(origin):
"""Return information about the origin matching dict origin.
Args:
origin: origin's dict with keys either 'id' or
('type' AND 'url')
Returns:
origin information as dict.
"""
return backend.origin_get(origin)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
person = backend.person_get(person_id)
return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'], # HACK: sha1_git really
'Only sha1_git is supported.')
dir = backend.directory_get(sha1_git_bin)
if not dir:
return None
directory_entries = backend.directory_ls(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_directory_with_path(directory_sha1_git, path_string):
"""Return directory information for entry with path path_string w.r.t.
root directory pointed to by directory_sha1_git.
Args:
- directory_sha1_git: sha1_git of the directory from which the
entry is resolved
- path_string: the relative path to the entry, starting from the
directory pointed to by directory_sha1_git
Raises:
NotFoundExc if the directory entry is not found
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
directory_sha1_git,
['sha1'],
'Only sha1_git is supported.')
queried_dir = backend.directory_entry_get_by_path(
sha1_git_bin, path_string)
if not queried_dir:
raise NotFoundExc(('Directory entry with path %s from %s not found') %
(path_string, directory_sha1_git))
return converters.from_directory_entry(queried_dir)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
release_sha1_git,
['sha1'],
'Only sha1_git is supported.')
res = backend.release_get(sha1_git_bin)
return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
return converters.from_revision(revision)
def lookup_revision_multiple(sha1_git_list):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
def to_sha1_bin(sha1_hex):
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_hex,
['sha1'],
'Only sha1_git is supported.')
return sha1_git_bin
sha1_bin_list = (to_sha1_bin(x) for x in sha1_git_list)
revisions = backend.revision_get_multiple(sha1_bin_list)
return (converters.from_revision(x) for x in revisions)
def lookup_revision_message(rev_sha1_git):
"""Return the raw message of the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Decoded revision message as dict {'message': <the_message>}
Raises:
ValueError if the identifier provided is not of sha1 nature.
NotFoundExc if the revision is not found, or if it has no message
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision with sha1_git %s not found.'
% rev_sha1_git)
if 'message' not in revision:
raise NotFoundExc('No message for revision with sha1_git %s.'
% rev_sha1_git)
res = {'message': revision['message']}
return res
def lookup_revision_by(origin_id,
branch_name="refs/heads/master",
timestamp=None):
"""Lookup revisions by origin_id, branch_name and timestamp.
If:
- branch_name is not provided, lookup using 'refs/heads/master' as default.
- ts is not provided, use the most recent
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
Yields:
The revisions matching the criteria.
"""
res = backend.revision_get_by(origin_id, branch_name, timestamp)
return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
rev_sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision_entries = backend.revision_log(sha1_git_bin, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_log_by(origin_id, branch_name, timestamp, limit):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
origin_id: origin of the revision
branch_name: revision's branch
timestamp: revision's time frame
limit: the maximum number of revisions returned
Returns:
The log as an iterable of revision dicts.
Raises:
NotFoundExc if no revision corresponds to the criterion
NotFoundExc if the corresponding revision has no log
"""
revision_entries = backend.revision_log_by(origin_id,
branch_name,
timestamp,
limit)
if not revision_entries:
return None
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context_by(origin_id, branch_name, ts, sha1_git,
limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
sha1_git_root being resolved through the lookup of a revision by origin_id,
branch_name and ts.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
- origin_id: origin of the revision.
- branch_name: revision's branch.
- timestamp: revision's time frame.
- sha1_git: one of sha1_git_root's ancestors.
- limit: limit the lookup to 100 revisions back.
Returns:
Pair of (root_revision, revision).
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
- BadInputExc in case of unknown algo_hash or bad hash.
- NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root.
"""
rev_root = backend.revision_get_by(origin_id, branch_name, ts)
if not rev_root:
raise NotFoundExc('Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts))
return (converters.from_revision(rev_root),
lookup_revision_with_context(rev_root, sha1_git, limit))
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision. The type is either a sha1 (as an hex
string) or a non converted dict.
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
if isinstance(sha1_git_root, str):
_, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git_root,
['sha1'],
'Only sha1_git is supported.')
revision_root = backend.revision_get(sha1_git_root_bin)
if not revision_root:
raise NotFoundExc('Revision root %s not found' % sha1_git_root)
else:
sha1_git_root_bin = sha1_git_root['id']
revision_log = backend.revision_log(sha1_git_root_bin, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def lookup_directory_with_revision(sha1_git, dir_path=None, with_data=False):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
with_data: boolean that indicates to retrieve the raw data if the path
resolves to a content. Defaults to False (for the API).
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError if dir_path exists but references neither a
'dir' nor a 'file' type.
"""
_, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
sha1_git,
['sha1'],
'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
entity = backend.directory_entry_get_by_path(dir_sha1_git_bin,
dir_path)
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = backend.directory_ls(entity['target'])
return {'type': 'dir',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': map(converters.from_directory_entry,
directory_entries)}
elif entity['type'] == 'file': # content
content = backend.content_find('sha1_git', entity['target'])
if with_data:
content['data'] = backend.content_get(content['sha1'])['data']
return {'type': 'file',
'path': '.' if not dir_path else dir_path,
'revision': sha1_git,
'content': converters.from_content(content)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content defined by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
Dict with 'sha1' and 'data' keys, 'data' being the raw content
data.
"""
algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
if not c:
return None
content = backend.content_get(c['sha1'])
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return backend.stat_counters()
def lookup_origin_visits(origin_id):
"""Yields the origin origin_ids' visits.
Args:
origin_id: origin to list visits for
Yields:
Dictionaries of origin_visit for that origin
"""
- for visit in backend.lookup_origin_visits(origin_id):
+ visits = backend.lookup_origin_visits(origin_id)
+ for visit in visits:
yield converters.from_origin_visit(visit)
+def lookup_origin_visit(origin_id, visit_id):
+ """Return information about visit visit_id with origin origin_id.
+
+ Args:
+ origin_id: origin concerned by the visit
+ visit_id: the visit identifier to lookup
+
+ Returns:
+ The origin_visit dict concerned, or None if not found
+
+ """
+ visit = backend.lookup_origin_visit(origin_id, visit_id)
+ return converters.from_origin_visit(visit)
+
+
def lookup_entity_by_uuid(uuid):
"""Return the entity's hierarchy from its uuid.
Args:
uuid: entity's identifier.
Returns:
List of hierarchy entities from the entity with uuid.
"""
uuid = query.parse_uuid4(uuid)
return backend.entity_get(uuid)
def lookup_revision_through(revision, limit=100):
"""Retrieve a revision from the criterion stored in revision dictionary.
Args:
revision: Dictionary of criteria to look up the revision with.
Here are the supported combination of possible values:
- origin_id, branch_name, ts, sha1_git
- origin_id, branch_name, ts
- sha1_git_root, sha1_git
- sha1_git
Returns:
The revision if found, None otherwise.
"""
if 'origin_id' in revision and \
'branch_name' in revision and \
'ts' in revision and \
'sha1_git' in revision:
return lookup_revision_with_context_by(revision['origin_id'],
revision['branch_name'],
revision['ts'],
revision['sha1_git'],
limit)
if 'origin_id' in revision and \
'branch_name' in revision and \
'ts' in revision:
return lookup_revision_by(revision['origin_id'],
revision['branch_name'],
revision['ts'])
if 'sha1_git_root' in revision and \
'sha1_git' in revision:
return lookup_revision_with_context(revision['sha1_git_root'],
revision['sha1_git'],
limit)
if 'sha1_git' in revision:
return lookup_revision(revision['sha1_git'])
# this should not happen
raise NotImplementedError('Should not happen!')
def lookup_directory_through_revision(revision, path=None,
limit=100, with_data=False):
"""Retrieve the directory information from the revision.
Args:
revision: dictionary of criteria identifying the revision to look up
path: directory's path to look up.
limit: optional query parameter to limit the revisions log
(defaults to 100). Note that, for now, this limit can prevent the
lookup from concluding whether sha1_git is an ancestor.
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
Returns:
The directory pointed to by the revision criteria at path.
"""
rev = lookup_revision_through(revision, limit)
if not rev:
raise NotFoundExc('Revision with criterion %s not found!' % revision)
return (rev['id'],
lookup_directory_with_revision(rev['id'], path, with_data))
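Aside (not part of the patch): at the service layer, the new lookup_origin_visit wraps the backend result with converters.from_origin_visit. Judging from the test expectations later in this diff, that conversion turns the visit date into a POSIX timestamp while leaving the other fields untouched. A minimal sketch of the expected transformation, with a hand-rolled converter standing in for converters.from_origin_visit (an assumption for illustration):

import datetime

visit = {
    'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
                              tzinfo=datetime.timezone.utc),
    'origin': 1,
    'visit': 1,
}

def from_origin_visit_sketch(visit):
    # Assumed equivalent of converters.from_origin_visit for this example:
    # only the 'date' field changes representation.
    converted = dict(visit)
    converted['date'] = visit['date'].timestamp()
    return converted

print(from_origin_visit_sketch(visit))
# -> {'date': 1420149600.0, 'origin': 1, 'visit': 1}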
diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py
index 05fe58ea..f855c621 100644
--- a/swh/web/ui/tests/test_backend.py
+++ b/swh/web/ui/tests/test_backend.py
@@ -1,721 +1,754 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock
from swh.core import hashutil
from swh.web.ui import backend
from swh.web.ui.tests import test_app
class BackendTestCase(test_app.SWHApiTestCase):
+ def setUp(self):
+ self.origin_visit1 = {
+ 'date': datetime.datetime(
+ 2015, 1, 1, 22, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 1
+ }
+
@istest
def content_get_ko_not_found_1(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f777')
self.storage.content_get = MagicMock(return_value=None)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get_ko_not_found_empty_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_get = MagicMock(return_value=[])
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
stub_contents = [{
'sha1': sha1_bin,
'data': b'binary data',
},
{}]
self.storage.content_get = MagicMock(return_value=stub_contents)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertEquals(actual_content, stub_contents[0])
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_find_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=(1, 2, 3))
# when
actual_content = backend.content_find('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find.assert_called_with({'sha1': sha1_bin})
@istest
def content_find_provenance_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_provenance = MagicMock(
return_value=(x for x in []))
# when
actual_lookup = backend.content_find_provenance('sha1_git', sha1_bin)
# then
self.assertEquals(list(actual_lookup), [])
self.storage.content_find_provenance.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find_provenance(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_provenance = MagicMock(
return_value=(x for x in (1, 2, 3)))
# when
actual_content = backend.content_find_provenance('sha1', sha1_bin)
# then
self.assertEquals(list(actual_content), [1, 2, 3])
# check the function has been called with parameters
self.storage.content_find_provenance.assert_called_with(
{'sha1': sha1_bin})
@istest
def content_missing_per_sha1_none(self):
# given
sha1s_bin = [hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f'),
hashutil.hex_to_hash(
'745bab676c8f3cec8016e0c39ea61cf57e518865'
)]
self.storage.content_missing_per_sha1 = MagicMock(return_value=[])
# when
actual_content = backend.content_missing_per_sha1(sha1s_bin)
# then
self.assertEquals(actual_content, [])
self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin)
@istest
def content_missing_per_sha1_some(self):
# given
sha1s_bin = [hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f'),
hashutil.hex_to_hash(
'745bab676c8f3cec8016e0c39ea61cf57e518865'
)]
self.storage.content_missing_per_sha1 = MagicMock(return_value=[
hashutil.hex_to_hash(
'745bab676c8f3cec8016e0c39ea61cf57e518865'
)])
# when
actual_content = backend.content_missing_per_sha1(sha1s_bin)
# then
self.assertEquals(actual_content, [hashutil.hex_to_hash(
'745bab676c8f3cec8016e0c39ea61cf57e518865'
)])
self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin)
@istest
def origin_get_by_id(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = backend.origin_get({'id': 'origin-id'})
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def origin_get_by_type_url(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = backend.origin_get({'type': 'ftp',
'url': 'ftp://some/url/to/origin'})
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with(
{'type': 'ftp',
'url': 'ftp://some/url/to/origin'})
@istest
def person_get(self):
# given
self.storage.person_get = MagicMock(return_value=[{
'id': 'person-id',
'name': 'blah'}])
# when
actual_person = backend.person_get('person-id')
# then
self.assertEqual(actual_person, {'id': 'person-id',
'name': 'blah'})
self.storage.person_get.assert_called_with(['person-id'])
@istest
def directory_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
self.storage.directory_get = MagicMock(return_value=None)
# when
actual_directory = backend.directory_get(sha1_bin)
# then
self.assertEquals(actual_directory, None)
self.storage.directory_get.assert_called_with([sha1_bin])
@istest
def directory_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
sha1_bin2 = hashutil.hex_to_hash(
'62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'}
stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'}
self.storage.directory_get = MagicMock(return_value=[stub_dir,
stub_dir2])
# when
actual_directory = backend.directory_get(sha1_bin)
# then
self.assertEquals(actual_directory, stub_dir)
self.storage.directory_get.assert_called_with([sha1_bin])
@istest
def directory_ls_empty_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
self.storage.directory_ls = MagicMock(return_value=[])
# when
actual_directory = backend.directory_ls(sha1_bin)
# then
self.assertEquals(actual_directory, [])
self.storage.directory_ls.assert_called_with(sha1_bin, False)
@istest
def directory_ls(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
stub_dir_entries = [{
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
self.storage.directory_ls = MagicMock(
return_value=stub_dir_entries)
actual_directory = backend.directory_ls(sha1_bin, recursive=True)
# then
self.assertIsNotNone(actual_directory)
self.assertEqual(list(actual_directory), stub_dir_entries)
self.storage.directory_ls.assert_called_with(sha1_bin, True)
@istest
def release_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
self.storage.release_get = MagicMock(return_value=[])
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertIsNone(actual_release)
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def release_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
stub_releases = [{
'id': sha1_bin,
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}]
self.storage.release_get = MagicMock(return_value=stub_releases)
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertEqual(actual_release, stub_releases[0])
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def revision_get_by_not_found(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get_by(10, 'master', 'ts2')
# then
self.assertIsNone(actual_revision)
self.storage.revision_get_by.assert_called_with(10, 'master',
timestamp='ts2',
limit=1)
@istest
def revision_get_by(self):
# given
self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}])
# when
actual_revisions = backend.revision_get_by(100, 'dev', 'ts')
# then
self.assertEquals(actual_revisions, {'id': 1})
self.storage.revision_get_by.assert_called_with(100, 'dev',
timestamp='ts',
limit=1)
@istest
def revision_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
self.storage.revision_get = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertIsNone(actual_revision)
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
stub_revisions = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_get = MagicMock(return_value=stub_revisions)
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertEqual(actual_revision, stub_revisions[0])
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_get_multiple(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
sha1_other = hashutil.hex_to_hash(
'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')
stub_revisions = [
{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
},
{
'id': sha1_other,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'name',
'email': b'name@surname.org',
},
'committer': {
'name': b'name',
'email': b'name@surname.org',
},
'message': b'ugly fix for bug 42',
'date': datetime.datetime(2000, 1, 12, 5, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 12, 5, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
]
self.storage.revision_get = MagicMock(
return_value=stub_revisions)
# when
actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other])
# then
self.assertEqual(actual_revision, stub_revisions)
self.storage.revision_get.assert_called_with(
[sha1_bin, sha1_other])
@istest
def revision_get_multiple_none_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
sha1_other = hashutil.hex_to_hash(
'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')
self.storage.revision_get = MagicMock(
return_value=[])
# when
actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other])
# then
self.assertEqual(actual_revision, [])
self.storage.revision_get.assert_called_with(
[sha1_bin, sha1_other])
@istest
def revision_log(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
stub_revision_log = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = backend.revision_log(sha1_bin, limit=1)
# then
self.assertEqual(list(actual_revision), stub_revision_log)
self.storage.revision_log.assert_called_with([sha1_bin], 1)
@istest
def revision_log_by(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
stub_revision_log = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_log_by = MagicMock(
return_value=stub_revision_log)
# when
actual_log = backend.revision_log_by(1, 'refs/heads/master',
None, limit=1)
# then
self.assertEqual(actual_log, stub_revision_log)
self.storage.revision_log.assert_called_with([sha1_bin], 1)
@istest
def revision_log_by_norev(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
self.storage.revision_log_by = MagicMock(return_value=None)
# when
actual_log = backend.revision_log_by(1, 'refs/heads/master',
None, limit=1)
# then
self.assertEqual(actual_log, None)
self.storage.revision_log.assert_called_with([sha1_bin], 1)
@istest
def stat_counters(self):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
self.storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = backend.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
self.storage.stat_counters.assert_called_with()
@istest
def lookup_origin_visits(self):
# given
- expected_dates = [{
- 'date': datetime.datetime(
- 2015, 1, 1, 22, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 1
- }, {
- 'date': datetime.datetime(
- 2013, 7, 1, 20, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 2
- }, {
- 'date': datetime.datetime(
- 2015, 1, 1, 21, 0, 0,
- tzinfo=datetime.timezone.utc),
- 'origin': 1,
- 'visit': 3
- }]
- self.storage.origin_visit_get = MagicMock(return_value=expected_dates)
+ expected_origin_visits = [
+ self.origin_visit1, {
+ 'date': datetime.datetime(
+ 2013, 7, 1, 20, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 2
+ }, {
+ 'date': datetime.datetime(
+ 2015, 1, 1, 21, 0, 0,
+ tzinfo=datetime.timezone.utc),
+ 'origin': 1,
+ 'visit': 3
+ }]
+ self.storage.origin_visit_get = MagicMock(
+ return_value=expected_origin_visits)
# when
- actual_dates = backend.lookup_origin_visits(5)
+ actual_origin_visits = backend.lookup_origin_visits(5)
# then
- self.assertEqual(list(actual_dates), expected_dates)
+ self.assertEqual(list(actual_origin_visits), expected_origin_visits)
self.storage.origin_visit_get.assert_called_with(5)
+ @istest
+ def lookup_origin_visit(self):
+ # given
+ self.storage.origin_visit_get_by = MagicMock(
+ return_value=self.origin_visit1)
+
+ # when
+ actual_origin_visit = backend.lookup_origin_visit(10, 1)
+
+ # then
+ self.assertEqual(actual_origin_visit, self.origin_visit1)
+
+ self.storage.origin_visit_get_by.assert_called_with(10, 1)
+
+ @istest
+ def lookup_origin_visit_None(self):
+ # given
+ self.storage.origin_visit_get_by = MagicMock(
+ return_value=None)
+
+ # when
+ actual_origin_visit = backend.lookup_origin_visit(1, 10)
+
+ # then
+ self.assertIsNone(actual_origin_visit)
+
+ self.storage.origin_visit_get_by.assert_called_with(1, 10)
+
@istest
def directory_entry_get_by_path(self):
# given
stub_dir_entry = {'id': b'dir-id',
'type': 'dir',
'name': b'some/path/foo'}
self.storage.directory_entry_get_by_path = MagicMock(
return_value=stub_dir_entry)
# when
actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1',
'some/path/foo')
self.assertEquals(actual_dir_entry, stub_dir_entry)
self.storage.directory_entry_get_by_path.assert_called_once_with(
b'dir-sha1',
[b'some', b'path', b'foo'])
@istest
def entity_get(self):
# given
stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7',
'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'},
{'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
'parent': None}]
self.storage.entity_get = MagicMock(return_value=stub_entities)
# when
actual_entities = backend.entity_get(
'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
# then
self.assertEquals(actual_entities, stub_entities)
self.storage.entity_get.assert_called_once_with(
'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7')
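Aside (not part of the patch): the backend tests above all follow one pattern: replace the storage method with a MagicMock, call the backend helper, then assert both the returned value and the arguments forwarded to storage. A reduced, standalone version of that pattern for the new lookup_origin_visit is sketched below; the real tests run inside test_app.SWHApiTestCase, which wires up self.storage, and this sketch assumes swh.web.ui is importable in the current environment.

import unittest
from unittest.mock import patch

class LookupOriginVisitSketch(unittest.TestCase):
    @patch('swh.web.ui.backend.main')
    def test_delegates_to_storage(self, mock_main):
        from swh.web.ui import backend
        # Stub the storage method the backend helper is expected to call.
        mock_main.storage().origin_visit_get_by.return_value = {
            'origin': 10, 'visit': 1}

        actual = backend.lookup_origin_visit(10, 1)

        self.assertEqual(actual, {'origin': 10, 'visit': 1})
        mock_main.storage().origin_visit_get_by.assert_called_with(10, 1)

if __name__ == '__main__':
    unittest.main()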
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index c03627da..365016a8 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1726 +1,1749 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
def setUp(self):
self.SHA1_SAMPLE = '18d8be353ed3480476f032475e7c233eff7371d5'
self.SHA1_SAMPLE_BIN = hex_to_hash(self.SHA1_SAMPLE)
self.SHA256_SAMPLE = ('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
self.SHA256_SAMPLE_BIN = hex_to_hash(self.SHA256_SAMPLE)
self.SHA1GIT_SAMPLE = '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
self.SHA1GIT_SAMPLE_BIN = hex_to_hash(self.SHA1GIT_SAMPLE)
self.DIRECTORY_ID = '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'
self.DIRECTORY_ID_BIN = hex_to_hash(self.DIRECTORY_ID)
self.AUTHOR_ID_BIN = {
'name': b'author',
'email': b'author@company.org',
}
self.AUTHOR_ID = {
'name': 'author',
'email': 'author@company.org',
}
self.COMMITTER_ID_BIN = {
'name': b'committer',
'email': b'committer@corp.org',
}
self.COMMITTER_ID = {
'name': 'committer',
'email': 'committer@corp.org',
}
self.SAMPLE_DATE_RAW = {
'timestamp': datetime.datetime(
2000, 1, 17, 11, 23, 54,
tzinfo=datetime.timezone.utc,
).timestamp(),
'offset': 0,
'negative_utc': False,
}
self.SAMPLE_DATE = '2000-01-17T11:23:54+00:00'
self.SAMPLE_MESSAGE_BIN = b'elegant fix for bug 31415957'
self.SAMPLE_MESSAGE = 'elegant fix for bug 31415957'
self.SAMPLE_REVISION = {
'id': self.SHA1_SAMPLE,
'directory': self.DIRECTORY_ID,
'author': self.AUTHOR_ID,
'committer': self.COMMITTER_ID,
'message': self.SAMPLE_MESSAGE,
'date': self.SAMPLE_DATE,
'committer_date': self.SAMPLE_DATE,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
'merge': False
}
self.SAMPLE_REVISION_RAW = {
'id': self.SHA1_SAMPLE_BIN,
'directory': self.DIRECTORY_ID_BIN,
'author': self.AUTHOR_ID_BIN,
'committer': self.COMMITTER_ID_BIN,
'message': self.SAMPLE_MESSAGE_BIN,
'date': self.SAMPLE_DATE_RAW,
'committer_date': self.SAMPLE_DATE_RAW,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
self.SAMPLE_CONTENT = {
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
'length': 190,
'status': 'absent'
}
self.SAMPLE_CONTENT_RAW = {
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'length': 190,
'status': 'hidden'
}
+ self.date_origin_visit1 = datetime.datetime(
+ 2015, 1, 1, 22, 0, 0,
+ tzinfo=datetime.timezone.utc)
+
+ self.origin_visit1 = {
+ 'date': self.date_origin_visit1,
+ 'origin': 1,
+ 'visit': 1
+ }
+
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_ball_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_multiple_hashes_some_missing(self, mock_backend):
# given
mock_backend.content_missing_per_sha1 = MagicMock(return_value=[
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
])
# when
actual_lookup = service.lookup_multiple_hashes(
[{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f'},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865'}])
# then
self.assertEquals(actual_lookup, [
{'filename': 'a',
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False},
{'filename': 'b',
'sha1': '745bab676c8f3cec8016e0c39ea61cf57e518865',
'found': True}
])
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def search_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.search_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': False}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def search_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.search_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': True}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance(self, mock_backend):
# given
mock_backend.content_find_provenance = MagicMock(
return_value=(p for p in [{
'content': hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f'),
'revision': hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f'),
'origin': 100,
'visit': 1,
'path': b'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}]))
expected_provenances = [{
'content': '123caf10e9535160d90e874b45aa426de762f19f',
'revision': '456caf10e9535160d90e874b45aa426de762f19f',
'origin': 100,
'visit': 1,
'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}]
# when
actual_provenances = service.lookup_content_provenance(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(list(actual_provenances), expected_provenances)
mock_backend.content_find_provenance.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_provenance_not_found(self, mock_backend):
# given
mock_backend.content_find_provenance = MagicMock(return_value=None)
# when
actual_provenances = service.lookup_content_provenance(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertIsNone(actual_provenances)
mock_backend.content_find_provenance.assert_called_with(
'sha1_git',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin_visits(self, mock_backend):
# given
- date_origin_visit1 = datetime.datetime(
- 2015, 1, 1, 22, 0, 0,
- tzinfo=datetime.timezone.utc)
-
date_origin_visit2 = datetime.datetime(
2013, 7, 1, 20, 0, 0,
tzinfo=datetime.timezone.utc)
date_origin_visit3 = datetime.datetime(
2015, 1, 1, 21, 0, 0,
tzinfo=datetime.timezone.utc)
- stub_result = [{
- 'date': date_origin_visit1,
- 'origin': 1,
- 'visit': 1
- }, {
+ stub_result = [self.origin_visit1, {
'date': date_origin_visit2,
'origin': 1,
'visit': 2
}, {
'date': date_origin_visit3,
'origin': 1,
'visit': 3
}]
mock_backend.lookup_origin_visits.return_value = stub_result
# when
- expected_dates = [{
- 'date': date_origin_visit1.timestamp(),
- 'origin': 1,
- 'visit': 1
+ expected_origin_visits = [{
+ 'date': self.origin_visit1['date'].timestamp(),
+ 'origin': self.origin_visit1['origin'],
+ 'visit': self.origin_visit1['visit']
}, {
'date': date_origin_visit2.timestamp(),
'origin': 1,
'visit': 2
}, {
'date': date_origin_visit3.timestamp(),
'origin': 1,
'visit': 3
}]
- actual_dates = service.lookup_origin_visits(6)
+ actual_origin_visits = service.lookup_origin_visits(6)
# then
- self.assertEqual(list(actual_dates), expected_dates)
+ self.assertEqual(list(actual_origin_visits), expected_origin_visits)
mock_backend.lookup_origin_visits.assert_called_once_with(6)
+ @patch('swh.web.ui.service.backend')
+ @istest
+ def lookup_origin_visit(self, mock_backend):
+ # given
+ stub_result = self.origin_visit1
+ mock_backend.lookup_origin_visit.return_value = stub_result
+
+ expected_origin_visit = {
+ 'date': self.origin_visit1['date'].timestamp(),
+ 'origin': self.origin_visit1['origin'],
+ 'visit': self.origin_visit1['visit']
+ }
+
+ # when
+ actual_origin_visit = service.lookup_origin_visit(1, 1)
+
+ # then
+ self.assertEqual(actual_origin_visit, expected_origin_visit)
+
+ mock_backend.lookup_origin_visit.assert_called_once_with(1, 1)
+
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
# given
mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin({'id': 'origin-id'})
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_backend.origin_get.assert_called_with({'id': 'origin-id'})
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_not_found(self, mock_backend):
# given
mock_backend.lookup_directory_with_path = MagicMock(return_value=None)
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
actual_directory = mock_backend.lookup_directory_with_path(
sha1_git, 'some/path/here')
self.assertIsNone(actual_directory)
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_with_path_found(self, mock_backend):
# given
sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
entry = {'id': 'dir-id',
'type': 'dir',
'name': 'some/path/foo'}
mock_backend.lookup_directory_with_path = MagicMock(return_value=entry)
# when
actual_directory = mock_backend.lookup_directory_with_path(
sha1_git, 'some/path/here')
self.assertEqual(entry, actual_directory)
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
# given
mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': {
'timestamp': datetime.datetime(
2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': True,
},
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
})
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': '2015-01-01T22:00:00-00:00',
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_backend.release_get.assert_called_with(
hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
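# stub revision log rooted at 666: revisions 999 and 777 both list 883
# among their parents, so 883's children are expected to be [999, 777]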
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# look up revision 883 first, then 666 (both exist)
mock_backend.revision_get.side_effect = [
sha1_git_dict,
sha1_git_root_dict
]
mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
'merge': False
})
mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
[call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context_sha1_git_root_already_retrieved_as_dict(
self, mock_query, mock_backend):
# given
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
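# same stub revision graph as in the previous test; here the root
# revision is passed in as an already-resolved dict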
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
mock_query.parse_hash_with_algorithms_or_throws.return_value = (
'sha1', sha1_git_bin)
# lookup only on sha1
mock_backend.revision_get.return_value = sha1_git_dict
mock_backend.revision_log.return_value = stub_revisions
# when
actual_revision = service.lookup_revision_with_context(
{'id': sha1_git_root_bin},
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
'merge': False
})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
sha1_git, ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(sha1_git_bin)
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_not_found(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'path/to/something/unknown')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_type_not_implemented(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
}
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
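# the path resolves to a 'rev' entry, which the service does not handle,
# hence the NotImplementedError asserted below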
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/rev')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_ls.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'dir',
'name': b'some/path',
'target': b'456'
}
mock_backend.directory_ls.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(actual_directory_entries['revision'], '123')
self.assertEqual(actual_directory_entries['path'], 'some/path')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
dir_id,
'some/path')
mock_backend.directory_ls.assert_called_once_with(b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_without_data(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': stub_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file_with_data(
self,
mock_query,
mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
mock_backend.directory_entry_get_by_path.return_value = {
'type': 'file',
'name': b'some/path/to/file',
'target': b'789'
}
stub_content = {
'status': 'visible',
'sha1': b'content-sha1'
}
mock_backend.content_find.return_value = stub_content
mock_backend.content_get.return_value = {
'sha1': b'content-sha1',
'data': b'some raw data'
}
expected_content = {
'status': 'visible',
'sha1': hash_to_hex(b'content-sha1'),
'data': b'some raw data'
}
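# with with_data=True the content_find result is expected to be enriched
# with the raw bytes from content_get and a hex-encoded sha1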
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file',
with_data=True)
# then
self.assertEqual(actual_content, {'type': 'file',
'revision': '123',
'path': 'some/path/to/file',
'content': expected_content})
mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with( # noqa
'123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_entry_get_by_path.assert_called_once_with(
b'dir-id-as-sha1', 'some/path/to/file')
mock_backend.content_find.assert_called_once_with('sha1_git', b'789')
mock_backend.content_get.assert_called_once_with(b'content-sha1')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
# given
mock_backend.revision_get = MagicMock(
return_value=self.SAMPLE_REVISION_RAW)
# when
actual_revision = service.lookup_revision(
self.SHA1_SAMPLE)
# then
self.assertEqual(actual_revision, self.SAMPLE_REVISION)
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_invalid_msg(self, mock_backend):
# given
stub_rev = self.SAMPLE_REVISION_RAW
stub_rev['message'] = b'elegant fix for bug \xff'
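# b'\xff' is not valid UTF-8, so the decoded message is expected to be
# None with message_decoding_failed set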
expected_revision = self.SAMPLE_REVISION
expected_revision['message'] = None
expected_revision['message_decoding_failed'] = True
mock_backend.revision_get = MagicMock(return_value=stub_rev)
# when
actual_revision = service.lookup_revision(
self.SHA1_SAMPLE)
# then
self.assertEqual(actual_revision, expected_revision)
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_ok(self, mock_backend):
# given
mock_backend.revision_get.return_value = self.SAMPLE_REVISION_RAW
# when
rv = service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
self.assertEquals(rv, {'message': self.SAMPLE_MESSAGE_BIN})
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_absent(self, mock_backend):
# given
stub_revision = self.SAMPLE_REVISION_RAW
del stub_revision['message']
mock_backend.revision_get.return_value = stub_revision
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
self.assertEqual(cm.exception.args[0], 'No message for revision '
'with sha1_git '
'18d8be353ed3480476f032475e7c233eff7371d5.')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_msg_norev(self, mock_backend):
# given
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_message(
self.SHA1_SAMPLE)
# then
mock_backend.revision_get.assert_called_with(
self.SHA1_SAMPLE_BIN)
self.assertEqual(cm.exception.args[0], 'Revision with sha1_git '
'18d8be353ed3480476f032475e7c233eff7371d5 '
'not found.')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple(self, mock_backend):
# given
sha1 = self.SHA1_SAMPLE
sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
stub_revisions = [
self.SAMPLE_REVISION_RAW,
{
'id': hex_to_hash(sha1_other),
'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
'author': {
'name': b'name',
'email': b'name@surname.org',
},
'committer': {
'name': b'name',
'email': b'name@surname.org',
},
'message': b'ugly fix for bug 42',
'date': {
'timestamp': datetime.datetime(
2000, 1, 12, 5, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False
},
'date_offset': 0,
'committer_date': {
'timestamp': datetime.datetime(
2000, 1, 12, 5, 23, 54,
tzinfo=datetime.timezone.utc).timestamp(),
'offset': 0,
'negative_utc': False
},
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}
]
mock_backend.revision_get_multiple.return_value = stub_revisions
# when
actual_revisions = service.lookup_revision_multiple(
[sha1, sha1_other])
# then
self.assertEqual(list(actual_revisions), [
self.SAMPLE_REVISION,
{
'id': sha1_other,
'directory': 'abcdbe353ed3480476f032475e7c233eff7371d5',
'author': {
'name': 'name',
'email': 'name@surname.org',
},
'committer': {
'name': 'name',
'email': 'name@surname.org',
},
'message': 'ugly fix for bug 42',
'date': '2000-01-12T05:23:54+00:00',
'date_offset': 0,
'committer_date': '2000-01-12T05:23:54+00:00',
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
'merge': False
}
])
self.assertEqual(
list(mock_backend.revision_get_multiple.call_args[0][0]),
[hex_to_hash(sha1),
hex_to_hash(sha1_other)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_multiple_none_found(self, mock_backend):
# given
sha1_bin = self.SHA1_SAMPLE
sha1_other = 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
mock_backend.revision_get_multiple.return_value = []
# then
actual_revisions = service.lookup_revision_multiple(
[sha1_bin, sha1_other])
self.assertEqual(list(actual_revisions), [])
self.assertEqual(
list(mock_backend.revision_get_multiple.call_args[0][0]),
[hex_to_hash(self.SHA1_SAMPLE),
hex_to_hash(sha1_other)])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [self.SAMPLE_REVISION_RAW]
mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5',
limit=25)
# then
self.assertEqual(list(actual_revision), [self.SAMPLE_REVISION])
mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 25)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by(self, mock_backend):
# given
stub_revision_log = [self.SAMPLE_REVISION_RAW]
mock_backend.revision_log_by = MagicMock(
return_value=stub_revision_log)
# when
actual_log = service.lookup_revision_log_by(
1, 'refs/heads/master', None, limit=100)
# then
self.assertEqual(list(actual_log), [self.SAMPLE_REVISION])
mock_backend.revision_log_by.assert_called_with(
1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log_by_nolog(self, mock_backend):
# given
mock_backend.revision_log_by = MagicMock(return_value=None)
# when
res = service.lookup_revision_log_by(
1, 'refs/heads/master', None, limit=100)
# then
self.assertEquals(res, None)
mock_backend.revision_log_by.assert_called_with(
1, 'refs/heads/master', None, 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': self.SHA1_SAMPLE,
})
mock_backend.content_get = MagicMock(return_value={
'data': b'binary data'})
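# lookup flow: content_find resolves the sha256 to a record whose sha1
# is then used to fetch the raw data through content_get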
# when
actual_content = service.lookup_content_raw(
'sha256:%s' % self.SHA256_SAMPLE)
# then
self.assertEquals(actual_content, {'data': b'binary data'})
mock_backend.content_find.assert_called_once_with(
'sha256', self.SHA256_SAMPLE_BIN)
mock_backend.content_get.assert_called_once_with(
self.SHA1_SAMPLE)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:%s' % self.SHA1_SAMPLE)
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', self.SHA1_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
# given
mock_backend.content_find = MagicMock(
return_value=self.SAMPLE_CONTENT_RAW)
# when
actual_content = service.lookup_content(
'sha1:%s' % self.SHA1_SAMPLE)
# then
self.assertEqual(actual_content, self.SAMPLE_CONTENT)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash(self.SHA1_SAMPLE))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
# given
stub_content = self.SAMPLE_CONTENT_RAW
stub_content['status'] = 'visible'
expected_content = self.SAMPLE_CONTENT
expected_content['status'] = 'visible'
mock_backend.content_find = MagicMock(
return_value=stub_content)
# when
actual_content = service.lookup_content(
'sha256:%s' % self.SHA256_SAMPLE)
# then
self.assertEqual(actual_content, expected_content)
mock_backend.content_find.assert_called_with(
'sha256', self.SHA256_SAMPLE_BIN)
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
# given
mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
})
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
# given
mock_backend.directory_ls = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_not_found(self, mock_query, mock_backend):
# given
mock_query.parse_hash_with_algorithms_or_throws.return_value = (
'sha1',
'directory-id-bin')
mock_backend.directory_get.return_value = None
# when
actual_dir = service.lookup_directory('directory_id')
# then
self.assertIsNone(actual_dir)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
'directory_id', ['sha1'], 'Only sha1_git is supported.')
mock_backend.directory_get.assert_called_with('directory-id-bin')
self.assertFalse(mock_backend.directory_ls.called)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory(self, mock_query, mock_backend):
mock_query.parse_hash_with_algorithms_or_throws.return_value = (
'sha1',
'directory-sha1-bin')
# something that exists is all that matters here
mock_backend.directory_get.return_value = {'id': b'directory-sha1-bin'}
# given
stub_dir_entries = [{
'sha1': self.SHA1_SAMPLE_BIN,
'sha256': self.SHA256_SAMPLE_BIN,
'sha1_git': self.SHA1GIT_SAMPLE_BIN,
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': self.DIRECTORY_ID_BIN,
'name': b'bob',
'type': 10,
}]
expected_dir_entries = [{
'sha1': self.SHA1_SAMPLE,
'sha256': self.SHA256_SAMPLE,
'sha1_git': self.SHA1GIT_SAMPLE,
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': self.DIRECTORY_ID,
'name': 'bob',
'type': 10,
}]
mock_backend.directory_ls.return_value = stub_dir_entries
# when
actual_directory_ls = list(service.lookup_directory(
'directory-sha1'))
# then
self.assertEqual(actual_directory_ls, expected_dir_entries)
mock_query.parse_hash_with_algorithms_or_throws.assert_called_with(
'directory-sha1', ['sha1'], 'Only sha1_git is supported.')
mock_backend.directory_ls.assert_called_with(
'directory-sha1-bin')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
actual_revisions = service.lookup_revision_by(1)
# then
self.assertIsNone(actual_revisions)
mock_backend.revision_get_by.assert_called_once_with(1, 'master', None)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
# given
stub_rev = self.SAMPLE_REVISION_RAW
expected_rev = self.SAMPLE_REVISION
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by.assert_called_once_with(
10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nomerge(self, mock_backend):
# given
stub_rev = self.SAMPLE_REVISION_RAW
stub_rev['parents'] = [
hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc')]
expected_rev = self.SAMPLE_REVISION
expected_rev['parents'] = ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc']
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by.assert_called_once_with(
10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_merge(self, mock_backend):
# given
stub_rev = self.SAMPLE_REVISION_RAW
stub_rev['parents'] = [
hex_to_hash('adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'),
hex_to_hash('ffff3b19e793491b1c6db0fd8b46cd9f32e592fc')
]
expected_rev = self.SAMPLE_REVISION
expected_rev['parents'] = [
'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
'ffff3b19e793491b1c6db0fd8b46cd9f32e592fc'
]
expected_rev['merge'] = True
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by.assert_called_once_with(
10, 'master2', 'some-ts')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by_ko(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
origin_id = 1
branch_name = 'master3'
ts = None
service.lookup_revision_with_context_by(origin_id, branch_name, ts,
'sha1')
# then
self.assertIn(
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts), cm.exception.args[0])
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
@patch('swh.web.ui.service.lookup_revision_with_context')
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_by(self, mock_backend,
mock_lookup_revision_with_context):
# given
stub_root_rev = {'id': 'root-rev-id'}
mock_backend.revision_get_by.return_value = {'id': 'root-rev-id'}
stub_rev = {'id': 'rev-found'}
mock_lookup_revision_with_context.return_value = stub_rev
# when
origin_id = 1
branch_name = 'master3'
ts = None
sha1_git = 'sha1'
actual_root_rev, actual_rev = service.lookup_revision_with_context_by(
origin_id, branch_name, ts, sha1_git)
# then
self.assertEquals(actual_root_rev, stub_root_rev)
self.assertEquals(actual_rev, stub_rev)
mock_backend.revision_get_by.assert_called_once_with(
origin_id, branch_name, ts)
mock_lookup_revision_with_context.assert_called_once_with(
stub_root_rev, sha1_git, 100)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_entity_by_uuid(self, mock_query, mock_backend):
# given
uuid_test = 'correct-uuid'
mock_query.parse_uuid4.return_value = uuid_test
stub_entities = [{'uuid': uuid_test}]
mock_backend.entity_get.return_value = stub_entities
# when
actual_entities = service.lookup_entity_by_uuid(uuid_test)
# then
self.assertEquals(actual_entities, stub_entities)
mock_query.parse_uuid4.assert_called_once_with(uuid_test)
mock_backend.entity_get.assert_called_once_with(uuid_test)
@istest
def lookup_revision_through_ko_not_implemented(self):
# then
with self.assertRaises(NotImplementedError):
service.lookup_revision_through({
'something-unknown': 10,
})
@patch('swh.web.ui.service.lookup_revision_with_context_by')
@istest
def lookup_revision_through_with_context_by(self, mock_lookup):
# given
stub_rev = {'id': 'rev'}
mock_lookup.return_value = stub_rev
# when
actual_revision = service.lookup_revision_through({
'origin_id': 1,
'branch_name': 'master',
'ts': None,
'sha1_git': 'sha1-git'
}, limit=1000)
# then
self.assertEquals(actual_revision, stub_rev)
mock_lookup.assert_called_once_with(
1, 'master', None, 'sha1-git', 1000)
@patch('swh.web.ui.service.lookup_revision_by')
@istest
def lookup_revision_through_with_revision_by(self, mock_lookup):
# given
stub_rev = {'id': 'rev'}
mock_lookup.return_value = stub_rev
# when
actual_revision = service.lookup_revision_through({
'origin_id': 2,
'branch_name': 'master2',
'ts': 'some-ts',
}, limit=10)
# then
self.assertEquals(actual_revision, stub_rev)
mock_lookup.assert_called_once_with(
2, 'master2', 'some-ts')
@patch('swh.web.ui.service.lookup_revision_with_context')
@istest
def lookup_revision_through_with_context(self, mock_lookup):
# given
stub_rev = {'id': 'rev'}
mock_lookup.return_value = stub_rev
# when
actual_revision = service.lookup_revision_through({
'sha1_git_root': 'some-sha1-root',
'sha1_git': 'some-sha1',
})
# then
self.assertEquals(actual_revision, stub_rev)
mock_lookup.assert_called_once_with(
'some-sha1-root', 'some-sha1', 100)
@patch('swh.web.ui.service.lookup_revision')
@istest
def lookup_revision_through_with_revision(self, mock_lookup):
# given
stub_rev = {'id': 'rev'}
mock_lookup.return_value = stub_rev
# when
actual_revision = service.lookup_revision_through({
'sha1_git': 'some-sha1',
})
# then
self.assertEquals(actual_revision, stub_rev)
mock_lookup.assert_called_once_with(
'some-sha1')
@patch('swh.web.ui.service.lookup_revision_through')
@istest
def lookup_directory_through_revision_ko_not_found(
self, mock_lookup_rev):
# given
mock_lookup_rev.return_value = None
# when
with self.assertRaises(NotFoundExc):
service.lookup_directory_through_revision(
{'id': 'rev'}, 'some/path', 100)
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_data(
self, mock_lookup_dir, mock_lookup_rev):
# given
mock_lookup_rev.return_value = {'id': 'rev-id'}
mock_lookup_dir.return_value = {'type': 'dir',
'content': []}
# when
rev_id, dir_result = service.lookup_directory_through_revision(
{'id': 'rev'}, 'some/path', 100)
# then
self.assertEquals(rev_id, 'rev-id')
self.assertEquals(dir_result, {'type': 'dir',
'content': []})
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 100)
mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', False)
@patch('swh.web.ui.service.lookup_revision_through')
@patch('swh.web.ui.service.lookup_directory_with_revision')
@istest
def lookup_directory_through_revision_ok_with_content(
self, mock_lookup_dir, mock_lookup_rev):
# given
mock_lookup_rev.return_value = {'id': 'rev-id'}
stub_result = {'type': 'file',
'revision': 'rev-id',
'content': {'data': b'blah',
'sha1': 'sha1'}}
mock_lookup_dir.return_value = stub_result
# when
rev_id, dir_result = service.lookup_directory_through_revision(
{'id': 'rev'}, 'some/path', 10, with_data=True)
# then
self.assertEquals(rev_id, 'rev-id')
self.assertEquals(dir_result, stub_result)
mock_lookup_rev.assert_called_once_with({'id': 'rev'}, 10)
mock_lookup_dir.assert_called_once_with('rev-id', 'some/path', True)
diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py
index 945c5bb7..8b12c24d 100644
--- a/swh/web/ui/tests/views/test_api.py
+++ b/swh/web/ui/tests/views/test_api.py
@@ -1,1929 +1,1970 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
from swh.web.ui import exc
from swh.web.ui.views import api
from swh.web.ui.exc import NotFoundExc, BadInputExc
from swh.storage.exc import StorageDBError, StorageAPIError
class ApiTestCase(test_app.SWHApiTestCase):
@istest
def generic_api_lookup_nothing_is_found(self):
# given
def test_generic_lookup_fn(sha1, another_unused_arg):
assert another_unused_arg == 'unused arg'
assert sha1 == 'sha1'
return None
# when
with self.assertRaises(NotFoundExc) as cm:
api._api_lookup('sha1', test_generic_lookup_fn,
'This will be raised because None is returned.',
lambda x: x,
'unused arg')
self.assertIn('This will be raised because None is returned.',
cm.exception.args[0])
@istest
def generic_api_map_are_enriched_and_transformed_to_list(self):
# given
def test_generic_lookup_fn_1(criteria0, param0, param1):
assert criteria0 == 'something'
return map(lambda x: x + 1, [1, 2, 3])
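# the lookup yields [2, 3, 4]; the enrichment lambda below doubles each
# element, giving the expected [4, 6, 8]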
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_1,
'This is not the error message you are looking for. Move along.',
lambda x: x * 2,
'some param 0',
'some param 1')
self.assertEqual(actual_result, [4, 6, 8])
@istest
def generic_api_list_are_enriched_too(self):
# given
def test_generic_lookup_fn_2(crit):
assert crit == 'something'
return ['a', 'b', 'c']
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_2,
'Not the error message you are looking for, it is. '
'Along, you move!',
lambda x: ''.join(['=', x, '=']))
self.assertEqual(actual_result, ['=a=', '=b=', '=c='])
@istest
def generic_api_generator_are_enriched_and_returned_as_list(self):
# given
def test_generic_lookup_fn_3(crit):
assert crit == 'crit'
return (i for i in [4, 5, 6])
# when
actual_result = api._api_lookup(
'crit',
test_generic_lookup_fn_3,
'Move!',
lambda x: x - 1)
self.assertEqual(actual_result, [3, 4, 5])
@istest
def generic_api_simple_data_are_enriched_and_returned_too(self):
# given
def test_generic_lookup_fn_4(crit):
assert crit == '123'
return {'a': 10}
def test_enrich_data(x):
x['a'] = x['a'] * 10
return x
# when
actual_result = api._api_lookup(
'123',
test_generic_lookup_fn_4,
'Nothing to do',
test_enrich_data)
self.assertEqual(actual_result, {'a': 100})
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance(self, mock_service):
stub_provenances = [{
'origin': 1,
'visit': 2,
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}]
mock_service.lookup_content_provenance.return_value = stub_provenances
# when
rv = self.app.get(
'/api/1/provenance/'
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, [{
'origin': 1,
'visit': 2,
'origin_url': '/api/1/origin/1/',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'revision_url': '/api/1/revision/'
'b04caf10e9535160d90e874b45aa426de762f19f/',
'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'content_url': '/api/1/content/'
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}])
mock_service.lookup_content_provenance.assert_called_once_with(
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance_sha_not_found(self, mock_service):
# given
mock_service.lookup_content_provenance.return_value = None
# when
rv = self.app.get(
'/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_content_metadata(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data_url': '/api/1/content/'
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_content_provenance = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
self.assertFalse(mock_service.lookup_content_provenance.called)
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_content_provenance = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/',
headers={'accept': 'application/yaml'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
self.assertFalse(mock_service.lookup_content_provenance.called)
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw_ko_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': b'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-type': 'application/octet-stream',
'Content-disposition': 'attachment'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': True}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 100},
'search_res': [{'filename': None,
'sha1': 'sha1:blah',
'found': True}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_as_yaml(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': True}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 100},
'search_res': [{'filename': None,
'sha1': 'sha1:halb',
'found': True}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:halb/',
headers={'Accept': 'application/yaml'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': False}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 0},
'search_res': [{'filename': None,
'sha1': 'sha1:halb',
'found': False}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service):
# given
mock_service.stat_counters.side_effect = StorageDBError(
'SWH Storage exploded! Will be back online shortly!')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the backend: '
'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service):
# given
mock_service.stat_counters.side_effect = StorageAPIError(
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
)
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the api backend: '
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_error(self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_db(
self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = StorageDBError(
'SWH Storage exploded! Will be back online shortly!')
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the backend: '
'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_api(
self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = StorageAPIError(
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
)
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the api backend: '
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits(self, mock_service):
# given
stub_visits = [
{
'date': 1104616800.0,
'origin': 1,
'visit': 1
},
{
'date': 1293919200.0,
'origin': 1,
'visit': 2
},
{
'date': 1420149600.0,
'origin': 1,
'visit': 3
}
]
mock_service.lookup_origin_visits.return_value = stub_visits
# when
rv = self.app.get('/api/1/origin/2/visits/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_visits)
mock_service.lookup_origin_visits.assert_called_once_with(2)
+ @patch('swh.web.ui.views.api.service')
+ @istest
+ def api_1_lookup_origin_visit(self, mock_service):
+ # given
+ stub_origin_visit = {
+ 'date': 1104616800.0,
+ 'origin': 10,
+ 'visit': 100,
+ 'metadata': None,
+ 'status': 'full',
+ }
+ mock_service.lookup_origin_visit.return_value = stub_origin_visit
+
+ # when
+ rv = self.app.get('/api/1/origin/10/visits/100/')
+
+ self.assertEquals(rv.status_code, 200)
+ self.assertEquals(rv.mimetype, 'application/json')
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, stub_origin_visit)
+
+ mock_service.lookup_origin_visit.assert_called_once_with(10, 100)
+
+ @patch('swh.web.ui.views.api.service')
+ @istest
+ def api_1_lookup_origin_visit_not_found(self, mock_service):
+ # given
+ mock_service.lookup_origin_visit.return_value = None
+
+ # when
+ rv = self.app.get('/api/1/origin/1/visits/1000/')
+
+ self.assertEquals(rv.status_code, 404)
+ self.assertEquals(rv.mimetype, 'application/json')
+ response_data = json.loads(rv.data.decode('utf-8'))
+ self.assertEquals(response_data, {
+ 'error': 'No visit 1000 for origin 1 found'
+ })
+
+ mock_service.lookup_origin_visit.assert_called_once_with(1, 1000)
+
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_id(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with({'id': 1234})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_type_url(self, mock_service):
# given
stub_origin = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
mock_service.lookup_origin.return_value = stub_origin
# when
rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_origin)
mock_service.lookup_origin.assert_called_with(
{'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with({'id': 4321})
@patch('swh.web.ui.views.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
'target_url': '/api/1/revision/revision-sha1/',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_target_type_not_a_revision(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
'ff7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
'a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
],
'parent_urls': [
'/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revision)
mock_service.lookup_revision.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok(self, mock_service):
# given
stub_revision = {'message': 'synthetic revision message'}
mock_service.lookup_revision_message.return_value = stub_revision
# when
rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2'
'33eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, b'synthetic revision message')
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok_no_msg(self, mock_service):
# given
mock_service.lookup_revision_message.side_effect = NotFoundExc(
'No message for revision')
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No message for revision'})
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ko_no_rev(self, mock_service):
# given
mock_service.lookup_revision_message.side_effect = NotFoundExc(
'No revision found')
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No revision found'})
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
mock_service.lookup_revision_by.return_value = None
rv = self.app.get('/api/1/revision/origin/123/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertIn('Revision with (origin_id: 123', response_data['error'])
self.assertIn('not found', response_data['error'])
mock_service.lookup_revision_by.assert_called_once_with(
123,
'refs/heads/master',
None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin(self, mock_service):
mock_revision = {
'id': '32',
'directory': '21',
'message': 'message 1',
'type': 'deb',
}
expected_revision = {
'id': '32',
'url': '/api/1/revision/32/',
'history_url': '/api/1/revision/32/log/',
'directory': '21',
'directory_url': '/api/1/directory/21/',
'message': 'message 1',
'type': 'deb',
}
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/heads/master',
None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
mock_revision = {
'id': '12',
'directory': '23',
'message': 'message 2',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '12',
'url': '/api/1/revision/12/',
'history_url': '/api/1/revision/12/log/',
'directory': '23',
'directory_url': '/api/1/directory/23/',
'message': 'message 2',
'type': 'tar',
}
rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
None)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
mock_utils,
mock_service):
mock_revision = {
'id': '123',
'directory': '456',
'message': 'message 3',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '123',
'url': '/api/1/revision/123/',
'history_url': '/api/1/revision/123/log/',
'directory': '456',
'directory_url': '/api/1/directory/456/',
'message': 'message 3',
'type': 'tar',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
mock_utils.enrich_revision.return_value = expected_revision
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs/origin/dev'
'/ts/1452591542/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with('1452591542')
mock_utils.enrich_revision.assert_called_once_with(
mock_revision)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
self,
mock_utils,
mock_service):
mock_revision = {
'id': '999',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '999',
'url': '/api/1/revision/999/',
'history_url': '/api/1/revision/999/log/',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
mock_utils.enrich_revision.return_value = expected_revision
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs%2Forigin%2Fdev'
'/ts/Today%20is%20'
'January%201,%202047%20at%208:21:00AM/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with(
'Today is January 1, 2047 at 8:21:00AM')
mock_utils.enrich_revision.assert_called_once_with(
mock_revision)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_ko_raise(self, mock_service):
# given
mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa
# when
with self.assertRaises(NotFoundExc):
api._revision_directory_by(
{'sha1_git': 'id'},
None,
'/api/1/revision/sha1/directory/')
# then
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'id'},
None, limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_dir(self, mock_service):
# given
mock_service.lookup_directory_through_revision.return_value = (
'rev-id',
{
'type': 'dir',
'revision': 'rev-id',
'path': 'some/path',
'content': []
})
# when
actual_dir_content = api._revision_directory_by(
{'sha1_git': 'blah-id'},
'some/path', '/api/1/revision/sha1/directory/')
# then
self.assertEquals(actual_dir_content, {
'type': 'dir',
'revision': 'rev-id',
'path': 'some/path',
'content': []
})
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'blah-id'},
'some/path', limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_file(self, mock_service):
# given
mock_service.lookup_directory_through_revision.return_value = (
'rev-id',
{
'type': 'file',
'revision': 'rev-id',
'path': 'some/path',
'content': {'blah': 'blah'}
})
# when
actual_dir_content = api._revision_directory_by(
{'sha1_git': 'sha1'},
'some/path',
'/api/1/revision/origin/2/directory/',
limit=1000, with_data=True)
# then
self.assertEquals(actual_dir_content, {
'type': 'file',
'revision': 'rev-id',
'path': 'some/path',
'content': {'blah': 'blah'}
})
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'sha1'},
'some/path', limit=1000, with_data=True)
@patch('swh.web.ui.views.api.utils')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin_ko_not_found(self,
mock_rev_dir,
mock_utils):
mock_rev_dir.side_effect = NotFoundExc('not found')
mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error': 'not found'})
mock_rev_dir.assert_called_once_with(
{'origin_id': 10,
'branch_name': 'refs/remote/origin/dev',
'ts': '2012-10-20 00:00:00'}, None,
'/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin(self,
mock_revision_dir):
expected_res = [{
'id': '123'
}]
mock_revision_dir.return_value = expected_res
rv = self.app.get('/api/1/revision/origin/3/directory/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_res)
mock_revision_dir.assert_called_once_with({
'origin_id': 3,
'branch_name': 'refs/heads/master',
'ts': None}, None, '/api/1/revision/origin/3/directory/',
with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_response = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_with_next(self, mock_service):
# given
stub_revisions = []
for i in range(27):
stub_revisions.append({'id': i})
mock_service.lookup_revision_log.return_value = stub_revisions[:26]
expected_revisions = [x for x in stub_revisions if x['id'] < 25]
for e in expected_revisions:
e['url'] = '/api/1/revision/%s/' % e['id']
e['history_url'] = '/api/1/revision/%s/log/' % e['id']
expected_response = {
'revisions': expected_revisions,
'next_revs_url': '/api/1/revision/25/log/'
}
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
# given
mock_service.lookup_revision_log.return_value = None
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
'e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git'
' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_context(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
mock_service.lookup_revision_multiple.return_value = [{
'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
'author_name': 'Name Surname',
'author_email': 'name@surname.com',
'committer_name': 'Name Surname',
'committer_email': 'name@surname.com',
'message': 'amazing revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
'type': 'tar',
'synthetic': True,
}]
expected_revisions = [
{
'url': '/api/1/revision/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
'history_url': '/api/1/revision/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/',
'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory_url': '/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/',
'author_name': 'Name Surname',
'author_email': 'name@surname.com',
'committer_name': 'Name Surname',
'committer_email': 'name@surname.com',
'message': 'amazing revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
'parent_urls': [
'/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
'/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/'
],
'type': 'tar',
'synthetic': True,
},
{
'url': '/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/log/',
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_response = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/18d8be353ed3480476f0'
'32475e7c233eff7371d5/prev/prev-rev/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5', 26)
mock_service.lookup_revision_multiple.assert_called_once_with(
['prev-rev'])
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log_by.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_result = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_with_next(self, mock_service):
# given
stub_revisions = []
for i in range(27):
stub_revisions.append({'id': i})
mock_service.lookup_revision_log_by.return_value = stub_revisions[:26]
expected_revisions = [x for x in stub_revisions if x['id'] < 25]
for e in expected_revisions:
e['url'] = '/api/1/revision/%s/' % e['id']
e['history_url'] = '/api/1/revision/%s/log/' % e['id']
expected_response = {
'revisions': expected_revisions,
'next_revs_url': '/api/1/revision/25/log/'
}
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_norev(self, mock_service):
# given
mock_service.lookup_revision_log_by.side_effect = NotFoundExc(
'No revision')
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'error': 'No revision'})
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_history(self, mock_service):
# for readability purposes, we use:
# - sha1 as 3 letters (urls are way too long otherwise to respect pep8)
# - only keys with modification steps (all other keys are kept as is)
# given
stub_revision = {
'id': '883',
'children': ['777', '999'],
'parents': [],
'directory': '272'
}
mock_service.lookup_revision.return_value = stub_revision
# then
rv = self.app.get('/api/1/revision/883/prev/999/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'id': '883',
'url': '/api/1/revision/883/',
'history_url': '/api/1/revision/883/log/',
'history_context_url': '/api/1/revision/883/prev/999/log/',
'children': ['777', '999'],
'children_urls': ['/api/1/revision/777/',
'/api/1/revision/999/'],
'parents': [],
'parent_urls': [],
'directory': '272',
'directory_url': '/api/1/directory/272/'
})
mock_service.lookup_revision.assert_called_once_with('883')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ko_not_found(self, mock_rev_dir):
# given
mock_rev_dir.side_effect = NotFoundExc('Not found')
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Not found'})
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path/to/dir',
'/api/1/revision/999/directory/some/path/to/dir/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir):
stub_dir = {
'type': 'dir',
'revision': '999',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/'
'somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}]
}
# given
mock_rev_dir.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_dir)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path',
'/api/1/revision/999/directory/some/path/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_content(self, mock_rev_dir):
stub_content = {
'type': 'file',
'revision': '999',
'content': {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
}
# given
mock_rev_dir.return_value = stub_content
# then
url = '/api/1/revision/666/directory/some/other/path/'
rv = self.app.get(url)
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_content)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '666'}, 'some/other/path', url, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.views.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
}]
expected_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
'target_url':
'/api/1/directory/8be353ed3480476f032475e7c233eff737123456/',
}]
mock_service.lookup_directory.return_value = stub_directories
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_directories)
mock_service.lookup_directory.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_found(self, mock_service):
# given
expected_dir = {
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'name': 'bla',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
}
mock_service.lookup_directory_with_path.return_value = expected_dir
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/bla/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_directory_with_path.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5', 'bla')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_not_found(self, mock_service):
# given
mock_service.lookup_directory_with_path.return_value = None
path = 'some/path/to/dir/'
# when
rv = self.app.get(('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/%s')
% path)
path = path.strip('/') # Path stripped of lead/trail separators
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': (('Entry with path %s relative to '
'directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.')
% path)})
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_not_found(self, mock_service):
# given
mock_service.lookup_entity_by_uuid.return_value = []
# when
rv = self.app.get('/api/1/entity/'
'5f4d4c51-498a-4e28-88b3-b3e4e8396cba/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
"Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " +
"found."})
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_bad_request(self, mock_service):
# given
mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
'bad input: uuid malformed!')
# when
rv = self.app.get('/api/1/entity/uuid malformed/')
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'bad input: uuid malformed!'})
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'uuid malformed')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid(self, mock_service):
# given
stub_entities = [
{
'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
},
{
'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
}
]
mock_service.lookup_entity_by_uuid.return_value = stub_entities
expected_entities = [
{
'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-'
'785107f598e4/',
'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
'd1ce2efc9fb2/'
},
{
'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
'd1ce2efc9fb2/'
}
]
# when
rv = self.app.get('/api/1/entity'
'/34bd6b1b-463f-43e5-a697-785107f598e4/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_entities)
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'34bd6b1b-463f-43e5-a697-785107f598e4')
class ApiUtils(unittest.TestCase):
@istest
def api_lookup_not_found(self):
# when
with self.assertRaises(exc.NotFoundExc) as e:
api._api_lookup('something',
lambda x: None,
'this is the error message raised as it is None')
self.assertEqual(e.exception.args[0],
'this is the error message raised as it is None')
@istest
def api_lookup_with_result(self):
# when
actual_result = api._api_lookup('something',
lambda x: x + '!',
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, 'something!')
@istest
def api_lookup_with_result_as_map(self):
# when
actual_result = api._api_lookup([1, 2, 3],
lambda x: map(lambda y: y+1, x),
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, [2, 3, 4])
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index d3e8befa..954b6a99 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,780 +1,795 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="A dictionary of SWH's most important statistics")
def api_stats():
"""Return statistics on SWH storage.
"""
return service.stat_counters()
@app.route('/api/1/origin/<int:origin_id>/visits/')
-@app.route('/api/1/origin/<int:origin_id>/visits/<int:visit_id>')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='The requested SWH origin identifier')
-@doc.arg('visit_id',
- default=None,
- argtype=doc.argtypes.int,
- argdoc='The requested SWH origin visit identifier')
@doc.returns(rettype=doc.rettypes.list,
retdoc="""All instances of visits of the origin pointed by
- origin_id as POSIX time since epoch (if visit_id is not defined).
- Only the instance of visit pointed by origin_id and visit_id.""")
-def api_origin_visits(origin_id, visit_id=None):
+ origin_id as POSIX time since epoch
+""")
+def api_origin_visits(origin_id):
"""Return a list of visit dates as POSIX timestamps for the
given origin.
"""
- if not visit_id:
- return _api_lookup(
- origin_id,
- service.lookup_origin_visits,
- error_msg_if_not_found='No origin %s found' % origin_id)
- # TODO
+ return _api_lookup(
+ origin_id,
+ service.lookup_origin_visits,
+ error_msg_if_not_found='No origin %s found' % origin_id)
+
+
+@app.route('/api/1/origin/<int:origin_id>/visits/<int:visit_id>/')
+@doc.route('/api/1/origin/visits/id/')
+@doc.arg('origin_id',
+ default=1,
+ argtype=doc.argtypes.int,
+ argdoc='The requested SWH origin identifier')
+@doc.arg('visit_id',
+ default=None,
+ argtype=doc.argtypes.int,
+ argdoc='The requested SWH origin visit identifier')
+@doc.returns(rettype=doc.rettypes.list,
+ retdoc="""The single instance visit visit_id of the origin pointed
+ by origin_id as POSIX time since epoch""")
+def api_origin_visit(origin_id, visit_id):
+ return _api_lookup(
+ origin_id,
+ service.lookup_origin_visit,
+ 'No visit %s for origin %s found' % (visit_id, origin_id),
+ lambda x: x,
+ visit_id)
@app.route('/api/1/content/search/', methods=['POST'])
@app.route('/api/1/content/search/<string:q>/')
@doc.route('/api/1/content/search/')
@doc.arg('q',
default='sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""A dict with keys:
- search_res: a list of dicts corresponding to queried content
with key 'found' set to True if found, False if not
- search_stats: a dict containing number of files searched and
percentage of files found
""")
def api_search(q=None):
"""Search a content per hash.
This may take the form of:
- a GET request with a single checksum
- a POST request with many hashes, with the request body containing
identifiers (typically filenames) as keys and corresponding hashes as
values.
"""
response = {'search_res': None,
'search_stats': None}
search_stats = {'nbfiles': 0, 'pct': 0}
search_res = None
# Single hash request route
if q:
r = service.search_hash(q)
search_res = [{'filename': None,
'sha1': q,
'found': r['found']}]
search_stats['nbfiles'] = 1
search_stats['pct'] = 100 if r['found'] else 0
# Post form submission with many hash requests
elif request.method == 'POST':
data = request.form
queries = []
# Remove potential inputs with no associated value
for k, v in data.items():
if v is not None:
if k == 'q' and len(v) > 0:
queries.append({'filename': None, 'sha1': v})
elif v != '':
queries.append({'filename': k, 'sha1': v})
if len(queries) > 0:
lookup = service.lookup_multiple_hashes(queries)
result = []
for el in lookup:
result.append({'filename': el['filename'],
'sha1': el['sha1'],
'found': el['found']})
search_res = result
nbfound = len([x for x in lookup if x['found']])
search_stats['nbfiles'] = len(queries)
search_stats['pct'] = (nbfound / len(queries))*100
response['search_res'] = search_res
response['search_stats'] = search_stats
return response
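# Illustrative sketch only (not part of the original module): exercising the
# search endpoint through the Flask test client, assuming a configured
# application. The filenames and hashes below are invented for illustration.
def _api_search_example():
    client = app.test_client()
    # Single-checksum GET form of the route.
    single = client.get(
        '/api/1/content/search/sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc/')
    # Multi-hash POST form: form keys are filenames, values are hashes.
    multiple = client.post('/api/1/content/search/', data={
        'some-file.txt': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
        'other-file.txt': '8b137891791fe96927ad78e64b0aad7bded08bdc',
    })
    return single, multiple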
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExc or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
return [enrich_fn(x) for x in res]
return enrich_fn(res)
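# Illustrative sketch only (not part of the original module): how _api_lookup
# normalizes the different result shapes. The lookup callables and the 'seen'
# key are hypothetical stand-ins.
def _api_lookup_example():
    def enrich(d):
        return dict(d, seen=True)
    # A list (or map/generator) result is mapped through enrich_fn into a list.
    as_list = _api_lookup('x', lambda _: [{'id': 1}, {'id': 2}], 'missing',
                          enrich)
    assert as_list == [{'id': 1, 'seen': True}, {'id': 2, 'seen': True}]
    # A dict result is passed to enrich_fn and returned as-is.
    as_dict = _api_lookup('x', lambda _: {'id': 1}, 'missing', enrich)
    assert as_dict == {'id': 1, 'seen': True}
    # A falsy result raises NotFoundExc with the supplied message.
    try:
        _api_lookup('x', lambda _: None, 'missing')
    except NotFoundExc:
        pass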
@app.route('/api/1/origin/<int:origin_id>/')
@app.route('/api/1/origin/<string:origin_type>/url/<path:origin_url>/')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The origin's SWH origin_id.")
@doc.arg('origin_type',
default='git',
argtype=doc.argtypes.str,
argdoc="The origin's type (git, svn..)")
@doc.arg('origin_url',
default='https://github.com/hylang/hy',
argtype=doc.argtypes.path,
argdoc="The origin's URL.")
@doc.raises(exc=doc.excs.notfound,
doc='Raised if origin_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the origin identified by origin_id')
def api_origin(origin_id=None, origin_type=None, origin_url=None):
"""Return information about the origin matching the passed criteria.
Criteria may be:
- An SWH-specific ID, if you already know it
- An origin type and its URL, if you do not have the origin's SWH
identifier
"""
ori_dict = {
'id': origin_id,
'type': origin_type,
'url': origin_url
}
ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]}
if 'id' in ori_dict:
error_msg = 'Origin with id %s not found.' % ori_dict['id']
else:
error_msg = 'Origin with type %s and URL %s not found' % (
ori_dict['type'], ori_dict['url'])
return _api_lookup(
ori_dict, lookup_fn=service.lookup_origin,
error_msg_if_not_found=error_msg)
@app.route('/api/1/person/<int:person_id>/')
@doc.route('/api/1/person/')
@doc.arg('person_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The person's SWH identifier")
@doc.raises(exc=doc.excs.notfound,
doc='Raised if person_id does not correspond to a person in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
"""Return information about person with identifier person_id.
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release/<string:sha1_git>/')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
default='8b137891791fe96927ad78e64b0aad7bded08bdc',
argtype=doc.argtypes.sha1_git,
argdoc="The release's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if the argument is not a sha1')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if sha1_git does not correspond to a release in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
"""Return information about release with id sha1_git.
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
limit=100, with_data=False):
"""Compute the revision matching criterion's directory or content data.
Args:
revision: dictionary of criterions representing a revision to lookup
path: directory's path to lookup
request_path: request path which holds the original context to
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
"""
def enrich_directory_local(dir, context_url=request_path):
return utils.enrich_directory(dir, context_url)
rev_id, result = service.lookup_directory_through_revision(
revision, path, limit=limit, with_data=with_data)
content = result['content']
if result['type'] == 'dir': # dir_entries
result['content'] = list(map(enrich_directory_local, content))
else: # content
result['content'] = utils.enrich_content(content)
return result
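# Illustrative sketch only (not part of the original module): the shape
# _revision_directory_by returns when the path resolves to a directory, shown
# against a mocked service layer. All identifiers are invented; for a file,
# 'content' would instead hold a single enriched content dict.
def _revision_directory_example():
    from unittest.mock import patch
    stub = ('rev-id', {'type': 'dir',
                       'revision': 'rev-id',
                       'path': 'some/path',
                       'content': []})
    with patch.object(service, 'lookup_directory_through_revision',
                      return_value=stub):
        result = _revision_directory_by(
            {'sha1_git': 'rev-id'}, 'some/path',
            '/api/1/revision/rev-id/directory/some/path/')
    assert result['type'] == 'dir' and result['content'] == []
    return result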
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/directory/<path:path>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>'
'/directory/<path:path>/')
@doc.route('/api/1/revision/origin/directory/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master)""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""Optional timestamp (default to the nearest time
crawl of timestamp)""")
@doc.arg('path',
default='.',
argtype=doc.argtypes.path,
argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching the passed criteria was
not found""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision corresponding to the
passed criteria""")
def api_directory_through_revision_origin(origin_id,
branch_name="refs/heads/master",
ts=None,
path=None,
with_data=False):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _revision_directory_by(
{
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts
},
path,
request.path,
with_data=with_data)
@app.route('/api/1/revision'
'/origin/<int:origin_id>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The queried revision's origin identifier in SWH")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master)""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="The time at which the queried revision should be constrained")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching given criteria was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision identified by the given
criteria""")
def api_revision_with_origin(origin_id,
branch_name="refs/heads/master",
ts=None):
"""Display revision information through its identification by
origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
utils.enrich_revision,
branch_name,
ts)
@app.route('/api/1/revision/<string:sha1_git>/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:context>/')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier")
@doc.arg('context',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the revision identified by sha1_git')
def api_revision(sha1_git, context=None):
"""Return information about revision with id sha1_git.
"""
def _enrich_revision(revision, context=context):
return utils.enrich_revision(revision, context)
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
_enrich_revision)
@app.route('/api/1/revision/<string:sha1_git>/raw/')
@doc.route('/api/1/revision/raw/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
"""Return the raw data of the message of revision identified by sha1_git
"""
raw = service.lookup_revision_message(sha1_git)
return app.response_class(raw['message'],
headers={'Content-disposition': 'attachment;'
'filename=rev_%s_raw' % sha1_git},
mimetype='application/octet-stream')
@app.route('/api/1/revision/<string:sha1_git>/directory/')
@app.route('/api/1/revision/<string:sha1_git>/directory/<path:dir_path>/')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier.")
@doc.arg('dir_path',
default='.',
argtype=doc.argtypes.path,
argdoc='The path from the top level directory')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching sha1_git was not found in SWH
, or if the path specified does not exist""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the directory pointed by revision id
sha1-git and dir_path""")
def api_revision_directory(sha1_git,
dir_path=None,
with_data=False):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
"""
return _revision_directory_by(
{
'sha1_git': sha1_git
},
dir_path,
request.path,
with_data=with_data)
@app.route('/api/1/revision/<string:sha1_git>/log/')
@app.route('/api/1/revision/<string:sha1_git>/prev/<path:prev_sha1s>/log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc='The sha1_git of the revision queried')
@doc.arg('prev_sha1s',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk!')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git or prev_sha1s is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The log data starting at the revision identified by
sha1_git, completed with the navigation breadcrumbs,
if any""")
def api_revision_log(sha1_git, prev_sha1s=None):
"""Show all revisions (~git log) starting from sha1_git.
The first element returned is the given sha1_git, or the first
breadcrumb, if any.
"""
limit = app.config['conf']['max_log_revs']
response = {'revisions': None, 'next_revs_url': None}
revisions = None
next_revs_url = None
def lookup_revision_log_with_limit(s, limit=limit+1):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
rev_get = _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
if len(rev_get) == limit+1:
rev_backward = rev_get[:-1]
next_revs_url = url_for('api_revision_log',
sha1_git=rev_get[-1]['id'])
else:
rev_backward = rev_get
if not prev_sha1s: # no nav breadcrumbs, so we're done
revisions = rev_backward
else:
rev_forward_ids = prev_sha1s.split('/')
rev_forward = _api_lookup(rev_forward_ids,
lookup_fn=service.lookup_revision_multiple,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
revisions = rev_forward + rev_backward
response['revisions'] = revisions
response['next_revs_url'] = next_revs_url
return response
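# Illustrative sketch only (not part of the original module): following the
# pagination contract of api_revision_log from a client's point of view,
# assuming a configured application and a revision that exists in the archive.
# The starting sha1_git below is invented.
def _revision_log_pagination_example():
    import json
    client = app.test_client()
    url = '/api/1/revision/ec72c666fb345ea5f21359b7bc063710ce558e39/log/'
    revisions = []
    while url:
        page = json.loads(client.get(url).data.decode('utf-8'))
        revisions.extend(page['revisions'])
        # next_revs_url is None once the last page has been reached.
        url = page['next_revs_url']
    return revisions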
@app.route('/api/1/revision'
'/origin/<int:origin_id>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/branch/<path:branch_name>'
'/ts/<string:ts>/log/')
@app.route('/api/1/revision'
'/origin/<int:origin_id>'
'/ts/<string:ts>/log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="The revision's branch name within the origin specified")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""A time or timestamp string to parse""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching the given criteria was not
found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision log starting at the revision
matching the given criteria.""")
def api_revision_log_by(origin_id,
branch_name='refs/heads/master',
ts=None):
"""Show all revisions (~git log) starting from the revision
described by its origin_id, optional branch name and timestamp.
The first element returned is the described revision.
"""
limit = app.config['conf']['max_log_revs']
response = {'revisions': None, 'next_revs_url': None}
next_revs_url = None
if ts:
ts = utils.parse_timestamp(ts)
def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1):
return service.lookup_revision_log_by(o_id, br, ts, limit)
error_msg = 'No revision matching origin %s ' % origin_id
error_msg += ', branch name %s' % branch_name
error_msg += (' and time stamp %s.' % ts) if ts else '.'
rev_get = _api_lookup(origin_id,
lookup_revision_log_by_with_limit,
error_msg,
utils.enrich_revision,
branch_name,
ts)
if len(rev_get) == limit+1:
revisions = rev_get[:-1]
next_revs_url = url_for('api_revision_log',
sha1_git=rev_get[-1]['id'])
else:
revisions = rev_get
response['revisions'] = revisions
response['next_revs_url'] = next_revs_url
return response
@app.route('/api/1/directory/<string:sha1_git>/')
@app.route('/api/1/directory/<string:sha1_git>/<path:path>/')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.sha1_git,
argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
default='.',
argtype=doc.argtypes.path,
argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata and contents of the release identified by
sha1_git""")
def api_directory(sha1_git,
path=None):
"""Return information about release with id sha1_git.
"""
if path:
error_msg_path = ('Entry with path %s relative to directory '
'with sha1_git %s not found.') % (path, sha1_git)
return _api_lookup(
sha1_git,
service.lookup_directory_with_path,
error_msg_path,
utils.enrich_directory,
path)
else:
error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
service.lookup_directory,
error_msg_nopath,
utils.enrich_directory)
@app.route('/api/1/provenance/<string:q>/')
@doc.route('/api/1/provenance/')
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""List of provenance information (dict) for the matched
content.""")
def api_content_provenance(q):
"""Return content's provenance information if any.
"""
def _enrich_revision(provenance):
p = provenance.copy()
p['revision_url'] = url_for('api_revision',
sha1_git=provenance['revision'])
p['content_url'] = url_for('api_content_metadata',
q='sha1_git:%s' % provenance['content'])
p['origin_url'] = url_for('api_origin',
origin_id=provenance['origin'])
return p
return _api_lookup(
q,
lookup_fn=service.lookup_content_provenance,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_revision)
@app.route('/api/1/content/<string:q>/raw/')
@doc.route('/api/1/content/raw/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc='The raw content data as an octet stream')
def api_content_raw(q):
"""Return content's raw data if content is found.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return app.response_class(generate(content),
headers={'Content-disposition': 'attachment;'
'filename=content_%s_raw' % q},
mimetype='application/octet-stream')
@app.route('/api/1/content/<string:q>/')
@doc.route('/api/1/content/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the content identified by q. If content
decoding was successful, it also returns the data""")
def api_content_metadata(q):
"""Return content information if content is found.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=utils.enrich_content)
@app.route('/api/1/entity/<string:uuid>/')
@doc.route('/api/1/entity/')
@doc.arg('uuid',
default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
argtype=doc.argtypes.uuid,
argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if uuid is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if an entity matching uuid was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
"""Return content information if content is found.
"""
return _api_lookup(
uuid,
lookup_fn=service.lookup_entity_by_uuid,
error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
enrich_fn=utils.enrich_entity)