Changeset View
Standalone View
swh/objstorage/backends/winery/objstorage.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from multiprocessing import Process | |||||
from swh.objstorage import exc | |||||
from swh.objstorage.objstorage import ObjStorage, compute_hash | |||||
from .roshard import ROShard | |||||
from .rwshard import RWShard | |||||
from .sharedbase import SharedBase | |||||
LOGGER = logging.getLogger(__name__) | |||||
def pack(shard, **kwargs): | |||||
dachary: @olasd the WineryObjStorage is split into four classes for clarity. | |||||
return Packer(shard, **kwargs).run() | |||||
class Packer: | |||||
def __init__(self, shard, **kwargs): | |||||
self.args = kwargs | |||||
self.shard = shard | |||||
self.init() | |||||
def init(self): | |||||
self.rw = RWShard(self.shard, **self.args) | |||||
self.ro = ROShard(self.shard, **self.args) | |||||
def uninit(self): | |||||
del self.ro | |||||
self.rw.uninit() | |||||
def run(self): | |||||
shard = self.ro.create(self.rw.count()) | |||||
for obj_id, content in self.rw.all(): | |||||
shard.write(obj_id, content) | |||||
shard.save() | |||||
base = SharedBase(**self.args) | |||||
base.shard_packing_ends(self.shard) | |||||
base.uninit() | |||||
self.rw.uninit() | |||||
self.rw.drop() | |||||
return True | |||||
class WineryObjStorage(ObjStorage): | |||||
def __init__(self, **kwargs): | |||||
super().__init__(**kwargs) | |||||
Done Inline ActionsShouldn't the r/o status be tested before trying to insert a content ? vsellier: Shouldn't the r/o status be tested before trying to insert a content ? | |||||
Done Inline ActionsThe cluster, as a whole, will never be readonly. dachary: The cluster, as a whole, will never be readonly. | |||||
self.args = kwargs | |||||
self.packers = [] | |||||
self.init() | |||||
def init(self): | |||||
self.base = SharedBase(**self.args) | |||||
if not self.args.get("readonly"): | |||||
self.shard = RWShard(self.base.whoami, **self.args) | |||||
Done Inline Actionsshouldn't this be an error? vlorentz: shouldn't this be an error? | |||||
Done Inline ActionsExcellent question! It could but this is not how other backends adress this race condition so I went with the flow. The object is being dealt with elsewhere. dachary: Excellent question! It could but this is not how other backends adress this race condition so I… | |||||
Done Inline ActionsI've ended up writing the same comment on the current version of this code, so I think there's a gap in the logic somewhere. Here's the scenario which worries me: A loader sends object A to be written; it's picked up by the objstorage worker for shard1 which inserts (A, shard1, inflight=true) in signature2shard, and swiftly crashes because of a bug. The loader in turn crashes. The write for object A is attempted again by another loader; the objstorage worker for shard2 picks it up, but, AFAICT, fails phase 1 (because of the unique index on signature2shard(signature)). What happens then to object A? olasd: I've ended up writing the same comment on the current version of this code, so I think there's… | |||||
Done Inline ActionsThe idea is that SharedBase.add_phase_1 either
Which allows it to pick up where the crashed worker left. However reading the code again I saw there was a bug in how the unique violation was handled and fixed it. That being said, the tests do not still cover border cases such as this one and they are what I intend to work on next. They are tricky to get right and I wanted to wait for the first wave over review before getting to them. I think I'm ready now :-) dachary: The idea is that SharedBase.add_phase_1 either
* creates the row or,
* if there is a unique… | |||||
def uninit(self): | |||||
if not self.args.get("readonly"): | |||||
self.shard.uninit() | |||||
self.base.uninit() | |||||
def roshard(self, name): | |||||
shard = ROShard(name, **self.args) | |||||
shard.load() | |||||
return shard | |||||
def get(self, obj_id): | |||||
shard_info = self.base.get(obj_id) | |||||
if shard_info is None: | |||||
raise exc.ObjNotFoundError(obj_id) | |||||
name, readonly = shard_info | |||||
if readonly: | |||||
Done Inline ActionsShould a r/o shard should not support to read an object when it's r/o? Does it mean an object will not be readable during the packing? vsellier: Should a r/o shard should not support to read an object when it's r/o?
Does it mean an object… | |||||
Done Inline ActionsThe Shard can be r/o but the object is immutable which is slightly different. Here we're dealing with r/o shards. During packing readonly will be NULL, i.e. the shard is not writable anymore but not yet readonly. dachary: The Shard can be r/o but the object is immutable which is slightly different. Here we're… | |||||
shard = self.roshard(name) | |||||
content = shard.get(obj_id) | |||||
del shard | |||||
else: | |||||
shard = RWShard(name, **self.args) | |||||
content = shard.get(obj_id) | |||||
if content is None: | |||||
raise exc.ObjNotFoundError(obj_id) | |||||
return content | |||||
def check_config(self, *, check_write): | |||||
return True | |||||
def __contains__(self, obj_id): | |||||
return self.base.contains(obj_id) | |||||
def add(self, content, obj_id=None, check_presence=True): | |||||
if obj_id is None: | |||||
obj_id = compute_hash(content) | |||||
if check_presence and obj_id in self: | |||||
return obj_id | |||||
shard = self.base.add_phase_1(obj_id) | |||||
if shard != self.base.id: | |||||
# this object is the responsibility of another shard | |||||
return obj_id | |||||
self.shard.add(obj_id, content) | |||||
self.base.add_phase_2(obj_id) | |||||
if self.shard.is_full(): | |||||
self.pack() | |||||
return obj_id | |||||
def check(self, obj_id): | |||||
# load all shards readonly == NULL and not locked (i.e. packer | |||||
# was interrupted for whatever reason) run pack for each of them | |||||
pass | |||||
def delete(self, obj_id): | |||||
raise PermissionError("Delete is not allowed.") | |||||
def pack(self): | |||||
self.base.shard_packing_starts() | |||||
p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) | |||||
self.uninit() | |||||
p.start() | |||||
self.packers.append(p) | |||||
self.init() | |||||
def __del__(self): | |||||
for p in self.packers: | |||||
p.kill() | |||||
p.join() |
@olasd the WineryObjStorage is split into four classes for clarity.