Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/winery/objstorage.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import logging | import logging | ||||
from multiprocessing import Process | from multiprocessing import Process | ||||
from swh.objstorage import exc | from swh.objstorage import exc | ||||
from swh.objstorage.interface import ObjId | |||||
from swh.objstorage.objstorage import ObjStorage | from swh.objstorage.objstorage import ObjStorage | ||||
from .roshard import ROShard | from .roshard import ROShard | ||||
from .rwshard import RWShard | from .rwshard import RWShard | ||||
from .sharedbase import SharedBase | from .sharedbase import SharedBase | ||||
from .stats import Stats | from .stats import Stats | ||||
logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||
class WineryObjStorage(ObjStorage): | class WineryObjStorage(ObjStorage): | ||||
def __init__(self, **kwargs): | def __init__(self, **kwargs): | ||||
super().__init__(**kwargs) | super().__init__(**kwargs) | ||||
if kwargs.get("readonly"): | if kwargs.get("readonly"): | ||||
self.winery = WineryReader(**kwargs) | self.winery = WineryReader(**kwargs) | ||||
else: | else: | ||||
self.winery = WineryWriter(**kwargs) | self.winery = WineryWriter(**kwargs) | ||||
def uninit(self): | def uninit(self): | ||||
self.winery.uninit() | self.winery.uninit() | ||||
def get(self, obj_id): | def get(self, obj_id: ObjId) -> bytes: | ||||
return self.winery.get(obj_id) | return self.winery.get(obj_id) | ||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | ||||
return True | return True | ||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | ||||
return obj_id in self.winery | return obj_id in self.winery | ||||
def add(self, content, obj_id, check_presence=True): | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | ||||
return self.winery.add(content, obj_id, check_presence) | return self.winery.add(content, obj_id, check_presence) | ||||
def check(self, obj_id): | def check(self, obj_id: ObjId) -> None: | ||||
return self.winery.check(obj_id) | return self.winery.check(obj_id) | ||||
def delete(self, obj_id): | def delete(self, obj_id: ObjId): | ||||
raise PermissionError("Delete is not allowed.") | raise PermissionError("Delete is not allowed.") | ||||
class WineryBase: | class WineryBase: | ||||
def __init__(self, **kwargs): | def __init__(self, **kwargs): | ||||
self.args = kwargs | self.args = kwargs | ||||
self.init() | self.init() | ||||
Show All 14 Lines | class WineryReader(WineryBase): | ||||
def roshard(self, name): | def roshard(self, name): | ||||
if name not in self.shards: | if name not in self.shards: | ||||
shard = ROShard(name, **self.args) | shard = ROShard(name, **self.args) | ||||
shard.load() | shard.load() | ||||
self.shards[name] = shard | self.shards[name] = shard | ||||
return self.shards[name] | return self.shards[name] | ||||
def get(self, obj_id): | def get(self, obj_id: ObjId) -> bytes: | ||||
shard_info = self.base.get(obj_id) | shard_info = self.base.get(obj_id) | ||||
if shard_info is None: | if shard_info is None: | ||||
raise exc.ObjNotFoundError(obj_id) | raise exc.ObjNotFoundError(obj_id) | ||||
name, readonly = shard_info | name, readonly = shard_info | ||||
if readonly: | if readonly: | ||||
shard = self.roshard(name) | shard = self.roshard(name) | ||||
content = shard.get(obj_id) | content = shard.get(obj_id) | ||||
del shard | del shard | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | class WineryWriter(WineryReader): | ||||
def init(self): | def init(self): | ||||
super().init() | super().init() | ||||
self.shard = RWShard(self.base.whoami, **self.args) | self.shard = RWShard(self.base.whoami, **self.args) | ||||
def uninit(self): | def uninit(self): | ||||
self.shard.uninit() | self.shard.uninit() | ||||
super().uninit() | super().uninit() | ||||
def add(self, content, obj_id, check_presence=True): | def add(self, content: bytes, obj_id: ObjId, check_presence: bool = True) -> ObjId: | ||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | ||||
return obj_id | return obj_id | ||||
shard = self.base.add_phase_1(obj_id) | shard = self.base.add_phase_1(obj_id) | ||||
if shard != self.base.id: | if shard != self.base.id: | ||||
# this object is the responsibility of another shard | # this object is the responsibility of another shard | ||||
return obj_id | return obj_id | ||||
self.shard.add(obj_id, content) | self.shard.add(obj_id, content) | ||||
self.base.add_phase_2(obj_id) | self.base.add_phase_2(obj_id) | ||||
if self.shard.is_full(): | if self.shard.is_full(): | ||||
self.pack() | self.pack() | ||||
return obj_id | return obj_id | ||||
def check(self, obj_id): | def check(self, obj_id: ObjId) -> None: | ||||
# load all shards packing == True and not locked (i.e. packer | # load all shards packing == True and not locked (i.e. packer | ||||
# was interrupted for whatever reason) run pack for each of them | # was interrupted for whatever reason) run pack for each of them | ||||
pass | pass | ||||
def pack(self): | def pack(self): | ||||
self.base.shard_packing_starts() | self.base.shard_packing_starts() | ||||
p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) | p = Process(target=pack, args=(self.shard.name,), kwargs=self.args) | ||||
self.uninit() | self.uninit() | ||||
p.start() | p.start() | ||||
self.packers.append(p) | self.packers.append(p) | ||||
self.init() | self.init() | ||||
def __del__(self): | def __del__(self): | ||||
for p in self.packers: | for p in self.packers: | ||||
p.kill() | p.kill() | ||||
p.join() | p.join() |