diff --git a/swh/web/ui/api.py b/swh/web/ui/api.py index 1cf75e8c6..83425361a 100644 --- a/swh/web/ui/api.py +++ b/swh/web/ui/api.py @@ -1,474 +1,474 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from flask import request, url_for, Response, redirect from swh.web.ui import service from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.main import app @app.route('/api/1/stat/counters/') def api_stats(): """Return statistics on SWH storage. Returns: SWH storage's statistics. """ return service.stat_counters() @app.route('/api/1/search/') @app.route('/api/1/search//') def api_search(q='sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5'): """Search a content per hash. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Dictionary with 'found' key and the associated result. Raises: BadInputExc in case of unknown algo_hash or bad hash. Example: GET /api/1/search/sha1:bd819b5b28fcde3bf114d16a44ac46250da94ee5/ """ r = service.lookup_hash(q).get('found') return {'found': True if r else False} def _api_lookup(criteria, lookup_fn, error_msg_if_not_found, enrich_fn=lambda x: x): """Factorize function regarding the api to lookup for data.""" res = lookup_fn(criteria) if not res: raise NotFoundExc(error_msg_if_not_found) if isinstance(res, (map, list)): enriched_data = [] for e in res: enriched_data.append(enrich_fn(e)) return enriched_data return enrich_fn(res) @app.route('/api/1/origin/') @app.route('/api/1/origin//') def api_origin(origin_id=1): """Return information about origin with id origin_id. Args: origin_id: the origin's identifier. Returns: Information on the origin if found. Raises: NotFoundExc if the origin is not found. Example: GET /api/1/origin/1/ """ return _api_lookup( origin_id, lookup_fn=service.lookup_origin, error_msg_if_not_found='Origin with id %s not found.' % origin_id) @app.route('/api/1/person/') @app.route('/api/1/person//') def api_person(person_id=1): """Return information about person with identifier person_id. Args: person_id: the person's identifier. Returns: Information on the person if found. Raises: NotFoundExc if the person is not found. Example: GET /api/1/person/1/ """ return _api_lookup( person_id, lookup_fn=service.lookup_person, error_msg_if_not_found='Person with id %s not found.' % person_id) def enrich_release(release): """Enrich a release with link to the 'target' of 'type' revision. """ if 'target' in release and \ 'target_type' in release and \ release['target_type'] == 'revision': release['target_url'] = url_for('api_revision', sha1_git=release['target']) return release @app.route('/api/1/release/') @app.route('/api/1/release//') def api_release(sha1_git='3c31de6fdc47031857fda10cfa4caf7044cadefb'): """Return information about release with id sha1_git. Args: sha1_git: the release's hash. Returns: Information on the release if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the release is not found. Example: GET /api/1/release/b307094f00c3641b0c9da808d894f3a325371414 """ error_msg = 'Release with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_release, error_msg_if_not_found=error_msg, enrich_fn=enrich_release) def enrich_revision_with_urls(revision, context=None): """Enrich revision with links where it makes sense (directory, parents). """ if not context: context = revision['id'] revision['url'] = url_for('api_revision', sha1_git=revision['id']) revision['history_url'] = url_for('api_revision_log', sha1_git=revision['id']) if 'directory' in revision: revision['directory_url'] = url_for('api_directory', sha1_git=revision['directory']) if 'parents' in revision: parents = [] for parent in revision['parents']: parents.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=parent)) revision['parent_urls'] = parents if 'children' in revision: children = [] for child in revision['children']: children.append(url_for('api_revision_history', sha1_git_root=context, sha1_git=child)) revision['children_urls'] = children return revision @app.route('/api/1/revision/') @app.route('/api/1/revision//') def api_revision(sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf'): """Return information about revision with id sha1_git. Args: sha1_git: the revision's hash. Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e """ return _api_lookup( sha1_git, lookup_fn=service.lookup_revision, error_msg_if_not_found='Revision with sha1_git %s not' ' found.' % sha1_git, enrich_fn=enrich_revision_with_urls) @app.route('/api/1/revision//directory/') @app.route('/api/1/revision//directory/') -def api_revision_with_directory( +def api_directory_with_revision( sha1_git='a585d2b738bfa26326b3f1f40f0f1eda0c067ccf', dir_path=None): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist Example: GET /api/1/revision/baf18f9fc50a0b6fef50460a76c33b2ddc57486e/directory/ """ - def lookup_revision_directory(sha1_git, dir_path=dir_path): - return service.lookup_revision_with_directory(sha1_git, dir_path) + def lookup_directory_with_revision(sha1_git, dir_path=dir_path): + return service.lookup_directory_with_revision(sha1_git, dir_path) return _api_lookup( sha1_git, - lookup_fn=lookup_revision_directory, + lookup_fn=lookup_directory_with_revision, error_msg_if_not_found='Revision with sha1_git %s not' ' found.' % sha1_git, enrich_fn=enrich_directory) @app.route('/api/1/revision//history//') def api_revision_history(sha1_git_root, sha1_git): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history. sha1_git: one of sha1_git_root's ancestors. limit: optional query parameter to limit the revisions log (default to 100). For now, note that this limit could impede the transitivity conclusion about sha1_git not being an ancestor of sha1_git_root (even if it is). Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root. """ limit = int(request.args.get('limit', '100')) if sha1_git == sha1_git_root: return redirect(url_for('api_revision', sha1_git=sha1_git)) revision = service.lookup_revision_with_context(sha1_git_root, sha1_git, limit) if not revision: raise NotFoundExc( "Possibly sha1_git '%s' is not an ancestor of sha1_git_root '%s'" % (sha1_git, sha1_git_root)) return enrich_revision_with_urls(revision, context=sha1_git_root) @app.route('/api/1/revision//log/') def api_revision_log(sha1_git): """Show all revisions (~git log) starting from sha1_git. The first element returned is the given sha1_git. Args: sha1_git: the revision's hash. limit: optional query parameter to limit the revisions log (default to 100). Returns: Information on the revision if found. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the revision is not found. """ limit = int(request.args.get('limit', '100')) def lookup_revision_log_with_limit(s, limit=limit): return service.lookup_revision_log(s, limit) error_msg = 'Revision with sha1_git %s not found.' % sha1_git return _api_lookup(sha1_git, lookup_fn=lookup_revision_log_with_limit, error_msg_if_not_found=error_msg, enrich_fn=enrich_revision_with_urls) def enrich_directory(directory): """Enrich directory with url to content or directory. """ if 'type' in directory: target_type = directory['type'] target = directory['target'] if target_type == 'file': directory['target_url'] = url_for('api_content_with_details', q='sha1_git:%s' % target) else: directory['target_url'] = url_for('api_directory', sha1_git=target) return directory @app.route('/api/1/directory/') @app.route('/api/1/directory//') def api_directory(sha1_git='dcf3289b576b1c8697f2a2d46909d36104208ba3'): """Return information about release with id sha1_git. Args: sha1_git: Directory's sha1_git. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/directory/8d7dc91d18546a91564606c3e3695a5ab568d179 """ error_msg = 'Directory with sha1_git %s not found.' % sha1_git return _api_lookup( sha1_git, lookup_fn=service.lookup_directory, error_msg_if_not_found=error_msg, enrich_fn=enrich_directory) @app.route('/api/1/browse/') @app.route('/api/1/browse//') def api_content_checksum_to_origin(q='sha1_git:26ac0281bc74e9bd8a4a4aab1c7c7a' '0c19d4436c'): """Return content information up to one of its origin if the content is found. Args: q is of the form algo_hash:hash with algo_hash in (sha1, sha1_git, sha256). Returns: Information on one possible origin for such content. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc if the content is not found. Example: GET /api/1/browse/sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242 """ found = service.lookup_hash(q)['found'] if not found: raise NotFoundExc('Content with %s not found.' % q) return service.lookup_hash_origin(q) @app.route('/api/1/content//raw/') def api_content_raw(q): """Return content's raw data if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's raw data in application/octet-stream. Raises: - BadInputExc in case of unknown algo_hash or bad hash - NotFoundExc if the content is not found. """ def generate(content): yield content['data'] content = service.lookup_content_raw(q) if not content: raise NotFoundExc('Content with %s not found.' % q) return Response(generate(content), mimetype='application/octet-stream') def enrich_content(content): """Enrich content with 'data', a link to its raw content. """ content['data_url'] = url_for('api_content_raw', q=content['sha1']) return content @app.route('/api/1/content/') @app.route('/api/1/content//') def api_content_with_details(q='sha256:e2c76e40866bb6b28916387bdfc8649beceb' '523015738ec6d4d540c7fe65232b'): """Return content information if content is found. Args: q is of the form (algo_hash:)hash with algo_hash in (sha1, sha1_git, sha256). When algo_hash is not provided, 'hash' is considered sha1. Returns: Content's information. Raises: - BadInputExc in case of unknown algo_hash or bad hash. - NotFoundExc if the content is not found. Example: GET /api/1/content/sha256:e2c76e40866bb6b28916387bdfc8649beceb 523015738ec6d4d540c7fe65232b """ return _api_lookup( q, lookup_fn=service.lookup_content, error_msg_if_not_found='Content with %s not found.' % q, enrich_fn=enrich_content) @app.route('/api/1/uploadnsearch/', methods=['POST']) def api_uploadnsearch(): """Upload the file's content in the post body request. Compute its hash and determine if it exists in the storage. Args: request.files filled with the filename's data to upload. Returns: Dictionary with 'sha1', 'filename' and 'found' predicate depending on whether we find it or not. Raises: BadInputExc in case of the form submitted is incorrect. """ file = request.files.get('filename') if not file: raise BadInputExc('Bad request, missing \'filename\' entry in form.') return service.upload_and_search(file) diff --git a/swh/web/ui/service.py b/swh/web/ui/service.py index 206e708f4..c6dbf42d0 100644 --- a/swh/web/ui/service.py +++ b/swh/web/ui/service.py @@ -1,334 +1,334 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from collections import defaultdict from swh.core import hashutil from swh.web.ui import converters, query, upload, backend from swh.web.ui.exc import BadInputExc, NotFoundExc def hash_and_search(filepath): """Hash the filepath's content as sha1, then search in storage if it exists. Args: Filepath of the file to hash and search. Returns: Tuple (hex sha1, found as True or false). The found boolean, according to whether the sha1 of the file is present or not. """ h = hashutil.hashfile(filepath) c = backend.content_find('sha1', h['sha1']) if c: r = converters.from_content(c) r['found'] = True return r else: return {'sha1': hashutil.hash_to_hex(h['sha1']), 'found': False} def upload_and_search(file): """Upload a file and compute its hash. """ tmpdir, filename, filepath = upload.save_in_upload_folder(file) res = {'filename': filename} try: content = hash_and_search(filepath) res.update(content) return res finally: # clean up if tmpdir: upload.cleanup(tmpdir) def lookup_hash(q): """Checks if the storage contains a given content checksum Args: query string of the form Returns: Dict with key found to True or False, according to whether the checksum is present or not """ (algo, hash) = query.parse_hash(q) found = backend.content_find(algo, hash) return {'found': found, 'algo': algo} def lookup_hash_origin(q): """Return information about the checksum contained in the query q. Args: query string of the form Returns: origin as dictionary if found for the given content. """ algo, h = query.parse_hash(q) origin = backend.content_find_occurrence(algo, h) return converters.from_origin(origin) def lookup_origin(origin_id): """Return information about the origin with id origin_id. Args: origin_id as string Returns: origin information as dict. """ return backend.origin_get(origin_id) def lookup_person(person_id): """Return information about the person with id person_id. Args: person_id as string Returns: person information as dict. """ person = backend.person_get(person_id) return converters.from_person(person) def lookup_directory(sha1_git): """Return information about the directory with id sha1_git. Args: sha1_git as string Returns: directory information as dict. """ algo, sha1_git_bin = query.parse_hash(sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') directory_entries = backend.directory_get(sha1_git_bin) return map(converters.from_directory_entry, directory_entries) def lookup_release(release_sha1_git): """Return information about the release with sha1 release_sha1_git. Args: release_sha1_git: The release's sha1 as hexadecimal Returns: Release information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, sha1_git_bin = query.parse_hash(release_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = backend.release_get(sha1_git_bin) return converters.from_release(res) def lookup_revision(rev_sha1_git): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, sha1_git_bin = query.parse_hash(rev_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') res = backend.revision_get(sha1_git_bin) return converters.from_revision(res) def lookup_revision_log(rev_sha1_git, limit=100): """Return information about the revision with sha1 revision_sha1_git. Args: revision_sha1_git: The revision's sha1 as hexadecimal limit: the maximum number of revisions returned Returns: Revision information as dict. Raises: ValueError if the identifier provided is not of sha1 nature. """ algo, bin_sha1 = query.parse_hash(rev_sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') revision_entries = backend.revision_log(bin_sha1, limit) return map(converters.from_revision, revision_entries) def lookup_revision_with_context(sha1_git_root, sha1_git, limit=100): """Return information about revision sha1_git, limited to the sub-graph of all transitive parents of sha1_git_root. In other words, sha1_git is an ancestor of sha1_git_root. Args: sha1_git_root: latest revision of the browsed history sha1_git: one of sha1_git_root's ancestors limit: limit the lookup to 100 revisions back Returns: Information on sha1_git if it is an ancestor of sha1_git_root including children leading to sha1_git_root Raises: BadInputExc in case of unknown algo_hash or bad hash NotFoundExc if either revision is not found or if sha1_git is not an ancestor of sha1_git_root """ algo, sha1_git_bin = query.parse_hash(sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') algo, sha1_git_root_bin = query.parse_hash(sha1_git_root) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) revision_root = backend.revision_get(sha1_git_root_bin) if not revision_root: raise NotFoundExc('Revision %s not found' % sha1_git_root) revision_log = backend.revision_log(sha1_git_root_bin, limit) parents = {} children = defaultdict(list) for rev in revision_log: rev_id = rev['id'] parents[rev_id] = [] for parent_id in rev['parents']: parents[rev_id].append(parent_id) children[parent_id].append(rev_id) if revision['id'] not in parents: raise NotFoundExc('Revision %s is not an ancestor of %s' % (sha1_git, sha1_git_root)) revision['children'] = children[revision['id']] return converters.from_revision(revision) -def lookup_revision_with_directory(sha1_git, dir_path=None): +def lookup_directory_with_revision(sha1_git, dir_path=None): """Return information on directory pointed by revision with sha1_git. If dir_path is not provided, display top level directory. Otherwise, display the directory pointed by dir_path (if it exists). Args: sha1_git: revision's hash. dir_path: optional directory pointed to by that revision. Returns: Information on the directory pointed to by that revision. Raises: BadInputExc in case of unknown algo_hash or bad hash. NotFoundExc either if the revision is not found or the path referenced does not exist """ algo, sha1_git_bin = query.parse_hash(sha1_git) if algo != 'sha1': # HACK: sha1_git really but they are both sha1... raise BadInputExc('Only sha1_git is supported.') revision = backend.revision_get(sha1_git_bin) if not revision: raise NotFoundExc('Revision %s not found' % sha1_git) dir_sha1_git_bin = revision['directory'] if dir_path: directory_entries = backend.directory_get(dir_sha1_git_bin, recursive=True) dir_id = None for entry_dir in directory_entries: name = entry_dir['name'].decode('utf-8') type = entry_dir['type'] if name == dir_path and type == 'dir': dir_id = entry_dir['target'] if not dir_id: raise NotFoundExc( "Directory '%s' pointed to by revision %s not found" % (dir_path, sha1_git)) else: dir_id = dir_sha1_git_bin directory_entries = backend.directory_get(dir_id) return map(converters.from_directory_entry, directory_entries) def lookup_content(q): """Lookup the content designed by q. Args: q: The release's sha1 as hexadecimal """ (algo, hash) = query.parse_hash(q) c = backend.content_find(algo, hash) return converters.from_content(c) def lookup_content_raw(q): """Lookup the content designed by q. Args: q: query string of the form Returns: dict with 'sha1' and 'data' keys. data representing its raw data decoded. """ (algo, hash) = query.parse_hash(q) c = backend.content_find(algo, hash) if not c: return None content = backend.content_get(c['sha1']) return converters.from_content(content) def stat_counters(): """Return the stat counters for Software Heritage Returns: A dict mapping textual labels to integer values. """ return backend.stat_counters() diff --git a/swh/web/ui/tests/test_api.py b/swh/web/ui/tests/test_api.py index 286431950..864d9c1dc 100644 --- a/swh/web/ui/tests/test_api.py +++ b/swh/web/ui/tests/test_api.py @@ -1,906 +1,906 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import json import unittest import yaml from nose.tools import istest from unittest.mock import patch, MagicMock from swh.web.ui.tests import test_app from swh.web.ui import api, exc class ApiTestCase(test_app.SWHApiTestCase): @patch('swh.web.ui.api.service') @istest def api_content_checksum_to_origin(self, mock_service): mock_service.lookup_hash.return_value = {'found': True} stub_origin = { "lister": None, "url": "rsync://ftp.gnu.org/old-gnu/webbase", "type": "ftp", "id": 2, "project": None } mock_service.lookup_hash_origin.return_value = stub_origin # when rv = self.app.get( '/api/1/browse/sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_hash.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') mock_service.lookup_hash_origin.assert_called_once_with( 'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_checksum_to_origin_sha_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {'found': False} # when rv = self.app.get( '/api/1/browse/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_hash.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_with_details(self, mock_service): # given mock_service.lookup_content.return_value = { 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560' 'cde9b067a4f', 'length': 17, 'status': 'visible' } # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'data_url': '/api/1/content/' '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/', 'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882', 'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c' 'de9b067a4f', 'length': 17, 'status': 'visible' }) mock_service.lookup_content.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_json(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_not_found_as_yaml(self, mock_service): # given mock_service.lookup_content.return_value = None mock_service.lookup_hash_origin = MagicMock() # when rv = self.app.get( '/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c/', headers={'accept': 'application/yaml'}) self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79' '68b3be4735637006560c not found.' }) mock_service.lookup_content.assert_called_once_with( 'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3' 'be4735637006560c') mock_service.lookup_hash_origin.called = False @patch('swh.web.ui.api.service') @istest def api_content_raw(self, mock_service): # given stub_content = {'data': b'some content data'} mock_service.lookup_content_raw.return_value = stub_content # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/', headers={'Content-type': 'application/octet-stream', 'Content-disposition': 'attachment'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/octet-stream') self.assertEquals(rv.data, stub_content['data']) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_content_raw_not_found(self, mock_service): # given mock_service.lookup_content_raw.return_value = None # when rv = self.app.get( '/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03' '/raw/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6' '6c5b00a6d03 not found.' }) mock_service.lookup_content_raw.assert_called_once_with( 'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') @patch('swh.web.ui.api.service') @istest def api_search(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': { 'sha1': 'or something' } } # when rv = self.app.get('/api/1/search/sha1:blah/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:blah') @patch('swh.web.ui.api.service') @istest def api_search_as_yaml(self, mock_service): # given mock_service.lookup_hash.return_value = { 'found': { 'sha1': 'sha1 hash' } } # when rv = self.app.get('/api/1/search/sha1:halb/', headers={'Accept': 'application/yaml'}) self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/yaml') response_data = yaml.load(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': True}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_search_not_found(self, mock_service): # given mock_service.lookup_hash.return_value = {} # when rv = self.app.get('/api/1/search/sha1:halb/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'found': False}) mock_service.lookup_hash.assert_called_once_with('sha1:halb') @patch('swh.web.ui.api.service') @istest def api_1_stat_counters_raise_error(self, mock_service): # given mock_service.stat_counters.side_effect = ValueError( 'voluntary error to check the bad request middleware.') # when rv = self.app.get('/api/1/stat/counters/') # then self.assertEquals(rv.status_code, 400) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'voluntary error to check the bad request middleware.'}) @patch('swh.web.ui.api.service') @istest def api_1_stat_counters(self, mock_service): # given stub_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_service.stat_counters.return_value = stub_stats # when rv = self.app.get('/api/1/stat/counters/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_stats) mock_service.stat_counters.assert_called_once_with() @patch('swh.web.ui.api.service') @patch('swh.web.ui.api.request') @istest def api_uploadnsearch(self, mock_request, mock_service): # given mock_request.files = {'filename': 'simple-filename'} mock_service.upload_and_search.return_value = { 'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False, } # when rv = self.app.post('/api/1/uploadnsearch/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, {'filename': 'simple-filename', 'sha1': 'some-hex-sha1', 'found': False}) mock_service.upload_and_search.assert_called_once_with( 'simple-filename') @patch('swh.web.ui.api.service') @istest def api_origin(self, mock_service): # given stub_origin = { 'id': 1234, 'lister': 'uuid-lister-0', 'project': 'uuid-project-0', 'url': 'ftp://some/url/to/origin/0', 'type': 'ftp' } mock_service.lookup_origin.return_value = stub_origin # when rv = self.app.get('/api/1/origin/1234/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_origin) mock_service.lookup_origin.assert_called_with(1234) @patch('swh.web.ui.api.service') @istest def api_origin_not_found(self, mock_service): # given mock_service.lookup_origin.return_value = None # when rv = self.app.get('/api/1/origin/4321/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Origin with id 4321 not found.' }) mock_service.lookup_origin.assert_called_with(4321) @patch('swh.web.ui.api.service') @istest def api_release(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'revision', 'target': 'revision-sha1', 'target_url': '/api/1/revision/revision-sha1/', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.api.service') @istest def api_release_target_type_not_a_revision(self, mock_service): # given stub_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } expected_release = { 'id': 'release-0', 'target_type': 'other-stuff', 'target': 'other-stuff-checksum', "date": "Mon, 10 Mar 1997 08:00:00 GMT", "synthetic": True, 'author': { 'name': 'author release name', 'email': 'author@email', }, } mock_service.lookup_release.return_value = stub_release # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_release) mock_service.lookup_release.assert_called_once_with('release-0') @patch('swh.web.ui.api.service') @istest def api_release_not_found(self, mock_service): # given mock_service.lookup_release.return_value = None # when rv = self.app.get('/api/1/release/release-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Release with sha1_git release-0 not found.' }) @patch('swh.web.ui.api.service') @istest def api_revision(self, mock_service): # given stub_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } mock_service.lookup_revision.return_value = stub_revision expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e' 'ff7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6' 'a42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '8734ef7e7c357ce2af928115c6c6a42b7e2a44e7' ], 'parent_urls': [ '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5' '/history/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7/' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when rv = self.app.get('/api/1/revision/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revision) mock_service.lookup_revision.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.api.service') @istest def api_revision_not_found(self, mock_service): # given mock_service.lookup_revision.return_value = None # when rv = self.app.get('/api/1/revision/revision-0/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git revision-0 not found.'}) @patch('swh.web.ui.api.service') @istest def api_revision_log(self, mock_service): # given stub_revision = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'], 'type': 'tar', 'synthetic': True, }] mock_service.lookup_revision_log.return_value = stub_revision expected_revisions = [{ 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/', 'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef' 'f7371d5/log/', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a' '42b7e2a44e6/', 'author_name': 'Software Heritage', 'author_email': 'robot@softwareheritage.org', 'committer_name': 'Software Heritage', 'committer_email': 'robot@softwareheritage.org', 'message': 'synthetic revision message', 'date_offset': 0, 'committer_date_offset': 0, 'parents': [ '7834ef7e7c357ce2af928115c6c6a42b7e2a4345' ], 'parent_urls': [ '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5' '/history/7834ef7e7c357ce2af928115c6c6a42b7e2a4345/' ], 'type': 'tar', 'synthetic': True, }] # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42' 'b7e2a44e6/log/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_revisions) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 100) @patch('swh.web.ui.api.service') @istest def api_revision_log_not_found(self, mock_service): # given mock_service.lookup_revision_log.return_value = None # when rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7' 'e2a44e6/log/?limit=10') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Revision with sha1_git' ' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'}) mock_service.lookup_revision_log.assert_called_once_with( '8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 10) @patch('swh.web.ui.api.service') @istest def api_revision_history_not_found(self, mock_service): # given mock_service.lookup_revision_with_context.return_value = None # then rv = self.app.get('/api/1/revision/999/history/338/?limit=5') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') mock_service.lookup_revision_with_context.assert_called_once_with( '999', '338', 5) @patch('swh.web.ui.api.service') @istest def api_revision_history(self, mock_service): # for readability purposes, we use: # - sha1 as 3 letters (url are way too long otherwise to respect pep8) # - only keys with modification steps (all other keys are kept as is) # given stub_revision = { 'id': '883', 'children': ['777', '999'], 'parents': [], 'directory': '272' } mock_service.lookup_revision_with_context.return_value = stub_revision # then rv = self.app.get('/api/1/revision/666/history/883/') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'id': '883', 'url': '/api/1/revision/883/', 'history_url': '/api/1/revision/883/log/', 'children': ['777', '999'], 'children_urls': ['/api/1/revision/666/history/777/', '/api/1/revision/666/history/999/'], 'parents': [], 'parent_urls': [], 'directory': '272', 'directory_url': '/api/1/directory/272/' }) mock_service.lookup_revision_with_context.assert_called_once_with( '666', '883', 100) @patch('swh.web.ui.api.service') @istest - def api_revision_with_directory_not_found(self, mock_service): + def api_directory_with_revision_not_found(self, mock_service): # given - mock_service.lookup_revision_with_directory.return_value = None + mock_service.lookup_directory_with_revision.return_value = None # then rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') - mock_service.lookup_revision_with_directory.assert_called_once_with( + mock_service.lookup_directory_with_revision.assert_called_once_with( '999', 'some/path/to/dir') @patch('swh.web.ui.api.service') @istest - def api_revision_with_directory_not_found_2(self, mock_service): + def api_directory_with_revision_not_found_2(self, mock_service): # given - mock_service.lookup_revision_with_directory.return_value = None + mock_service.lookup_directory_with_revision.return_value = None # then rv = self.app.get('/api/1/revision/123/directory/') self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') - mock_service.lookup_revision_with_directory.assert_called_once_with( + mock_service.lookup_directory_with_revision.assert_called_once_with( '123', None) @patch('swh.web.ui.api.service') - # @istest - def api_revision_with_directory(self, mock_service): + @istest + def api_directory_with_revision(self, mock_service): stub_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', } expected_dir = { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', } # given - mock_service.lookup_revision_with_directory.return_value = stub_dir + mock_service.lookup_directory_with_revision.return_value = stub_dir # then rv = self.app.get('/api/1/revision/999/directory/some/path') self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_dir) - mock_service.lookup_revision_with_directory.assert_called_once_with( + mock_service.lookup_directory_with_revision.assert_called_once_with( '999', 'some/path') @patch('swh.web.ui.api.service') @istest def api_person(self, mock_service): # given stub_person = { 'id': '198003', 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', } mock_service.lookup_person.return_value = stub_person # when rv = self.app.get('/api/1/person/198003/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, stub_person) @patch('swh.web.ui.api.service') @istest def api_person_not_found(self, mock_service): # given mock_service.lookup_person.return_value = None # when rv = self.app.get('/api/1/person/666/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Person with id 666 not found.'}) @patch('swh.web.ui.api.service') @istest def api_directory(self, mock_service): # given stub_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', }] expected_directories = [ { 'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5', 'type': 'file', 'target': '4568be353ed3480476f032475e7c233eff737123', 'target_url': '/api/1/content/' 'sha1_git:4568be353ed3480476f032475e7c233eff737123/', }, { 'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737', 'type': 'dir', 'target': '8be353ed3480476f032475e7c233eff737123456', 'target_url': '/api/1/directory/8be353ed3480476f032475e7c233eff737123456/', }] mock_service.lookup_directory.return_value = stub_directories # when rv = self.app.get('/api/1/directory/' '18d8be353ed3480476f032475e7c233eff7371d5/') # then self.assertEquals(rv.status_code, 200) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, expected_directories) mock_service.lookup_directory.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.api.service') @istest def api_directory_not_found(self, mock_service): # given mock_service.lookup_directory.return_value = [] # when rv = self.app.get('/api/1/directory/' '66618d8be353ed3480476f032475e7c233eff737/') # then self.assertEquals(rv.status_code, 404) self.assertEquals(rv.mimetype, 'application/json') response_data = json.loads(rv.data.decode('utf-8')) self.assertEquals(response_data, { 'error': 'Directory with sha1_git ' '66618d8be353ed3480476f032475e7c233eff737 not found.'}) class ApiUtils(unittest.TestCase): @istest def api_lookup_not_found(self): # when with self.assertRaises(exc.NotFoundExc) as e: api._api_lookup('something', lambda x: None, 'this is the error message raised as it is None') self.assertEqual(e.exception.args[0], 'this is the error message raised as it is None') @istest def api_lookup_with_result(self): # when actual_result = api._api_lookup('something', lambda x: x + '!', 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, 'something!') @istest def api_lookup_with_result_as_map(self): # when actual_result = api._api_lookup([1, 2, 3], lambda x: map(lambda y: y+1, x), 'this is the error which won\'t be ' 'used here') self.assertEqual(actual_result, [2, 3, 4]) diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index e6d06944c..473fbaa81 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,910 +1,910 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch, call from swh.core.hashutil import hex_to_hash, hash_to_hex from swh.web.ui import service from swh.web.ui.exc import BadInputExc, NotFoundExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): @patch('swh.web.ui.service.backend') @istest def lookup_hash_does_not_exist(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters mock_backend.content_find.assert_called_with( 'sha1_git', hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def lookup_hash_exist(self, mock_backend): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } mock_backend.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), ) @patch('swh.web.ui.service.backend') @istest def lookup_hash_origin(self, mock_backend): # given mock_backend.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) mock_backend.content_find_occurrence.assert_called_with( 'sha1_git', hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')) @patch('swh.web.ui.service.backend') @istest def stat_counters(self, mock_backend): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } mock_backend.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) mock_backend.stat_counters.assert_called_with() @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.hashutil') @istest def hash_and_search(self, mock_hashutil, mock_backend): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_backend.content_find = MagicMock(return_value={ 'sha1': bhash, 'sha1_git': bhash, }) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') mock_backend.content_find.assert_called_once_with('sha1', bhash) @patch('swh.web.ui.service.hashutil') @istest def hash_and_search_not_found(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_hashutil.hash_to_hex = MagicMock( return_value='456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) mock_hashutil.hash_to_hex.assert_called_once_with(bhash) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search(self, mock_upload): mock_upload.save_in_upload_folder.return_value = ( '/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename') service.hash_and_search = MagicMock(side_effect=lambda filepath: {'sha1': 'blah', 'found': True}) mock_upload.cleanup.return_value = None file = MagicMock(filename='some-filename') # when actual_res = service.upload_and_search(file) # then self.assertEqual(actual_res, { 'filename': 'some-filename', 'sha1': 'blah', 'found': True}) mock_upload.save_in_upload_folder.assert_called_with(file) mock_upload.cleanup.assert_called_with('/tmp/dir') service.hash_and_search.assert_called_once_with( '/tmp/dir/path/some-filename') @patch('swh.web.ui.service.backend') @istest def lookup_origin(self, mock_backend): # given mock_backend.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) mock_backend.origin_get.assert_called_with('origin-id') @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self, mock_backend): # given mock_backend.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) mock_backend.release_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_release(self, mock_backend): # given mock_backend.release_get = MagicMock(return_value={ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) mock_backend.release_get.assert_called_with( hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')) @istest def lookup_revision_with_context_ko_not_a_sha1_1(self): # given sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4' \ 'daf51aea892abe' sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @istest def lookup_revision_with_context_ko_not_a_sha1_2(self): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f6' \ '2d4daf51aea892abe' # when with self.assertRaises(BadInputExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 777777bdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_called_once_with( sha1_git_bin) @patch('swh.web.ui.service.backend') @istest def lookup_revision_with_context_ko_root_sha1_git_does_not_exist( self, mock_backend): # given sha1_git_root = '65a55bbdf3629f916219feb3dcc7393ded1bc8db' sha1_git = '777777bdf3629f916219feb3dcc7393ded1bc8db' sha1_git_root_bin = hex_to_hash(sha1_git_root) sha1_git_bin = hex_to_hash(sha1_git) mock_backend.revision_get.side_effect = ['foo', None] # when with self.assertRaises(NotFoundExc) as cm: service.lookup_revision_with_context(sha1_git_root, sha1_git) self.assertIn('Revision 65a55bbdf3629f916219feb3dcc7393ded1bc8db' ' not found', cm.exception.args[0]) mock_backend.revision_get.assert_has_calls([call(sha1_git_bin), call(sha1_git_root_bin)]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest def lookup_revision_with_context(self, mock_query, mock_backend): # given sha1_git_root = '666' sha1_git = '883' sha1_git_root_bin = b'666' sha1_git_bin = b'883' sha1_git_root_dict = { 'id': sha1_git_root_bin, 'parents': [b'999'], } sha1_git_dict = { 'id': sha1_git_bin, 'parents': [], 'directory': b'278', } stub_revisions = [ sha1_git_root_dict, { 'id': b'999', 'parents': [b'777', b'883', b'888'], }, { 'id': b'777', 'parents': [b'883'], }, sha1_git_dict, { 'id': b'888', 'parents': [b'889'], }, { 'id': b'889', 'parents': [], }, ] # inputs ok mock_query.parse_hash.side_effect = [ ('sha1', sha1_git_bin), ('sha1', sha1_git_root_bin) ] # lookup revision first 883, then 666 (both exists) mock_backend.revision_get.side_effect = [ sha1_git_dict, sha1_git_root_dict ] mock_backend.revision_log = MagicMock( return_value=stub_revisions) # when actual_revision = service.lookup_revision_with_context( sha1_git_root, sha1_git) print('actual_revision', actual_revision) # then self.assertEquals(actual_revision, { 'id': hash_to_hex(sha1_git_bin), 'parents': [], 'children': [hash_to_hex(b'999'), hash_to_hex(b'777')], 'directory': hash_to_hex(b'278'), }) mock_query.parse_hash.assert_has_calls([call(sha1_git), call(sha1_git_root)]) mock_backend.revision_log.assert_called_with( sha1_git_root_bin, 100) @istest - def lookup_revision_with_directory_bad_input(self): + def lookup_directory_with_revision_bad_input(self): with self.assertRaises(BadInputExc) as cm: - service.lookup_revision_with_directory('123', 'some/path') + service.lookup_directory_with_revision('123', 'some/path') self.assertIn('Only sha1_git is supported', cm.exception.args[0]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest - def lookup_revision_with_directory_revision_not_found(self, + def lookup_directory_with_revision_revision_not_found(self, mock_query, mock_backend): # given mock_query.parse_hash.return_value = 'sha1', b'123' mock_backend.revision_get.return_value = None # when with self.assertRaises(NotFoundExc) as cm: - service.lookup_revision_with_directory('123') + service.lookup_directory_with_revision('123') self.assertIn('Revision 123 not found', cm.exception.args[0]) mock_query.parse_hash.assert_called_once_with('123') mock_backend.revision_get.assert_called_once_with(b'123') @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest - def lookup_revision_with_directory_revision_without_path(self, + def lookup_directory_with_revision_revision_without_path(self, mock_query, mock_backend): # given mock_query.parse_hash.return_value = 'sha1', b'123' dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_entries = [{ 'id': b'123', 'type': 'dir' }, { 'id': b'456', 'type': 'file' }] mock_backend.directory_get.return_value = stub_dir_entries # when - actual_directory_entries = service.lookup_revision_with_directory( + actual_directory_entries = service.lookup_directory_with_revision( '123') self.assertEqual(list(actual_directory_entries), stub_dir_entries) mock_query.parse_hash.assert_called_once_with( '123') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_get.assert_called_once_with(dir_id) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest - def lookup_revision_with_directory_revision_with_path(self, + def lookup_directory_with_revision_revision_with_path(self, mock_query, mock_backend): # given mock_query.parse_hash.return_value = 'sha1', b'123' dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_ls = [ { 'type': 'dir', 'name': b'some/path', 'target': b'456' }, { 'type': 'file', 'name': b'something-else.hs', 'target': b'789' } ] stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_backend.directory_get.side_effect = [ stub_dir_ls, stub_dir_entries ] # when - actual_directory_entries = service.lookup_revision_with_directory( + actual_directory_entries = service.lookup_directory_with_revision( '123', 'some/path') self.assertEqual(list(actual_directory_entries), stub_dir_entries) mock_query.parse_hash.assert_called_once_with( '123') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_get.assert_has_calls([ call(b'dir-id-as-sha1', recursive=True), call(b'456') ]) @patch('swh.web.ui.service.backend') @patch('swh.web.ui.service.query') @istest - def lookup_revision_with_directory_revision_with_path_ko_dir_not_found( + def lookup_directory_with_revision_revision_with_path_ko_dir_not_found( self, mock_query, mock_backend): # given mock_query.parse_hash.return_value = 'sha1', b'123' dir_id = b'dir-id-as-sha1' mock_backend.revision_get.return_value = { 'directory': dir_id, } stub_dir_ls = [ { 'type': 'file', 'name': b'some/path/but-not-a-dir', 'target': b'456' }, { 'type': 'file', 'name': b'something-else.hs', 'target': b'789' } ] stub_dir_entries = [{ 'id': b'12', 'type': 'dir' }, { 'id': b'34', 'type': 'file' }] mock_backend.directory_get.side_effect = [ stub_dir_ls, stub_dir_entries ] # when with self.assertRaises(NotFoundExc) as cm: - service.lookup_revision_with_directory( + service.lookup_directory_with_revision( '123', 'some/path-but-not-a-dir') self.assertIn("Directory 'some/path/but-not-a-dir' pointed to by" + " revision 123 not found", cm.exception.args[0]) mock_query.parse_hash.assert_called_once_with( '123') mock_backend.revision_get.assert_called_once_with(b'123') mock_backend.directory_get.assert_called_once_with( b'dir-id-as-sha1', recursive=True) @patch('swh.web.ui.service.backend') @istest def lookup_revision(self, mock_backend): # given mock_backend.revision_get = MagicMock(return_value={ 'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) # when actual_revision = service.lookup_revision( '18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_revision, { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) mock_backend.revision_get.assert_called_with( hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_revision_log(self, mock_backend): # given stub_revision_log = [{ 'id': hex_to_hash('28d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] mock_backend.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = service.lookup_revision_log( 'abcdbe353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(list(actual_revision), [{ 'id': '28d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }]) mock_backend.revision_log.assert_called_with( hex_to_hash('abcdbe353ed3480476f032475e7c233eff7371d5'), 100) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_raw(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', }) mock_backend.content_get = MagicMock(return_value={ 'data': b'binary data'}) # when actual_content = service.lookup_content_raw( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEquals(actual_content, {'data': b'binary data'}) mock_backend.content_find.assert_called_once_with( 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')) mock_backend.content_get.assert_called_once_with( '18d8be353ed3480476f032475e7c233eff7371d5') @patch('swh.web.ui.service.backend') @istest def lookup_content_not_found(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha1(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 190, 'status': 'hidden', }) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 190, 'status': 'absent', }) mock_backend.content_find.assert_called_with( 'sha1', hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')) @patch('swh.web.ui.service.backend') @istest def lookup_content_with_sha256(self, mock_backend): # given mock_backend.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 360, 'status': 'visible', }) # when actual_content = service.lookup_content( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 360, 'status': 'visible', }) mock_backend.content_find.assert_called_with( 'sha256', hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')) @patch('swh.web.ui.service.backend') @istest def lookup_person(self, mock_backend): # given mock_backend.person_get = MagicMock(return_value={ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) mock_backend.person_get.assert_called_with('person_id') @patch('swh.web.ui.service.backend') @istest def lookup_directory_bad_checksum(self, mock_backend): # given mock_backend.directory_get = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then mock_backend.directory_get.called = False @patch('swh.web.ui.service.backend') @istest def lookup_directory(self, mock_backend): # given stub_dir_entries = [{ 'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] expected_dir_entries = [{ 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, }] mock_backend.directory_get = MagicMock( return_value=stub_dir_entries) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertEqual(list(actual_directory), expected_dir_entries) mock_backend.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))