Changeset View
Standalone View
swh/objstorage/backends/winery/rwshard.py
- This file was added.
# Copyright (C) 2021 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import psycopg2 | |||||
from .database import Database | |||||
class RWShard(Database): | |||||
def __init__(self, name, **kwargs): | |||||
super().__init__(kwargs["shard_dsn"]) | |||||
self._name = name | |||||
self.create_database(self.name) | |||||
self.db = self.create_table(f"{self.dsn}/{self.name}") | |||||
self.size = self.total_size() | |||||
self.limit = kwargs["shard_max_size"] | |||||
def uninit(self): | |||||
if hasattr(self, "db"): | |||||
self.db.close() | |||||
del self.db | |||||
@property | |||||
def name(self): | |||||
return self._name | |||||
def is_full(self): | |||||
return self.size > self.limit | |||||
def drop(self): | |||||
self.drop_database(self.name) | |||||
def create_table(self, dsn): | |||||
db = psycopg2.connect(dsn) | |||||
db.autocommit = True | |||||
c = db.cursor() | |||||
c.execute( | |||||
"CREATE TABLE IF NOT EXISTS objects(" | |||||
"key BYTEA PRIMARY KEY, " | |||||
"content BYTEA " | |||||
")" | |||||
) | |||||
c.close() | |||||
return db | |||||
vlorentz: Why `int()`? | |||||
Done Inline ActionsNot necessary indeed, removed. dachary: Not necessary indeed, removed. | |||||
def total_size(self): | |||||
with self.db.cursor() as c: | |||||
c.execute("SELECT SUM(LENGTH(content)) FROM objects") | |||||
size = c.fetchone()[0] | |||||
if size is None: | |||||
return 0 | |||||
else: | |||||
return size | |||||
def add(self, obj_id, content): | |||||
Done Inline ActionsNote for the python experts: is this instruction thread safe? vsellier: Note for the python experts: is this instruction thread safe? | |||||
Done Inline ActionsGood catch, it looks like it isn't because that's three bytecode instructions: >>> class Foo: ... def f(self): ... self.size += 42 ... >>> dis.dis(Foo.f) 3 0 LOAD_FAST 0 (self) 2 DUP_TOP 4 LOAD_ATTR 0 (size) 6 LOAD_CONST 1 (42) 8 INPLACE_ADD 10 ROT_TWO 12 STORE_ATTR 0 (size) 14 LOAD_CONST 0 (None) 16 RETURN_VALUE Ironically, it is thread-safe to use += on containers (eg. lists) because INPLACE_ADD mutates the container itself. The bytecode is the same, but STORE_ATTR would just preserve the reference. vlorentz: Good catch, it looks like it isn't because that's three bytecode instructions:
```
>>> class… | |||||
Done Inline ActionsInteresting. Must this codepath be thread safe ? It runs from the objstorage/storage wsgi workers, Are they configured to run multiple threads? dachary: Interesting. Must this codepath be thread safe ? It runs from the objstorage/storage wsgi… | |||||
Done Inline ActionsYes, they are. For performance, it would probably be best to make RWShard not shared between threads rather than add locks, right? vlorentz: Yes, they are.
For performance, it would probably be best to make `RWShard` not shared between… | |||||
Done Inline ActionsFor the record the conclusion of the investigations conducted today on IRC are that the code does not run in a thread but in a coroutine and is therefore not subject to concurrent access of the data. dachary: For the record the conclusion of the investigations conducted today on IRC are that the code… | |||||
try: | |||||
with self.db.cursor() as c: | |||||
c.execute( | |||||
"INSERT INTO objects (key, content) VALUES (%s, %s)", | |||||
(obj_id, content), | |||||
) | |||||
self.db.commit() | |||||
self.size += len(content) | |||||
except psycopg2.errors.UniqueViolation: | |||||
pass | |||||
def get(self, obj_id): | |||||
with self.db.cursor() as c: | |||||
c.execute("SELECT content FROM objects WHERE key = %s", (obj_id,)) | |||||
if c.rowcount == 0: | |||||
return None | |||||
else: | |||||
return c.fetchone()[0].tobytes() | |||||
def all(self): | |||||
with self.db.cursor() as c: | |||||
c.execute("SELECT key,content FROM objects") | |||||
for row in c: | |||||
yield row[0].tobytes(), row[1].tobytes() | |||||
def count(self): | |||||
with self.db.cursor() as c: | |||||
c.execute("SELECT COUNT(*) FROM objects") | |||||
return c.fetchone()[0] |
Why int()?