D286.id966.diff

diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -8,10 +8,6 @@
import itertools
import logging
import os
-import tarfile
-import tempfile
-
-from pathlib import Path
from swh.core import config
from swh.model import hashutil
@@ -43,6 +39,19 @@
pass
+class BytesIOBundleSizeLimit(io.BytesIO):
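+    """In-memory buffer that refuses to grow past a given size.
+
+    Any write() that would take the buffer past `size_limit` bytes raises
+    BundleTooLargeError; a size_limit of None disables the check.
+    """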
+    def __init__(self, *args, size_limit=None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.size_limit = size_limit
+
+ def write(self, chunk):
+ if ((self.size_limit is not None
+ and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
+ raise BundleTooLargeError(
+ "The requested bundle exceeds the maximum allowed "
+ "size of {} bytes.".format(self.size_limit))
+ return super().write(chunk)
+
+
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
@@ -90,21 +99,19 @@
"""
        raise NotImplementedError
+ def write(self, chunk):
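+        """Append a chunk of data to the bundle's file object."""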
+ self.fileobj.write(chunk)
+
def cook(self):
"""Cook the requested object into a bundle
"""
self.backend.set_status(self.obj_type, self.obj_id, 'pending')
self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...')
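+        # The bundle is accumulated in an in-memory buffer that raises
+        # BundleTooLargeError as soon as max_bundle_size is exceeded.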
+ self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
try:
- content_iter = self.prepare_bundle()
- bundle = b''
- for chunk in content_iter:
- bundle += chunk
- if len(bundle) > self.max_bundle_size:
- raise BundleTooLargeError(
- "The requested bundle exceeds the maximum allowed "
- "size of {} bytes.".format(self.max_bundle_size))
+ self.prepare_bundle()
+ bundle = self.fileobj.getvalue()
except PolicyError as e:
self.backend.set_status(self.obj_type, self.obj_id, 'failed')
self.backend.set_progress(self.obj_type, self.obj_id, str(e))
@@ -154,35 +161,6 @@
return list(storage.content_get([file_data['sha1']]))[0]['data']
-# TODO: We should use something like that for all the IO done by the cookers.
-# Instead of using generators to yield chunks, we should just write() the
-# chunks in an object like this, which would give us way better control over
-# the buffering, and allow for streaming content to the objstorage.
-class BytesIOBundleSizeLimit(io.BytesIO):
- def __init__(self, *args, size_limit=None, **kwargs):
- self.size_limit = size_limit
-
- def write(self, chunk):
- if ((self.size_limit is not None
- and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
- raise BundleTooLargeError(
- "The requested bundle exceeds the maximum allowed "
- "size of {} bytes.".format(self.size_limit))
- return super().write(chunk)
-
-
-# TODO: Once the BytesIO buffer is put in BaseCooker, we can just pass it here
-# as a fileobj parameter instead of passing size_limit
-def get_tar_bytes(path, arcname=None, size_limit=None):
- path = Path(path)
- if not arcname:
- arcname = path.name
- tar_buffer = BytesIOBundleSizeLimit(size_limit=size_limit)
- tar = tarfile.open(fileobj=tar_buffer, mode='w')
- tar.add(str(path), arcname=arcname)
- return tar_buffer.getbuffer()
-
-
class DirectoryBuilder:
"""Creates a cooked directory from its sha1_git in the db.
@@ -193,19 +171,6 @@
def __init__(self, storage):
self.storage = storage
- def get_directory_bytes(self, dir_id, size_limit=None):
- # Create temporary folder to retrieve the files into.
- root = bytes(tempfile.mkdtemp(prefix='directory.',
- suffix='.cook'), 'utf8')
- self.build_directory(dir_id, root)
- # Use the created directory to make a bundle with the data as
- # a compressed directory.
- bundle_content = self._create_bundle_content(
- root,
- hashutil.hash_to_hex(dir_id),
- size_limit=size_limit)
- return bundle_content
-
def build_directory(self, dir_id, root):
# Retrieve data from the database.
data = self.storage.directory_ls(dir_id, recursive=True)
@@ -264,16 +229,3 @@
"""
content = list(self.storage.content_get([obj_id]))[0]['data']
return content
-
- def _create_bundle_content(self, path, hex_dir_id, size_limit=None):
- """Create a bundle from the given directory
-
- Args:
- path: location of the directory to package.
- hex_dir_id: hex representation of the directory id
-
- Returns:
- bytes that represent the compressed directory as a bundle.
-
- """
- return get_tar_bytes(path.decode(), hex_dir_id, size_limit=size_limit)
diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py
--- a/swh/vault/cookers/directory.py
+++ b/swh/vault/cookers/directory.py
@@ -3,7 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import tarfile
+import tempfile
+
from swh.vault.cookers.base import BaseVaultCooker, DirectoryBuilder
+from swh.model import hashutil
class DirectoryCooker(BaseVaultCooker):
@@ -15,5 +19,7 @@
def prepare_bundle(self):
directory_builder = DirectoryBuilder(self.storage)
- yield directory_builder.get_directory_bytes(self.obj_id,
- self.max_bundle_size)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td:
+ directory_builder.build_directory(self.obj_id, td.encode())
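+            # Pack the rebuilt directory into the bundle as an
+            # uncompressed tarball.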
+            with tarfile.open(fileobj=self.fileobj, mode='w') as tar:
+                tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id))
diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py
--- a/swh/vault/cookers/revision_flat.py
+++ b/swh/vault/cookers/revision_flat.py
@@ -3,36 +3,29 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import tarfile
import tempfile
from pathlib import Path
from swh.model import hashutil
-
-from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes
+from .base import BaseVaultCooker, DirectoryBuilder
class RevisionFlatCooker(BaseVaultCooker):
- """Cooker to create a directory bundle """
+ """Cooker to create a revision_flat bundle """
CACHE_TYPE_KEY = 'revision_flat'
def check_exists(self):
return not list(self.storage.revision_missing([self.obj_id]))
def prepare_bundle(self):
- """Cook the requested revision into a Bundle
-
- Returns:
- bytes that correspond to the bundle
-
- """
directory_builder = DirectoryBuilder(self.storage)
- with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp:
- root = Path(root_tmp)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td:
+ root = Path(td)
for revision in self.storage.revision_log([self.obj_id]):
revdir = root / hashutil.hash_to_hex(revision['id'])
revdir.mkdir()
directory_builder.build_directory(revision['directory'],
str(revdir).encode())
- # FIXME: stream the bytes! this tarball can be HUUUUUGE
- yield get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id),
- self.max_bundle_size)
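+            # Archive every checked-out revision in a single tarball
+            # written directly to the bundle buffer.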
+            with tarfile.open(fileobj=self.fileobj, mode='w') as tar:
+                tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id))
diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py
--- a/swh/vault/cookers/revision_gitfast.py
+++ b/swh/vault/cookers/revision_gitfast.py
@@ -4,12 +4,14 @@
# See top-level LICENSE file for more information
import collections
-import fastimport.commands
import functools
import os
import time
import zlib
+from fastimport.commands import (CommitCommand, ResetCommand, BlobCommand,
+ FileDeleteCommand, FileModifyCommand)
+
from .base import BaseVaultCooker, get_filtered_file_content
from swh.model.from_disk import mode_to_perms
@@ -23,12 +25,13 @@
def prepare_bundle(self):
log = self.storage.revision_log([self.obj_id])
- commands = self.fastexport(log)
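+        # Compress the fast-export stream on the fly; MAX_WBITS | 16
+        # makes zlib emit a gzip-wrapped stream.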
+ self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+ self.fastexport(log)
+ self.write(self.gzobj.flush())
- compressobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
- for command in commands:
- yield compressobj.compress(bytes(command) + b'\n')
- yield compressobj.flush()
+ def write_cmd(self, cmd):
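+        """Serialize a fastimport command and write it to the
+        gzip-compressed bundle stream."""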
+ chunk = bytes(cmd) + b'\n'
+        self.write(self.gzobj.compress(chunk))
def fastexport(self, log):
"""Generate all the git fast-import commands from a given log.
@@ -52,7 +55,7 @@
self.backend.set_progress(self.obj_type, self.obj_id, pg)
# Compute the current commit
- yield from self._compute_commit_command(rev)
+ self._compute_commit_command(rev)
def _toposort(self, rev_by_id):
"""Perform a topological sort on the revision graph.
@@ -99,10 +102,7 @@
if obj_id in self.obj_done:
return
content = get_filtered_file_content(self.storage, file_data)
- yield fastimport.commands.BlobCommand(
- mark=self.mark(obj_id),
- data=content,
- )
+ self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content))
self.obj_done.add(obj_id)
def _author_tuple_format(self, author, date):
@@ -130,20 +130,20 @@
else:
# We issue a reset command before all the new roots so that they
# are not automatically added as children of the current branch.
- yield fastimport.commands.ResetCommand(b'refs/heads/master', None)
+ self.write_cmd(ResetCommand(b'refs/heads/master', None))
from_ = None
merges = None
parent = None
# Retrieve the file commands while yielding new blob commands if
# needed.
- files = yield from self._compute_file_commands(rev, parent)
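+        # Materialize the file commands eagerly: iterating the generator
+        # writes the needed blob commands as a side effect.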
+ files = list(self._compute_file_commands(rev, parent))
- # Construct and yield the commit command
+ # Construct and write the commit command
author = self._author_tuple_format(rev['author'], rev['date'])
committer = self._author_tuple_format(rev['committer'],
rev['committer_date'])
- yield fastimport.commands.CommitCommand(
+ self.write_cmd(CommitCommand(
ref=b'refs/heads/master',
mark=self.mark(rev['id']),
author=author,
@@ -151,8 +151,7 @@
message=rev['message'] or b'',
from_=from_,
merges=merges,
- file_iter=files,
- )
+ file_iter=files))
@functools.lru_cache(maxsize=4096)
def _get_dir_ents(self, dir_id=None):
@@ -171,8 +170,6 @@
Generate a diff of the files between the revision and its main parent
to find the necessary file commands to apply.
"""
- commands = []
-
# Initialize the stack with the root of the tree.
cur_dir = rev['directory']
parent_dir = parent['directory'] if parent else None
@@ -195,9 +192,7 @@
for fname, f in prev_dir.items():
if ((fname not in cur_dir
or f['type'] != cur_dir[fname]['type'])):
- commands.append(fastimport.commands.FileDeleteCommand(
- path=os.path.join(root, fname)
- ))
+ yield FileDeleteCommand(path=os.path.join(root, fname))
# Find subtrees to modify:
# - Leaves (files) will be added or modified using `filemodify`
@@ -211,13 +206,12 @@
or f['sha1'] != prev_dir[fname]['sha1']
or f['perms'] != prev_dir[fname]['perms'])):
# Issue a blob command for the new blobs if needed.
- yield from self._compute_blob_command_content(f)
- commands.append(fastimport.commands.FileModifyCommand(
+ self._compute_blob_command_content(f)
+ yield FileModifyCommand(
path=os.path.join(root, fname),
mode=mode_to_perms(f['perms']).value,
dataref=(b':' + self.mark(f['sha1'])),
- data=None,
- ))
+ data=None)
# A directory is added or modified if it was not in the tree or
# if its target changed.
elif f['type'] == 'dir':
@@ -227,4 +221,3 @@
if f_prev_target is None or f['target'] != f_prev_target:
stack.append((os.path.join(root, fname),
f['target'], f_prev_target))
- return commands
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -91,14 +91,14 @@
cooker = DirectoryCooker('directory', obj_id)
cooker.storage = self.storage
cooker.backend = unittest.mock.MagicMock()
- cooker.check_exists() # Raises if false
- tarball = b''.join(cooker.prepare_bundle())
- with tempfile.TemporaryDirectory('tmp-vault-extract-') as td:
- fobj = io.BytesIO(tarball)
- with tarfile.open(fileobj=fobj, mode='r') as tar:
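+        # Hand the cooker a plain BytesIO so no size limit applies.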
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+ with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
tar.extractall(td)
- p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
- yield p
+ yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
@contextlib.contextmanager
def cook_extract_revision_gitfast(self, obj_id):
@@ -106,9 +106,11 @@
cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
cooker.storage = self.storage
cooker.backend = unittest.mock.MagicMock()
- cooker.check_exists() # Raises if false
- fastexport = b''.join(cooker.prepare_bundle())
- fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
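+        # Collect the gzip-compressed fast-export stream in memory.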
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
test_repo = TestRepo()
with test_repo as p:
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,14 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import pathlib
-import tempfile
import unittest
from unittest.mock import MagicMock
from swh.model import hashutil
-from swh.vault.cookers.base import (BaseVaultCooker, get_tar_bytes,
- BundleTooLargeError)
+from swh.vault.cookers.base import BaseVaultCooker
TEST_BUNDLE_CHUNKS = [b"test content 1\n",
@@ -35,7 +32,7 @@
def prepare_bundle(self):
for chunk in TEST_BUNDLE_CHUNKS:
- yield chunk
+ self.write(chunk)
class TestBaseVaultCooker(unittest.TestCase):
@@ -79,14 +76,3 @@
self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
cooker.backend.send_notif.assert_called_with(
TEST_OBJ_TYPE, TEST_OBJ_ID)
-
-
-class TestGetTarBytes(unittest.TestCase):
- def test_tar_too_large(self):
- with tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') as td:
- p = pathlib.Path(td)
- (p / 'dir1/dir2').mkdir(parents=True)
- (p / 'dir1/dir2/file').write_text('testtesttesttest')
-
- with self.assertRaises(BundleTooLargeError):
- get_tar_bytes(p, size_limit=8)
