diff --git a/swh/vault/cache.py b/swh/vault/cache.py
index db1a1f7..3b0ed22 100644
--- a/swh/vault/cache.py
+++ b/swh/vault/cache.py
@@ -1,53 +1,61 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import os

 from swh.model import hashutil
 from swh.objstorage import get_objstorage
 from swh.objstorage.objstorage_pathslicing import DIR_MODE


 class VaultCache():
     """The vault cache is an object storage that stores bundles.

     The current implementation uses a PathSlicingObjStorage to store
     the bundles. The id of a content is prefixed to specify its type,
     so that different types of bundles are stored in different folders.
     """
     def __init__(self, root):
         self.root = root
         self.storages = {}

     def add(self, obj_type, obj_id, content):
         storage = self._get_storage(obj_type)
         return storage.add(content, obj_id)

     def get(self, obj_type, obj_id):
         storage = self._get_storage(obj_type)
         return storage.get(hashutil.hash_to_bytes(obj_id))

+    def add_stream(self, obj_type, obj_id, content_iter):
+        storage = self._get_storage(obj_type)
+        return storage.add_stream(content_iter, obj_id)
+
+    def get_stream(self, obj_type, obj_id):
+        storage = self._get_storage(obj_type)
+        return storage.get_stream(hashutil.hash_to_bytes(obj_id))
+
     def is_cached(self, obj_type, obj_id):
         storage = self._get_storage(obj_type)
         return hashutil.hash_to_bytes(obj_id) in storage

     def ls(self, obj_type):
         storage = self._get_storage(obj_type)
         yield from storage

     def _get_storage(self, obj_type):
         """Get the storage that corresponds to the object type."""
         if obj_type not in self.storages:
             fp = os.path.join(self.root, obj_type)
             if not os.path.isdir(fp):
                 os.makedirs(fp, DIR_MODE, exist_ok=True)
             self.storages[obj_type] = get_objstorage(
                 'pathslicing', {'root': fp, 'slicing': '0:1/0:5'})
         return self.storages[obj_type]
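Usage sketch for the new streaming API (not part of the patch; the cache root and object id below are made up, and the underlying objstorage is assumed to provide the add_stream/get_stream methods called above):

    from swh.model import hashutil
    from swh.vault.cache import VaultCache

    cache = VaultCache('/tmp/vault-cache')  # hypothetical cache root
    obj_id = hashutil.hash_to_bytes(
        '17702f410875cf8cded469f038020e37e1d8d9e9')  # made-up id

    # add_stream() consumes any iterator of byte chunks, so a bundle can
    # be written to the cache without first being assembled in memory.
    cache.add_stream('directory', obj_id, iter([b'chunk1', b'chunk2']))

    # get_stream() hands the chunks back the same way.
    bundle = b''.join(cache.get_stream('directory', obj_id))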
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index 811b638..473abc4 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,221 +1,221 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import abc
 import io
 import itertools
 import logging
 import os
 import tarfile
 import tempfile

 from pathlib import Path

 from swh.model import hashutil


 def get_tar_bytes(path, arcname=None):
     path = Path(path)
     if not arcname:
         arcname = path.name
     tar_buffer = io.BytesIO()
     tar = tarfile.open(fileobj=tar_buffer, mode='w')
     tar.add(str(path), arcname=arcname)
     tar.close()  # write the end-of-archive blocks
     return tar_buffer.getbuffer()


 SKIPPED_MESSAGE = (b'This content has not been retrieved in the '
                    b'Software Heritage archive due to its size')

 HIDDEN_MESSAGE = (b'This content is hidden')


 class BaseVaultCooker(metaclass=abc.ABCMeta):
     """Abstract base class for the vault's bundle creators

     This class describes a common API for the cookers.

     To define a new cooker, inherit from this class and override:
     - CACHE_TYPE_KEY: key to use for the bundle to reference in cache
     - def prepare_bundle(): implementation of the cooker, yielding the
       chunks of the bundle
     - def notify_bundle_ready(notif_data): notify the bundle is ready.

     """
     CACHE_TYPE_KEY = None

     def __init__(self, storage, cache, obj_id):
         """Initialize the cooker.

         The type of the object represented by the id depends on the
         concrete class. Very likely, each type of bundle will have its
         own cooker class.

         Args:
             storage: the storage object
             cache: the cache where to store the bundle
             obj_id: id of the object to be cooked into a bundle.
         """
         self.storage = storage
         self.cache = cache
         self.obj_id = obj_id

     @abc.abstractmethod
     def prepare_bundle(self):
-        """Implementation of the cooker. Returns the bundle bytes.
+        """Implementation of the cooker. Yields chunks of the bundle bytes.

         Override this with the cooker implementation.
         """
         raise NotImplementedError

     def cook(self):
         """Cook the requested object into a bundle
         """
-        bundle_content = self.prepare_bundle()
+        content_iter = self.prepare_bundle()

         # Cache the bundle
-        self.update_cache(bundle_content)
+        self.update_cache(content_iter)
         # Make a notification that the bundle has been cooked
         # NOT YET IMPLEMENTED see TODO in function.
         self.notify_bundle_ready(
             notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id))

-    def update_cache(self, bundle_content):
-        """Update the cache with id and bundle_content.
+    def update_cache(self, content_iter):
+        """Stream the bundle chunks into the cache under the cooker's id.
         """
-        self.cache.add(self.CACHE_TYPE_KEY, self.obj_id, bundle_content)
+        self.cache.add_stream(self.CACHE_TYPE_KEY, self.obj_id, content_iter)

     def notify_bundle_ready(self, notif_data):
         # TODO plug this method with the notification method once
         # done.
         pass


 class DirectoryBuilder:
     """Creates a cooked directory from its sha1_git in the db.

     Warning: This is NOT a directly accessible cooker, but a low-level
     one that executes the manipulations.

     """
     def __init__(self, storage):
         self.storage = storage

     def get_directory_bytes(self, dir_id):
         # Create temporary folder to retrieve the files into.
         root = bytes(tempfile.mkdtemp(prefix='directory.',
                                       suffix='.cook'), 'utf8')
         self.build_directory(dir_id, root)
         # Use the created directory to make a bundle with the data as
         # a compressed directory.
         bundle_content = self._create_bundle_content(
             root,
             hashutil.hash_to_hex(dir_id))
         return bundle_content

     def build_directory(self, dir_id, root):
         # Retrieve data from the database.
         data = self.storage.directory_ls(dir_id, recursive=True)

         # Split into files and directory data.
         # TODO(seirl): also handle revision data.
         data1, data2 = itertools.tee(data, 2)
         dir_data = (entry['name'] for entry in data1
                     if entry['type'] == 'dir')
         file_data = (entry for entry in data2 if entry['type'] == 'file')

         # Recreate the directory's subtree and then the files into it.
         self._create_tree(root, dir_data)
         self._create_files(root, file_data)

     def _create_tree(self, root, directory_paths):
         """Create a directory tree from the given paths

         The tree is created from `root` and each given path in
         `directory_paths` will be created.

         """
         # Directories are sorted by depth so they are created in the
         # right order
         bsep = bytes(os.path.sep, 'utf8')
         dir_names = sorted(
             directory_paths,
             key=lambda x: len(x.split(bsep)))
         for dir_name in dir_names:
             os.makedirs(os.path.join(root, dir_name))

     def _create_files(self, root, file_datas):
         """Create the files according to their status.

         """
         # Then create the files
         for file_data in file_datas:
             path = os.path.join(root, file_data['name'])
             status = file_data['status']
             perms = file_data['perms']
             if status == 'absent':
                 self._create_file_absent(path)
             elif status == 'hidden':
                 self._create_file_hidden(path)
             else:
                 content = self._get_file_content(file_data['sha1'])
                 self._create_file(path, content, perms)

     def _create_file(self, path, content, perms=0o100644):
         """Create the given file and fill it with content.

         """
         if perms not in (0o100644, 0o100755, 0o120000):
             logging.warning('File {} has invalid permission {}, '
                             'defaulting to 644.'.format(path, perms))
             perms = 0o100644

         if perms == 0o120000:  # Symbolic link
             os.symlink(content, path)
         else:
             with open(path, 'wb') as f:
                 f.write(content)
             os.chmod(path, perms & 0o777)

     def _get_file_content(self, obj_id):
         """Get the content of the given file.

         """
         content = list(self.storage.content_get([obj_id]))[0]['data']
         return content

     def _create_file_absent(self, path):
         """Create a file that indicates a skipped content

         Create the given file but fill it with a specific content to
         indicate that the content has not been retrieved by the
         Software Heritage archive due to its size.

         """
         self._create_file(path, SKIPPED_MESSAGE)

     def _create_file_hidden(self, path):
         """Create a file that indicates a hidden content

         Create the given file but fill it with a specific content to
         indicate that the content could not be retrieved due to
         privacy policy.

         """
         self._create_file(path, HIDDEN_MESSAGE)

     def _create_bundle_content(self, path, hex_dir_id):
         """Create a bundle from the given directory

         Args:
             path: location of the directory to package.
             hex_dir_id: hex representation of the directory id

         Returns:
             bytes that represent the compressed directory as a bundle.

         """
         return get_tar_bytes(path.decode(), hex_dir_id)
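With this change, prepare_bundle() is a generator and cook() hands the iterator straight to the cache. A toy sketch of the new contract (the 'toy' cooker type and its chunks are invented for illustration):

    class ToyCooker(BaseVaultCooker):
        """Hypothetical cooker illustrating the generator contract."""
        CACHE_TYPE_KEY = 'toy'

        def prepare_bundle(self):
            # Yield the bundle piece by piece; cook() passes this
            # iterator to cache.add_stream(), so the full bundle is
            # never held in memory at once.
            yield b'part one of the bundle\n'
            yield b'part two of the bundle\n'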
""" if perms not in (0o100644, 0o100755, 0o120000): logging.warning('File {} has invalid permission {}, ' 'defaulting to 644.'.format(path, perms)) perms = 0o100644 if perms == 0o120000: # Symbolic link os.symlink(content, path) else: with open(path, 'wb') as f: f.write(content) os.chmod(path, perms & 0o777) def _get_file_content(self, obj_id): """Get the content of the given file. """ content = list(self.storage.content_get([obj_id]))[0]['data'] return content def _create_file_absent(self, path): """Create a file that indicates a skipped content Create the given file but fill it with a specific content to indicate that the content have not been retrieved by the software heritage archive due to its size. """ self._create_file(self, SKIPPED_MESSAGE) def _create_file_hidden(self, path): """Create a file that indicates an hidden content Create the given file but fill it with a specific content to indicate that the content could not be retrieved due to privacy policy. """ self._create_file(self, HIDDEN_MESSAGE) def _create_bundle_content(self, path, hex_dir_id): """Create a bundle from the given directory Args: path: location of the directory to package. hex_dir_id: hex representation of the directory id Returns: bytes that represent the compressed directory as a bundle. """ return get_tar_bytes(path.decode(), hex_dir_id) diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py index 3c6df6b..3e09181 100644 --- a/swh/vault/cookers/directory.py +++ b/swh/vault/cookers/directory.py @@ -1,24 +1,24 @@ # Copyright (C) 2016 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from .base import BaseVaultCooker, DirectoryBuilder class DirectoryCooker(BaseVaultCooker): """Cooker to create a directory bundle """ CACHE_TYPE_KEY = 'directory' def prepare_bundle(self): """Cook the requested directory into a Bundle Args: obj_id (bytes): the id of the directory to be cooked. 
diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py
index 12422a0..49c9a60 100644
--- a/swh/vault/cookers/revision_flat.py
+++ b/swh/vault/cookers/revision_flat.py
@@ -1,33 +1,34 @@
 # Copyright (C) 2016-2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import tempfile

 from pathlib import Path

 from swh.model import hashutil

 from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes


 class RevisionFlatCooker(BaseVaultCooker):
     """Cooker to create a revision_flat bundle """
     CACHE_TYPE_KEY = 'revision_flat'

     def prepare_bundle(self):
         """Cook the requested revision into a bundle

         Yields:
             bytes that correspond to the bundle

         """
         directory_builder = DirectoryBuilder(self.storage)
         with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp:
             root = Path(root_tmp)
             for revision in self.storage.revision_log([self.obj_id]):
                 revdir = root / hashutil.hash_to_hex(revision['id'])
                 revdir.mkdir()
                 directory_builder.build_directory(revision['directory'],
                                                   str(revdir).encode())
-            return get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id))
+            # FIXME: stream the bytes! this tarball can be HUUUUUGE
+            yield get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id))
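One way the FIXME could later be resolved (a sketch under the assumption that a helper thread is acceptable; none of this is in the patch) is to run tarfile in stream mode over a queue-backed file-like object and yield chunks as they are produced:

    import queue
    import tarfile
    import threading


    class _QueueWriter:
        """File-like object that pushes written chunks onto a queue."""
        def __init__(self, q):
            self.q = q

        def write(self, data):
            self.q.put(bytes(data))
            return len(data)


    def stream_tar_bytes(path, arcname):
        """Yield tarball chunks of `path` without building it in memory."""
        q = queue.Queue(maxsize=16)  # bounded, for backpressure
        sentinel = object()

        def produce():
            try:
                # 'w|' is tarfile's write-only stream mode: it never
                # seeks, it only calls write() on the given fileobj.
                with tarfile.open(fileobj=_QueueWriter(q), mode='w|') as tar:
                    tar.add(str(path), arcname=arcname)
            finally:
                q.put(sentinel)

        threading.Thread(target=produce, daemon=True).start()
        while True:
            chunk = q.get()
            if chunk is sentinel:
                break
            yield chunk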
diff --git a/swh/vault/cookers/revision_git.py b/swh/vault/cookers/revision_git.py
index 6fbb935..c0c6581 100644
--- a/swh/vault/cookers/revision_git.py
+++ b/swh/vault/cookers/revision_git.py
@@ -1,204 +1,206 @@
 # Copyright (C) 2017 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import collections
 import fastimport.commands
 import functools
 import logging
 import os

 from .base import BaseVaultCooker


 class RevisionGitCooker(BaseVaultCooker):
     """Cooker to create a git fast-import bundle """
     CACHE_TYPE_KEY = 'revision_git'

     def prepare_bundle(self):
-        commands = self.fastexport(self.storage.revision_log([self.obj_id]))
-        bundle_content = b'\n'.join(bytes(command) for command in commands)
-        return bundle_content
+        log = self.storage.revision_log([self.obj_id])
+        commands = self.fastexport(log)
+
+        for command in commands:
+            yield bytes(command) + b'\n'  # keep commands newline-separated

     def fastexport(self, log):
         """Generate all the git fast-import commands from a given log.
         """
         self.rev_by_id = {r['id']: r for r in log}
         self.rev_sorted = list(self._toposort(self.rev_by_id))
         self.obj_done = set()
         self.obj_to_mark = {}
         self.next_available_mark = 1

         # We want a single transaction for the whole export, so we store a
         # cursor and use it during the process.
         with self.storage.db.transaction() as self.cursor:
             for i, rev in enumerate(self.rev_sorted, 1):
                 logging.info('Computing revision %d/%d', i,
                              len(self.rev_sorted))
                 yield from self._compute_commit_command(rev)

     def _toposort(self, rev_by_id):
         """Perform a topological sort on the revision graph.
         """
         children = collections.defaultdict(list)  # rev -> children
         in_degree = {}  # rev -> number of parents left to compute

         # Compute the in_degrees and the parents of all the revisions.
         # Add the roots to the processing queue.
         queue = collections.deque()
         for rev_id, rev in rev_by_id.items():
             in_degree[rev_id] = len(rev['parents'])
             if not rev['parents']:
                 queue.append(rev_id)
             for parent in rev['parents']:
                 children[parent].append(rev_id)

         # Topological sort: yield the 'ready' nodes, decrease the in-degree
         # of their children and add the 'ready' ones to the queue.
         while queue:
             rev_id = queue.popleft()
             yield rev_by_id[rev_id]
             for child in children[rev_id]:
                 in_degree[child] -= 1
                 if in_degree[child] == 0:
                     queue.append(child)
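A quick illustration of the order _toposort() produces, on a toy two-revision graph (`cooker` stands for an existing RevisionGitCooker instance; the ids are made up):

    # Toy revision graph, not real archive data: r2 has r1 as parent.
    rev_by_id = {
        b'r1': {'id': b'r1', 'parents': []},
        b'r2': {'id': b'r2', 'parents': [b'r1']},
    }
    order = [rev['id'] for rev in cooker._toposort(rev_by_id)]
    assert order == [b'r1', b'r2']  # parents always precede children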
     def mark(self, obj_id):
         """Get the mark ID as bytes of a git object.

         If the object has not yet been marked, assign a new ID and add it to
         the mark dictionary.
         """
         if obj_id not in self.obj_to_mark:
             self.obj_to_mark[obj_id] = self.next_available_mark
             self.next_available_mark += 1
         return str(self.obj_to_mark[obj_id]).encode()

     def _compute_blob_command_content(self, file_data):
         """Compute the blob command of a file entry if it has not been
         computed yet.
         """
         obj_id = file_data['sha1']
         if obj_id in self.obj_done:
             return
         content = list(self.storage.content_get([obj_id]))[0]['data']
         yield fastimport.commands.BlobCommand(
             mark=self.mark(obj_id),
             data=content,
         )
         self.obj_done.add(obj_id)

     def _compute_commit_command(self, rev):
         """Compute a commit command from a specific revision.
         """
         if 'parents' in rev and rev['parents']:
             from_ = b':' + self.mark(rev['parents'][0])
             merges = [b':' + self.mark(r) for r in rev['parents'][1:]]
             parent = self.rev_by_id[rev['parents'][0]]
         else:
             # We issue a reset command before all the new roots so that they
             # are not automatically added as children of the current branch.
             yield fastimport.commands.ResetCommand(b'refs/heads/master', None)
             from_ = None
             merges = None
             parent = None

         # Retrieve the file commands while yielding new blob commands if
         # needed.
         files = yield from self._compute_file_commands(rev, parent)

         # Construct and yield the commit command
         author = (rev['author']['name'],
                   rev['author']['email'],
                   rev['date']['timestamp']['seconds'],
                   rev['date']['offset'] * 60)
         committer = (rev['committer']['name'],
                      rev['committer']['email'],
                      rev['committer_date']['timestamp']['seconds'],
                      rev['committer_date']['offset'] * 60)
         yield fastimport.commands.CommitCommand(
             ref=b'refs/heads/master',
             mark=self.mark(rev['id']),
             author=author,
             committer=committer,
             message=rev['message'],
             from_=from_,
             merges=merges,
             file_iter=files,
         )

     @functools.lru_cache(maxsize=4096)
     def _get_dir_ents(self, dir_id=None):
         """Get the entities of a directory as a dictionary (name -> entity).

         This function has a cache to avoid doing multiple requests to
         retrieve the same entities, as doing a directory_ls() is expensive.
         """
         data = (self.storage.directory_ls(dir_id, cur=self.cursor)
                 if dir_id is not None else [])
         return {f['name']: f for f in data}

     def _compute_file_commands(self, rev, parent=None):
         """Compute all the file commands of a revision.

         Generate a diff of the files between the revision and its main
         parent to find the necessary file commands to apply.
         """
         commands = []

         # Initialize the stack with the root of the tree.
         cur_dir = rev['directory']
         parent_dir = parent['directory'] if parent else None
         stack = [(b'', cur_dir, parent_dir)]

         while stack:
             # Retrieve the current directory and the directory of the
             # parent commit in order to compute the diff of the trees.
             root, cur_dir_id, prev_dir_id = stack.pop()
             cur_dir = self._get_dir_ents(cur_dir_id)
             prev_dir = self._get_dir_ents(prev_dir_id)

             # Find subtrees to delete:
             #  - Subtrees that are not in the new tree (file or directory
             #    deleted).
             #  - Subtrees that do not have the same type in the new tree
             #    (file -> directory or directory -> file)
             # After this step, every node remaining in the previous
             # directory has the same type as the one in the current
             # directory.
             for fname, f in prev_dir.items():
                 if ((fname not in cur_dir
                      or f['type'] != cur_dir[fname]['type'])):
                     commands.append(fastimport.commands.FileDeleteCommand(
                         path=os.path.join(root, fname)
                     ))

             # Find subtrees to modify:
             #  - Leaves (files) will be added or modified using `filemodify`
             #  - Other subtrees (directories) will be added to the stack and
             #    processed in the next iteration.
             for fname, f in cur_dir.items():
                 # A file is added or modified if it was not in the tree, if
                 # its permissions changed or if its content changed.
                 if (f['type'] == 'file'
                     and (fname not in prev_dir
                          or f['sha1'] != prev_dir[fname]['sha1']
                          or f['perms'] != prev_dir[fname]['perms'])):
                     # Issue a blob command for the new blobs if needed.
                     yield from self._compute_blob_command_content(f)
                     commands.append(fastimport.commands.FileModifyCommand(
                         path=os.path.join(root, fname),
                         mode=f['perms'],
                         dataref=(b':' + self.mark(f['sha1'])),
                         data=None,
                     ))
                 # A directory is added or modified if it was not in the
                 # tree or if its target changed.
                 elif f['type'] == 'dir':
                     f_prev_target = None
                     if fname in prev_dir and prev_dir[fname]['type'] == 'dir':
                         f_prev_target = prev_dir[fname]['target']
                     if f_prev_target is None or f['target'] != f_prev_target:
                         stack.append((os.path.join(root, fname),
                                       f['target'], f_prev_target))
         return commands
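For completeness, a sketch of how a cooked revision_git bundle could be consumed, streaming the cached chunks into git fast-import (the load_bundle helper and its arguments are hypothetical):

    import subprocess

    def load_bundle(cache, obj_id, repo_path):
        """Pipe a cooked revision_git bundle into `git fast-import`.

        `cache` is a VaultCache and `repo_path` an existing git
        repository; both are illustrative assumptions.
        """
        proc = subprocess.Popen(['git', 'fast-import', '--quiet'],
                                stdin=subprocess.PIPE, cwd=repo_path)
        # The bundle never needs to be materialized: chunks flow from
        # the cache straight into the fast-import process.
        for chunk in cache.get_stream('revision_git', obj_id):
            proc.stdin.write(chunk)
        proc.stdin.close()
        proc.wait()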