Changeset View
Changeset View
Standalone View
Standalone View
swh/core/api/negotiation.py
Show All 21 Lines | |||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||||
# SOFTWARE. | # SOFTWARE. | ||||
# | # | ||||
from collections import defaultdict | from collections import defaultdict | ||||
from decorator import decorator | from decorator import decorator | ||||
from inspect import getcallargs | from inspect import getcallargs | ||||
from typing import Any, List, Optional, Callable, \ | from typing import Any, List, Optional, Callable, Type, NoReturn, DefaultDict | ||||
Type, NoReturn, DefaultDict | |||||
from requests import Response | from requests import Response | ||||
class FormatterNotFound(Exception): | class FormatterNotFound(Exception): | ||||
pass | pass | ||||
class Formatter: | class Formatter: | ||||
format: Optional[str] = None | format: Optional[str] = None | ||||
mimetypes: List[str] = [] | mimetypes: List[str] = [] | ||||
def __init__(self, request_mimetype: Optional[str] = None) -> None: | def __init__(self, request_mimetype: Optional[str] = None) -> None: | ||||
if request_mimetype is None or request_mimetype not in self.mimetypes: | if request_mimetype is None or request_mimetype not in self.mimetypes: | ||||
try: | try: | ||||
self.response_mimetype = self.mimetypes[0] | self.response_mimetype = self.mimetypes[0] | ||||
except IndexError: | except IndexError: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"%s.mimetypes should be a non-empty list" % | "%s.mimetypes should be a non-empty list" % self.__class__.__name__ | ||||
self.__class__.__name__) | ) | ||||
else: | else: | ||||
self.response_mimetype = request_mimetype | self.response_mimetype = request_mimetype | ||||
def configure(self) -> None: | def configure(self) -> None: | ||||
pass | pass | ||||
def render(self, obj: Any) -> bytes: | def render(self, obj: Any) -> bytes: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"render() should be implemented by Formatter subclasses") | "render() should be implemented by Formatter subclasses" | ||||
) | |||||
def __call__(self, obj: Any) -> Response: | def __call__(self, obj: Any) -> Response: | ||||
return self._make_response( | return self._make_response( | ||||
self.render(obj), content_type=self.response_mimetype) | self.render(obj), content_type=self.response_mimetype | ||||
) | |||||
def _make_response(self, body: bytes, content_type: str) -> Response: | def _make_response(self, body: bytes, content_type: str) -> Response: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"_make_response() should be implemented by " | "_make_response() should be implemented by " | ||||
"framework-specific subclasses of Formatter" | "framework-specific subclasses of Formatter" | ||||
) | ) | ||||
class Negotiator: | class Negotiator: | ||||
def __init__(self, func: Callable[..., Any]) -> None: | def __init__(self, func: Callable[..., Any]) -> None: | ||||
self.func = func | self.func = func | ||||
self._formatters: List[Type[Formatter]] = [] | self._formatters: List[Type[Formatter]] = [] | ||||
self._formatters_by_format: DefaultDict = defaultdict(list) | self._formatters_by_format: DefaultDict = defaultdict(list) | ||||
self._formatters_by_mimetype: DefaultDict = defaultdict(list) | self._formatters_by_mimetype: DefaultDict = defaultdict(list) | ||||
def __call__(self, *args, **kwargs) -> Response: | def __call__(self, *args, **kwargs) -> Response: | ||||
result = self.func(*args, **kwargs) | result = self.func(*args, **kwargs) | ||||
format = getcallargs(self.func, *args, **kwargs).get('format') | format = getcallargs(self.func, *args, **kwargs).get("format") | ||||
mimetype = self.best_mimetype() | mimetype = self.best_mimetype() | ||||
try: | try: | ||||
formatter = self.get_formatter(format, mimetype) | formatter = self.get_formatter(format, mimetype) | ||||
except FormatterNotFound as e: | except FormatterNotFound as e: | ||||
return self._abort(404, str(e)) | return self._abort(404, str(e)) | ||||
return formatter(result) | return formatter(result) | ||||
def register_formatter(self, formatter: Type[Formatter], | def register_formatter(self, formatter: Type[Formatter], *args, **kwargs) -> None: | ||||
*args, **kwargs) -> None: | |||||
self._formatters.append(formatter) | self._formatters.append(formatter) | ||||
self._formatters_by_format[formatter.format].append( | self._formatters_by_format[formatter.format].append((formatter, args, kwargs)) | ||||
(formatter, args, kwargs)) | |||||
for mimetype in formatter.mimetypes: | for mimetype in formatter.mimetypes: | ||||
self._formatters_by_mimetype[mimetype].append( | self._formatters_by_mimetype[mimetype].append((formatter, args, kwargs)) | ||||
(formatter, args, kwargs)) | |||||
def get_formatter(self, format: Optional[str] = None, | def get_formatter( | ||||
mimetype: Optional[str] = None) -> Formatter: | self, format: Optional[str] = None, mimetype: Optional[str] = None | ||||
) -> Formatter: | |||||
if format is None and mimetype is None: | if format is None and mimetype is None: | ||||
raise TypeError( | raise TypeError( | ||||
"get_formatter expects one of the 'format' or 'mimetype' " | "get_formatter expects one of the 'format' or 'mimetype' " | ||||
"kwargs to be set") | "kwargs to be set" | ||||
) | |||||
if format is not None: | if format is not None: | ||||
try: | try: | ||||
# the first added will be the most specific | # the first added will be the most specific | ||||
formatter_cls, args, kwargs = ( | formatter_cls, args, kwargs = self._formatters_by_format[format][0] | ||||
self._formatters_by_format[format][0]) | |||||
except IndexError: | except IndexError: | ||||
raise FormatterNotFound( | raise FormatterNotFound("Formatter for format '%s' not found!" % format) | ||||
"Formatter for format '%s' not found!" % format) | |||||
elif mimetype is not None: | elif mimetype is not None: | ||||
try: | try: | ||||
# the first added will be the most specific | # the first added will be the most specific | ||||
formatter_cls, args, kwargs = ( | formatter_cls, args, kwargs = self._formatters_by_mimetype[mimetype][0] | ||||
self._formatters_by_mimetype[mimetype][0]) | |||||
except IndexError: | except IndexError: | ||||
raise FormatterNotFound( | raise FormatterNotFound( | ||||
"Formatter for mimetype '%s' not found!" % mimetype) | "Formatter for mimetype '%s' not found!" % mimetype | ||||
) | |||||
formatter = formatter_cls(request_mimetype=mimetype) | formatter = formatter_cls(request_mimetype=mimetype) | ||||
formatter.configure(*args, **kwargs) | formatter.configure(*args, **kwargs) | ||||
return formatter | return formatter | ||||
@property | @property | ||||
def accept_mimetypes(self) -> List[str]: | def accept_mimetypes(self) -> List[str]: | ||||
return [m for f in self._formatters for m in f.mimetypes] | return [m for f in self._formatters for m in f.mimetypes] | ||||
def best_mimetype(self) -> str: | def best_mimetype(self) -> str: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"best_mimetype() should be implemented in " | "best_mimetype() should be implemented in " | ||||
"framework-specific subclasses of Negotiator" | "framework-specific subclasses of Negotiator" | ||||
) | ) | ||||
def _abort(self, status_code: int, err: Optional[str] = None) -> NoReturn: | def _abort(self, status_code: int, err: Optional[str] = None) -> NoReturn: | ||||
raise NotImplementedError( | raise NotImplementedError( | ||||
"_abort() should be implemented in framework-specific " | "_abort() should be implemented in framework-specific " | ||||
"subclasses of Negotiator" | "subclasses of Negotiator" | ||||
) | ) | ||||
def negotiate(negotiator_cls: Type[Negotiator], formatter_cls: Type[Formatter], | def negotiate( | ||||
*args, **kwargs) -> Callable: | negotiator_cls: Type[Negotiator], formatter_cls: Type[Formatter], *args, **kwargs | ||||
) -> Callable: | |||||
def _negotiate(f, *args, **kwargs): | def _negotiate(f, *args, **kwargs): | ||||
return f.negotiator(*args, **kwargs) | return f.negotiator(*args, **kwargs) | ||||
def decorate(f): | def decorate(f): | ||||
if not hasattr(f, 'negotiator'): | if not hasattr(f, "negotiator"): | ||||
f.negotiator = negotiator_cls(f) | f.negotiator = negotiator_cls(f) | ||||
f.negotiator.register_formatter(formatter_cls, *args, **kwargs) | f.negotiator.register_formatter(formatter_cls, *args, **kwargs) | ||||
return decorator(_negotiate, f) | return decorator(_negotiate, f) | ||||
return decorate | return decorate |