Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9124666
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
61 KB
Subscribers
None
View Options
diff --git a/swh/web/ui/query.py b/swh/web/ui/query.py
index ac795517..5de6f231 100644
--- a/swh/web/ui/query.py
+++ b/swh/web/ui/query.py
@@ -1,58 +1,87 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from swh.core.hashutil import ALGORITHMS, hex_to_hash
from swh.web.ui.exc import BadInputExc
SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE)
SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE)
def parse_hash(q):
"""Detect the hash type of a user submitted query string.
Args:
query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM",
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
ValueError if the given query string does not correspond to a valid
hash value
"""
def guess_algo(q):
if SHA1_RE.match(q):
return 'sha1'
elif SHA256_RE.match(q):
return 'sha256'
else:
raise BadInputExc('Invalid checksum query string %s' % q)
def check_algo(algo, hex):
if (algo in {'sha1', 'sha1_git'} and not SHA1_RE.match(hex)) \
or (algo == 'sha256' and not SHA256_RE.match(hex)):
raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo))
parts = q.split(':')
if len(parts) > 2:
raise BadInputExc('Invalid checksum query string %s' % q)
elif len(parts) == 1:
parts = (guess_algo(q), q)
elif len(parts) == 2:
check_algo(parts[0], parts[1])
algo = parts[0]
if algo not in ALGORITHMS:
raise BadInputExc('Unknown hash algorithm %s' % algo)
return (algo, hex_to_hash(parts[1]))
+
+
+def parse_hash_with_algorithms_or_throws(q, accepted_algo, error_msg):
+ """Parse a query but only accepts accepted_algo.
+ Otherwise, raise the exception with message error_msg.
+
+ Args:
+ - q: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM"
+ where HASH_TYPE is optional, defaults to "sha1", and can be one of
+ swh.core.hashutil.ALGORITHMS.
+ - accepted_algo: array of strings representing the names of accepted
+ algorithms.
+ - error_msg: error message to raise as BadInputExc if the algo of
+ the query does not match.
+
+ Returns:
+ A pair (hash_algorithm, byte hash value)
+
+ Raises:
+ BadInputExc when the inputs is invalid or does not
+ validate the accepted algorithms.
+
+ """
+ algo, hash = parse_hash(q)
+
+ if algo not in accepted_algo:
+ raise BadInputExc(error_msg)
+
+ return (algo, hash)
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index 1b41d8ab..2e4c6d00 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,370 +1,376 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, upload, backend
-from swh.web.ui.exc import BadInputExc, NotFoundExc
+from swh.web.ui.exc import NotFoundExc
def hash_and_search(filepath):
"""Hash the filepath's content as sha1, then search in storage if
it exists.
Args:
Filepath of the file to hash and search.
Returns:
Tuple (hex sha1, found as True or false).
The found boolean, according to whether the sha1 of the file
is present or not.
"""
h = hashutil.hashfile(filepath)
c = backend.content_find('sha1', h['sha1'])
if c:
r = converters.from_content(c)
r['found'] = True
return r
else:
return {'sha1': hashutil.hash_to_hex(h['sha1']),
'found': False}
def upload_and_search(file):
"""Upload a file and compute its hash.
"""
tmpdir, filename, filepath = upload.save_in_upload_folder(file)
res = {'filename': filename}
try:
content = hash_and_search(filepath)
res.update(content)
return res
finally:
# clean up
if tmpdir:
upload.cleanup(tmpdir)
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def lookup_hash_origin(q):
"""Return information about the checksum contained in the query q.
Args: query string of the form <hash_algo:hash>
Returns:
origin as dictionary if found for the given content.
"""
- algo, h = query.parse_hash(q)
- origin = backend.content_find_occurrence(algo, h)
+ algo, hash = query.parse_hash(q)
+ origin = backend.content_find_occurrence(algo, hash)
return converters.from_origin(origin)
def lookup_origin(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id as string
Returns:
origin information as dict.
"""
return backend.origin_get(origin_id)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
person = backend.person_get(person_id)
return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'], # HACK: sha1_git really
+ 'Only sha1_git is supported.')
directory_entries = backend.directory_get(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, sha1_git_bin = query.parse_hash(release_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
-
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ release_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
res = backend.release_get(sha1_git_bin)
return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, sha1_git_bin = query.parse_hash(rev_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ rev_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
res = backend.revision_get(sha1_git_bin)
return converters.from_revision(res)
def lookup_revision_by(origin_id,
branch_name="refs/heads/master",
timestamp=None):
"""Lookup revisions by origin_id, branch_name and timestamp.
If:
- branch_name is not provided, lookup using 'refs/heads/master' as default.
- ts is not provided, use the most recent
Yields:
The revisions matching the criterions.
"""
res = backend.revision_get_by(origin_id, branch_name, timestamp)
return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit=100):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, bin_sha1 = query.parse_hash(rev_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ rev_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
- revision_entries = backend.revision_log(bin_sha1, limit)
+ revision_entries = backend.revision_log(sha1_git_bin, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
- algo, sha1_git_root_bin = query.parse_hash(sha1_git_root)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git_root,
+ ['sha1'],
+ 'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
revision_root = backend.revision_get(sha1_git_root_bin)
if not revision_root:
raise NotFoundExc('Revision %s not found' % sha1_git_root)
revision_log = backend.revision_log(sha1_git_root_bin, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def _lookup_name_in(directory_entries, name):
"""Given a name and a list of directory entries, return the
corresponding entry."""
bname = name.encode('utf-8')
res = list(filter(lambda e: e['name'] == bname, directory_entries))
if not res:
return None
return res[0]
def lookup_directory_with_revision(sha1_git, dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError in case of dir_path exists but do not reference a
type 'dir' or 'file'.
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
directory_entries = backend.directory_get(dir_sha1_git_bin,
recursive=True)
entity = _lookup_name_in(directory_entries, dir_path)
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = backend.directory_get(entity['target'])
return {'type': 'dir',
'content': map(converters.from_directory_entry,
directory_entries)}
elif entity['type'] == 'file': # content
content = backend.content_find('sha1_git', entity['target'])
return {'type': 'file',
'content': converters.from_content(content)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content designed by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
if not c:
return None
content = backend.content_get(c['sha1'])
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return backend.stat_counters()
diff --git a/swh/web/ui/tests/test_query.py b/swh/web/ui/tests/test_query.py
index 53dfa039..b4e2aa81 100644
--- a/swh/web/ui/tests/test_query.py
+++ b/swh/web/ui/tests/test_query.py
@@ -1,75 +1,125 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
+from unittest.mock import patch
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import query
from swh.web.ui.exc import BadInputExc
class QueryTestCase(unittest.TestCase):
@istest
def parse_hash_malformed_query_with_more_than_2_parts(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654:other-stuff')
@istest
def parse_hash_guess_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash(h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2' \
'E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash(h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_algo_malformed_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('1234567890987654')
@istest
def parse_hash_check_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1:' + h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha1_git(self):
h = 'e1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1_git:' + h)
self.assertEquals(r, ('sha1_git', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash('sha256:' + h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_algo_malformed_sha1_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha1_git_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1_git:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha256_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha256:1234567890987654')
@istest
def parse_hash_check_algo_unknown_one(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha2:1234567890987654')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms_or_throws_bad_query(self, mock_hash):
+ # given
+ mock_hash.side_effect = BadInputExc('Error input')
+
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_hash_with_algorithms_or_throws(
+ 'sha1:blah',
+ ['sha1'],
+ 'useless error message for this use case')
+ self.assertIn('Error input', cm.exception.args[0])
+
+ mock_hash.assert_called_once_with('sha1:blah')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms_or_throws_bad_algo(self, mock_hash):
+ # given
+ mock_hash.return_value = 'sha1', '123'
+
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_hash_with_algorithms_or_throws(
+ 'sha1:431',
+ ['sha1_git'],
+ 'Only sha1_git!')
+ self.assertIn('Only sha1_git!', cm.exception.args[0])
+
+ mock_hash.assert_called_once_with('sha1:431')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms(self, mock_hash):
+ # given
+ mock_hash.return_value = ('sha256', b'123')
+
+ # when
+ algo, sha = query.parse_hash_with_algorithms_or_throws(
+ 'sha256:123',
+ ['sha256', 'sha1_git'],
+ 'useless error message for this use case')
+
+ self.assertEquals(algo, 'sha256')
+ self.assertEquals(sha, b'123')
+
+ mock_hash.assert_called_once_with('sha256:123')
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index fffe0343..20c33303 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1133 +1,1110 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_origin(self, mock_backend):
# given
mock_backend.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
mock_backend.content_find_occurrence.assert_called_with(
'sha1_git',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil, mock_backend):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_backend.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
mock_backend.content_find.assert_called_once_with('sha1', bhash)
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
# given
mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_backend.origin_get.assert_called_with('origin-id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
# given
mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
})
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_backend.release_get.assert_called_with(
hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
- mock_query.parse_hash.side_effect = [
+ mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# lookup revision first 883, then 666 (both exists)
mock_backend.revision_get.side_effect = [
sha1_git_dict,
sha1_git_root_dict
]
mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
})
- mock_query.parse_hash.assert_has_calls([call(sha1_git),
- call(sha1_git_root)])
+ mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
+ [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
+ call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@istest
def lookup_name_in(self):
file0 = {
"type": "file",
"sha1": b"8e295bec402303cf2bd21b68f1f1fb0692f9c00a",
"dir_id": b"93857db1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"Entries",
"perms": 100644,
"target": b"55b983eaed0e68f8402c4ef891f0fcbcc80ece74"
}
dir1 = {
"type": "dir",
"sha1": b"d4c7a6c81832350d05c3f76f5f193ee62a2e6a16",
"dir_id": b"93857db1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"doc",
"perms": 40000,
"target": b"5d71ad3d16c0aaf5e0c3f4a4241020d7962c0e43"
}
dir2 = {
"type": "dir",
"sha1": b"cvbfdrc81832350d05c3f76f5f193ee62a2e6a16",
"dir_id": b"789012b1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"generated",
"perms": 40000,
"target": b"1234563d16c0aaf5e0c3f4a4241020d7962c0e43"
}
dir_entries = [file0, dir1, dir2]
for e in [file0, dir1, dir2]:
actual_entity = service._lookup_name_in(dir_entries,
e['name'].decode('utf-8'))
self.assertEquals(actual_entity, e)
actual_entity = service._lookup_name_in(dir_entries, "nothing")
self.assertIsNone(actual_entity)
- @istest
- def lookup_directory_with_revision_bad_input(self):
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_directory_with_revision('123', 'some/path')
- self.assertIn('Only sha1_git is supported', cm.exception.args[0])
-
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_not_found(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
- mock_query.parse_hash.assert_called_once_with('123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'dir',
'name': b'some/path',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_get.side_effect = [
stub_dir_ls,
stub_dir_entries
]
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_has_calls([
call(b'dir-id-as-sha1', recursive=True),
call(b'456')
])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'file',
'name': b'some/path/to/file',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'content': stub_content})
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
mock_backend.content_find.assert_called_once_with('sha1_git', b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'file',
'name': b'some/path/to/dir',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
- mock_query.parse_hash.assert_called_once_with('123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ok_type_not_implemented(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
- @patch('swh.web.ui.service.query')
- @istest
- def lookup_revision_bad_input(self, mock_query):
- # given
- mock_query.parse_hash.return_value = ('sha1_git', 'do not care')
-
- # when
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_revision('123')
- self.assertIn('Only sha1_git is supported.', cm.exception.args[0])
-
- mock_query.parse_hash.assert_called_with('123')
-
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
# given
mock_backend.revision_get = MagicMock(return_value={
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
mock_backend.revision_get.assert_called_with(
hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
- @patch('swh.web.ui.service.query')
- @istest
- def lookup_revision_log_bad_input(self, mock_query):
- # given
- mock_query.parse_hash.return_value = ('sha1_git', 'do not care')
-
- # when
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_revision_log('123')
- self.assertIn('Only sha1_git is supported.', cm.exception.args[0])
-
- mock_query.parse_hash.assert_called_with('123')
-
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [{
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(list(actual_revision), [{
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
mock_backend.content_get = MagicMock(return_value={
'data': b'binary data'})
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEquals(actual_content, {'data': b'binary data'})
mock_backend.content_find.assert_called_once_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
mock_backend.content_get.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'hidden',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
'status': 'absent',
})
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
'status': 'visible',
})
mock_backend.content_find.assert_called_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
# given
mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
})
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
# given
mock_backend.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
mock_backend.directory_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory(self, mock_backend):
# given
stub_dir_entries = [{
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
expected_dir_entries = [{
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}]
mock_backend.directory_get = MagicMock(
return_value=stub_dir_entries)
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertEqual(list(actual_directory), expected_dir_entries)
mock_backend.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
actual_revisions = service.lookup_revision_by(1)
# then
self.assertIsNone(actual_revisions)
mock_backend.revision_get_by(1, 'master', None)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
# given
stub_rev = {
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'committer': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'message': b'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
expected_rev = {
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'committer': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'message': 'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by(1, 'master2', 'some-ts')
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Jun 21, 7:23 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3251862
Attached To
rDWAPPS Web applications
Event Timeline
Log In to Comment