Page MenuHomeSoftware Heritage

striping_objstorage.py
No OneTemporary

striping_objstorage.py

# Copyright (C) 2018 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import defaultdict
import queue
from .multiplexer_objstorage import ObjStorageThread, MultiplexerObjStorage
class StripingObjStorage(MultiplexerObjStorage):
"""Stripes objects across multiple objstorages
This objstorage implementation will write objects to objstorages in a
predictable way: it takes the modulo of the last 8 bytes of the object
identifier with the number of object storages passed, which will yield an
(almost) even distribution.
Objects are read from all storages in turn until it succeeds.
"""
MOD_BYTES = 8
def __init__(self, storages, **kwargs):
super().__init__(storages, **kwargs)
self.num_storages = len(storages)
def get_storage_index(self, obj_id):
if obj_id is None:
raise ValueError(
'StripingObjStorage always needs obj_id to be set'
)
index = int.from_bytes(obj_id[:-self.MOD_BYTES], 'big')
return index % self.num_storages
def get_write_threads(self, obj_id):
idx = self.get_storage_index(obj_id)
yield self.storage_threads[idx]
def get_read_threads(self, obj_id=None):
if obj_id:
idx = self.get_storage_index(obj_id)
else:
idx = 0
for i in range(self.num_storages):
yield self.storage_threads[(idx + i) % self.num_storages]
def add_batch(self, contents, check_presence=True):
"""Add a batch of new objects to the object storage.
"""
content_by_storage_index = defaultdict(dict)
for obj_id, content in contents.items():
storage_index = self.get_storage_index(obj_id)
content_by_storage_index[storage_index][obj_id] = content
mailbox = queue.Queue()
for storage_index, contents in content_by_storage_index.items():
self.storage_threads[storage_index].queue_command(
'add_batch',
contents,
check_presence=check_presence,
mailbox=mailbox,
)
return sum(
ObjStorageThread.collect_results(
mailbox, len(content_by_storage_index)
)
)

File Metadata

Mime Type
text/x-python
Expires
Thu, Jul 3, 12:25 PM (3 d, 8 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3241096

Event Timeline