Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/buffer.py
Show All 22 Lines | .. code-block:: yaml | ||||
cls: buffer | cls: buffer | ||||
args: | args: | ||||
storage: | storage: | ||||
cls: remote | cls: remote | ||||
args: http://storage.internal.staging.swh.network:5002/ | args: http://storage.internal.staging.swh.network:5002/ | ||||
min_batch_size: | min_batch_size: | ||||
content: 10000 | content: 10000 | ||||
content_bytes: 100000000 | content_bytes: 100000000 | ||||
skipped_content: 10000 | |||||
directory: 5000 | directory: 5000 | ||||
revision: 1000 | revision: 1000 | ||||
release: 10000 | release: 10000 | ||||
""" | """ | ||||
def __init__(self, storage, min_batch_size=None): | def __init__(self, storage, min_batch_size=None): | ||||
self.storage = get_storage(**storage) | self.storage = get_storage(**storage) | ||||
if min_batch_size is None: | if min_batch_size is None: | ||||
min_batch_size = {} | min_batch_size = {} | ||||
self.min_batch_size = { | self.min_batch_size = { | ||||
'content': min_batch_size.get('content', 10000), | 'content': min_batch_size.get('content', 10000), | ||||
'content_bytes': min_batch_size.get('content_bytes', | 'content_bytes': min_batch_size.get('content_bytes', | ||||
100*1024*1024), | 100*1024*1024), | ||||
'skipped_content': min_batch_size.get('skipped_content', 10000), | |||||
'directory': min_batch_size.get('directory', 25000), | 'directory': min_batch_size.get('directory', 25000), | ||||
'revision': min_batch_size.get('revision', 100000), | 'revision': min_batch_size.get('revision', 100000), | ||||
'release': min_batch_size.get('release', 100000), | 'release': min_batch_size.get('release', 100000), | ||||
} | } | ||||
self.object_types = ['content', 'directory', 'revision', 'release'] | self.object_types = [ | ||||
'content', 'skipped_content', 'directory', 'revision', 'release'] | |||||
self._objects = {k: deque() for k in self.object_types} | self._objects = {k: deque() for k in self.object_types} | ||||
def __getattr__(self, key): | def __getattr__(self, key): | ||||
if key.endswith('_add'): | if key.endswith('_add'): | ||||
object_type = key.split('_')[0] | object_type = key.rsplit('_', 1)[0] | ||||
if object_type in self.object_types: | if object_type in self.object_types: | ||||
return partial( | return partial( | ||||
self.object_add, object_type=object_type | self.object_add, object_type=object_type | ||||
) | ) | ||||
return getattr(self.storage, key) | return getattr(self.storage, key) | ||||
def content_add(self, content: Iterable[Dict]) -> Dict: | def content_add(self, content: Iterable[Dict]) -> Dict: | ||||
"""Enqueue contents to write to the storage. | """Enqueue contents to write to the storage. | ||||
▲ Show 20 Lines • Show All 46 Lines • Show Last 20 Lines |