Changeset View
Changeset View
Standalone View
Standalone View
swh/core/api/tests/test_async.py
Show All 23 Lines | |||||
class TestClientError(Exception): | class TestClientError(Exception): | ||||
pass | pass | ||||
async def root(request): | async def root(request): | ||||
return Response('toor') | return Response('toor') | ||||
STRUCT = {'txt': 'something stupid', | |||||
STRUCT = { | |||||
'txt': 'something stupid', | |||||
# 'date': datetime.date(2019, 6, 9), # not supported | # 'date': datetime.date(2019, 6, 9), # not supported | ||||
'datetime': datetime.datetime(2019, 6, 9, 10, 12), | 'datetime': datetime.datetime(2019, 6, 9, 10, 12), | ||||
'timedelta': datetime.timedelta(days=-2, hours=3), | 'timedelta': datetime.timedelta(days=-2, hours=3), | ||||
'int': 42, | 'int': 42, | ||||
'float': 3.14, | 'float': 3.14, | ||||
'subdata': {'int': 42, | 'subdata': {'int': 42, 'datetime': datetime.datetime(2019, 6, 10, 11, 12)}, | ||||
'datetime': datetime.datetime(2019, 6, 10, 11, 12), | |||||
}, | |||||
'list': [42, datetime.datetime(2019, 9, 10, 11, 12), 'ok'], | 'list': [42, datetime.datetime(2019, 9, 10, 11, 12), 'ok'], | ||||
} | } | ||||
async def struct(request): | async def struct(request): | ||||
return Response(STRUCT) | return Response(STRUCT) | ||||
async def echo(request): | async def echo(request): | ||||
data = await decode_request(request) | data = await decode_request(request) | ||||
▲ Show 20 Lines • Show All 81 Lines • ▼ Show 20 Lines | async def test_get_struct(async_app, aiohttp_client) -> None: | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT | ||||
async def test_get_struct_nego(async_app, aiohttp_client) -> None: | async def test_get_struct_nego(async_app, aiohttp_client) -> None: | ||||
"""Test returned structured from a simple GET data is OK""" | """Test returned structured from a simple GET data is OK""" | ||||
cli = await aiohttp_client(async_app) | cli = await aiohttp_client(async_app) | ||||
for ctype in ('x-msgpack', 'json'): | for ctype in ('x-msgpack', 'json'): | ||||
resp = await cli.get('/struct', | resp = await cli.get('/struct', headers={'Accept': 'application/%s' % ctype}) | ||||
headers={'Accept': 'application/%s' % ctype}) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/%s' % ctype) | check_mimetype(resp.headers['Content-Type'], 'application/%s' % ctype) | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT | ||||
async def test_post_struct_msgpack(async_app, aiohttp_client) -> None: | async def test_post_struct_msgpack(async_app, aiohttp_client) -> None: | ||||
"""Test that msgpack encoded posted struct data is returned as is""" | """Test that msgpack encoded posted struct data is returned as is""" | ||||
cli = await aiohttp_client(async_app) | cli = await aiohttp_client(async_app) | ||||
# simple struct | # simple struct | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo', | '/echo', | ||||
headers={'Content-Type': 'application/x-msgpack'}, | headers={'Content-Type': 'application/x-msgpack'}, | ||||
data=msgpack_dumps({'toto': 42})) | data=msgpack_dumps({'toto': 42}), | ||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
assert (await decode_request(resp)) == {'toto': 42} | assert (await decode_request(resp)) == {'toto': 42} | ||||
# complex struct | # complex struct | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo', | '/echo', | ||||
headers={'Content-Type': 'application/x-msgpack'}, | headers={'Content-Type': 'application/x-msgpack'}, | ||||
data=msgpack_dumps(STRUCT)) | data=msgpack_dumps(STRUCT), | ||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT | ||||
async def test_post_struct_json(async_app, aiohttp_client) -> None: | async def test_post_struct_json(async_app, aiohttp_client) -> None: | ||||
"""Test that json encoded posted struct data is returned as is""" | """Test that json encoded posted struct data is returned as is""" | ||||
cli = await aiohttp_client(async_app) | cli = await aiohttp_client(async_app) | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo', | '/echo', | ||||
headers={'Content-Type': 'application/json'}, | headers={'Content-Type': 'application/json'}, | ||||
data=json.dumps({'toto': 42}, cls=SWHJSONEncoder)) | data=json.dumps({'toto': 42}, cls=SWHJSONEncoder), | ||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
assert (await decode_request(resp)) == {'toto': 42} | assert (await decode_request(resp)) == {'toto': 42} | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo', | '/echo', | ||||
headers={'Content-Type': 'application/json'}, | headers={'Content-Type': 'application/json'}, | ||||
data=json.dumps(STRUCT, cls=SWHJSONEncoder)) | data=json.dumps(STRUCT, cls=SWHJSONEncoder), | ||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
# assert resp.headers['Content-Type'] == 'application/x-msgpack' | # assert resp.headers['Content-Type'] == 'application/x-msgpack' | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT | ||||
async def test_post_struct_nego(async_app, aiohttp_client) -> None: | async def test_post_struct_nego(async_app, aiohttp_client) -> None: | ||||
"""Test that json encoded posted struct data is returned as is | """Test that json encoded posted struct data is returned as is | ||||
using content negotiation (accept json or msgpack). | using content negotiation (accept json or msgpack). | ||||
""" | """ | ||||
cli = await aiohttp_client(async_app) | cli = await aiohttp_client(async_app) | ||||
for ctype in ('x-msgpack', 'json'): | for ctype in ('x-msgpack', 'json'): | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo', | '/echo', | ||||
headers={'Content-Type': 'application/json', | headers={ | ||||
'Accept': 'application/%s' % ctype}, | 'Content-Type': 'application/json', | ||||
data=json.dumps(STRUCT, cls=SWHJSONEncoder)) | 'Accept': 'application/%s' % ctype, | ||||
}, | |||||
data=json.dumps(STRUCT, cls=SWHJSONEncoder), | |||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/%s' % ctype) | check_mimetype(resp.headers['Content-Type'], 'application/%s' % ctype) | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT | ||||
async def test_post_struct_no_nego(async_app, aiohttp_client) -> None: | async def test_post_struct_no_nego(async_app, aiohttp_client) -> None: | ||||
"""Test that json encoded posted struct data is returned as msgpack | """Test that json encoded posted struct data is returned as msgpack | ||||
when using non-negotiation-compatible handlers. | when using non-negotiation-compatible handlers. | ||||
""" | """ | ||||
cli = await aiohttp_client(async_app) | cli = await aiohttp_client(async_app) | ||||
for ctype in ('x-msgpack', 'json'): | for ctype in ('x-msgpack', 'json'): | ||||
resp = await cli.post( | resp = await cli.post( | ||||
'/echo-no-nego', | '/echo-no-nego', | ||||
headers={'Content-Type': 'application/json', | headers={ | ||||
'Accept': 'application/%s' % ctype}, | 'Content-Type': 'application/json', | ||||
data=json.dumps(STRUCT, cls=SWHJSONEncoder)) | 'Accept': 'application/%s' % ctype, | ||||
}, | |||||
data=json.dumps(STRUCT, cls=SWHJSONEncoder), | |||||
) | |||||
assert resp.status == 200 | assert resp.status == 200 | ||||
check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | check_mimetype(resp.headers['Content-Type'], 'application/x-msgpack') | ||||
assert (await decode_request(resp)) == STRUCT | assert (await decode_request(resp)) == STRUCT |