diff --git a/swh/web/ui/apidoc.py b/swh/web/ui/apidoc.py index 76f5ffe28..8575a2e85 100644 --- a/swh/web/ui/apidoc.py +++ b/swh/web/ui/apidoc.py @@ -1,421 +1,421 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import re from functools import wraps from enum import Enum from flask import request, render_template, url_for from flask import g from swh.web.ui.main import app class argtypes(Enum): # noqa: N801 """Class for centralizing argument type descriptions """ ts = 'timestamp' int = 'integer' str = 'string' path = 'path' sha1 = 'sha1' uuid = 'uuid' sha1_git = 'sha1_git' algo_and_hash = 'algo_hash:hash' class rettypes(Enum): # noqa: N801 """Class for centralizing return type descriptions """ octet_stream = 'octet stream' list = 'list' dict = 'dict' class excs(Enum): # noqa: N801 """Class for centralizing exception type descriptions """ badinput = 'BadInputExc' notfound = 'NotFoundExc' class APIUrls(object): """ Class to manage API documentation URLs. * Indexes all routes documented using apidoc's decorators. * Tracks endpoint/request processing method relationships for use in generating related urls in API documentation Relies on the load_controllers logic in main.py for initialization. """ apidoc_routes = {} method_endpoints = {} @classmethod def get_app_endpoints(cls): return cls.apidoc_routes @classmethod def get_method_endpoints(cls, fname): if len(cls.method_endpoints) == 0: cls.method_endpoints = cls.group_routes_by_method() return cls.method_endpoints[fname] @classmethod def group_routes_by_method(cls): """ Group URL endpoints according to their processing method. Returns: A dict where keys are the processing method names, and values are the routes that are bound to the key method. """ endpoints = {} for rule in app.url_map.iter_rules(): rule_dict = {'rule': rule.rule, 'methods': rule.methods} if rule.endpoint not in endpoints: endpoints[rule.endpoint] = [rule_dict] else: endpoints[rule.endpoint].append(rule_dict) return endpoints @classmethod def index_add_route(cls, route, docstring, **kwargs): """ Add a route to the self-documenting API reference """ if route not in cls.apidoc_routes: d = {'docstring': docstring} for k, v in kwargs.items(): d[k] = v cls.apidoc_routes[route] = d class APIDocException(Exception): """ Custom exception to signal errors in the use of the APIDoc decorators """ class APIDocBase(object): """ The API documentation decorator base class, responsible for the operations that link the decorator stack together: * manages the _inner_dec property, which represents the decorator directly below self in the decorator tower * contains the logic used to return appropriately if self is the last decorator to be applied to the API function """ def __init__(self): self._inner_dec = None @property def inner_dec(self): return self._inner_dec @inner_dec.setter def inner_dec(self, instance): self._inner_dec = instance @property def data(self): raise NotImplementedError def process_rv(self, f, args, kwargs): """ From the arguments f has, determine whether or not it is the last decorator in the stack, and return the appropriate call to f. """ rv = None if 'outer_decorator' in f.__code__.co_varnames: rv = f(*args, **kwargs) else: nargs = {k: v for k, v in kwargs.items() if k != 'outer_decorator'} try: rv = f(*args, **nargs) except (TypeError, KeyError): # documentation call rv = None return rv def maintain_stack(self, f, args, kwargs): """ From the arguments f is called with, determine whether or not the stack link was made by @apidoc.route, and maintain the linking for the next call to f. """ if 'outer_decorator' not in kwargs: raise APIDocException('Apidoc %s: expected an apidoc' ' route decorator first' % self.__class__.__name__) kwargs['outer_decorator'].inner_dec = self kwargs['outer_decorator'] = self return self.process_rv(f, args, kwargs) class route(APIDocBase): # noqa: N801 """Decorate an API method to register it in the API doc route index and create the corresponding Flask route. This decorator is responsible for bootstrapping the linking of subsequent decorators, as well as traversing the decorator stack to obtain the documentation data from it. Args: route: documentation page's route noargs: set to True if the route has no arguments, and its result should be displayed anytime its documentation is requested. Default to False hidden: set to True to remove the endpoint from being listed in the /api endpoints. Default to False. tags: Further information on api endpoints. Two values are possibly expected: - hidden: remove the entry points from the listing - upcoming: display the entry point but it is not followable """ def __init__(self, route, noargs=False, tags=[]): super().__init__() self.route = route self.noargs = noargs self.tags = set(tags) def __call__(self, f): options = {} if 'hidden' not in self.tags: if self.tags: options['tags'] = self.tags APIUrls.index_add_route(self.route, f.__doc__, **options) @wraps(f) def doc_func(*args, **kwargs): kwargs['outer_decorator'] = self rv = self.process_rv(f, args, kwargs) return self.compute_return(f, rv) if not self.noargs: app.add_url_rule(self.route, f.__name__, doc_func) return doc_func def filter_api_url(self, endpoint, route_re, noargs): doc_methods = {'GET', 'HEAD', 'OPTIONS'} if re.match(route_re, endpoint['rule']): if endpoint['methods'] == doc_methods and not noargs: return False return True def build_examples(self, f, urls, args): """Build example documentation. Args: f: function urls: information relative to url for that function args: information relative to arguments for that function Yields: example based on default parameter value if any """ s = set() r = [] for data_url in urls: url = data_url['rule'] defaults = {arg['name']: arg['default'] for arg in args if arg['name'] in url} if defaults: url = url_for(f.__name__, **defaults) if url in s: continue s.add(url) r.append(url) return r def compute_return(self, f, rv): """Build documentation""" data = self.data if not f.__doc__: raise APIDocException('Apidoc %s: expected a docstring' ' for function %s' % (self.__class__.__name__, f.__name__)) data['docstring'] = f.__doc__ route_re = re.compile('.*%s$' % data['route']) endpoint_list = APIUrls.get_method_endpoints(f.__name__) data['urls'] = [url for url in endpoint_list if self.filter_api_url(url, route_re, data['noargs'])] if 'args' in data: data['examples'] = self.build_examples( f, data['urls'], data['args']) # Prepare and send to mimetype selector if it's not a doc request if re.match(route_re, request.url) and not data['noargs'] \ and request.method == 'GET': return app.response_class( render_template('apidoc.html', **data), content_type='text/html') g.doc_env = data # Store for response processing return rv @property def data(self): data = {'route': self.route, 'noargs': self.noargs} doc_instance = self.inner_dec while doc_instance: if isinstance(doc_instance, arg): if 'args' not in data: data['args'] = [] data['args'].append(doc_instance.data) elif isinstance(doc_instance, raises): if 'excs' not in data: data['excs'] = [] data['excs'].append(doc_instance.data) elif isinstance(doc_instance, returns): data['return'] = doc_instance.data elif isinstance(doc_instance, header): if 'headers' not in data: data['headers'] = [] data['headers'].append(doc_instance.data) elif isinstance(doc_instance, param): if 'params' not in data: data['params'] = [] data['params'].append(doc_instance.data) else: raise APIDocException('Unknown API documentation decorator') doc_instance = doc_instance.inner_dec return data class BaseDescribeDocBase(APIDocBase): """Base description of optional input/output setup for a route. """ def __init__(self): self.doc_data = None self.inner_dec = None def __call__(self, f): @wraps(f) def arg_fun(*args, outer_decorator=None, **kwargs): kwargs['outer_decorator'] = outer_decorator return self.maintain_stack(f, args, kwargs) return arg_fun @property def data(self): return self.doc_data -class arg(BaseDescribeDocBase): +class arg(BaseDescribeDocBase): # noqa: N801 """ Decorate an API method to display an argument's information on the doc page specified by @route above. Args: name: the argument's name. MUST match the method argument's name to create the example request URL. default: the argument's default value argtype: the argument's type as an Enum value from apidoc.argtypes argdoc: the argument's documentation string """ def __init__(self, name, default, argtype, argdoc): super().__init__() self.doc_data = { 'name': name, 'type': argtype.value, 'doc': argdoc, 'default': default } -class header(BaseDescribeDocBase): +class header(BaseDescribeDocBase): # noqa: N801 """ Decorate an API method to display header information the api can potentially return in the response. Args: name: the header name doc: the information about that header """ def __init__(self, name, doc): super().__init__() self.doc_data = { 'name': name, 'doc': doc, } -class param(BaseDescribeDocBase): +class param(BaseDescribeDocBase): # noqa: N801 """Decorate an API method to display query parameter information the api can potentially accept. Args: name: the parameter name default: default value doc: the information about that header """ def __init__(self, name, default, doc): super().__init__() self.doc_data = { 'name': name, 'default': default, 'doc': doc, } -class raises(BaseDescribeDocBase): +class raises(BaseDescribeDocBase): # noqa: N801 """Decorate an API method to display information pertaining to an exception that can be raised by this method. Args: exc: the exception name as an Enum value from apidoc.excs doc: the exception's documentation string """ def __init__(self, exc, doc): super().__init__() self.doc_data = { 'exc': exc.value, 'doc': doc } -class returns(BaseDescribeDocBase): +class returns(BaseDescribeDocBase): # noqa: N801 """Decorate an API method to display information about its return value. Args: rettype: the return value's type as an Enum value from apidoc.rettypes retdoc: the return value's documentation string """ def __init__(self, rettype=None, retdoc=None): super().__init__() self.doc_data = { 'type': rettype.value, 'doc': retdoc } diff --git a/swh/web/ui/tests/test_apidoc.py b/swh/web/ui/tests/test_apidoc.py index 6f0237f87..d865c3af6 100644 --- a/swh/web/ui/tests/test_apidoc.py +++ b/swh/web/ui/tests/test_apidoc.py @@ -1,127 +1,130 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information from nose.tools import istest, nottest from swh.web.ui import apidoc from swh.web.ui.main import app from swh.web.ui.tests.test_app import SWHApidocTestCase class APIDocTestCase(SWHApidocTestCase): def setUp(self): self.arg_dict = { 'name': 'my_pretty_arg', 'default': 'some default value', 'type': apidoc.argtypes.sha1, 'doc': 'this arg does things' } self.stub_excs = [{'exc': apidoc.excs.badinput, 'doc': 'My exception documentation'}] self.stub_args = [{'name': 'stub_arg', 'default': 'some_default'}] self.stub_rule_list = [ {'rule': 'some/route/with/args/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/doc/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}}, {'rule': 'some/other/route/', 'methods': {'GET', 'HEAD', 'OPTIONS'}} ] self.stub_return = { 'type': apidoc.rettypes.dict.value, 'doc': 'a dict with amazing properties' } + @staticmethod @apidoc.route('/my/nodoc/url/') @nottest def apidoc_nodoc_tester(arga, argb): return arga + argb @istest def apidoc_nodoc_failure(self): with self.assertRaises(Exception): self.client.get('/my/nodoc/url/') @istest def apidoc_badorder_failure(self): with self.assertRaises(AssertionError): @app.route('/my/badorder/url//') @apidoc.arg('foo', default=True, argtype=apidoc.argtypes.int, argdoc='It\'s so fluffy!') @apidoc.route('/my/badorder/url/') @nottest def apidoc_badorder_tester(foo, bar=0): """ Some irrelevant doc since the decorators are bad """ return foo + bar + @staticmethod @app.route('/some///') @apidoc.route('/some/doc/route/') @nottest def apidoc_route_tester(myarg, myotherarg, akw=0): """ Sample doc """ return {'result': myarg + myotherarg + akw} @istest def apidoc_route_doc(self): # when rv = self.client.get('/some/doc/route/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('apidoc.html') @istest def apidoc_route_fn(self): # when rv = self.client.get('/some/1/1/') # then self.assertEqual(rv.status_code, 200) + @staticmethod @app.route('/some/full///') @apidoc.route('/some/complete/doc/route/') @apidoc.arg('myarg', default=67, argtype=apidoc.argtypes.int, argdoc='my arg') @apidoc.param('limit', default=10, doc='Result limitation') @apidoc.header('Link', doc='Header link returns for pagination purpose') @apidoc.raises(exc=apidoc.excs.badinput, doc='Oops') @apidoc.returns(rettype=apidoc.rettypes.dict, retdoc='sum of args') @nottest def apidoc_full_stack_tester(myarg, myotherarg, akw=0): """ Sample doc """ return {'result': myarg + myotherarg + akw} @istest def apidoc_full_stack_doc(self): # when rv = self.client.get('/some/complete/doc/route/') # then self.assertEqual(rv.status_code, 200) self.assert_template_used('apidoc.html') @istest def apidoc_full_stack_fn(self): # when rv = self.client.get('/some/full/1/1/') # then self.assertEqual(rv.status_code, 200) diff --git a/swh/web/ui/tests/test_backend.py b/swh/web/ui/tests/test_backend.py index dd94074d2..00713dbcc 100644 --- a/swh/web/ui/tests/test_backend.py +++ b/swh/web/ui/tests/test_backend.py @@ -1,935 +1,935 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime from nose.tools import istest from unittest.mock import MagicMock from swh.core import hashutil from swh.web.ui import backend from swh.web.ui.tests import test_app class BackendTestCase(test_app.SWHApiTestCase): def setUp(self): self.origin_visit1 = { 'date': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 1 } @istest def content_get_ko_not_found_1(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f777') self.storage.content_get = MagicMock(return_value=None) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_get_ko_not_found_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_get = MagicMock(return_value=[]) # when actual_content = backend.content_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_ctags_search_1(self): # given self.storage.content_ctags_search = MagicMock( return_value="some-result") # when actual_ctags = backend.content_ctags_search( 'foo', last_sha1='some-hash', limit=1) # then self.assertEquals(actual_ctags, 'some-result') self.storage.content_ctags_search.assert_called_once_with( 'foo', last_sha1='some-hash', limit=1) @istest def content_ctags_search_2(self): # given self.storage.content_ctags_search = MagicMock( return_value="some other result") # when actual_ctags = backend.content_ctags_search( 'foo|bar', last_sha1='some-hash', limit=2) # then self.assertEquals(actual_ctags, 'some other result') self.storage.content_ctags_search.assert_called_once_with( 'foo|bar', last_sha1='some-hash', limit=2) @istest def content_ctags_search_3(self): # given self.storage.content_ctags_search = MagicMock( return_value="yet another result") # when actual_ctags = backend.content_ctags_search( 'bar', last_sha1='some-hash', limit=1000) # then self.assertEquals(actual_ctags, 'yet another result') self.storage.content_ctags_search.assert_called_once_with( 'bar', last_sha1='some-hash', limit=50) @istest def content_get(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') stub_contents = [{ 'sha1': sha1_bin, 'data': b'binary data', }] self.storage.content_get = MagicMock(return_value=stub_contents) # when actual_content = backend.content_get(sha1_bin) # then self.assertEquals(actual_content, stub_contents[0]) self.storage.content_get.assert_called_once_with( [sha1_bin]) @istest def content_find_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=None) # when actual_lookup = backend.content_find('sha1_git', sha1_bin) # then self.assertIsNone(actual_lookup) self.storage.content_find.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_find(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find = MagicMock(return_value=(1, 2, 3)) # when actual_content = backend.content_find('sha1', sha1_bin) # then self.assertEquals(actual_content, (1, 2, 3)) # check the function has been called with parameters self.storage.content_find.assert_called_with({'sha1': sha1_bin}) @istest def content_find_provenance_ko_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_provenance = MagicMock( return_value=(x for x in [])) # when actual_lookup = backend.content_find_provenance('sha1_git', sha1_bin) # then self.assertEquals(list(actual_lookup), []) self.storage.content_find_provenance.assert_called_once_with( {'sha1_git': sha1_bin}) @istest def content_ctags_get(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_ctags_get = MagicMock( return_value=[1, 2, 3]) # when actual_content = backend.content_ctags_get(sha1_bin) # then self.assertEquals(actual_content, [1, 2, 3]) self.storage.content_ctags_get.assert_called_with( [sha1_bin]) @istest def content_ctags_get_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_ctags_get = MagicMock( return_value=[]) # when actual_content = backend.content_ctags_get(sha1_bin) # then self.assertEquals(actual_content, []) self.storage.content_ctags_get.assert_called_with( [sha1_bin]) @istest def content_filetype_get(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_mimetype_get = MagicMock( return_value=[1, 2, 3]) # when actual_content = backend.content_filetype_get(sha1_bin) # then self.assertEquals(actual_content, 1) self.storage.content_mimetype_get.assert_called_with( [sha1_bin]) @istest def content_filetype_get_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_mimetype_get = MagicMock( return_value=[]) # when actual_content = backend.content_filetype_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_mimetype_get.assert_called_with( [sha1_bin]) @istest def content_language_get(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_language_get = MagicMock( return_value=[1, 2, 3]) # when actual_content = backend.content_language_get(sha1_bin) # then self.assertEquals(actual_content, 1) self.storage.content_language_get.assert_called_with( [sha1_bin]) @istest def content_language_get_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_language_get = MagicMock( return_value=[]) # when actual_content = backend.content_language_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_language_get.assert_called_with( [sha1_bin]) @istest def content_license_get(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_fossology_license_get = MagicMock( return_value=[1, 2, 3]) # when actual_content = backend.content_license_get(sha1_bin) # then self.assertEquals(actual_content, 1) self.storage.content_fossology_license_get.assert_called_with( [sha1_bin]) @istest def content_license_get_no_result(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_fossology_license_get = MagicMock( return_value=[]) # when actual_content = backend.content_license_get(sha1_bin) # then self.assertIsNone(actual_content) self.storage.content_fossology_license_get.assert_called_with( [sha1_bin]) @istest def content_find_provenance(self): # given sha1_bin = hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f') self.storage.content_find_provenance = MagicMock( return_value=(x for x in (1, 2, 3))) # when actual_content = backend.content_find_provenance('sha1', sha1_bin) # then self.assertEquals(list(actual_content), [1, 2, 3]) # check the function has been called with parameters self.storage.content_find_provenance.assert_called_with( {'sha1': sha1_bin}) @istest def content_missing_per_sha1_none(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, []) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def content_missing_per_sha1_some(self): # given sha1s_bin = [hashutil.hex_to_hash( '456caf10e9535160d90e874b45aa426de762f19f'), hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )] self.storage.content_missing_per_sha1 = MagicMock(return_value=[ hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) # when actual_content = backend.content_missing_per_sha1(sha1s_bin) # then self.assertEquals(actual_content, [hashutil.hex_to_hash( '745bab676c8f3cec8016e0c39ea61cf57e518865' )]) self.storage.content_missing_per_sha1.assert_called_with(sha1s_bin) @istest def origin_get_by_id(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get({'id': 'origin-id'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with({'id': 'origin-id'}) @istest def origin_get_by_type_url(self): # given self.storage.origin_get = MagicMock(return_value={ 'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) # when actual_origin = backend.origin_get({'type': 'ftp', 'url': 'ftp://some/url/to/origin'}) # then self.assertEqual(actual_origin, {'id': 'origin-id', 'lister': 'uuid-lister', 'project': 'uuid-project', 'url': 'ftp://some/url/to/origin', 'type': 'ftp'}) self.storage.origin_get.assert_called_with( {'type': 'ftp', 'url': 'ftp://some/url/to/origin'}) @istest def person_get(self): # given self.storage.person_get = MagicMock(return_value=[{ 'id': 'person-id', 'name': 'blah'}]) # when actual_person = backend.person_get('person-id') # then self.assertEqual(actual_person, {'id': 'person-id', 'name': 'blah'}) self.storage.person_get.assert_called_with(['person-id']) @istest def directory_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_get = MagicMock(return_value=None) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, None) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_get(self): # given sha1_bin = hashutil.hex_to_hash( '51f71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') sha1_bin2 = hashutil.hex_to_hash( '62071b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir = {'id': sha1_bin, 'revision': b'sha1-blah'} stub_dir2 = {'id': sha1_bin2, 'revision': b'sha1-foobar'} self.storage.directory_get = MagicMock(return_value=[stub_dir, stub_dir2]) # when actual_directory = backend.directory_get(sha1_bin) # then self.assertEquals(actual_directory, stub_dir) self.storage.directory_get.assert_called_with([sha1_bin]) @istest def directory_ls_empty_result(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') self.storage.directory_ls = MagicMock(return_value=[]) # when actual_directory = backend.directory_ls(sha1_bin) # then self.assertEquals(actual_directory, []) self.storage.directory_ls.assert_called_with(sha1_bin, False) @istest def directory_ls(self): # given sha1_bin = hashutil.hex_to_hash( '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03') stub_dir_entries = [{ 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, }] self.storage.directory_ls = MagicMock( return_value=stub_dir_entries) actual_directory = backend.directory_ls(sha1_bin, recursive=True) # then self.assertIsNotNone(actual_directory) self.assertEqual(list(actual_directory), stub_dir_entries) self.storage.directory_ls.assert_called_with(sha1_bin, True) @istest def release_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') self.storage.release_get = MagicMock(return_value=[]) # when actual_release = backend.release_get(sha1_bin) # then self.assertIsNone(actual_release) self.storage.release_get.assert_called_with([sha1_bin]) @istest def release_get(self): # given sha1_bin = hashutil.hex_to_hash( '65a55bbdf3629f916219feb3dcc7393ded1bc8db') stub_releases = [{ 'id': sha1_bin, 'target': None, 'date': datetime.datetime(2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc), 'name': b'v0.0.1', 'message': b'synthetic release', 'synthetic': True, }] self.storage.release_get = MagicMock(return_value=stub_releases) # when actual_release = backend.release_get(sha1_bin) # then self.assertEqual(actual_release, stub_releases[0]) self.storage.release_get.assert_called_with([sha1_bin]) @istest def revision_get_by_not_found(self): # given self.storage.revision_get_by = MagicMock(return_value=[]) # when actual_revision = backend.revision_get_by(10, 'master', 'ts2') # then self.assertIsNone(actual_revision) self.storage.revision_get_by.assert_called_with(10, 'master', timestamp='ts2', limit=1) @istest def revision_get_by(self): # given self.storage.revision_get_by = MagicMock(return_value=[{'id': 1}]) # when actual_revisions = backend.revision_get_by(100, 'dev', 'ts') # then self.assertEquals(actual_revisions, {'id': 1}) self.storage.revision_get_by.assert_called_with(100, 'dev', timestamp='ts', limit=1) @istest def revision_get_not_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_get = MagicMock(return_value=[]) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertIsNone(actual_revision) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') stub_revisions = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_get = MagicMock(return_value=stub_revisions) # when actual_revision = backend.revision_get(sha1_bin) # then self.assertEqual(actual_revision, stub_revisions[0]) self.storage.revision_get.assert_called_with([sha1_bin]) @istest def revision_get_multiple(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') stub_revisions = [ { 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }, { 'id': sha1_other, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'name', 'email': b'name@surname.org', }, 'committer': { 'name': b'name', 'email': b'name@surname.org', }, 'message': b'ugly fix for bug 42', 'date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 12, 5, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], } ] self.storage.revision_get = MagicMock( return_value=stub_revisions) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, stub_revisions) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_get_multiple_none_found(self): # given sha1_bin = hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5') sha1_other = hashutil.hex_to_hash( 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc') self.storage.revision_get = MagicMock( return_value=[]) # when actual_revision = backend.revision_get_multiple([sha1_bin, sha1_other]) # then self.assertEqual(actual_revision, []) self.storage.revision_get.assert_called_with( [sha1_bin, sha1_other]) @istest def revision_log(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log = MagicMock(return_value=stub_revision_log) # when actual_revision = backend.revision_log(sha1_bin, limit=1) # then self.assertEqual(list(actual_revision), stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') stub_revision_log = [{ 'id': sha1_bin, 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'bill & boule', 'email': b'bill@boule.org', }, 'committer': { 'name': b'boule & bill', 'email': b'boule@bill.org', }, 'message': b'elegant fix for bug 31415957', 'date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'date_offset': 0, 'committer_date': datetime.datetime(2000, 1, 17, 11, 23, 54), 'committer_date_offset': 0, 'synthetic': False, 'type': 'git', 'parents': [], 'metadata': [], }] self.storage.revision_log_by = MagicMock( return_value=stub_revision_log) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None, limit=1) # then self.assertEqual(actual_log, stub_revision_log) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def revision_log_by_norev(self): # given sha1_bin = hashutil.hex_to_hash( '28d8be353ed3480476f032475e7c233eff7371d5') self.storage.revision_log_by = MagicMock(return_value=None) # when actual_log = backend.revision_log_by(1, 'refs/heads/master', None, limit=1) # then self.assertEqual(actual_log, None) self.storage.revision_log.assert_called_with([sha1_bin], 1) @istest def stat_counters(self): # given input_stats = { "content": 1770830, "directory": 211683, "directory_entry_dir": 209167, "directory_entry_file": 1807094, "directory_entry_rev": 0, "entity": 0, "entity_history": 0, "occurrence": 0, "occurrence_history": 19600, "origin": 1096, "person": 0, "release": 8584, "revision": 7792, "revision_history": 0, "skipped_content": 0 } self.storage.stat_counters = MagicMock(return_value=input_stats) # when actual_stats = backend.stat_counters() # then expected_stats = input_stats self.assertEqual(actual_stats, expected_stats) self.storage.stat_counters.assert_called_with() @istest def lookup_origin_visits(self): # given expected_origin_visits = [ self.origin_visit1, { 'date': datetime.datetime( 2013, 7, 1, 20, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 2 }, { 'date': datetime.datetime( 2015, 1, 1, 21, 0, 0, tzinfo=datetime.timezone.utc), 'origin': 1, 'visit': 3 }] self.storage.origin_visit_get = MagicMock( return_value=expected_origin_visits) # when actual_origin_visits = backend.lookup_origin_visits(5) # then self.assertEqual(list(actual_origin_visits), expected_origin_visits) self.storage.origin_visit_get.assert_called_with( 5, last_visit=None, limit=10) @istest def lookup_origin_visit(self): # given self.storage.origin_visit_get_by = MagicMock( return_value=self.origin_visit1) # when actual_origin_visit = backend.lookup_origin_visit(10, 1) # then self.assertEqual(actual_origin_visit, self.origin_visit1) self.storage.origin_visit_get_by.assert_called_with(10, 1) @istest - def lookup_origin_visit_None(self): + def lookup_origin_visit_none(self): # given self.storage.origin_visit_get_by = MagicMock( return_value=None) # when actual_origin_visit = backend.lookup_origin_visit(1, 10) # then self.assertIsNone(actual_origin_visit) self.storage.origin_visit_get_by.assert_called_with(1, 10) @istest def directory_entry_get_by_path(self): # given stub_dir_entry = {'id': b'dir-id', 'type': 'dir', 'name': b'some/path/foo'} self.storage.directory_entry_get_by_path = MagicMock( return_value=stub_dir_entry) # when actual_dir_entry = backend.directory_entry_get_by_path(b'dir-sha1', 'some/path/foo') self.assertEquals(actual_dir_entry, stub_dir_entry) self.storage.directory_entry_get_by_path.assert_called_once_with( b'dir-sha1', [b'some', b'path', b'foo']) @istest def entity_get(self): # given stub_entities = [{'uuid': 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7', 'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'}, {'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2', 'parent': None}] self.storage.entity_get = MagicMock(return_value=stub_entities) # when actual_entities = backend.entity_get( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') # then self.assertEquals(actual_entities, stub_entities) self.storage.entity_get.assert_called_once_with( 'e8c3fc2e-a932-4fd7-8f8e-c40645eb35a7') diff --git a/swh/web/ui/tests/test_converters.py b/swh/web/ui/tests/test_converters.py index 9b2bdb4b1..971437eaa 100644 --- a/swh/web/ui/tests/test_converters.py +++ b/swh/web/ui/tests/test_converters.py @@ -1,723 +1,723 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU Affero General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import unittest from nose.tools import istest from swh.core import hashutil from swh.web.ui import converters class ConvertersTestCase(unittest.TestCase): @istest def from_swh(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'sharp-0.3.4.tgz', 'd': hashutil.hex_to_hash( 'b04caf10e9535160d90e874b45aa426de762f19f'), 'e': b'sharp.html/doc_002dS_005fISREG.html', 'g': [b'utf-8-to-decode', b'another-one'], 'h': 'something filtered', 'i': {'e': b'something'}, 'j': { 'k': { 'l': [b'bytes thing', b'another thingy'], 'n': 'dont care either' }, 'm': 'dont care' }, 'o': 'something', 'p': b'foo', 'q': {'extra-headers': [['a', b'intact']]}, 'w': None, 'r': {'p': 'also intact', 'q': 'bar'}, 's': { 'timestamp': 42, 'offset': -420, 'negative_utc': None, }, 't': None, 'u': None, 'v': None, } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'sharp-0.3.4.tgz', 'd': 'b04caf10e9535160d90e874b45aa426de762f19f', 'e': 'sharp.html/doc_002dS_005fISREG.html', 'g': ['utf-8-to-decode', 'another-one'], 'i': {'e': 'something'}, 'j': { 'k': { 'l': ['bytes thing', 'another thingy'] } }, 'p': 'foo', 'q': {'extra-headers': [['a', 'intact']]}, 'w': {}, 'r': {'p': 'also intact', 'q': 'bar'}, 's': '1969-12-31T17:00:42-07:00', 'u': {}, 'v': [], } actual_output = converters.from_swh( some_input, hashess={'d', 'o'}, bytess={'c', 'e', 'g', 'l'}, dates={'s'}, blacklist={'h', 'm', 'n', 'o'}, removables_if_empty={'t'}, empty_dict={'u'}, empty_list={'v'}, convert={'p', 'q', 'w'}, convert_fn=converters.convert_revision_metadata) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_do_no_conversion_if_none_or_not_bytes(self): some_input = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } expected_output = { 'a': 'something', 'b': None, 'c': 'someone', 'd': None, } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) self.assertEquals(expected_output, actual_output) @istest def from_swh_edge_cases_convert_invalid_utf8_bytes(self): some_input = { 'a': 'something', 'b': 'someone', 'c': b'a name \xff', 'd': b'an email \xff', } expected_output = { 'a': 'something', 'b': 'someone', 'c': 'a name \\xff', 'd': 'an email \\xff', 'decoding_failures': ['c', 'd'] } actual_output = converters.from_swh(some_input, hashess={'a', 'b'}, bytess={'c', 'd'}) for v in ['a', 'b', 'c', 'd']: self.assertEqual(expected_output[v], actual_output[v]) self.assertEqual(len(expected_output['decoding_failures']), len(actual_output['decoding_failures'])) for v in expected_output['decoding_failures']: self.assertTrue(v in actual_output['decoding_failures']) @istest def from_swh_empty(self): # when self.assertEquals({}, converters.from_swh({})) @istest def from_swh_none(self): # when self.assertIsNone(converters.from_swh(None)) @istest def from_provenance(self): # given input_provenance = { 'origin': 10, 'visit': 1, 'content': hashutil.hex_to_hash( '321caf10e9535160d90e874b45aa426de762f19f'), 'revision': hashutil.hex_to_hash( '123caf10e9535160d90e874b45aa426de762f19f'), 'path': b'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } expected_provenance = { 'origin': 10, 'visit': 1, 'content': '321caf10e9535160d90e874b45aa426de762f19f', 'revision': '123caf10e9535160d90e874b45aa426de762f19f', 'path': 'octave-3.4.0/doc/interpreter/octave/doc_002dS_005fISREG' } # when actual_provenance = converters.from_provenance(input_provenance) # then self.assertEqual(actual_provenance, expected_provenance) @istest def from_origin(self): # given origin_input = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', 'project': None, 'lister': None, } expected_origin = { 'id': 9, 'type': 'ftp', 'url': 'rsync://ftp.gnu.org/gnu/octave', } # when actual_origin = converters.from_origin(origin_input) # then self.assertEqual(actual_origin, expected_origin) @istest def from_release(self): release_input = { 'id': hashutil.hex_to_hash( 'aad23fa492a0c5fed0708a6703be875448c86884'), 'target': hashutil.hex_to_hash( '5e46d564378afc44b31bb89f99d5675195fbdf67'), 'target_type': 'revision', 'date': { 'timestamp': datetime.datetime( 2015, 1, 1, 22, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'author': { 'name': b'author name', 'fullname': b'Author Name author@email', 'email': b'author@email', }, 'name': b'v0.0.1', 'message': b'some comment on release', 'synthetic': True, } expected_release = { 'id': 'aad23fa492a0c5fed0708a6703be875448c86884', 'target': '5e46d564378afc44b31bb89f99d5675195fbdf67', 'target_type': 'revision', 'date': '2015-01-01T22:00:00+00:00', 'author': { 'name': 'author name', 'fullname': 'Author Name author@email', 'email': 'author@email', }, 'name': 'v0.0.1', 'message': 'some comment on release', 'target_type': 'revision', 'synthetic': True, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_release_no_revision(self): release_input = { 'id': hashutil.hex_to_hash( 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e'), 'target': None, 'date': { 'timestamp': datetime.datetime( 2016, 3, 2, 10, 0, 0, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': True, }, 'name': b'v0.1.1', 'message': b'comment on release', 'synthetic': False, 'author': { 'name': b'bob', 'fullname': b'Bob bob@alice.net', 'email': b'bob@alice.net', }, } expected_release = { 'id': 'b2171ee2bdf119cd99a7ec7eff32fa8013ef9a4e', 'target': None, 'date': '2016-03-02T10:00:00-00:00', 'name': 'v0.1.1', 'message': 'comment on release', 'synthetic': False, 'author': { 'name': 'bob', 'fullname': 'Bob bob@alice.net', 'email': 'bob@alice.net', }, } # when actual_release = converters.from_release(release_input) # then self.assertEqual(actual_release, expected_release) @istest def from_revision(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'extra_headers': [['gpgsig', b'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'extra_headers': [['gpgsig', 'some-signature']], 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_nomerge(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5') ] } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5' ], 'merge': False } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_noparents(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'synthetic revision message', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': 'synthetic revision message', 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] } } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest def from_revision_invalid(self): revision_input = { 'id': hashutil.hex_to_hash( '18d8be353ed3480476f032475e7c233eff7371d5'), 'directory': hashutil.hex_to_hash( '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6'), 'author': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'committer': { 'name': b'Software Heritage', 'fullname': b'robot robot@softwareheritage.org', 'email': b'robot@softwareheritage.org', }, 'message': b'invalid message \xff', 'date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'committer_date': { 'timestamp': datetime.datetime( 2000, 1, 17, 11, 23, 54, tzinfo=datetime.timezone.utc).timestamp(), 'offset': 0, 'negative_utc': False, }, 'synthetic': True, 'type': 'tar', 'parents': [ hashutil.hex_to_hash( '29d8be353ed3480476f032475e7c244eff7371d5'), hashutil.hex_to_hash( '30d8be353ed3480476f032475e7c244eff7371d5') ], 'children': [ hashutil.hex_to_hash( '123546353ed3480476f032475e7c244eff7371d5'), ], 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912', }] }, } expected_revision = { 'id': '18d8be353ed3480476f032475e7c233eff7371d5', 'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 'author': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'committer': { 'name': 'Software Heritage', 'fullname': 'robot robot@softwareheritage.org', 'email': 'robot@softwareheritage.org', }, 'message': None, 'message_decoding_failed': True, 'date': "2000-01-17T11:23:54+00:00", 'committer_date': "2000-01-17T11:23:54+00:00", 'children': [ '123546353ed3480476f032475e7c244eff7371d5' ], 'parents': [ '29d8be353ed3480476f032475e7c244eff7371d5', '30d8be353ed3480476f032475e7c244eff7371d5' ], 'type': 'tar', 'synthetic': True, 'metadata': { 'original_artifact': [{ 'archive_type': 'tar', 'name': 'webbase-5.7.0.tar.gz', 'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd', 'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1', 'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f' '309d36484e7edf7bb912' }] }, 'merge': True } # when actual_revision = converters.from_revision(revision_input) # then self.assertEqual(actual_revision, expected_revision) @istest - def from_content_None(self): + def from_content_none(self): self.assertIsNone(converters.from_content(None)) @istest def from_content(self): content_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'ctime': 'something-which-is-filtered-out', 'data': b'data in bytes', 'length': 10, 'status': 'hidden', } # 'status' is filtered expected_content = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d274' '7d3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'data': b'data in bytes', 'length': 10, 'status': 'absent', } # when actual_content = converters.from_content(content_input) # then self.assertEqual(actual_content, expected_content) @istest def from_person(self): person_input = { 'id': 10, 'anything': 'else', 'name': b'bob', 'fullname': b'bob bob@alice.net', 'email': b'bob@foo.alice', } expected_person = { 'id': 10, 'anything': 'else', 'name': 'bob', 'fullname': 'bob bob@alice.net', 'email': 'bob@foo.alice', } # when actual_person = converters.from_person(person_input) # then self.assertEqual(actual_person, expected_person) @istest def from_directory_entries(self): dir_entries_input = { 'sha1': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e0' '2ebda5'), 'sha256': hashutil.hex_to_hash('39007420ca5de7cb3cfc15196335507e' 'e76c98930e7e0afa4d2747d3bf96c926'), 'sha1_git': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'target': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'dir_id': hashutil.hex_to_hash('40e71b8614fcd89ccd17ca2b1d9e66' 'c5b00a6d03'), 'name': b'bob', 'type': 10, 'status': 'hidden', } expected_dir_entries = { 'sha1': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'sha256': '39007420ca5de7cb3cfc15196335507ee76c98930e7e0afa4d2747' 'd3bf96c926', 'sha1_git': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'target': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'dir_id': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03', 'name': 'bob', 'type': 10, 'status': 'absent', } # when actual_dir_entries = converters.from_directory_entry(dir_entries_input) # then self.assertEqual(actual_dir_entries, expected_dir_entries) @istest def from_filetype(self): content_filetype = { 'id': hashutil.hex_to_hash('5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebd' 'a5'), 'encoding': b'utf-8', 'mimetype': b'text/plain', } expected_content_filetype = { 'id': '5c6f0e2750f48fa0bd0c4cf5976ba0b9e02ebda5', 'encoding': 'utf-8', 'mimetype': 'text/plain', } # when actual_content_filetype = converters.from_filetype(content_filetype) # then self.assertEqual(actual_content_filetype, expected_content_filetype)