D286: cookers: rewrite the I/O pipeline with file objects instead of generators
D286.id966.diff (16 KB)

diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -8,10 +8,6 @@
import itertools
import logging
import os
-import tarfile
-import tempfile
-
-from pathlib import Path
from swh.core import config
from swh.model import hashutil
@@ -43,6 +39,19 @@
pass
+class BytesIOBundleSizeLimit(io.BytesIO):
+ def __init__(self, *args, size_limit=None, **kwargs):
+ self.size_limit = size_limit
+
+ def write(self, chunk):
+ if ((self.size_limit is not None
+ and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
+ raise BundleTooLargeError(
+ "The requested bundle exceeds the maximum allowed "
+ "size of {} bytes.".format(self.size_limit))
+ return super().write(chunk)
+
+
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
@@ -90,21 +99,19 @@
"""
raise NotImplemented
+ def write(self, chunk):
+ self.fileobj.write(chunk)
+
def cook(self):
"""Cook the requested object into a bundle
"""
self.backend.set_status(self.obj_type, self.obj_id, 'pending')
self.backend.set_progress(self.obj_type, self.obj_id, 'Processing...')
+ self.fileobj = BytesIOBundleSizeLimit(size_limit=self.max_bundle_size)
try:
- content_iter = self.prepare_bundle()
- bundle = b''
- for chunk in content_iter:
- bundle += chunk
- if len(bundle) > self.max_bundle_size:
- raise BundleTooLargeError(
- "The requested bundle exceeds the maximum allowed "
- "size of {} bytes.".format(self.max_bundle_size))
+ self.prepare_bundle()
+ bundle = self.fileobj.getvalue()
except PolicyError as e:
self.backend.set_status(self.obj_type, self.obj_id, 'failed')
self.backend.set_progress(self.obj_type, self.obj_id, str(e))
@@ -154,35 +161,6 @@
return list(storage.content_get([file_data['sha1']]))[0]['data']
-# TODO: We should use something like that for all the IO done by the cookers.
-# Instead of using generators to yield chunks, we should just write() the
-# chunks in an object like this, which would give us way better control over
-# the buffering, and allow for streaming content to the objstorage.
-class BytesIOBundleSizeLimit(io.BytesIO):
- def __init__(self, *args, size_limit=None, **kwargs):
- self.size_limit = size_limit
-
- def write(self, chunk):
- if ((self.size_limit is not None
- and self.getbuffer().nbytes + len(chunk) > self.size_limit)):
- raise BundleTooLargeError(
- "The requested bundle exceeds the maximum allowed "
- "size of {} bytes.".format(self.size_limit))
- return super().write(chunk)
-
-
-# TODO: Once the BytesIO buffer is put in BaseCooker, we can just pass it here
-# as a fileobj parameter instead of passing size_limit
-def get_tar_bytes(path, arcname=None, size_limit=None):
- path = Path(path)
- if not arcname:
- arcname = path.name
- tar_buffer = BytesIOBundleSizeLimit(size_limit=size_limit)
- tar = tarfile.open(fileobj=tar_buffer, mode='w')
- tar.add(str(path), arcname=arcname)
- return tar_buffer.getbuffer()
-
-
class DirectoryBuilder:
"""Creates a cooked directory from its sha1_git in the db.
@@ -193,19 +171,6 @@
def __init__(self, storage):
self.storage = storage
- def get_directory_bytes(self, dir_id, size_limit=None):
- # Create temporary folder to retrieve the files into.
- root = bytes(tempfile.mkdtemp(prefix='directory.',
- suffix='.cook'), 'utf8')
- self.build_directory(dir_id, root)
- # Use the created directory to make a bundle with the data as
- # a compressed directory.
- bundle_content = self._create_bundle_content(
- root,
- hashutil.hash_to_hex(dir_id),
- size_limit=size_limit)
- return bundle_content
-
def build_directory(self, dir_id, root):
# Retrieve data from the database.
data = self.storage.directory_ls(dir_id, recursive=True)
@@ -264,16 +229,3 @@
"""
content = list(self.storage.content_get([obj_id]))[0]['data']
return content
-
- def _create_bundle_content(self, path, hex_dir_id, size_limit=None):
- """Create a bundle from the given directory
-
- Args:
- path: location of the directory to package.
- hex_dir_id: hex representation of the directory id
-
- Returns:
- bytes that represent the compressed directory as a bundle.
-
- """
- return get_tar_bytes(path.decode(), hex_dir_id, size_limit=size_limit)
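
This hunk is the heart of the change: instead of each cooker yielding byte chunks for the base class to accumulate into one large bytes object, cook() now hands every cooker a shared BytesIOBundleSizeLimit file object, and the size limit is enforced at write time rather than after each chunk is appended. A minimal sketch of the buffer's behavior, assuming a BundleTooLargeError exception as in the patch (the sketch also forwards *args/**kwargs to io.BytesIO.__init__, which the hunk above omits):

import io

class BundleTooLargeError(Exception):
    """Raised when a bundle exceeds the configured size limit."""

class BytesIOBundleSizeLimit(io.BytesIO):
    def __init__(self, *args, size_limit=None, **kwargs):
        super().__init__(*args, **kwargs)  # defensive addition, not in the hunk
        self.size_limit = size_limit

    def write(self, chunk):
        # Refuse the write if it would push the buffer past the limit.
        if (self.size_limit is not None
                and self.getbuffer().nbytes + len(chunk) > self.size_limit):
            raise BundleTooLargeError(
                "The requested bundle exceeds the maximum allowed "
                "size of {} bytes.".format(self.size_limit))
        return super().write(chunk)

buf = BytesIOBundleSizeLimit(size_limit=8)
buf.write(b"12345678")   # exactly at the limit: accepted
try:
    buf.write(b"9")      # one byte over: raises BundleTooLargeError
except BundleTooLargeError as e:
    print(e)
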
diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py
--- a/swh/vault/cookers/directory.py
+++ b/swh/vault/cookers/directory.py
@@ -3,7 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import tarfile
+import tempfile
+
from swh.vault.cookers.base import BaseVaultCooker, DirectoryBuilder
+from swh.model import hashutil
class DirectoryCooker(BaseVaultCooker):
@@ -15,5 +19,7 @@
def prepare_bundle(self):
directory_builder = DirectoryBuilder(self.storage)
- yield directory_builder.get_directory_bytes(self.obj_id,
- self.max_bundle_size)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td:
+ directory_builder.build_directory(self.obj_id, td.encode())
+ tar = tarfile.open(fileobj=self.fileobj, mode='w')
+ tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id))
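
DirectoryCooker no longer builds tarball bytes itself: tarfile streams straight into the cooker's file object, so the size limit above applies as the archive grows. A minimal sketch of the pattern in isolation (tar_directory_into and the file names are illustrative, not the swh API); note that TarFile only emits its end-of-archive blocks on close(), so the context-manager form below is the safer variant of the open()/add() pair in the hunk:

import io
import pathlib
import tarfile
import tempfile

def tar_directory_into(fileobj, src_dir, arcname):
    # tarfile writes through fileobj.write(), so any size accounting
    # done by the file object happens incrementally as the tar grows.
    with tarfile.open(fileobj=fileobj, mode='w') as tar:
        tar.add(src_dir, arcname=arcname)

with tempfile.TemporaryDirectory(prefix='tmp-vault-directory-') as td:
    (pathlib.Path(td) / 'hello.txt').write_text('hello\n')
    buf = io.BytesIO()
    tar_directory_into(buf, td, arcname='bundle-root')
    print(len(buf.getvalue()), 'bytes of tar data')
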
diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py
--- a/swh/vault/cookers/revision_flat.py
+++ b/swh/vault/cookers/revision_flat.py
@@ -3,36 +3,29 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+import tarfile
import tempfile
from pathlib import Path
from swh.model import hashutil
-
-from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes
+from .base import BaseVaultCooker, DirectoryBuilder
class RevisionFlatCooker(BaseVaultCooker):
- """Cooker to create a directory bundle """
+ """Cooker to create a revision_flat bundle """
CACHE_TYPE_KEY = 'revision_flat'
def check_exists(self):
return not list(self.storage.revision_missing([self.obj_id]))
def prepare_bundle(self):
- """Cook the requested revision into a Bundle
-
- Returns:
- bytes that correspond to the bundle
-
- """
directory_builder = DirectoryBuilder(self.storage)
- with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp:
- root = Path(root_tmp)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td:
+ root = Path(td)
for revision in self.storage.revision_log([self.obj_id]):
revdir = root / hashutil.hash_to_hex(revision['id'])
revdir.mkdir()
directory_builder.build_directory(revision['directory'],
str(revdir).encode())
- # FIXME: stream the bytes! this tarball can be HUUUUUGE
- yield get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id),
- self.max_bundle_size)
+ tar = tarfile.open(fileobj=self.fileobj, mode='w')
+ tar.add(td, arcname=hashutil.hash_to_hex(self.obj_id))
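
The same inversion resolves the old FIXME: the tarball is written incrementally into self.fileobj instead of being accumulated by a generator, so a large revision tree no longer has to fit into a single yielded bytes object. The on-disk layout is one subdirectory per revision in the log, named by its hex id; a sketch with placeholder revision ids (the dicts stand in for storage.revision_log() entries):

import pathlib
import tempfile

revisions = [{'id': 'aa' * 20}, {'id': 'bb' * 20}]  # hypothetical log entries

with tempfile.TemporaryDirectory(prefix='tmp-vault-revision-') as td:
    root = pathlib.Path(td)
    for revision in revisions:
        revdir = root / revision['id']   # one directory per revision
        revdir.mkdir()
    print(sorted(p.name for p in root.iterdir()))
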
diff --git a/swh/vault/cookers/revision_gitfast.py b/swh/vault/cookers/revision_gitfast.py
--- a/swh/vault/cookers/revision_gitfast.py
+++ b/swh/vault/cookers/revision_gitfast.py
@@ -4,12 +4,14 @@
# See top-level LICENSE file for more information
import collections
-import fastimport.commands
import functools
import os
import time
import zlib
+from fastimport.commands import (CommitCommand, ResetCommand, BlobCommand,
+ FileDeleteCommand, FileModifyCommand)
+
from .base import BaseVaultCooker, get_filtered_file_content
from swh.model.from_disk import mode_to_perms
@@ -23,12 +25,13 @@
def prepare_bundle(self):
log = self.storage.revision_log([self.obj_id])
- commands = self.fastexport(log)
+ self.gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
+ self.fastexport(log)
+ self.write(self.gzobj.flush())
- compressobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
- for command in commands:
- yield compressobj.compress(bytes(command) + b'\n')
- yield compressobj.flush()
+ def write_cmd(self, cmd):
+ chunk = bytes(cmd) + b'\n'
+ super().write(self.gzobj.compress(chunk))
def fastexport(self, log):
"""Generate all the git fast-import commands from a given log.
@@ -52,7 +55,7 @@
self.backend.set_progress(self.obj_type, self.obj_id, pg)
# Compute the current commit
- yield from self._compute_commit_command(rev)
+ self._compute_commit_command(rev)
def _toposort(self, rev_by_id):
"""Perform a topological sort on the revision graph.
@@ -99,10 +102,7 @@
if obj_id in self.obj_done:
return
content = get_filtered_file_content(self.storage, file_data)
- yield fastimport.commands.BlobCommand(
- mark=self.mark(obj_id),
- data=content,
- )
+ self.write_cmd(BlobCommand(mark=self.mark(obj_id), data=content))
self.obj_done.add(obj_id)
def _author_tuple_format(self, author, date):
@@ -130,20 +130,20 @@
else:
# We issue a reset command before all the new roots so that they
# are not automatically added as children of the current branch.
- yield fastimport.commands.ResetCommand(b'refs/heads/master', None)
+ self.write_cmd(ResetCommand(b'refs/heads/master', None))
from_ = None
merges = None
parent = None
# Retrieve the file commands while yielding new blob commands if
# needed.
- files = yield from self._compute_file_commands(rev, parent)
+ files = list(self._compute_file_commands(rev, parent))
- # Construct and yield the commit command
+ # Construct and write the commit command
author = self._author_tuple_format(rev['author'], rev['date'])
committer = self._author_tuple_format(rev['committer'],
rev['committer_date'])
- yield fastimport.commands.CommitCommand(
+ self.write_cmd(CommitCommand(
ref=b'refs/heads/master',
mark=self.mark(rev['id']),
author=author,
@@ -151,8 +151,7 @@
message=rev['message'] or b'',
from_=from_,
merges=merges,
- file_iter=files,
- )
+ file_iter=files))
@functools.lru_cache(maxsize=4096)
def _get_dir_ents(self, dir_id=None):
@@ -171,8 +170,6 @@
Generate a diff of the files between the revision and its main parent
to find the necessary file commands to apply.
"""
- commands = []
-
# Initialize the stack with the root of the tree.
cur_dir = rev['directory']
parent_dir = parent['directory'] if parent else None
@@ -195,9 +192,7 @@
for fname, f in prev_dir.items():
if ((fname not in cur_dir
or f['type'] != cur_dir[fname]['type'])):
- commands.append(fastimport.commands.FileDeleteCommand(
- path=os.path.join(root, fname)
- ))
+ yield FileDeleteCommand(path=os.path.join(root, fname))
# Find subtrees to modify:
# - Leaves (files) will be added or modified using `filemodify`
@@ -211,13 +206,12 @@
or f['sha1'] != prev_dir[fname]['sha1']
or f['perms'] != prev_dir[fname]['perms'])):
# Issue a blob command for the new blobs if needed.
- yield from self._compute_blob_command_content(f)
- commands.append(fastimport.commands.FileModifyCommand(
+ self._compute_blob_command_content(f)
+ yield FileModifyCommand(
path=os.path.join(root, fname),
mode=mode_to_perms(f['perms']).value,
dataref=(b':' + self.mark(f['sha1'])),
- data=None,
- ))
+ data=None)
# A directory is added or modified if it was not in the tree or
# if its target changed.
elif f['type'] == 'dir':
@@ -227,4 +221,3 @@
if f_prev_target is None or f['target'] != f_prev_target:
stack.append((os.path.join(root, fname),
f['target'], f_prev_target))
- return commands
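
The gitfast cooker now compresses as it writes: each fastimport command is serialized by write_cmd(), pushed through a zlib compressobj configured to produce a gzip container (wbits = MAX_WBITS | 16), and the stream is terminated with a final flush(). A minimal sketch of that streaming-gzip pattern, independent of fastimport:

import gzip
import io
import zlib

buf = io.BytesIO()
# Level 9 DEFLATE wrapped in a gzip header/trailer (MAX_WBITS | 16).
gzobj = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)

for cmd in (b'blob', b'commit refs/heads/master'):
    buf.write(gzobj.compress(cmd + b'\n'))  # compress each command as written
buf.write(gzobj.flush())                    # emit remaining data + gzip trailer

buf.seek(0)
print(gzip.GzipFile(fileobj=buf).read())    # round-trips the original stream
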
diff --git a/swh/vault/tests/test_cookers.py b/swh/vault/tests/test_cookers.py
--- a/swh/vault/tests/test_cookers.py
+++ b/swh/vault/tests/test_cookers.py
@@ -91,14 +91,14 @@
cooker = DirectoryCooker('directory', obj_id)
cooker.storage = self.storage
cooker.backend = unittest.mock.MagicMock()
- cooker.check_exists() # Raises if false
- tarball = b''.join(cooker.prepare_bundle())
- with tempfile.TemporaryDirectory('tmp-vault-extract-') as td:
- fobj = io.BytesIO(tarball)
- with tarfile.open(fileobj=fobj, mode='r') as tar:
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ with tempfile.TemporaryDirectory(prefix='tmp-vault-extract-') as td:
+ with tarfile.open(fileobj=cooker.fileobj, mode='r') as tar:
tar.extractall(td)
- p = pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
- yield p
+ yield pathlib.Path(td) / hashutil.hash_to_hex(obj_id)
@contextlib.contextmanager
def cook_extract_revision_gitfast(self, obj_id):
@@ -106,9 +106,11 @@
cooker = RevisionGitfastCooker('revision_gitfast', obj_id)
cooker.storage = self.storage
cooker.backend = unittest.mock.MagicMock()
- cooker.check_exists() # Raises if false
- fastexport = b''.join(cooker.prepare_bundle())
- fastexport_stream = gzip.GzipFile(fileobj=io.BytesIO(fastexport))
+ cooker.fileobj = io.BytesIO()
+ assert cooker.check_exists()
+ cooker.prepare_bundle()
+ cooker.fileobj.seek(0)
+ fastexport_stream = gzip.GzipFile(fileobj=cooker.fileobj)
test_repo = TestRepo()
with test_repo as p:
processor = dulwich.fastexport.GitImportProcessor(test_repo.repo)
diff --git a/swh/vault/tests/test_cookers_base.py b/swh/vault/tests/test_cookers_base.py
--- a/swh/vault/tests/test_cookers_base.py
+++ b/swh/vault/tests/test_cookers_base.py
@@ -3,14 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
-import pathlib
-import tempfile
import unittest
from unittest.mock import MagicMock
from swh.model import hashutil
-from swh.vault.cookers.base import (BaseVaultCooker, get_tar_bytes,
- BundleTooLargeError)
+from swh.vault.cookers.base import BaseVaultCooker
TEST_BUNDLE_CHUNKS = [b"test content 1\n",
@@ -35,7 +32,7 @@
def prepare_bundle(self):
for chunk in TEST_BUNDLE_CHUNKS:
- yield chunk
+ self.write(chunk)
class TestBaseVaultCooker(unittest.TestCase):
@@ -79,14 +76,3 @@
self.assertIn("exceeds", cooker.backend.set_progress.call_args[0][2])
cooker.backend.send_notif.assert_called_with(
TEST_OBJ_TYPE, TEST_OBJ_ID)
-
-
-class TestGetTarBytes(unittest.TestCase):
- def test_tar_too_large(self):
- with tempfile.TemporaryDirectory(prefix='tmp-vault-repo-') as td:
- p = pathlib.Path(td)
- (p / 'dir1/dir2').mkdir(parents=True)
- (p / 'dir1/dir2/file').write_text('testtesttesttest')
-
- with self.assertRaises(BundleTooLargeError):
- get_tar_bytes(p, size_limit=8)
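
The tests mirror the new protocol: rather than joining yielded chunks, they inject a plain io.BytesIO as cooker.fileobj, run prepare_bundle(), and read the buffer back. A sketch of that harness shape with a hypothetical cooker (FakeCooker is a placeholder, not the real test fixture):

import io
import unittest.mock

class FakeCooker:
    # Follows the patched protocol: bundles are written into
    # self.fileobj instead of being yielded chunk by chunk.
    def __init__(self):
        self.backend = unittest.mock.MagicMock()
        self.fileobj = None

    def prepare_bundle(self):
        for chunk in (b"test content 1\n", b"test content 2\n"):
            self.fileobj.write(chunk)

cooker = FakeCooker()
cooker.fileobj = io.BytesIO()   # injected by the test, as in the diff
cooker.prepare_bundle()
cooker.fileobj.seek(0)
assert cooker.fileobj.read() == b"test content 1\ntest content 2\n"
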