diff --git a/swh/web/ui/controller.py b/swh/web/ui/controller.py index b6396991..89a874e8 100644 --- a/swh/web/ui/controller.py +++ b/swh/web/ui/controller.py @@ -1,335 +1,337 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import render_template, request, flash, url_for from flask.ext.api.decorators import set_renderers from flask.ext.api.renderers import HTMLRenderer from swh.core.hashutil import ALGORITHMS from swh.web.ui.main import app from swh.web.ui import service, renderers, utils from swh.web.ui.exc import BadInputExc, NotFoundExc hash_filter_keys = ALGORITHMS @app.route('/') @set_renderers(HTMLRenderer) def main(): """Home page """ flash('This Web app is still work in progress, use at your own risk', 'warning') # return redirect(url_for('about')) return render_template('home.html') @app.route('/about') @set_renderers(HTMLRenderer) def about(): return render_template('about.html') @app.route('/search') @set_renderers(HTMLRenderer) def search(): """Search for hashes in swh-storage. """ q = request.args.get('q', '') env = {'q': q, 'message': '', 'found': None} try: if q: env.update(service.lookup_hash(q)) except BadInputExc: env['message'] = 'Error: invalid query string' return render_template('search.html', **env) @app.route('/uploadnsearch', methods=['GET', 'POST']) @set_renderers(HTMLRenderer) def uploadnsearch(): """Upload and search for hashes in swh-storage. """ env = {'filename': None, 'message': '', 'found': None} if request.method == 'POST': file = request.files['filename'] try: filename, sha1, found = service.upload_and_search(file) message = 'The file %s with hash %s has%sbeen found.' % ( filename, sha1, ' ' if found else ' not ') env.update({ 'filename': filename, 'sha1': sha1, 'found': found, 'message': message }) except BadInputExc: env['message'] = 'Error: invalid query string' return render_template('upload_and_search.html', **env) def _origin_seen(hash, data): """Given an origin, compute a message string with the right information. Args: origin: a dictionary with keys: - origin: a dictionary with type and url keys - occurrence: a dictionary with a validity range Returns: message as a string """ if data is None: return 'Content with hash %s is unknown as of now.' % hash origin_type = data['origin_type'] origin_url = data['origin_url'] revision = data['revision'] branch = data['branch'] path = data['path'] return """The content with hash %s has been seen on origin with type '%s' at url '%s'. The revision was identified at '%s' on branch '%s'. The file's path referenced was '%s'.""" % (hash, origin_type, origin_url, revision, branch, path) -@app.route('/browse/content/:') +@app.route('/browse/content/') @set_renderers(HTMLRenderer) -def content(hash, sha): +def content(q): """Show content information. Args: hash: hash according to HASH_ALGO, where HASH_ALGO is one of: sha1, sha1_git, sha256. This means that several different URLs (at least one per HASH_ALGO) will point to the same content sha: the sha with 'hash' format Returns: The content's information at sha1_git """ - env = {'hash': hash, 'sha': sha} + env = {'q': q} + + try: + content = service.lookup_hash(q) + found = content['found'] - if hash not in hash_filter_keys: - message = 'The checksum must be one of sha1, sha1_git, sha256' - else: - q = "%s:%s" % (hash, sha) - found = service.lookup_hash(q)['found'] if not found: - message = "Hash %s was not found." % sha + message = "Hash %s was not found." % content['algo'] else: origin = service.lookup_hash_origin(q) message = _origin_seen(hash, origin) + except BadInputExc as e: # do not like it but do not duplicate code + message = e + env['message'] = message return render_template('content.html', **env) @app.route('/api') @app.route('/api/') def api_main_points(): """List the current api endpoints starting with /api/. """ return utils.filter_endpoints(app.url_map, '/api', blacklist=['/api/']) @app.route('/api/1') @app.route('/api/1/') def api_main_points_v1(): """List the current api v1 endpoints starting with /api/. """ return utils.filter_endpoints(app.url_map, '/api/1', blacklist=['/api/1/']) @app.route('/api/1/stat/counters') def api_stats(): """Return statistics as a JSON object""" return service.stat_counters() @app.errorhandler(ValueError) def value_error_as_bad_request(error): """Compute a bad request and add body as payload. """ return renderers.error_response('Bad request', 400, error) @app.errorhandler(NotFoundExc) def value_not_found(error): """Compute a not found and add body as payload. """ return renderers.error_response('Not found', 404, error) @app.route('/api/1/search/') def api_search(q): """Return search results as a JSON object""" return {'found': service.lookup_hash(q)} def _api_lookup(criteria, lookup_fn, error_msg_if_not_found): """Factorize function regarding the api to lookup for data.""" res = lookup_fn(criteria) if not res: raise NotFoundExc(error_msg_if_not_found) return res @app.route('/api/1/origin/') def api_origin(origin_id): """Return information about origin.""" return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person/') def api_person(person_id): """Return information about person.""" return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) @app.route('/api/1/release/') def api_release(sha1_git): """Return information about release with id sha1_git.""" error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg) @app.route('/api/1/revision/') def api_revision(sha1_git): """Return information about revision with id sha1_git. """ error_msg = 'Revision with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_revision, error_msg_if_not_found=error_msg) @app.route('/api/1/directory/') def api_directory(sha1_git): """Return information about release with id sha1_git.""" directory_entries = service.lookup_directory(sha1_git) if not directory_entries: raise NotFoundExc('Directory with sha1_git %s not found.' % sha1_git) return list(directory_entries) @app.route('/api/1/browse/') def api_content_checksum_to_origin(q): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256) Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if the content is not found. """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw') @set_renderers(renderers.PlainRenderer) def api_content_raw(q): """Return content information on the content with provided hash. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). If no algo_hash is provided, will work with default sha1 algorithm Actual limitation: Only works with current sha1 Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return content['data'] @app.route('/api/1/content/') def api_content_with_details(q): """Return content information on the content with provided hash. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). If no algo_hash is provided, will work with default sha1 algorithm Actual limitation: Only works with current sha1 Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ content = service.lookup_content(q) if not content: raise NotFoundExc('Content with %s not found.' % q) content['data'] = url_for('api_content_raw', q=content['sha1']) return content @app.route('/api/1/uploadnsearch', methods=['POST']) def api_uploadnsearch(): """Upload the file's content in the post body request. Compute the hash and determine if it exists in the storage. """ file = request.files['filename'] filename, sha1, found = service.upload_and_search(file) return {'sha1': sha1, 'filename': filename, 'found': found} diff --git a/swh/web/ui/query.py b/swh/web/ui/query.py index 326ef368..08d41117 100644 --- a/swh/web/ui/query.py +++ b/swh/web/ui/query.py @@ -1,59 +1,59 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from swh.core.hashutil import ALGORITHMS, hex_to_hash from swh.web.ui.exc import BadInputExc SHA256_RE = re.compile(r'^[0-9a-f]{64}$', re.IGNORECASE) SHA1_RE = re.compile(r'^[0-9a-f]{40}$', re.IGNORECASE) def parse_hash(q): """Detect the hash type of a user submitted query string. Args: query string with the following format: "[HASH_TYPE:]HEX_CHECKSUM", where HASH_TYPE is optional, defaults to "sha1", and can be one of swh.core.hashutil.ALGORITHMS Returns: A pair (hash_algorithm, byte hash value) Raises: ValueError if the given query string does not correspond to a valid hash value """ def guess_algo(q): if SHA1_RE.match(q): return 'sha1' elif SHA256_RE.match(q): return 'sha256' else: - raise BadInputExc('invalid checksum query string') + raise BadInputExc('Invalid checksum query string %s' % q) def check_algo(algo, hex): if (algo in set(['sha1', 'sha1_git']) and not SHA1_RE.match(hex)) \ or (algo == 'sha256' and not SHA256_RE.match(hex)): - raise BadInputExc('invalid hash for algorithm ' + algo) + raise BadInputExc('Invalid hash %s for algorithm %s' % (hex, algo)) parts = q.split(':') if len(parts) > 2: - raise BadInputExc('invalid checksum query string') + raise BadInputExc('Invalid checksum query string %s' % q) elif len(parts) == 1: parts = (guess_algo(q), q) elif len(parts) == 2: check_algo(parts[0], parts[1]) algo = parts[0] hash = hex_to_hash(parts[1]) if algo not in ALGORITHMS: - raise BadInputExc('unknown hash algorithm: ' + algo) + raise BadInputExc('Unknown hash algorithm %s' % algo) return (algo, hash) diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index 178a1a4c..8bcab9c7 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,218 +1,219 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core import hashutil from swh.web.ui import converters, main, query, upload from swh.web.ui.exc import BadInputExc def hash_and_search(filepath): """Hash the filepath's content as sha1, then search in storage if it exists. Args: Filepath of the file to hash and search. Returns: Tuple (hex sha1, found as True or false). The found boolean, according to whether the sha1 of the file is present or not. """ hash = hashutil.hashfile(filepath) return (hashutil.hash_to_hex(hash['sha1']), main.storage().content_exist(hash)) def upload_and_search(file): """Upload a file and compute its hash. """ tmpdir, filename, filepath = upload.save_in_upload_folder(file) try: sha1, found = None, None if filepath: sha1, found = hash_and_search(filepath) return filename, sha1, found finally: # clean up if tmpdir: upload.cleanup(tmpdir) def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ (algo, hash) = query.parse_hash(q) found = main.storage().content_find({algo: hash}) - return {'found': found} + return {'found': found, + 'algo': algo} def lookup_hash_origin(q): """Return information about the checksum contained in the query q. Args: query string of the form Returns: origin as dictionary if found for the given content. """ algo, h = query.parse_hash(q) origin = main.storage().content_find_occurrence({algo: h}) return converters.from_origin(origin) def lookup_origin(origin_id): """Return information about the origin with id origin_id. Args: origin_id as string Returns: origin information as dict. """ return main.storage().origin_get({'id': origin_id}) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ persons = main.storage().person_get([person_id]) if not persons: return None return converters.from_person(persons[0]) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ algo, hBinSha1 = query.parse_hash(sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') directory_entries = main.storage().directory_get(hBinSha1) if not directory_entries: return None res = [] for directory_entry in directory_entries: res.append(converters.from_directory_entry(directory_entry)) return res def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, hBinSha1 = query.parse_hash(release_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = main.storage().release_get([hBinSha1]) if res and len(res) >= 1: return converters.from_release(res[0]) return None def lookup_revision(rev_sha1_git, rev_type='git'): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, hBinSha1 = query.parse_hash(rev_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = main.storage().revision_get([hBinSha1]) if res and len(res) >= 1: return converters.from_revision(res[0]) return None def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ (algo, hash) = query.parse_hash(q) c = main.storage().content_find({algo: hash}) if c: return converters.from_content(c) return None def lookup_content_raw(q): """Lookup the content designed by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ (algo, hash) = query.parse_hash(q) c = main.storage().content_find({algo: hash}) if not c: return None sha1 = c['sha1'] contents = main.storage().content_get([sha1]) if contents and len(contents) >= 1: return converters.from_content(contents[0]) return None def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return main.storage().stat_counters() diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index 51df389a..a6ee3700 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,507 +1,511 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from unittest.mock import MagicMock, patch from swh.core.hashutil import hex_to_hash from swh.web.ui import service from swh.web.ui.exc import BadInputExc from swh.web.ui.tests import test_app class ServiceTestCase(unittest.TestCase): @classmethod def setUpClass(cls): _, _, cls.storage = test_app.init_app() @istest def lookup_hash_does_not_exist(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( - 'sha1:123caf10e9535160d90e874b45aa426de762f19f') + 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then - self.assertEquals({'found': None}, actual_lookup) + self.assertEquals({'found': None, + 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters self.storage.content_find.assert_called_with({ - 'sha1': - hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')}) + 'sha1_git': + hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f') + }) @istest def lookup_hash_exist(self): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } self.storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then - self.assertEquals({'found': stub_content}, actual_lookup) + self.assertEquals({'found': stub_content, + 'algo': 'sha1'}, actual_lookup) self.storage.content_find.assert_called_with({ 'sha1': - hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) + hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), + }) @istest def lookup_hash_origin(self): # given self.storage.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) self.storage.content_find_occurrence.assert_called_with( {'sha1_git': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest def hash_and_search(self): # given self.storage.content_exist = MagicMock(return_value=False) bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') # when with patch( 'swh.core.hashutil.hashfile', return_value={'sha1': bhash}): actual_hash, actual_search = service.hash_and_search('/some/path') # then self.assertEqual(actual_hash, '456caf10e9535160d90e874b45aa426de762f19f') self.assertFalse(actual_search) self.storage.content_exist.assert_called_with({'sha1': bhash}) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search_upload_OK_basic_case(self, mock_upload): # given (cf. decorators patch) mock_upload.save_in_upload_folder.return_value = ( '/tmp/blah', 'some-filename', None) mock_upload.cleanup.return_value = None file = MagicMock() file.filename = 'some-filename' # when actual_file, actual_hash, actual_search = service.upload_and_search( file) # then self.assertEqual(actual_file, 'some-filename') self.assertIsNone(actual_hash) self.assertIsNone(actual_search) mock_upload.save_in_upload_folder.assert_called_with(file) mock_upload.cleanup.assert_called_with('/tmp/blah') @istest def lookup_origin(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def lookup_release(self): # given self.storage.release_get = MagicMock(return_value=[{ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'revision': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'date_offset': None, 'name': 'v0.0.1', 'comment': b'synthetic release', 'synthetic': True, }]) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'revision': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'date_offset': None, 'name': 'v0.0.1', 'comment': 'synthetic release', 'synthetic': True, }) self.storage.release_get.assert_called_with( [hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self): # given self.storage.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self): # given self.storage.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_revision(self): # given self.storage.revision_get = MagicMock(return_value=[{ 'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author_name': b'bill & boule', 'author_email': b'bill@boule.org', 'committer_name': b'boule & bill', 'committer_email': b'boule@bill.org', 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }]) # when actual_revision = service.lookup_revision( '18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_revision, { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'bill & boule', 'author_email': 'bill@boule.org', 'committer_name': 'boule & bill', 'committer_email': 'boule@bill.org', 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) self.storage.revision_get.assert_called_with( [hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')]) @istest def lookup_content_raw_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_raw(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', }) self.storage.content_get = MagicMock(return_value=[{ 'data': b'binary data', }, {}]) # when actual_content = service.lookup_content_raw( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEquals(actual_content, {'data': 'binary data'}) self.storage.content_find.assert_called_once_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) self.storage.content_get.assert_called_once_with( ['18d8be353ed3480476f032475e7c233eff7371d5']) @istest def lookup_content_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha1(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 190, 'status': 'absent', }) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 190, }) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha256(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 360, 'status': 'visible', }) # when actual_content = service.lookup_content( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 360, }) self.storage.content_find.assert_called_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) @istest def lookup_person(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }]) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_person_not_found(self): # given self.storage.person_get = MagicMock(return_value=[]) # when actual_person = service.lookup_person('person_id') # then self.assertIsNone(actual_person) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_directory_bad_checksum(self): # given self.storage.directory_get = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then self.storage.directory_get.called = False @istest def lookup_directory_not_found(self): # given self.storage.directory_get = MagicMock(return_value=[]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNone(actual_directory) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')) @istest def lookup_directory(self): # given dir_entries_input = { 'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, } self.storage.directory_get = MagicMock( return_value=[dir_entries_input]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNotNone(actual_directory) self.assertEqual(actual_directory, [expected_dir_entries]) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))