Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/web/ui/query.py b/swh/web/ui/query.py
index ac795517..5de6f231 100644
--- a/swh/web/ui/query.py
+++ b/swh/web/ui/query.py
@@ -1,58 +1,87 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import re
from swh.core.hashutil import ALGORITHMS, hex_to_hash
from swh.web.ui.exc import BadInputExc
SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE)
SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE)
def parse_hash(q):
"""Detect the hash type of a user submitted query string.
Args:
query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM",
where HASH_TYPE is optional, defaults to "sha1", and can be one of
swh.core.hashutil.ALGORITHMS
Returns:
A pair (hash_algorithm, byte hash value)
Raises:
ValueError if the given query string does not correspond to a valid
hash value
"""
def guess_algo(q):
if SHA1_RE.match(q):
return 'sha1'
elif SHA256_RE.match(q):
return 'sha256'
else:
raise BadInputExc('Invalid checksum query string %s' % q)
def check_algo(algo, hex):
if (algo in {'sha1', 'sha1_git'} and not SHA1_RE.match(hex)) \
or (algo == 'sha256' and not SHA256_RE.match(hex)):
raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo))
parts = q.split(':')
if len(parts) > 2:
raise BadInputExc('Invalid checksum query string %s' % q)
elif len(parts) == 1:
parts = (guess_algo(q), q)
elif len(parts) == 2:
check_algo(parts[0], parts[1])
algo = parts[0]
if algo not in ALGORITHMS:
raise BadInputExc('Unknown hash algorithm %s' % algo)
return (algo, hex_to_hash(parts[1]))
+
+
+def parse_hash_with_algorithms_or_throws(q, accepted_algo, error_msg):
+ """Parse a query but only accepts accepted_algo.
+ Otherwise, raise the exception with message error_msg.
+
+ Args:
+ - q: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM"
+ where HASH_TYPE is optional, defaults to "sha1", and can be one of
+ swh.core.hashutil.ALGORITHMS.
+ - accepted_algo: array of strings representing the names of accepted
+ algorithms.
+ - error_msg: error message to raise as BadInputExc if the algo of
+ the query does not match.
+
+ Returns:
+ A pair (hash_algorithm, byte hash value)
+
+ Raises:
+ BadInputExc when the inputs is invalid or does not
+ validate the accepted algorithms.
+
+ """
+ algo, hash = parse_hash(q)
+
+ if algo not in accepted_algo:
+ raise BadInputExc(error_msg)
+
+ return (algo, hash)
diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py
index 1b41d8ab..2e4c6d00 100644
--- a/swh/web/ui/service.py
+++ b/swh/web/ui/service.py
@@ -1,370 +1,376 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
from swh.core import hashutil
from swh.web.ui import converters, query, upload, backend
-from swh.web.ui.exc import BadInputExc, NotFoundExc
+from swh.web.ui.exc import NotFoundExc
def hash_and_search(filepath):
"""Hash the filepath's content as sha1, then search in storage if
it exists.
Args:
Filepath of the file to hash and search.
Returns:
Tuple (hex sha1, found as True or false).
The found boolean, according to whether the sha1 of the file
is present or not.
"""
h = hashutil.hashfile(filepath)
c = backend.content_find('sha1', h['sha1'])
if c:
r = converters.from_content(c)
r['found'] = True
return r
else:
return {'sha1': hashutil.hash_to_hex(h['sha1']),
'found': False}
def upload_and_search(file):
"""Upload a file and compute its hash.
"""
tmpdir, filename, filepath = upload.save_in_upload_folder(file)
res = {'filename': filename}
try:
content = hash_and_search(filepath)
res.update(content)
return res
finally:
# clean up
if tmpdir:
upload.cleanup(tmpdir)
def lookup_hash(q):
"""Checks if the storage contains a given content checksum
Args: query string of the form <hash_algo:hash>
Returns: Dict with key found to True or False, according to
whether the checksum is present or not
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
found = backend.content_find(algo, hash)
return {'found': found,
'algo': algo}
def lookup_hash_origin(q):
"""Return information about the checksum contained in the query q.
Args: query string of the form <hash_algo:hash>
Returns:
origin as dictionary if found for the given content.
"""
- algo, h = query.parse_hash(q)
- origin = backend.content_find_occurrence(algo, h)
+ algo, hash = query.parse_hash(q)
+ origin = backend.content_find_occurrence(algo, hash)
return converters.from_origin(origin)
def lookup_origin(origin_id):
"""Return information about the origin with id origin_id.
Args:
origin_id as string
Returns:
origin information as dict.
"""
return backend.origin_get(origin_id)
def lookup_person(person_id):
"""Return information about the person with id person_id.
Args:
person_id as string
Returns:
person information as dict.
"""
person = backend.person_get(person_id)
return converters.from_person(person)
def lookup_directory(sha1_git):
"""Return information about the directory with id sha1_git.
Args:
sha1_git as string
Returns:
directory information as dict.
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'], # HACK: sha1_git really
+ 'Only sha1_git is supported.')
directory_entries = backend.directory_get(sha1_git_bin)
return map(converters.from_directory_entry, directory_entries)
def lookup_release(release_sha1_git):
"""Return information about the release with sha1 release_sha1_git.
Args:
release_sha1_git: The release's sha1 as hexadecimal
Returns:
Release information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, sha1_git_bin = query.parse_hash(release_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
-
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ release_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
res = backend.release_get(sha1_git_bin)
return converters.from_release(res)
def lookup_revision(rev_sha1_git):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, sha1_git_bin = query.parse_hash(rev_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ rev_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
res = backend.revision_get(sha1_git_bin)
return converters.from_revision(res)
def lookup_revision_by(origin_id,
branch_name="refs/heads/master",
timestamp=None):
"""Lookup revisions by origin_id, branch_name and timestamp.
If:
- branch_name is not provided, lookup using 'refs/heads/master' as default.
- ts is not provided, use the most recent
Yields:
The revisions matching the criterions.
"""
res = backend.revision_get_by(origin_id, branch_name, timestamp)
return converters.from_revision(res)
def lookup_revision_log(rev_sha1_git, limit=100):
"""Return information about the revision with sha1 revision_sha1_git.
Args:
revision_sha1_git: The revision's sha1 as hexadecimal
limit: the maximum number of revisions returned
Returns:
Revision information as dict.
Raises:
ValueError if the identifier provided is not of sha1 nature.
"""
- algo, bin_sha1 = query.parse_hash(rev_sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ rev_sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
- revision_entries = backend.revision_log(bin_sha1, limit)
+ revision_entries = backend.revision_log(sha1_git_bin, limit)
return map(converters.from_revision, revision_entries)
def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100):
"""Return information about revision sha1_git, limited to the
sub-graph of all transitive parents of sha1_git_root.
In other words, sha1_git is an ancestor of sha1_git_root.
Args:
sha1_git_root: latest revision of the browsed history
sha1_git: one of sha1_git_root's ancestors
limit: limit the lookup to 100 revisions back
Returns:
Information on sha1_git if it is an ancestor of sha1_git_root
including children leading to sha1_git_root
Raises:
BadInputExc in case of unknown algo_hash or bad hash
NotFoundExc if either revision is not found or if sha1_git is not an
ancestor of sha1_git_root
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
- algo, sha1_git_root_bin = query.parse_hash(sha1_git_root)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_root_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git_root,
+ ['sha1'],
+ 'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
revision_root = backend.revision_get(sha1_git_root_bin)
if not revision_root:
raise NotFoundExc('Revision %s not found' % sha1_git_root)
revision_log = backend.revision_log(sha1_git_root_bin, limit)
parents = {}
children = defaultdict(list)
for rev in revision_log:
rev_id = rev['id']
parents[rev_id] = []
for parent_id in rev['parents']:
parents[rev_id].append(parent_id)
children[parent_id].append(rev_id)
if revision['id'] not in parents:
raise NotFoundExc('Revision %s is not an ancestor of %s' %
(sha1_git, sha1_git_root))
revision['children'] = children[revision['id']]
return converters.from_revision(revision)
def _lookup_name_in(directory_entries, name):
"""Given a name and a list of directory entries, return the
corresponding entry."""
bname = name.encode('utf-8')
res = list(filter(lambda e: e['name'] == bname, directory_entries))
if not res:
return None
return res[0]
def lookup_directory_with_revision(sha1_git, dir_path=None):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
Args:
sha1_git: revision's hash.
dir_path: optional directory pointed to by that revision.
Returns:
Information on the directory pointed to by that revision.
Raises:
BadInputExc in case of unknown algo_hash or bad hash.
NotFoundExc either if the revision is not found or the path referenced
does not exist.
NotImplementedError in case of dir_path exists but do not reference a
type 'dir' or 'file'.
"""
- algo, sha1_git_bin = query.parse_hash(sha1_git)
- if algo != 'sha1': # HACK: sha1_git really but they are both sha1...
- raise BadInputExc('Only sha1_git is supported.')
+ _, sha1_git_bin = query.parse_hash_with_algorithms_or_throws(
+ sha1_git,
+ ['sha1'],
+ 'Only sha1_git is supported.')
revision = backend.revision_get(sha1_git_bin)
if not revision:
raise NotFoundExc('Revision %s not found' % sha1_git)
dir_sha1_git_bin = revision['directory']
if dir_path:
directory_entries = backend.directory_get(dir_sha1_git_bin,
recursive=True)
entity = _lookup_name_in(directory_entries, dir_path)
if not entity:
raise NotFoundExc(
"Directory or File '%s' pointed to by revision %s not found"
% (dir_path, sha1_git))
else:
entity = {'type': 'dir', 'target': dir_sha1_git_bin}
if entity['type'] == 'dir':
directory_entries = backend.directory_get(entity['target'])
return {'type': 'dir',
'content': map(converters.from_directory_entry,
directory_entries)}
elif entity['type'] == 'file': # content
content = backend.content_find('sha1_git', entity['target'])
return {'type': 'file',
'content': converters.from_content(content)}
else:
raise NotImplementedError('Entity of type %s not implemented.'
% entity['type'])
def lookup_content(q):
"""Lookup the content designed by q.
Args:
q: The release's sha1 as hexadecimal
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
return converters.from_content(c)
def lookup_content_raw(q):
"""Lookup the content designed by q.
Args:
q: query string of the form <hash_algo:hash>
Returns:
dict with 'sha1' and 'data' keys.
data representing its raw data decoded.
"""
- (algo, hash) = query.parse_hash(q)
+ algo, hash = query.parse_hash(q)
c = backend.content_find(algo, hash)
if not c:
return None
content = backend.content_get(c['sha1'])
return converters.from_content(content)
def stat_counters():
"""Return the stat counters for Software Heritage
Returns:
A dict mapping textual labels to integer values.
"""
return backend.stat_counters()
diff --git a/swh/web/ui/tests/test_query.py b/swh/web/ui/tests/test_query.py
index 53dfa039..b4e2aa81 100644
--- a/swh/web/ui/tests/test_query.py
+++ b/swh/web/ui/tests/test_query.py
@@ -1,75 +1,125 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import unittest
+from unittest.mock import patch
from nose.tools import istest
from swh.core import hashutil
from swh.web.ui import query
from swh.web.ui.exc import BadInputExc
class QueryTestCase(unittest.TestCase):
@istest
def parse_hash_malformed_query_with_more_than_2_parts(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654:other-stuff')
@istest
def parse_hash_guess_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash(h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2' \
'E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash(h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_guess_algo_malformed_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('1234567890987654')
@istest
def parse_hash_check_sha1(self):
h = 'f1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1:' + h)
self.assertEquals(r, ('sha1', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha1_git(self):
h = 'e1d2d2f924e986ac86fdf7b36c94bcdf32beec15'
r = query.parse_hash('sha1_git:' + h)
self.assertEquals(r, ('sha1_git', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_sha256(self):
h = '084C799CD551DD1D8D5C5F9A5D593B2E931F5E36122ee5c793c1d08a19839cc0'
r = query.parse_hash('sha256:' + h)
self.assertEquals(r, ('sha256', hashutil.hex_to_hash(h)))
@istest
def parse_hash_check_algo_malformed_sha1_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha1_git_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha1_git:1234567890987654')
@istest
def parse_hash_check_algo_malformed_sha256_hash(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha256:1234567890987654')
@istest
def parse_hash_check_algo_unknown_one(self):
with self.assertRaises(BadInputExc):
query.parse_hash('sha2:1234567890987654')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms_or_throws_bad_query(self, mock_hash):
+ # given
+ mock_hash.side_effect = BadInputExc('Error input')
+
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_hash_with_algorithms_or_throws(
+ 'sha1:blah',
+ ['sha1'],
+ 'useless error message for this use case')
+ self.assertIn('Error input', cm.exception.args[0])
+
+ mock_hash.assert_called_once_with('sha1:blah')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms_or_throws_bad_algo(self, mock_hash):
+ # given
+ mock_hash.return_value = 'sha1', '123'
+
+ # when
+ with self.assertRaises(BadInputExc) as cm:
+ query.parse_hash_with_algorithms_or_throws(
+ 'sha1:431',
+ ['sha1_git'],
+ 'Only sha1_git!')
+ self.assertIn('Only sha1_git!', cm.exception.args[0])
+
+ mock_hash.assert_called_once_with('sha1:431')
+
+ @patch('swh.web.ui.query.parse_hash')
+ @istest
+ def parse_hash_with_algorithms(self, mock_hash):
+ # given
+ mock_hash.return_value = ('sha256', b'123')
+
+ # when
+ algo, sha = query.parse_hash_with_algorithms_or_throws(
+ 'sha256:123',
+ ['sha256', 'sha1_git'],
+ 'useless error message for this use case')
+
+ self.assertEquals(algo, 'sha256')
+ self.assertEquals(sha, b'123')
+
+ mock_hash.assert_called_once_with('sha256:123')
diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py
index fffe0343..20c33303 100644
--- a/swh/web/ui/tests/test_service.py
+++ b/swh/web/ui/tests/test_service.py
@@ -1,1133 +1,1110 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
from nose.tools import istest
from unittest.mock import MagicMock, patch, call
from swh.core.hashutil import hex_to_hash, hash_to_hex
from swh.web.ui import service
from swh.web.ui.exc import BadInputExc, NotFoundExc
from swh.web.ui.tests import test_app
class ServiceTestCase(test_app.SWHApiTestCase):
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_does_not_exist(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_lookup = service.lookup_hash(
'sha1_git:123caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': None,
'algo': 'sha1_git'}, actual_lookup)
# check the function has been called with parameters
mock_backend.content_find.assert_called_with(
'sha1_git',
hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_exist(self, mock_backend):
# given
stub_content = {
'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
}
mock_backend.content_find = MagicMock(return_value=stub_content)
# when
actual_lookup = service.lookup_hash(
'sha1:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEquals({'found': stub_content,
'algo': 'sha1'}, actual_lookup)
mock_backend.content_find.assert_called_with(
'sha1',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'),
)
@patch('swh.web.ui.service.backend')
@istest
def lookup_hash_origin(self, mock_backend):
# given
mock_backend.content_find_occurrence = MagicMock(return_value={
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa
'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa
})
expected_origin = {
'origin_type': 'sftp',
'origin_url': 'sftp://ftp.gnu.org/gnu/octave',
'branch': 'octavio-3.4.0.tar.gz',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc'
'_002dS_005fISREG.html'
}
# when
actual_origin = service.lookup_hash_origin(
'sha1_git:456caf10e9535160d90e874b45aa426de762f19f')
# then
self.assertEqual(actual_origin, expected_origin)
mock_backend.content_find_occurrence.assert_called_with(
'sha1_git',
hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'))
@patch('swh.web.ui.service.backend')
@istest
def stat_counters(self, mock_backend):
# given
input_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_backend.stat_counters = MagicMock(return_value=input_stats)
# when
actual_stats = service.stat_counters()
# then
expected_stats = input_stats
self.assertEqual(actual_stats, expected_stats)
mock_backend.stat_counters.assert_called_with()
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search(self, mock_hashutil, mock_backend):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_backend.content_find = MagicMock(return_value={
'sha1': bhash,
'sha1_git': bhash,
})
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f',
'found': True,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
mock_backend.content_find.assert_called_once_with('sha1', bhash)
@patch('swh.web.ui.service.hashutil')
@istest
def hash_and_search_not_found(self, mock_hashutil):
# given
bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')
mock_hashutil.hashfile.return_value = {'sha1': bhash}
mock_hashutil.hash_to_hex = MagicMock(
return_value='456caf10e9535160d90e874b45aa426de762f19f')
self.storage.content_find = MagicMock(return_value=None)
# when
actual_content = service.hash_and_search('/some/path')
# then
self.assertEqual(actual_content, {
'sha1': '456caf10e9535160d90e874b45aa426de762f19f',
'found': False,
})
mock_hashutil.hashfile.assert_called_once_with('/some/path')
self.storage.content_find.assert_called_once_with({'sha1': bhash})
mock_hashutil.hash_to_hex.assert_called_once_with(bhash)
@patch('swh.web.ui.service.upload')
@istest
def test_upload_and_search(self, mock_upload):
mock_upload.save_in_upload_folder.return_value = (
'/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename')
service.hash_and_search = MagicMock(side_effect=lambda filepath:
{'sha1': 'blah',
'found': True})
mock_upload.cleanup.return_value = None
file = MagicMock(filename='some-filename')
# when
actual_res = service.upload_and_search(file)
# then
self.assertEqual(actual_res, {
'filename': 'some-filename',
'sha1': 'blah',
'found': True})
mock_upload.save_in_upload_folder.assert_called_with(file)
mock_upload.cleanup.assert_called_with('/tmp/dir')
service.hash_and_search.assert_called_once_with(
'/tmp/dir/path/some-filename')
@patch('swh.web.ui.service.backend')
@istest
def lookup_origin(self, mock_backend):
# given
mock_backend.origin_get = MagicMock(return_value={
'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
# when
actual_origin = service.lookup_origin('origin-id')
# then
self.assertEqual(actual_origin, {'id': 'origin-id',
'lister': 'uuid-lister',
'project': 'uuid-project',
'url': 'ftp://some/url/to/origin',
'type': 'ftp'})
mock_backend.origin_get.assert_called_with('origin-id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self,
mock_backend):
# given
mock_backend.release_get = MagicMock()
with self.assertRaises(BadInputExc) as cm:
# when
service.lookup_release('not-a-sha1')
self.assertIn('invalid checksum', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend):
# given
mock_backend.release_get = MagicMock()
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_release(
'13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5'
'1aea892abe')
self.assertIn('sha1_git supported', cm.exception.args[0])
mock_backend.release_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_release(self, mock_backend):
# given
mock_backend.release_get = MagicMock(return_value={
'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'),
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': b'v0.0.1',
'message': b'synthetic release',
'synthetic': True,
})
# when
actual_release = service.lookup_release(
'65a55bbdf3629f916219feb3dcc7393ded1bc8db')
# then
self.assertEqual(actual_release, {
'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db',
'target': None,
'date': datetime.datetime(2015, 1, 1, 22, 0, 0,
tzinfo=datetime.timezone.utc),
'name': 'v0.0.1',
'message': 'synthetic release',
'synthetic': True,
})
mock_backend.release_get.assert_called_with(
hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'))
@istest
def lookup_revision_with_context_ko_not_a_sha1_1(self):
# given
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \
'daf51aea892abe'
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@istest
def lookup_revision_with_context_ko_not_a_sha1_2(self):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \
'2d4daf51aea892abe'
# when
with self.assertRaises(BadInputExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Only sha1_git is supported', cm.exception.args[0])
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_called_once_with(
sha1_git_bin)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_with_context_ko_root_sha1_git_does_not_exist(
self,
mock_backend):
# given
sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db'
sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db'
sha1_git_root_bin = hex_to_hash(sha1_git_root)
sha1_git_bin = hex_to_hash(sha1_git)
mock_backend.revision_get.side_effect = ['foo', None]
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_revision_with_context(sha1_git_root, sha1_git)
self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db'
' not found', cm.exception.args[0])
mock_backend.revision_get.assert_has_calls([call(sha1_git_bin),
call(sha1_git_root_bin)])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_revision_with_context(self, mock_query, mock_backend):
# given
sha1_git_root = '666'
sha1_git = '883'
sha1_git_root_bin = b'666'
sha1_git_bin = b'883'
sha1_git_root_dict = {
'id': sha1_git_root_bin,
'parents': [b'999'],
}
sha1_git_dict = {
'id': sha1_git_bin,
'parents': [],
'directory': b'278',
}
stub_revisions = [
sha1_git_root_dict,
{
'id': b'999',
'parents': [b'777', b'883', b'888'],
},
{
'id': b'777',
'parents': [b'883'],
},
sha1_git_dict,
{
'id': b'888',
'parents': [b'889'],
},
{
'id': b'889',
'parents': [],
},
]
# inputs ok
- mock_query.parse_hash.side_effect = [
+ mock_query.parse_hash_with_algorithms_or_throws.side_effect = [
('sha1', sha1_git_bin),
('sha1', sha1_git_root_bin)
]
# lookup revision first 883, then 666 (both exists)
mock_backend.revision_get.side_effect = [
sha1_git_dict,
sha1_git_root_dict
]
mock_backend.revision_log = MagicMock(
return_value=stub_revisions)
# when
actual_revision = service.lookup_revision_with_context(
sha1_git_root,
sha1_git)
# then
self.assertEquals(actual_revision, {
'id': hash_to_hex(sha1_git_bin),
'parents': [],
'children': [hash_to_hex(b'999'), hash_to_hex(b'777')],
'directory': hash_to_hex(b'278'),
})
- mock_query.parse_hash.assert_has_calls([call(sha1_git),
- call(sha1_git_root)])
+ mock_query.parse_hash_with_algorithms_or_throws.assert_has_calls(
+ [call(sha1_git, ['sha1'], 'Only sha1_git is supported.'),
+ call(sha1_git_root, ['sha1'], 'Only sha1_git is supported.')])
mock_backend.revision_log.assert_called_with(
sha1_git_root_bin, 100)
@istest
def lookup_name_in(self):
file0 = {
"type": "file",
"sha1": b"8e295bec402303cf2bd21b68f1f1fb0692f9c00a",
"dir_id": b"93857db1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"Entries",
"perms": 100644,
"target": b"55b983eaed0e68f8402c4ef891f0fcbcc80ece74"
}
dir1 = {
"type": "dir",
"sha1": b"d4c7a6c81832350d05c3f76f5f193ee62a2e6a16",
"dir_id": b"93857db1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"doc",
"perms": 40000,
"target": b"5d71ad3d16c0aaf5e0c3f4a4241020d7962c0e43"
}
dir2 = {
"type": "dir",
"sha1": b"cvbfdrc81832350d05c3f76f5f193ee62a2e6a16",
"dir_id": b"789012b1982141c94d3ca05b16ef6bd41d9da2ef",
"name": b"generated",
"perms": 40000,
"target": b"1234563d16c0aaf5e0c3f4a4241020d7962c0e43"
}
dir_entries = [file0, dir1, dir2]
for e in [file0, dir1, dir2]:
actual_entity = service._lookup_name_in(dir_entries,
e['name'].decode('utf-8'))
self.assertEquals(actual_entity, e)
actual_entity = service._lookup_name_in(dir_entries, "nothing")
self.assertIsNone(actual_entity)
- @istest
- def lookup_directory_with_revision_bad_input(self):
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_directory_with_revision('123', 'some/path')
- self.assertIn('Only sha1_git is supported', cm.exception.args[0])
-
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_not_found(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
mock_backend.revision_get.return_value = None
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision('123')
self.assertIn('Revision 123 not found', cm.exception.args[0])
- mock_query.parse_hash.assert_called_once_with('123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_without_path(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_entries = [{
'id': b'123',
'type': 'dir'
}, {
'id': b'456',
'type': 'file'
}]
mock_backend.directory_get.return_value = stub_dir_entries
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(dir_id)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_dir(self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'dir',
'name': b'some/path',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
stub_dir_entries = [{
'id': b'12',
'type': 'dir'
}, {
'id': b'34',
'type': 'file'
}]
mock_backend.directory_get.side_effect = [
stub_dir_ls,
stub_dir_entries
]
# when
actual_directory_entries = service.lookup_directory_with_revision(
'123',
'some/path')
self.assertEqual(actual_directory_entries['type'], 'dir')
self.assertEqual(list(actual_directory_entries['content']),
stub_dir_entries)
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_has_calls([
call(b'dir-id-as-sha1', recursive=True),
call(b'456')
])
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_revision_with_path_to_file(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'file',
'name': b'some/path/to/file',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
stub_content = {
'status': 'visible',
}
mock_backend.content_find.return_value = stub_content
# when
actual_content = service.lookup_directory_with_revision(
'123',
'some/path/to/file')
# then
self.assertEqual(actual_content, {'type': 'file',
'content': stub_content})
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
mock_backend.content_find.assert_called_once_with('sha1_git', b'456')
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ko_revision_with_path_to_nowhere(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'file',
'name': b'some/path/to/dir',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
# when
with self.assertRaises(NotFoundExc) as cm:
service.lookup_directory_with_revision(
'123',
'path/to/something/unknown')
self.assertIn("Directory/File 'path/to/something/unknown' " +
"pointed to by revision 123 not found",
cm.exception.args[0])
- mock_query.parse_hash.assert_called_once_with('123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
@patch('swh.web.ui.service.backend')
@patch('swh.web.ui.service.query')
@istest
def lookup_directory_with_revision_ok_type_not_implemented(
self,
mock_query,
mock_backend):
# given
- mock_query.parse_hash.return_value = 'sha1', b'123'
+ mock_query.parse_hash_with_algorithms_or_throws.return_value = ('sha1',
+ b'123')
dir_id = b'dir-id-as-sha1'
mock_backend.revision_get.return_value = {
'directory': dir_id,
}
stub_dir_ls = [
{
'type': 'rev',
'name': b'some/path/to/rev',
'target': b'456'
},
{
'type': 'file',
'name': b'something-else.hs',
'target': b'789'
}
]
mock_backend.directory_get.return_value = stub_dir_ls
stub_content = {
'id': b'12',
'type': 'file'
}
mock_backend.content_get.return_value = stub_content
# when
with self.assertRaises(NotImplementedError) as cm:
service.lookup_directory_with_revision(
'123',
'some/path/to/rev')
self.assertIn("Entity of type 'rev' not implemented.",
cm.exception.args[0])
# then
- mock_query.parse_hash.assert_called_once_with(
- '123')
+ mock_query.parse_hash_with_algorithms_or_throws.assert_called_once_with
+ ('123', ['sha1'], 'Only sha1_git is supported.')
mock_backend.revision_get.assert_called_once_with(b'123')
mock_backend.directory_get.assert_called_once_with(
b'dir-id-as-sha1', recursive=True)
- @patch('swh.web.ui.service.query')
- @istest
- def lookup_revision_bad_input(self, mock_query):
- # given
- mock_query.parse_hash.return_value = ('sha1_git', 'do not care')
-
- # when
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_revision('123')
- self.assertIn('Only sha1_git is supported.', cm.exception.args[0])
-
- mock_query.parse_hash.assert_called_with('123')
-
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision(self, mock_backend):
# given
mock_backend.revision_get = MagicMock(return_value={
'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
# when
actual_revision = service.lookup_revision(
'18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_revision, {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
})
mock_backend.revision_get.assert_called_with(
hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
- @patch('swh.web.ui.service.query')
- @istest
- def lookup_revision_log_bad_input(self, mock_query):
- # given
- mock_query.parse_hash.return_value = ('sha1_git', 'do not care')
-
- # when
- with self.assertRaises(BadInputExc) as cm:
- service.lookup_revision_log('123')
- self.assertIn('Only sha1_git is supported.', cm.exception.args[0])
-
- mock_query.parse_hash.assert_called_with('123')
-
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_log(self, mock_backend):
# given
stub_revision_log = [{
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'bill & boule',
'email': b'bill@boule.org',
},
'committer': {
'name': b'boule & bill',
'email': b'boule@bill.org',
},
'message': b'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}]
mock_backend.revision_log = MagicMock(return_value=stub_revision_log)
# when
actual_revision = service.lookup_revision_log(
'abcdbe353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(list(actual_revision), [{
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'bill & boule',
'email': 'bill@boule.org',
},
'committer': {
'name': 'boule & bill',
'email': 'boule@bill.org',
},
'message': 'elegant fix for bug 31415957',
'date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
'synthetic': False,
'type': 'git',
'parents': [],
'metadata': [],
}])
mock_backend.revision_log.assert_called_with(
hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100)
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content_raw(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_raw(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
})
mock_backend.content_get = MagicMock(return_value={
'data': b'binary data'})
# when
actual_content = service.lookup_content_raw(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEquals(actual_content, {'data': b'binary data'})
mock_backend.content_find.assert_called_once_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
mock_backend.content_get.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_not_found(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value=None)
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertIsNone(actual_content)
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha1(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 190,
'status': 'hidden',
})
# when
actual_content = service.lookup_content(
'sha1:18d8be353ed3480476f032475e7c233eff7371d5')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 190,
'status': 'absent',
})
mock_backend.content_find.assert_called_with(
'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_content_with_sha256(self, mock_backend):
# given
mock_backend.content_find = MagicMock(return_value={
'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'length': 360,
'status': 'visible',
})
# when
actual_content = service.lookup_content(
'sha256:39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926')
# then
self.assertEqual(actual_content, {
'sha1': '18d8be353ed3480476f032475e7c233eff7371d5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274'
'7d3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'length': 360,
'status': 'visible',
})
mock_backend.content_find.assert_called_with(
'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_person(self, mock_backend):
# given
mock_backend.person_get = MagicMock(return_value={
'id': 'person_id',
'name': b'some_name',
'email': b'some-email',
})
# when
actual_person = service.lookup_person('person_id')
# then
self.assertEqual(actual_person, {
'id': 'person_id',
'name': 'some_name',
'email': 'some-email',
})
mock_backend.person_get.assert_called_with('person_id')
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory_bad_checksum(self, mock_backend):
# given
mock_backend.directory_get = MagicMock()
# when
with self.assertRaises(BadInputExc):
service.lookup_directory('directory_id')
# then
mock_backend.directory_get.called = False
@patch('swh.web.ui.service.backend')
@istest
def lookup_directory(self, mock_backend):
# given
stub_dir_entries = [{
'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0'
'2ebda5'),
'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e'
'e76c98930e7e0afa4d2747d3bf96c926'),
'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66'
'c5b00a6d03'),
'name': b'bob',
'type': 10,
}]
expected_dir_entries = [{
'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5',
'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747'
'd3bf96c926',
'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'bob',
'type': 10,
}]
mock_backend.directory_get = MagicMock(
return_value=stub_dir_entries)
# when
actual_directory = service.lookup_directory(
'40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
# then
self.assertEqual(list(actual_directory), expected_dir_entries)
mock_backend.directory_get.assert_called_with(
hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by_nothing_found(self, mock_backend):
# given
mock_backend.revision_get_by.return_value = None
# when
actual_revisions = service.lookup_revision_by(1)
# then
self.assertIsNone(actual_revisions)
mock_backend.revision_get_by(1, 'master', None)
@patch('swh.web.ui.service.backend')
@istest
def lookup_revision_by(self, mock_backend):
# given
stub_rev = {
'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'),
'directory': hex_to_hash(
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'),
'author': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'committer': {
'name': b'ynot',
'email': b'ynot@blah.org',
},
'message': b'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
expected_rev = {
'id': '28d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'committer': {
'name': 'ynot',
'email': 'ynot@blah.org',
},
'message': 'elegant solution 31415',
'date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'date_offset': 0,
'committer_date': datetime.datetime(2016, 1, 17, 11, 23, 54),
'committer_date_offset': 0,
}
mock_backend.revision_get_by.return_value = stub_rev
# when
actual_revision = service.lookup_revision_by(10, 'master2', 'some-ts')
# then
self.assertEquals(actual_revision, expected_rev)
mock_backend.revision_get_by(1, 'master2', 'some-ts')

File Metadata

Mime Type
text/x-diff
Expires
Sat, Jun 21, 7:23 PM (2 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3251862

Event Timeline