Changeset View
Standalone View
swh/objstorage/backends/winery/rwshard.py
- This file was added.
| # Copyright (C) 2021 The Software Heritage developers | |||||
| # See the AUTHORS file at the top-level directory of this distribution | |||||
| # License: GNU General Public License version 3, or any later version | |||||
| # See top-level LICENSE file for more information | |||||
| import psycopg2 | |||||
| from .database import Database | |||||
| class RWShard(Database): | |||||
def __init__(self, name, **kwargs):
    """Create (if needed) and open the read-write shard database *name*.

    Keyword arguments read here:
      shard_dsn: passed to the ``Database`` base-class constructor.
      shard_max_size: threshold stored as ``self.limit`` and compared
        against ``self.size`` by ``is_full``.

    Raises:
      KeyError: if one of the keyword arguments above is missing.
    """
    super().__init__(kwargs["shard_dsn"])
    self._name = name

    # NOTE(review): create_database and self.dsn are presumably provided by
    # the Database base class; neither is visible in this file.
    self.create_database(self.name)
    shard_dsn = f"{self.dsn}/{self.name}"
    self.db = self.create_table(shard_dsn)

    # Seed the running byte counter from what the table already contains.
    self.size = self.total_size()
    self.limit = kwargs["shard_max_size"]
def uninit(self):
    """Close the shard connection, if one is open, and forget it.

    Calling this again after the connection has been released is a no-op.
    """
    if not hasattr(self, "db"):
        return
    self.db.close()
    del self.db
@property
def name(self):
    """Read-only name of this shard, as given to the constructor."""
    return self._name
def is_full(self):
    """Return True once the tracked size is strictly above the limit."""
    return self.limit < self.size
def drop(self):
    """Drop the database that backs this shard.

    NOTE(review): drop_database is presumably inherited from Database; it is
    not defined in this file.
    """
    shard_name = self.name
    self.drop_database(shard_name)
def create_table(self, dsn):
    """Connect to *dsn* and make sure the ``objects`` table exists.

    The connection is switched to autocommit before the table is created.

    Args:
      dsn: connection string handed to ``psycopg2.connect``.

    Returns:
      The open psycopg2 connection, in autocommit mode.

    Raises:
      Whatever ``psycopg2`` raises while connecting or creating the table.
      If the failure happens after the connection was opened, the
      connection is closed before the exception propagates.
    """
    db = psycopg2.connect(dsn)
    try:
        db.autocommit = True
        # The context manager closes the cursor even when execute() fails.
        with db.cursor() as c:
            c.execute(
                "CREATE TABLE IF NOT EXISTS objects("
                "key BYTEA PRIMARY KEY, "
                "content BYTEA "
                ")"
            )
    except BaseException:
        # Do not leak the connection when setup fails: the caller never
        # receives it and therefore could not close it.
        db.close()
        raise
    return db
vlorentz: Why `int()`? | |||||
dachary (done): Not necessary indeed, removed. | |||||
def total_size(self):
    """Return the summed byte length of every ``content`` value stored.

    The SQL aggregate yields NULL (``None``) when there is nothing to sum;
    that case is reported as 0.
    """
    with self.db.cursor() as cursor:
        cursor.execute("SELECT SUM(LENGTH(content)) FROM objects")
        total = cursor.fetchone()[0]
    return 0 if total is None else total
def add(self, obj_id, content):
    """Insert *content* under key *obj_id* and account for its size.

    Adding a key that already exists is silently ignored: nothing is
    written and ``self.size`` is left unchanged.
    """
    try:
        with self.db.cursor() as cursor:
            cursor.execute(
                "INSERT INTO objects (key, content) VALUES (%s, %s)",
                (obj_id, content),
            )
        self.db.commit()
    except psycopg2.errors.UniqueViolation:
        # The object is already stored in this shard.
        return

    # NOTE: ``self.size += ...`` is a read-modify-write sequence
    # (LOAD_ATTR / INPLACE_ADD / STORE_ATTR) and is not atomic across
    # threads. Code review concluded this path runs in a coroutine rather
    # than in threads, so no lock is taken; revisit if RWShard instances
    # ever become shared between threads.
    self.size += len(content)
def get(self, obj_id):
    """Return the content stored under *obj_id* as ``bytes``.

    Returns ``None`` when the query matches no row.
    """
    with self.db.cursor() as cursor:
        cursor.execute("SELECT content FROM objects WHERE key = %s", (obj_id,))
        if cursor.rowcount == 0:
            return None
        row = cursor.fetchone()
        # The column value exposes tobytes(); convert it to plain bytes.
        return row[0].tobytes()
def all(self):
    """Yield a ``(key, content)`` pair of ``bytes`` for every stored object.

    This is a generator: the cursor stays open while it is being consumed.
    """
    with self.db.cursor() as cursor:
        cursor.execute("SELECT key,content FROM objects")
        for key, content in cursor:
            yield key.tobytes(), content.tobytes()
def count(self):
    """Return the number of rows in the ``objects`` table."""
    with self.db.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM objects")
        row = cursor.fetchone()
        return row[0]
Why int()?