# Copyright (C) 2015-2018  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

from collections import defaultdict
import itertools

import psycopg2
import psycopg2.pool

from . import converters
from .common import db_transaction_generator, db_transaction
from .db import PgDb
from .exc import StorageDBError
from .storage import BaseStorage

from swh.model.hashutil import hash_to_bytes


# Max block size of contents to return
BULK_BLOCK_CONTENT_LEN_MAX = 10000

EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')
"""Identifier for the empty snapshot"""

class PgStorage(BaseStorage):
    """SWH storage proxy, encompassing DB and object storage
    """
    def _init_db(self, db, min_pool_conns=1, max_pool_conns=10):
        """Initialize the database connection or connection pool.

        Args:
            db: either a libpq connection string, or an already-open
                psycopg2 connection
            min_pool_conns (int): minimum number of pooled connections
            max_pool_conns (int): maximum number of pooled connections

        Returns:
            a ``PgDb`` wrapping the given connection when ``db`` is a
            psycopg2 connection, or None when a pool was created instead
            (connections are then leased on demand; see ``get_db``).

        Raises:
            StorageDBError: if connecting to the database failed
        """
        try:
            if isinstance(db, psycopg2.extensions.connection):
                # A live connection was handed to us: wrap it directly;
                # no pool is needed in that case.
                self._pool = None
                return PgDb(db)
            else:
                # A connection string: create a thread-safe pool from
                # which get_db() leases connections.
                self._pool = psycopg2.pool.ThreadedConnectionPool(
                    min_pool_conns, max_pool_conns, db
                )
                return None
        except psycopg2.OperationalError as e:
            raise StorageDBError(e)
def get_db(self): | |||||
if self._db: | |||||
return self._db | |||||
else: | |||||
return PgDb.from_pool(self._pool) | |||||
def check_config(self, *, check_write): | |||||
if not self.objstorage.check_config(check_write=check_write): | |||||
return False | |||||
# Check permissions on one of the tables | |||||
with self.get_db().transaction() as cur: | |||||
if check_write: | |||||
check = 'INSERT' | |||||
else: | |||||
check = 'SELECT' | |||||
cur.execute( | |||||
"select has_table_privilege(current_user, 'content', %s)", | |||||
(check,) | |||||
) | |||||
return cur.fetchone()[0] | |||||
return True | |||||
    @db_transaction()
    def _add_missing_content_to_db(self, content, db=None, cur=None):
        """Insert content metadata rows into the ``content`` table.

        Args:
            content: iterable of dicts carrying the metadata keys listed
                in ``db.content_get_metadata_keys``
            db: database wrapper (injected by the decorator)
            cur: database cursor (injected by the decorator)
        """
        # create temporary table for metadata injection
        db.mktemp('content', cur)

        db.copy_to(content, 'tmp_content',
                   db.content_get_metadata_keys, cur)

        # move metadata in place
        db.content_add_from_temp(cur)
    @db_transaction()
    def _add_skipped_content_to_db(self, skipped_content, db=None, cur=None):
        """Insert rows into the ``skipped_content`` table.

        Args:
            skipped_content: iterable of dicts carrying the keys listed
                in ``db.skipped_content_keys``
            db: database wrapper (injected by the decorator)
            cur: database cursor (injected by the decorator)
        """
        # stage rows in a temporary table first
        db.mktemp('skipped_content', cur)
        db.copy_to(skipped_content, 'tmp_skipped_content',
                   db.skipped_content_keys, cur)

        # move metadata in place
        db.skipped_content_add_from_temp(cur)
@db_transaction() | |||||
def content_update(self, content, keys=[], db=None, cur=None): | |||||
# TODO: Add a check on input keys. How to properly implement | |||||
# this? We don't know yet the new columns. | |||||
db.mktemp('content', cur) | |||||
select_keys = list(set(db.content_get_metadata_keys).union(set(keys))) | |||||
db.copy_to(content, 'tmp_content', select_keys, cur) | |||||
db.content_update_from_temp(keys_to_update=keys, | |||||
cur=cur) | |||||
@db_transaction_generator() | |||||
def content_missing_per_sha1(self, contents, db=None, cur=None): | |||||
for obj in db.content_missing_per_sha1(contents, cur): | |||||
yield obj[0] | |||||
    @db_transaction_generator()
    def skipped_content_missing(self, content, db=None, cur=None):
        """Yield the skipped contents from ``content`` that are not yet
        recorded in the storage.

        Args:
            content: iterable of dicts carrying the content hash keys
                plus ``length`` and ``reason``
            db: database wrapper (injected by the decorator)
            cur: database cursor (injected by the decorator)

        Yields:
            rows of skipped content missing from the storage
        """
        keys = db.content_hash_keys

        # stage the candidates in a temporary table, then diff against
        # the skipped_content table server-side
        db.mktemp('skipped_content', cur)
        db.copy_to(content, 'tmp_skipped_content',
                   keys + ['length', 'reason'], cur)

        yield from db.skipped_content_missing_from_temp(cur)
