Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Paste
P820
bench objstorage
Active
Public
Actions
Authored by
douardda
on Oct 14 2020, 3:39 PM.
Edit Paste
Archive Paste
View Raw File
Subscribe
Mute Notifications
Award Token
Flag For Later
Tags
None
Subscribers
None
import
os
import
sys
import
concurrent
import
time
import
yaml
import
multiprocessing
as
mp
from
urllib.parse
import
urlparse
,
parse_qs
from
itertools
import
count
import
signal
import
logging
import
zmq
import
click
try
:
from
setproctitle
import
setproctitle
except
ImportError
:
setproctitle
=
None
from
swh.core.statsd
import
statsd
from
swh.objstorage
import
get_objstorage
from
swh.objstorage.api.server
import
load_and_check_config
# ZeroMQ context used to create every socket in this module.
# NOTE(review): it is created at import time, before the worker processes are
# spawned -- confirm that sharing it across processes is safe.
ctx = zmq.Context()
# Endpoint the obj_id server binds to and the workers/main process connect to.
zurl = 'tcp://127.0.0.1:5556'
# High-water mark applied to the obj_id server socket.
PIPELINE = 10
def generate_obj_ids(storage):
    '''Simple generator that produces obj_ids from the given objstorage.

    The storage is listed batch by batch through
    ``storage.list_content(last_obj_id)``, each call being given the last
    obj_id yielded so far (``None`` for the first call). The generator stops
    as soon as a batch does not move the cursor forward, which includes the
    case of a storage that is empty from the start.

    ``storage`` must be an actual objstorage: ``None`` is not supported here.
    '''
    last_obj_id = None
    i = 0
    while True:
        logging.info('New storage listing batch from %s', last_obj_id)
        # Track the cursor per batch so that an empty first batch does not
        # leave the loop variable unbound.
        batch_last = last_obj_id
        for obj_id in storage.list_content(last_obj_id):
            yield obj_id
            i += 1
            logging.debug('yield %s %s', i, obj_id)
            batch_last = obj_id
        if batch_last == last_obj_id:
            # list_content returned nothing new, we are done here
            logging.info('Exiting obj_id generator')
            break
        last_obj_id = batch_last
def obj_ids_srv(cfg, time_limit=None):
    '''Answer every request received on the zmq REP socket with one obj_id
    taken from the objstorage described by ``cfg``.

    Serving stops when the storage listing is exhausted, when ``time_limit``
    seconds (if given) have elapsed, or when a client sends ``b'exit'``.
    Once there is nothing left to serve, every further request is answered
    with an empty message (EOT) until ``b'exit'`` is received.
    '''
    if setproctitle:
        setproctitle('bench objstorage: obj id gen')

    logger = logging.getLogger('ObjID gen')
    logger.info('Starting obj_id filler')

    server = ctx.socket(zmq.REP)
    server.set_hwm(PIPELINE)
    server.bind(zurl)

    source = get_objstorage(**cfg)
    started = time.time()

    for obj_id in generate_obj_ids(source):
        if time_limit and time.time() - started > time_limit:
            logger.info('Time limit reached')
            break

        logger.debug('waiting for request')
        request = server.recv()
        if request == b'exit':
            # The main process asked us to stop: acknowledge and leave.
            logging.info('QUITTING 1')
            server.send(b'')
            return

        logger.debug('OID: RCV %s', request)
        server.send(obj_id)
        logger.debug('OID: SNT %s', obj_id)
        statsd.increment('swh_objstorage_bench_objid_count')
        logger.debug('End of loop')
    else:
        # Only reached when the listing ran dry (no break, no return).
        logger.debug('No more ID to produce')

    # Nothing left to hand out: reply EOT to every request until told to exit.
    while server.recv() != b'exit':
        logger.debug('OBJ_ID SEND EOT')
        server.send(b'')
    logger.info('QUITTING 2')
    server.send(b'')
