diff --git a/db_testing.py b/db_testing.py deleted file mode 100644 index 8cba731..0000000 --- a/db_testing.py +++ /dev/null @@ -1,140 +0,0 @@ -# Copyright (C) 2015 The Software Heritage developers -# See the AUTHORS file at the top-level directory of this distribution -# License: GNU General Public License version 3, or any later version -# See top-level LICENSE file for more information - -import os -import psycopg2 -import subprocess - - -TEST_DB_NAME = 'softwareheritage-test' -TEST_DIR = os.path.dirname(os.path.abspath(__file__)) -TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') -TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') - - -def pg_restore(dbname, dumpfile): - subprocess.check_call(['pg_restore', '--no-owner', '--no-privileges', - '--dbname', dbname, dumpfile]) - - -def pg_dump(dbname, dumpfile): - subprocess.check_call(['pg_dump', '--no-owner', '--no-privileges', '-Fc', - '-f', dumpfile, dbname]) - - -def pg_dropdb(dbname): - subprocess.check_call(['dropdb', dbname]) - - -def pg_createdb(dbname): - subprocess.check_call(['createdb', dbname]) - - -def db_create(test_subj, dbname=TEST_DB_NAME, dbdump=TEST_DB_DUMP): - """create the test DB and load the test data dump into it - - context: setUpClass - - """ - try: - pg_createdb(dbname) - except subprocess.CalledProcessError: # try recovering once, in case - pg_dropdb(dbname) # the db already existed - pg_createdb(dbname) - pg_restore(dbname, dbdump) - test_subj.dbname = dbname - - -def db_destroy(test_subj): - """destroy the test DB - - context: tearDownClass - - """ - pg_dropdb(test_subj.dbname) - - -def db_connect(test_subj): - """connect to the test DB and open a cursor - - context: setUp - - """ - test_subj.conn = psycopg2.connect('dbname=' + test_subj.dbname) - test_subj.cursor = test_subj.conn.cursor() - - -def db_close(test_subj): - """rollback current transaction and disconnet from the test DB - - context: tearDown - - """ - if not test_subj.conn.closed: - test_subj.conn.rollback() - test_subj.conn.close() - - -class DbTestFixture(): - """Mix this in a test subject class to get DB testing support. - - The test case class will then have the following attributes, accessible via - self: - - dbname: name of the test database - conn: psycopg2 connection object - cursor: open psycopg2 cursor to the DB - - To ensure test isolation, each test method of the test case class will - execute in its own connection, cursor, and transaction. - - To ensure setup/teardown methods are called, in case of multiple - inheritance DbTestFixture should be the first class in the inheritance - hierarchy. - - Note that if you want to define setup/teardown methods, you need to - explicitly call super() to ensure that the fixture setup/teardown methods - are invoked. Here is an example where all setup/teardown methods are - defined in a test case: - - class TestDb(DbTestFixture, unittest.TestCase): - - @classmethod - def setUpClass(cls): - super().setUpClass() - # your class setup code here - - def setUp(self): - super().setUp() - # your instance setup code here - - def tearDown(self): - # your instance teardown code here - super().tearDown() - - @classmethod - def tearDownClass(cls): - # your class teardown code here - super().tearDownClass() - - """ - - @classmethod - def setUpClass(cls): - db_create(cls) - super().setUpClass() - - def setUp(self): - db_connect(self) - super().setUp() - - def tearDown(self): - super().tearDown() - db_close(self) - - @classmethod - def tearDownClass(cls): - super().tearDownClass() - db_destroy(cls) diff --git a/test_db.py b/test_db.py index 8c3a75f..ca10f8c 100644 --- a/test_db.py +++ b/test_db.py @@ -1,46 +1,46 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest from nose.tools import istest from nose.plugins.attrib import attr -from .db_testing import DbTestFixture +from swh.core.tests.db_testing import DbTestFixture from swh.core.hashutil import hex_to_hash from swh.storage.db import Db @attr('db') class TestDb(DbTestFixture, unittest.TestCase): def setUp(self): super().setUp() self.db = Db(self.conn) def tearDown(self): self.db.conn.close() super().tearDown() @istest def add_content(self): cur = self.cursor sha1 = hex_to_hash('34973274ccef6ab4dfaaf86599792fa9c3fe4689') self.db.mktemp('content', cur) self.db.copy_to([{ 'sha1': sha1, 'sha1_git': hex_to_hash( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hex_to_hash( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), 'length': 3}], 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'length'], cur) self.db.content_add_from_temp(cur) self.cursor.execute('SELECT sha1 FROM content WHERE sha1 = %s', (sha1,)) self.assertEqual(self.cursor.fetchone()[0].tobytes(), sha1) diff --git a/test_storage.py b/test_storage.py index 782f535..80208d0 100644 --- a/test_storage.py +++ b/test_storage.py @@ -1,304 +1,304 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import shutil import tempfile import unittest from nose.tools import istest from nose.plugins.attrib import attr -from .db_testing import DbTestFixture +from swh.core.tests.db_testing import DbTestFixture from swh.core.hashutil import hex_to_hash from swh.storage import Storage @attr('db') class AbstractTestStorage(DbTestFixture): """Base class for Storage testing. This class is used as-is to test local storage (see TestStorage below) and remote storage (see TestRemoteStorage in test_remote_storage.py. We need to have the two classes inherit from this base class separately to avoid nosetests running the tests from the base class twice. """ def setUp(self): super().setUp() self.objroot = tempfile.mkdtemp() self.storage = Storage(self.conn, self.objroot) self.cont = { 'data': b'42\n', 'length': 3, 'sha1': hex_to_hash( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': hex_to_hash( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hex_to_hash( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), } self.cont2 = { 'data': b'4242\n', 'length': 5, 'sha1': hex_to_hash( '61c2b3a30496d329e21af70dd2d7e097046d07b7'), 'sha1_git': hex_to_hash( '36fade77193cb6d2bd826161a0979d64c28ab4fa'), 'sha256': hex_to_hash( '859f0b154fdb2d630f45e1ecae4a8629' '15435e663248bb8461d914696fc047cd'), } self.missing_cont = { 'data': b'missing\n', 'length': 8, 'sha1': hex_to_hash( 'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'), 'sha1_git': hex_to_hash( '33e45d56f88993aae6a0198013efa80716fd8919'), 'sha256': hex_to_hash( '6bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), } self.skipped_cont = { 'length': 1024 * 1024 * 200, 'sha1_git': hex_to_hash( '33e45d56f88993aae6a0198013efa80716fd8920'), 'reason': 'Content too long', 'status': 'absent', } self.dir = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90', 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': 0o644, 'atime': None, 'ctime': None, 'mtime': None, }, { 'name': b'bar\xc3', 'type': 'dir', 'target': b'12345678901234567890', 'perms': 0o2000, 'atime': None, 'ctime': None, 'mtime': None, }, ], } self.revision = { 'id': b'56789012345678901234', 'message': 'hello', 'author_name': 'Nicolas Dandrimont', 'author_email': 'nicolas@example.com', 'committer_name': 'Stefano Zacchiroli', 'committer_email': 'stefano@example.com', 'parents': [b'01234567890123456789'], 'date': datetime.datetime(2015, 1, 1, 22, 0, 0), 'date_offset': 120, 'committer_date': datetime.datetime(2015, 1, 2, 22, 0, 0), 'committer_date_offset': -120, 'type': 'git', 'directory': self.dir['id'], } self.origin = { 'url': 'file:///dev/null', 'type': 'git', } self.origin2 = { 'url': 'file:///dev/zero', 'type': 'git', } self.occurrence = { 'branch': 'master', 'revision': b'67890123456789012345', 'authority': 1, 'validity': datetime.datetime(2015, 1, 1, 23, 0, 0), } def tearDown(self): shutil.rmtree(self.objroot) super().tearDown() @istest def content_add(self): cont = self.cont self.storage.content_add([cont]) if hasattr(self.storage, 'objstorage'): self.assertIn(cont['sha1'], self.storage.objstorage) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = self.cursor.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) @istest def skipped_content_add(self): cont = self.skipped_cont self.storage.content_add([self.skipped_cont]) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status,' 'reason FROM skipped_content WHERE sha1_git = %s', (cont['sha1_git'],)) datum = self.cursor.fetchone() self.assertEqual( (datum[0], datum[1].tobytes(), datum[2], datum[3], datum[4], datum[5]), (None, cont['sha1_git'], None, cont['length'], 'absent', 'Content too long')) @istest def content_missing(self): cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) gen = self.storage.content_missing([cont2, missing_cont]) self.assertEqual(list(gen), [missing_cont['sha1']]) @istest def content_exist_with_present_content(self): # 1. with something to find cont = self.cont self.storage.content_add([cont]) actually_present = self.storage.content_exist({'sha1': cont['sha1']}) self.assertEquals(actually_present, True, "Should be present") # 2. with something to find actually_present = self.storage.content_exist( {'sha1_git': cont['sha1_git']}) self.assertEquals(actually_present, True, "Should be present") # 3. with something to find actually_present = self.storage.content_exist( {'sha256': cont['sha256']}) self.assertEquals(actually_present, True, "Should be present") # 4. with something to find actually_present = self.storage.content_exist( {'sha1': cont['sha1'], 'sha1_git': cont['sha1_git'], 'sha256': cont['sha256']}) self.assertEquals(actually_present, True, "Should be present") @istest def content_exist_with_non_present_content(self): # 1. with something that does not exist missing_cont = self.missing_cont actually_present = self.storage.content_exist( {'sha1': missing_cont['sha1']}) self.assertEquals(actually_present, False, "Should be missing") # 2. with something that does not exist actually_present = self.storage.content_exist( {'sha1_git': missing_cont['sha1_git']}) self.assertEquals(actually_present, False, "Should be missing") # 3. with something that does not exist actually_present = self.storage.content_exist( {'sha256': missing_cont['sha256']}) self.assertEquals(actually_present, False, "Should be missing") @istest def content_exist_bad_input(self): # 1. with bad input with self.assertRaises(ValueError) as cm: self.storage.content_exist({}) # empty is bad self.assertEqual(cm.exception.args, ('Key must be one of sha1, git_sha1, sha256.',)) # 2. with bad input with self.assertRaises(ValueError) as cm: self.storage.content_exist( {'unknown-sha1': 'something'}) # not the right key self.assertEqual(cm.exception.args, ('Key must be one of sha1, git_sha1, sha256.',)) @istest def directory_add(self): init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) self.storage.directory_add([self.dir]) stored_data = list(self.storage.directory_get(self.dir['id'])) data_to_store = [ (self.dir['id'], ent['type'], ent['target'], ent['name'], ent['perms'], ent['atime'], ent['ctime'], ent['mtime']) for ent in sorted(self.dir['entries'], key=lambda ent: ent['name']) ] self.assertEqual(data_to_store, stored_data) after_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([], after_missing) @istest def revision_add(self): init_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([self.revision['id']], list(init_missing)) self.storage.revision_add([self.revision]) end_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([], list(end_missing)) @istest def origin_add(self): self.assertIsNone(self.storage.origin_get(self.origin)) id = self.storage.origin_add_one(self.origin) self.assertEqual(self.storage.origin_get(self.origin), id) @istest def occurrence_add(self): origin_id = self.storage.origin_add_one(self.origin2) revision = self.revision.copy() revision['id'] = self.occurrence['revision'] self.storage.revision_add([revision]) self.occurrence['origin'] = origin_id self.storage.occurrence_add([self.occurrence]) class TestStorage(AbstractTestStorage, unittest.TestCase): """Test the local storage""" pass