Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/winery/sharedbase.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import uuid | |||||
import psycopg2 | |||||
from .database import Database | |||||
class SharedBase(Database): | |||||
def __init__(self, **kwargs): | |||||
super().__init__(kwargs["base_dsn"]) | |||||
database = "sharedbase" | |||||
self.create_database(database) | |||||
self.db = self.create_table(f"{self.dsn}/{database}") | |||||
self._whoami = None | |||||
def create_table(self, dsn): | |||||
db = psycopg2.connect(dsn) | |||||
db.autocommit = True | |||||
c = db.cursor() | |||||
c.execute( | |||||
"CREATE TABLE IF NOT EXISTS shards(" | |||||
"id SERIAL PRIMARY KEY, " | |||||
"readonly BOOLEAN DEFAULT FALSE, " | |||||
"name CHAR(32) NOT NULL " | |||||
")" | |||||
) | |||||
c.execute( | |||||
"CREATE TABLE IF NOT EXISTS signature2shard(" | |||||
"signature BYTEA PRIMARY KEY, " | |||||
"inflight BOOLEAN DEFAULT TRUE NOT NULL, " | |||||
"shard INTEGER NOT NULL" | |||||
")" | |||||
) | |||||
c.close() | |||||
return db | |||||
@property | |||||
def whoami(self): | |||||
self.set_whoami() | |||||
return self._whoami | |||||
@property | |||||
def id(self): | |||||
self.set_whoami() | |||||
return self._whoami_id | |||||
def set_whoami(self): | |||||
if self._whoami is not None: | |||||
return | |||||
while True: | |||||
self._whoami, self._whoami_id = self.lock_a_shard() | |||||
if self._whoami is not None: | |||||
return self._whoami | |||||
self.create_shard() | |||||
def lock_a_shard(self): | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"SELECT name FROM shards WHERE readonly = FALSE " | |||||
"LIMIT 1 FOR UPDATE SKIP LOCKED" | |||||
) | |||||
if c.rowcount == 0: | |||||
return None, None | |||||
name = c.fetchone()[0] | |||||
return self.lock_shard(name) | |||||
def lock_shard(self, name): | |||||
self.whoami_lock = self.db.cursor() | |||||
try: | |||||
self.whoami_lock.execute( | |||||
"SELECT name, id FROM shards " | |||||
"WHERE readonly = FALSE AND name = %s FOR UPDATE NOWAIT", | |||||
(name,), | |||||
) | |||||
return self.whoami_lock.fetchone() | |||||
except Exception: | |||||
return None | |||||
def create_shard(self): | |||||
name = uuid.uuid4().hex | |||||
name = "i" + name[1:] # ensure the first character is not a number | |||||
with self.db.cursor() as c: | |||||
c.execute("INSERT INTO shards (name, readonly) VALUES (%s, FALSE)", (name,)) | |||||
self.db.commit() | |||||
def shard_packing_starts(self): | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"UPDATE shards SET readonly = NULL WHERE name = %s", (self.whoami,) | |||||
) | |||||
def shard_packing_ends(self): | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"UPDATE shards SET readonly = TRUE WHERE name = %s", (self.whoami,) | |||||
) | |||||
def get_shard_info(self, id): | |||||
with self.db.cursor() as c: | |||||
c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) | |||||
if c.rowcount == 0: | |||||
return None | |||||
else: | |||||
return c.fetchone() | |||||
def contains(self, obj_id): | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"SELECT shard FROM signature2shard WHERE " | |||||
"signature = %s AND inflight = FALSE", | |||||
(obj_id,), | |||||
) | |||||
if c.rowcount == 0: | |||||
return None | |||||
else: | |||||
return c.fetchone()[0] | |||||
def get(self, obj_id): | |||||
id = self.contains(obj_id) | |||||
if id is None: | |||||
return None | |||||
return self.get_shard_info(id) | |||||
def add_phase_1(self, obj_id): | |||||
try: | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"INSERT INTO signature2shard (signature, shard, inflight) " | |||||
"VALUES (%s, %s, TRUE)", | |||||
(obj_id, self.id), | |||||
) | |||||
self.db.commit() | |||||
return self.id | |||||
except psycopg2.errors.UniqueViolation: | |||||
return self.contains(obj_id) | |||||
def add_phase_2(self, obj_id): | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"UPDATE signature2shard SET inflight = FALSE " | |||||
"WHERE signature = %s AND shard = %s", | |||||
(obj_id, self.id), | |||||
) | |||||
self.db.commit() |