Software Heritage

Make the copy process of blob objects run with thread concurrency
Closed · Public

Authored by douardda on Jan 13 2022, 4:07 PM.

Details

Summary

For each batch of messages, dispatch the copy of individual objects to a
ThreadPoolExecutor.
The idea is to allow concurrency to go beyond the process parallelism
provided by Kafka consumer groups. Copying a single object is mainly
I/O bound (check existence in the destination objstorage, retrieve it from the
source objstorage, put it in the destination objstorage), with possibly large
delays (e.g. retrieving a blob from S3 implies a minimum 150-200 ms delay
before the HTTP request is answered, whatever the size of the object),
so this change tries to run those delays in parallel.
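The approach described in the summary can be sketched as follows. This is a minimal, self-contained illustration, not the code from the patch: the `src`/`dst` dictionaries stand in for the source and destination objstorages, `copy_one`, `copy_batch` and `FAKE_LATENCY` are hypothetical names, and `time.sleep` simulates the per-request latency (such as an S3 round-trip).

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical stand-in for the per-request latency of a remote objstorage.
FAKE_LATENCY = 0.05


def copy_one(obj_id: bytes, src: Dict[bytes, bytes], dst: Dict[bytes, bytes]) -> bool:
    """Copy one blob from src to dst; return True if a copy actually happened."""
    time.sleep(FAKE_LATENCY)  # existence check in the destination
    if obj_id in dst:
        return False
    time.sleep(FAKE_LATENCY)  # fetch from the source
    data = src[obj_id]
    time.sleep(FAKE_LATENCY)  # write to the destination
    # Each thread writes a distinct key; dict item assignment is thread-safe in CPython.
    dst[obj_id] = data
    return True


def copy_batch(
    obj_ids: List[bytes],
    src: Dict[bytes, bytes],
    dst: Dict[bytes, bytes],
    concurrency: int = 8,
) -> int:
    """Dispatch the copy of each object of a batch to a thread pool.

    Returns the number of objects actually copied. An exception raised in a
    worker is re-raised here when executor.map's results are consumed.
    """
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        results = list(executor.map(lambda oid: copy_one(oid, src, dst), obj_ids))
    return sum(results)
```

With enough threads, a batch takes roughly the latency of one object instead of the sum over all objects. This only pays off while the work stays I/O bound: the GIL still serializes CPU-bound Python code.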

Diff Detail

Event Timeline

Build is green

Patch application report for D6945 (id=25144)

Rebasing onto a2d1aa9944...

Current branch diff-target is up to date.
Changes applied before test
commit 3cbf7c8cf6374553c73905929b16cbd682110e60
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Jan 10 15:11:42 2022 +0100

    Make the copy process of blob objects run with thread concurrency

See https://jenkins.softwareheritage.org/job/DOBJSRPL/job/tests-on-diff/29/ for more details.

Add the CLI option to configure this concurrency value.
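As an illustration only, a concurrency setting could be exposed on a command line as below. The option name `--concurrency`, its default of 4, and the use of the stdlib `argparse` module are all assumptions made for this sketch; they are not taken from the patch, whose CLI may use a different framework, name, or default.

```python
import argparse
from typing import List


def positive_int(value: str) -> int:
    """argparse type converter: accept only integers >= 1."""
    n = int(value)  # a ValueError here is reported by argparse as a usage error
    if n < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {n}")
    return n


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical parser; only the concurrency option is shown.
    parser = argparse.ArgumentParser(description="replay content objects")
    parser.add_argument(
        "--concurrency",
        type=positive_int,
        default=4,  # assumed default, for illustration only
        help="number of threads used to copy the objects of each batch",
    )
    return parser


def parse_concurrency(argv: List[str]) -> int:
    return build_parser().parse_args(argv).concurrency
```

Validating the value at parse time means a bad setting fails immediately with a usage error rather than when the thread pool is created.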

Build is green

Patch application report for D6945 (id=25149)

Rebasing onto a2d1aa9944...

Current branch diff-target is up to date.
Changes applied before test
commit 0dffebc423a157cb3a36cde60e50dd2daae36869
Author: David Douard <david.douard@sdfa3.org>
Date:   Mon Jan 10 15:11:42 2022 +0100

    Make the copy process of blob objects run with thread concurrency
    
    for each batch of messages, dispatch the copy of individual objects in a
    ThreadPoolExecutor.
    The idea is to allow concurrency to ge beyong process parallelism
    provided by kafka consumer groups. Since the copy a one object is mainly
    IO bound (check existence in destination objstare, retrieve from source
    objstorage, put in destination objstorage) with possibly large delays
    (eg. retrieveing a blob from s3 imply a minimum 150/200ms delay before
    the HTTP request is replied, whatever the size of the object);
    this tries to parallelize those delays.

See https://jenkins.softwareheritage.org/job/DOBJSRPL/job/tests-on-diff/30/ for more details.

vlorentz added inline comments.
swh/objstorage/replayer/replay.py
314–333

It seems simpler to implement it using multiprocessing.dummy (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy):

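import multiprocessing.dummy

for object_type, objects in all_objects.items():
    if object_type != "content":
        logger.warning(
            "Received a series of %s, this should not happen", object_type
        )
        continue
    # Thread-backed pool: nothing is pickled, so plain callables or lambdas work.
    with multiprocessing.dummy.Pool(concurrency) as p:
        # Assumes _copy_object takes one content object; imap_unordered is
        # lazy, so the loop drains it to make every copy actually run.
        for _ in p.imap_unordered(_copy_object, objects):
            pass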
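A self-contained way to try the pattern suggested above: `multiprocessing.dummy.Pool` exposes the `multiprocessing.Pool` API backed by threads, so the worker can be a lambda. `fake_copy` and `copy_all` are hypothetical stand-ins for `_copy_object` and the surrounding replay code.

```python
import multiprocessing.dummy
import time
from typing import Iterable, List


def fake_copy(obj_id: int) -> int:
    """Hypothetical stand-in for _copy_object: sleep to mimic I/O latency."""
    time.sleep(0.05)
    return obj_id


def copy_all(obj_ids: Iterable[int], concurrency: int) -> List[int]:
    done: List[int] = []
    with multiprocessing.dummy.Pool(concurrency) as p:
        # A lambda is accepted because a thread pool never pickles the callable.
        # imap_unordered yields results in completion order, not input order.
        for result in p.imap_unordered(lambda oid: fake_copy(oid), obj_ids):
            done.append(result)
    # Pool.__exit__ calls terminate(), so the iterator is fully drained above.
    return done
```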
swh/objstorage/replayer/replay.py
314–333

I don't know; maybe it's possible to simplify this code a bit, but regarding multiprocessing.dummy, the documentation states:

Users should generally prefer to use concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start
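For comparison, here is the same fan-out written with `concurrent.futures.ThreadPoolExecutor`, the interface the quoted documentation recommends. `submit` returns `Future` objects and `as_completed` yields them as they finish. Calling `future.result()` re-raises any exception from the worker thread, so a failed copy is not silently dropped. `fake_copy` is again a hypothetical stand-in for `_copy_object`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Iterable, List


def fake_copy(obj_id: int) -> int:
    """Hypothetical stand-in for _copy_object; fails on negative ids."""
    if obj_id < 0:
        raise ValueError(f"cannot copy object {obj_id}")
    time.sleep(0.05)
    return obj_id


def copy_all(obj_ids: Iterable[int], concurrency: int) -> List[int]:
    done: List[int] = []
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        futures = [executor.submit(fake_copy, oid) for oid in obj_ids]
        for future in as_completed(futures):
            # result() re-raises the worker's exception, if any.
            done.append(future.result())
    return done
```

Functionally this matches the `multiprocessing.dummy` version for this use case; the difference lies in the `Future`-based API.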

vlorentz added inline comments.
swh/objstorage/replayer/replay.py
314–333

The rationale is only about API features we don't use here. Anyway, no big deal.

This revision is now accepted and ready to land. (Jan 17 2022, 11:36 AM)