def pull_push_cli(cfg_from, cfg_to):
    '''Consume obj_ids from the zmq REQ/REP channel, fetch the corresponding
    objects from the source objstorage and push them to the destination one.

    Args:
        cfg_from: keyword arguments given to ``get_objstorage`` to build the
            source objstorage.
        cfg_to: keyword arguments given to ``get_objstorage`` to build the
            destination objstorage; if falsy, fetched objects are dropped.

    Returns:
        A ``(objects, volume, get_errors, put_errors)`` tuple: the number of
        obj_ids received, the total size of the objects successfully fetched,
        and the number of failed ``get`` and ``add`` calls.
    '''
    logger = logging.getLogger('P/P worker')
    if setproctitle:
        setproctitle('bench objstorage: P/P worker')

    sto_from = get_objstorage(**cfg_from)
    sto_to = get_objstorage(**cfg_to) if cfg_to else None

    sock = ctx.socket(zmq.REQ)
    sock.connect(zurl)
    logger.info('starting')

    t0 = time.time()
    vol = 0
    get_errors = 0
    put_errors = 0
    request = ('pid: %s' % os.getpid()).encode()
    try:
        for i in count():
            logger.debug('SEND REQ')
            sock.send(request)
            logger.debug('SENT REQ')
            obj_id = sock.recv()
            if not obj_id:
                # Empty reply is the end-of-transmission marker. At this
                # point exactly ``i`` obj_ids have been received.
                logger.info('Exiting. Processed %s objects', i)
                return (i, vol, get_errors, put_errors)

            # Progress report: at most once per second, checked every 100
            # objects.
            if (i % 100) == 0 and (time.time() - t0) > 1:
                logger.info('objects: %s', i)
                t0 = time.time()

            try:
                with statsd.timed('swh_objstorage_bench_get'):
                    logger.debug('getting %r', obj_id)
                    obj = sto_from.get(obj_id)
            except Exception:
                # Errors are part of what is measured: count them and go on.
                # KeyboardInterrupt/SystemExit are deliberately not caught.
                logger.debug('failed to get %r', obj_id, exc_info=True)
                get_errors += 1
                continue

            if sto_to is not None:
                try:
                    with statsd.timed('swh_objstorage_bench_put'):
                        logger.debug('adding %r', obj_id)
                        sto_to.add(obj)
                except Exception:
                    logger.debug('failed to add %r', obj_id, exc_info=True)
                    put_errors += 1

            v = len(obj)
            vol += v
            statsd.increment('swh_objstorage_bench_volume', v)
    finally:
        sock.close()
def worker(sto_from, sto_to, n=2):
    '''Run ``n`` concurrent ``pull_push_cli`` loops and aggregate the results.

    Args:
        sto_from: source objstorage configuration, passed to pull_push_cli.
        sto_to: destination objstorage configuration (or None), passed to
            pull_push_cli.
        n: number of threads; with ``n <= 1`` pull_push_cli is simply called
            in the current thread.

    Returns:
        The ``(objects, volume, get_errors, put_errors)`` tuple returned by
        pull_push_cli, summed element-wise over all the threads.
    '''
    # A bare ``import concurrent`` does not load the ``futures`` submodule,
    # so import it explicitly rather than rely on another module having
    # done so.
    import concurrent.futures

    logging.info('Starting worker with %s threads', n)
    if n <= 1:
        return pull_push_cli(sto_from, sto_to)

    with concurrent.futures.ThreadPoolExecutor(max_workers=n) as executor:
        futures = [
            executor.submit(pull_push_cli, sto_from, sto_to)
            for _ in range(n)
        ]
        results = [f.result() for f in futures]
    tot, vol, gerr, perr = map(sum, zip(*results))
    return tot, vol, gerr, perr
def _parse_storage_config(url):
    """Turn a storage command line argument into an objstorage config dict.

    ``url`` is either an http(s) url (remote objstorage), a
    ``<backend>://?<arg>=<value>&...`` url, or the path of a configuration
    file. Returns None when ``url`` is empty or yields no configuration.
    """
    if not url:
        return None
    parsed = urlparse(url)
    if parsed.scheme in ('http', 'https'):
        return {'cls': 'remote', 'args': {'url': url}}
    if parsed.scheme:
        # Only the first value of each query argument is kept.
        args = {k: v[0] for k, v in parse_qs(parsed.query).items()}
        return {'cls': parsed.scheme, 'args': args}
    cfg_dict = load_and_check_config(url)
    if cfg_dict:
        return cfg_dict['objstorage']
    return None


