diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -19,3 +19,6 @@
 
 [mypy-requests_toolbelt.*]
 ignore_missing_imports = True
+
+[mypy-psycopg2.*]
+ignore_missing_imports = True
diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,2 +1,3 @@
 swh.core[http] >= 0.3
 swh.model >= 0.0.27
+#swh.perfecthash
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -5,3 +5,5 @@
 requests_toolbelt
 types-pyyaml
 types-requests
+pytest-postgresql < 4.0.0  # version 4.0 depends on psycopg 3. https://github.com/ClearcodeHQ/pytest-postgresql/blob/main/CHANGES.rst#400
+
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,6 +6,7 @@
 aiohttp >= 3
 click
 requests
+psycopg2
 
 # optional dependencies
 # apache-libcloud
diff --git a/swh/objstorage/backends/winery/__init__.py b/swh/objstorage/backends/winery/__init__.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/__init__.py
@@ -0,0 +1 @@
+from .objstorage import WineryObjStorage  # noqa: F401
diff --git a/swh/objstorage/backends/winery/database.py b/swh/objstorage/backends/winery/database.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/database.py
@@ -0,0 +1,111 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import logging
+import time
+
+import psycopg2
+from psycopg2 import sql
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+
+class Database:
+    """Create, drop and list databases on a PostgreSQL server.
+
+    ``dsn`` is a connection URL without a database name (e.g.
+    ``postgres://user@host:port``): the name of the database to connect
+    to is appended to it as a path component.
+    """
+
+    def __init__(self, dsn):
+        self.dsn = dsn
+
+    def create_database(self, database):
+        """Create ``database`` unless it already exists."""
+        db = psycopg2.connect(f"{self.dsn}/postgres")
+        try:
+            # CREATE DATABASE cannot run inside a transaction block
+            db.autocommit = True
+            with db.cursor() as c:
+                c.execute(
+                    "SELECT datname FROM pg_catalog.pg_database WHERE datname = %s",
+                    (database,),
+                )
+                if c.rowcount == 0:
+                    c.execute(
+                        sql.SQL("CREATE DATABASE {}").format(sql.Identifier(database))
+                    )
+        finally:
+            db.close()
+
+    def drop_database(self, database, retries=60, delay=10):
+        """Drop ``database`` if it exists.
+
+        Up to ``retries`` attempts are made, sleeping ``delay`` seconds
+        after each one that fails because the database is still in use.
+
+        Raises:
+            Exception: if the database is still in use after ``retries``
+                attempts.
+        """
+        db = psycopg2.connect(f"{self.dsn}/postgres")
+        try:
+            # https://wiki.postgresql.org/wiki/Psycopg2_Tutorial
+            # If you want to drop the database you would need to
+            # change the isolation level of the database.
+            db.set_isolation_level(0)
+            db.autocommit = True
+            with db.cursor() as c:
+                #
+                # Dropping the database may fail because the server takes time
+                # to notice a connection was dropped and/or a named cursor is
+                # in the process of being deleted. It can happen here or even
+                # when deleting all databases with the psql cli while no
+                # bench process is active.
+                #
+                # ERROR: database "i606428a5a6274d1ab09eecc4d019fef7" is being
+                # accessed by other users DETAIL:  There is 1 other session
+                # using the database.
+                #
+                # See:
+                # https://stackoverflow.com/questions/5108876/kill-a-postgresql-session-connection
+                #
+                # https://www.postgresql.org/docs/current/sql-dropdatabase.html
+                #
+                # WITH (FORCE) added in postgresql 13 but may also fail because
+                # the named cursor may not be handled as a client.
+                #
+                for _ in range(retries):
+                    try:
+                        c.execute(
+                            sql.SQL("DROP DATABASE IF EXISTS {}").format(
+                                sql.Identifier(database)
+                            )
+                        )
+                        break
+                    except psycopg2.errors.ObjectInUse:
+                        LOGGER.warning("%s database drop fails, waiting", database)
+                        time.sleep(delay)
+                else:
+                    # every attempt failed: report it instead of silently
+                    # leaving the database behind
+                    raise Exception(f"database drop fails {database}")
+        finally:
+            db.close()
+
+    def list_databases(self):
+        """Return the names of all non-template databases except ``postgres``."""
+        db = psycopg2.connect(f"{self.dsn}/postgres")
+        try:
+            with db.cursor() as c:
+                c.execute(
+                    "SELECT datname FROM pg_database "
+                    "WHERE datistemplate = false and datname != 'postgres'"
+                )
+                return [r[0] for r in c.fetchall()]
+        finally:
+            db.close()
diff --git a/swh/objstorage/backends/winery/objstorage.py b/swh/objstorage/backends/winery/objstorage.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/objstorage.py
@@ -0,0 +1,132 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import concurrent.futures
+import logging
+
+from swh.objstorage import exc
+from swh.objstorage.objstorage import ObjStorage, compute_hash
+
+from .rwshard import RWShard
+from .sharedbase import SharedBase
+
+LOGGER = logging.getLogger(__name__)
+LOGGER.setLevel(logging.ERROR)
+
+
+def pack(shard, **kwargs):
+    """Pack the shard named ``shard`` (submitted to a worker process)."""
+    return Packer(shard, **kwargs).run()
+
+
+class Packer:
+    """Pack a full shard.
+
+    Work in progress: :meth:`run` does not pack anything yet and only
+    reports success.
+    """
+
+    def __init__(self, shard, **kwargs):
+        self.args = kwargs
+        self.base = SharedBase(**self.args)
+        self.shard = RWShard(shard, **self.args)
+
+    def run(self):
+        return True
+
+
+class WineryObjStorage(ObjStorage):
+    """Object storage writing objects into PostgreSQL-backed shards.
+
+    Expected keyword arguments: ``base_dsn`` (server hosting the database
+    that maps objects to shards), ``shard_dsn`` (server hosting one
+    database per shard) and ``shard_max_size`` (total content size, in
+    bytes, above which the current shard is packed).
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.args = kwargs
+        self.executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
+        self.packers = []
+        self.init()
+
+    def init(self):
+        """Open the shared base and the writable shard it assigns to us."""
+        self.base = SharedBase(**self.args)
+        self.shard = RWShard(self.base.whoami, **self.args)
+
+    def check_config(self, *, check_write):
+        return True
+
+    def __contains__(self, obj_id):
+        return self.base.contains(obj_id) is not None
+
+    def add(self, content, obj_id=None, check_presence=True):
+        """Store ``content`` under ``obj_id`` (its hash when not given).
+
+        The content is written to the current shard only if the shared base
+        records this shard as responsible for ``obj_id``; the current shard
+        is packed once it is full. Returns ``obj_id``.
+        """
+        if obj_id is None:
+            obj_id = compute_hash(content)
+
+        if check_presence and obj_id in self:
+            return obj_id
+
+        shard = self.base.add_phase_1(obj_id)
+        if shard != self.base.id:
+            # this object is the responsibility of another shard
+            return obj_id
+
+        self.shard.add(obj_id, content)
+        self.base.add_phase_2(obj_id)
+
+        if self.shard.is_full():
+            self.pack()
+
+        return obj_id
+
+    def get(self, obj_id):
+        """Return the content of ``obj_id``.
+
+        Raises:
+            ObjNotFoundError: if ``obj_id`` is not (fully) added yet or is
+                missing from its shard.
+            NotImplementedError: if ``obj_id`` is stored in a read-only shard.
+        """
+        shard_info = self.base.get(obj_id)
+        if shard_info is None:
+            raise exc.ObjNotFoundError(obj_id)
+        name, readonly = shard_info
+        if readonly:
+            # reading from read-only (packed) shards is not implemented
+            # yet: fail explicitly instead of using an unbound ``shard``
+            raise NotImplementedError(f"cannot read from read-only shard {name}")
+        if name == self.shard.name:
+            # reuse the connection to the shard this instance writes to
+            shard = self.shard
+        else:
+            shard = RWShard(name, **self.args)
+        content = shard.get(obj_id)
+        if content is None:
+            raise exc.ObjNotFoundError(obj_id)
+        return content
+
+    def check(self, obj_id):
+        # load all shards readonly == NULL and not locked (i.e. packer
+        # was interrupted for whatever reason) run pack for each of them
+        pass
+
+    def delete(self, obj_id):
+        raise PermissionError("Delete is not allowed.")
+
+    def pack(self):
+        """Mark the current shard as being packed, submit it to the packer
+        worker process and switch to another writable shard."""
+        self.base.shard_packing_starts()
+        self.packers.append(self.executor.submit(pack, self.shard.name, **self.args))
+        self.init()
diff --git a/swh/objstorage/backends/winery/rwshard.py b/swh/objstorage/backends/winery/rwshard.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/rwshard.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import psycopg2
+
+from .database import Database
+
+
+class RWShard(Database):
+    """Writable shard: the ``objects`` (key, content) table of the database
+    ``name`` on ``shard_dsn``; full once its content exceeds ``shard_max_size``."""
+
+    def __init__(self, name, **kwargs):
+        super().__init__(kwargs["shard_dsn"])
+        self._name = name
+        self.create_database(self.name)
+        self.db = self.create_table(f"{self.dsn}/{self.name}")
+        self.size = self.total_size()
+        self.limit = kwargs["shard_max_size"]
+
+    @property
+    def name(self):
+        return self._name
+
+    def is_full(self):
+        return self.size > self.limit
+
+    def create_table(self, dsn):
+        db = psycopg2.connect(dsn)
+        db.autocommit = True
+        c = db.cursor()
+        c.execute(
+            "CREATE TABLE IF NOT EXISTS objects("
+            "key BYTEA PRIMARY KEY, "
+            "content BYTEA "
+            ")"
+        )
+        c.close()
+        return db
+
+    def total_size(self):
+        with self.db.cursor() as c:
+            c.execute("SELECT SUM(LENGTH(content)) FROM objects")
+            size = c.fetchone()[0]
+            if size is None:
+                return 0
+            else:
+                return int(size)
+
+    def add(self, obj_id, content):
+        try:
+            with self.db.cursor() as c:
+                c.execute(
+                    "INSERT INTO objects (key, content) VALUES (%s, %s)",
+                    (obj_id, content),
+                )
+            self.db.commit()
+            self.size += len(content)
+        except psycopg2.errors.UniqueViolation:
+            pass
+
+    def get(self, obj_id):
+        with self.db.cursor() as c:
+            c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,))
+            if c.rowcount == 0:
+                return None
+            else:
+                return c.fetchone()[0].tobytes()
diff --git a/swh/objstorage/backends/winery/sharedbase.py b/swh/objstorage/backends/winery/sharedbase.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/backends/winery/sharedbase.py
@@ -0,0 +1,182 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import uuid
+
+import psycopg2
+
+from .database import Database
+
+
+class SharedBase(Database):
+    """Shard registry stored in the ``sharedbase`` database on ``base_dsn``.
+
+    The ``shards`` table lists the shards (``readonly`` is FALSE while a
+    shard is writable and NULL while it is being packed); the
+    ``signature2shard`` table maps each object id to its shard
+    (``inflight`` stays TRUE between :meth:`add_phase_1` and
+    :meth:`add_phase_2`).
+    """
+
+    def __init__(self, **kwargs):
+        super().__init__(kwargs["base_dsn"])
+        database = "sharedbase"
+        self.create_database(database)
+        self.db = self.create_table(f"{self.dsn}/{database}")
+        self._whoami = None
+
+    def create_table(self, dsn):
+        db = psycopg2.connect(dsn)
+        db.autocommit = True
+        c = db.cursor()
+        c.execute(
+            "CREATE TABLE IF NOT EXISTS shards("
+            "id SERIAL PRIMARY KEY, "
+            "readonly BOOLEAN DEFAULT FALSE, "
+            "name CHAR(32) NOT NULL "
+            ")"
+        )
+        c.execute(
+            "CREATE TABLE IF NOT EXISTS signature2shard("
+            "signature BYTEA PRIMARY KEY, "
+            "inflight BOOLEAN DEFAULT TRUE NOT NULL, "
+            "shard INTEGER NOT NULL"
+            ")"
+        )
+        c.close()
+        return db
+
+    @property
+    def whoami(self):
+        """Name of the writable shard assigned to this instance."""
+        self.set_whoami()
+        return self._whoami
+
+    @property
+    def id(self):
+        """Id of the writable shard assigned to this instance."""
+        self.set_whoami()
+        return self._whoami_id
+
+    def set_whoami(self):
+        if self._whoami is not None:
+            return
+
+        while True:
+            self._whoami, self._whoami_id = self.lock_a_shard()
+            if self._whoami is not None:
+                return self._whoami
+            self.create_shard()
+
+    def lock_a_shard(self):
+        with self.db.cursor() as c:
+            c.execute(
+                "SELECT name FROM shards WHERE readonly = FALSE "
+                "LIMIT 1 FOR UPDATE SKIP LOCKED"
+            )
+            if c.rowcount == 0:
+                return None, None
+            name = c.fetchone()[0]
+        return self.lock_shard(name)
+
+    def lock_shard(self, name):
+        """Lock the writable shard ``name``.
+
+        Returns ``(name, id)``, or ``(None, None)`` when the shard is
+        locked by another client or is no longer writable, so that
+        :meth:`set_whoami` can always unpack the result.
+        """
+        # NOTE(review): the connection is in autocommit mode, so the row
+        # lock taken by FOR UPDATE is released as soon as the statement
+        # completes -- confirm whether it is meant to be held longer.
+        self.whoami_lock = self.db.cursor()
+        try:
+            self.whoami_lock.execute(
+                "SELECT name, id FROM shards "
+                "WHERE readonly = FALSE AND name = %s FOR UPDATE NOWAIT",
+                (name,),
+            )
+        except psycopg2.errors.LockNotAvailable:
+            return None, None
+        row = self.whoami_lock.fetchone()
+        if row is None:
+            return None, None
+        return row
+
+    def create_shard(self):
+        name = uuid.uuid4().hex
+        name = "i" + name[1:]  # ensure the first character is not a number
+        with self.db.cursor() as c:
+            c.execute("INSERT INTO shards (name, readonly) VALUES (%s, FALSE)", (name,))
+        self.db.commit()
+
+    def shard_packing_starts(self):
+        with self.db.cursor() as c:
+            c.execute(
+                "UPDATE shards SET readonly = NULL WHERE name = %s", (self.whoami,)
+            )
+
+    def shard_packing_ends(self):
+        with self.db.cursor() as c:
+            c.execute(
+                "UPDATE shards SET readonly = TRUE WHERE name = %s", (self.whoami,)
+            )
+
+    def get_shard_info(self, id):
+        with self.db.cursor() as c:
+            c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,))
+            if c.rowcount == 0:
+                return None
+            else:
+                return c.fetchone()
+
+    def contains(self, obj_id):
+        """Return the id of the shard storing ``obj_id``, or None if the
+        object is unknown or still being added."""
+        with self.db.cursor() as c:
+            c.execute(
+                "SELECT shard FROM signature2shard WHERE "
+                "signature = %s AND inflight = FALSE",
+                (obj_id,),
+            )
+            if c.rowcount == 0:
+                return None
+            else:
+                return c.fetchone()[0]
+
+    def get(self, obj_id):
+        id = self.contains(obj_id)
+        if id is None:
+            return None
+        return self.get_shard_info(id)
+
+    def add_phase_1(self, obj_id):
+        """Register ``obj_id`` as being added to this instance's shard.
+
+        Returns the id of this shard on success. If ``obj_id`` is already
+        registered, returns the id of the shard storing it, or None if that
+        shard has not completed :meth:`add_phase_2` for it yet.
+        """
+        try:
+            with self.db.cursor() as c:
+                c.execute(
+                    "INSERT INTO signature2shard (signature, shard, inflight) "
+                    "VALUES (%s, %s, TRUE)",
+                    (obj_id, self.id),
+                )
+            self.db.commit()
+            return self.id
+        except psycopg2.errors.UniqueViolation:
+            return self.contains(obj_id)
+
+    def add_phase_2(self, obj_id):
+        """Mark ``obj_id`` as no longer in flight in this instance's shard."""
+        with self.db.cursor() as c:
+            c.execute(
+                "UPDATE signature2shard SET inflight = FALSE "
+                "WHERE signature = %s AND shard = %s",
+                (obj_id, self.id),
+            )
+        self.db.commit()
diff --git a/swh/objstorage/factory.py b/swh/objstorage/factory.py
--- a/swh/objstorage/factory.py
+++ b/swh/objstorage/factory.py
@@ -13,6 +13,7 @@
 from swh.objstorage.backends.noop import NoopObjStorage
 from swh.objstorage.backends.pathslicing import PathSlicingObjStorage
 from swh.objstorage.backends.seaweedfs import SeaweedFilerObjStorage
