diff --git a/swh/storage/vault/cookers/revision_git.py b/swh/storage/vault/cookers/revision_git.py --- a/swh/storage/vault/cookers/revision_git.py +++ b/swh/storage/vault/cookers/revision_git.py @@ -5,6 +5,9 @@ import collections import fastimport.commands +import functools +import logging +import os from .base import BaseVaultCooker @@ -23,29 +26,36 @@ """ self.rev_by_id = {r['id']: r for r in log} self.rev_sorted = list(self._toposort(self.rev_by_id)) - self.dir_by_id = {} self.obj_done = set() self.obj_to_mark = {} self.next_available_mark = 1 - yield from self._compute_all_blob_commands() - yield from self._compute_all_commit_commands() + # We want a single transaction for the whole export, so we store a + # cursor and use it during the process. + with self.storage.db.transaction() as self.cursor: + for i, rev in enumerate(self.rev_sorted, 1): + logging.info('Computing revision %d/%d', i, + len(self.rev_sorted)) + yield from self._compute_commit_command(rev) def _toposort(self, rev_by_id): """Perform a topological sort on the revision graph. """ - children = collections.defaultdict(list) - in_degree = collections.defaultdict(int) - for rev_id, rev in rev_by_id.items(): - for parent in rev['parents']: - in_degree[rev_id] += 1 - children[parent].append(rev_id) + children = collections.defaultdict(list) # rev -> children + in_degree = {} # rev -> number of parents left to compute + # Compute the in_degrees and the parents of all the revisions. + # Add the roots to the processing queue. queue = collections.deque() - for rev_id in rev_by_id.keys(): - if in_degree[rev_id] == 0: + for rev_id, rev in rev_by_id.items(): + in_degree[rev_id] = len(rev['parents']) + if not rev['parents']: queue.append(rev_id) + for parent in rev['parents']: + children[parent].append(rev_id) + # Topological sort: yield the 'ready' nodes, decrease the in degree of + # their children and add the 'ready' ones to the queue. 
while queue: rev_id = queue.popleft() yield rev_by_id[rev_id] @@ -65,52 +75,40 @@ self.next_available_mark += 1 return str(self.obj_to_mark[obj_id]).encode() - def _compute_all_blob_commands(self): - """Compute all the blob commands to populate the empty git repository. - - Mark the populated blobs so that we are able to reference them in file - commands. - - """ - for rev in self.rev_sorted: - yield from self._compute_blob_commands_in_dir(rev['directory']) - - def _compute_blob_commands_in_dir(self, dir_id): - """Find all the blobs in a directory and generate their blob commands. - - If a blob has already been visited and marked, skip it. - """ - data = self.storage.directory_ls(dir_id, recursive=True) - files_data = list(entry for entry in data if entry['type'] == 'file') - self.dir_by_id[dir_id] = files_data - for file_data in files_data: - obj_id = file_data['sha1'] - if obj_id in self.obj_done: - continue - content = list(self.storage.content_get([obj_id]))[0]['data'] - yield fastimport.commands.BlobCommand( - mark=self.mark(obj_id), - data=content, - ) - self.obj_done.add(obj_id) - - def _compute_all_commit_commands(self): - """Compute all the commit commands. + def _compute_blob_command_content(self, file_data): + """Compute the blob command of a file entry if it has not been + computed yet. """ - for rev in self.rev_sorted: - yield from self._compute_commit_command(rev) + obj_id = file_data['sha1'] + if obj_id in self.obj_done: + return + content = list(self.storage.content_get([obj_id]))[0]['data'] + yield fastimport.commands.BlobCommand( + mark=self.mark(obj_id), + data=content, + ) + self.obj_done.add(obj_id) def _compute_commit_command(self, rev): """Compute a commit command from a specific revision. 
""" - from_ = None - merges = None - parent = None if 'parents' in rev and rev['parents']: from_ = b':' + self.mark(rev['parents'][0]) merges = [b':' + self.mark(r) for r in rev['parents'][1:]] parent = self.rev_by_id[rev['parents'][0]] - files = self._compute_file_commands(rev, parent) + else: + # We issue a reset command before all the new roots so that they + # are not automatically added as children of the current branch. + yield fastimport.commands.ResetCommand(b'refs/heads/master', None) + from_ = None + merges = None + parent = None + + # Retrieve the file commands while yielding new blob commands if + # needed. + files = yield from self._compute_file_commands(rev, parent) + + # Construct and yield the commit command author = (rev['author']['name'], rev['author']['email'], rev['date']['timestamp']['seconds'], @@ -130,33 +128,77 @@ file_iter=files, ) + @functools.lru_cache(maxsize=4096) + def _get_dir_ents(self, dir_id=None): + """Get the entities of a directory as a dictionary (name -> entity). + + This function has a cache to avoid doing multiple requests to retrieve + the same entities, as doing a directory_ls() is expensive. + """ + data = (self.storage.directory_ls(dir_id, cur=self.cursor) + if dir_id is not None else []) + return {f['name']: f for f in data} + def _compute_file_commands(self, rev, parent=None): """Compute all the file commands of a revision. Generate a diff of the files between the revision and its main parent to find the necessary file commands to apply. 
""" - if not parent: - parent_dir = [] - else: - parent_dir = self.dir_by_id[parent['directory']] - cur_dir = self.dir_by_id[rev['directory']] - parent_dir = {f['name']: f for f in parent_dir} - cur_dir = {f['name']: f for f in cur_dir} - - for fname, f in cur_dir.items(): - if ((fname not in parent_dir - or f['sha1'] != parent_dir[fname]['sha1'] - or f['perms'] != parent_dir[fname]['perms'])): - yield fastimport.commands.FileModifyCommand( - path=f['name'], - mode=f['perms'], - dataref=(b':' + self.mark(f['sha1'])), - data=None, - ) - - for fname, f in parent_dir.items(): - if fname not in cur_dir: - yield fastimport.commands.FileDeleteCommand( - path=f['name'] - ) + commands = [] + + # Initialize the stack with the root of the tree. + cur_dir = rev['directory'] + parent_dir = parent['directory'] if parent else None + stack = [(b'', cur_dir, parent_dir)] + + while stack: + # Retrieve the current directory and the directory of the parent + # commit in order to compute the diff of the trees. + root, cur_dir_id, prev_dir_id = stack.pop() + cur_dir = self._get_dir_ents(cur_dir_id) + prev_dir = self._get_dir_ents(prev_dir_id) + + # Find subtrees to delete: + # - Subtrees that are not in the new tree (file or directory + # deleted). + # - Subtrees that do not have the same type in the new tree + # (file -> directory or directory -> file) + # After this step, every node remaining in the previous directory + # has the same type than the one in the current directory. + for fname, f in prev_dir.items(): + if ((fname not in cur_dir + or f['type'] != cur_dir[fname]['type'])): + commands.append(fastimport.commands.FileDeleteCommand( + path=os.path.join(root, fname) + )) + + # Find subtrees to modify: + # - Leaves (files) will be added or modified using `filemodify` + # - Other subtrees (directories) will be added to the stack and + # processed in the next iteration. 
+ for fname, f in cur_dir.items(): + # A file is added or modified if it was not in the tree, if its + # permissions changed or if its content changed. + if (f['type'] == 'file' + and (fname not in prev_dir + or f['sha1'] != prev_dir[fname]['sha1'] + or f['perms'] != prev_dir[fname]['perms'])): + # Issue a blob command for the new blobs if needed. + yield from self._compute_blob_command_content(f) + commands.append(fastimport.commands.FileModifyCommand( + path=os.path.join(root, fname), + mode=f['perms'], + dataref=(b':' + self.mark(f['sha1'])), + data=None, + )) + # A directory is added or modified if it was not in the tree or + # if its target changed. + elif f['type'] == 'dir': + f_prev_target = None + if fname in prev_dir and prev_dir[fname]['type'] == 'dir': + f_prev_target = prev_dir[fname]['target'] + if f_prev_target is None or f['target'] != f_prev_target: + stack.append((os.path.join(root, fname), + f['target'], f_prev_target)) + return commands