Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437436
server.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
7 KB
Subscribers
None
server.py
View Options
# Copyright (C) 2015-2019 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import
os
import
aiohttp.web
import
json
from
swh.core.config
import
read
as
config_read
from
swh.core.api.asynchronous
import
(
RPCServerApp
,
decode_request
,
encode_data_server
as
encode_data
)
from
swh.core.api.serializers
import
msgpack_loads
,
SWHJSONDecoder
from
swh.model
import
hashutil
from
swh.objstorage
import
get_objstorage
from
swh.objstorage.objstorage
import
DEFAULT_LIMIT
from
swh.objstorage.exc
import
ObjNotFoundError
from
swh.core.statsd
import
statsd
def
timed
(
f
):
async
def
w
(
*
a
,
**
kw
):
with
statsd
.
timed
(
'swh_objstorage_request_duration_seconds'
,
tags
=
{
'endpoint'
:
f
.
__name__
}):
return
await
f
(
*
a
,
**
kw
)
return
w
@timed
async
def
index
(
request
):
return
aiohttp
.
web
.
Response
(
body
=
"SWH Objstorage API server"
)
@timed
async
def
check_config
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
check_config
(
**
req
))
@timed
async
def
contains
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
__contains__
(
**
req
))
@timed
async
def
add_bytes
(
request
):
req
=
await
decode_request
(
request
)
statsd
.
increment
(
'swh_objstorage_in_bytes_total'
,
len
(
req
[
'content'
]),
tags
=
{
'endpoint'
:
'add_bytes'
})
return
encode_data
(
request
.
app
[
'objstorage'
]
.
add
(
**
req
))
@timed
async
def
add_batch
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
add_batch
(
**
req
))
@timed
async
def
get_bytes
(
request
):
req
=
await
decode_request
(
request
)
try
:
ret
=
request
.
app
[
'objstorage'
]
.
get
(
**
req
)
except
ObjNotFoundError
:
ret
=
{
'error'
:
'object_not_found'
,
'request'
:
req
,
}
return
encode_data
(
ret
,
status
=
404
)
else
:
statsd
.
increment
(
'swh_objstorage_out_bytes_total'
,
len
(
ret
),
tags
=
{
'endpoint'
:
'get_bytes'
})
return
encode_data
(
ret
)
@timed
async
def
get_batch
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
get_batch
(
**
req
))
@timed
async
def
check
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
check
(
**
req
))
@timed
async
def
delete
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
delete
(
**
req
))
# Management methods
@timed
async
def
get_random_contents
(
request
):
req
=
await
decode_request
(
request
)
return
encode_data
(
request
.
app
[
'objstorage'
]
.
get_random
(
**
req
))
# Streaming methods
@timed
async
def
add_stream
(
request
):
hex_id
=
request
.
match_info
[
'hex_id'
]
obj_id
=
hashutil
.
hash_to_bytes
(
hex_id
)
check_pres
=
(
request
.
query
.
get
(
'check_presence'
,
''
)
.
lower
()
==
'true'
)
objstorage
=
request
.
app
[
'objstorage'
]
if
check_pres
and
obj_id
in
objstorage
:
return
encode_data
(
obj_id
)
# XXX this really should go in a decode_stream_request coroutine in
# swh.core, but since py35 does not support async generators, it cannot
# easily be made for now
content_type
=
request
.
headers
.
get
(
'Content-Type'
)
if
content_type
==
'application/x-msgpack'
:
decode
=
msgpack_loads
elif
content_type
==
'application/json'
:
decode
=
lambda
x
:
json
.
loads
(
x
,
cls
=
SWHJSONDecoder
)
# noqa
else
:
raise
ValueError
(
'Wrong content type `
%s
` for API request'
%
content_type
)
buffer
=
b
''
with
objstorage
.
chunk_writer
(
obj_id
)
as
write
:
while
not
request
.
content
.
at_eof
():
data
,
eot
=
await
request
.
content
.
readchunk
()
buffer
+=
data
if
eot
:
write
(
decode
(
buffer
))
buffer
=
b
''
return
encode_data
(
obj_id
)
@timed
async
def
get_stream
(
request
):
hex_id
=
request
.
match_info
[
'hex_id'
]
obj_id
=
hashutil
.
hash_to_bytes
(
hex_id
)
response
=
aiohttp
.
web
.
StreamResponse
()
await
response
.
prepare
(
request
)
for
chunk
in
request
.
app
[
'objstorage'
]
.
get_stream
(
obj_id
,
2
<<
20
):
await
response
.
write
(
chunk
)
await
response
.
write_eof
()
return
response
@timed
async
def
list_content
(
request
):
last_obj_id
=
request
.
query
.
get
(
'last_obj_id'
)
if
last_obj_id
:
last_obj_id
=
bytes
.
fromhex
(
last_obj_id
)
limit
=
int
(
request
.
query
.
get
(
'limit'
,
DEFAULT_LIMIT
))
response
=
aiohttp
.
web
.
StreamResponse
()
response
.
enable_chunked_encoding
()
await
response
.
prepare
(
request
)
for
obj_id
in
request
.
app
[
'objstorage'
]
.
list_content
(
last_obj_id
,
limit
=
limit
):
await
response
.
write
(
obj_id
)
await
response
.
write_eof
()
return
response
def
make_app
(
config
):
"""Initialize the remote api application.
"""
client_max_size
=
config
.
get
(
'client_max_size'
,
1024
*
1024
*
1024
)
app
=
RPCServerApp
(
client_max_size
=
client_max_size
)
# retro compatibility configuration settings
app
[
'config'
]
=
config
_cfg
=
config
[
'objstorage'
]
app
[
'objstorage'
]
=
get_objstorage
(
_cfg
[
'cls'
],
_cfg
[
'args'
])
app
.
router
.
add_route
(
'GET'
,
'/'
,
index
)
app
.
router
.
add_route
(
'POST'
,
'/check_config'
,
check_config
)
app
.
router
.
add_route
(
'POST'
,
'/content/contains'
,
contains
)
app
.
router
.
add_route
(
'POST'
,
'/content/add'
,
add_bytes
)
app
.
router
.
add_route
(
'POST'
,
'/content/add/batch'
,
add_batch
)
app
.
router
.
add_route
(
'POST'
,
'/content/get'
,
get_bytes
)
app
.
router
.
add_route
(
'POST'
,
'/content/get/batch'
,
get_batch
)
app
.
router
.
add_route
(
'POST'
,
'/content/get/random'
,
get_random_contents
)
app
.
router
.
add_route
(
'POST'
,
'/content/check'
,
check
)
app
.
router
.
add_route
(
'POST'
,
'/content/delete'
,
delete
)
app
.
router
.
add_route
(
'GET'
,
'/content'
,
list_content
)
app
.
router
.
add_route
(
'POST'
,
'/content/add_stream/{hex_id}'
,
add_stream
)
app
.
router
.
add_route
(
'GET'
,
'/content/get_stream/{hex_id}'
,
get_stream
)
return
app
def
load_and_check_config
(
config_file
):
"""Check the minimal configuration is set to run the api or raise an
error explanation.
Args:
config_file (str): Path to the configuration file to load
type (str): configuration type. For 'local' type, more
checks are done.
Raises:
Error if the setup is not as expected
Returns:
configuration as a dict
"""
if
not
config_file
:
raise
EnvironmentError
(
'Configuration file must be defined'
)
if
not
os
.
path
.
exists
(
config_file
):
raise
FileNotFoundError
(
'Configuration file
%s
does not exist'
%
(
config_file
,
))
cfg
=
config_read
(
config_file
)
if
'objstorage'
not
in
cfg
:
raise
KeyError
(
"Invalid configuration; missing objstorage config entry"
)
missing_keys
=
[]
vcfg
=
cfg
[
'objstorage'
]
for
key
in
(
'cls'
,
'args'
):
v
=
vcfg
.
get
(
key
)
if
v
is
None
:
missing_keys
.
append
(
key
)
if
missing_keys
:
raise
KeyError
(
"Invalid configuration; missing
%s
config entry"
%
(
', '
.
join
(
missing_keys
),
))
cls
=
vcfg
.
get
(
'cls'
)
if
cls
==
'pathslicing'
:
args
=
vcfg
[
'args'
]
for
key
in
(
'root'
,
'slicing'
):
v
=
args
.
get
(
key
)
if
v
is
None
:
missing_keys
.
append
(
key
)
if
missing_keys
:
raise
KeyError
(
"Invalid configuration; missing args.
%s
config entry"
%
(
', '
.
join
(
missing_keys
),
))
return
cfg
def
make_app_from_configfile
():
"""Load configuration and then build application to run
"""
config_file
=
os
.
environ
.
get
(
'SWH_CONFIG_FILENAME'
)
config
=
load_and_check_config
(
config_file
)
return
make_app
(
config
=
config
)
if
__name__
==
'__main__'
:
print
(
'Deprecated. Use swh-objstorage'
)
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Mon, Apr 14, 11:52 PM (1 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3241678
Attached To
rDOBJS Object storage
Event Timeline
Log In to Comment