diff --git a/swh/storage/tests/algos/test_diff.py b/swh/storage/tests/algos/test_diff.py index c25e0f31b..ecc9f107b 100644 --- a/swh/storage/tests/algos/test_diff.py +++ b/swh/storage/tests/algos/test_diff.py @@ -1,304 +1,294 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information # flake8: noqa import unittest - from unittest.mock import patch -from nose.tools import istest, nottest - from swh.model.hashutil import hash_to_bytes from swh.model.identifiers import directory_identifier from swh.storage.algos import diff from .test_dir_iterator import DirectoryModel @patch('swh.storage.algos.diff._get_rev') @patch('swh.storage.algos.dir_iterators._get_dir') class TestDiffRevisions(unittest.TestCase): - - @nottest def diff_revisions(self, rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev): rev_from_bytes = hash_to_bytes(rev_from) rev_to_bytes = hash_to_bytes(rev_to) def _get_rev(*args, **kwargs): if args[1] == rev_from_bytes: return {'directory': from_dir_model['target']} else: return {'directory': to_dir_model['target']} def _get_dir(*args, **kwargs): from_dir = from_dir_model.get_hash_data(args[1]) to_dir = to_dir_model.get_hash_data(args[1]) return from_dir if from_dir != None else to_dir mock_get_rev.side_effect = _get_rev mock_get_dir.side_effect = _get_dir changes = diff.diff_revisions(None, rev_from_bytes, rev_to_bytes, track_renaming=True) self.assertEqual(changes, expected_changes) - @istest def test_insert_delete(self, mock_get_dir, mock_get_rev): rev_from = '898ff03e1e7925ecde3da66327d3cdc7e07625ba' rev_to = '647c3d381e67490e82cdbbe6c96e46d5e1628ce2' from_dir_model = DirectoryModel() to_dir_model = DirectoryModel() to_dir_model.add_file(b'file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model.add_file(b'file2', '3e5faecb3836ffcadf82cc160787e35d4e2bec6a') to_dir_model.add_file(b'file3', '2ae33b2984974d35eababe4890d37fbf4bce6b2c') expected_changes = \ [{ 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'file1'), 'to_path': b'file1' }, { 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'file2'), 'to_path': b'file2' }, { 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'file3'), 'to_path': b'file3' }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) from_dir_model = DirectoryModel() from_dir_model.add_file(b'file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') from_dir_model.add_file(b'file2', '3e5faecb3836ffcadf82cc160787e35d4e2bec6a') from_dir_model.add_file(b'file3', '2ae33b2984974d35eababe4890d37fbf4bce6b2c') to_dir_model = DirectoryModel() expected_changes = \ [{ 'type': 'delete', 'from': from_dir_model.get_path_data(b'file1'), 'from_path': b'file1', 'to': None, 'to_path': None }, { 'type': 'delete', 'from': from_dir_model.get_path_data(b'file2'), 'from_path': b'file2', 'to': None, 'to_path': None }, { 'type': 'delete', 'from': from_dir_model.get_path_data(b'file3'), 'from_path': b'file3', 'to': None, 'to_path': None }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) - @istest def test_onelevel_diff(self, mock_get_dir, mock_get_rev): rev_from = '898ff03e1e7925ecde3da66327d3cdc7e07625ba' rev_to = '647c3d381e67490e82cdbbe6c96e46d5e1628ce2' from_dir_model = DirectoryModel() from_dir_model.add_file(b'file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') from_dir_model.add_file(b'file2', 'f4a96b2000be83b61254d107046fa9777b17eb34') from_dir_model.add_file(b'file3', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') to_dir_model = DirectoryModel() to_dir_model.add_file(b'file2', '3ee0f38ee0ea23cc2c8c0b9d66b27be4596b002b') to_dir_model.add_file(b'file3', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') to_dir_model.add_file(b'file4', '40460b9653b1dc507e1b6eb333bd4500634bdffc') expected_changes = \ [{ 'type': 'delete', 'from': from_dir_model.get_path_data(b'file1'), 'from_path': b'file1', 'to': None, 'to_path': None}, { 'type': 'modify', 'from': from_dir_model.get_path_data(b'file2'), 'from_path': b'file2', 'to': to_dir_model.get_path_data(b'file2'), 'to_path': b'file2'}, { 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'file4'), 'to_path': b'file4' }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) - @istest def test_twolevels_diff(self, mock_get_dir, mock_get_rev): rev_from = '898ff03e1e7925ecde3da66327d3cdc7e07625ba' rev_to = '647c3d381e67490e82cdbbe6c96e46d5e1628ce2' from_dir_model = DirectoryModel() from_dir_model.add_file(b'file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') from_dir_model.add_file(b'dir1/file1', '8335fca266811bac7ae5c8e1621476b4cf4156b6') from_dir_model.add_file(b'dir1/file2', 'a6127d909e79f1fcb28bbf220faf86e7be7831e5') from_dir_model.add_file(b'dir1/file3', '18049b8d067ce1194a7e1cce26cfa3ae4242a43d') from_dir_model.add_file(b'file2', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') to_dir_model = DirectoryModel() to_dir_model.add_file(b'file1', '3ee0f38ee0ea23cc2c8c0b9d66b27be4596b002b') to_dir_model.add_file(b'dir1/file2', 'de3548b32a8669801daa02143a66dae21fe852fd') to_dir_model.add_file(b'dir1/file3', '18049b8d067ce1194a7e1cce26cfa3ae4242a43d') to_dir_model.add_file(b'dir1/file4', 'f5c3f42aec5fe7b92276196c350cbadaf4c51f87') to_dir_model.add_file(b'file2', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') expected_changes = \ [{ 'type': 'delete', 'from': from_dir_model.get_path_data(b'dir1/file1'), 'from_path': b'dir1/file1', 'to': None, 'to_path': None }, { 'type': 'modify', 'from': from_dir_model.get_path_data(b'dir1/file2'), 'from_path': b'dir1/file2', 'to': to_dir_model.get_path_data(b'dir1/file2'), 'to_path': b'dir1/file2' }, { 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'dir1/file4'), 'to_path': b'dir1/file4' }, { 'type': 'modify', 'from': from_dir_model.get_path_data(b'file1'), 'from_path': b'file1', 'to': to_dir_model.get_path_data(b'file1'), 'to_path': b'file1' }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) - @istest def test_insert_delete_empty_dirs(self, mock_get_dir, mock_get_rev): rev_from = '898ff03e1e7925ecde3da66327d3cdc7e07625ba' rev_to = '647c3d381e67490e82cdbbe6c96e46d5e1628ce2' from_dir_model = DirectoryModel() from_dir_model.add_file(b'dir3/file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model = DirectoryModel() to_dir_model.add_file(b'dir3/file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model.add_file(b'dir3/dir1/') expected_changes = \ [{ 'type': 'insert', 'from': None, 'from_path': None, 'to': to_dir_model.get_path_data(b'dir3/dir1'), 'to_path': b'dir3/dir1' }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) from_dir_model = DirectoryModel() from_dir_model.add_file(b'dir1/dir2/') from_dir_model.add_file(b'dir1/file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model = DirectoryModel() to_dir_model.add_file(b'dir1/file1', 'ea15f54ca215e7920c60f564315ebb7f911a5204') expected_changes = \ [{ 'type': 'delete', 'from': from_dir_model.get_path_data(b'dir1/dir2'), 'from_path': b'dir1/dir2', 'to': None, 'to_path': None }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) - @istest def test_track_renaming(self, mock_get_dir, mock_get_rev): rev_from = '898ff03e1e7925ecde3da66327d3cdc7e07625ba' rev_to = '647c3d381e67490e82cdbbe6c96e46d5e1628ce2' from_dir_model = DirectoryModel() from_dir_model.add_file(b'file1_oldname', 'ea15f54ca215e7920c60f564315ebb7f911a5204') from_dir_model.add_file(b'dir1/file1_oldname', 'ea15f54ca215e7920c60f564315ebb7f911a5204') from_dir_model.add_file(b'file2_oldname', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') to_dir_model = DirectoryModel() to_dir_model.add_file(b'dir1/file1_newname', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model.add_file(b'dir2/file1_newname', 'ea15f54ca215e7920c60f564315ebb7f911a5204') to_dir_model.add_file(b'file2_newname', 'd3c00f9396c6d0277727cec522ff6ad1ea0bc2da') expected_changes = \ [{ 'type': 'rename', 'from': from_dir_model.get_path_data(b'dir1/file1_oldname'), 'from_path': b'dir1/file1_oldname', 'to': to_dir_model.get_path_data(b'dir1/file1_newname'), 'to_path': b'dir1/file1_newname' }, { 'type': 'rename', 'from': from_dir_model.get_path_data(b'file1_oldname'), 'from_path': b'file1_oldname', 'to': to_dir_model.get_path_data(b'dir2/file1_newname'), 'to_path': b'dir2/file1_newname' }, { 'type': 'rename', 'from': from_dir_model.get_path_data(b'file2_oldname'), 'from_path': b'file2_oldname', 'to': to_dir_model.get_path_data(b'file2_newname'), 'to_path': b'file2_newname' }] self.diff_revisions(rev_from, rev_to, from_dir_model, to_dir_model, expected_changes, mock_get_dir, mock_get_rev) diff --git a/swh/storage/tests/algos/test_dir_iterator.py b/swh/storage/tests/algos/test_dir_iterator.py index 44830c7b0..1590afdb1 100644 --- a/swh/storage/tests/algos/test_dir_iterator.py +++ b/swh/storage/tests/algos/test_dir_iterator.py @@ -1,153 +1,146 @@ # Copyright (C) 2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest - from unittest.mock import patch -from nose.tools import istest, nottest - from swh.model.from_disk import DentryPerms -from swh.model.hashutil import hash_to_bytes, MultiHash +from swh.model.hashutil import MultiHash, hash_to_bytes from swh.model.identifiers import directory_identifier from swh.storage.algos.dir_iterators import dir_iterator # flake8: noqa class DirectoryModel(object): """ Quick and dirty directory model to ease the writing of directory iterators and revision trees differential tests. """ def __init__(self, name=''): self.data = {} self.data['name'] = name self.data['perms'] = DentryPerms.directory self.data['type'] = 'dir' self.data['entries'] = [] self.data['entry_idx'] = {} def __getitem__(self, item): if item == 'target': return hash_to_bytes(directory_identifier(self)) else: return self.data[item] def add_file(self, path, sha1=None): path_parts = path.split(b'/') sha1 = hash_to_bytes(sha1) if sha1 \ else MultiHash.from_data(path).digest()['sha1'] if len(path_parts) == 1: self['entry_idx'][path] = len(self['entries']) self['entries'].append({ 'target': sha1, 'name': path, 'perms': DentryPerms.content, 'type': 'file' }) else: if not path_parts[0] in self['entry_idx']: self['entry_idx'][path_parts[0]] = len(self['entries']) self['entries'].append(DirectoryModel(path_parts[0])) if path_parts[1]: dir_idx = self['entry_idx'][path_parts[0]] self['entries'][dir_idx].add_file(b'/'.join(path_parts[1:]), sha1) def get_hash_data(self, entry_hash): if self['target'] == entry_hash: ret = [] for e in self['entries']: ret.append({ 'target': e['target'], 'name': e['name'], 'perms': e['perms'], 'type': e['type'] }) return ret else: for e in self['entries']: if e['type'] == 'file' and e['target'] == entry_hash: return e elif e['type'] == 'dir': data = e.get_hash_data(entry_hash) if data: return data return None def get_path_data(self, path): path_parts = path.split(b'/') entry_idx = self['entry_idx'][path_parts[0]] entry = self['entries'][entry_idx] if len(path_parts) == 1: return { 'target': entry['target'], 'name': entry['name'], 'perms': entry['perms'], 'type': entry['type'] } else: return entry.get_path_data(b'/'.join(path_parts[1:])) @patch('swh.storage.algos.dir_iterators._get_dir') class TestDirectoryIterator(unittest.TestCase): - @nottest def check_iterated_paths(self, dir_model, expected_paths_order, mock_get_dir): def _get_dir(*args, **kwargs): return dir_model.get_hash_data(args[1]) mock_get_dir.side_effect = _get_dir # noqa paths_order = [e['path'] for e in dir_iterator(None, dir_model['target'])] self.assertEqual(paths_order, expected_paths_order) - @istest def test_dir_iterator_empty_dir(self, mock_get_dir): dir_model = DirectoryModel() expected_paths_order = [] self.check_iterated_paths(dir_model, expected_paths_order, mock_get_dir) - @istest def test_dir_iterator_no_empty_dirs(self, mock_get_dir): dir_model = DirectoryModel() dir_model.add_file(b'xyz/gtr/uhb') dir_model.add_file(b'bca/ef') dir_model.add_file(b'abc/ab') dir_model.add_file(b'abc/bc') dir_model.add_file(b'xyz/ouy/poi') expected_paths_order = [b'abc', b'abc/ab', b'abc/bc', b'bca', b'bca/ef', b'xyz', b'xyz/gtr', b'xyz/gtr/uhb', b'xyz/ouy', b'xyz/ouy/poi'] self.check_iterated_paths(dir_model, expected_paths_order, mock_get_dir) - @istest def test_dir_iterator_with_empty_dirs(self, mock_get_dir): dir_model = DirectoryModel() dir_model.add_file(b'xyz/gtr/') dir_model.add_file(b'bca/ef') dir_model.add_file(b'abc/') dir_model.add_file(b'xyz/ouy/poi') expected_paths_order = [b'abc', b'bca', b'bca/ef', b'xyz', b'xyz/gtr', b'xyz/ouy', b'xyz/ouy/poi'] self.check_iterated_paths(dir_model, expected_paths_order, mock_get_dir) diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py index d6f1e5268..2a49e39d2 100644 --- a/swh/storage/tests/test_api_client.py +++ b/swh/storage/tests/test_api_client.py @@ -1,54 +1,54 @@ # Copyright (C) 2015-2018 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import shutil import tempfile import unittest from swh.core.tests.server_testing import ServerTestFixture -from swh.storage.tests.test_storage import CommonTestStorage from swh.storage.api.client import RemoteStorage from swh.storage.api.server import app +from swh.storage.tests.test_storage import CommonTestStorage class TestRemoteStorage(CommonTestStorage, ServerTestFixture, unittest.TestCase): """Test the remote storage API. This class doesn't define any tests as we want identical functionality between local and remote storage. All the tests are therefore defined in CommonTestStorage. """ def setUp(self): # ServerTestFixture needs to have self.objroot for # setUp() method, but this field is defined in # AbstractTestStorage's setUp() # To avoid confusion, override the self.objroot to a # one chosen in this class. self.storage_base = tempfile.mkdtemp() self.config = { 'storage': { 'cls': 'local', 'args': { 'db': 'dbname=%s' % self.TEST_STORAGE_DB_NAME, 'objstorage': { 'cls': 'pathslicing', 'args': { 'root': self.storage_base, 'slicing': '0:2', }, }, } } } self.app = app super().setUp() self.storage = RemoteStorage(self.url()) self.objroot = self.storage_base def tearDown(self): super().tearDown() shutil.rmtree(self.storage_base) diff --git a/swh/storage/tests/test_converters.py b/swh/storage/tests/test_converters.py index f5ce97782..dc8137ae2 100644 --- a/swh/storage/tests/test_converters.py +++ b/swh/storage/tests/test_converters.py @@ -1,130 +1,125 @@ # Copyright (C) 2015 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import unittest -from nose.tools import istest from nose.plugins.attrib import attr from swh.storage import converters @attr('!db') class TestConverters(unittest.TestCase): def setUp(self): self.maxDiff = None - @istest - def db_to_author(self): + def test_db_to_author(self): # when actual_author = converters.db_to_author( 1, b'fullname', b'name', b'email') # then self.assertEquals(actual_author, { 'id': 1, 'fullname': b'fullname', 'name': b'name', 'email': b'email', }) - @istest - def db_to_revision(self): + def test_db_to_revision(self): # when actual_revision = converters.db_to_revision({ 'id': 'revision-id', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'committer_date': None, 'committer_date_offset': None, 'committer_date_neg_utc_offset': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', 'committer_id': 'comm-id', 'committer_fullname': b'comm-fullname', 'committer_name': b'comm-name', 'committer_email': b'comm-email', 'metadata': {}, 'synthetic': False, 'parents': [123, 456] }) # then self.assertEquals(actual_revision, { 'id': 'revision-id', 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'committer': { 'id': 'comm-id', 'fullname': b'comm-fullname', 'name': b'comm-name', 'email': b'comm-email', }, 'committer_date': None, 'type': 'rev', 'directory': b'dir-sha1', 'message': b'commit message', 'metadata': {}, 'synthetic': False, 'parents': [123, 456], }) - @istest - def db_to_release(self): + def test_db_to_release(self): # when actual_release = converters.db_to_release({ 'id': b'release-id', 'target': b'revision-id', 'target_type': 'revision', 'date': None, 'date_offset': None, 'date_neg_utc_offset': None, 'name': b'release-name', 'comment': b'release comment', 'synthetic': True, 'author_id': 'auth-id', 'author_fullname': b'auth-fullname', 'author_name': b'auth-name', 'author_email': b'auth-email', }) # then self.assertEquals(actual_release, { 'author': { 'id': 'auth-id', 'fullname': b'auth-fullname', 'name': b'auth-name', 'email': b'auth-email', }, 'date': None, 'id': b'release-id', 'name': b'release-name', 'message': b'release comment', 'synthetic': True, 'target': b'revision-id', 'target_type': 'revision' }) - @istest - def db_to_git_headers(self): + def test_db_to_git_headers(self): raw_data = [ ['gpgsig', b'garbage\x89a\x43b\x14'], ['extra', [b'fo\\\\\\o', b'bar\\', b'inval\\\\\x99id']], ] db_data = converters.git_headers_to_db(raw_data) loop = converters.db_to_git_headers(db_data) self.assertEquals(raw_data, loop) diff --git a/swh/storage/tests/test_db.py b/swh/storage/tests/test_db.py index 0091d6b92..f0952ffca 100644 --- a/swh/storage/tests/test_db.py +++ b/swh/storage/tests/test_db.py @@ -1,55 +1,52 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os import unittest -from nose.tools import istest from nose.plugins.attrib import attr from swh.core.tests.db_testing import SingleDbTestFixture from swh.model.hashutil import hash_to_bytes from swh.storage.db import Db - TEST_DIR = os.path.dirname(os.path.abspath(__file__)) TEST_DATA_DIR = os.path.join(TEST_DIR, '../../../../swh-storage-testdata') @attr('db') class TestDb(SingleDbTestFixture, unittest.TestCase): TEST_DB_DUMP = os.path.join(TEST_DATA_DIR, 'dumps/swh.dump') def setUp(self): super().setUp() self.db = Db(self.conn) def tearDown(self): self.db.conn.close() super().tearDown() - @istest - def add_content(self): + def test_add_content(self): cur = self.cursor sha1 = hash_to_bytes('34973274ccef6ab4dfaaf86599792fa9c3fe4689') self.db.mktemp('content', cur) self.db.copy_to([{ 'sha1': sha1, 'sha1_git': hash_to_bytes( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), 'blake2s256': hash_to_bytes('69217a3079908094e11121d042354a7c' '1f55b6482ca1a51e1b250dfd1ed0eef9'), 'length': 3}], 'tmp_content', ['sha1', 'sha1_git', 'sha256', 'blake2s256', 'length'], cur) self.db.content_add_from_temp(cur) self.cursor.execute('SELECT sha1 FROM content WHERE sha1 = %s', (sha1,)) self.assertEqual(self.cursor.fetchone()[0].tobytes(), sha1) diff --git a/swh/storage/tests/test_storage.py b/swh/storage/tests/test_storage.py index 6d17bb9df..fcce9e8d3 100644 --- a/swh/storage/tests/test_storage.py +++ b/swh/storage/tests/test_storage.py @@ -1,2302 +1,2241 @@ # Copyright (C) 2015-2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections import defaultdict import copy import datetime -from operator import itemgetter -import psycopg2 import unittest +from collections import defaultdict +from operator import itemgetter from unittest.mock import Mock, patch -from nose.tools import istest +import psycopg2 from nose.plugins.attrib import attr +from swh.core.tests.db_testing import DbTestFixture from swh.model import from_disk, identifiers from swh.model.hashutil import hash_to_bytes -from swh.core.tests.db_testing import DbTestFixture from swh.storage.tests.storage_testing import StorageTestFixture @attr('db') class BaseTestStorage(StorageTestFixture, DbTestFixture): def setUp(self): super().setUp() db = self.test_db[self.TEST_STORAGE_DB_NAME] self.conn = db.conn self.cursor = db.cursor self.maxDiff = None self.cont = { 'data': b'42\n', 'length': 3, 'sha1': hash_to_bytes( '34973274ccef6ab4dfaaf86599792fa9c3fe4689'), 'sha1_git': hash_to_bytes( 'd81cc0710eb6cf9efd5b920a8453e1e07157b6cd'), 'sha256': hash_to_bytes( '673650f936cb3b0a2f93ce09d81be107' '48b1b203c19e8176b4eefc1964a0cf3a'), 'blake2s256': hash_to_bytes('d5fe1939576527e42cfd76a9455a2' '432fe7f56669564577dd93c4280e76d661d'), 'status': 'visible', } self.cont2 = { 'data': b'4242\n', 'length': 5, 'sha1': hash_to_bytes( '61c2b3a30496d329e21af70dd2d7e097046d07b7'), 'sha1_git': hash_to_bytes( '36fade77193cb6d2bd826161a0979d64c28ab4fa'), 'sha256': hash_to_bytes( '859f0b154fdb2d630f45e1ecae4a8629' '15435e663248bb8461d914696fc047cd'), 'blake2s256': hash_to_bytes('849c20fad132b7c2d62c15de310adfe87be' '94a379941bed295e8141c6219810d'), 'status': 'visible', } self.cont3 = { 'data': b'424242\n', 'length': 7, 'sha1': hash_to_bytes( '3e21cc4942a4234c9e5edd8a9cacd1670fe59f13'), 'sha1_git': hash_to_bytes( 'c932c7649c6dfa4b82327d121215116909eb3bea'), 'sha256': hash_to_bytes( '92fb72daf8c6818288a35137b72155f5' '07e5de8d892712ab96277aaed8cf8a36'), 'blake2s256': hash_to_bytes('76d0346f44e5a27f6bafdd9c2befd304af' 'f83780f93121d801ab6a1d4769db11'), 'status': 'visible', } self.missing_cont = { 'data': b'missing\n', 'length': 8, 'sha1': hash_to_bytes( 'f9c24e2abb82063a3ba2c44efd2d3c797f28ac90'), 'sha1_git': hash_to_bytes( '33e45d56f88993aae6a0198013efa80716fd8919'), 'sha256': hash_to_bytes( '6bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes('306856b8fd879edb7b6f1aeaaf8db9bbecc9' '93cd7f776c333ac3a782fa5c6eba'), 'status': 'absent', } self.skipped_cont = { 'length': 1024 * 1024 * 200, 'sha1_git': hash_to_bytes( '33e45d56f88993aae6a0198013efa80716fd8920'), 'sha1': hash_to_bytes( '43e45d56f88993aae6a0198013efa80716fd8920'), 'sha256': hash_to_bytes( '7bbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes( 'ade18b1adecb33f891ca36664da676e1' '2c772cc193778aac9a137b8dc5834b9b'), 'reason': 'Content too long', 'status': 'absent', } self.skipped_cont2 = { 'length': 1024 * 1024 * 300, 'sha1_git': hash_to_bytes( '44e45d56f88993aae6a0198013efa80716fd8921'), 'sha1': hash_to_bytes( '54e45d56f88993aae6a0198013efa80716fd8920'), 'sha256': hash_to_bytes( '8cbd052ab054ef222c1c87be60cd191a' 'ddedd24cc882d1f5f7f7be61dc61bb3a'), 'blake2s256': hash_to_bytes( '9ce18b1adecb33f891ca36664da676e1' '2c772cc193778aac9a137b8dc5834b9b'), 'reason': 'Content too long', 'status': 'absent', } self.dir = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa90', 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': from_disk.DentryPerms.content, }, { 'name': b'bar\xc3', 'type': 'dir', 'target': b'12345678901234567890', 'perms': from_disk.DentryPerms.directory, }, ], } self.dir2 = { 'id': b'4\x013\x422\x531\x000\xf51\xe62\xa73\xff7\xc3\xa95', 'entries': [ { 'name': b'oof', 'type': 'file', 'target': self.cont2['sha1_git'], 'perms': from_disk.DentryPerms.content, } ], } self.dir3 = { 'id': hash_to_bytes('33e45d56f88993aae6a0198013efa80716fd8921'), 'entries': [ { 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'perms': from_disk.DentryPerms.content, }, { 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'perms': from_disk.DentryPerms.directory, }, { 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'perms': from_disk.DentryPerms.content, }, ], } self.minus_offset = datetime.timezone(datetime.timedelta(minutes=-120)) self.plus_offset = datetime.timezone(datetime.timedelta(minutes=120)) self.revision = { 'id': b'56789012345678901234', 'message': b'hello', 'author': { 'name': b'Nicolas Dandrimont', 'email': b'nicolas@example.com', 'fullname': b'Nicolas Dandrimont ', }, 'date': { 'timestamp': 1234567890, 'offset': 120, 'negative_utc': None, }, 'committer': { 'name': b'St\xc3fano Zacchiroli', 'email': b'stefano@example.com', 'fullname': b'St\xc3fano Zacchiroli ' }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': True, }, 'parents': [b'01234567890123456789', b'23434512345123456789'], 'type': 'git', 'directory': self.dir['id'], 'metadata': { 'checksums': { 'sha1': 'tarball-sha1', 'sha256': 'tarball-sha256', }, 'signed-off-by': 'some-dude', 'extra_headers': [ ['gpgsig', b'test123'], ['mergetags', [b'foo\\bar', b'\x22\xaf\x89\x80\x01\x00']], ], }, 'synthetic': True } self.revision2 = { 'id': b'87659012345678904321', 'message': b'hello again', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': { 'seconds': 1234567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1123456789, 'offset': 0, 'negative_utc': False, }, 'parents': [b'01234567890123456789'], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': False } self.revision3 = { 'id': hash_to_bytes('7026b7c1a2af56521e951c01ed20f255fa054238'), 'message': b'a simple revision with no parents this time', 'author': { 'name': b'Roberto Dicosmo', 'email': b'roberto@example.com', 'fullname': b'Roberto Dicosmo ', }, 'date': { 'timestamp': { 'seconds': 1234567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'committer_date': { 'timestamp': 1127351742, 'offset': 0, 'negative_utc': False, }, 'parents': [], 'type': 'git', 'directory': self.dir2['id'], 'metadata': None, 'synthetic': True } self.revision4 = { 'id': hash_to_bytes('368a48fe15b7db2383775f97c6b247011b3f14f4'), 'message': b'parent of self.revision2', 'author': { 'name': b'me', 'email': b'me@soft.heri', 'fullname': b'me ', }, 'date': { 'timestamp': { 'seconds': 1244567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'committer': { 'name': b'committer-dude', 'email': b'committer@dude.com', 'fullname': b'committer-dude ', }, 'committer_date': { 'timestamp': { 'seconds': 1244567843, 'microseconds': 220000, }, 'offset': -720, 'negative_utc': None, }, 'parents': [self.revision3['id']], 'type': 'git', 'directory': self.dir['id'], 'metadata': None, 'synthetic': False } self.origin = { 'url': 'file:///dev/null', 'type': 'git', } self.origin2 = { 'url': 'file:///dev/zero', 'type': 'git', } self.provider = { 'name': 'hal', 'type': 'deposit-client', 'url': 'http:///hal/inria', 'metadata': { 'location': 'France' } } self.metadata_tool = { 'name': 'swh-deposit', 'version': '0.0.1', 'configuration': { 'sword_version': '2' } } self.origin_metadata = { 'origin': self.origin, 'discovery_date': datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), 'provider': self.provider, 'tool': 'swh-deposit', 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' } } self.origin_metadata2 = { 'origin': self.origin, 'discovery_date': datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc), 'provider': self.provider, 'tool': 'swh-deposit', 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' } } self.date_visit1 = datetime.datetime(2015, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) self.occurrence = { 'branch': b'master', 'target': self.revision['id'], 'target_type': 'revision', } self.date_visit2 = datetime.datetime(2017, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) self.occurrence2 = { 'branch': b'master', 'target': self.revision2['id'], 'target_type': 'revision', } self.date_visit3 = datetime.datetime(2018, 1, 1, 23, 0, 0, tzinfo=datetime.timezone.utc) # template occurrence to be filled in test (cf. revision_log_by) self.occurrence3 = { 'branch': b'master', 'target_type': 'revision', } self.release = { 'id': b'87659012345678901234', 'name': b'v0.0.1', 'author': { 'name': b'olasd', 'email': b'nic@olasd.fr', 'fullname': b'olasd ', }, 'date': { 'timestamp': 1234567890, 'offset': 42, 'negative_utc': None, }, 'target': b'43210987654321098765', 'target_type': 'revision', 'message': b'synthetic release', 'synthetic': True, } self.release2 = { 'id': b'56789012348765901234', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'ar@dumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634366813, 'offset': -120, 'negative_utc': None, }, 'target': b'432109\xa9765432\xc309\x00765', 'target_type': 'revision', 'message': b'v0.0.2\nMisc performance improvements + bug fixes', 'synthetic': False } self.release3 = { 'id': b'87659012345678904321', 'name': b'v0.0.2', 'author': { 'name': b'tony', 'email': b'tony@ardumont.fr', 'fullname': b'tony ', }, 'date': { 'timestamp': 1634336813, 'offset': 0, 'negative_utc': False, }, 'target': self.revision2['id'], 'target_type': 'revision', 'message': b'yet another synthetic release', 'synthetic': True, } self.fetch_history_date = datetime.datetime( 2015, 1, 2, 21, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_end = datetime.datetime( 2015, 1, 2, 23, 0, 0, tzinfo=datetime.timezone.utc) self.fetch_history_duration = (self.fetch_history_end - self.fetch_history_date) self.fetch_history_data = { 'status': True, 'result': {'foo': 'bar'}, 'stdout': 'blabla', 'stderr': 'blablabla', } self.snapshot = { 'id': hash_to_bytes('2498dbf535f882bc7f9a18fb16c9ad27fda7bab7'), 'branches': { self.occurrence['branch']: { 'target': self.occurrence['target'], 'target_type': self.occurrence['target_type'], }, }, 'next_branch': None } self.empty_snapshot = { 'id': hash_to_bytes('1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'), 'branches': {}, 'next_branch': None } self.complete_snapshot = { 'id': hash_to_bytes('6e65b86363953b780d92b0a928f3e8fcdd10db36'), 'branches': { b'directory': { 'target': hash_to_bytes( '1bd0e65f7d2ff14ae994de17a1e7fe65111dcad8'), 'target_type': 'directory', }, b'content': { 'target': hash_to_bytes( 'fe95a46679d128ff167b7c55df5d02356c5a1ae1'), 'target_type': 'content', }, b'alias': { 'target': b'revision', 'target_type': 'alias', }, b'revision': { 'target': hash_to_bytes( 'aafb16d69fd30ff58afdd69036a26047f3aebdc6'), 'target_type': 'revision', }, b'release': { 'target': hash_to_bytes( '7045404f3d1c54e6473c71bbb716529fbad4be24'), 'target_type': 'release', }, b'snapshot': { 'target': hash_to_bytes( '1a8893e6a86f444e8be8e7bda6cb34fb1735a00e'), 'target_type': 'snapshot', }, b'dangling': None, }, 'next_branch': None } def tearDown(self): self.reset_storage_tables() super().tearDown() class CommonTestStorage(BaseTestStorage): """Base class for Storage testing. This class is used as-is to test local storage (see TestLocalStorage below) and remote storage (see TestRemoteStorage in test_remote_storage.py. We need to have the two classes inherit from this base class separately to avoid nosetests running the tests from the base class twice. """ @staticmethod def normalize_entity(entity): entity = copy.deepcopy(entity) for key in ('date', 'committer_date'): if key in entity: entity[key] = identifiers.normalize_timestamp(entity[key]) return entity - @istest - def check_config(self): + def test_check_config(self): self.assertTrue(self.storage.check_config(check_write=True)) self.assertTrue(self.storage.check_config(check_write=False)) - @istest - def content_add(self): + def test_content_add(self): cont = self.cont self.storage.content_add([cont]) if hasattr(self.storage, 'objstorage'): self.assertIn(cont['sha1'], self.storage.objstorage) self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = self.cursor.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) - @istest - def content_add_collision(self): + def test_content_add_collision(self): cont1 = self.cont # create (corrupted) content with same sha1{,_git} but != sha256 cont1b = cont1.copy() sha256_array = bytearray(cont1b['sha256']) sha256_array[0] += 1 cont1b['sha256'] = bytes(sha256_array) with self.assertRaises(psycopg2.IntegrityError): self.storage.content_add([cont1, cont1b]) - @istest - def skipped_content_add(self): + def test_skipped_content_add(self): cont = self.skipped_cont.copy() cont2 = self.skipped_cont2.copy() cont2['blake2s256'] = None self.storage.content_add([cont, cont, cont2]) self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' 'length, status, reason ' 'FROM skipped_content ORDER BY sha1_git') datums = self.cursor.fetchall() self.assertEquals(2, len(datums)) datum = datums[0] self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3].tobytes(), datum[4], datum[5], datum[6]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['blake2s256'], cont['length'], 'absent', 'Content too long') ) datum2 = datums[1] self.assertEqual( (datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(), datum2[3], datum2[4], datum2[5], datum2[6]), (cont2['sha1'], cont2['sha1_git'], cont2['sha256'], cont2['blake2s256'], cont2['length'], 'absent', 'Content too long') ) - @istest - def content_missing(self): + def test_content_missing(self): cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) test_contents = [cont2] missing_per_hash = defaultdict(list) for i in range(256): test_content = missing_cont.copy() for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: test_content[hash] = bytes([i]) + test_content[hash][1:] missing_per_hash[hash].append(test_content[hash]) test_contents.append(test_content) self.assertCountEqual( self.storage.content_missing(test_contents), missing_per_hash['sha1'] ) for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: self.assertCountEqual( self.storage.content_missing(test_contents, key_hash=hash), missing_per_hash[hash] ) - @istest - def content_missing_per_sha1(self): + def test_content_missing_per_sha1(self): # given cont2 = self.cont2 missing_cont = self.missing_cont self.storage.content_add([cont2]) # when gen = self.storage.content_missing_per_sha1([cont2['sha1'], missing_cont['sha1']]) # then self.assertEqual(list(gen), [missing_cont['sha1']]) - @istest - def content_get_metadata(self): + def test_content_get_metadata(self): cont1 = self.cont.copy() cont2 = self.cont2.copy() self.storage.content_add([cont1, cont2]) gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']]) # we only retrieve the metadata cont1.pop('data') cont2.pop('data') self.assertCountEqual(list(gen), [cont1, cont2]) - @istest - def content_get_metadata_missing_sha1(self): + def test_content_get_metadata_missing_sha1(self): cont1 = self.cont.copy() cont2 = self.cont2.copy() missing_cont = self.missing_cont.copy() self.storage.content_add([cont1, cont2]) gen = self.storage.content_get_metadata([missing_cont['sha1']]) # All the metadata keys are None missing_cont.pop('data') for key in list(missing_cont): if key != 'sha1': missing_cont[key] = None self.assertEqual(list(gen), [missing_cont]) - @istest - def directory_add(self): + def test_directory_add(self): init_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([self.dir['id']], init_missing) self.storage.directory_add([self.dir]) stored_data = list(self.storage.directory_ls(self.dir['id'])) data_to_store = [] for ent in sorted(self.dir['entries'], key=itemgetter('name')): data_to_store.append({ 'dir_id': self.dir['id'], 'type': ent['type'], 'target': ent['target'], 'name': ent['name'], 'perms': ent['perms'], 'status': None, 'sha1': None, 'sha1_git': None, 'sha256': None, 'length': None, }) self.assertEqual(data_to_store, stored_data) after_missing = list(self.storage.directory_missing([self.dir['id']])) self.assertEqual([], after_missing) - @istest - def directory_entry_get_by_path(self): + def test_directory_entry_get_by_path(self): # given init_missing = list(self.storage.directory_missing([self.dir3['id']])) self.assertEqual([self.dir3['id']], init_missing) self.storage.directory_add([self.dir3]) expected_entries = [ { 'dir_id': self.dir3['id'], 'name': b'foo', 'type': 'file', 'target': self.cont['sha1_git'], 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.content, 'length': None, }, { 'dir_id': self.dir3['id'], 'name': b'bar', 'type': 'dir', 'target': b'12345678901234560000', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.directory, 'length': None, }, { 'dir_id': self.dir3['id'], 'name': b'hello', 'type': 'file', 'target': b'12345678901234567890', 'sha1': None, 'sha1_git': None, 'sha256': None, 'status': None, 'perms': from_disk.DentryPerms.content, 'length': None, }, ] # when (all must be found here) for entry, expected_entry in zip(self.dir3['entries'], expected_entries): actual_entry = self.storage.directory_entry_get_by_path( self.dir3['id'], [entry['name']]) self.assertEqual(actual_entry, expected_entry) # when (nothing should be found here since self.dir is not persisted.) for entry in self.dir['entries']: actual_entry = self.storage.directory_entry_get_by_path( self.dir['id'], [entry['name']]) self.assertIsNone(actual_entry) - @istest - def revision_add(self): + def test_revision_add(self): init_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([self.revision['id']], list(init_missing)) self.storage.revision_add([self.revision]) end_missing = self.storage.revision_missing([self.revision['id']]) self.assertEqual([], list(end_missing)) - @istest - def revision_log(self): + def test_revision_log(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_log( [self.revision4['id']])) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 self.assertEquals(actual_results[0], self.normalize_entity(self.revision4)) self.assertEquals(actual_results[1], self.normalize_entity(self.revision3)) - @istest - def revision_log_with_limit(self): + def test_revision_log_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_log( [self.revision4['id']], 1)) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 1) self.assertEquals(actual_results[0], self.revision4) - @istest - def revision_log_by(self): + def test_revision_log_by(self): # given origin_id = self.storage.origin_add_one(self.origin2) self.storage.revision_add([self.revision3, self.revision4]) # occurrence3 targets 'revision4' # with branch 'master' and origin origin_id occurrence3 = self.occurrence3.copy() date_visit1 = self.date_visit3 origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) occurrence3.update({ 'origin': origin_id, 'target': self.revision4['id'], 'visit': origin_visit1['visit'], }) self.storage.occurrence_add([occurrence3]) # self.revision4 -is-child-of-> self.revision3 # when actual_results = list(self.storage.revision_log_by( origin_id, branch_name=occurrence3['branch'], timestamp=date_visit1)) # hack: ids generated for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 2) self.assertEquals(actual_results[0], self.normalize_entity(self.revision4)) self.assertEquals(actual_results[1], self.normalize_entity(self.revision3)) # when - 2 actual_results = list(self.storage.revision_log_by( origin_id, branch_name=None, timestamp=None, limit=1)) # then for actual_result in actual_results: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEqual(len(actual_results), 1) self.assertEquals(actual_results[0], self.revision4) # when - 3 (revision not found) actual_res = list(self.storage.revision_log_by( origin_id, branch_name='inexistant-branch', timestamp=None)) self.assertEquals(actual_res, []) @staticmethod def _short_revision(revision): return [revision['id'], revision['parents']] - @istest - def revision_shortlog(self): + def test_revision_shortlog(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) # when actual_results = list(self.storage.revision_shortlog( [self.revision4['id']])) self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 self.assertEquals(list(actual_results[0]), self._short_revision(self.revision4)) self.assertEquals(list(actual_results[1]), self._short_revision(self.revision3)) - @istest - def revision_shortlog_with_limit(self): + def test_revision_shortlog_with_limit(self): # given # self.revision4 -is-child-of-> self.revision3 self.storage.revision_add([self.revision3, self.revision4]) actual_results = list(self.storage.revision_shortlog( [self.revision4['id']], 1)) self.assertEqual(len(actual_results), 1) self.assertEquals(list(actual_results[0]), self._short_revision(self.revision4)) - @istest - def revision_get(self): + def test_revision_get(self): self.storage.revision_add([self.revision]) actual_revisions = list(self.storage.revision_get( [self.revision['id'], self.revision2['id']])) # when del actual_revisions[0]['author']['id'] # hack: ids are generated del actual_revisions[0]['committer']['id'] self.assertEqual(len(actual_revisions), 2) self.assertEqual(actual_revisions[0], self.normalize_entity(self.revision)) self.assertIsNone(actual_revisions[1]) - @istest - def revision_get_no_parents(self): + def test_revision_get_no_parents(self): self.storage.revision_add([self.revision3]) get = list(self.storage.revision_get([self.revision3['id']])) self.assertEqual(len(get), 1) self.assertEqual(get[0]['parents'], []) # no parents on this one - @istest - def revision_get_by(self): + def test_revision_get_by(self): # given self.storage.content_add([self.cont2]) self.storage.directory_add([self.dir2]) # point to self.cont self.storage.revision_add([self.revision2]) # points to self.dir origin_id = self.storage.origin_add_one(self.origin2) # occurrence2 points to 'revision2' with branch 'master', we # need to point to the right origin occurrence2 = self.occurrence2.copy() date_visit1 = self.date_visit2 origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) occurrence2.update({ 'origin': origin_id, 'visit': origin_visit1['visit'], }) self.storage.occurrence_add([occurrence2]) # we want only revision 2 expected_revisions = list(self.storage.revision_get( [self.revision2['id']])) # when actual_results = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], None)) self.assertEqual(actual_results[0], expected_revisions[0]) # when (with no branch filtering, it's still ok) actual_results = list(self.storage.revision_get_by( origin_id, None, None)) self.assertEqual(actual_results[0], expected_revisions[0]) - @istest - def revision_get_by_multiple_occurrence(self): + def test_revision_get_by_multiple_occurrence(self): # 2 occurrences pointing to 2 different revisions # each occurrence have 1 day delta # the api must return the revision whose occurrence is the nearest. # given self.storage.content_add([self.cont2]) self.storage.directory_add([self.dir2]) self.storage.revision_add([self.revision2, self.revision3]) origin_id = self.storage.origin_add_one(self.origin2) # occurrence2 points to 'revision2' with branch 'master', we # need to point to the right origin date_visit1 = self.date_visit2 origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) occurrence2 = self.occurrence2.copy() occurrence2.update({ 'origin': origin_id, 'visit': origin_visit1['visit'] }) dt = datetime.timedelta(days=1) date_visit2 = date_visit1 + dt origin_visit2 = self.storage.origin_visit_add(origin_id, date_visit2) occurrence3 = self.occurrence2.copy() occurrence3.update({ 'origin': origin_id, 'visit': origin_visit2['visit'], 'target': self.revision3['id'], }) # 2 occurrences on same revision with lower validity date with 1 day # delta self.storage.occurrence_add([occurrence2]) self.storage.occurrence_add([occurrence3]) # when actual_results0 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], date_visit1)) # hack: ids are generated del actual_results0[0]['author']['id'] del actual_results0[0]['committer']['id'] self.assertEquals(len(actual_results0), 1) self.assertEqual(actual_results0, [self.normalize_entity(self.revision2)]) # when actual_results1 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], date_visit1 + dt/3)) # closer to first visit # hack: ids are generated del actual_results1[0]['author']['id'] del actual_results1[0]['committer']['id'] self.assertEquals(len(actual_results1), 1) self.assertEqual(actual_results1, [self.normalize_entity(self.revision2)]) # when actual_results2 = list(self.storage.revision_get_by( origin_id, occurrence2['branch'], date_visit1 + 2*dt/3)) # closer to second visit del actual_results2[0]['author']['id'] del actual_results2[0]['committer']['id'] self.assertEquals(len(actual_results2), 1) self.assertEqual(actual_results2, [self.normalize_entity(self.revision3)]) # when actual_results3 = list(self.storage.revision_get_by( origin_id, occurrence3['branch'], date_visit2)) # hack: ids are generated del actual_results3[0]['author']['id'] del actual_results3[0]['committer']['id'] self.assertEquals(len(actual_results3), 1) self.assertEqual(actual_results3, [self.normalize_entity(self.revision3)]) # when actual_results4 = list(self.storage.revision_get_by( origin_id, None, None)) for actual_result in actual_results4: del actual_result['author']['id'] del actual_result['committer']['id'] self.assertEquals(len(actual_results4), 1) self.assertCountEqual(actual_results4, [self.normalize_entity(self.revision3)]) - @istest - def release_add(self): + def test_release_add(self): init_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([self.release['id'], self.release2['id']], list(init_missing)) self.storage.release_add([self.release, self.release2]) end_missing = self.storage.release_missing([self.release['id'], self.release2['id']]) self.assertEqual([], list(end_missing)) - @istest - def release_get(self): + def test_release_get(self): # given self.storage.release_add([self.release, self.release2]) # when actual_releases = list(self.storage.release_get([self.release['id'], self.release2['id']])) # then for actual_release in actual_releases: del actual_release['author']['id'] # hack: ids are generated self.assertEquals([self.normalize_entity(self.release), self.normalize_entity(self.release2)], [actual_releases[0], actual_releases[1]]) - @istest - def origin_add_one(self): + def test_origin_add_one(self): origin0 = self.storage.origin_get(self.origin) self.assertIsNone(origin0) id = self.storage.origin_add_one(self.origin) actual_origin = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin['id'], id) id2 = self.storage.origin_add_one(self.origin) self.assertEqual(id, id2) - @istest - def origin_add(self): + def test_origin_add(self): origin0 = self.storage.origin_get(self.origin) self.assertIsNone(origin0) origin1, origin2 = self.storage.origin_add([self.origin, self.origin2]) actual_origin = self.storage.origin_get({ 'url': self.origin['url'], 'type': self.origin['type'], }) self.assertEqual(actual_origin['id'], origin1['id']) actual_origin2 = self.storage.origin_get({ 'url': self.origin2['url'], 'type': self.origin2['type'], }) self.assertEqual(actual_origin2['id'], origin2['id']) - @istest - def origin_add_twice(self): + def test_origin_add_twice(self): add1 = self.storage.origin_add([self.origin, self.origin2]) add2 = self.storage.origin_add([self.origin, self.origin2]) self.assertEqual(add1, add2) - @istest - def origin_get(self): + def test_origin_get(self): self.assertIsNone(self.storage.origin_get(self.origin)) id = self.storage.origin_add_one(self.origin) # lookup per type and url (returns id) actual_origin0 = self.storage.origin_get({'url': self.origin['url'], 'type': self.origin['type']}) self.assertEqual(actual_origin0['id'], id) # lookup per id (returns dict) actual_origin1 = self.storage.origin_get({'id': id}) self.assertEqual(actual_origin1, {'id': id, 'type': self.origin['type'], 'url': self.origin['url']}) - @istest - def origin_search(self): + def test_origin_search(self): found_origins = list(self.storage.origin_search(self.origin['url'])) self.assertEqual(len(found_origins), 0) found_origins = list(self.storage.origin_search(self.origin['url'], regexp=True)) self.assertEqual(len(found_origins), 0) id = self.storage.origin_add_one(self.origin) origin_data = {'id': id, 'type': self.origin['type'], 'url': self.origin['url']} found_origins = list(self.storage.origin_search(self.origin['url'])) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search( '.' + self.origin['url'][1:-1] + '.', regexp=True)) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) id2 = self.storage.origin_add_one(self.origin2) origin2_data = {'id': id2, 'type': self.origin2['type'], 'url': self.origin2['url']} found_origins = list(self.storage.origin_search(self.origin2['url'])) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search( '.' + self.origin2['url'][1:-1] + '.', regexp=True)) self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search('/')) self.assertEqual(len(found_origins), 2) found_origins = list(self.storage.origin_search('.*/.*', regexp=True)) self.assertEqual(len(found_origins), 2) found_origins = list(self.storage.origin_search('/', offset=0, limit=1)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search('.*/.*', offset=0, limit=1, regexp=True)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin_data) found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa self.assertEqual(len(found_origins), 1) self.assertEqual(found_origins[0], origin2_data) - @istest - def origin_visit_add(self): + def test_origin_visit_add(self): # given self.assertIsNone(self.storage.origin_get(self.origin2)) origin_id = self.storage.origin_add_one(self.origin2) self.assertIsNotNone(origin_id) # when origin_visit1 = self.storage.origin_visit_add( origin_id, ts=self.date_visit2) # then self.assertEquals(origin_visit1['origin'], origin_id) self.assertIsNotNone(origin_visit1['visit']) self.assertTrue(origin_visit1['visit'] > 0) actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) self.assertEquals(actual_origin_visits, [{ 'origin': origin_id, 'date': self.date_visit2, 'visit': origin_visit1['visit'], 'status': 'ongoing', 'metadata': None, 'snapshot': None, }]) - @istest - def origin_visit_update(self): + def test_origin_visit_update(self): # given origin_id = self.storage.origin_add_one(self.origin2) origin_id2 = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add( origin_id, ts=self.date_visit2) origin_visit2 = self.storage.origin_visit_add( origin_id, ts=self.date_visit3) origin_visit3 = self.storage.origin_visit_add( origin_id2, ts=self.date_visit3) # when visit1_metadata = { 'contents': 42, 'directories': 22, } self.storage.origin_visit_update( origin_id, origin_visit1['visit'], status='full', metadata=visit1_metadata) self.storage.origin_visit_update(origin_id2, origin_visit3['visit'], status='partial') # then actual_origin_visits = list(self.storage.origin_visit_get(origin_id)) self.assertEquals(actual_origin_visits, [{ 'origin': origin_visit2['origin'], 'date': self.date_visit2, 'visit': origin_visit1['visit'], 'status': 'full', 'metadata': visit1_metadata, 'snapshot': None, }, { 'origin': origin_visit2['origin'], 'date': self.date_visit3, 'visit': origin_visit2['visit'], 'status': 'ongoing', 'metadata': None, 'snapshot': None, }]) actual_origin_visits_bis = list(self.storage.origin_visit_get( origin_id, limit=1)) self.assertEquals(actual_origin_visits_bis, [{ 'origin': origin_visit2['origin'], 'date': self.date_visit2, 'visit': origin_visit1['visit'], 'status': 'full', 'metadata': visit1_metadata, 'snapshot': None, }]) actual_origin_visits_ter = list(self.storage.origin_visit_get( origin_id, last_visit=origin_visit1['visit'])) self.assertEquals(actual_origin_visits_ter, [{ 'origin': origin_visit2['origin'], 'date': self.date_visit3, 'visit': origin_visit2['visit'], 'status': 'ongoing', 'metadata': None, 'snapshot': None, }]) actual_origin_visits2 = list(self.storage.origin_visit_get(origin_id2)) self.assertEquals(actual_origin_visits2, [{ 'origin': origin_visit3['origin'], 'date': self.date_visit3, 'visit': origin_visit3['visit'], 'status': 'partial', 'metadata': None, 'snapshot': None, }]) - @istest - def origin_visit_get_by(self): + def test_origin_visit_get_by(self): origin_id = self.storage.origin_add_one(self.origin2) origin_id2 = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add( origin_id, ts=self.date_visit2) self.storage.snapshot_add(origin_id, origin_visit1['visit'], self.snapshot) # Add some other {origin, visit} entries self.storage.origin_visit_add(origin_id, ts=self.date_visit3) self.storage.origin_visit_add(origin_id2, ts=self.date_visit3) # when visit1_metadata = { 'contents': 42, 'directories': 22, } self.storage.origin_visit_update( origin_id, origin_visit1['visit'], status='full', metadata=visit1_metadata) expected_origin_visit = origin_visit1.copy() expected_origin_visit.update({ 'origin': origin_id, 'visit': origin_visit1['visit'], 'date': self.date_visit2, 'metadata': visit1_metadata, 'status': 'full', 'snapshot': self.snapshot['id'], }) # when actual_origin_visit1 = self.storage.origin_visit_get_by( origin_visit1['origin'], origin_visit1['visit']) # then self.assertEquals(actual_origin_visit1, expected_origin_visit) - @istest - def origin_visit_get_by_no_result(self): + def test_origin_visit_get_by_no_result(self): # No result actual_origin_visit = self.storage.origin_visit_get_by( 10, 999) self.assertIsNone(actual_origin_visit) - @istest - def occurrence_add(self): + def test_occurrence_add(self): occur = self.occurrence.copy() origin_id = self.storage.origin_add_one(self.origin2) date_visit1 = self.date_visit1 origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) revision = self.revision.copy() revision['id'] = occur['target'] self.storage.revision_add([revision]) occur.update({ 'origin': origin_id, 'visit': origin_visit1['visit'], }) self.storage.occurrence_add([occur]) test_query = ''' with indiv_occurrences as ( select origin, branch, target, target_type, unnest(visits) as visit from occurrence_history ) select origin, branch, target, target_type, date from indiv_occurrences left join origin_visit using(origin, visit) order by origin, date''' self.cursor.execute(test_query) ret = self.cursor.fetchall() self.assertEqual(len(ret), 1) self.assertEqual( (ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(), ret[0][3], ret[0][4]), (occur['origin'], occur['branch'], occur['target'], occur['target_type'], self.date_visit1)) date_visit2 = date_visit1 + datetime.timedelta(hours=10) origin_visit2 = self.storage.origin_visit_add(origin_id, date_visit2) occur2 = occur.copy() occur2.update({ 'visit': origin_visit2['visit'], }) self.storage.occurrence_add([occur2]) self.cursor.execute(test_query) ret = self.cursor.fetchall() self.assertEqual(len(ret), 2) self.assertEqual( (ret[0][0], ret[0][1].tobytes(), ret[0][2].tobytes(), ret[0][3], ret[0][4]), (occur['origin'], occur['branch'], occur['target'], occur['target_type'], date_visit1)) self.assertEqual( (ret[1][0], ret[1][1].tobytes(), ret[1][2].tobytes(), ret[1][3], ret[1][4]), (occur2['origin'], occur2['branch'], occur2['target'], occur2['target_type'], date_visit2)) - @istest - def snapshot_add_get_empty(self): + def test_snapshot_add_get_empty(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot) by_id = self.storage.snapshot_get(self.empty_snapshot['id']) self.assertEqual(by_id, self.empty_snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.empty_snapshot) - @istest - def snapshot_add_get_complete(self): + def test_snapshot_add_get_complete(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) by_id = self.storage.snapshot_get(self.complete_snapshot['id']) self.assertEqual(by_id, self.complete_snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.complete_snapshot) - @istest - def snapshot_add_count_branches(self): + def test_snapshot_add_count_branches(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] snp_size = self.storage.snapshot_count_branches(snp_id) expected_snp_size = { 'alias': 1, 'content': 1, 'directory': 1, 'release': 1, 'revision': 1, 'snapshot': 1, None: 1 } self.assertEqual(snp_size, expected_snp_size) - @istest - def snapshot_add_get_paginated(self): + def test_snapshot_add_get_paginated(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] snapshot = self.storage.snapshot_get_branches(snp_id, branches_from=b'release') expected_snapshot = copy.deepcopy(self.complete_snapshot) del expected_snapshot['next_branch'] for name in [b'alias', b'content', b'dangling', b'directory']: del expected_snapshot['branches'][name] self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches(snp_id, branches_count=1) expected_snapshot = copy.deepcopy(self.complete_snapshot) del expected_snapshot['next_branch'] for name in [b'content', b'dangling', b'directory', b'release', b'revision', b'snapshot']: del expected_snapshot['branches'][name] self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches( snp_id, branches_from=b'directory', branches_count=3) expected_snapshot = copy.deepcopy(self.complete_snapshot) del expected_snapshot['next_branch'] for name in [b'alias', b'content', b'dangling', b'snapshot']: del expected_snapshot['branches'][name] self.assertEqual(snapshot, expected_snapshot) - @istest - def snapshot_add_get_filtered(self): + def test_snapshot_add_get_filtered(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) snp_id = self.complete_snapshot['id'] snapshot = self.storage.snapshot_get_branches( snp_id, target_types=['release', 'revision']) expected_snapshot = copy.deepcopy(self.complete_snapshot) del expected_snapshot['next_branch'] for name in [b'alias', b'content', b'dangling', b'directory', b'snapshot']: del expected_snapshot['branches'][name] self.assertEqual(snapshot, expected_snapshot) snapshot = self.storage.snapshot_get_branches(snp_id, target_types=['alias']) expected_snapshot = copy.deepcopy(self.complete_snapshot) del expected_snapshot['next_branch'] for name in [b'content', b'dangling', b'directory', b'release', b'revision', b'snapshot']: del expected_snapshot['branches'][name] self.assertEqual(snapshot, expected_snapshot) - @istest - def snapshot_add_get(self): + def test_snapshot_add_get(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit_id, self.snapshot) by_id = self.storage.snapshot_get(self.snapshot['id']) self.assertEqual(by_id, self.snapshot) by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) self.assertEqual(by_ov, self.snapshot) origin_visit_info = self.storage.origin_visit_get_by(origin_id, visit_id) self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id']) - @istest - def snapshot_add_twice(self): + def test_snapshot_add_twice(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit1_id = origin_visit1['visit'] self.storage.snapshot_add(origin_id, visit1_id, self.snapshot) by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id, visit1_id) self.assertEqual(by_ov1, self.snapshot) origin_visit2 = self.storage.origin_visit_add(origin_id, self.date_visit2) visit2_id = origin_visit2['visit'] self.storage.snapshot_add(origin_id, visit2_id, self.snapshot) by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id, visit2_id) self.assertEqual(by_ov2, self.snapshot) - @istest - def snapshot_get_nonexistent(self): + def test_snapshot_get_nonexistent(self): bogus_snapshot_id = b'bogus snapshot id 00' bogus_origin_id = 1 bogus_visit_id = 1 by_id = self.storage.snapshot_get(bogus_snapshot_id) self.assertIsNone(by_id) by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, bogus_visit_id) self.assertIsNone(by_ov) - @istest - def snapshot_get_latest(self): + def test_snapshot_get_latest(self): origin_id = self.storage.origin_add_one(self.origin) origin_visit1 = self.storage.origin_visit_add(origin_id, self.date_visit1) visit1_id = origin_visit1['visit'] origin_visit2 = self.storage.origin_visit_add(origin_id, self.date_visit2) visit2_id = origin_visit2['visit'] # Two visits, both with no snapshot: latest snapshot is None self.assertIsNone(self.storage.snapshot_get_latest(origin_id)) # Add snapshot to visit1, latest snapshot = visit 1 snapshot self.storage.snapshot_add(origin_id, visit1_id, self.complete_snapshot) self.assertEquals(self.complete_snapshot, self.storage.snapshot_get_latest(origin_id)) # Status filter: both visits are status=ongoing, so no snapshot # returned self.assertIsNone( self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']) ) # Mark the first visit as completed and check status filter again self.storage.origin_visit_update(origin_id, visit1_id, status='full') self.assertEquals( self.complete_snapshot, self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']), ) # Add snapshot to visit2 and check that the new snapshot is returned self.storage.snapshot_add(origin_id, visit2_id, self.empty_snapshot) self.assertEquals(self.empty_snapshot, self.storage.snapshot_get_latest(origin_id)) # Check that the status filter is still working self.assertEquals( self.complete_snapshot, self.storage.snapshot_get_latest(origin_id, allowed_statuses=['full']), ) - @istest - def stat_counters(self): + def test_stat_counters(self): expected_keys = ['content', 'directory', 'directory_entry_dir', 'origin', 'person', 'revision'] for key in expected_keys: self.cursor.execute('select * from swh_update_counter(%s)', (key,)) self.conn.commit() counters = self.storage.stat_counters() self.assertTrue(set(expected_keys) <= set(counters)) self.assertIsInstance(counters[expected_keys[0]], int) - @istest - def content_find_with_present_content(self): + def test_content_find_with_present_content(self): # 1. with something to find cont = self.cont self.storage.content_add([cont]) actually_present = self.storage.content_find({'sha1': cont['sha1']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 2. with something to find actually_present = self.storage.content_find( {'sha1_git': cont['sha1_git']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 3. with something to find actually_present = self.storage.content_find( {'sha256': cont['sha256']}) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) # 4. with something to find actually_present = self.storage.content_find({ 'sha1': cont['sha1'], 'sha1_git': cont['sha1_git'], 'sha256': cont['sha256'], 'blake2s256': cont['blake2s256'], }) actually_present.pop('ctime') self.assertEqual(actually_present, { 'sha1': cont['sha1'], 'sha256': cont['sha256'], 'sha1_git': cont['sha1_git'], 'blake2s256': cont['blake2s256'], 'length': cont['length'], 'status': 'visible' }) - @istest - def content_find_with_non_present_content(self): + def test_content_find_with_non_present_content(self): # 1. with something that does not exist missing_cont = self.missing_cont actually_present = self.storage.content_find( {'sha1': missing_cont['sha1']}) self.assertIsNone(actually_present) # 2. with something that does not exist actually_present = self.storage.content_find( {'sha1_git': missing_cont['sha1_git']}) self.assertIsNone(actually_present) # 3. with something that does not exist actually_present = self.storage.content_find( {'sha256': missing_cont['sha256']}) self.assertIsNone(actually_present) - @istest - def content_find_bad_input(self): + def test_content_find_bad_input(self): # 1. with bad input with self.assertRaises(ValueError): self.storage.content_find({}) # empty is bad # 2. with bad input with self.assertRaises(ValueError): self.storage.content_find( {'unknown-sha1': 'something'}) # not the right key - @istest - def object_find_by_sha1_git(self): + def test_object_find_by_sha1_git(self): sha1_gits = [b'00000000000000000000'] expected = { b'00000000000000000000': [], } self.storage.content_add([self.cont]) sha1_gits.append(self.cont['sha1_git']) expected[self.cont['sha1_git']] = [{ 'sha1_git': self.cont['sha1_git'], 'type': 'content', 'id': self.cont['sha1'], }] self.storage.directory_add([self.dir]) sha1_gits.append(self.dir['id']) expected[self.dir['id']] = [{ 'sha1_git': self.dir['id'], 'type': 'directory', 'id': self.dir['id'], }] self.storage.revision_add([self.revision]) sha1_gits.append(self.revision['id']) expected[self.revision['id']] = [{ 'sha1_git': self.revision['id'], 'type': 'revision', 'id': self.revision['id'], }] self.storage.release_add([self.release]) sha1_gits.append(self.release['id']) expected[self.release['id']] = [{ 'sha1_git': self.release['id'], 'type': 'release', 'id': self.release['id'], }] ret = self.storage.object_find_by_sha1_git(sha1_gits) for val in ret.values(): for obj in val: del obj['object_id'] self.assertEqual(expected, ret) - @istest - def tool_add(self): + def test_tool_add(self): tool = { 'name': 'some-unknown-tool', 'version': 'some-version', 'configuration': {"debian-package": "some-package"}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) # does not exist # add it actual_tools = list(self.storage.tool_add([tool])) self.assertEquals(len(actual_tools), 1) actual_tool = actual_tools[0] self.assertIsNotNone(actual_tool) # now it exists new_id = actual_tool.pop('id') self.assertEquals(actual_tool, tool) actual_tools2 = list(self.storage.tool_add([tool])) actual_tool2 = actual_tools2[0] self.assertIsNotNone(actual_tool2) # now it exists new_id2 = actual_tool2.pop('id') self.assertEqual(new_id, new_id2) self.assertEqual(actual_tool, actual_tool2) - @istest - def tool_add_multiple(self): + def test_tool_add_multiple(self): tool = { 'name': 'some-unknown-tool', 'version': 'some-version', 'configuration': {"debian-package": "some-package"}, } actual_tools = list(self.storage.tool_add([tool])) self.assertEqual(len(actual_tools), 1) new_tools = [tool, { 'name': 'yet-another-tool', 'version': 'version', 'configuration': {}, }] actual_tools = list(self.storage.tool_add(new_tools)) self.assertEqual(len(actual_tools), 2) # order not guaranteed, so we iterate over results to check for tool in actual_tools: _id = tool.pop('id') self.assertIsNotNone(_id) self.assertIn(tool, new_tools) - @istest - def tool_get_missing(self): + def test_tool_get_missing(self): tool = { 'name': 'unknown-tool', 'version': '3.1.0rc2-31-ga2cbb8c', 'configuration': {"command_line": "nomossa "}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) - @istest - def tool_metadata_get_missing_context(self): + def test_tool_metadata_get_missing_context(self): tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': {"context": "unknown-context"}, } actual_tool = self.storage.tool_get(tool) self.assertIsNone(actual_tool) - @istest - def tool_metadata_get(self): + def test_tool_metadata_get(self): tool = { 'name': 'swh-metadata-translator', 'version': '0.0.1', 'configuration': {"type": "local", "context": "npm"}, } tools = list(self.storage.tool_add([tool])) expected_tool = tools[0] # when actual_tool = self.storage.tool_get(tool) # then self.assertEqual(expected_tool, actual_tool) - @istest - def metadata_provider_get_by(self): + def test_metadata_provider_get_by(self): # given no_provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) self.assertIsNone(no_provider) # when provider_id = self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) actual_provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) # then self.assertTrue(provider_id, actual_provider['id']) - @istest - def origin_metadata_add(self): + def test_origin_metadata_add(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id)) self.assertTrue(len(origin_metadata0) == 0) tools = list(self.storage.tool_add([self.metadata_tool])) tool = tools[0] self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider['id'], tool['id'], self.origin_metadata['metadata']) actual_om1 = list(self.storage.origin_metadata_get_by(origin_id)) # then self.assertEqual(actual_om1[0]['id'], o_m1) self.assertEqual(len(actual_om1), 1) self.assertEqual(actual_om1[0]['origin_id'], origin_id) - @istest - def origin_metadata_get(self): + def test_origin_metadata_get(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] self.storage.metadata_provider_add(self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider['id'], tool['id'], self.origin_metadata['metadata']) o_m2 = self.storage.origin_metadata_add( origin_id2, self.origin_metadata2['discovery_date'], provider['id'], tool['id'], self.origin_metadata2['metadata']) o_m3 = self.storage.origin_metadata_add( origin_id, self.origin_metadata2['discovery_date'], provider['id'], tool['id'], self.origin_metadata2['metadata']) all_metadatas = list(self.storage.origin_metadata_get_by(origin_id)) metadatas_for_origin2 = list(self.storage.origin_metadata_get_by( origin_id2)) expected_results = [{ 'origin_id': origin_id, 'discovery_date': datetime.datetime( 2017, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m3, 'provider_id': provider['id'], 'provider_name': 'hal', 'provider_type': 'deposit-client', 'provider_url': 'http:///hal/inria', 'tool_id': tool['id'] }, { 'origin_id': origin_id, 'discovery_date': datetime.datetime( 2015, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m1, 'provider_id': provider['id'], 'provider_name': 'hal', 'provider_type': 'deposit-client', 'provider_url': 'http:///hal/inria', 'tool_id': tool['id'] }] # then self.assertEqual(len(all_metadatas), 2) self.assertEqual(len(metadatas_for_origin2), 1) self.assertEqual(metadatas_for_origin2[0]['id'], o_m2) self.assertEqual(all_metadatas, expected_results) - @istest - def origin_metadata_get_by_provider_type(self): + def test_origin_metadata_get_by_provider_type(self): # given origin_id = self.storage.origin_add([self.origin])[0]['id'] origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] self.storage.metadata_provider_add( self.provider['name'], self.provider['type'], self.provider['url'], self.provider['metadata']) provider1 = self.storage.metadata_provider_get_by({ 'provider_name': self.provider['name'], 'provider_url': self.provider['url'] }) self.storage.metadata_provider_add( 'swMATH', 'registry', 'http://www.swmath.org/', {'email': 'contact@swmath.org', 'license': 'All rights reserved'}) provider2 = self.storage.metadata_provider_get_by({ 'provider_name': 'swMATH', 'provider_url': 'http://www.swmath.org/' }) # using the only tool now inserted in the data.sql, but for this # provider should be a crawler tool (not yet implemented) tool = self.storage.tool_get(self.metadata_tool) # when adding for the same origin 2 metadatas o_m1 = self.storage.origin_metadata_add( origin_id, self.origin_metadata['discovery_date'], provider1['id'], tool['id'], self.origin_metadata['metadata']) o_m2 = self.storage.origin_metadata_add( origin_id2, self.origin_metadata2['discovery_date'], provider2['id'], tool['id'], self.origin_metadata2['metadata']) provider_type = 'registry' m_by_provider = list(self.storage. origin_metadata_get_by( origin_id2, provider_type)) expected_results = [{ 'origin_id': origin_id2, 'discovery_date': datetime.datetime( 2017, 1, 2, 0, 0, tzinfo=psycopg2.tz.FixedOffsetTimezone( offset=60, name=None)), 'metadata': { 'name': 'test_origin_metadata', 'version': '0.0.1' }, 'id': o_m2, 'provider_id': provider2['id'], 'provider_name': 'swMATH', 'provider_type': provider_type, 'provider_url': 'http://www.swmath.org/', 'tool_id': tool['id'] }] # then self.assertEqual(len(m_by_provider), 1) self.assertEqual(m_by_provider, expected_results) self.assertEqual(m_by_provider[0]['id'], o_m2) self.assertIsNotNone(o_m1) class TestLocalStorage(CommonTestStorage, unittest.TestCase): """Test the local storage""" # Can only be tested with local storage as you can't mock # datetimes for the remote server - @istest - def fetch_history(self): + def test_fetch_history(self): origin = self.storage.origin_add_one(self.origin) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_date fetch_history_id = self.storage.fetch_history_start(origin) datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc) with patch('datetime.datetime'): datetime.datetime.now.return_value = self.fetch_history_end self.storage.fetch_history_end(fetch_history_id, self.fetch_history_data) fetch_history = self.storage.fetch_history_get(fetch_history_id) expected_fetch_history = self.fetch_history_data.copy() expected_fetch_history['id'] = fetch_history_id expected_fetch_history['origin'] = origin expected_fetch_history['date'] = self.fetch_history_date expected_fetch_history['duration'] = self.fetch_history_duration self.assertEqual(expected_fetch_history, fetch_history) # The remote API doesn't expose _person_add - @istest - def person_get(self): + def test_person_get(self): # given person0 = { 'fullname': b'bob ', 'name': b'bob', 'email': b'alice@bob', } id0 = self.storage._person_add(person0) person1 = { 'fullname': b'tony ', 'name': b'tony', 'email': b'tony@bob', } id1 = self.storage._person_add(person1) # when actual_persons = self.storage.person_get([id0, id1]) # given (person injection through release for example) self.assertEqual( list(actual_persons), [ { 'id': id0, 'fullname': person0['fullname'], 'name': person0['name'], 'email': person0['email'], }, { 'id': id1, 'fullname': person1['fullname'], 'name': person1['name'], 'email': person1['email'], }, ]) # This test is only relevant on the local storage, with an actual # objstorage raising an exception - @istest - def content_add_objstorage_exception(self): + def test_content_add_objstorage_exception(self): self.storage.objstorage.add = Mock( side_effect=Exception('mocked broken objstorage') ) with self.assertRaises(Exception) as e: self.storage.content_add([self.cont]) self.assertEqual(e.exception.args, ('mocked broken objstorage',)) missing = list(self.storage.content_missing([self.cont])) self.assertEqual(missing, [self.cont['sha1']]) class AlteringSchemaTest(BaseTestStorage, unittest.TestCase): """This class is dedicated for the rare case where the schema needs to be altered dynamically. Otherwise, the tests could be blocking when ran altogether. """ - @istest - def content_update(self): + def test_content_update(self): cont = copy.deepcopy(self.cont) self.storage.content_add([cont]) # alter the sha1_git for example cont['sha1_git'] = hash_to_bytes( '3a60a5275d0333bf13468e8b3dcab90f4046e654') self.storage.content_update([cont], keys=['sha1_git']) with self.storage.get_db().transaction() as cur: cur.execute('SELECT sha1, sha1_git, sha256, length, status' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = cur.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible')) - @istest - def content_update_with_new_cols(self): + def test_content_update_with_new_cols(self): with self.storage.get_db().transaction() as cur: cur.execute("""alter table content add column test text default null, add column test2 text default null""") cont = copy.deepcopy(self.cont2) self.storage.content_add([cont]) cont['test'] = 'value-1' cont['test2'] = 'value-2' self.storage.content_update([cont], keys=['test', 'test2']) with self.storage.get_db().transaction() as cur: cur.execute( 'SELECT sha1, sha1_git, sha256, length, status, test, test2' ' FROM content WHERE sha1 = %s', (cont['sha1'],)) datum = cur.fetchone() self.assertEqual( (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), datum[3], datum[4], datum[5], datum[6]), (cont['sha1'], cont['sha1_git'], cont['sha256'], cont['length'], 'visible', cont['test'], cont['test2'])) with self.storage.get_db().transaction() as cur: cur.execute("""alter table content drop column test, drop column test2""")