Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/winery/objstorage.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import concurrent.futures | |||||
import logging | |||||
from swh.objstorage import exc | |||||
from swh.objstorage.objstorage import ObjStorage, compute_hash | |||||
from .rwshard import RWShard | |||||
from .sharedbase import SharedBase | |||||
# Module-level logger named after this module; its threshold is set to
# ERROR at import time.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.ERROR)
def pack(shard, **kwargs):
    """Build a :class:`Packer` for ``shard`` and run it.

    This is a module-level function so it can be handed to an executor
    (see ``WineryObjStorage.pack``).

    Args:
        shard: identifier of the shard to pack, forwarded to ``Packer``.
        **kwargs: configuration forwarded unchanged to ``Packer``.

    Returns:
        Whatever ``Packer.run()`` returns.
    """
    packer = Packer(shard, **kwargs)
    return packer.run()
class Packer:
    """Holds the objects needed to pack one shard.

    The constructor opens a ``SharedBase`` and the ``RWShard`` named by
    ``shard`` using the same keyword arguments.
    """

    def __init__(self, shard, **kwargs):
        """Keep the configuration and open the shared base and the shard.

        Args:
            shard: identifier passed to ``RWShard`` as its first argument.
            **kwargs: configuration stored in ``self.args`` and forwarded to
                both ``SharedBase`` and ``RWShard``.
        """
        self.args = kwargs
        self.base = SharedBase(**self.args)
        self.shard = RWShard(shard, **self.args)

    def run(self):
        """Run the packing job; currently a stub that only returns ``True``."""
        return True
class WineryObjStorage(ObjStorage):
    """Object storage that writes objects into a read-write shard.

    A ``SharedBase`` is consulted to find which shard holds (or should hold)
    an object, and the content itself is stored in an ``RWShard``.  When the
    current shard reports that it is full, a packing job is submitted to a
    single-worker process pool and a fresh base/shard pair is opened.
    """

    def __init__(self, **kwargs):
        """Initialise the storage.

        Args:
            **kwargs: configuration forwarded to the ``ObjStorage``
                constructor, to ``SharedBase`` and to ``RWShard``.
        """
        super().__init__(**kwargs)
        self.args = kwargs
        # A single worker process runs the packing jobs submitted by pack().
        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
        # Futures returned by the executor, one per submitted packing job.
        self.packers = []
        self.init()

    def init(self):
        """(Re)open the shared base and the read-write shard it designates."""
        self.base = SharedBase(**self.args)
        self.shard = RWShard(self.base.whoami, **self.args)

    def check_config(self, *, check_write):
        """Report the configuration as valid; no actual check is performed."""
        return True

    def __contains__(self, obj_id):
        """Return whether the shared base knows about ``obj_id``."""
        return self.base.contains(obj_id)

    def add(self, content, obj_id=None, check_presence=True):
        """Store ``content`` and return its object id.

        Args:
            content: the object content to store.
            obj_id: identifier of the object; computed with ``compute_hash``
                when ``None``.
            check_presence: when true, return early if the object is already
                known to the shared base.

        Returns:
            The identifier of the object.
        """
        if obj_id is None:
            obj_id = compute_hash(content)

        if check_presence and obj_id in self:
            return obj_id

        shard = self.base.add_phase_1(obj_id)
        if shard != self.base.id:
            # this object is the responsibility of another shard
            return obj_id

        self.shard.add(obj_id, content)
        self.base.add_phase_2(obj_id)

        if self.shard.is_full():
            self.pack()

        return obj_id

    def get(self, obj_id):
        """Return the content stored for ``obj_id``.

        Args:
            obj_id: identifier of the object to retrieve.

        Returns:
            The content returned by the shard holding the object.

        Raises:
            exc.ObjNotFoundError: the shared base does not know ``obj_id``
                or the shard returns no content for it.
            NotImplementedError: the object lives in a read-only shard,
                which this class cannot read yet.
        """
        shard_info = self.base.get(obj_id)
        if shard_info is None:
            raise exc.ObjNotFoundError(obj_id)

        name, readonly = shard_info
        if readonly:
            # Read-only shards (ROShard) are not supported yet.  Failing
            # explicitly here replaces the UnboundLocalError that used to be
            # raised further down because ``shard`` was never assigned.
            raise NotImplementedError(
                f"Reading from read-only shard {name!r} is not implemented."
            )

        shard = RWShard(name, **self.args)
        content = shard.get(obj_id)
        if content is None:
            raise exc.ObjNotFoundError(obj_id)
        return content

    def check(self, obj_id):
        """Check ``obj_id``; currently a no-op."""
        # TODO: load all shards readonly == NULL and not locked (i.e. packer
        # was interrupted for whatever reason) run pack for each of them
        pass

    def delete(self, obj_id):
        """Refuse to delete: always raises ``PermissionError``."""
        raise PermissionError("Delete is not allowed.")

    def pack(self):
        """Submit the current shard for packing and switch to a new one.

        The shared base is notified that packing starts, the module-level
        ``pack`` function is submitted to the executor with the current
        shard name, and ``init()`` then opens a new base/shard pair.
        """
        self.base.shard_packing_starts()
        future = self.executor.submit(pack, self.shard.name, **self.args)
        self.packers.append(future)
        self.init()