Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312081
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
60 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/backend.py b/swh/web/ui/backend.py
index f7c27362..d12e88db 100644
--- a/swh/web/ui/backend.py
+++ b/swh/web/ui/backend.py
@@ -1,162 +1,163 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.web.ui import main
def content_get(sha1_bin):
"""Lookup the content designed by {algo: hash_bin}.
Args:
sha1_bin: content's binary sha1.
Returns:
Content as dict with 'sha1' and 'data' keys.
data representing its raw data.
"""
contents = main.storage().content_get([sha1_bin])
if contents and len(contents) >= 1:
return contents[0]
return None
def content_find(algo, hash_bin):
"""Retrieve the content with binary hash hash_bin
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
A triplet (sha1, sha1_git, sha256) if the content exist
or None otherwise.
"""
return main.storage().content_find({algo: hash_bin})
def content_find_occurrence(algo, hash_bin):
"""Find the content's occurrence.
Args:
algo: nature of the hash hash_bin.
hash_bin: content's hash searched for.
Returns:
The occurrence of the content.
"""
return main.storage().content_find_occurrence({algo: hash_bin})
def origin_get(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id: origin's identifier
Returns:
Origin information as dict.
"""
return main.storage().origin_get({'id': origin_id})
def person_get(person_id):
"""Return information about the person with id person_id.
Args:
person_id: person's identifier.v
Returns:
Person information as dict.
"""
return main.storage().person_get([person_id])
-def directory_get(sha1_git_bin):
+def directory_get(sha1_git_bin, recursive=False):
"""Return information about the directory with id sha1_git.
Args:
sha1_git: directory's identifier.
+ recursive: Optional recursive flag default to False
Returns:
Directory information as dict.
"""
- directory_entries = main.storage().directory_get(sha1_git_bin)
+ directory_entries = main.storage().directory_get(sha1_git_bin, recursive)
if not directory_entries:
return None
return directory_entries
def release_get(sha1_git_bin):
"""Return information about the release with sha1 sha1_git_bin.
Args:
sha1_git_bin: The release's sha1 as hexadecimal.
Returns:
Release information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().release_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_get(sha1_git_bin):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
res = main.storage().revision_get([sha1_git_bin])
if res and len(res) >= 1:
return res[0]
return None
def revision_log(sha1_git_bin, limit=100):
"""Return information about the revision with sha1 sha1_git_bin.
Args:
sha1_git_bin: The revision's sha1 as hexadecimal.
limit: the maximum number of revisions returned.
Returns:
Revision information as dict if found, None otherwise.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
return main.storage().revision_log(sha1_git_bin, limit)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return main.storage().stat_counters()
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index 9408adba..862e1006 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,340 +1,324 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
-from swh.web.ui import converters, main, query, upload
+from swh.web.ui import converters, query, upload, backend
from swh.web.ui.exc import BadInputExc, NotFoundExc
def hash_and_search(filepath):
"""Hash the filepath's content as sha1, then search in storage if
it exists.
Args:
Filepath of the file to hash and search.
Returns:
Tuple (hex sha1, found as True or false).
The found boolean, according to whether the sha1 of the file
is present or not.
"""
h = hashutil.hashfile(filepath)
- c = main.storage().content_find(h)
+ c = backend.content_find('sha1', h['sha1'])
if c:
r = converters.from_content(c)
r['found'] = True
return r
else:
return {'sha1': hashutil.hash_to_hex(h['sha1']),
'found': False}
def upload_and_search(file):
"""Upload a file and compute its hash.
"""
tmpdir, filename, filepath = upload.save_in_upload_folder(file)
res = {'filename': filename}
try:
content = hash_and_search(filepath)
res.update(content)
return res
finally:
# clean up
if tmpdir:
upload.cleanup(tmpdir)
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
(algo, hash) = query.parse_hash(q)
- found = main.storage().content_find({algo: hash})
+ found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def lookup_hash_origin(q):
"""Return information about the checksum contained in the query q.
Args: query string of the form <hash_algo:hash>
Returns:
origin as dictionary if found for the given content.
"""
algo, h = query.parse_hash(q)
- origin = main.storage().content_find_occurrence({algo: h})
+ origin = backend.content_find_occurrence(algo, h)
return converters.from_origin(origin)
def lookup_origin(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id as string
Returns:
origin information as dict.
"""
- return main.storage().origin_get({'id': origin_id})
+ return backend.origin_get(origin_id)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
- persons = main.storage().person_get([person_id])
- if not persons:
- return None
- return converters.from_person(persons[0])
+ person = backend.person_get(person_id)
+ return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
- algo, hBinSha1 = query.parse_hash(sha1_git)
+ algo, sha1_git_bin = query.parse_hash(sha1_git)
if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
raise BadInputExc('Only sha1_git is supported.')
- directory_entries = main.storage().directory_get(hBinSha1)
- if not directory_entries:
- return None
-
+ directory_entries = backend.directory_get(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, hBinSha1 = query.parse_hash(release_sha1_git)
+ algo, sha1_git_bin = query.parse_hash(release_sha1_git)
if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
raise BadInputExc('Only sha1_git is supported.')
- res = main.storage().release_get([hBinSha1])
-
- if res and len(res) >= 1:
- return converters.from_release(res[0])
- return None
+ res = backend.release_get(sha1_git_bin)
+ return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, hBinSha1 = query.parse_hash(rev_sha1_git)
+ algo, sha1_git_bin = query.parse_hash(rev_sha1_git)
if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
raise BadInputExc('Only sha1_git is supported.')
- res = main.storage().revision_get([hBinSha1])
-
- if res and len(res) >= 1:
- return converters.from_revision(res[0])
- return None
+ res = backend.revision_get(sha1_git_bin)
+ return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit=100):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
algo, bin_sha1 = query.parse_hash(rev_sha1_git)
if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
raise BadInputExc('Only sha1_git is supported.')
- revision_entries = main.storage().revision_log(bin_sha1, limit)
+ revision_entries = backend.revision_log(bin_sha1, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
revision = lookup_revision(sha1_git)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
revision_root = lookup_revision(sha1_git_root)
if not revision_root:
raise NotFoundExc('Revision %s not found' % sha1_git_root)
bin_sha1_root = hashutil.hex_to_hash(sha1_git_root)
- revision_log = main.storage().revision_log(bin_sha1_root, limit)
+ revision_log = backend.revision_log(bin_sha1_root, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = hashutil.hash_to_hex(rev['id'])
parents[rev_id] = []
for parent_id in rev['parents']:
parent_id = hashutil.hash_to_hex(parent_id)
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return revision
def lookup_revision_with_directory(sha1_git, dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist
"""
revision = lookup_revision(sha1_git)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git = revision['directory']
if not dir_path:
return lookup_directory(dir_sha1_git)
dir_sha1_git_bin = hashutil.hex_to_hash(dir_sha1_git)
- directory_entries = main.storage().directory_get(dir_sha1_git_bin,
- recursive=True)
+ directory_entries = backend.directory_get(dir_sha1_git_bin,
+ recursive=True)
dir_id = None
for entry_dir in directory_entries:
name = entry_dir['name'].decode('utf-8')
type = entry_dir['type']
if name == dir_path and type == 'dir':
dir_id = hashutil.hash_to_hex(entry_dir['target'])
if not dir_id:
raise NotFoundExc("Directory '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
return lookup_directory(dir_id)
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
(algo, hash) = query.parse_hash(q)
- c = main.storage().content_find({algo: hash})
- if c:
- return converters.from_content(c)
- return None
+ c = backend.content_find(algo, hash)
+ return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content designed by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
"""
(algo, hash) = query.parse_hash(q)
- c = main.storage().content_find({algo: hash})
+ c = backend.content_find(algo, hash)
if not c:
return None
- sha1 = c['sha1']
- contents = main.storage().content_get([sha1])
- if contents and len(contents) >= 1:
- return converters.from_content(contents[0])
- return None
+ content = backend.content_get(c['sha1'])
+ return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
- return main.storage().stat_counters()
+ return backend.stat_counters()
diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py
index 221e3641..753c2d4e 100644
--- a/swh/web/ui/tests/test_backend.py
+++ b/swh/web/ui/tests/test_backend.py
@@ -1,381 +1,381 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock
from swh.core import hashutil
from swh.web.ui import backend
from swh.web.ui.tests import test_app
class BackendTestCase(test_app.SWHApiTestCase):
@istest
def content_get_ko_not_found_1(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f777')
self.storage.content_get = MagicMock(return_value=None)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get_ko_not_found_empty_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_get = MagicMock(return_value=[])
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertIsNone(actual_content)
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
stub_contents = [{
'sha1': sha1_bin,
'data': b'binary data',
},
{}]
self.storage.content_get = MagicMock(return_value=stub_contents)
# when
actual_content = backend.content_get(sha1_bin)
# then
self.assertEquals(actual_content, stub_contents[0])
self.storage.content_get.assert_called_once_with(
[sha1_bin])
@istest
def content_find_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=(1, 2, 3))
# when
actual_content = backend.content_find('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find.assert_called_with({'sha1': sha1_bin})
@istest
def content_find_occurrence_ko_no_result(self):
# given
sha1_bin = hashutil.hex_to_hash(
'123caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(return_value=None)
# when
actual_lookup = backend.content_find_occurrence('sha1_git', sha1_bin)
# then
self.assertIsNone(actual_lookup)
self.storage.content_find_occurrence.assert_called_once_with(
{'sha1_git': sha1_bin})
@istest
def content_find_occurrence(self):
# given
sha1_bin = hashutil.hex_to_hash(
'456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find_occurrence = MagicMock(
return_value=(1, 2, 3))
# when
actual_content = backend.content_find_occurrence('sha1', sha1_bin)
# then
self.assertEquals(actual_content, (1, 2, 3))
# check the function has been called with parameters
self.storage.content_find_occurrence.assert_called_with(
{'sha1': sha1_bin})
@istest
def origin_get(self):
# given
self.storage.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = backend.origin_get('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
self.storage.origin_get.assert_called_with({'id': 'origin-id'})
@istest
def person_get(self):
# given
self.storage.person_get = MagicMock(return_value={
'id': 'person-id',
'name': 'blah'})
# when
actual_person = backend.person_get('person-id')
# then
self.assertEqual(actual_person, {'id': 'person-id',
'name': 'blah'})
self.storage.person_get.assert_called_with(['person-id'])
@istest
def directory_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
self.storage.directory_get = MagicMock(return_value=[])
# when
actual_directory = backend.directory_get(sha1_bin)
# then
self.assertIsNone(actual_directory)
- self.storage.directory_get.assert_called_with(sha1_bin)
+ self.storage.directory_get.assert_called_with(sha1_bin, False)
@istest
def directory_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
stub_dir_entries = [{
'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
self.storage.directory_get = MagicMock(
return_value=stub_dir_entries)
- actual_directory = backend.directory_get(sha1_bin)
+ actual_directory = backend.directory_get(sha1_bin, recursive=True)
# then
self.assertIsNotNone(actual_directory)
self.assertEqual(list(actual_directory), stub_dir_entries)
- self.storage.directory_get.assert_called_with(sha1_bin)
+ self.storage.directory_get.assert_called_with(sha1_bin, True)
@istest
def release_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
self.storage.release_get = MagicMock(return_value=[])
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertIsNone(actual_release)
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def release_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
stub_releases = [{
'id': sha1_bin,
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
}]
self.storage.release_get = MagicMock(return_value=stub_releases)
# when
actual_release = backend.release_get(sha1_bin)
# then
self.assertEqual(actual_release, stub_releases[0])
self.storage.release_get.assert_called_with([sha1_bin])
@istest
def revision_get_not_found(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
self.storage.revision_get = MagicMock(return_value=[])
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertIsNone(actual_revision)
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_get(self):
# given
sha1_bin = hashutil.hex_to_hash(
'18d8be353ed3480476f032475e7c233eff7371d5')
stub_revisions = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_get = MagicMock(return_value=stub_revisions)
# when
actual_revision = backend.revision_get(sha1_bin)
# then
self.assertEqual(actual_revision, stub_revisions[0])
self.storage.revision_get.assert_called_with([sha1_bin])
@istest
def revision_log(self):
# given
sha1_bin = hashutil.hex_to_hash(
'28d8be353ed3480476f032475e7c233eff7371d5')
stub_revision_log = [{
'id': sha1_bin,
'directory': hashutil.hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
self.storage.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = backend.revision_log(sha1_bin)
# then
self.assertEqual(list(actual_revision), stub_revision_log)
self.storage.revision_log.assert_called_with(sha1_bin, 100)
@istest
def stat_counters(self):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
self.storage.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = backend.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
self.storage.stat_counters.assert_called_with()
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index 97fd1431..2d9d1d46 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,728 +1,722 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_hash_does_not_exist(self):
+ def lookup_hash_does_not_exist(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value=None)
+ mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
- self.storage.content_find.assert_called_with({
- 'sha1_git':
- hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')
- })
+ mock_backend.content_find.assert_called_with(
+ 'sha1_git',
+ hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_hash_exist(self):
+ def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
- self.storage.content_find = MagicMock(return_value=stub_content)
+ mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
- self.storage.content_find.assert_called_with({
- 'sha1':
+ mock_backend.content_find.assert_called_with(
+ 'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
- })
+ )
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_hash_origin(self):
+ def lookup_hash_origin(self, mock_backend):
# given
- self.storage.content_find_occurrence = MagicMock(return_value={
+ mock_backend.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
- self.storage.content_find_occurrence.assert_called_with(
- {'sha1_git':
- hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')})
+ mock_backend.content_find_occurrence.assert_called_with(
+ 'sha1_git',
+ hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
+ @patch('swh.web.ui.service.backend')
@istest
- def stat_counters(self):
+ def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
- self.storage.stat_counters = MagicMock(return_value=input_stats)
+ mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
- self.storage.stat_counters.assert_called_with()
+ mock_backend.stat_counters.assert_called_with()
+ @patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.hashutil')
@istest
- def hash_and_search(self, mock_hashutil):
+ def hash_and_search(self, mock_hashutil, mock_backend):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
- self.storage.content_find = MagicMock(return_value={
+ mock_backend.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
- self.storage.content_find.assert_called_once_with({'sha1': bhash})
+ mock_backend.content_find.assert_called_once_with('sha1', bhash)
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_origin(self):
+ def lookup_origin(self, mock_backend):
# given
- self.storage.origin_get = MagicMock(return_value={
+ mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
- self.storage.origin_get.assert_called_with({'id': 'origin-id'})
+ mock_backend.origin_get.assert_called_with('origin-id')
+
+ @patch('swh.web.ui.service.backend')
+ @istest
+ def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
+ mock_backend):
+ # given
+ mock_backend.release_get = MagicMock()
+
+ with self.assertRaises(BadInputExc) as cm:
+ # when
+ service.lookup_release('not-a-sha1')
+ self.assertIn('invalid checksum', cm.exception.args[0])
+
+ mock_backend.release_get.called = False
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_release(self):
+ def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
- self.storage.release_get = MagicMock(return_value=[{
+ mock_backend.release_get = MagicMock()
+
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ service.lookup_release(
+ '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
+ '1aea892abe')
+ self.assertIn('sha1_git supported', cm.exception.args[0])
+
+ mock_backend.release_get.called = False
+
+ @patch('swh.web.ui.service.backend')
+ @istest
+ def lookup_release(self, mock_backend):
+ # given
+ mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
- }])
+ })
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
- self.storage.release_get.assert_called_with(
- [hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')])
-
- @istest
- def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self):
- # given
- self.storage.release_get = MagicMock()
-
- with self.assertRaises(BadInputExc) as cm:
- # when
- service.lookup_release('not-a-sha1')
- self.assertIn('invalid checksum', cm.exception.args[0])
-
- self.storage.release_get.called = False
-
- @istest
- def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self):
- # given
- self.storage.release_get = MagicMock()
-
- # when
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_release(
- '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
- '1aea892abe')
- self.assertIn('sha1_git supported', cm.exception.args[0])
-
- self.storage.release_get.called = False
+ mock_backend.release_get.assert_called_with(
+ hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
- sha1_git_root = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
- 'daf51aea892abe'
- sha1_git = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
+ sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
+ 'daf51aea892abe'
+ sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.lookup_revision')
@istest
- def lookup_revision_with_context_ko_sha1_git_does_not_exist(self, mock):
+ def lookup_revision_with_context_ko_sha1_git_does_not_exist(
+ self,
+ mock_lookup_revision):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
- mock.return_value = None
+ mock_lookup_revision.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
- mock.assert_called_once_with(sha1_git)
+ mock_lookup_revision.assert_called_once_with(sha1_git)
@patch('swh.web.ui.service.lookup_revision')
@istest
- def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(self,
- mock):
+ def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
+ self,
+ mock_lookup_revision):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
- mock.side_effect = ['foo', None]
+ mock_lookup_revision.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
- mock.assert_has_calls([call(sha1_git), call(sha1_git_root)])
+ mock_lookup_revision.assert_has_calls([call(sha1_git),
+ call(sha1_git_root)])
+ @patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.lookup_revision')
@istest
- def lookup_revision_with_context(self, mock):
+ def lookup_revision_with_context(self, mock_lookup_revision, mock_backend):
# given
stub_revisions = [
{
'id': b'666' + 17 * b'\x00',
'parents': [b'999'],
},
{
'id': b'999',
'parents': [b'777', b'883' + 17 * b'\x00', b'888'],
},
{
'id': b'777',
'parents': [b'883' + 17 * b'\x00'],
},
{
'id': b'883' + 17 * b'\x00',
'parents': [],
'directory': b'278',
},
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# lookup revision first 883, then 666 (both exists)
- mock.side_effect = [
+ mock_lookup_revision.side_effect = [
{
'id': '383833' + 17 * '00',
'parents': [],
'directory': '323738',
},
{
'id': '363636' + 17 * '00',
'parents': ['393939'],
},
]
- self.storage.revision_log = MagicMock(
+ mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
sha1_git_root = '363636' + 17 * '00' # (ascii code for 666)
sha1_git = '383833' + 17 * '00' # (ascii code for 883)
actual_revisions = service.lookup_revision_with_context(sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revisions, {
'id': '383833' + 17 * '00',
'parents': [],
'children': ['393939', '373737'],
'directory': '323738',
})
- self.storage.revision_log.assert_called_with(
+ mock_backend.revision_log.assert_called_with(
hex_to_hash(sha1_git_root), 100)
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_revision(self):
+ def lookup_revision(self, mock_backend):
# given
- self.storage.revision_get = MagicMock(return_value=[{
+ mock_backend.revision_get = MagicMock(return_value={
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
- }])
+ })
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
- self.storage.revision_get.assert_called_with(
- [hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')])
+ mock_backend.revision_get.assert_called_with(
+ hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_revision_log(self):
+ def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [{
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
- self.storage.revision_log = MagicMock(return_value=stub_revision_log)
+ mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(list(actual_revision), [{
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
- self.storage.revision_log.assert_called_with(
+ mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100)
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_content_raw_not_found(self):
+ def lookup_content_raw_not_found(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value=None)
+ mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
- self.storage.content_find.assert_called_with(
- {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
+ mock_backend.content_find.assert_called_with(
+ 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_content_raw(self):
+ def lookup_content_raw(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value={
+ mock_backend.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
- self.storage.content_get = MagicMock(return_value=[{
- 'data': b'binary data',
- }, {}])
+ mock_backend.content_get = MagicMock(return_value={
+ 'data': b'binary data'})
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEquals(actual_content, {'data': b'binary data'})
- self.storage.content_find.assert_called_once_with(
- {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
- 'e76c98930e7e0afa4d2747d3bf96c926')})
- self.storage.content_get.assert_called_once_with(
- ['18d8be353ed3480476f032475e7c233eff7371d5'])
+ mock_backend.content_find.assert_called_once_with(
+ 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
+ 'e76c98930e7e0afa4d2747d3bf96c926'))
+ mock_backend.content_get.assert_called_once_with(
+ '18d8be353ed3480476f032475e7c233eff7371d5')
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_content_not_found(self):
+ def lookup_content_not_found(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value=None)
+ mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
- self.storage.content_find.assert_called_with(
- {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
+ mock_backend.content_find.assert_called_with(
+ 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_content_with_sha1(self):
+ def lookup_content_with_sha1(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value={
+ mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'hidden',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
'status': 'absent',
})
- self.storage.content_find.assert_called_with(
- {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')})
+ mock_backend.content_find.assert_called_with(
+ 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_content_with_sha256(self):
+ def lookup_content_with_sha256(self, mock_backend):
# given
- self.storage.content_find = MagicMock(return_value={
+ mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
'status': 'visible',
})
- self.storage.content_find.assert_called_with(
- {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
- 'e76c98930e7e0afa4d2747d3bf96c926')})
+ mock_backend.content_find.assert_called_with(
+ 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
+ 'e76c98930e7e0afa4d2747d3bf96c926'))
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_person(self):
+ def lookup_person(self, mock_backend):
# given
- self.storage.person_get = MagicMock(return_value=[{
+ mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
- }])
+ })
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
- self.storage.person_get.assert_called_with(['person_id'])
+ mock_backend.person_get.assert_called_with('person_id')
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_person_not_found(self):
+ def lookup_directory_bad_checksum(self, mock_backend):
# given
- self.storage.person_get = MagicMock(return_value=[])
-
- # when
- actual_person = service.lookup_person('person_id')
-
- # then
- self.assertIsNone(actual_person)
-
- self.storage.person_get.assert_called_with(['person_id'])
-
- @istest
- def lookup_directory_bad_checksum(self):
- # given
- self.storage.directory_get = MagicMock()
+ mock_backend.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
- self.storage.directory_get.called = False
+ mock_backend.directory_get.called = False
+ @patch('swh.web.ui.service.backend')
@istest
- def lookup_directory_not_found(self):
+ def lookup_directory(self, mock_backend):
# given
- self.storage.directory_get = MagicMock(return_value=[])
-
- # when
- actual_directory = service.lookup_directory(
- '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
-
- # then
- self.assertIsNone(actual_directory)
-
- self.storage.directory_get.assert_called_with(
- hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
-
- @istest
- def lookup_directory(self):
- # given
- dir_entries_input = {
+ stub_dir_entries = [{
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
- }
+ }]
- expected_dir_entries = {
+ expected_dir_entries = [{
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
- }
+ }]
- self.storage.directory_get = MagicMock(
- return_value=[dir_entries_input])
+ mock_backend.directory_get = MagicMock(
+ return_value=stub_dir_entries)
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
- self.assertIsNotNone(actual_directory)
- self.assertEqual(list(actual_directory), [expected_dir_entries])
+ self.assertEqual(list(actual_directory), expected_dir_entries)
- self.storage.directory_get.assert_called_with(
+ mock_backend.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:42 AM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3253678
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment