diff --git a/swh/web/ui/converters.py b/swh/web/ui/converters.py index 90f3a1b6..ab9a338c 100644 --- a/swh/web/ui/converters.py +++ b/swh/web/ui/converters.py @@ -1,173 +1,187 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core import hashutil def fmap(f, data): if isinstance(data, list): return [f(x) for x in data] if isinstance(data, dict): return {k: f(v) for (k, v) in data.items()} return f(data) -def from_swh(dict_swh, hashess=[], bytess=[], blacklist=[]): +def from_swh(dict_swh, hashess={}, bytess={}, blacklist={}, + convert={}, convert_fn=lambda x: x): """Convert from an swh dictionary to something reasonably json serializable. Args: - dict_swh: the origin dictionary needed to be transformed - hashess: list/set of keys representing hashes values (sha1, sha256, sha1_git, etc...) as bytes. Those need to be transformed in hexadecimal string - bytess: list/set of keys representing bytes values which needs to be decoded + - blacklist: set of keys to filter out from the conversion + - convert: set of keys whose associated values need to be converted + using convert_fn + - convert_fn: the conversion function to apply on the value of key + in 'convert' The remaining keys are copied as is in the output. Returns: dictionary equivalent as dict_swh only with its keys `converted`. """ def convert_hashes_bytes(v): """v is supposedly a hash as bytes, returns it converted in hex. """ if v and isinstance(v, bytes): return hashutil.hash_to_hex(v) return v def convert_bytes(v): """v is supposedly a bytes string, decode as utf-8. FIXME: Improve decoding policy. If not utf-8, break! """ if v and isinstance(v, bytes): return v.decode('utf-8') return v if not dict_swh: return dict_swh new_dict = {} for key, value in dict_swh.items(): if key in blacklist: continue elif isinstance(value, dict): - new_dict[key] = from_swh(value, hashess, bytess, blacklist) + new_dict[key] = from_swh(value, hashess, bytess, blacklist, + convert, convert_fn) elif key in hashess: new_dict[key] = fmap(convert_hashes_bytes, value) elif key in bytess: new_dict[key] = fmap(convert_bytes, value) + elif key in convert: + new_dict[key] = convert_fn(value) else: new_dict[key] = value return new_dict def from_origin(origin): """Convert from an SWH origin to an origin dictionary. """ return from_swh(origin, hashess=set(['revision']), bytess=set(['path'])) def from_release(release): """Convert from an SWH release to a json serializable release dictionary. Args: release: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - revision: identifier of the revision the release points to (sha1 in bytes) - comment: release's comment message (bytes) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) Returns: Release dictionary with the following keys: - id: hexadecimal sha1 (string) - revision: hexadecimal sha1 (string) - comment: release's comment message (string) - name: release's name (string) - author: release's author identifier (swh's id) - synthetic: the synthetic property (boolean) """ return from_swh(release, hashess=set(['id', 'target']), bytess=set(['message', 'name', 'email'])) def from_revision(revision): """Convert from an SWH revision to a json serializable revision dictionary. Args: revision: Dict with the following keys - id: identifier of the revision (sha1 in bytes) - directory: identifier of the directory the revision points to (sha1 in bytes) - author_name, author_email: author's revision name and email - committer_name, committer_email: committer's revision name and email - message: revision's message - date, date_offset: revision's author date - committer_date, committer_date_offset: revision's commit date - parents: list of parents for such revision - synthetic: revision's property nature - type: revision's type (git, tar or dsc at the moment) - metadata: if the revision is synthetic, this can reference dynamic properties. Returns: Revision dictionary with the same keys as inputs, only: - sha1s are in hexadecimal strings (id, directory) - bytes are decoded in string (author_name, committer_name, author_email, committer_email, message) - remaining keys are left as is """ return from_swh(revision, hashess=set(['id', 'directory', 'parents']), bytess=set(['name', 'email', 'message'])) def from_content(content): """Convert swh content to serializable content dictionary. """ return from_swh(content, hashess={'sha1', 'sha1_git', 'sha256'}, bytess={}, - blacklist={'status'}) + blacklist={}, + convert={'status'}, + convert_fn=lambda v: 'absent' if v == 'hidden' else v) def from_person(person): """Convert swh person to serializable person dictionary. """ return from_swh(person, hashess=set(), bytess=set(['name', 'email'])) def from_directory_entry(dir_entry): """Convert swh person to serializable person dictionary. """ return from_swh(dir_entry, hashess=set(['dir_id', 'sha1_git', 'sha1', 'sha256', 'target']), - bytess=set(['name'])) + bytess=set(['name']), + blacklist={}, + convert={'status'}, + convert_fn=lambda v: 'absent' if v == 'hidden' else v) diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py index 3f6cfbc0..48072701 100644 --- a/swh/web/ui/tests/test_converters.py +++ b/swh/web/ui/tests/test_converters.py @@ -1,363 +1,379 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.core import hashutil from swh.web.ui import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hex_to_hash( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy'], 'n': 'dont care either' }, 'm': 'dont care' }, - 'o': 'something' + 'o': 'something', + 'p': 'bar', + 'q': 'intact', + 'r': {'p': 'also intact', + 'q': 'bar'}, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy'] } }, + 'p': 'foo', + 'q': 'intact', + 'r': {'p': 'also intact', + 'q': 'foo'}, } + def test_convert_fn(v): + return 'foo' if v == 'bar' else v + actual_output = converters.from_swh(some_input, hashess={'d', 'o'}, bytess={'c', 'e', 'g', 'l'}, - blacklist={'h', 'm', 'n', 'o'}) + blacklist={'h', 'm', 'n', 'o'}, + convert={'p', 'q'}, + convert_fn=test_convert_fn) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_origin(self): # given origin_input = { 'origin_type': 'ftp', 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', 'branch': 'octave-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa } expected_origin = { 'origin_type': 'ftp', 'origin_url': 'rsync://ftp.gnu.org/gnu/octave', 'branch': 'octave-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hex_to_hash( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hex_to_hash( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'author': { 'name': b'author name', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'author': { 'name': 'author name', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hex_to_hash( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': datetime.datetime(2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': datetime.datetime(2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'committer_date_offset': 0, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54, tzinfo=None), 'committer_date_offset': 0, 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_content(self): content_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'data': b'data in bytes', 'length': 10, - 'status': 'visible', + 'status': 'hidden', } # 'status' is filtered expected_content = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'data': b'data in bytes', 'length': 10, + 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, + 'status': 'hidden', } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, + 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) diff --git a/swh/web/ui/tests/test_service.py b/swh/web/ui/tests/test_service.py index ae089bef..128bafe6 100644 --- a/swh/web/ui/tests/test_service.py +++ b/swh/web/ui/tests/test_service.py @@ -1,545 +1,547 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock, patch from swh.core.hashutil import hex_to_hash from swh.web.ui import service from swh.web.ui.exc import BadInputExc from swh.web.ui.tests import test_app class ServiceTestCase(test_app.SWHApiTestCase): @istest def lookup_hash_does_not_exist(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = service.lookup_hash( 'sha1_git:123caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': None, 'algo': 'sha1_git'}, actual_lookup) # check the function has been called with parameters self.storage.content_find.assert_called_with({ 'sha1_git': hex_to_hash('123caf10e9535160d90e874b45aa426de762f19f') }) @istest def lookup_hash_exist(self): # given stub_content = { 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') } self.storage.content_find = MagicMock(return_value=stub_content) # when actual_lookup = service.lookup_hash( 'sha1:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEquals({'found': stub_content, 'algo': 'sha1'}, actual_lookup) self.storage.content_find.assert_called_with({ 'sha1': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f'), }) @istest def lookup_hash_origin(self): # given self.storage.content_find_occurrence = MagicMock(return_value={ 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': b'\xb0L\xaf\x10\xe9SQ`\xd9\x0e\x87KE\xaaBm\xe7b\xf1\x9f', # noqa 'path': b'octavio-3.4.0/doc/interpreter/octave.html/doc_002dS_005fISREG.html' # noqa }) expected_origin = { 'origin_type': 'sftp', 'origin_url': 'sftp://ftp.gnu.org/gnu/octave', 'branch': 'octavio-3.4.0.tar.gz', 'revision': 'b04caf10e9535160d90e874b45aa426de762f19f', 'path': 'octavio-3.4.0/doc/interpreter/octave.html/doc' '_002dS_005fISREG.html' } # when actual_origin = service.lookup_hash_origin( 'sha1_git:456caf10e9535160d90e874b45aa426de762f19f') # then self.assertEqual(actual_origin, expected_origin) self.storage.content_find_occurrence.assert_called_with( {'sha1_git': hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f')}) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = service.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @patch('swh.web.ui.service.hashutil') @istest def hash_and_search(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} self.storage.content_find = MagicMock(return_value={ 'sha1': bhash, 'sha1_git': bhash, }) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'sha1_git': '456caf10e9535160d90e874b45aa426de762f19f', 'found': True, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) @patch('swh.web.ui.service.hashutil') @istest def hash_and_search_not_found(self, mock_hashutil): # given bhash = hex_to_hash('456caf10e9535160d90e874b45aa426de762f19f') mock_hashutil.hashfile.return_value = {'sha1': bhash} mock_hashutil.hash_to_hex = MagicMock( return_value='456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.hash_and_search('/some/path') # then self.assertEqual(actual_content, { 'sha1': '456caf10e9535160d90e874b45aa426de762f19f', 'found': False, }) mock_hashutil.hashfile.assert_called_once_with('/some/path') self.storage.content_find.assert_called_once_with({'sha1': bhash}) mock_hashutil.hash_to_hex.assert_called_once_with(bhash) @patch('swh.web.ui.service.upload') @istest def test_upload_and_search(self, mock_upload): mock_upload.save_in_upload_folder.return_value = ( '/tmp/dir', 'some-filename', '/tmp/dir/path/some-filename') service.hash_and_search = MagicMock(side_effect=lambda filepath: {'sha1': 'blah', 'found': True}) mock_upload.cleanup.return_value = None file = MagicMock(filename='some-filename') # when actual_res = service.upload_and_search(file) # then self.assertEqual(actual_res, { 'filename': 'some-filename', 'sha1': 'blah', 'found': True}) mock_upload.save_in_upload_folder.assert_called_with(file) mock_upload.cleanup.assert_called_with('/tmp/dir') service.hash_and_search.assert_called_once_with( '/tmp/dir/path/some-filename') @istest def lookup_origin(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = service.lookup_origin('origin-id') # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def lookup_release(self): # given self.storage.release_get = MagicMock(return_value=[{ 'id': hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db'), 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }]) # when actual_release = service.lookup_release( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') # then self.assertEqual(actual_release, { 'id': '65a55bbdf3629f916219feb3dcc7393ded1bc8db', 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': 'v0.0.1', 'message': 'synthetic release', 'synthetic': True, }) self.storage.release_get.assert_called_with( [hex_to_hash('65a55bbdf3629f916219feb3dcc7393ded1bc8db')]) @istest def lookup_release_ko_id_checksum_not_ok_because_not_a_sha1(self): # given self.storage.release_get = MagicMock() with self.assertRaises(BadInputExc) as cm: # when service.lookup_release('not-a-sha1') self.assertIn('invalid checksum', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_release_ko_id_checksum_ok_but_not_a_sha1(self): # given self.storage.release_get = MagicMock() # when with self.assertRaises(BadInputExc) as cm: service.lookup_release( '13c1d34d138ec13b5ebad226dc2528dc7506c956e4646f62d4daf5' '1aea892abe') self.assertIn('sha1_git supported', cm.exception.args[0]) self.storage.release_get.called = False @istest def lookup_revision(self): # given self.storage.revision_get = MagicMock(return_value=[{ 'id': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }]) # when actual_revision = service.lookup_revision( '18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_revision, { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'bill & boule', 'email': 'bill@boule.org', }, 'committer': { 'name': 'boule & bill', 'email': 'boule@bill.org', }, 'message': 'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }) self.storage.revision_get.assert_called_with( [hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')]) @istest def lookup_content_raw_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content_raw( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_raw(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', }) self.storage.content_get = MagicMock(return_value=[{ 'data': b'binary data', }, {}]) # when actual_content = service.lookup_content_raw( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEquals(actual_content, {'data': b'binary data'}) self.storage.content_find.assert_called_once_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) self.storage.content_get.assert_called_once_with( ['18d8be353ed3480476f032475e7c233eff7371d5']) @istest def lookup_content_not_found(self): # given self.storage.content_find = MagicMock(return_value=None) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertIsNone(actual_content) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha1(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 190, - 'status': 'absent', + 'status': 'hidden', }) # when actual_content = service.lookup_content( 'sha1:18d8be353ed3480476f032475e7c233eff7371d5') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 190, + 'status': 'absent', }) self.storage.content_find.assert_called_with( {'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5')}) @istest def lookup_content_with_sha256(self): # given self.storage.content_find = MagicMock(return_value={ 'sha1': hex_to_hash('18d8be353ed3480476f032475e7c233eff7371d5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'length': 360, 'status': 'visible', }) # when actual_content = service.lookup_content( 'sha256:39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926') # then self.assertEqual(actual_content, { 'sha1': '18d8be353ed3480476f032475e7c233eff7371d5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'length': 360, + 'status': 'visible', }) self.storage.content_find.assert_called_with( {'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926')}) @istest def lookup_person(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person_id', 'name': b'some_name', 'email': b'some-email', }]) # when actual_person = service.lookup_person('person_id') # then self.assertEqual(actual_person, { 'id': 'person_id', 'name': 'some_name', 'email': 'some-email', }) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_person_not_found(self): # given self.storage.person_get = MagicMock(return_value=[]) # when actual_person = service.lookup_person('person_id') # then self.assertIsNone(actual_person) self.storage.person_get.assert_called_with(['person_id']) @istest def lookup_directory_bad_checksum(self): # given self.storage.directory_get = MagicMock() # when with self.assertRaises(BadInputExc): service.lookup_directory('directory_id') # then self.storage.directory_get.called = False @istest def lookup_directory_not_found(self): # given self.storage.directory_get = MagicMock(return_value=[]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNone(actual_directory) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')) @istest def lookup_directory(self): # given dir_entries_input = { 'sha1': hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, } self.storage.directory_get = MagicMock( return_value=[dir_entries_input]) # when actual_directory = service.lookup_directory( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') # then self.assertIsNotNone(actual_directory) self.assertEqual(actual_directory, [expected_dir_entries]) self.storage.directory_get.assert_called_with( hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'))