Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/postgresql/storage.py
# Copyright (C) 2015-2022 The Software Heritage developers | # Copyright (C) 2015-2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import base64 | import base64 | ||||
from collections import defaultdict | from collections import defaultdict | ||||
import contextlib | import contextlib | ||||
from contextlib import contextmanager | from contextlib import contextmanager | ||||
import datetime | import datetime | ||||
import itertools | import itertools | ||||
import logging | |||||
import operator | import operator | ||||
from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple | from typing import Any, Counter, Dict, Iterable, List, Optional, Sequence, Tuple | ||||
import attr | import attr | ||||
import psycopg2 | import psycopg2 | ||||
import psycopg2.errors | import psycopg2.errors | ||||
import psycopg2.pool | import psycopg2.pool | ||||
from swh.core.api.serializers import msgpack_dumps, msgpack_loads | from swh.core.api.serializers import msgpack_dumps, msgpack_loads | ||||
from swh.core.db.common import db_transaction, db_transaction_generator | from swh.core.db.common import db_transaction, db_transaction_generator | ||||
from swh.core.db.db_utils import swh_db_version | |||||
from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | from swh.model.hashutil import DEFAULT_ALGORITHMS, hash_to_bytes, hash_to_hex | ||||
from swh.model.model import ( | from swh.model.model import ( | ||||
SHA1_SIZE, | SHA1_SIZE, | ||||
Content, | Content, | ||||
Directory, | Directory, | ||||
DirectoryEntry, | DirectoryEntry, | ||||
ExtID, | ExtID, | ||||
MetadataAuthority, | MetadataAuthority, | ||||
Show All 28 Lines | from swh.storage.utils import ( | ||||
map_optional, | map_optional, | ||||
now, | now, | ||||
) | ) | ||||
from swh.storage.writer import JournalWriter | from swh.storage.writer import JournalWriter | ||||
from . import converters | from . import converters | ||||
from .db import Db | from .db import Db | ||||
logger = logging.getLogger(__name__) | |||||
# Max block size of contents to return | # Max block size of contents to return | ||||
BULK_BLOCK_CONTENT_LEN_MAX = 10000 | BULK_BLOCK_CONTENT_LEN_MAX = 10000 | ||||
EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e") | EMPTY_SNAPSHOT_ID = hash_to_bytes("1a8893e6a86f444e8be8e7bda6cb34fb1735a00e") | ||||
"""Identifier for the empty snapshot""" | """Identifier for the empty snapshot""" | ||||
VALIDATION_EXCEPTIONS = ( | VALIDATION_EXCEPTIONS = ( | ||||
Show All 22 Lines | except psycopg2.errors.UniqueViolation: | ||||
# a subclass of IntegrityError; so we need to catch and reraise it | # a subclass of IntegrityError; so we need to catch and reraise it | ||||
# before the next clause converts it to StorageArgumentException. | # before the next clause converts it to StorageArgumentException. | ||||
raise | raise | ||||
except VALIDATION_EXCEPTIONS as e: | except VALIDATION_EXCEPTIONS as e: | ||||
raise StorageArgumentException(str(e)) | raise StorageArgumentException(str(e)) | ||||
class Storage: | class Storage: | ||||
"""SWH storage proxy, encompassing DB and object storage""" | """SWH storage datastore proxy, encompassing DB and object storage""" | ||||
current_version: int = 182 | |||||
def __init__( | def __init__( | ||||
self, | self, | ||||
db, | db, | ||||
objstorage, | objstorage, | ||||
min_pool_conns=1, | min_pool_conns=1, | ||||
max_pool_conns=10, | max_pool_conns=10, | ||||
journal_writer=None, | journal_writer=None, | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def db(self): | ||||
self.put_db(db) | self.put_db(db) | ||||
@db_transaction() | @db_transaction() | ||||
def check_config(self, *, check_write: bool, db: Db, cur=None) -> bool: | def check_config(self, *, check_write: bool, db: Db, cur=None) -> bool: | ||||
if not self.objstorage.check_config(check_write=check_write): | if not self.objstorage.check_config(check_write=check_write): | ||||
return False | return False | ||||
if not db.check_dbversion(): | dbversion = swh_db_version(db.conn.dsn) | ||||
if dbversion != self.current_version: | |||||
logger.warning( | |||||
"database dbversion (%s) != %s current_version (%s)", | |||||
dbversion, | |||||
__name__, | |||||
self.current_version, | |||||
) | |||||
return False | return False | ||||
# Check permissions on one of the tables | # Check permissions on one of the tables | ||||
if check_write: | check = "INSERT" if check_write else "SELECT" | ||||
check = "INSERT" | |||||
else: | |||||
check = "SELECT" | |||||
cur.execute("select has_table_privilege(current_user, 'content', %s)", (check,)) | cur.execute("select has_table_privilege(current_user, 'content', %s)", (check,)) | ||||
return cur.fetchone()[0] | return cur.fetchone()[0] | ||||
@db_transaction() | |||||
def get_current_version(self, *, db: Db, cur=None): | |||||
"""Returns the current code (expected) version""" | |||||
return db.current_version | |||||
def _content_unique_key(self, hash, db): | def _content_unique_key(self, hash, db): | ||||
"""Given a hash (tuple or dict), return a unique key from the | """Given a hash (tuple or dict), return a unique key from the | ||||
aggregation of keys. | aggregation of keys. | ||||
""" | """ | ||||
keys = db.content_hash_keys | keys = db.content_hash_keys | ||||
if isinstance(hash, tuple): | if isinstance(hash, tuple): | ||||
return hash | return hash | ||||
▲ Show 20 Lines • Show All 1,459 Lines • Show Last 20 Lines |