diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -11,8 +11,6 @@
 import tarfile
 import tempfile
 
-from pathlib import Path
-
 from swh.core import config
 from swh.model import hashutil
 from swh.model.from_disk import mode_to_perms, DentryPerms
@@ -43,6 +41,20 @@
     pass
 
 
+class BytesIOBundleSizeLimit(io.BytesIO):
+    def __init__(self, *args, size_limit=None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.size_limit = size_limit
+
+    def write(self, chunk):
+        if ((self.size_limit is not None
+             and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
+            raise BundleTooLargeError(
+                "The requested bundle exceeds the maximum allowed "
+                "size of {} bytes.".format(self.size_limit))
+        return super().write(chunk)
+
+
 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators
 
@@ -90,21 +102,19 @@
         """
-        raise NotImplemented
+        raise NotImplementedError
 
+    def write(self, chunk):
+        self.fileobj.write(chunk)
+
     def cook(self):
         """Cook the requested object into a bundle
         """
         self.backend.set_status(self.obj_type, self.obj_id, 'pending')
         self.backend.set_progress(self.obj_type, self.obj_id,
                                   'Processing...')
+        self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
         try:
-            content_iter = self.prepare_bundle()
-            bundle = b''
-            for chunk in content_iter:
-                bundle += chunk
-                if len(bundle) > self.max_bundle_size:
-                    raise BundleTooLargeError(
-                        "The requested bundle exceeds the maximum allowed "
-                        "size of {} bytes.".format(self.max_bundle_size))
+            self.prepare_bundle()
+            bundle = self.fileobj.getvalue()
         except PolicyError as e:
             self.backend.set_status(self.obj_type, self.obj_id, 'failed')
             self.backend.set_progress(self.obj_type, self.obj_id, str(e))
@@ -154,35 +164,6 @@
     return list(storage.content_get([file_data['sha1']]))[0]['data']
 
 
-# TODO: We should use something like that for all the IO done by the cookers.
-# Instead of using generators to yield chunks, we should just write() the
-# chunks in an object like this, which would give us way better control over
-# the buffering, and allow for streaming content to the objstorage.
-class BytesIOBundleSizeLimit(io.BytesIO):
-    def __init__(self, *args, size_limit=None, **kwargs):
-        self.size_limit = size_limit
-
-    def write(self, chunk):
-        if ((self.size_limit is not None
-             and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
-            raise BundleTooLargeError(
-                "The requested bundle exceeds the maximum allowed "
-                "size of {} bytes.".format(self.size_limit))
-        return super().write(chunk)
-
-
-# TODO: Once the BytesIO buffer is put in BaseCooker, we can just pass it here
-# as a fileobj parameter instead of passing size_limit
-def get_tar_bytes(path, arcname=None, size_limit=None):
-    path = Path(path)
-    if not arcname:
-        arcname = path.name
-    tar_buffer = BytesIOBundleSizeLimit(size_limit=size_limit)
-    tar = tarfile.open(fileobj=tar_buffer, mode='w')
-    tar.add(str(path), arcname=arcname)
-    return tar_buffer.getbuffer()
-
-
 class DirectoryBuilder:
     """Creates a cooked directory from its sha1_git in the db.
 
@@ -193,18 +174,15 @@
     def __init__(self, storage):
         self.storage = storage
 
-    def get_directory_bytes(self, dir_id, size_limit=None):
+    def write_directory_bytes(self, dir_id, fileobj):
         # Create temporary folder to retrieve the files into.
-        root = bytes(tempfile.mkdtemp(prefix='directory.',
-                                      suffix='.cook'), 'utf8')
-        self.build_directory(dir_id, root)
-        # Use the created directory to make a bundle with the data as
-        # a compressed directory.
-        bundle_content = self._create_bundle_content(
-            root,
-            hashutil.hash_to_hex(dir_id),
-            size_limit=size_limit)
-        return bundle_content
+        with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td:
+            self.build_directory(dir_id, td.encode())
+
+            # Use the created directory to write a tar bundle into the
+            # given fileobj.
+            with tarfile.open(fileobj=fileobj, mode='w') as tar:
+                tar.add(td, arcname=hashutil.hash_to_hex(dir_id))
 
     def build_directory(self, dir_id, root):
         # Retrieve data from the database.
@@ -264,16 +242,3 @@
         """
         content = list(self.storage.content_get([obj_id]))[0]['data']
         return content
-
-    def _create_bundle_content(self, path, hex_dir_id, size_limit=None):
-        """Create a bundle from the given directory
-
-        Args:
-            path: location of the directory to package.
-            hex_dir_id: hex representation of the directory id
-
-        Returns:
-            bytes that represent the compressed directory as a bundle.
-
-        """
-        return get_tar_bytes(path.decode(), hex_dir_id, size_limit=size_limit)
diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py
--- a/swh/vault/cookers/directory.py
+++ b/swh/vault/cookers/directory.py
@@ -15,5 +15,4 @@
 
     def prepare_bundle(self):
         directory_builder = DirectoryBuilder(self.storage)
-        yield directory_builder.get_directory_bytes(self.obj_id,
-                                                    self.max_bundle_size)
+        directory_builder.write_directory_bytes(self.obj_id, self.fileobj)
diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py
--- a/swh/vault/cookers/revision_flat.py
+++ b/swh/vault/cookers/revision_flat.py
@@ -3,12 +3,13 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
+import tarfile
 import tempfile
 from pathlib import Path
 
 from swh.model import hashutil
 
-from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes
+from .base import BaseVaultCooker, DirectoryBuilder
 
 
 class RevisionFlatCooker(BaseVaultCooker):
@@ -33,6 +34,5 @@
             revdir.mkdir()
             directory_builder.build_directory(revision['directory'],
                                               str(revdir).encode())
-        # FIXME: stream the bytes! this tarball can be HUUUUUGE
-        yield get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id),
-                            self.max_bundle_size)
+        with tarfile.open(fileobj=self.fileobj, mode='w') as tar:
+            tar.add(root_tmp, arcname=hashutil.hash_to_hex(self.obj_id))
diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py
--- a/swh/vault/cookers/revision_gitfast.py
+++ b/swh/vault/cookers/revision_gitfast.py
@@ -4,12 +4,14 @@
 # See top-level LICENSE file for more information
 
 import collections
-import fastimport.commands
 import functools
 import os
 import time
 import zlib
 
+from fastimport.commands import (CommitCommand, ResetCommand, BlobCommand,
+                                 FileDeleteCommand, FileModifyCommand)
+
 from .base import BaseVaultCooker, get_filtered_file_content
 from swh.model.from_disk import mode_to_perms
 
@@ -23,12 +25,13 @@
 
     def prepare_bundle(self):
         log = self.storage.revision_log([self.obj_id])
-        commands = self.fastexport(log)
+        self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+        self.fastexport(log)
+        self.write(self.gzobj.flush())
 
-        compressobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
-        for command in commands:
-            yield compressobj.compress(bytes(command) + b'\n')
-        yield compressobj.flush()
+    def write_cmd(self, cmd):
+        chunk = bytes(cmd) + b'\n'
+        super().write(self.gzobj.compress(chunk))
 
     def fastexport(self, log):
         """Generate all the git fast-import commands from a given log.
@@ -52,7 +55,7 @@
             self.backend.set_progress(self.obj_type, self.obj_id, pg)
 
             # Compute the current commit
-            yield from self._compute_commit_command(rev)
+            self._compute_commit_command(rev)
 
     def _toposort(self, rev_by_id):
         """Perform a topological sort on the revision graph.
@@ -99,10 +102,7 @@
         if obj_id in self.obj_done:
             return
         content = get_filtered_file_content(self.storage, file_data)
-        yield fastimport.commands.BlobCommand(
-            mark=self.mark(obj_id),
-            data=content,
-        )
+        self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content))
         self.obj_done.add(obj_id)
 
     def _author_tuple_format(self, author, date):
@@ -130,20 +130,20 @@
         else:
             # We issue a reset command before all the new roots so that they
             # are not automatically added as children of the current branch.
-            yield fastimport.commands.ResetCommand(b'refs/heads/master', None)
+            self.write_cmd(ResetCommand(b'refs/heads/master', None))
             from_ = None
             merges = None
             parent = None
 
-        # Retrieve the file commands while yielding new blob commands if
+        # Retrieve the file commands while writing new blob commands if
         # needed.
-        files = yield from self._compute_file_commands(rev, parent)
+        files = self._compute_file_commands(rev, parent)
 
-        # Construct and yield the commit command
+        # Construct and write the commit command
         author = self._author_tuple_format(rev['author'], rev['date'])
         committer = self._author_tuple_format(rev['committer'],
                                               rev['committer_date'])
-        yield fastimport.commands.CommitCommand(
+        self.write_cmd(CommitCommand(
             ref=b'refs/heads/master',
             mark=self.mark(rev['id']),
             author=author,
@@ -151,8 +151,7 @@
             message=rev['message'] or b'',
             from_=from_,
             merges=merges,
-            file_iter=files,
-        )
+            file_iter=files))
 
     @functools.lru_cache(maxsize=4096)
     def _get_dir_ents(self, dir_id=None):
@@ -195,7 +194,7 @@
         for fname, f in prev_dir.items():
             if ((fname not in cur_dir
                  or f['type'] != cur_dir[fname]['type'])):
-                commands.append(fastimport.commands.FileDeleteCommand(
+                commands.append(FileDeleteCommand(
                     path=os.path.join(root, fname)
                 ))
 
@@ -211,8 +210,8 @@
                     or f['sha1'] != prev_dir[fname]['sha1']
                     or f['perms'] != prev_dir[fname]['perms'])):
                 # Issue a blob command for the new blobs if needed.
-                yield from self._compute_blob_command_content(f)
-                commands.append(fastimport.commands.FileModifyCommand(
+                self._compute_blob_command_content(f)
+                commands.append(FileModifyCommand(
                     path=os.path.join(root, fname),
                     mode=mode_to_perms(f['perms']).value,
                     dataref=(b':' + self.mark(f['sha1'])),
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -91,14 +91,14 @@
         cooker = DirectoryCooker('directory', obj_id)
         cooker.storage = self.storage
         cooker.backend = unittest.mock.MagicMock()
-        cooker.check_exists()  # Raises if false
-        tarball = b''.join(cooker.prepare_bundle())
-        with tempfile.TemporaryDirectory('tmp-vault-extract-') as td:
-            fobj = io.BytesIO(tarball)
-            with tarfile.open(fileobj=fobj, mode='r') as tar:
+        cooker.fileobj = io.BytesIO()
+        assert cooker.check_exists()
+        cooker.prepare_bundle()
+        cooker.fileobj.seek(0)
+        with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+            with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
                 tar.extractall(td)
-            p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
-            yield p
+            yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
 
     @contextlib.contextmanager
     def cook_extract_revision_gitfast(self, obj_id):
@@ -106,9 +106,11 @@
         cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
         cooker.storage = self.storage
         cooker.backend = unittest.mock.MagicMock()
-        cooker.check_exists()  # Raises if false
-        fastexport = b''.join(cooker.prepare_bundle())
-        fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
+        cooker.fileobj = io.BytesIO()
+        assert cooker.check_exists()
+        cooker.prepare_bundle()
+        cooker.fileobj.seek(0)
+        fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
         test_repo = TestRepo()
         with test_repo as p:
             processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,14 +3,11 @@
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
 
-import pathlib
-import tempfile
 import unittest
 from unittest.mock import MagicMock
 
 from swh.model import hashutil
-from swh.vault.cookers.base import (BaseVaultCooker, get_tar_bytes,
-                                    BundleTooLargeError)
+from swh.vault.cookers.base import BaseVaultCooker
 
 
 TEST_BUNDLE_CHUNKS = [b"test content 1\n",
@@ -35,7 +32,7 @@
 
     def prepare_bundle(self):
         for chunk in TEST_BUNDLE_CHUNKS:
-            yield chunk
+            self.write(chunk)
 
 
 class TestBaseVaultCooker(unittest.TestCase):
@@ -79,14 +76,3 @@
         self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
         cooker.backend.send_notif.assert_called_with(
             TEST_OBJ_TYPE, TEST_OBJ_ID)
-
-
-class TestGetTarBytes(unittest.TestCase):
-    def test_tar_too_large(self):
-        with tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') as td:
-            p = pathlib.Path(td)
-            (p / 'dir1/dir2').mkdir(parents=True)
-            (p / 'dir1/dir2/file').write_text('testtesttesttest')
-
-            with self.assertRaises(BundleTooLargeError):
-                get_tar_bytes(p, size_limit=8)
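
Note: a minimal, standalone sketch of the size-limit behavior that the
relocated BytesIOBundleSizeLimit now enforces for every cooker. The 8-byte
limit is arbitrary, chosen only for illustration:

    from swh.vault.cookers.base import (BytesIOBundleSizeLimit,
                                        BundleTooLargeError)

    buf = BytesIOBundleSizeLimit(size_limit=8)
    buf.write(b'1234')         # fits: 4 bytes buffered, limit is 8
    try:
        buf.write(b'56789')    # 4 + 5 > 8: raises before buffering anything
    except BundleTooLargeError as e:
        print(e)  # "The requested bundle exceeds the maximum allowed ..."

Because the check happens in write(), a cooker aborts as soon as the limit is
crossed instead of first accumulating the whole oversized bundle in memory.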