@click.command('swh-objstorage-copy')
@click.argument('cfg-from', required=True)
@click.argument('cfg-to', required=False, default=None)
@click.option('--duration', '-t', type=int, default=None)
@click.option('--workers', '-w', type=int, default=2)
@click.option('--threads', '-c', type=int, default=2)
@click.option('--log-level', '-l', default='INFO',
              type=click.Choice(list(logging._nameToLevel.keys())))
@click.option('--message', default=None)
def copy(cfg_from, cfg_to, duration, workers, threads, log_level, message):
    """Copy from one objstorage to the other. This tool is dedicated to stress
    testing swh-objstorage backends.

    The obj storages arguments can be given as either a file name (the
    standard obj storage config file in yaml) or as an url of the form:

      <backend>://?<arg>=<value>&[...]

    If this url is a standard http one, it is expected to be the url of a
    running swh objstorage server.

    If the destination obj storage is not given, objects are dropped (for a
    read only stress test).

    It will produce simple performance summary as output, but it will also
    push statsd metrics on the UDP endpoint given by the
    STATSD_HOST/STATSD_PORT environment variable (these default to
    localhost:8125), if you want to see real time curves of how the copy is
    performing.

    Examples:

    - Generate random objects and put them in a running objstorage listening on
      port 5003:

      swh-objstorage-copy random:// http://127.0.0.1:5003

    - Same as above, but limit the generator to 1000 fixed size objects:

      swh-objstorage-copy random://?total=1000&filesize=1024 \
         http://127.0.0.1:5003

    - Pull with 16 workers x 2 threads from a local pathslicer only for 2mn:

      swh-objstorage-copy -t 120 -w 16 -c 2 \
         pathslicing://?root=/tmp/objstorage&slicing=0:4/8:10

    """
    if setproctitle:
        setproctitle('bench objstorage: main')
    logging.basicConfig(
        level=log_level,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # NOTE(review): the SIGINT handler is restored before the pool is
    # created, so this pair of calls currently has no lasting effect.
    original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    signal.signal(signal.SIGINT, original_sigint_handler)

    storage = {
        'from': _parse_storage_config(cfg_from),
        'to': _parse_storage_config(cfg_to),
    }

    logging.info('Pulling from %s', storage['from'])
    if storage['to'] is not None:
        logging.info('Pushing to %s', storage['to'])
    else:
        logging.info('Droping objects')
    logging.info('Let\'s start...')

    # Each worker process runs `threads` pull/push loops.
    pool = mp.Pool(workers)
    futures = [
        pool.apply_async(worker, (storage['from'], storage['to'], threads))
        for _ in range(workers)
    ]

    # A separate process serves the obj_ids to the workers.
    idprov = mp.Process(target=obj_ids_srv, args=(storage['from'], duration))
    idprov.start()

    t0 = time.time()
    # Blocks until every worker has returned its (objects, volume,
    # get_errors, put_errors) tuple, then sums them element-wise.
    tot, vol, get_errors, put_errors = map(
        sum, zip(*[f.get() for f in futures]))
    logging.info('All workers are done...')
    dt = time.time() - t0

    # Tell the obj_id server to exit and wait for its acknowledgement.
    sock = ctx.socket(zmq.REQ)
    sock.connect(zurl)
    logging.info('we are done, tell it to objidgen')
    sock.send(b'exit')
    sock.recv()

    pool.close()
    pool.join()
    idprov.join()

    results = {
        'config': {
            'workers': workers,
            'threads': threads,
            'storage-in': storage['from'],
            'storage-out': storage['to'],
            'note': message or '',
        },
        'volume': {'total': vol, 'rate': vol / dt if dt else 0.0},
        'objects': {'total': tot, 'rate': tot / dt if dt else 0.0},
        'time': dt,
        'errors': {'get': get_errors, 'put': put_errors},
    }
    print(yaml.dump([results], default_flow_style=False))
# Run the click command when this file is executed as a script.
if __name__ == '__main__':
    copy()
Event Timeline
douardda
created this paste.
Oct 14 2020, 3:39 PM
2020-10-14 15:39:48 (UTC+2)
douardda
mentioned this in
T2706: Benchmark objstorage for mirror (uffizi vs. azure vs. s3)
.
Oct 15 2020, 12:43 PM
2020-10-15 12:43:59 (UTC+2)
Log In to Comment