For each batch of messages, dispatch the copy of individual objects to a
ThreadPoolExecutor.
The idea is to allow concurrency to go beyond the process parallelism
provided by Kafka consumer groups. Copying one object is mainly IO-bound
(check existence in the destination objstorage, retrieve it from the source
objstorage, put it in the destination objstorage), with possibly large
delays (e.g. retrieving a blob from S3 implies a minimum delay of 150–200 ms
before the HTTP request is answered, whatever the size of the object).
This tries to overlap those delays instead of paying them one object at a time.
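A minimal sketch of the idea, under stated assumptions: `SlowObjStorage` is a hypothetical in-memory store with a fixed simulated latency per call, and `copy_object` / `replay_batch` are illustrative names, not the swh.objstorage API or the code in this diff. Each object of a batch is copied in a worker thread doing the three IO steps listed above, and the batch only completes once every future has resolved.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List


class SlowObjStorage:
    """Toy in-memory blob store with a fixed per-call latency.

    Hypothetical stand-in for an S3-like objstorage; it is NOT the
    swh.objstorage interface, only enough to show the IO-bound pattern.
    """

    def __init__(self, latency: float = 0.05) -> None:
        self.latency = latency
        self.blobs: Dict[bytes, bytes] = {}

    def __contains__(self, obj_id: bytes) -> bool:
        time.sleep(self.latency)  # simulated request round-trip
        return obj_id in self.blobs

    def get(self, obj_id: bytes) -> bytes:
        time.sleep(self.latency)
        return self.blobs[obj_id]

    def add(self, content: bytes, obj_id: bytes) -> None:
        time.sleep(self.latency)
        # Each thread writes a distinct key; dict item assignment is atomic in CPython.
        self.blobs[obj_id] = content


def copy_object(obj_id: bytes, src: SlowObjStorage, dst: SlowObjStorage) -> bool:
    """Copy one blob; return True if copied, False if already in dst."""
    if obj_id in dst:  # 1. check existence in the destination
        return False
    content = src.get(obj_id)  # 2. retrieve from the source
    dst.add(content, obj_id)  # 3. put in the destination
    return True


def replay_batch(
    obj_ids: List[bytes],
    src: SlowObjStorage,
    dst: SlowObjStorage,
    concurrency: int = 16,
) -> int:
    """Copy every object of one batch using a thread pool.

    Returns only once every object of the batch has been processed.
    """
    copied = 0
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(copy_object, oid, src, dst) for oid in obj_ids]
        for future in as_completed(futures):
            # result() re-raises any exception raised in the worker thread
            if future.result():
                copied += 1
    return copied
```

With 20 objects and three 50 ms calls per object, a serial loop needs about 3 s, while 20 threads need on the order of 0.15 s because the waits overlap. The same reasoning applied to a 150 ms S3 round-trip is what motivates running the copies concurrently.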
Details
- Reviewers: vlorentz
- Group Reviewers: Reviewers
- Commits: rDOBJSRPL0dffebc423a1: Make the copy process of blob objects run with thread concurrency

Diff Detail
- Repository: rDOBJSRPL Content replayer
- Branch: concurrent
- Lint: Lint Skipped
- Unit: Unit Tests Skipped
- Build Status: Buildable 26018; Build 40660: Phabricator diff pipeline on jenkins; Build 40659: arc lint + arc unit
Event Timeline
Build is green
Patch application report for D6945 (id=25144)
Rebasing onto a2d1aa9944...
Current branch diff-target is up to date.
Changes applied before test
commit 3cbf7c8cf6374553c73905929b16cbd682110e60
Author: David Douard <david.douard@sdfa3.org>
Date: Mon Jan 10 15:11:42 2022 +0100
Make the copy process of blob objects run with thread concurrency
See https://jenkins.softwareheritage.org/job/DOBJSRPL/job/tests-on-diff/29/ for more details.
Build is green
Patch application report for D6945 (id=25149)
Rebasing onto a2d1aa9944...
Current branch diff-target is up to date.
Changes applied before test
commit 0dffebc423a157cb3a36cde60e50dd2daae36869
Author: David Douard <david.douard@sdfa3.org>
Date: Mon Jan 10 15:11:42 2022 +0100
Make the copy process of blob objects run with thread concurrency
See https://jenkins.softwareheritage.org/job/DOBJSRPL/job/tests-on-diff/30/ for more details.
swh/objstorage/replayer/replay.py, lines 314–333:
It seems simpler to implement it using multiprocessing.dummy (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.dummy):

```python
for object_type in all_objects:
    if object_type != "content":
        logger.warning(
            "Received a series of %s, this should not happen", object_type
        )
        continue
    with multiprocessing.dummy.Pool(concurrency) as p:
        for _ in p.imap_unordered(lambda args: _copy_object(*args), all_objects.items()):
            pass
```
swh/objstorage/replayer/replay.py, lines 314–333:
I don't know; maybe it's possible to simplify this code a bit, but regarding mp.dummy, the documentation states:
swh/objstorage/replayer/replay.py, lines 314–333:
The rationale is only about the features of the API, which we don't use here. Anyway, no big deal.
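For comparison, the `multiprocessing.dummy` approach suggested in the first inline comment can be sketched as a runnable function. This is one reading of that suggestion, assuming `all_objects` maps an object type to a list of object ids and that `copy_one` is a hypothetical callable returning True when it copied an object; unlike the inline snippet, it maps over the objects of the `content` series rather than over `all_objects.items()`.

```python
import logging
import multiprocessing.dummy
from typing import Callable, Dict, List

logger = logging.getLogger(__name__)


def replay_batch_mp_dummy(
    all_objects: Dict[str, List[bytes]],
    copy_one: Callable[[bytes], bool],
    concurrency: int = 16,
) -> int:
    """Copy the "content" objects of a batch with a multiprocessing.dummy pool.

    `copy_one` is a hypothetical callable returning True when it copied the
    object and False when it skipped it (e.g. already in the destination).
    """
    copied = 0
    for object_type, objects in all_objects.items():
        if object_type != "content":
            logger.warning(
                "Received a series of %s, this should not happen", object_type
            )
            continue
        # multiprocessing.dummy.Pool is backed by threads, so a lambda or a
        # closure can be passed without pickling.
        with multiprocessing.dummy.Pool(concurrency) as pool:
            # imap_unordered yields each result as soon as its copy finishes;
            # fully consuming the iterator waits for the whole series before
            # the with-block exits and terminates the pool.
            for was_copied in pool.imap_unordered(copy_one, objects):
                if was_copied:
                    copied += 1
    return copied
```

For this use case both APIs behave the same; the difference discussed in the thread is only in API surface. `ThreadPoolExecutor` returns `concurrent.futures.Future` objects, whereas `multiprocessing.dummy` exposes the process-pool interface (`AsyncResult`, `imap_unordered`, ...) on top of threads.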