Changeset View
Standalone View
swh/objstorage/backends/winery/sharedbase.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | |||||||||||||||
# License: GNU General Public License version 3, or any later version | |||||||||||||||
# See top-level LICENSE file for more information | |||||||||||||||
import uuid | |||||||||||||||
import psycopg2 | |||||||||||||||
from .database import Database | |||||||||||||||
class SharedBase(Database): | |||||||||||||||
def __init__(self, **kwargs): | |||||||||||||||
super().__init__(kwargs["base_dsn"]) | |||||||||||||||
database = "sharedbase" | |||||||||||||||
self.create_database(database) | |||||||||||||||
self.db = self.create_table(f"{self.dsn}/{database}") | |||||||||||||||
self._whoami = None | |||||||||||||||
def uninit(self): | |||||||||||||||
self.db.close() | |||||||||||||||
del self.db | |||||||||||||||
def create_table(self, dsn): | |||||||||||||||
db = psycopg2.connect(dsn) | |||||||||||||||
db.autocommit = True | |||||||||||||||
c = db.cursor() | |||||||||||||||
lock = 314116 | |||||||||||||||
vlorentz: `NOT NULL` ;) | |||||||||||||||
Done Inline ActionsDone! dachary: Done! | |||||||||||||||
olasdUnsubmitted Done Inline ActionsWhat is the meaning of this magic number? olasd: What is the meaning of this magic number? | |||||||||||||||
dacharyAuthorUnsubmitted Done Inline ActionsThere is no meaning, it just has to be unique for the lock to work. I added a comment to clarify that. dachary: There is no meaning, it just has to be unique for the lock to work. I added a comment to… | |||||||||||||||
c.execute("SELECT pg_advisory_lock(%s)", (lock,)) | |||||||||||||||
c.execute( | |||||||||||||||
Done Inline ActionsCould you add CHECK (!readonly OR !packing), just to be safe? (or whatever the invariant should be) vlorentz: Could you add `CHECK (!readonly OR !packing)`, just to be safe? (or whatever the invariant… | |||||||||||||||
Done Inline ActionsI'll address that with border case testing at a later stage (other patches). There is much to do in that area. dachary: I'll address that with border case testing at a later stage (other patches). There is much to… | |||||||||||||||
"CREATE TABLE IF NOT EXISTS shards(" | |||||||||||||||
"id SERIAL PRIMARY KEY, " | |||||||||||||||
"readonly BOOLEAN NOT NULL, " | |||||||||||||||
"packing BOOLEAN NOT NULL, " | |||||||||||||||
"name CHAR(32) NOT NULL " | |||||||||||||||
")" | |||||||||||||||
Done Inline ActionsIt doesn't look like the defaults are used. vlorentz: It doesn't look like the defaults are used. | |||||||||||||||
Done Inline ActionsI removed it, good catch. dachary: I removed it, good catch.
| |||||||||||||||
olasdUnsubmitted Done Inline Actions
Do we really need an id and a name? Either way the name column should probably be made unique. I would suggest merging both into a single uuid-based column (and reconstructing the database names from the shard uuid on the fly). In any case the char type has some weird padding behavior with spaces, which is probably not what we want (even though it probably doesn't matter in practice). olasd: Do we really need an `id` and a `name`? Either way the `name` column should probably be made… | |||||||||||||||
dacharyAuthorUnsubmitted Done Inline Actions
The id matches the shard field in the signature2shard table below to save space. I'll make the name field unique. There will be a small number of rows in this table, is your recommendation for merging the fields about simplicity or saving space? dachary: > Do we really need an id and a name?
The id matches the shard field in the signature2shard… | |||||||||||||||
) | |||||||||||||||
c.execute( | |||||||||||||||
"CREATE TABLE IF NOT EXISTS signature2shard(" | |||||||||||||||
"signature BYTEA PRIMARY KEY, " | |||||||||||||||
"inflight BOOLEAN NOT NULL, " | |||||||||||||||
"shard INTEGER NOT NULL" | |||||||||||||||
")" | |||||||||||||||
) | |||||||||||||||
olasdUnsubmitted Done Inline ActionsProbably worth using triple-quoted strings for SQL queries generally, but that's purely stylistic. olasd: Probably worth using triple-quoted strings for SQL queries generally, but that's purely… | |||||||||||||||
dacharyAuthorUnsubmitted Done Inline ActionsDone! dachary: Done! | |||||||||||||||
c.close() | |||||||||||||||
db.close() # so the pg_advisory_lock is released | |||||||||||||||
db = psycopg2.connect(dsn) | |||||||||||||||
db.autocommit = True | |||||||||||||||
return db | |||||||||||||||
@property | |||||||||||||||
def whoami(self): | |||||||||||||||
self.set_whoami() | |||||||||||||||
return self._whoami | |||||||||||||||
@property | |||||||||||||||
def id(self): | |||||||||||||||
self.set_whoami() | |||||||||||||||
return self._whoami_id | |||||||||||||||
def set_whoami(self): | |||||||||||||||
if self._whoami is not None: | |||||||||||||||
return | |||||||||||||||
while True: | |||||||||||||||
self._whoami, self._whoami_id = self.lock_a_shard() | |||||||||||||||
if self._whoami is not None: | |||||||||||||||
return self._whoami | |||||||||||||||
self.create_shard() | |||||||||||||||
Done Inline Actionsnice vlorentz: nice | |||||||||||||||
def lock_a_shard(self): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"SELECT name FROM shards WHERE readonly = FALSE and packing = FALSE " | |||||||||||||||
"LIMIT 1 FOR UPDATE SKIP LOCKED" | |||||||||||||||
) | |||||||||||||||
if c.rowcount == 0: | |||||||||||||||
return None, None | |||||||||||||||
name = c.fetchone()[0] | |||||||||||||||
return self.lock_shard(name) | |||||||||||||||
Done Inline Actionscan you make the except stricter? vlorentz: can you make the `except` stricter? | |||||||||||||||
Done Inline ActionsThis is not nice but I don't know how to do it otherwise. There is no inventory of the exceptions that can be raised and essentially all exceptions other than those originating from a SQL statement poorly formatted mean the shard cannot be locked. If a legitimate exception is missing because it happens very rarely, it will needlessly crash the worker. I'm open to ideas on how to improve that! dachary: This is not nice but I don't know how to do it otherwise. There is no inventory of the… | |||||||||||||||
Done Inline ActionsOk; then use psycopg2.Error, it should cover all "normal" exceptions vlorentz: Ok; then use `psycopg2.Error`, it should cover all "normal" exceptions | |||||||||||||||
Done Inline ActionsAh, thanks for the tip. Done ! dachary: Ah, thanks for the tip. Done ! | |||||||||||||||
def lock_shard(self, name): | |||||||||||||||
self.whoami_lock = self.db.cursor() | |||||||||||||||
try: | |||||||||||||||
self.whoami_lock.execute( | |||||||||||||||
"SELECT name, id FROM shards " | |||||||||||||||
Done Inline Actionswhy? vlorentz: why? | |||||||||||||||
Done Inline ActionsIt is used for database names. I amended the comment, good catch 👍 dachary: It is used for database names. I amended the comment, good catch 👍 | |||||||||||||||
"WHERE readonly = FALSE AND packing = FALSE AND name = %s " | |||||||||||||||
"FOR UPDATE NOWAIT", | |||||||||||||||
(name,), | |||||||||||||||
) | |||||||||||||||
return self.whoami_lock.fetchone() | |||||||||||||||
except psycopg2.Error: | |||||||||||||||
return None | |||||||||||||||
Done Inline ActionsI'm not comfortable with using NULL to make it a trinary boolean; NULL can have unexpected behaviors. What about an enum instead? (and it allows you to easily add more states in the future, if needed) vlorentz: I'm not comfortable with using NULL to make it a trinary boolean; NULL can have unexpected… | |||||||||||||||
Done Inline ActionsI won't use NULL, I trust you on this. dachary: I won't use NULL, I trust you on this. | |||||||||||||||
Done Inline ActionsSeeing the way these booleans are handled, I think what @vlorentz had in mind was making the shard "state" a single non-null column containing a ternary enum (with values such as "writable", "packing", "readonly"). olasd: Seeing the way these booleans are handled, I think what @vlorentz had in mind was making the… | |||||||||||||||
Done Inline ActionsI understood what @vlorentz had in mind but thought having two fields made the whole thing a little clearer. dachary: I understood what @vlorentz had in mind but thought having two fields made the whole thing a… | |||||||||||||||
Done Inline ActionsI'm not a huge fan of having two booleans to encode a 3-states state machine btw. But it's acceptable as long as you add a CHECK as I mentioned in another comment. vlorentz: I'm not a huge fan of having two booleans to encode a 3-states state machine btw. But it's… | |||||||||||||||
def unlock_shard(self): | |||||||||||||||
del self.whoami_lock | |||||||||||||||
def create_shard(self): | |||||||||||||||
name = uuid.uuid4().hex | |||||||||||||||
# | |||||||||||||||
# ensure the first character is not a number so it can be used as a | |||||||||||||||
# database name. | |||||||||||||||
# | |||||||||||||||
name = "i" + name[1:] | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"INSERT INTO shards (name, readonly, packing) " | |||||||||||||||
"VALUES (%s, FALSE, FALSE)", | |||||||||||||||
(name,), | |||||||||||||||
) | |||||||||||||||
self.db.commit() | |||||||||||||||
def shard_packing_starts(self): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"UPDATE shards SET packing = TRUE WHERE name = %s", (self.whoami,) | |||||||||||||||
) | |||||||||||||||
self.unlock_shard() | |||||||||||||||
def shard_packing_ends(self, name): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"UPDATE shards SET readonly = TRUE, packing = FALSE " "WHERE name = %s", | |||||||||||||||
(name,), | |||||||||||||||
) | |||||||||||||||
def get_shard_info(self, id): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute("SELECT name, readonly FROM shards WHERE id = %s", (id,)) | |||||||||||||||
if c.rowcount == 0: | |||||||||||||||
return None | |||||||||||||||
else: | |||||||||||||||
return c.fetchone() | |||||||||||||||
def get_shard_info_by_name(self, name): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute("SELECT readonly, packing FROM shards WHERE name = %s", (name,)) | |||||||||||||||
if c.rowcount == 0: | |||||||||||||||
return None | |||||||||||||||
else: | |||||||||||||||
return c.fetchone() | |||||||||||||||
olasdUnsubmitted Done Inline ActionsThese return values are inconsistent, I assume we would want to make them all consistent (maybe using a dataclass for ShardInfo?) olasd: These return values are inconsistent, I assume we would want to make them all consistent (maybe… | |||||||||||||||
dacharyAuthorUnsubmitted Done Inline ActionsThis function was exclusively used for testing and indeed confusing because inconsistent. I moved it to the test directory for clarity. dachary: This function was exclusively used for testing and indeed confusing because inconsistent. I… | |||||||||||||||
def list_shards(self): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute("SELECT name, readonly, packing FROM shards") | |||||||||||||||
for row in c: | |||||||||||||||
yield row[0], row[1], row[2] | |||||||||||||||
def contains(self, obj_id): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"SELECT shard FROM signature2shard WHERE " | |||||||||||||||
"signature = %s AND inflight = FALSE", | |||||||||||||||
(obj_id,), | |||||||||||||||
) | |||||||||||||||
if c.rowcount == 0: | |||||||||||||||
return None | |||||||||||||||
else: | |||||||||||||||
return c.fetchone()[0] | |||||||||||||||
def get(self, obj_id): | |||||||||||||||
id = self.contains(obj_id) | |||||||||||||||
if id is None: | |||||||||||||||
return None | |||||||||||||||
return self.get_shard_info(id) | |||||||||||||||
def add_phase_1(self, obj_id): | |||||||||||||||
try: | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"INSERT INTO signature2shard (signature, shard, inflight) " | |||||||||||||||
"VALUES (%s, %s, TRUE)", | |||||||||||||||
(obj_id, self.id), | |||||||||||||||
) | |||||||||||||||
self.db.commit() | |||||||||||||||
return self.id | |||||||||||||||
except psycopg2.errors.UniqueViolation: | |||||||||||||||
return self.contains(obj_id) | |||||||||||||||
def add_phase_2(self, obj_id): | |||||||||||||||
with self.db.cursor() as c: | |||||||||||||||
c.execute( | |||||||||||||||
"UPDATE signature2shard SET inflight = FALSE " | |||||||||||||||
"WHERE signature = %s AND shard = %s", | |||||||||||||||
(obj_id, self.id), | |||||||||||||||
) | |||||||||||||||
self.db.commit() |
NOT NULL ;)