diff --git a/swh/vault/cookers/__init__.py b/swh/vault/cookers/__init__.py
--- a/swh/vault/cookers/__init__.py
+++ b/swh/vault/cookers/__init__.py
@@ -122,10 +122,8 @@
     else:
         graph = None
 
-    return cooker_cls(
-        swhid,
-        backend=backend,
-        storage=storage,
-        graph=graph,
-        max_bundle_size=cfg["max_bundle_size"],
-    )
+    kwargs = {
+        k: v for (k, v) in cfg.items() if k in ("max_bundle_size", "thread_pool_size")
+    }
+
+    return cooker_cls(swhid, backend=backend, storage=storage, graph=graph, **kwargs,)
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -71,6 +71,7 @@
         graph=None,
         objstorage=None,
         max_bundle_size: int = MAX_BUNDLE_SIZE,
+        thread_pool_size: int = 10,
     ):
         """Initialize the cooker.
 
@@ -90,6 +91,7 @@
         self.objstorage = objstorage
         self.graph = graph
         self.max_bundle_size = max_bundle_size
+        self.thread_pool_size = thread_pool_size
 
     @classmethod
     def check_object_type(cls, object_type: ObjectType) -> None:
diff --git a/swh/vault/cookers/git_bare.py b/swh/vault/cookers/git_bare.py
--- a/swh/vault/cookers/git_bare.py
+++ b/swh/vault/cookers/git_bare.py
@@ -21,6 +21,7 @@
 import enum
 import glob
 import logging
+import multiprocessing.dummy
 import os.path
 import re
 import subprocess
@@ -53,7 +54,7 @@
 
 RELEASE_BATCH_SIZE = 10000
 REVISION_BATCH_SIZE = 10000
-DIRECTORY_BATCH_SIZE = 10000
+DIRECTORY_BATCH_SIZE = 10000  # should be at least ~10x larger than thread_pool_size
 CONTENT_BATCH_SIZE = 100
 
 
@@ -524,8 +525,16 @@
         return self.write_object(release["id"], git_object)
 
     def load_directories(self, obj_ids: List[Sha1Git]) -> None:
-        for obj_id in obj_ids:
-            self.load_directory(obj_id)
+        """Call :meth:`load_directory` on every id in ``obj_ids``, in no particular
+        order, using a pool of at most ``self.thread_pool_size`` threads."""
+        if not obj_ids:
+            return
+
+        with multiprocessing.dummy.Pool(min(self.thread_pool_size, len(obj_ids))) as p:
+            # Drain the iterator so every task completes before the pool is torn
+            # down, and any exception raised in a worker is re-raised here.
+            for _ in p.imap_unordered(self.load_directory, obj_ids):
+                pass
 
     def load_directory(self, obj_id: Sha1Git) -> None:
         # Load the directory