diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index 261a35a..ee767da 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,234 +1,268 @@
-# Copyright (C) 2016-2017 The Software Heritage developers
+# Copyright (C) 2016-2018 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import io
 import itertools
+import logging
 import os
 import tarfile
 import tempfile

 from pathlib import Path

 from swh.core import config
 from swh.model import hashutil
 from swh.model.from_disk import mode_to_perms, DentryPerms
 from swh.storage import get_storage
 from swh.vault.api.client import RemoteVaultClient

 DEFAULT_CONFIG_PATH = 'vault/cooker'
 DEFAULT_CONFIG = {
     'storage': ('dict', {
         'cls': 'remote',
         'args': {
             'url': 'http://localhost:5002/',
         },
     }),
     'vault_url': ('str', 'http://localhost:5005/')
 }

+MAX_BUNDLE_SIZE = 2 ** 29  # 512 MiB
+
+
+class PolicyError(Exception):
+    """Raised when the bundle violates the cooking policy."""
+    pass
+
+
+class BundleTooLargeError(PolicyError):
+    """Raised when the bundle is too large to be cooked."""
+    pass
+

 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators

     This class describes a common API for the cookers.

     To define a new cooker, inherit from this class and override:

     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def cook(): cook the object into a bundle
     """
     CACHE_TYPE_KEY = None

     def __init__(self, obj_type, obj_id):
         """Initialize the cooker.

         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.

         Args:
             storage: the storage object
             cache: the cache where to store the bundle
             obj_id: id of the object to be cooked into a bundle.
         """
         self.config = config.load_named_config(DEFAULT_CONFIG_PATH,
                                                DEFAULT_CONFIG)
         self.obj_type = obj_type
         self.obj_id = hashutil.hash_to_bytes(obj_id)
         self.backend = RemoteVaultClient(self.config['vault_url'])
         self.storage = get_storage(**self.config['storage'])

     @abc.abstractmethod
     def check_exists(self):
         """Checks that the requested object exists and can be cooked.

         Override this in the cooker implementation.
         """
         raise NotImplemented

     @abc.abstractmethod
     def prepare_bundle(self):
         """Implementation of the cooker. Yields chunks of the bundle bytes.

         Override this with the cooker implementation.
         """
         raise NotImplemented

     def cook(self):
         """Cook the requested object into a bundle
         """
         self.backend.set_status(self.obj_type, self.obj_id, 'pending')
         self.backend.set_progress(self.obj_type, self.obj_id,
                                   'Processing...')

-        content_iter = self.prepare_bundle()
-        # TODO: use proper content streaming
         try:
-            bundle = b''.join(content_iter)
+            content_iter = self.prepare_bundle()
+            bundle = b''
+            for chunk in content_iter:
+                bundle += chunk
+                if len(bundle) > MAX_BUNDLE_SIZE:
+                    raise BundleTooLargeError("Bundle is too large.")
+        except PolicyError as e:
+            self.backend.set_status(self.obj_type, self.obj_id, 'failed')
+            self.backend.set_progress(self.obj_type, self.obj_id, str(e))
         except Exception as e:
             self.backend.set_status(self.obj_type, self.obj_id, 'failed')
-            self.backend.set_progress(self.obj_type, self.obj_id, e.message)
+            self.backend.set_progress(
+                self.obj_type, self.obj_id,
+                "Internal Server Error. This incident will be reported.")
+            logging.exception("Bundle cooking failed.")
         else:
+            # TODO: use proper content streaming instead of put_bundle()
             self.backend.put_bundle(self.CACHE_TYPE_KEY, self.obj_id, bundle)
             self.backend.set_status(self.obj_type, self.obj_id, 'done')
             self.backend.set_progress(self.obj_type, self.obj_id, None)
         finally:
             self.backend.send_notif(self.obj_type, self.obj_id)


 SKIPPED_MESSAGE = (b'This content has not been retrieved in the '
                    b'Software Heritage archive due to its size.')

 HIDDEN_MESSAGE = (b'This content is hidden.')


 def get_filtered_file_content(storage, file_data):
     """Retrieve the file specified by file_data and apply filters for
     skipped and missing contents.

     Args:
         storage: the storage from which to retrieve the object
         file_data: file entry descriptor as returned by directory_ls()

     Returns:
         Bytes containing the specified content. The content will be replaced
         by a specific message to indicate that the content could not be
         retrieved (either due to privacy policy or because its size was too
         big for us to archive it).
     """
     assert file_data['type'] == 'file'

     if file_data['status'] == 'absent':
         return SKIPPED_MESSAGE
     elif file_data['status'] == 'hidden':
         return HIDDEN_MESSAGE
     else:
         return list(storage.content_get([file_data['sha1']]))[0]['data']


+# TODO: We should use something like that for all the IO done by the cookers.
+# Instead of using generators to yield chunks, we should just write() the
+# chunks in an object like this, which would give us way better control over
+# the buffering, and allow for streaming content to the objstorage.
+class BytesIOBundleSizeLimit(io.BytesIO):
+    def write(self, chunk):
+        if self.getbuffer().nbytes + len(chunk) > MAX_BUNDLE_SIZE:
+            raise BundleTooLargeError("Bundle is too large.")
+        return super().write(chunk)
+
+
 def get_tar_bytes(path, arcname=None):
     path = Path(path)
     if not arcname:
         arcname = path.name
-    tar_buffer = io.BytesIO()
+    tar_buffer = BytesIOBundleSizeLimit()
     tar = tarfile.open(fileobj=tar_buffer, mode='w')
     tar.add(str(path), arcname=arcname)
     return tar_buffer.getbuffer()


 class DirectoryBuilder:
     """Creates a cooked directory from its sha1_git in the db.

     Warning: This is NOT a directly accessible cooker, but a low-level
     one that executes the manipulations.
     """
     def __init__(self, storage):
         self.storage = storage

     def get_directory_bytes(self, dir_id):
         # Create temporary folder to retrieve the files into.
         root = bytes(tempfile.mkdtemp(prefix='directory.',
                                       suffix='.cook'), 'utf8')
         self.build_directory(dir_id, root)
         # Use the created directory to make a bundle with the data as
         # a compressed directory.
         bundle_content = self._create_bundle_content(
             root, hashutil.hash_to_hex(dir_id))
         return bundle_content

     def build_directory(self, dir_id, root):
         # Retrieve data from the database.
         data = self.storage.directory_ls(dir_id, recursive=True)

         # Split into files and directory data.
         # TODO(seirl): also handle revision data.
         data1, data2 = itertools.tee(data, 2)
         dir_data = (entry['name'] for entry in data1
                     if entry['type'] == 'dir')
         file_data = (entry for entry in data2 if entry['type'] == 'file')

         # Recreate the directory's subtree and then the files into it.
         self._create_tree(root, dir_data)
         self._create_files(root, file_data)

     def _create_tree(self, root, directory_paths):
         """Create a directory tree from the given paths

         The tree is created from `root` and each given path in
         `directory_paths` will be created.
         """
         # Directories are sorted by depth so they are created in the
         # right order
         bsep = bytes(os.path.sep, 'utf8')
         dir_names = sorted(
             directory_paths,
             key=lambda x: len(x.split(bsep)))
         for dir_name in dir_names:
             os.makedirs(os.path.join(root, dir_name))

     def _create_files(self, root, file_datas):
         """Create the files according to their status.
         """
         # Then create the files
         for file_data in file_datas:
             path = os.path.join(root, file_data['name'])
             content = get_filtered_file_content(self.storage, file_data)
             self._create_file(path, content, file_data['perms'])

     def _create_file(self, path, content, mode=0o100644):
         """Create the given file and fill it with content.
         """
         perms = mode_to_perms(mode)
         if perms == DentryPerms.symlink:
             os.symlink(content, path)
         else:
             with open(path, 'wb') as f:
                 f.write(content)
             os.chmod(path, perms.value)

     def _get_file_content(self, obj_id):
         """Get the content of the given file.
         """
         content = list(self.storage.content_get([obj_id]))[0]['data']
         return content

     def _create_bundle_content(self, path, hex_dir_id):
         """Create a bundle from the given directory

         Args:
             path: location of the directory to package.
             hex_dir_id: hex representation of the directory id

         Returns:
             bytes that represent the compressed directory as a bundle.
         """
         return get_tar_bytes(path.decode(), hex_dir_id)
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
new file mode 100644
index 0000000..63fb408
--- /dev/null
+++ b/swh/vault/tests/test_cookers_base.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2018 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import pathlib
+import tempfile
+import unittest
+from unittest.mock import MagicMock, patch
+
+from swh.model import hashutil
+from swh.vault.cookers.base import (BaseVaultCooker, get_tar_bytes,
+                                    BundleTooLargeError)
+
+
+TEST_BUNDLE_CHUNKS = [b"test content 1\n",
+                      b"test content 2\n",
+                      b"test content 3\n"]
+TEST_BUNDLE_CONTENT = b''.join(TEST_BUNDLE_CHUNKS)
+TEST_OBJ_TYPE = 'test_type'
+TEST_HEX_ID = '17a3e48bce37be5226490e750202ad3a9a1a3fe9'
+TEST_OBJ_ID = hashutil.hash_to_bytes(TEST_HEX_ID)
+
+
+class BaseVaultCookerMock(BaseVaultCooker):
+    CACHE_TYPE_KEY = TEST_OBJ_TYPE
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(self.CACHE_TYPE_KEY, TEST_OBJ_ID, *args, **kwargs)
+        self.storage = MagicMock()
+        self.backend = MagicMock()
+
+    def check_exists(self):
+        return True
+
+    def prepare_bundle(self):
+        for chunk in TEST_BUNDLE_CHUNKS:
+            yield chunk
+
+
+class TestBaseVaultCooker(unittest.TestCase):
+    def test_simple_cook(self):
+        cooker = BaseVaultCookerMock()
+        cooker.cook()
+        cooker.backend.put_bundle.assert_called_once_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, TEST_BUNDLE_CONTENT)
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'done')
+        cooker.backend.set_progress.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, None)
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+    def test_code_exception_cook(self):
+        cooker = BaseVaultCookerMock()
+        cooker.prepare_bundle = MagicMock()
+        cooker.prepare_bundle.side_effect = RuntimeError("Nope")
+        cooker.cook()
+
+        # Potentially remove this when we have objstorage streaming
+        cooker.backend.put_bundle.assert_not_called()
+
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+        self.assertNotIn("Nope", cooker.backend.set_progress.call_args[0][2])
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+    def test_policy_exception_cook(self):
+        cooker = BaseVaultCookerMock()
+
+        with patch('swh.vault.cookers.base.MAX_BUNDLE_SIZE', 8):
+            cooker.cook()
+
+        # Potentially remove this when we have objstorage streaming
+        cooker.backend.put_bundle.assert_not_called()
+
+        cooker.backend.set_status.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID, 'failed')
+        self.assertIn("too large", cooker.backend.set_progress.call_args[0][2])
+        cooker.backend.send_notif.assert_called_with(
+            TEST_OBJ_TYPE, TEST_OBJ_ID)
+
+
+class TestGetTarBytes(unittest.TestCase):
+    def test_tar_too_large(self):
+        with tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') as td:
+            p = pathlib.Path(td)
+            (p / 'dir1/dir2').mkdir(parents=True)
+            (p / 'dir1/dir2/file').write_text('testtesttesttest')
+
+            with patch('swh.vault.cookers.base.MAX_BUNDLE_SIZE', 8):
+                with self.assertRaises(BundleTooLargeError):
+                    get_tar_bytes(p)
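
For reference, a minimal standalone sketch of the size-cap behavior this patch introduces. It mirrors the BytesIOBundleSizeLimit class from the diff; the tiny MAX_BUNDLE_SIZE below is a hypothetical stand-in for the real 2 ** 29 so the limit trips immediately:

import io

MAX_BUNDLE_SIZE = 8  # hypothetical tiny cap; the patch uses 2 ** 29 (512 MiB)


class BundleTooLargeError(Exception):
    """Stand-in for swh.vault.cookers.base.BundleTooLargeError."""


class BytesIOBundleSizeLimit(io.BytesIO):
    # Same idea as the class added in the diff: reject any write() that
    # would grow the buffer past MAX_BUNDLE_SIZE.
    def write(self, chunk):
        if self.getbuffer().nbytes + len(chunk) > MAX_BUNDLE_SIZE:
            raise BundleTooLargeError("Bundle is too large.")
        return super().write(chunk)


buf = BytesIOBundleSizeLimit()
buf.write(b'12345678')   # 8 bytes: exactly at the cap, accepted
try:
    buf.write(b'9')      # would make 9 bytes: rejected
except BundleTooLargeError as exc:
    print(exc)           # prints: Bundle is too large.

Because tarfile writes through the fileobj's write() method, passing such a buffer to tarfile.open(fileobj=..., mode='w') caps the archive as it is built, which is exactly what get_tar_bytes and the test_tar_too_large test rely on.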