def directory_add(self, directories): | |||||
dirs = set() | |||||
dir_entries = { | |||||
'file': defaultdict(list), | |||||
'dir': defaultdict(list), | |||||
'rev': defaultdict(list), | |||||
} | |||||
for cur_dir in directories: | |||||
dir_id = cur_dir['id'] | |||||
dirs.add(dir_id) | |||||
for src_entry in cur_dir['entries']: | |||||
entry = src_entry.copy() | |||||
entry['dir_id'] = dir_id | |||||
dir_entries[entry['type']][dir_id].append(entry) | |||||
dirs_missing = set(self.directory_missing(dirs)) | |||||
if not dirs_missing: | |||||
return | |||||
db = self.get_db() | |||||
with db.transaction() as cur: | |||||
# Copy directory ids | |||||
dirs_missing_dict = ({'id': dir} for dir in dirs_missing) | |||||
db.mktemp('directory', cur) | |||||
db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur) | |||||
# Copy entries | |||||
for entry_type, entry_list in dir_entries.items(): | |||||
entries = itertools.chain.from_iterable( | |||||
entries_for_dir | |||||
for dir_id, entries_for_dir | |||||
in entry_list.items() | |||||
if dir_id in dirs_missing) | |||||
db.mktemp_dir_entry(entry_type) | |||||
db.copy_to( | |||||
entries, | |||||
'tmp_directory_entry_%s' % entry_type, | |||||
['target', 'name', 'perms', 'dir_id'], | |||||
cur, | |||||
) | |||||
# Do the final copy | |||||
db.directory_add_from_temp(cur) | |||||
def revision_add(self, revisions): | |||||
db = self.get_db() | |||||
revisions_missing = set(self.revision_missing( | |||||
set(revision['id'] for revision in revisions))) | |||||
if not revisions_missing: | |||||
return | |||||
with db.transaction() as cur: | |||||
db.mktemp_revision(cur) | |||||
revisions_filtered = ( | |||||
converters.revision_to_db(revision) for revision in revisions | |||||
if revision['id'] in revisions_missing) | |||||
parents_filtered = [] | |||||
db.copy_to( | |||||
revisions_filtered, 'tmp_revision', db.revision_add_cols, | |||||
cur, | |||||
lambda rev: parents_filtered.extend(rev['parents'])) | |||||
db.revision_add_from_temp(cur) | |||||
db.copy_to(parents_filtered, 'revision_history', | |||||
['id', 'parent_id', 'parent_rank'], cur) | |||||
def release_add(self, releases): | |||||
db = self.get_db() | |||||
release_ids = set(release['id'] for release in releases) | |||||
releases_missing = set(self.release_missing(release_ids)) | |||||
if not releases_missing: | |||||
return | |||||
with db.transaction() as cur: | |||||
db.mktemp_release(cur) | |||||
releases_filtered = ( | |||||
converters.release_to_db(release) for release in releases | |||||
if release['id'] in releases_missing | |||||
) | |||||
db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols, | |||||
cur) | |||||
db.release_add_from_temp(cur) | |||||
@db_transaction() | |||||
def snapshot_add(self, origin, visit, snapshot, | |||||
db=None, cur=None): | |||||
if not db.snapshot_exists(snapshot['id'], cur): | |||||
db.mktemp_snapshot_branch(cur) | |||||
db.copy_to( | |||||
( | |||||
{ | |||||
'name': name, | |||||
'target': info['target'] if info else None, | |||||
'target_type': info['target_type'] if info else None, | |||||
} | |||||
for name, info in snapshot['branches'].items() | |||||
), | |||||
'tmp_snapshot_branch', | |||||
['name', 'target', 'target_type'], | |||||
cur, | |||||
) | |||||
db.snapshot_add(origin, visit, snapshot['id'], cur) |