F7343023: D631.id.diff (35 KB, text/plain)
Raw diff attached to D631: Make Storage, Db, and tests more modular.
diff --git a/swh/storage/__init__.py b/swh/storage/__init__.py
--- a/swh/storage/__init__.py
+++ b/swh/storage/__init__.py
@@ -3,9 +3,9 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-from . import storage
+from . import pg_storage
-Storage = storage.Storage
+Storage = pg_storage.PgStorage
def get_storage(cls, args):
@@ -29,7 +29,7 @@
if cls == 'remote':
from .api.client import RemoteStorage as Storage
elif cls == 'local':
- from .storage import Storage
+ from .pg_storage import PgStorage as Storage
else:
raise ValueError('Unknown storage class `%s`' % cls)
diff --git a/swh/storage/common.py b/swh/storage/common.py
--- a/swh/storage/common.py
+++ b/swh/storage/common.py
@@ -7,20 +7,6 @@
import functools
-def apply_options(cursor, options):
- """Applies the given postgresql client options to the given cursor.
-
- Returns a dictionary with the old values if they changed."""
- old_options = {}
- for option, value in options.items():
- cursor.execute('SHOW %s' % option)
- old_value = cursor.fetchall()[0][0]
- if old_value != value:
- cursor.execute('SET LOCAL %s TO %%s' % option, (value,))
- old_options[option] = old_value
- return old_options
-
-
def db_transaction(**client_options):
"""decorator to execute Storage methods within DB transactions
@@ -36,15 +22,16 @@
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
if 'cur' in kwargs and kwargs['cur']:
+ db = kwargs['db']
cur = kwargs['cur']
- old_options = apply_options(cur, __client_options)
+ old_options = db.apply_options(cur, __client_options)
ret = meth(self, *args, **kwargs)
- apply_options(cur, old_options)
+ db.apply_options(cur, old_options)
return ret
else:
db = self.get_db()
with db.transaction() as cur:
- apply_options(cur, __client_options)
+ db.apply_options(cur, __client_options)
return meth(self, *args, db=db, cur=cur, **kwargs)
return _meth
@@ -67,14 +54,15 @@
@functools.wraps(meth)
def _meth(self, *args, **kwargs):
if 'cur' in kwargs and kwargs['cur']:
+ db = kwargs['db']
cur = kwargs['cur']
- old_options = apply_options(cur, __client_options)
+ old_options = db.apply_options(cur, __client_options)
yield from meth(self, *args, **kwargs)
- apply_options(cur, old_options)
+ db.apply_options(cur, old_options)
else:
db = self.get_db()
with db.transaction() as cur:
- apply_options(cur, __client_options)
+ db.apply_options(cur, __client_options)
yield from meth(self, *args, db=db, cur=cur, **kwargs)
return _meth
return decorator
diff --git a/swh/storage/db.py b/swh/storage/db.py
--- a/swh/storage/db.py
+++ b/swh/storage/db.py
@@ -3,6 +3,7 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import abc
import binascii
import datetime
import enum
@@ -11,6 +12,7 @@
import os
import select
import threading
+import warnings
from contextlib import contextmanager
@@ -78,76 +80,27 @@
yield line_to_bytes(line)
-class BaseDb:
+class BaseDb(metaclass=abc.ABCMeta):
"""Base class for swh.storage.*Db.
cf. swh.storage.db.Db, swh.archiver.db.ArchiverDb
"""
- @classmethod
- def connect(cls, *args, **kwargs):
- """factory method to create a DB proxy
-
- Accepts all arguments of psycopg2.connect; only some specific
- possibilities are reported below.
-
- Args:
- connstring: libpq2 connection string
-
- """
- conn = psycopg2.connect(*args, **kwargs)
- return cls(conn)
-
- @classmethod
- def from_pool(cls, pool):
- return cls(pool.getconn(), pool=pool)
+ @abc.abstractmethod
+ def apply_options(self, cursor, options):
+ """Applies the given postgresql client options to the given cursor.
- def _cursor(self, cur_arg):
- """get a cursor: from cur_arg if given, or a fresh one otherwise
-
- meant to avoid boilerplate if/then/else in methods that proxy stored
- procedures
-
- """
- if cur_arg is not None:
- return cur_arg
- # elif self.cur is not None:
- # return self.cur
- else:
- return self.conn.cursor()
-
- def __init__(self, conn, pool=None):
- """create a DB proxy
+ Returns a dictionary with the old values if they changed.
- Args:
- conn: psycopg2 connection to the SWH DB
- pool: psycopg2 pool of connections
-
- """
- self.conn = conn
- self.pool = pool
-
- def __del__(self):
- if self.pool:
- self.pool.putconn(self.conn)
+ Non-postgresql subclasses should emulate these options as much
+ as possible."""
+ pass
- @contextmanager
+ @abc.abstractmethod
def transaction(self):
- """context manager to execute within a DB transaction
-
- Yields:
- a psycopg2 cursor
-
- """
- with self.conn.cursor() as cur:
- try:
- yield cur
- self.conn.commit()
- except Exception:
- if not self.conn.closed:
- self.conn.rollback()
- raise
+ """context manager to execute within a DB transaction"""
+ pass
def copy_to(self, items, tblname, columns, cur=None, item_cb=None):
"""Copy items' entries to table tblname with columns information.
@@ -221,10 +174,85 @@
self._cursor(cur).execute('SELECT swh_mktemp(%s)', (tblname,))
-class Db(BaseDb):
+class PgDb(BaseDb):
"""Proxy to the SWH DB, with wrappers around stored procedures
"""
+
+ @classmethod
+ def connect(cls, *args, **kwargs):
+ """factory method to create a DB proxy
+
+ Accepts all arguments of psycopg2.connect; only some specific
+ possibilities are reported below.
+
+ Args:
+ connstring: libpq2 connection string
+
+ """
+ conn = psycopg2.connect(*args, **kwargs)
+ return cls(conn)
+
+ @classmethod
+ def from_pool(cls, pool):
+ return cls(pool.getconn(), pool=pool)
+
+ def _cursor(self, cur_arg):
+ """get a cursor: from cur_arg if given, or a fresh one otherwise
+
+ meant to avoid boilerplate if/then/else in methods that proxy stored
+ procedures
+
+ """
+ if cur_arg is not None:
+ return cur_arg
+ # elif self.cur is not None:
+ # return self.cur
+ else:
+ return self.conn.cursor()
+
+ def __init__(self, conn, pool=None):
+ """create a DB proxy
+
+ Args:
+ conn: psycopg2 connection to the SWH DB
+ pool: psycopg2 pool of connections
+
+ """
+ self.conn = conn
+ self.pool = pool
+
+ def __del__(self):
+ if self.pool:
+ self.pool.putconn(self.conn)
+
+ @contextmanager
+ def transaction(self):
+ """context manager to execute within a DB transaction
+
+ Yields:
+ a psycopg2 cursor
+
+ """
+ with self.conn.cursor() as cur:
+ try:
+ yield cur
+ self.conn.commit()
+ except Exception:
+ if not self.conn.closed:
+ self.conn.rollback()
+ raise
+
+ def apply_options(self, cursor, options):
+ old_options = {}
+ for option, value in options.items():
+ cursor.execute('SHOW %s' % option)
+ old_value = cursor.fetchall()[0][0]
+ if old_value != value:
+ cursor.execute('SET LOCAL %s TO %%s' % option, (value,))
+ old_options[option] = old_value
+ return old_options
+
def mktemp_dir_entry(self, entry_type, cur=None):
self._cursor(cur).execute('SELECT swh_mktemp_dir_entry(%s)',
(('directory_entry_%s' % entry_type),))
@@ -1035,3 +1063,10 @@
if not data:
return None
return line_to_bytes(data)
+
+
+class Db(PgDb):
+ def __init__(self, *args, **kwargs):
+ warnings.warn("Db was renamed to PgDb in v0.0.109.",
+ DeprecationWarning)
+ super().__init__(*args, **kwargs)
diff --git a/swh/storage/pg_storage.py b/swh/storage/pg_storage.py
new file mode 100644
--- /dev/null
+++ b/swh/storage/pg_storage.py
@@ -0,0 +1,240 @@
+# Copyright (C) 2015-2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from collections import defaultdict
+import itertools
+
+import psycopg2
+import psycopg2.pool
+
+from . import converters
+from .common import db_transaction_generator, db_transaction
+from .db import PgDb
+from .exc import StorageDBError
+from .storage import BaseStorage
+
+from swh.model.hashutil import hash_to_bytes
+
+# Max block size of contents to return
+BULK_BLOCK_CONTENT_LEN_MAX = 10000
+
+EMPTY_SNAPSHOT_ID = hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e')
+"""Identifier for the empty snapshot"""
+
+
+class PgStorage(BaseStorage):
+ """SWH storage proxy, encompassing DB and object storage
+
+ """
+
+ def _init_db(self, db, min_pool_conns=1, max_pool_conns=10):
+ """
+ Args:
+ db_conn: either a libpq connection string, or a psycopg2 connection
+ obj_root: path to the root of the object storage
+
+ """
+ try:
+ if isinstance(db, psycopg2.extensions.connection):
+ self._pool = None
+ return PgDb(db)
+ else:
+ self._pool = psycopg2.pool.ThreadedConnectionPool(
+ min_pool_conns, max_pool_conns, db
+ )
+ return None
+ except psycopg2.OperationalError as e:
+ raise StorageDBError(e)
+
+ def get_db(self):
+ if self._db:
+ return self._db
+ else:
+ return PgDb.from_pool(self._pool)
+
+ def check_config(self, *, check_write):
+ if not self.objstorage.check_config(check_write=check_write):
+ return False
+
+ # Check permissions on one of the tables
+ with self.get_db().transaction() as cur:
+ if check_write:
+ check = 'INSERT'
+ else:
+ check = 'SELECT'
+
+ cur.execute(
+ "select has_table_privilege(current_user, 'content', %s)",
+ (check,)
+ )
+ return cur.fetchone()[0]
+
+ return True
+
+ @db_transaction()
+ def _add_missing_content_to_db(self, content, db=None, cur=None):
+ # create temporary table for metadata injection
+ db.mktemp('content', cur)
+
+ db.copy_to(content, 'tmp_content',
+ db.content_get_metadata_keys, cur)
+
+ # move metadata in place
+ db.content_add_from_temp(cur)
+
+ @db_transaction()
+ def _add_skipped_content_to_db(self, skipped_content, db=None, cur=None):
+
+ db.mktemp('skipped_content', cur)
+ db.copy_to(skipped_content, 'tmp_skipped_content',
+ db.skipped_content_keys, cur)
+
+ # move metadata in place
+ db.skipped_content_add_from_temp(cur)
+
+ @db_transaction()
+ def content_update(self, content, keys=[], db=None, cur=None):
+ # TODO: Add a check on input keys. How to properly implement
+ # this? We don't know yet the new columns.
+
+ db.mktemp('content', cur)
+ select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
+ db.copy_to(content, 'tmp_content', select_keys, cur)
+ db.content_update_from_temp(keys_to_update=keys,
+ cur=cur)
+
+ @db_transaction_generator()
+ def content_missing_per_sha1(self, contents, db=None, cur=None):
+ for obj in db.content_missing_per_sha1(contents, cur):
+ yield obj[0]
+
+ @db_transaction_generator()
+ def skipped_content_missing(self, content, db=None, cur=None):
+ keys = db.content_hash_keys
+
+ db.mktemp('skipped_content', cur)
+ db.copy_to(content, 'tmp_skipped_content',
+ keys + ['length', 'reason'], cur)
+
+ yield from db.skipped_content_missing_from_temp(cur)
+
+ def directory_add(self, directories):
+ dirs = set()
+ dir_entries = {
+ 'file': defaultdict(list),
+ 'dir': defaultdict(list),
+ 'rev': defaultdict(list),
+ }
+
+ for cur_dir in directories:
+ dir_id = cur_dir['id']
+ dirs.add(dir_id)
+ for src_entry in cur_dir['entries']:
+ entry = src_entry.copy()
+ entry['dir_id'] = dir_id
+ dir_entries[entry['type']][dir_id].append(entry)
+
+ dirs_missing = set(self.directory_missing(dirs))
+ if not dirs_missing:
+ return
+
+ db = self.get_db()
+ with db.transaction() as cur:
+ # Copy directory ids
+ dirs_missing_dict = ({'id': dir} for dir in dirs_missing)
+ db.mktemp('directory', cur)
+ db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur)
+
+ # Copy entries
+ for entry_type, entry_list in dir_entries.items():
+ entries = itertools.chain.from_iterable(
+ entries_for_dir
+ for dir_id, entries_for_dir
+ in entry_list.items()
+ if dir_id in dirs_missing)
+
+ db.mktemp_dir_entry(entry_type)
+
+ db.copy_to(
+ entries,
+ 'tmp_directory_entry_%s' % entry_type,
+ ['target', 'name', 'perms', 'dir_id'],
+ cur,
+ )
+
+ # Do the final copy
+ db.directory_add_from_temp(cur)
+
+ def revision_add(self, revisions):
+ db = self.get_db()
+
+ revisions_missing = set(self.revision_missing(
+ set(revision['id'] for revision in revisions)))
+
+ if not revisions_missing:
+ return
+
+ with db.transaction() as cur:
+ db.mktemp_revision(cur)
+
+ revisions_filtered = (
+ converters.revision_to_db(revision) for revision in revisions
+ if revision['id'] in revisions_missing)
+
+ parents_filtered = []
+
+ db.copy_to(
+ revisions_filtered, 'tmp_revision', db.revision_add_cols,
+ cur,
+ lambda rev: parents_filtered.extend(rev['parents']))
+
+ db.revision_add_from_temp(cur)
+
+ db.copy_to(parents_filtered, 'revision_history',
+ ['id', 'parent_id', 'parent_rank'], cur)
+
+ def release_add(self, releases):
+ db = self.get_db()
+
+ release_ids = set(release['id'] for release in releases)
+ releases_missing = set(self.release_missing(release_ids))
+
+ if not releases_missing:
+ return
+
+ with db.transaction() as cur:
+ db.mktemp_release(cur)
+
+ releases_filtered = (
+ converters.release_to_db(release) for release in releases
+ if release['id'] in releases_missing
+ )
+
+ db.copy_to(releases_filtered, 'tmp_release', db.release_add_cols,
+ cur)
+
+ db.release_add_from_temp(cur)
+
+ @db_transaction()
+ def snapshot_add(self, origin, visit, snapshot,
+ db=None, cur=None):
+ if not db.snapshot_exists(snapshot['id'], cur):
+ db.mktemp_snapshot_branch(cur)
+ db.copy_to(
+ (
+ {
+ 'name': name,
+ 'target': info['target'] if info else None,
+ 'target_type': info['target_type'] if info else None,
+ }
+ for name, info in snapshot['branches'].items()
+ ),
+ 'tmp_snapshot_branch',
+ ['name', 'target', 'target_type'],
+ cur,
+ )
+
+ db.snapshot_add(origin, visit, snapshot['id'], cur)
diff --git a/swh/storage/storage.py b/swh/storage/storage.py
--- a/swh/storage/storage.py
+++ b/swh/storage/storage.py
@@ -4,20 +4,16 @@
# See top-level LICENSE file for more information
+import abc
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import datetime
-import itertools
import json
import dateutil.parser
-import psycopg2
-import psycopg2.pool
from . import converters
from .common import db_transaction_generator, db_transaction
-from .db import Db
-from .exc import StorageDBError
from .algos import diff
from swh.model.hashutil import ALGORITHMS, hash_to_bytes
@@ -31,58 +27,25 @@
"""Identifier for the empty snapshot"""
-class Storage():
- """SWH storage proxy, encompassing DB and object storage
+class BaseStorage(metaclass=abc.ABCMeta):
+ """Abstract class for storage backends.
"""
-
- def __init__(self, db, objstorage, min_pool_conns=1, max_pool_conns=10):
- """
- Args:
- db_conn: either a libpq connection string, or a psycopg2 connection
- obj_root: path to the root of the object storage
-
- """
- try:
- if isinstance(db, psycopg2.extensions.connection):
- self._pool = None
- self._db = Db(db)
- else:
- self._pool = psycopg2.pool.ThreadedConnectionPool(
- min_pool_conns, max_pool_conns, db
- )
- self._db = None
- except psycopg2.OperationalError as e:
- raise StorageDBError(e)
-
+ def __init__(self, db, objstorage, **kwargs):
+ self._db = self._init_db(db, **kwargs)
self.objstorage = get_objstorage(**objstorage)
- def get_db(self):
- if self._db:
- return self._db
- else:
- return Db.from_pool(self._pool)
+ @abc.abstractmethod
+ def _init_db(self, db, **kwargs):
+ pass
+
+ def get_db(self, db, **kwargs):
+ return self._db
+ @abc.abstractmethod
def check_config(self, *, check_write):
"""Check that the storage is configured and ready to go."""
-
- if not self.objstorage.check_config(check_write=check_write):
- return False
-
- # Check permissions on one of the tables
- with self.get_db().transaction() as cur:
- if check_write:
- check = 'INSERT'
- else:
- check = 'SELECT'
-
- cur.execute(
- "select has_table_privilege(current_user, 'content', %s)",
- (check,)
- )
- return cur.fetchone()[0]
-
- return True
+ pass
def content_add(self, content):
"""Add content blobs to the storage
@@ -134,6 +97,11 @@
in self.skipped_content_missing(
content_without_data))
+ missing_filtered = (cont for cont in content_with_data
+ if cont['sha1'] in missing_content)
+ skipped_filtered = (cont for cont in content_without_data
+ if _unique_key(cont) in missing_skipped)
+
def add_to_objstorage():
data = {
cont['sha1']: cont['data']
@@ -145,38 +113,40 @@
with db.transaction() as cur:
with ThreadPoolExecutor(max_workers=1) as executor:
added_to_objstorage = executor.submit(add_to_objstorage)
- if missing_content:
- # create temporary table for metadata injection
- db.mktemp('content', cur)
-
- content_filtered = (cont for cont in content_with_data
- if cont['sha1'] in missing_content)
- db.copy_to(content_filtered, 'tmp_content',
- db.content_get_metadata_keys, cur)
+ if missing_filtered:
+ self._add_missing_content_to_db(missing_filtered,
+ db=db, cur=cur)
+ if skipped_filtered:
+ self._add_skipped_content_to_db(skipped_filtered,
+ db=db, cur=cur)
- # move metadata in place
- db.content_add_from_temp(cur)
+ # Wait for objstorage addition before returning from the
+ # transaction, bubbling up any exception
+ added_to_objstorage.result()
- if missing_skipped:
- missing_filtered = (
- cont for cont in content_without_data
- if _unique_key(cont) in missing_skipped
- )
+ @abc.abstractmethod
+ def _add_missing_content_to_db(self, content, db=None, cur=None):
+ """Insert in the database the list of content that was
+ previously missing from the archive and is now being added
+ to the objstorage.
- db.mktemp('skipped_content', cur)
- db.copy_to(missing_filtered, 'tmp_skipped_content',
- db.skipped_content_keys, cur)
+ Args:
+ content: see `content_add`."""
+ pass
- # move metadata in place
- db.skipped_content_add_from_temp(cur)
+ @abc.abstractmethod
+ def _add_skipped_content_to_db(self, skipped_content, db=None, cur=None):
+ """Insert in the database the list of content that was
+ previously missing from the archive and is now explicitly
+ skipped -- meaning it's now referenced but not stored.
- # Wait for objstorage addition before returning from the
- # transaction, bubbling up any exception
- added_to_objstorage.result()
+ Args:
+ skipped_content: see `content_add`."""
+ pass
- @db_transaction()
- def content_update(self, content, keys=[], db=None, cur=None):
+ @abc.abstractmethod
+ def content_update(self, content, keys=[]):
"""Update content blobs to the storage. Does nothing for unknown
contents or skipped ones.
@@ -196,14 +166,7 @@
new hash column
"""
- # TODO: Add a check on input keys. How to properly implement
- # this? We don't know yet the new columns.
-
- db.mktemp('content', cur)
- select_keys = list(set(db.content_get_metadata_keys).union(set(keys)))
- db.copy_to(content, 'tmp_content', select_keys, cur)
- db.content_update_from_temp(keys_to_update=keys,
- cur=cur)
+ pass
def content_get(self, content):
"""Retrieve in bulk contents and their data.
@@ -304,7 +267,9 @@
@db_transaction_generator()
def skipped_content_missing(self, content, db=None, cur=None):
- """List skipped_content missing from storage
+ """List skipped_content missing from storage, ie. content
+ that is known by the archive but not stored for some reason
+ (e.g. file whose size is too large).
Args:
content: iterable of dictionaries containing the data for each
@@ -353,6 +318,7 @@
return dict(zip(db.content_find_cols, c))
return None
+ @abc.abstractmethod
def directory_add(self, directories):
"""Add directories to the storage
@@ -372,51 +338,7 @@
directory entry
- perms (int): entry permissions
"""
- dirs = set()
- dir_entries = {
- 'file': defaultdict(list),
- 'dir': defaultdict(list),
- 'rev': defaultdict(list),
- }
-
- for cur_dir in directories:
- dir_id = cur_dir['id']
- dirs.add(dir_id)
- for src_entry in cur_dir['entries']:
- entry = src_entry.copy()
- entry['dir_id'] = dir_id
- dir_entries[entry['type']][dir_id].append(entry)
-
- dirs_missing = set(self.directory_missing(dirs))
- if not dirs_missing:
- return
-
- db = self.get_db()
- with db.transaction() as cur:
- # Copy directory ids
- dirs_missing_dict = ({'id': dir} for dir in dirs_missing)
- db.mktemp('directory', cur)
- db.copy_to(dirs_missing_dict, 'tmp_directory', ['id'], cur)
-
- # Copy entries
- for entry_type, entry_list in dir_entries.items():
- entries = itertools.chain.from_iterable(
- entries_for_dir
- for dir_id, entries_for_dir
- in entry_list.items()
- if dir_id in dirs_missing)
-
- db.mktemp_dir_entry(entry_type)
-
- db.copy_to(
- entries,
- 'tmp_directory_entry_%s' % entry_type,
- ['target', 'name', 'perms', 'dir_id'],
- cur,
- )
-
- # Do the final copy
- db.directory_add_from_temp(cur)
+ pass
@db_transaction_generator()
def directory_missing(self, directories, db=None, cur=None):
@@ -469,6 +391,7 @@
if res:
return dict(zip(db.directory_ls_cols, res))
+ @abc.abstractmethod
def revision_add(self, revisions):
"""Add revisions to the storage
@@ -501,32 +424,7 @@
- parents (list of sha1_git): the parents of this revision
"""
- db = self.get_db()
-
- revisions_missing = set(self.revision_missing(
- set(revision['id'] for revision in revisions)))
-
- if not revisions_missing:
- return
-
- with db.transaction() as cur:
- db.mktemp_revision(cur)
-
- revisions_filtered = (
- converters.revision_to_db(revision) for revision in revisions
- if revision['id'] in revisions_missing)
-
- parents_filtered = []
-
- db.copy_to(
- revisions_filtered, 'tmp_revision', db.revision_add_cols,
- cur,
- lambda rev: parents_filtered.extend(rev['parents']))
-
- db.revision_add_from_temp(cur)
-
- db.copy_to(parents_filtered, 'revision_history',
- ['id', 'parent_id', 'parent_rank'], cur)
+ pass
@db_transaction_generator()
def revision_missing(self, revisions, db=None, cur=None):
@@ -683,9 +581,8 @@
dict(zip(db.release_get_cols, release))
)
- @db_transaction()
- def snapshot_add(self, origin, visit, snapshot,
- db=None, cur=None):
+ @abc.abstractmethod
+ def snapshot_add(self, origin, visit, snapshot):
"""Add a snapshot for the given origin/visit couple
Args:
@@ -707,23 +604,7 @@
(currently a ``sha1_git`` for all object kinds, or the name
of the target branch for aliases)
"""
- if not db.snapshot_exists(snapshot['id'], cur):
- db.mktemp_snapshot_branch(cur)
- db.copy_to(
- (
- {
- 'name': name,
- 'target': info['target'] if info else None,
- 'target_type': info['target_type'] if info else None,
- }
- for name, info in snapshot['branches'].items()
- ),
- 'tmp_snapshot_branch',
- ['name', 'target', 'target_type'],
- cur,
- )
-
- db.snapshot_add(origin, visit, snapshot['id'], cur)
+ pass
@db_transaction(statement_timeout=2000)
def snapshot_get(self, snapshot_id, db=None, cur=None):
diff --git a/swh/storage/tests/algos/test_snapshot.py b/swh/storage/tests/algos/test_snapshot.py
--- a/swh/storage/tests/algos/test_snapshot.py
+++ b/swh/storage/tests/algos/test_snapshot.py
@@ -12,7 +12,7 @@
from_regex, none, one_of, sampled_from)
from swh.model.identifiers import snapshot_identifier, identifier_to_bytes
-from swh.storage.tests.storage_testing import StorageTestFixture
+from swh.storage.tests.storage_testing import PgStorageTestFixture
from swh.storage.algos.snapshot import snapshot_get_all_branches
@@ -94,7 +94,7 @@
@pytest.mark.db
-class TestSnapshotAllBranches(StorageTestFixture, unittest.TestCase):
+class TestSnapshotAllBranches(PgStorageTestFixture, unittest.TestCase):
@given(origins(), datetimes(), snapshots(min_size=0, max_size=10,
only_objects=False))
def test_snapshot_small(self, origin, ts, snapshot):
diff --git a/swh/storage/tests/storage_testing.py b/swh/storage/tests/storage_testing.py
--- a/swh/storage/tests/storage_testing.py
+++ b/swh/storage/tests/storage_testing.py
@@ -5,6 +5,7 @@
import os
import tempfile
+import warnings
from swh.storage import get_storage
@@ -12,7 +13,7 @@
from swh.storage.tests import SQL_DIR
-class StorageTestFixture(SingleDbTestFixture):
+class PgStorageTestFixture(SingleDbTestFixture):
"""Mix this in a test subject class to get Storage testing support.
This fixture requires to come before SingleDbTestFixture in the
@@ -50,8 +51,17 @@
def tearDown(self):
self.objtmp.cleanup()
self.storage = None
+ self.reset_storage_tables()
super().tearDown()
def reset_storage_tables(self):
excluded = {'dbversion', 'tool'}
self.reset_db_tables(self.TEST_DB_NAME, excluded=excluded)
+
+
+class StorageTestFixture(PgStorageTestFixture):
+ def __init__(self, *args, **kwargs):
+ warnings.warn("StorageTestFixture was renamed to "
+ "PgStorageTestFixture in v0.0.109.",
+ DeprecationWarning)
+ super().__init__(*args, **kwargs)
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -7,14 +7,17 @@
import tempfile
import unittest
+import pytest
+
from swh.core.tests.server_testing import ServerTestFixture
from swh.storage.api.client import RemoteStorage
from swh.storage.api.server import app
from swh.storage.tests.test_storage import CommonTestStorage
+from swh.storage.tests.storage_testing import PgStorageTestFixture
class TestRemoteStorage(CommonTestStorage, ServerTestFixture,
- unittest.TestCase):
+ PgStorageTestFixture, unittest.TestCase):
"""Test the remote storage API.
This class doesn't define any tests as we want identical
@@ -52,3 +55,18 @@
def tearDown(self):
super().tearDown()
shutil.rmtree(self.storage_base)
+
+ @pytest.mark.skip("Can only be tested with local storage as you "
+ "can't mock datetimes for the remote server")
+ def test_fetch_history(self):
+ pass
+
+ @pytest.mark.skip("The remote API doesn't expose _person_add")
+ def test_person_get(self):
+ pass
+
+ @pytest.mark.skip("This test is only relevant on the local "
+ "storage, with an actual objstorage raising an "
+ "exception")
+ def test_content_add_objstorage_exception(self):
+ pass
diff --git a/swh/storage/tests/test_db.py b/swh/storage/tests/test_db.py
--- a/swh/storage/tests/test_db.py
+++ b/swh/storage/tests/test_db.py
@@ -10,7 +10,7 @@
from swh.core.tests.db_testing import SingleDbTestFixture
from swh.model.hashutil import hash_to_bytes
-from swh.storage.db import Db
+from swh.storage.db import PgDb
from . import SQL_DIR
@@ -21,7 +21,7 @@
def setUp(self):
super().setUp()
- self.db = Db(self.conn)
+ self.db = PgDb(self.conn)
def tearDown(self):
self.db.conn.close()
diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py
--- a/swh/storage/tests/test_storage.py
+++ b/swh/storage/tests/test_storage.py
@@ -6,27 +6,23 @@
import copy
import datetime
import unittest
+import warnings
from collections import defaultdict
from operator import itemgetter
from unittest.mock import Mock, patch
import psycopg2
-import pytest
from swh.model import from_disk, identifiers
from swh.model.hashutil import hash_to_bytes
-from swh.storage.tests.storage_testing import StorageTestFixture
+from swh.storage.tests.storage_testing import PgStorageTestFixture
-@pytest.mark.db
-class BaseTestStorage(StorageTestFixture):
+class DataTestStorage:
+ """Base class which provides data to tests."""
def setUp(self):
super().setUp()
- db = self.test_db[self.TEST_DB_NAME]
- self.conn = db.conn
- self.cursor = db.cursor
-
self.maxDiff = None
self.cont = {
@@ -509,17 +505,22 @@
'next_branch': None
}
- def tearDown(self):
- self.reset_storage_tables()
- super().tearDown()
+class BaseTestFixture(DataTestStorage, PgStorageTestFixture):
+ def __init__(self, *args, **kwargs):
+ warnings.warn("BaseTestFixture was renamed to "
+ "DataTestStorage in v0.0.109, and no longer inherits"
+ "from (Pg)StorageTestFixture",
+ DeprecationWarning)
+ super().__init__(*args, **kwargs)
-class CommonTestStorage(BaseTestStorage):
+
+class CommonTestStorage(DataTestStorage):
"""Base class for Storage testing.
This class is used as-is to test local storage (see TestLocalStorage
- below) and remote storage (see TestRemoteStorage in
- test_remote_storage.py.
+ and TestMemStorage below) and remote storage (see TestRemoteStorage in
+ test_api_client.py).
We need to have the two classes inherit from this base class
separately to avoid nosetests running the tests from the base
@@ -1841,12 +1842,6 @@
self.assertEqual(m_by_provider[0]['id'], o_m2)
self.assertIsNotNone(o_m1)
-
-class TestLocalStorage(CommonTestStorage, unittest.TestCase):
- """Test the local storage"""
-
- # Can only be tested with local storage as you can't mock
- # datetimes for the remote server
def test_fetch_history(self):
origin = self.storage.origin_add_one(self.origin)
with patch('datetime.datetime'):
@@ -1869,7 +1864,6 @@
self.assertEqual(expected_fetch_history, fetch_history)
- # The remote API doesn't expose _person_add
def test_person_get(self):
# given
person0 = {
@@ -1906,8 +1900,6 @@
},
])
- # This test is only relevant on the local storage, with an actual
- # objstorage raising an exception
def test_content_add_objstorage_exception(self):
self.storage.objstorage.add = Mock(
side_effect=Exception('mocked broken objstorage')
@@ -1921,7 +1913,18 @@
self.assertEqual(missing, [self.cont['sha1']])
-class AlteringSchemaTest(BaseTestStorage, unittest.TestCase):
+class TestLocalStorage(CommonTestStorage, PgStorageTestFixture,
+ unittest.TestCase):
+ """Test the local storage"""
+ def setUp(self):
+ super().setUp()
+ db = self.test_db[self.TEST_DB_NAME]
+ self.conn = db.conn
+ self.cursor = db.cursor
+
+
+class AlteringSchemaTest(DataTestStorage, PgStorageTestFixture,
+ unittest.TestCase):
"""This class is dedicated for the rare case where the schema needs to
be altered dynamically.
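
For readers skimming the attachment, here is a short usage sketch (not part of the diff) of how the storage factory resolves once this patch is applied. The get_storage(cls, args) entry point, the PgStorage class, and the Storage alias come from the hunks above; the connection string and the objstorage configuration below are illustrative placeholders, not values taken from this revision.

# Illustrative sketch only -- the configuration values are made up.
from swh.storage import get_storage

storage = get_storage('local', {
    # libpq connection string (placeholder)
    'db': 'dbname=softwareheritage-dev',
    # object storage configuration, passed through to get_objstorage()
    # by BaseStorage.__init__ (placeholder backend and paths)
    'objstorage': {
        'cls': 'pathslicing',
        'args': {'root': '/tmp/swh-objects', 'slicing': '0:2/2:4'},
    },
})

# With this patch, cls == 'local' resolves to
# swh.storage.pg_storage.PgStorage, and the module-level
# swh.storage.Storage alias points to the same class, so existing
# callers keep working unchanged.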