diff --git a/swh/vault/backend.py b/swh/vault/backend.py
index 8f594b3..d5cf91b 100644
--- a/swh/vault/backend.py
+++ b/swh/vault/backend.py
@@ -1,251 +1,254 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import smtplib
 import celery
 import psycopg2
 import psycopg2.extras

 from functools import wraps
 from email.mime.text import MIMEText

 from swh.model import hashutil
 from swh.scheduler.utils import get_task
 from swh.vault.cache import VaultCache
 from swh.vault.cookers import get_cooker
 from swh.vault.cooking_tasks import SWHCookingTask  # noqa

 cooking_task_name = 'swh.vault.cooking_tasks.SWHCookingTask'

 NOTIF_EMAIL_FROM = ('"Software Heritage Vault" '
                     '')
 NOTIF_EMAIL_SUBJECT = ("Bundle ready: {obj_type} {short_id}")
 NOTIF_EMAIL_BODY = """
 You have requested the following bundle from the Software Heritage
 Vault:

 Object Type: {obj_type}
 Object ID: {hex_id}

 This bundle is now available for download at the following address:

 {url}

 Please keep in mind that this link might expire at some point, in which
 case you will need to request the bundle again.

 --\x20
 The Software Heritage Developers
 """


 # TODO: Imported from swh.scheduler.backend. Factorization needed.
 def autocommit(fn):
     @wraps(fn)
     def wrapped(self, *args, **kwargs):
         autocommit = False
         # TODO: I don't like using None, it's confusing for the user. how about
         # a NEW_CURSOR object()?
         if 'cursor' not in kwargs or not kwargs['cursor']:
             autocommit = True
             kwargs['cursor'] = self.cursor()
         try:
             ret = fn(self, *args, **kwargs)
         except:
             if autocommit:
                 self.rollback()
             raise
         if autocommit:
             self.commit()
         return ret
     return wrapped


 # TODO: This has to be factorized with other database base classes and helpers
 # (swh.scheduler.backend.SchedulerBackend, swh.storage.db.BaseDb, ...)
 # The three first methods are imported from swh.scheduler.backend.
 class VaultBackend:
     """
     Backend for the Software Heritage vault.
     """
     def __init__(self, config):
         self.config = config
         self.cache = VaultCache(**self.config['cache'])
         self.db = None
         self.reconnect()
         self.smtp_server = smtplib.SMTP('localhost')

     def reconnect(self):
         if not self.db or self.db.closed:
             self.db = psycopg2.connect(
                 dsn=self.config['vault_db'],
                 cursor_factory=psycopg2.extras.RealDictCursor,
             )

+    def close(self):
+        self.db.close()
+
     def cursor(self):
         """Return a fresh cursor on the database, with auto-reconnection in
         case of failure"""
         cur = None

         # Get a fresh cursor and reconnect at most three times
         tries = 0
         while True:
             tries += 1
             try:
                 cur = self.db.cursor()
                 cur.execute('select 1')
                 break
             except psycopg2.OperationalError:
                 if tries < 3:
                     self.reconnect()
                 else:
                     raise
         return cur

     def commit(self):
         """Commit a transaction"""
         self.db.commit()

     def rollback(self):
         """Rollback a transaction"""
         self.db.rollback()

     @autocommit
     def task_info(self, obj_type, obj_id, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             SELECT id, type, object_id, task_uuid, task_status,
                    ts_created, ts_done, progress_msg
             FROM vault_bundle
             WHERE type = %s AND object_id = %s''',
                        (obj_type, obj_id))
         res = cursor.fetchone()
         if res:
             res['object_id'] = bytes(res['object_id'])
         return res

     @autocommit
     def create_task(self, obj_type, obj_id, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)

         args = [self.config, obj_type, obj_id]

         cooker = get_cooker(obj_type)(*args)
         cooker.check_exists()

         task_uuid = celery.uuid()
         cursor.execute('''
             INSERT INTO vault_bundle (type, object_id, task_uuid)
             VALUES (%s, %s, %s)''', (obj_type, obj_id, task_uuid))

         self.commit()

         task = get_task(cooking_task_name)
         task.apply_async(args, task_id=task_uuid)

     @autocommit
     def add_notif_email(self, obj_type, obj_id, email, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             INSERT INTO vault_notif_email (email, bundle_id)
             VALUES (%s, (SELECT id FROM vault_bundle
                          WHERE type = %s AND object_id = %s))''',
                        (email, obj_type, obj_id))

     @autocommit
     def cook_request(self, obj_type, obj_id, email=None, cursor=None):
         info = self.task_info(obj_type, obj_id)
         if info is None:
             self.create_task(obj_type, obj_id)
         if email is not None:
             if info is not None and info['task_status'] == 'done':
                 self.send_notification(None, email, obj_type, obj_id)
             else:
                 self.add_notif_email(obj_type, obj_id, email)
         info = self.task_info(obj_type, obj_id)
         return info

     @autocommit
     def is_available(self, obj_type, obj_id, cursor=None):
         info = self.task_info(obj_type, obj_id, cursor=cursor)
         return (info is not None
                 and info['task_status'] == 'done'
                 and self.cache.is_cached(obj_type, obj_id))

     @autocommit
     def fetch(self, obj_type, obj_id, cursor=None):
         if not self.is_available(obj_type, obj_id, cursor=cursor):
             return None
         self.update_access_ts(obj_type, obj_id, cursor=cursor)
         return self.cache.get(obj_type, obj_id)

     @autocommit
     def update_access_ts(self, obj_type, obj_id, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
             SET ts_last_access = NOW()
             WHERE type = %s AND object_id = %s''',
                        (obj_type, obj_id))

     @autocommit
     def set_status(self, obj_type, obj_id, status, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         req = ('''
             UPDATE vault_bundle
             SET task_status = %s '''
                + (''', ts_done = NOW() ''' if status == 'done' else '')
                + '''WHERE type = %s AND object_id = %s''')
         cursor.execute(req, (status, obj_type, obj_id))

     @autocommit
     def set_progress(self, obj_type, obj_id, progress, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             UPDATE vault_bundle
             SET progress_msg = %s
             WHERE type = %s AND object_id = %s''',
                        (progress, obj_type, obj_id))

     @autocommit
     def send_all_notifications(self, obj_type, obj_id, cursor=None):
         obj_id = hashutil.hash_to_bytes(obj_id)
         cursor.execute('''
             SELECT vault_notif_email.id AS id, email
             FROM vault_notif_email
             INNER JOIN vault_bundle ON bundle_id = vault_bundle.id
             WHERE vault_bundle.type = %s AND vault_bundle.object_id = %s''',
                        (obj_type, obj_id))
         for d in cursor:
             self.send_notification(d['id'], d['email'], obj_type, obj_id)

     @autocommit
     def send_notification(self, n_id, email, obj_type, obj_id, cursor=None):
         hex_id = hashutil.hash_to_hex(obj_id)
         short_id = hex_id[:7]

         # TODO: instead of hardcoding this, we should probably:
         # * add a "fetch_url" field in the vault_notif_email table
         # * generate the url with flask.url_for() on the web-ui side
         # * send this url as part of the cook request and store it in
         #   the table
         # * use this url for the notification e-mail
         url = ('https://archive.softwareheritage.org/api/1/vault/{}/{}/'
                'raw'.format(obj_type, hex_id))

         text = NOTIF_EMAIL_BODY.strip()
         text = text.format(obj_type=obj_type, hex_id=hex_id, url=url)
         msg = MIMEText(text)
         msg['Subject'] = (NOTIF_EMAIL_SUBJECT
                           .format(obj_type=obj_type, short_id=short_id))
         msg['From'] = NOTIF_EMAIL_FROM
         msg['To'] = email

         self.smtp_server.send_message(msg)

         if n_id is not None:
             cursor.execute('''
                 DELETE FROM vault_notif_email
                 WHERE id = %s''', (n_id,))
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
new file mode 100644
index 0000000..cc81028
--- /dev/null
+++ b/swh/vault/tests/test_cookers.py
@@ -0,0 +1,274 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import contextlib
+import datetime
+import gzip
+import io
+import os
+import pathlib
+import subprocess
+import tarfile
+import tempfile
+import unittest
+
+import dulwich.fastexport
+import dulwich.index
+import dulwich.objects
+import dulwich.porcelain
+import dulwich.repo
+
+from swh.core.tests.db_testing import DbTestFixture
+from swh.loader.git.loader import GitLoader
+from swh.model import hashutil
+from swh.model.git import compute_hashes_from_directory
+from swh.storage.tests.storage_testing import StorageTestFixture
+from swh.vault.cookers import DirectoryCooker, RevisionGitfastCooker
+from swh.vault.tests.vault_testing import VaultTestFixture
+
+
+class TestRepo:
+    """A tiny context manager for a test git repository, with some utility
+    functions to perform basic git stuff.
+    """
+    def __enter__(self):
+        self.tmp_dir = tempfile.TemporaryDirectory(prefix='tmp-vault-repo-')
+        self.repo_dir = self.tmp_dir.__enter__()
+        self.repo = dulwich.repo.Repo.init(self.repo_dir)
+        self.author = '"Test Author" '.encode()
+        return pathlib.Path(self.repo_dir)
+
+    def __exit__(self, exc, value, tb):
+        self.tmp_dir.__exit__(exc, value, tb)
+
+    def checkout(self, rev_sha):
+        rev = self.repo[rev_sha]
+        dulwich.index.build_index_from_tree(self.repo_dir,
+                                            self.repo.index_path(),
+                                            self.repo.object_store,
+                                            rev.tree)
+
+    def git_shell(self, *cmd, stdout=subprocess.DEVNULL, **kwargs):
+        subprocess.check_call(('git', '-C', self.repo_dir) + cmd,
+                              stdout=stdout, **kwargs)
+
+    def commit(self, message='Commit test\n', ref=b'HEAD'):
+        self.git_shell('add', '.')
+        message = message.encode() + b'\n'
+        return self.repo.do_commit(message=message, committer=self.author,
+                                   ref=ref)
+
+    def merge(self, parent_sha_list, message='Merge branches.'):
+        self.git_shell('merge', '--allow-unrelated-histories',
+                       '-m', message, *[p.decode() for p in parent_sha_list])
+        return self.repo.refs[b'HEAD']
+
+    def print_debug_graph(self, reflog=False):
+        args = ['log', '--all', '--graph', '--decorate']
+        if reflog:
+            args.append('--reflog')
+        self.git_shell(*args, stdout=None)
+
+
+TEST_CONTENT = (" test content\n"
+                "and unicode \N{BLACK HEART SUIT}\n"
+                " and trailing spaces ")
+TEST_EXECUTABLE = b'\x42\x40\x00\x00\x05'
+
+
+class BaseTestCookers(VaultTestFixture, StorageTestFixture, DbTestFixture):
+    """Base class of cookers unit tests"""
+    def setUp(self):
+        super().setUp()
+        self.loader = GitLoader()
+        self.loader.storage = self.storage
+
+    def load(self, repo_path):
+        """Load a repository in the test storage"""
+        self.loader.load('fake_origin', repo_path, datetime.datetime.now())
+
+    @contextlib.contextmanager
+    def cook_extract_directory(self, obj_id):
+        """Context manager that cooks a directory and extracts it."""
+        cooker = DirectoryCooker(self.vault_config, 'directory', obj_id)
+        with cooker:
+            cooker.check_exists()  # Raises if false
+            tarball = b''.join(cooker.prepare_bundle())
+        with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+            fobj = io.BytesIO(tarball)
+            with tarfile.open(fileobj=fobj, mode='r') as tar:
+                tar.extractall(td)
+            p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
+            yield p
+
+    @contextlib.contextmanager
+    def cook_extract_revision_gitfast(self, obj_id):
+        """Context manager that cooks a revision and extracts it."""
+        cooker = RevisionGitfastCooker(self.vault_config, 'revision_gitfast',
+                                       obj_id)
+        with cooker:
+            cooker.check_exists()  # Raises if false
+            fastexport = b''.join(cooker.prepare_bundle())
+        fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
+        test_repo = TestRepo()
+        with test_repo as p:
+            processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
+            processor.import_stream(fastexport_stream)
+            yield test_repo, p
+
+
+class TestDirectoryCooker(BaseTestCookers, unittest.TestCase):
+    def test_directory_simple(self):
+        repo = TestRepo()
+        with repo as rp:
+            (rp / 'file').write_text(TEST_CONTENT)
+            (rp / 'executable').write_bytes(TEST_EXECUTABLE)
+            (rp / 'executable').chmod(0o755)
+            (rp / 'link').symlink_to('file')
+            (rp / 'dir1/dir2').mkdir(parents=True)
+            (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
+            c = repo.commit()
+            self.load(str(rp))
+
+            obj_id_hex = repo.repo[c].tree.decode()
+            obj_id = hashutil.hash_to_bytes(obj_id_hex)
+
+        with self.cook_extract_directory(obj_id) as p:
+            self.assertEqual((p / 'file').stat().st_mode, 0o100644)
+            self.assertEqual((p / 'file').read_text(), TEST_CONTENT)
+            self.assertEqual((p / 'executable').stat().st_mode, 0o100755)
+            self.assertEqual((p / 'executable').read_bytes(), TEST_EXECUTABLE)
+            self.assertTrue((p / 'link').is_symlink())
+            self.assertEqual(os.readlink(str(p / 'link')), 'file')
+            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
+            self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
+
+            dir_pb = bytes(p)
+            dir_hashes = compute_hashes_from_directory(dir_pb)[dir_pb]
+            dir_hash = dir_hashes['checksums']['sha1_git']
+            self.assertEqual(obj_id_hex, hashutil.hash_to_hex(dir_hash))
+
+
+class TestRevisionGitfastCooker(BaseTestCookers, unittest.TestCase):
+    def test_revision_simple(self):
+        #
+        #     1--2--3--4--5--6--7
+        #
+        repo = TestRepo()
+        with repo as rp:
+            (rp / 'file1').write_text(TEST_CONTENT)
+            repo.commit('add file1')
+            (rp / 'file2').write_text(TEST_CONTENT)
+            repo.commit('add file2')
+            (rp / 'dir1/dir2').mkdir(parents=True)
+            (rp / 'dir1/dir2/file').write_text(TEST_CONTENT)
+            repo.commit('add dir1/dir2/file')
+            (rp / 'bin1').write_bytes(TEST_EXECUTABLE)
+            (rp / 'bin1').chmod(0o755)
+            repo.commit('add bin1')
+            (rp / 'link1').symlink_to('file1')
+            repo.commit('link link1 to file1')
+            (rp / 'file2').unlink()
+            repo.commit('remove file2')
+            (rp / 'bin1').rename(rp / 'bin')
+            repo.commit('rename bin1 to bin')
+            self.load(str(rp))
+            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
+            obj_id = hashutil.hash_to_bytes(obj_id_hex)
+
+        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+            ert.checkout(b'HEAD')
+            self.assertEqual((p / 'file1').stat().st_mode, 0o100644)
+            self.assertEqual((p / 'file1').read_text(), TEST_CONTENT)
+            self.assertTrue((p / 'link1').is_symlink())
+            self.assertEqual(os.readlink(str(p / 'link1')), 'file1')
+            self.assertEqual((p / 'bin').stat().st_mode, 0o100755)
+            self.assertEqual((p / 'bin').read_bytes(), TEST_EXECUTABLE)
+            self.assertEqual((p / 'dir1/dir2/file').read_text(), TEST_CONTENT)
+            self.assertEqual((p / 'dir1/dir2/file').stat().st_mode, 0o100644)
+            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+
+    def test_revision_two_roots(self):
+        #
+        #    1----3---4
+        #        /
+        #   2----
+        #
+        repo = TestRepo()
+        with repo as rp:
+            (rp / 'file1').write_text(TEST_CONTENT)
+            c1 = repo.commit('Add file1')
+            del repo.repo.refs[b'refs/heads/master']  # git update-ref -d HEAD
+            (rp / 'file2').write_text(TEST_CONTENT)
+            repo.commit('Add file2')
+            repo.merge([c1])
+            (rp / 'file3').write_text(TEST_CONTENT)
+            repo.commit('add file3')
+            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
+            obj_id = hashutil.hash_to_bytes(obj_id_hex)
+            self.load(str(rp))
+
+        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+
+    def test_revision_two_double_fork_merge(self):
+        #
+        #     2---4---6
+        #    /   /   /
+        #   1---3---5
+        #
+        repo = TestRepo()
+        with repo as rp:
+            (rp / 'file1').write_text(TEST_CONTENT)
+            c1 = repo.commit('Add file1')
+            repo.repo.refs[b'refs/heads/c1'] = c1
+
+            (rp / 'file2').write_text(TEST_CONTENT)
+            repo.commit('Add file2')
+
+            (rp / 'file3').write_text(TEST_CONTENT)
+            c3 = repo.commit('Add file3', ref=b'refs/heads/c1')
+            repo.repo.refs[b'refs/heads/c3'] = c3
+
+            repo.merge([c3])
+
+            (rp / 'file5').write_text(TEST_CONTENT)
+            c5 = repo.commit('Add file5', ref=b'refs/heads/c3')
+
+            repo.merge([c5])
+
+            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
+            obj_id = hashutil.hash_to_bytes(obj_id_hex)
+            self.load(str(rp))
+
+        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
+
+    def test_revision_triple_merge(self):
+        #
+        #       .---.---5
+        #      /   /   /
+        #     2   3   4
+        #    /   /   /
+        #   1---.---.
+        #
+        repo = TestRepo()
+        with repo as rp:
+            (rp / 'file1').write_text(TEST_CONTENT)
+            c1 = repo.commit('Commit 1')
+            repo.repo.refs[b'refs/heads/b1'] = c1
+            repo.repo.refs[b'refs/heads/b2'] = c1
+
+            repo.commit('Commit 2')
+            c3 = repo.commit('Commit 3', ref=b'refs/heads/b1')
+            c4 = repo.commit('Commit 4', ref=b'refs/heads/b2')
+            repo.merge([c3, c4])
+
+            obj_id_hex = repo.repo.refs[b'HEAD'].decode()
+            obj_id = hashutil.hash_to_bytes(obj_id_hex)
+            self.load(str(rp))
+
+        with self.cook_extract_revision_gitfast(obj_id) as (ert, p):
+            self.assertEqual(ert.repo.refs[b'HEAD'].decode(), obj_id_hex)
diff --git a/swh/vault/tests/vault_testing.py b/swh/vault/tests/vault_testing.py
new file mode 100644
index 0000000..3dedd37
--- /dev/null
+++ b/swh/vault/tests/vault_testing.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2017 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import tempfile
+import pathlib
+from swh.vault.backend import VaultBackend
+
+
+class VaultTestFixture:
+    """Mix this in a test subject class to get Vault Database testing support.
+
+    This fixture must come before DbTestFixture and StorageTestFixture in the
+    inheritance list, as it uses their methods to set up its own internal
+    components.
+
+    Usage example:
+
+        class TestVault(VaultTestFixture, StorageTestFixture, DbTestFixture):
+            ...
+    """
+    TEST_VAULT_DB_NAME = 'softwareheritage-test-vault'
+
+    @classmethod
+    def setUpClass(cls):
+        if not hasattr(cls, 'DB_TEST_FIXTURE_IMPORTED'):
+            raise RuntimeError("VaultTestFixture needs to be followed by "
+                               "DbTestFixture in the inheritance list.")
+
+        test_dir = pathlib.Path(__file__).absolute().parent
+        test_db_dump = test_dir / '../../../sql/swh-vault-schema.sql'
+        test_db_dump = test_db_dump.absolute()
+        cls.add_db(cls.TEST_VAULT_DB_NAME, str(test_db_dump), 'psql')
+        super().setUpClass()
+
+    def setUp(self):
+        super().setUp()
+        self.cache_root = tempfile.TemporaryDirectory(prefix='vault-cache-')
+        self.vault_config = {
+            'storage': self.storage_config,
+            'vault_db': 'postgresql:///' + self.TEST_VAULT_DB_NAME,
+            'cache': {'root': self.cache_root.name}
+        }
+        self.vault_backend = VaultBackend(self.vault_config)
+
+    def tearDown(self):
+        self.reset_tables()
+        self.cache_root.cleanup()
+        self.vault_backend.close()
+        super().tearDown()
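
The autocommit decorator in backend.py carries a TODO suggesting a dedicated NEW_CURSOR sentinel instead of the None/falsy check on the cursor keyword. Below is a minimal sketch of that idea, purely illustrative and not part of this patch; it still expects the surrounding class to provide cursor(), commit() and rollback().

# Hypothetical variant of autocommit() using a NEW_CURSOR sentinel, as the
# TODO suggests; the name and exact wiring are illustrative only.
from functools import wraps

NEW_CURSOR = object()   # sentinel: "no cursor supplied, allocate a fresh one"


def autocommit(fn):
    @wraps(fn)
    def wrapped(self, *args, cursor=NEW_CURSOR, **kwargs):
        fresh = cursor is NEW_CURSOR
        if fresh:
            cursor = self.cursor()
        try:
            ret = fn(self, *args, cursor=cursor, **kwargs)
        except BaseException:
            if fresh:
                self.rollback()
            raise
        if fresh:
            self.commit()
        return ret
    return wrapped

Compared to the None check, the identity test against the sentinel distinguishes "the caller did not pass a cursor" from "the caller passed a falsy value", which is the confusion the TODO points at.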
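
For orientation, here is how the backend's public entry points (cook_request, is_available, fetch and the new close) chain together from a caller's point of view. This is a hedged usage sketch: the configuration values, the object id and the polling loop are placeholders for illustration, not something this patch ships.

# Illustrative client flow for VaultBackend; the configuration values, the
# object id and the polling interval below are placeholders.
import time

from swh.vault.backend import VaultBackend

config = {
    'vault_db': 'postgresql:///softwareheritage-vault-dev',  # assumed DSN
    'cache': {'root': '/tmp/vault-cache'},                   # assumed cache dir
    'storage': {},  # storage configuration, forwarded to the cookers
}
backend = VaultBackend(config)

obj_type = 'directory'
obj_id = '0000000000000000000000000000000000000000'  # placeholder hex id

# Schedule cooking if no bundle exists yet, and register an address to
# notify once the bundle is ready.
backend.cook_request(obj_type, obj_id, email='user@example.com')

# Cooking happens asynchronously in a celery task; poll until the bundle is
# marked 'done' and present in the cache.
while not backend.is_available(obj_type, obj_id):
    time.sleep(10)

bundle = backend.fetch(obj_type, obj_id)  # cooked bundle, read from the cache
backend.close()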
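
The TODO in send_notification() sketches a different way to obtain the download URL: have the web UI build it with flask.url_for() and hand it over with the cook request, storing it in a fetch_url column instead of hardcoding the address in the backend. A rough sketch of the web-ui side of that idea follows; the endpoint name, the view arguments and the extra fetch_url parameter on cook_request() are assumptions, none of which exists in this patch.

# Hypothetical web-ui helper for the fetch_url idea in the TODO above.
# The 'api_vault_fetch_raw' endpoint and the fetch_url keyword are assumptions.
from flask import url_for


def request_cooking(backend, obj_type, hex_id, email=None):
    # Build an absolute download URL on the web-ui side instead of
    # hardcoding it in the vault backend.
    fetch_url = url_for('api_vault_fetch_raw',
                        obj_type=obj_type, obj_id=hex_id,
                        _external=True)
    # Assumed extended signature: the backend would persist fetch_url in the
    # vault_notif_email table and reuse it in the notification e-mail.
    return backend.cook_request(obj_type, hex_id, email=email,
                                fetch_url=fetch_url)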