+from swh.objstorage.backends.winery import WineryObjStorage
 from swh.objstorage.multiplexer import MultiplexerObjStorage, StripingObjStorage
 from swh.objstorage.multiplexer.filter import add_filters
 from swh.objstorage.objstorage import ID_HASH_LENGTH, ObjStorage  # noqa
@@ -27,6 +28,7 @@
     "seaweedfs": SeaweedFilerObjStorage,
     "random": RandomGeneratorObjStorage,
     "http": HTTPReadOnlyObjStorage,
+    "winery": WineryObjStorage,
     "noop": NoopObjStorage,
 }
 
diff --git a/swh/objstorage/tests/test_objstorage_winery.py b/swh/objstorage/tests/test_objstorage_winery.py
new file mode 100644
--- /dev/null
+++ b/swh/objstorage/tests/test_objstorage_winery.py
@@ -0,0 +1,78 @@
+# Copyright (C) 2021  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import concurrent.futures
+
+import pytest
+
+from swh.objstorage import exc
+from swh.objstorage.backends.winery.database import Database
+from swh.objstorage.factory import get_objstorage
+
+
+@pytest.fixture
+def dsn(postgresql):
+    dsn = (
+        f"postgres://{postgresql.info.user}"
+        f":@{postgresql.info.host}:{postgresql.info.port}"
+    )
+    yield dsn
+    # pytest-postgresql will not remove databases that it did not create
+    d = Database(dsn)
+    for database in d.list_databases():
+        if database != postgresql.info.dbname and database != "tests_tmpl":
+            d.drop_database(database)
+
+
+def test_winery_sharedbase(dsn):
+    base = get_objstorage(
+        cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=1
+    ).base
+    shard1 = base.whoami
+    assert shard1 is not None
+    assert shard1 == base.whoami
+
+    id1 = base.id
+    assert id1 is not None
+    assert id1 == base.id
+
+
+def test_winery_add_get(dsn):
+    winery = get_objstorage(
+        cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=1024
+    )
+    shard = winery.base.whoami
+    content = b"SOMETHING"
+    obj_id = winery.add(content=content)
+    assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579"
+    assert winery.add(content=content, obj_id=obj_id) == obj_id
+    assert winery.add(content=content, obj_id=obj_id, check_presence=False) == obj_id
+    assert winery.base.whoami == shard
+    assert winery.get(obj_id) == content
+    with pytest.raises(exc.ObjNotFoundError):
+        winery.get(b"unknown")
+
+
+def test_winery_add_and_pack(dsn):
+    winery = get_objstorage(cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=1)
+    shard = winery.base.whoami
+    content = b"SOMETHING"
+    obj_id = winery.add(content=content)
+    assert obj_id.hex() == "0c8c841f7d9fd4874d841506d3ffc16808b1d579"
+    assert winery.base.whoami != shard
+    assert len(winery.packers) == 1
+    done, not_done = concurrent.futures.wait(winery.packers)
+    assert len(not_done) == 0
+    assert len(done) == 1
+    packer = list(done)[0]
+    assert packer.exception(timeout=0) is None
+    assert packer.result() is True
+    winery.executor.shutdown()
+
+
+def test_winery_delete(dsn):
+    winery = get_objstorage(cls="winery", base_dsn=dsn, shard_dsn=dsn, shard_max_size=1)
+    with pytest.raises(PermissionError):
+        winery.delete(None)
diff --git a/tox.ini b/tox.ini
--- a/tox.ini
+++ b/tox.ini
@@ -10,7 +10,7 @@
 commands =
   pytest --cov={envsitepackagesdir}/swh/objstorage \
          {envsitepackagesdir}/swh/objstorage \
-         --cov-branch {posargs}
+         --cov-branch --cov-report term-missing {posargs}
 
 [testenv:black]
 skip_install = true