Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/tests/test_storage.py
# Copyright (C) 2015-2017 The Software Heritage developers | # Copyright (C) 2015-2017 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
from collections import defaultdict | |||||
import copy | import copy | ||||
import datetime | import datetime | ||||
from operator import itemgetter | |||||
import psycopg2 | |||||
import unittest | import unittest | ||||
from collections import defaultdict | |||||
from operator import itemgetter | |||||
from unittest.mock import Mock, patch | from unittest.mock import Mock, patch | ||||
from nose.tools import istest | import psycopg2 | ||||
from nose.plugins.attrib import attr | from nose.plugins.attrib import attr | ||||
from swh.core.tests.db_testing import DbTestFixture | |||||
from swh.model import from_disk, identifiers | from swh.model import from_disk, identifiers | ||||
from swh.model.hashutil import hash_to_bytes | from swh.model.hashutil import hash_to_bytes | ||||
from swh.core.tests.db_testing import DbTestFixture | |||||
from swh.storage.tests.storage_testing import StorageTestFixture | from swh.storage.tests.storage_testing import StorageTestFixture | ||||
@attr('db') | @attr('db') | ||||
class BaseTestStorage(StorageTestFixture, DbTestFixture): | class BaseTestStorage(StorageTestFixture, DbTestFixture): | ||||
def setUp(self): | def setUp(self): | ||||
super().setUp() | super().setUp() | ||||
▲ Show 20 Lines • Show All 523 Lines • ▼ Show 20 Lines | class CommonTestStorage(BaseTestStorage): | ||||
def normalize_entity(entity): | def normalize_entity(entity): | ||||
entity = copy.deepcopy(entity) | entity = copy.deepcopy(entity) | ||||
for key in ('date', 'committer_date'): | for key in ('date', 'committer_date'): | ||||
if key in entity: | if key in entity: | ||||
entity[key] = identifiers.normalize_timestamp(entity[key]) | entity[key] = identifiers.normalize_timestamp(entity[key]) | ||||
return entity | return entity | ||||
@istest | def test_check_config(self): | ||||
def check_config(self): | |||||
self.assertTrue(self.storage.check_config(check_write=True)) | self.assertTrue(self.storage.check_config(check_write=True)) | ||||
self.assertTrue(self.storage.check_config(check_write=False)) | self.assertTrue(self.storage.check_config(check_write=False)) | ||||
@istest | def test_content_add(self): | ||||
def content_add(self): | |||||
cont = self.cont | cont = self.cont | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
if hasattr(self.storage, 'objstorage'): | if hasattr(self.storage, 'objstorage'): | ||||
self.assertIn(cont['sha1'], self.storage.objstorage) | self.assertIn(cont['sha1'], self.storage.objstorage) | ||||
self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' | self.cursor.execute('SELECT sha1, sha1_git, sha256, length, status' | ||||
' FROM content WHERE sha1 = %s', | ' FROM content WHERE sha1 = %s', | ||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = self.cursor.fetchone() | datum = self.cursor.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | ||||
datum[3], datum[4]), | datum[3], datum[4]), | ||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible')) | cont['length'], 'visible')) | ||||
@istest | def test_content_add_collision(self): | ||||
def content_add_collision(self): | |||||
cont1 = self.cont | cont1 = self.cont | ||||
# create (corrupted) content with same sha1{,_git} but != sha256 | # create (corrupted) content with same sha1{,_git} but != sha256 | ||||
cont1b = cont1.copy() | cont1b = cont1.copy() | ||||
sha256_array = bytearray(cont1b['sha256']) | sha256_array = bytearray(cont1b['sha256']) | ||||
sha256_array[0] += 1 | sha256_array[0] += 1 | ||||
cont1b['sha256'] = bytes(sha256_array) | cont1b['sha256'] = bytes(sha256_array) | ||||
with self.assertRaises(psycopg2.IntegrityError): | with self.assertRaises(psycopg2.IntegrityError): | ||||
self.storage.content_add([cont1, cont1b]) | self.storage.content_add([cont1, cont1b]) | ||||
@istest | def test_skipped_content_add(self): | ||||
def skipped_content_add(self): | |||||
cont = self.skipped_cont.copy() | cont = self.skipped_cont.copy() | ||||
cont2 = self.skipped_cont2.copy() | cont2 = self.skipped_cont2.copy() | ||||
cont2['blake2s256'] = None | cont2['blake2s256'] = None | ||||
self.storage.content_add([cont, cont, cont2]) | self.storage.content_add([cont, cont, cont2]) | ||||
self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' | self.cursor.execute('SELECT sha1, sha1_git, sha256, blake2s256, ' | ||||
'length, status, reason ' | 'length, status, reason ' | ||||
Show All 15 Lines | def test_skipped_content_add(self): | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(), | (datum2[0].tobytes(), datum2[1].tobytes(), datum2[2].tobytes(), | ||||
datum2[3], datum2[4], datum2[5], datum2[6]), | datum2[3], datum2[4], datum2[5], datum2[6]), | ||||
(cont2['sha1'], cont2['sha1_git'], cont2['sha256'], | (cont2['sha1'], cont2['sha1_git'], cont2['sha256'], | ||||
cont2['blake2s256'], cont2['length'], 'absent', | cont2['blake2s256'], cont2['length'], 'absent', | ||||
'Content too long') | 'Content too long') | ||||
) | ) | ||||
@istest | def test_content_missing(self): | ||||
def content_missing(self): | |||||
cont2 = self.cont2 | cont2 = self.cont2 | ||||
missing_cont = self.missing_cont | missing_cont = self.missing_cont | ||||
self.storage.content_add([cont2]) | self.storage.content_add([cont2]) | ||||
test_contents = [cont2] | test_contents = [cont2] | ||||
missing_per_hash = defaultdict(list) | missing_per_hash = defaultdict(list) | ||||
for i in range(256): | for i in range(256): | ||||
test_content = missing_cont.copy() | test_content = missing_cont.copy() | ||||
for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: | for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: | ||||
test_content[hash] = bytes([i]) + test_content[hash][1:] | test_content[hash] = bytes([i]) + test_content[hash][1:] | ||||
missing_per_hash[hash].append(test_content[hash]) | missing_per_hash[hash].append(test_content[hash]) | ||||
test_contents.append(test_content) | test_contents.append(test_content) | ||||
self.assertCountEqual( | self.assertCountEqual( | ||||
self.storage.content_missing(test_contents), | self.storage.content_missing(test_contents), | ||||
missing_per_hash['sha1'] | missing_per_hash['sha1'] | ||||
) | ) | ||||
for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: | for hash in ['sha1', 'sha256', 'sha1_git', 'blake2s256']: | ||||
self.assertCountEqual( | self.assertCountEqual( | ||||
self.storage.content_missing(test_contents, key_hash=hash), | self.storage.content_missing(test_contents, key_hash=hash), | ||||
missing_per_hash[hash] | missing_per_hash[hash] | ||||
) | ) | ||||
@istest | def test_content_missing_per_sha1(self): | ||||
def content_missing_per_sha1(self): | |||||
# given | # given | ||||
cont2 = self.cont2 | cont2 = self.cont2 | ||||
missing_cont = self.missing_cont | missing_cont = self.missing_cont | ||||
self.storage.content_add([cont2]) | self.storage.content_add([cont2]) | ||||
# when | # when | ||||
gen = self.storage.content_missing_per_sha1([cont2['sha1'], | gen = self.storage.content_missing_per_sha1([cont2['sha1'], | ||||
missing_cont['sha1']]) | missing_cont['sha1']]) | ||||
# then | # then | ||||
self.assertEqual(list(gen), [missing_cont['sha1']]) | self.assertEqual(list(gen), [missing_cont['sha1']]) | ||||
@istest | def test_content_get_metadata(self): | ||||
def content_get_metadata(self): | |||||
cont1 = self.cont.copy() | cont1 = self.cont.copy() | ||||
cont2 = self.cont2.copy() | cont2 = self.cont2.copy() | ||||
self.storage.content_add([cont1, cont2]) | self.storage.content_add([cont1, cont2]) | ||||
gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']]) | gen = self.storage.content_get_metadata([cont1['sha1'], cont2['sha1']]) | ||||
# we only retrieve the metadata | # we only retrieve the metadata | ||||
cont1.pop('data') | cont1.pop('data') | ||||
cont2.pop('data') | cont2.pop('data') | ||||
self.assertCountEqual(list(gen), [cont1, cont2]) | self.assertCountEqual(list(gen), [cont1, cont2]) | ||||
@istest | def test_content_get_metadata_missing_sha1(self): | ||||
def content_get_metadata_missing_sha1(self): | |||||
cont1 = self.cont.copy() | cont1 = self.cont.copy() | ||||
cont2 = self.cont2.copy() | cont2 = self.cont2.copy() | ||||
missing_cont = self.missing_cont.copy() | missing_cont = self.missing_cont.copy() | ||||
self.storage.content_add([cont1, cont2]) | self.storage.content_add([cont1, cont2]) | ||||
gen = self.storage.content_get_metadata([missing_cont['sha1']]) | gen = self.storage.content_get_metadata([missing_cont['sha1']]) | ||||
# All the metadata keys are None | # All the metadata keys are None | ||||
missing_cont.pop('data') | missing_cont.pop('data') | ||||
for key in list(missing_cont): | for key in list(missing_cont): | ||||
if key != 'sha1': | if key != 'sha1': | ||||
missing_cont[key] = None | missing_cont[key] = None | ||||
self.assertEqual(list(gen), [missing_cont]) | self.assertEqual(list(gen), [missing_cont]) | ||||
@istest | def test_directory_add(self): | ||||
def directory_add(self): | |||||
init_missing = list(self.storage.directory_missing([self.dir['id']])) | init_missing = list(self.storage.directory_missing([self.dir['id']])) | ||||
self.assertEqual([self.dir['id']], init_missing) | self.assertEqual([self.dir['id']], init_missing) | ||||
self.storage.directory_add([self.dir]) | self.storage.directory_add([self.dir]) | ||||
stored_data = list(self.storage.directory_ls(self.dir['id'])) | stored_data = list(self.storage.directory_ls(self.dir['id'])) | ||||
data_to_store = [] | data_to_store = [] | ||||
Show All 11 Lines | def test_directory_add(self): | ||||
'length': None, | 'length': None, | ||||
}) | }) | ||||
self.assertEqual(data_to_store, stored_data) | self.assertEqual(data_to_store, stored_data) | ||||
after_missing = list(self.storage.directory_missing([self.dir['id']])) | after_missing = list(self.storage.directory_missing([self.dir['id']])) | ||||
self.assertEqual([], after_missing) | self.assertEqual([], after_missing) | ||||
@istest | def test_directory_entry_get_by_path(self): | ||||
def directory_entry_get_by_path(self): | |||||
# given | # given | ||||
init_missing = list(self.storage.directory_missing([self.dir3['id']])) | init_missing = list(self.storage.directory_missing([self.dir3['id']])) | ||||
self.assertEqual([self.dir3['id']], init_missing) | self.assertEqual([self.dir3['id']], init_missing) | ||||
self.storage.directory_add([self.dir3]) | self.storage.directory_add([self.dir3]) | ||||
expected_entries = [ | expected_entries = [ | ||||
{ | { | ||||
▲ Show 20 Lines • Show All 44 Lines • ▼ Show 20 Lines | def test_directory_entry_get_by_path(self): | ||||
# when (nothing should be found here since self.dir is not persisted.) | # when (nothing should be found here since self.dir is not persisted.) | ||||
for entry in self.dir['entries']: | for entry in self.dir['entries']: | ||||
actual_entry = self.storage.directory_entry_get_by_path( | actual_entry = self.storage.directory_entry_get_by_path( | ||||
self.dir['id'], | self.dir['id'], | ||||
[entry['name']]) | [entry['name']]) | ||||
self.assertIsNone(actual_entry) | self.assertIsNone(actual_entry) | ||||
@istest | def test_revision_add(self): | ||||
def revision_add(self): | |||||
init_missing = self.storage.revision_missing([self.revision['id']]) | init_missing = self.storage.revision_missing([self.revision['id']]) | ||||
self.assertEqual([self.revision['id']], list(init_missing)) | self.assertEqual([self.revision['id']], list(init_missing)) | ||||
self.storage.revision_add([self.revision]) | self.storage.revision_add([self.revision]) | ||||
end_missing = self.storage.revision_missing([self.revision['id']]) | end_missing = self.storage.revision_missing([self.revision['id']]) | ||||
self.assertEqual([], list(end_missing)) | self.assertEqual([], list(end_missing)) | ||||
@istest | def test_revision_log(self): | ||||
def revision_log(self): | |||||
# given | # given | ||||
# self.revision4 -is-child-of-> self.revision3 | # self.revision4 -is-child-of-> self.revision3 | ||||
self.storage.revision_add([self.revision3, | self.storage.revision_add([self.revision3, | ||||
self.revision4]) | self.revision4]) | ||||
# when | # when | ||||
actual_results = list(self.storage.revision_log( | actual_results = list(self.storage.revision_log( | ||||
[self.revision4['id']])) | [self.revision4['id']])) | ||||
# hack: ids generated | # hack: ids generated | ||||
for actual_result in actual_results: | for actual_result in actual_results: | ||||
del actual_result['author']['id'] | del actual_result['author']['id'] | ||||
del actual_result['committer']['id'] | del actual_result['committer']['id'] | ||||
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 | self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 | ||||
self.assertEquals(actual_results[0], | self.assertEquals(actual_results[0], | ||||
self.normalize_entity(self.revision4)) | self.normalize_entity(self.revision4)) | ||||
self.assertEquals(actual_results[1], | self.assertEquals(actual_results[1], | ||||
self.normalize_entity(self.revision3)) | self.normalize_entity(self.revision3)) | ||||
@istest | def test_revision_log_with_limit(self): | ||||
def revision_log_with_limit(self): | |||||
# given | # given | ||||
# self.revision4 -is-child-of-> self.revision3 | # self.revision4 -is-child-of-> self.revision3 | ||||
self.storage.revision_add([self.revision3, | self.storage.revision_add([self.revision3, | ||||
self.revision4]) | self.revision4]) | ||||
actual_results = list(self.storage.revision_log( | actual_results = list(self.storage.revision_log( | ||||
[self.revision4['id']], 1)) | [self.revision4['id']], 1)) | ||||
# hack: ids generated | # hack: ids generated | ||||
for actual_result in actual_results: | for actual_result in actual_results: | ||||
del actual_result['author']['id'] | del actual_result['author']['id'] | ||||
del actual_result['committer']['id'] | del actual_result['committer']['id'] | ||||
self.assertEqual(len(actual_results), 1) | self.assertEqual(len(actual_results), 1) | ||||
self.assertEquals(actual_results[0], self.revision4) | self.assertEquals(actual_results[0], self.revision4) | ||||
@istest | def test_revision_log_by(self): | ||||
def revision_log_by(self): | |||||
# given | # given | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
self.storage.revision_add([self.revision3, | self.storage.revision_add([self.revision3, | ||||
self.revision4]) | self.revision4]) | ||||
# occurrence3 targets 'revision4' | # occurrence3 targets 'revision4' | ||||
# with branch 'master' and origin origin_id | # with branch 'master' and origin origin_id | ||||
occurrence3 = self.occurrence3.copy() | occurrence3 = self.occurrence3.copy() | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | def test_revision_log_by(self): | ||||
timestamp=None)) | timestamp=None)) | ||||
self.assertEquals(actual_res, []) | self.assertEquals(actual_res, []) | ||||
@staticmethod | @staticmethod | ||||
def _short_revision(revision): | def _short_revision(revision): | ||||
return [revision['id'], revision['parents']] | return [revision['id'], revision['parents']] | ||||
@istest | def test_revision_shortlog(self): | ||||
def revision_shortlog(self): | |||||
# given | # given | ||||
# self.revision4 -is-child-of-> self.revision3 | # self.revision4 -is-child-of-> self.revision3 | ||||
self.storage.revision_add([self.revision3, | self.storage.revision_add([self.revision3, | ||||
self.revision4]) | self.revision4]) | ||||
# when | # when | ||||
actual_results = list(self.storage.revision_shortlog( | actual_results = list(self.storage.revision_shortlog( | ||||
[self.revision4['id']])) | [self.revision4['id']])) | ||||
self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 | self.assertEqual(len(actual_results), 2) # rev4 -child-> rev3 | ||||
self.assertEquals(list(actual_results[0]), | self.assertEquals(list(actual_results[0]), | ||||
self._short_revision(self.revision4)) | self._short_revision(self.revision4)) | ||||
self.assertEquals(list(actual_results[1]), | self.assertEquals(list(actual_results[1]), | ||||
self._short_revision(self.revision3)) | self._short_revision(self.revision3)) | ||||
@istest | def test_revision_shortlog_with_limit(self): | ||||
def revision_shortlog_with_limit(self): | |||||
# given | # given | ||||
# self.revision4 -is-child-of-> self.revision3 | # self.revision4 -is-child-of-> self.revision3 | ||||
self.storage.revision_add([self.revision3, | self.storage.revision_add([self.revision3, | ||||
self.revision4]) | self.revision4]) | ||||
actual_results = list(self.storage.revision_shortlog( | actual_results = list(self.storage.revision_shortlog( | ||||
[self.revision4['id']], 1)) | [self.revision4['id']], 1)) | ||||
self.assertEqual(len(actual_results), 1) | self.assertEqual(len(actual_results), 1) | ||||
self.assertEquals(list(actual_results[0]), | self.assertEquals(list(actual_results[0]), | ||||
self._short_revision(self.revision4)) | self._short_revision(self.revision4)) | ||||
@istest | def test_revision_get(self): | ||||
def revision_get(self): | |||||
self.storage.revision_add([self.revision]) | self.storage.revision_add([self.revision]) | ||||
actual_revisions = list(self.storage.revision_get( | actual_revisions = list(self.storage.revision_get( | ||||
[self.revision['id'], self.revision2['id']])) | [self.revision['id'], self.revision2['id']])) | ||||
# when | # when | ||||
del actual_revisions[0]['author']['id'] # hack: ids are generated | del actual_revisions[0]['author']['id'] # hack: ids are generated | ||||
del actual_revisions[0]['committer']['id'] | del actual_revisions[0]['committer']['id'] | ||||
self.assertEqual(len(actual_revisions), 2) | self.assertEqual(len(actual_revisions), 2) | ||||
self.assertEqual(actual_revisions[0], | self.assertEqual(actual_revisions[0], | ||||
self.normalize_entity(self.revision)) | self.normalize_entity(self.revision)) | ||||
self.assertIsNone(actual_revisions[1]) | self.assertIsNone(actual_revisions[1]) | ||||
@istest | def test_revision_get_no_parents(self): | ||||
def revision_get_no_parents(self): | |||||
self.storage.revision_add([self.revision3]) | self.storage.revision_add([self.revision3]) | ||||
get = list(self.storage.revision_get([self.revision3['id']])) | get = list(self.storage.revision_get([self.revision3['id']])) | ||||
self.assertEqual(len(get), 1) | self.assertEqual(len(get), 1) | ||||
self.assertEqual(get[0]['parents'], []) # no parents on this one | self.assertEqual(get[0]['parents'], []) # no parents on this one | ||||
@istest | def test_revision_get_by(self): | ||||
def revision_get_by(self): | |||||
# given | # given | ||||
self.storage.content_add([self.cont2]) | self.storage.content_add([self.cont2]) | ||||
self.storage.directory_add([self.dir2]) # point to self.cont | self.storage.directory_add([self.dir2]) # point to self.cont | ||||
self.storage.revision_add([self.revision2]) # points to self.dir | self.storage.revision_add([self.revision2]) # points to self.dir | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
# occurrence2 points to 'revision2' with branch 'master', we | # occurrence2 points to 'revision2' with branch 'master', we | ||||
# need to point to the right origin | # need to point to the right origin | ||||
Show All 21 Lines | def test_revision_get_by(self): | ||||
# when (with no branch filtering, it's still ok) | # when (with no branch filtering, it's still ok) | ||||
actual_results = list(self.storage.revision_get_by( | actual_results = list(self.storage.revision_get_by( | ||||
origin_id, | origin_id, | ||||
None, | None, | ||||
None)) | None)) | ||||
self.assertEqual(actual_results[0], expected_revisions[0]) | self.assertEqual(actual_results[0], expected_revisions[0]) | ||||
@istest | def test_revision_get_by_multiple_occurrence(self): | ||||
def revision_get_by_multiple_occurrence(self): | |||||
# 2 occurrences pointing to 2 different revisions | # 2 occurrences pointing to 2 different revisions | ||||
# each occurrence have 1 day delta | # each occurrence have 1 day delta | ||||
# the api must return the revision whose occurrence is the nearest. | # the api must return the revision whose occurrence is the nearest. | ||||
# given | # given | ||||
self.storage.content_add([self.cont2]) | self.storage.content_add([self.cont2]) | ||||
self.storage.directory_add([self.dir2]) | self.storage.directory_add([self.dir2]) | ||||
self.storage.revision_add([self.revision2, self.revision3]) | self.storage.revision_add([self.revision2, self.revision3]) | ||||
▲ Show 20 Lines • Show All 87 Lines • ▼ Show 20 Lines | def test_revision_get_by_multiple_occurrence(self): | ||||
for actual_result in actual_results4: | for actual_result in actual_results4: | ||||
del actual_result['author']['id'] | del actual_result['author']['id'] | ||||
del actual_result['committer']['id'] | del actual_result['committer']['id'] | ||||
self.assertEquals(len(actual_results4), 1) | self.assertEquals(len(actual_results4), 1) | ||||
self.assertCountEqual(actual_results4, | self.assertCountEqual(actual_results4, | ||||
[self.normalize_entity(self.revision3)]) | [self.normalize_entity(self.revision3)]) | ||||
@istest | def test_release_add(self): | ||||
def release_add(self): | |||||
init_missing = self.storage.release_missing([self.release['id'], | init_missing = self.storage.release_missing([self.release['id'], | ||||
self.release2['id']]) | self.release2['id']]) | ||||
self.assertEqual([self.release['id'], self.release2['id']], | self.assertEqual([self.release['id'], self.release2['id']], | ||||
list(init_missing)) | list(init_missing)) | ||||
self.storage.release_add([self.release, self.release2]) | self.storage.release_add([self.release, self.release2]) | ||||
end_missing = self.storage.release_missing([self.release['id'], | end_missing = self.storage.release_missing([self.release['id'], | ||||
self.release2['id']]) | self.release2['id']]) | ||||
self.assertEqual([], list(end_missing)) | self.assertEqual([], list(end_missing)) | ||||
@istest | def test_release_get(self): | ||||
def release_get(self): | |||||
# given | # given | ||||
self.storage.release_add([self.release, self.release2]) | self.storage.release_add([self.release, self.release2]) | ||||
# when | # when | ||||
actual_releases = list(self.storage.release_get([self.release['id'], | actual_releases = list(self.storage.release_get([self.release['id'], | ||||
self.release2['id']])) | self.release2['id']])) | ||||
# then | # then | ||||
for actual_release in actual_releases: | for actual_release in actual_releases: | ||||
del actual_release['author']['id'] # hack: ids are generated | del actual_release['author']['id'] # hack: ids are generated | ||||
self.assertEquals([self.normalize_entity(self.release), | self.assertEquals([self.normalize_entity(self.release), | ||||
self.normalize_entity(self.release2)], | self.normalize_entity(self.release2)], | ||||
[actual_releases[0], actual_releases[1]]) | [actual_releases[0], actual_releases[1]]) | ||||
@istest | def test_origin_add_one(self): | ||||
def origin_add_one(self): | |||||
origin0 = self.storage.origin_get(self.origin) | origin0 = self.storage.origin_get(self.origin) | ||||
self.assertIsNone(origin0) | self.assertIsNone(origin0) | ||||
id = self.storage.origin_add_one(self.origin) | id = self.storage.origin_add_one(self.origin) | ||||
actual_origin = self.storage.origin_get({'url': self.origin['url'], | actual_origin = self.storage.origin_get({'url': self.origin['url'], | ||||
'type': self.origin['type']}) | 'type': self.origin['type']}) | ||||
self.assertEqual(actual_origin['id'], id) | self.assertEqual(actual_origin['id'], id) | ||||
id2 = self.storage.origin_add_one(self.origin) | id2 = self.storage.origin_add_one(self.origin) | ||||
self.assertEqual(id, id2) | self.assertEqual(id, id2) | ||||
@istest | def test_origin_add(self): | ||||
def origin_add(self): | |||||
origin0 = self.storage.origin_get(self.origin) | origin0 = self.storage.origin_get(self.origin) | ||||
self.assertIsNone(origin0) | self.assertIsNone(origin0) | ||||
origin1, origin2 = self.storage.origin_add([self.origin, self.origin2]) | origin1, origin2 = self.storage.origin_add([self.origin, self.origin2]) | ||||
actual_origin = self.storage.origin_get({ | actual_origin = self.storage.origin_get({ | ||||
'url': self.origin['url'], | 'url': self.origin['url'], | ||||
'type': self.origin['type'], | 'type': self.origin['type'], | ||||
}) | }) | ||||
self.assertEqual(actual_origin['id'], origin1['id']) | self.assertEqual(actual_origin['id'], origin1['id']) | ||||
actual_origin2 = self.storage.origin_get({ | actual_origin2 = self.storage.origin_get({ | ||||
'url': self.origin2['url'], | 'url': self.origin2['url'], | ||||
'type': self.origin2['type'], | 'type': self.origin2['type'], | ||||
}) | }) | ||||
self.assertEqual(actual_origin2['id'], origin2['id']) | self.assertEqual(actual_origin2['id'], origin2['id']) | ||||
@istest | def test_origin_add_twice(self): | ||||
def origin_add_twice(self): | |||||
add1 = self.storage.origin_add([self.origin, self.origin2]) | add1 = self.storage.origin_add([self.origin, self.origin2]) | ||||
add2 = self.storage.origin_add([self.origin, self.origin2]) | add2 = self.storage.origin_add([self.origin, self.origin2]) | ||||
self.assertEqual(add1, add2) | self.assertEqual(add1, add2) | ||||
@istest | def test_origin_get(self): | ||||
def origin_get(self): | |||||
self.assertIsNone(self.storage.origin_get(self.origin)) | self.assertIsNone(self.storage.origin_get(self.origin)) | ||||
id = self.storage.origin_add_one(self.origin) | id = self.storage.origin_add_one(self.origin) | ||||
# lookup per type and url (returns id) | # lookup per type and url (returns id) | ||||
actual_origin0 = self.storage.origin_get({'url': self.origin['url'], | actual_origin0 = self.storage.origin_get({'url': self.origin['url'], | ||||
'type': self.origin['type']}) | 'type': self.origin['type']}) | ||||
self.assertEqual(actual_origin0['id'], id) | self.assertEqual(actual_origin0['id'], id) | ||||
# lookup per id (returns dict) | # lookup per id (returns dict) | ||||
actual_origin1 = self.storage.origin_get({'id': id}) | actual_origin1 = self.storage.origin_get({'id': id}) | ||||
self.assertEqual(actual_origin1, {'id': id, | self.assertEqual(actual_origin1, {'id': id, | ||||
'type': self.origin['type'], | 'type': self.origin['type'], | ||||
'url': self.origin['url']}) | 'url': self.origin['url']}) | ||||
@istest | def test_origin_search(self): | ||||
def origin_search(self): | |||||
found_origins = list(self.storage.origin_search(self.origin['url'])) | found_origins = list(self.storage.origin_search(self.origin['url'])) | ||||
self.assertEqual(len(found_origins), 0) | self.assertEqual(len(found_origins), 0) | ||||
found_origins = list(self.storage.origin_search(self.origin['url'], | found_origins = list(self.storage.origin_search(self.origin['url'], | ||||
regexp=True)) | regexp=True)) | ||||
self.assertEqual(len(found_origins), 0) | self.assertEqual(len(found_origins), 0) | ||||
id = self.storage.origin_add_one(self.origin) | id = self.storage.origin_add_one(self.origin) | ||||
Show All 39 Lines | def test_origin_search(self): | ||||
found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa | found_origins = list(self.storage.origin_search('/', offset=1, limit=1)) # noqa | ||||
self.assertEqual(len(found_origins), 1) | self.assertEqual(len(found_origins), 1) | ||||
self.assertEqual(found_origins[0], origin2_data) | self.assertEqual(found_origins[0], origin2_data) | ||||
found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa | found_origins = list(self.storage.origin_search('.*/.*', offset=1, limit=1, regexp=True)) # noqa | ||||
self.assertEqual(len(found_origins), 1) | self.assertEqual(len(found_origins), 1) | ||||
self.assertEqual(found_origins[0], origin2_data) | self.assertEqual(found_origins[0], origin2_data) | ||||
@istest | def test_origin_visit_add(self): | ||||
def origin_visit_add(self): | |||||
# given | # given | ||||
self.assertIsNone(self.storage.origin_get(self.origin2)) | self.assertIsNone(self.storage.origin_get(self.origin2)) | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
self.assertIsNotNone(origin_id) | self.assertIsNotNone(origin_id) | ||||
# when | # when | ||||
origin_visit1 = self.storage.origin_visit_add( | origin_visit1 = self.storage.origin_visit_add( | ||||
Show All 11 Lines | def test_origin_visit_add(self): | ||||
'origin': origin_id, | 'origin': origin_id, | ||||
'date': self.date_visit2, | 'date': self.date_visit2, | ||||
'visit': origin_visit1['visit'], | 'visit': origin_visit1['visit'], | ||||
'status': 'ongoing', | 'status': 'ongoing', | ||||
'metadata': None, | 'metadata': None, | ||||
'snapshot': None, | 'snapshot': None, | ||||
}]) | }]) | ||||
@istest | def test_origin_visit_update(self): | ||||
def origin_visit_update(self): | |||||
# given | # given | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
origin_id2 = self.storage.origin_add_one(self.origin) | origin_id2 = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add( | origin_visit1 = self.storage.origin_visit_add( | ||||
origin_id, | origin_id, | ||||
ts=self.date_visit2) | ts=self.date_visit2) | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | def test_origin_visit_update(self): | ||||
'origin': origin_visit3['origin'], | 'origin': origin_visit3['origin'], | ||||
'date': self.date_visit3, | 'date': self.date_visit3, | ||||
'visit': origin_visit3['visit'], | 'visit': origin_visit3['visit'], | ||||
'status': 'partial', | 'status': 'partial', | ||||
'metadata': None, | 'metadata': None, | ||||
'snapshot': None, | 'snapshot': None, | ||||
}]) | }]) | ||||
@istest | def test_origin_visit_get_by(self): | ||||
def origin_visit_get_by(self): | |||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
origin_id2 = self.storage.origin_add_one(self.origin) | origin_id2 = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add( | origin_visit1 = self.storage.origin_visit_add( | ||||
origin_id, | origin_id, | ||||
ts=self.date_visit2) | ts=self.date_visit2) | ||||
self.storage.snapshot_add(origin_id, origin_visit1['visit'], | self.storage.snapshot_add(origin_id, origin_visit1['visit'], | ||||
Show All 25 Lines | def test_origin_visit_get_by(self): | ||||
# when | # when | ||||
actual_origin_visit1 = self.storage.origin_visit_get_by( | actual_origin_visit1 = self.storage.origin_visit_get_by( | ||||
origin_visit1['origin'], origin_visit1['visit']) | origin_visit1['origin'], origin_visit1['visit']) | ||||
# then | # then | ||||
self.assertEquals(actual_origin_visit1, expected_origin_visit) | self.assertEquals(actual_origin_visit1, expected_origin_visit) | ||||
@istest | def test_origin_visit_get_by_no_result(self): | ||||
def origin_visit_get_by_no_result(self): | |||||
# No result | # No result | ||||
actual_origin_visit = self.storage.origin_visit_get_by( | actual_origin_visit = self.storage.origin_visit_get_by( | ||||
10, 999) | 10, 999) | ||||
self.assertIsNone(actual_origin_visit) | self.assertIsNone(actual_origin_visit) | ||||
@istest | def test_occurrence_add(self): | ||||
def occurrence_add(self): | |||||
occur = self.occurrence.copy() | occur = self.occurrence.copy() | ||||
origin_id = self.storage.origin_add_one(self.origin2) | origin_id = self.storage.origin_add_one(self.origin2) | ||||
date_visit1 = self.date_visit1 | date_visit1 = self.date_visit1 | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) | origin_visit1 = self.storage.origin_visit_add(origin_id, date_visit1) | ||||
revision = self.revision.copy() | revision = self.revision.copy() | ||||
revision['id'] = occur['target'] | revision['id'] = occur['target'] | ||||
▲ Show 20 Lines • Show All 42 Lines • ▼ Show 20 Lines | def test_occurrence_add(self): | ||||
(occur['origin'], occur['branch'], occur['target'], | (occur['origin'], occur['branch'], occur['target'], | ||||
occur['target_type'], date_visit1)) | occur['target_type'], date_visit1)) | ||||
self.assertEqual( | self.assertEqual( | ||||
(ret[1][0], ret[1][1].tobytes(), ret[1][2].tobytes(), | (ret[1][0], ret[1][1].tobytes(), ret[1][2].tobytes(), | ||||
ret[1][3], ret[1][4]), | ret[1][3], ret[1][4]), | ||||
(occur2['origin'], occur2['branch'], occur2['target'], | (occur2['origin'], occur2['branch'], occur2['target'], | ||||
occur2['target_type'], date_visit2)) | occur2['target_type'], date_visit2)) | ||||
@istest | def test_snapshot_add_get_empty(self): | ||||
def snapshot_add_get_empty(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.empty_snapshot) | ||||
by_id = self.storage.snapshot_get(self.empty_snapshot['id']) | by_id = self.storage.snapshot_get(self.empty_snapshot['id']) | ||||
self.assertEqual(by_id, self.empty_snapshot) | self.assertEqual(by_id, self.empty_snapshot) | ||||
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | ||||
self.assertEqual(by_ov, self.empty_snapshot) | self.assertEqual(by_ov, self.empty_snapshot) | ||||
@istest | def test_snapshot_add_get_complete(self): | ||||
def snapshot_add_get_complete(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | ||||
by_id = self.storage.snapshot_get(self.complete_snapshot['id']) | by_id = self.storage.snapshot_get(self.complete_snapshot['id']) | ||||
self.assertEqual(by_id, self.complete_snapshot) | self.assertEqual(by_id, self.complete_snapshot) | ||||
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | ||||
self.assertEqual(by_ov, self.complete_snapshot) | self.assertEqual(by_ov, self.complete_snapshot) | ||||
@istest | def test_snapshot_add_count_branches(self): | ||||
def snapshot_add_count_branches(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | ||||
snp_id = self.complete_snapshot['id'] | snp_id = self.complete_snapshot['id'] | ||||
snp_size = self.storage.snapshot_count_branches(snp_id) | snp_size = self.storage.snapshot_count_branches(snp_id) | ||||
expected_snp_size = { | expected_snp_size = { | ||||
'alias': 1, | 'alias': 1, | ||||
'content': 1, | 'content': 1, | ||||
'directory': 1, | 'directory': 1, | ||||
'release': 1, | 'release': 1, | ||||
'revision': 1, | 'revision': 1, | ||||
'snapshot': 1, | 'snapshot': 1, | ||||
None: 1 | None: 1 | ||||
} | } | ||||
self.assertEqual(snp_size, expected_snp_size) | self.assertEqual(snp_size, expected_snp_size) | ||||
@istest | def test_snapshot_add_get_paginated(self): | ||||
def snapshot_add_get_paginated(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | ||||
snp_id = self.complete_snapshot['id'] | snp_id = self.complete_snapshot['id'] | ||||
Show All 24 Lines | def test_snapshot_add_get_paginated(self): | ||||
expected_snapshot = copy.deepcopy(self.complete_snapshot) | expected_snapshot = copy.deepcopy(self.complete_snapshot) | ||||
del expected_snapshot['next_branch'] | del expected_snapshot['next_branch'] | ||||
for name in [b'alias', b'content', b'dangling', b'snapshot']: | for name in [b'alias', b'content', b'dangling', b'snapshot']: | ||||
del expected_snapshot['branches'][name] | del expected_snapshot['branches'][name] | ||||
self.assertEqual(snapshot, expected_snapshot) | self.assertEqual(snapshot, expected_snapshot) | ||||
@istest | def test_snapshot_add_get_filtered(self): | ||||
def snapshot_add_get_filtered(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.complete_snapshot) | ||||
snp_id = self.complete_snapshot['id'] | snp_id = self.complete_snapshot['id'] | ||||
Show All 15 Lines | def test_snapshot_add_get_filtered(self): | ||||
expected_snapshot = copy.deepcopy(self.complete_snapshot) | expected_snapshot = copy.deepcopy(self.complete_snapshot) | ||||
del expected_snapshot['next_branch'] | del expected_snapshot['next_branch'] | ||||
for name in [b'content', b'dangling', b'directory', b'release', | for name in [b'content', b'dangling', b'directory', b'release', | ||||
b'revision', b'snapshot']: | b'revision', b'snapshot']: | ||||
del expected_snapshot['branches'][name] | del expected_snapshot['branches'][name] | ||||
self.assertEqual(snapshot, expected_snapshot) | self.assertEqual(snapshot, expected_snapshot) | ||||
@istest | def test_snapshot_add_get(self): | ||||
def snapshot_add_get(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit_id = origin_visit1['visit'] | visit_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit_id, self.snapshot) | self.storage.snapshot_add(origin_id, visit_id, self.snapshot) | ||||
by_id = self.storage.snapshot_get(self.snapshot['id']) | by_id = self.storage.snapshot_get(self.snapshot['id']) | ||||
self.assertEqual(by_id, self.snapshot) | self.assertEqual(by_id, self.snapshot) | ||||
by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | by_ov = self.storage.snapshot_get_by_origin_visit(origin_id, visit_id) | ||||
self.assertEqual(by_ov, self.snapshot) | self.assertEqual(by_ov, self.snapshot) | ||||
origin_visit_info = self.storage.origin_visit_get_by(origin_id, | origin_visit_info = self.storage.origin_visit_get_by(origin_id, | ||||
visit_id) | visit_id) | ||||
self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id']) | self.assertEqual(origin_visit_info['snapshot'], self.snapshot['id']) | ||||
@istest | def test_snapshot_add_twice(self): | ||||
def snapshot_add_twice(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit1_id = origin_visit1['visit'] | visit1_id = origin_visit1['visit'] | ||||
self.storage.snapshot_add(origin_id, visit1_id, self.snapshot) | self.storage.snapshot_add(origin_id, visit1_id, self.snapshot) | ||||
by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id, | by_ov1 = self.storage.snapshot_get_by_origin_visit(origin_id, | ||||
visit1_id) | visit1_id) | ||||
self.assertEqual(by_ov1, self.snapshot) | self.assertEqual(by_ov1, self.snapshot) | ||||
origin_visit2 = self.storage.origin_visit_add(origin_id, | origin_visit2 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
visit2_id = origin_visit2['visit'] | visit2_id = origin_visit2['visit'] | ||||
self.storage.snapshot_add(origin_id, visit2_id, self.snapshot) | self.storage.snapshot_add(origin_id, visit2_id, self.snapshot) | ||||
by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id, | by_ov2 = self.storage.snapshot_get_by_origin_visit(origin_id, | ||||
visit2_id) | visit2_id) | ||||
self.assertEqual(by_ov2, self.snapshot) | self.assertEqual(by_ov2, self.snapshot) | ||||
@istest | def test_snapshot_get_nonexistent(self): | ||||
def snapshot_get_nonexistent(self): | |||||
bogus_snapshot_id = b'bogus snapshot id 00' | bogus_snapshot_id = b'bogus snapshot id 00' | ||||
bogus_origin_id = 1 | bogus_origin_id = 1 | ||||
bogus_visit_id = 1 | bogus_visit_id = 1 | ||||
by_id = self.storage.snapshot_get(bogus_snapshot_id) | by_id = self.storage.snapshot_get(bogus_snapshot_id) | ||||
self.assertIsNone(by_id) | self.assertIsNone(by_id) | ||||
by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, | by_ov = self.storage.snapshot_get_by_origin_visit(bogus_origin_id, | ||||
bogus_visit_id) | bogus_visit_id) | ||||
self.assertIsNone(by_ov) | self.assertIsNone(by_ov) | ||||
@istest | def test_snapshot_get_latest(self): | ||||
def snapshot_get_latest(self): | |||||
origin_id = self.storage.origin_add_one(self.origin) | origin_id = self.storage.origin_add_one(self.origin) | ||||
origin_visit1 = self.storage.origin_visit_add(origin_id, | origin_visit1 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit1) | self.date_visit1) | ||||
visit1_id = origin_visit1['visit'] | visit1_id = origin_visit1['visit'] | ||||
origin_visit2 = self.storage.origin_visit_add(origin_id, | origin_visit2 = self.storage.origin_visit_add(origin_id, | ||||
self.date_visit2) | self.date_visit2) | ||||
visit2_id = origin_visit2['visit'] | visit2_id = origin_visit2['visit'] | ||||
Show All 27 Lines | def test_snapshot_get_latest(self): | ||||
# Check that the status filter is still working | # Check that the status filter is still working | ||||
self.assertEquals( | self.assertEquals( | ||||
self.complete_snapshot, | self.complete_snapshot, | ||||
self.storage.snapshot_get_latest(origin_id, | self.storage.snapshot_get_latest(origin_id, | ||||
allowed_statuses=['full']), | allowed_statuses=['full']), | ||||
) | ) | ||||
@istest | def test_stat_counters(self): | ||||
def stat_counters(self): | |||||
expected_keys = ['content', 'directory', 'directory_entry_dir', | expected_keys = ['content', 'directory', 'directory_entry_dir', | ||||
'origin', 'person', 'revision'] | 'origin', 'person', 'revision'] | ||||
for key in expected_keys: | for key in expected_keys: | ||||
self.cursor.execute('select * from swh_update_counter(%s)', (key,)) | self.cursor.execute('select * from swh_update_counter(%s)', (key,)) | ||||
self.conn.commit() | self.conn.commit() | ||||
counters = self.storage.stat_counters() | counters = self.storage.stat_counters() | ||||
self.assertTrue(set(expected_keys) <= set(counters)) | self.assertTrue(set(expected_keys) <= set(counters)) | ||||
self.assertIsInstance(counters[expected_keys[0]], int) | self.assertIsInstance(counters[expected_keys[0]], int) | ||||
@istest | def test_content_find_with_present_content(self): | ||||
def content_find_with_present_content(self): | |||||
# 1. with something to find | # 1. with something to find | ||||
cont = self.cont | cont = self.cont | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
actually_present = self.storage.content_find({'sha1': cont['sha1']}) | actually_present = self.storage.content_find({'sha1': cont['sha1']}) | ||||
actually_present.pop('ctime') | actually_present.pop('ctime') | ||||
self.assertEqual(actually_present, { | self.assertEqual(actually_present, { | ||||
▲ Show 20 Lines • Show All 46 Lines • ▼ Show 20 Lines | def test_content_find_with_present_content(self): | ||||
'sha1': cont['sha1'], | 'sha1': cont['sha1'], | ||||
'sha256': cont['sha256'], | 'sha256': cont['sha256'], | ||||
'sha1_git': cont['sha1_git'], | 'sha1_git': cont['sha1_git'], | ||||
'blake2s256': cont['blake2s256'], | 'blake2s256': cont['blake2s256'], | ||||
'length': cont['length'], | 'length': cont['length'], | ||||
'status': 'visible' | 'status': 'visible' | ||||
}) | }) | ||||
@istest | def test_content_find_with_non_present_content(self): | ||||
def content_find_with_non_present_content(self): | |||||
# 1. with something that does not exist | # 1. with something that does not exist | ||||
missing_cont = self.missing_cont | missing_cont = self.missing_cont | ||||
actually_present = self.storage.content_find( | actually_present = self.storage.content_find( | ||||
{'sha1': missing_cont['sha1']}) | {'sha1': missing_cont['sha1']}) | ||||
self.assertIsNone(actually_present) | self.assertIsNone(actually_present) | ||||
# 2. with something that does not exist | # 2. with something that does not exist | ||||
actually_present = self.storage.content_find( | actually_present = self.storage.content_find( | ||||
{'sha1_git': missing_cont['sha1_git']}) | {'sha1_git': missing_cont['sha1_git']}) | ||||
self.assertIsNone(actually_present) | self.assertIsNone(actually_present) | ||||
# 3. with something that does not exist | # 3. with something that does not exist | ||||
actually_present = self.storage.content_find( | actually_present = self.storage.content_find( | ||||
{'sha256': missing_cont['sha256']}) | {'sha256': missing_cont['sha256']}) | ||||
self.assertIsNone(actually_present) | self.assertIsNone(actually_present) | ||||
@istest | def test_content_find_bad_input(self): | ||||
def content_find_bad_input(self): | |||||
# 1. with bad input | # 1. with bad input | ||||
with self.assertRaises(ValueError): | with self.assertRaises(ValueError): | ||||
self.storage.content_find({}) # empty is bad | self.storage.content_find({}) # empty is bad | ||||
# 2. with bad input | # 2. with bad input | ||||
with self.assertRaises(ValueError): | with self.assertRaises(ValueError): | ||||
self.storage.content_find( | self.storage.content_find( | ||||
{'unknown-sha1': 'something'}) # not the right key | {'unknown-sha1': 'something'}) # not the right key | ||||
@istest | def test_object_find_by_sha1_git(self): | ||||
def object_find_by_sha1_git(self): | |||||
sha1_gits = [b'00000000000000000000'] | sha1_gits = [b'00000000000000000000'] | ||||
expected = { | expected = { | ||||
b'00000000000000000000': [], | b'00000000000000000000': [], | ||||
} | } | ||||
self.storage.content_add([self.cont]) | self.storage.content_add([self.cont]) | ||||
sha1_gits.append(self.cont['sha1_git']) | sha1_gits.append(self.cont['sha1_git']) | ||||
expected[self.cont['sha1_git']] = [{ | expected[self.cont['sha1_git']] = [{ | ||||
Show All 28 Lines | def test_object_find_by_sha1_git(self): | ||||
ret = self.storage.object_find_by_sha1_git(sha1_gits) | ret = self.storage.object_find_by_sha1_git(sha1_gits) | ||||
for val in ret.values(): | for val in ret.values(): | ||||
for obj in val: | for obj in val: | ||||
del obj['object_id'] | del obj['object_id'] | ||||
self.assertEqual(expected, ret) | self.assertEqual(expected, ret) | ||||
@istest | def test_tool_add(self): | ||||
def tool_add(self): | |||||
tool = { | tool = { | ||||
'name': 'some-unknown-tool', | 'name': 'some-unknown-tool', | ||||
'version': 'some-version', | 'version': 'some-version', | ||||
'configuration': {"debian-package": "some-package"}, | 'configuration': {"debian-package": "some-package"}, | ||||
} | } | ||||
actual_tool = self.storage.tool_get(tool) | actual_tool = self.storage.tool_get(tool) | ||||
self.assertIsNone(actual_tool) # does not exist | self.assertIsNone(actual_tool) # does not exist | ||||
Show All 10 Lines | def test_tool_add(self): | ||||
actual_tools2 = list(self.storage.tool_add([tool])) | actual_tools2 = list(self.storage.tool_add([tool])) | ||||
actual_tool2 = actual_tools2[0] | actual_tool2 = actual_tools2[0] | ||||
self.assertIsNotNone(actual_tool2) # now it exists | self.assertIsNotNone(actual_tool2) # now it exists | ||||
new_id2 = actual_tool2.pop('id') | new_id2 = actual_tool2.pop('id') | ||||
self.assertEqual(new_id, new_id2) | self.assertEqual(new_id, new_id2) | ||||
self.assertEqual(actual_tool, actual_tool2) | self.assertEqual(actual_tool, actual_tool2) | ||||
@istest | def test_tool_add_multiple(self): | ||||
def tool_add_multiple(self): | |||||
tool = { | tool = { | ||||
'name': 'some-unknown-tool', | 'name': 'some-unknown-tool', | ||||
'version': 'some-version', | 'version': 'some-version', | ||||
'configuration': {"debian-package": "some-package"}, | 'configuration': {"debian-package": "some-package"}, | ||||
} | } | ||||
actual_tools = list(self.storage.tool_add([tool])) | actual_tools = list(self.storage.tool_add([tool])) | ||||
self.assertEqual(len(actual_tools), 1) | self.assertEqual(len(actual_tools), 1) | ||||
new_tools = [tool, { | new_tools = [tool, { | ||||
'name': 'yet-another-tool', | 'name': 'yet-another-tool', | ||||
'version': 'version', | 'version': 'version', | ||||
'configuration': {}, | 'configuration': {}, | ||||
}] | }] | ||||
actual_tools = list(self.storage.tool_add(new_tools)) | actual_tools = list(self.storage.tool_add(new_tools)) | ||||
self.assertEqual(len(actual_tools), 2) | self.assertEqual(len(actual_tools), 2) | ||||
# order not guaranteed, so we iterate over results to check | # order not guaranteed, so we iterate over results to check | ||||
for tool in actual_tools: | for tool in actual_tools: | ||||
_id = tool.pop('id') | _id = tool.pop('id') | ||||
self.assertIsNotNone(_id) | self.assertIsNotNone(_id) | ||||
self.assertIn(tool, new_tools) | self.assertIn(tool, new_tools) | ||||
@istest | def test_tool_get_missing(self): | ||||
def tool_get_missing(self): | |||||
tool = { | tool = { | ||||
'name': 'unknown-tool', | 'name': 'unknown-tool', | ||||
'version': '3.1.0rc2-31-ga2cbb8c', | 'version': '3.1.0rc2-31-ga2cbb8c', | ||||
'configuration': {"command_line": "nomossa <filepath>"}, | 'configuration': {"command_line": "nomossa <filepath>"}, | ||||
} | } | ||||
actual_tool = self.storage.tool_get(tool) | actual_tool = self.storage.tool_get(tool) | ||||
self.assertIsNone(actual_tool) | self.assertIsNone(actual_tool) | ||||
@istest | def test_tool_metadata_get_missing_context(self): | ||||
def tool_metadata_get_missing_context(self): | |||||
tool = { | tool = { | ||||
'name': 'swh-metadata-translator', | 'name': 'swh-metadata-translator', | ||||
'version': '0.0.1', | 'version': '0.0.1', | ||||
'configuration': {"context": "unknown-context"}, | 'configuration': {"context": "unknown-context"}, | ||||
} | } | ||||
actual_tool = self.storage.tool_get(tool) | actual_tool = self.storage.tool_get(tool) | ||||
self.assertIsNone(actual_tool) | self.assertIsNone(actual_tool) | ||||
@istest | def test_tool_metadata_get(self): | ||||
def tool_metadata_get(self): | |||||
tool = { | tool = { | ||||
'name': 'swh-metadata-translator', | 'name': 'swh-metadata-translator', | ||||
'version': '0.0.1', | 'version': '0.0.1', | ||||
'configuration': {"type": "local", "context": "npm"}, | 'configuration': {"type": "local", "context": "npm"}, | ||||
} | } | ||||
tools = list(self.storage.tool_add([tool])) | tools = list(self.storage.tool_add([tool])) | ||||
expected_tool = tools[0] | expected_tool = tools[0] | ||||
# when | # when | ||||
actual_tool = self.storage.tool_get(tool) | actual_tool = self.storage.tool_get(tool) | ||||
# then | # then | ||||
self.assertEqual(expected_tool, actual_tool) | self.assertEqual(expected_tool, actual_tool) | ||||
@istest | def test_metadata_provider_get_by(self): | ||||
def metadata_provider_get_by(self): | |||||
# given | # given | ||||
no_provider = self.storage.metadata_provider_get_by({ | no_provider = self.storage.metadata_provider_get_by({ | ||||
'provider_name': self.provider['name'], | 'provider_name': self.provider['name'], | ||||
'provider_url': self.provider['url'] | 'provider_url': self.provider['url'] | ||||
}) | }) | ||||
self.assertIsNone(no_provider) | self.assertIsNone(no_provider) | ||||
# when | # when | ||||
provider_id = self.storage.metadata_provider_add( | provider_id = self.storage.metadata_provider_add( | ||||
self.provider['name'], | self.provider['name'], | ||||
self.provider['type'], | self.provider['type'], | ||||
self.provider['url'], | self.provider['url'], | ||||
self.provider['metadata']) | self.provider['metadata']) | ||||
actual_provider = self.storage.metadata_provider_get_by({ | actual_provider = self.storage.metadata_provider_get_by({ | ||||
'provider_name': self.provider['name'], | 'provider_name': self.provider['name'], | ||||
'provider_url': self.provider['url'] | 'provider_url': self.provider['url'] | ||||
}) | }) | ||||
# then | # then | ||||
self.assertTrue(provider_id, actual_provider['id']) | self.assertTrue(provider_id, actual_provider['id']) | ||||
@istest | def test_origin_metadata_add(self): | ||||
def origin_metadata_add(self): | |||||
# given | # given | ||||
origin_id = self.storage.origin_add([self.origin])[0]['id'] | origin_id = self.storage.origin_add([self.origin])[0]['id'] | ||||
origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id)) | origin_metadata0 = list(self.storage.origin_metadata_get_by(origin_id)) | ||||
self.assertTrue(len(origin_metadata0) == 0) | self.assertTrue(len(origin_metadata0) == 0) | ||||
tools = list(self.storage.tool_add([self.metadata_tool])) | tools = list(self.storage.tool_add([self.metadata_tool])) | ||||
tool = tools[0] | tool = tools[0] | ||||
Show All 16 Lines | def test_origin_metadata_add(self): | ||||
tool['id'], | tool['id'], | ||||
self.origin_metadata['metadata']) | self.origin_metadata['metadata']) | ||||
actual_om1 = list(self.storage.origin_metadata_get_by(origin_id)) | actual_om1 = list(self.storage.origin_metadata_get_by(origin_id)) | ||||
# then | # then | ||||
self.assertEqual(actual_om1[0]['id'], o_m1) | self.assertEqual(actual_om1[0]['id'], o_m1) | ||||
self.assertEqual(len(actual_om1), 1) | self.assertEqual(len(actual_om1), 1) | ||||
self.assertEqual(actual_om1[0]['origin_id'], origin_id) | self.assertEqual(actual_om1[0]['origin_id'], origin_id) | ||||
@istest | def test_origin_metadata_get(self): | ||||
def origin_metadata_get(self): | |||||
# given | # given | ||||
origin_id = self.storage.origin_add([self.origin])[0]['id'] | origin_id = self.storage.origin_add([self.origin])[0]['id'] | ||||
origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] | origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] | ||||
self.storage.metadata_provider_add(self.provider['name'], | self.storage.metadata_provider_add(self.provider['name'], | ||||
self.provider['type'], | self.provider['type'], | ||||
self.provider['url'], | self.provider['url'], | ||||
self.provider['metadata']) | self.provider['metadata']) | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def test_origin_metadata_get(self): | ||||
}] | }] | ||||
# then | # then | ||||
self.assertEqual(len(all_metadatas), 2) | self.assertEqual(len(all_metadatas), 2) | ||||
self.assertEqual(len(metadatas_for_origin2), 1) | self.assertEqual(len(metadatas_for_origin2), 1) | ||||
self.assertEqual(metadatas_for_origin2[0]['id'], o_m2) | self.assertEqual(metadatas_for_origin2[0]['id'], o_m2) | ||||
self.assertEqual(all_metadatas, expected_results) | self.assertEqual(all_metadatas, expected_results) | ||||
@istest | def test_origin_metadata_get_by_provider_type(self): | ||||
def origin_metadata_get_by_provider_type(self): | |||||
# given | # given | ||||
origin_id = self.storage.origin_add([self.origin])[0]['id'] | origin_id = self.storage.origin_add([self.origin])[0]['id'] | ||||
origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] | origin_id2 = self.storage.origin_add([self.origin2])[0]['id'] | ||||
self.storage.metadata_provider_add( | self.storage.metadata_provider_add( | ||||
self.provider['name'], | self.provider['name'], | ||||
self.provider['type'], | self.provider['type'], | ||||
self.provider['url'], | self.provider['url'], | ||||
self.provider['metadata']) | self.provider['metadata']) | ||||
▲ Show 20 Lines • Show All 61 Lines • ▼ Show 20 Lines | def test_origin_metadata_get_by_provider_type(self): | ||||
self.assertIsNotNone(o_m1) | self.assertIsNotNone(o_m1) | ||||
class TestLocalStorage(CommonTestStorage, unittest.TestCase): | class TestLocalStorage(CommonTestStorage, unittest.TestCase): | ||||
"""Test the local storage""" | """Test the local storage""" | ||||
# Can only be tested with local storage as you can't mock | # Can only be tested with local storage as you can't mock | ||||
# datetimes for the remote server | # datetimes for the remote server | ||||
@istest | def test_fetch_history(self): | ||||
def fetch_history(self): | |||||
origin = self.storage.origin_add_one(self.origin) | origin = self.storage.origin_add_one(self.origin) | ||||
with patch('datetime.datetime'): | with patch('datetime.datetime'): | ||||
datetime.datetime.now.return_value = self.fetch_history_date | datetime.datetime.now.return_value = self.fetch_history_date | ||||
fetch_history_id = self.storage.fetch_history_start(origin) | fetch_history_id = self.storage.fetch_history_start(origin) | ||||
datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc) | datetime.datetime.now.assert_called_with(tz=datetime.timezone.utc) | ||||
with patch('datetime.datetime'): | with patch('datetime.datetime'): | ||||
datetime.datetime.now.return_value = self.fetch_history_end | datetime.datetime.now.return_value = self.fetch_history_end | ||||
self.storage.fetch_history_end(fetch_history_id, | self.storage.fetch_history_end(fetch_history_id, | ||||
self.fetch_history_data) | self.fetch_history_data) | ||||
fetch_history = self.storage.fetch_history_get(fetch_history_id) | fetch_history = self.storage.fetch_history_get(fetch_history_id) | ||||
expected_fetch_history = self.fetch_history_data.copy() | expected_fetch_history = self.fetch_history_data.copy() | ||||
expected_fetch_history['id'] = fetch_history_id | expected_fetch_history['id'] = fetch_history_id | ||||
expected_fetch_history['origin'] = origin | expected_fetch_history['origin'] = origin | ||||
expected_fetch_history['date'] = self.fetch_history_date | expected_fetch_history['date'] = self.fetch_history_date | ||||
expected_fetch_history['duration'] = self.fetch_history_duration | expected_fetch_history['duration'] = self.fetch_history_duration | ||||
self.assertEqual(expected_fetch_history, fetch_history) | self.assertEqual(expected_fetch_history, fetch_history) | ||||
# The remote API doesn't expose _person_add | # The remote API doesn't expose _person_add | ||||
@istest | def test_person_get(self): | ||||
def person_get(self): | |||||
# given | # given | ||||
person0 = { | person0 = { | ||||
'fullname': b'bob <alice@bob>', | 'fullname': b'bob <alice@bob>', | ||||
'name': b'bob', | 'name': b'bob', | ||||
'email': b'alice@bob', | 'email': b'alice@bob', | ||||
} | } | ||||
id0 = self.storage._person_add(person0) | id0 = self.storage._person_add(person0) | ||||
Show All 21 Lines | def test_person_get(self): | ||||
'fullname': person1['fullname'], | 'fullname': person1['fullname'], | ||||
'name': person1['name'], | 'name': person1['name'], | ||||
'email': person1['email'], | 'email': person1['email'], | ||||
}, | }, | ||||
]) | ]) | ||||
# This test is only relevant on the local storage, with an actual | # This test is only relevant on the local storage, with an actual | ||||
# objstorage raising an exception | # objstorage raising an exception | ||||
@istest | def test_content_add_objstorage_exception(self): | ||||
def content_add_objstorage_exception(self): | |||||
self.storage.objstorage.add = Mock( | self.storage.objstorage.add = Mock( | ||||
side_effect=Exception('mocked broken objstorage') | side_effect=Exception('mocked broken objstorage') | ||||
) | ) | ||||
with self.assertRaises(Exception) as e: | with self.assertRaises(Exception) as e: | ||||
self.storage.content_add([self.cont]) | self.storage.content_add([self.cont]) | ||||
self.assertEqual(e.exception.args, ('mocked broken objstorage',)) | self.assertEqual(e.exception.args, ('mocked broken objstorage',)) | ||||
missing = list(self.storage.content_missing([self.cont])) | missing = list(self.storage.content_missing([self.cont])) | ||||
self.assertEqual(missing, [self.cont['sha1']]) | self.assertEqual(missing, [self.cont['sha1']]) | ||||
class AlteringSchemaTest(BaseTestStorage, unittest.TestCase): | class AlteringSchemaTest(BaseTestStorage, unittest.TestCase): | ||||
"""This class is dedicated for the rare case where the schema needs to | """This class is dedicated for the rare case where the schema needs to | ||||
be altered dynamically. | be altered dynamically. | ||||
Otherwise, the tests could be blocking when ran altogether. | Otherwise, the tests could be blocking when ran altogether. | ||||
""" | """ | ||||
@istest | def test_content_update(self): | ||||
def content_update(self): | |||||
cont = copy.deepcopy(self.cont) | cont = copy.deepcopy(self.cont) | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
# alter the sha1_git for example | # alter the sha1_git for example | ||||
cont['sha1_git'] = hash_to_bytes( | cont['sha1_git'] = hash_to_bytes( | ||||
'3a60a5275d0333bf13468e8b3dcab90f4046e654') | '3a60a5275d0333bf13468e8b3dcab90f4046e654') | ||||
self.storage.content_update([cont], keys=['sha1_git']) | self.storage.content_update([cont], keys=['sha1_git']) | ||||
with self.storage.get_db().transaction() as cur: | with self.storage.get_db().transaction() as cur: | ||||
cur.execute('SELECT sha1, sha1_git, sha256, length, status' | cur.execute('SELECT sha1, sha1_git, sha256, length, status' | ||||
' FROM content WHERE sha1 = %s', | ' FROM content WHERE sha1 = %s', | ||||
(cont['sha1'],)) | (cont['sha1'],)) | ||||
datum = cur.fetchone() | datum = cur.fetchone() | ||||
self.assertEqual( | self.assertEqual( | ||||
(datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | (datum[0].tobytes(), datum[1].tobytes(), datum[2].tobytes(), | ||||
datum[3], datum[4]), | datum[3], datum[4]), | ||||
(cont['sha1'], cont['sha1_git'], cont['sha256'], | (cont['sha1'], cont['sha1_git'], cont['sha256'], | ||||
cont['length'], 'visible')) | cont['length'], 'visible')) | ||||
@istest | def test_content_update_with_new_cols(self): | ||||
def content_update_with_new_cols(self): | |||||
with self.storage.get_db().transaction() as cur: | with self.storage.get_db().transaction() as cur: | ||||
cur.execute("""alter table content | cur.execute("""alter table content | ||||
add column test text default null, | add column test text default null, | ||||
add column test2 text default null""") | add column test2 text default null""") | ||||
cont = copy.deepcopy(self.cont2) | cont = copy.deepcopy(self.cont2) | ||||
self.storage.content_add([cont]) | self.storage.content_add([cont]) | ||||
cont['test'] = 'value-1' | cont['test'] = 'value-1' | ||||
Show All 20 Lines |