diff --git a/swh/vault/api/cooking_tasks.py b/swh/vault/api/cooking_tasks.py
index 442547a..44a81e4 100644
--- a/swh/vault/api/cooking_tasks.py
+++ b/swh/vault/api/cooking_tasks.py
@@ -1,26 +1,27 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scheduler.task import Task
from swh.model import hashutil
from ..cache import VaultCache
from ..cookers import COOKER_TYPES
from ... import get_storage
class SWHCookingTask(Task):
"""Main task which archives a contents batch.
"""
task_queue = 'swh_storage_vault_cooking'
def run(self, type, hex_id, storage_args, cache_args):
# Initialize elements
storage = get_storage(**storage_args)
cache = VaultCache(**cache_args)
# Initialize cooker
- cooker = COOKER_TYPES[type](storage, cache)
+ obj_id = hashutil.hash_to_bytes(hex_id)
+ cooker = COOKER_TYPES[type](storage, cache, obj_id)
# Perform the cooking
- cooker.cook(obj_id=hashutil.hash_to_bytes(hex_id))
+ cooker.cook()
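Note: the change above is the heart of the refactor: the task decodes the hex id once and hands it to the cooker's constructor, so cook() now takes no arguments. A minimal sketch of driving a cooker by hand under the new API (storage and cache are assumed to be already-configured instances, and COOKER_TYPES is assumed to be keyed by the same type names as CACHE_TYPE_KEY):

from swh.model import hashutil
from swh.vault.cookers import COOKER_TYPES

dir_hex = '0123456789abcdef0123456789abcdef01234567'  # placeholder id
cooker = COOKER_TYPES['directory'](storage, cache,
                                   hashutil.hash_to_bytes(dir_hex))
cooker.cook()  # prepares the bundle and stores it in the cache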
diff --git a/swh/vault/cookers/base.py b/swh/vault/cookers/base.py
index 492cf5e..811b638 100644
--- a/swh/vault/cookers/base.py
+++ b/swh/vault/cookers/base.py
@@ -1,218 +1,221 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import itertools
import logging
import os
import tarfile
import tempfile
from pathlib import Path
from swh.model import hashutil
def get_tar_bytes(path, arcname=None):
path = Path(path)
if not arcname:
arcname = path.name
tar_buffer = io.BytesIO()
- tar = tarfile.open(fileobj=tar_buffer, mode='w')
- tar.add(str(path), arcname=arcname)
+ # Close the archive before reading the buffer: tarfile only writes the
+ # end-of-archive blocks on close, so the tarball is truncated otherwise.
+ with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
+ tar.add(str(path), arcname=arcname)
return tar_buffer.getbuffer()
- SKIPPED_MESSAGE = (b'This content have not been retrieved in '
- b'Software Heritage archive due to its size')
+ SKIPPED_MESSAGE = (b'This content has not been retrieved from the '
+ b'Software Heritage archive due to its size')
HIDDEN_MESSAGE = (b'This content is hidden')
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
This class describes a common API for the cookers.
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key under which the bundle is referenced in the cache
- - def cook(obj_id): cook the object into a bundle
- - def notify_bundle_ready(notif_data, bundle_id): notify the
+ - def cook(): cook the object into a bundle
+ - def notify_bundle_ready(notif_data): notify the
bundle is ready.
"""
CACHE_TYPE_KEY = None
- def __init__(self, storage, cache):
+ def __init__(self, storage, cache, obj_id):
+ """Initialize the cooker.
+
+ The type of the object represented by the id depends on the
+ concrete class. Very likely, each type of bundle will have its
+ own cooker class.
+
+ Args:
+ storage: the storage object
+ cache: the cache where to store the bundle
+ obj_id: id of the object to be cooked into a bundle.
+ """
self.storage = storage
self.cache = cache
+ self.obj_id = obj_id
@abc.abstractmethod
- def prepare_bundle(self, obj_id):
+ def prepare_bundle(self):
"""Implementation of the cooker. Returns the bundle bytes.
Override this with the cooker implementation.
"""
- raise NotImplemented
+ raise NotImplementedError
- def cook(self, obj_id):
+ def cook(self):
"""Cook the requested object into a bundle
-
- The type of the object represented by the id depends on the
- concrete class. Very likely, each type of bundle will have its
- own cooker class.
-
- Args:
- obj_id: id of the object to be cooked into a bundle.
-
"""
- bundle_content = self.prepare_bundle(obj_id)
+ bundle_content = self.prepare_bundle()
# Cache the bundle
- self.update_cache(obj_id, bundle_content)
+ self.update_cache(bundle_content)
# Make a notification that the bundle has been cooked
# NOT YET IMPLEMENTED see TODO in function.
self.notify_bundle_ready(
- notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id),
- bundle_id=obj_id)
+ notif_data='Bundle %s ready' % hashutil.hash_to_hex(self.obj_id))
- def update_cache(self, id, bundle_content):
+ def update_cache(self, bundle_content):
"""Update the cache with id and bundle_content.
"""
- self.cache.add(self.CACHE_TYPE_KEY, id, bundle_content)
+ self.cache.add(self.CACHE_TYPE_KEY, self.obj_id, bundle_content)
- def notify_bundle_ready(self, notif_data, bundle_id):
+ def notify_bundle_ready(self, notif_data):
# TODO plug this method with the notification method once
# done.
pass
class DirectoryBuilder:
"""Creates a cooked directory from its sha1_git in the db.
Warning: This is NOT a directly accessible cooker, but a low-level
one that performs the filesystem manipulations.
"""
def __init__(self, storage):
self.storage = storage
def get_directory_bytes(self, dir_id):
# Create temporary folder to retrieve the files into.
root = bytes(tempfile.mkdtemp(prefix='directory.',
suffix='.cook'), 'utf8')
self.build_directory(dir_id, root)
# Use the created directory to make a bundle with the data as
# a compressed directory.
bundle_content = self._create_bundle_content(
root,
hashutil.hash_to_hex(dir_id))
return bundle_content
def build_directory(self, dir_id, root):
# Retrieve data from the database.
data = self.storage.directory_ls(dir_id, recursive=True)
# Split into files and directory data.
# TODO(seirl): also handle revision data.
data1, data2 = itertools.tee(data, 2)
dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir')
file_data = (entry for entry in data2 if entry['type'] == 'file')
# Recreate the directory's subtree and then the files into it.
self._create_tree(root, dir_data)
self._create_files(root, file_data)
def _create_tree(self, root, directory_paths):
"""Create a directory tree from the given paths
The tree is rooted at `root`, and each path given in
`directory_paths` is created under it.
"""
# Directories are sorted by depth so they are created in the
# right order
bsep = bytes(os.path.sep, 'utf8')
dir_names = sorted(
directory_paths,
key=lambda x: len(x.split(bsep)))
for dir_name in dir_names:
os.makedirs(os.path.join(root, dir_name))
def _create_files(self, root, file_datas):
"""Create the files according to their status.
"""
# Then create the files
for file_data in file_datas:
path = os.path.join(root, file_data['name'])
status = file_data['status']
perms = file_data['perms']
if status == 'absent':
self._create_file_absent(path)
elif status == 'hidden':
self._create_file_hidden(path)
else:
content = self._get_file_content(file_data['sha1'])
self._create_file(path, content, perms)
def _create_file(self, path, content, perms=0o100644):
"""Create the given file and fill it with content.
"""
if perms not in (0o100644, 0o100755, 0o120000):
logging.warning('File {} has invalid permission {}, '
'defaulting to 644.'.format(path, perms))
perms = 0o100644
if perms == 0o120000: # Symbolic link
os.symlink(content, path)
else:
with open(path, 'wb') as f:
f.write(content)
os.chmod(path, perms & 0o777)
def _get_file_content(self, obj_id):
"""Get the content of the given file.
"""
content = list(self.storage.content_get([obj_id]))[0]['data']
return content
def _create_file_absent(self, path):
"""Create a file that indicates a skipped content
Create the given file but fill it with a specific content to
indicate that the content has not been retrieved by the
Software Heritage archive due to its size.
"""
- self._create_file(self, SKIPPED_MESSAGE)
+ self._create_file(path, SKIPPED_MESSAGE)
def _create_file_hidden(self, path):
"""Create a file that indicates an hidden content
Create the given file but fill it with a specific content to
indicate that the content could not be retrieved due to
privacy policy.
"""
- self._create_file(self, HIDDEN_MESSAGE)
+ self._create_file(path, HIDDEN_MESSAGE)
def _create_bundle_content(self, path, hex_dir_id):
"""Create a bundle from the given directory
Args:
path: location of the directory to package.
hex_dir_id: hex representation of the directory id
Returns:
bytes that represent the compressed directory as a bundle.
"""
return get_tar_bytes(path.decode(), hex_dir_id)
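Note: per the updated base-class docstring, a new bundle type now only needs a CACHE_TYPE_KEY and a prepare_bundle() override that reads self.obj_id. A hedged sketch of such a cooker (the 'snapshot' type is hypothetical and not part of this diff):

from swh.vault.cookers.base import BaseVaultCooker

class SnapshotCooker(BaseVaultCooker):
    """Hypothetical cooker illustrating the post-refactor API."""
    CACHE_TYPE_KEY = 'snapshot'  # hypothetical bundle type

    def prepare_bundle(self):
        # self.obj_id was stored by BaseVaultCooker.__init__, so the id
        # no longer needs to be threaded through every call.
        return b'bundle for ' + self.obj_id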
diff --git a/swh/vault/cookers/directory.py b/swh/vault/cookers/directory.py
index 57306d3..3c6df6b 100644
--- a/swh/vault/cookers/directory.py
+++ b/swh/vault/cookers/directory.py
@@ -1,24 +1,24 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from .base import BaseVaultCooker, DirectoryBuilder
class DirectoryCooker(BaseVaultCooker):
"""Cooker to create a directory bundle """
CACHE_TYPE_KEY = 'directory'
- def prepare_bundle(self, obj_id):
+ def prepare_bundle(self):
"""Cook the requested directory into a Bundle
- Args:
- obj_id (bytes): the id of the directory to be cooked.
Returns:
bytes that correspond to the bundle
"""
directory_builder = DirectoryBuilder(self.storage)
- return directory_builder.get_directory_bytes(obj_id)
+ return directory_builder.get_directory_bytes(self.obj_id)
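Note: DirectoryCooker stays a thin wrapper; the filesystem work lives in the low-level DirectoryBuilder, which can be driven directly the same way (a sketch; storage is an already-configured instance and dir_id a directory sha1_git as bytes):

from swh.vault.cookers.base import DirectoryBuilder

builder = DirectoryBuilder(storage)
tar_bytes = builder.get_directory_bytes(dir_id)  # tarball of the directory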
diff --git a/swh/vault/cookers/revision_flat.py b/swh/vault/cookers/revision_flat.py
index c0cc393..12422a0 100644
--- a/swh/vault/cookers/revision_flat.py
+++ b/swh/vault/cookers/revision_flat.py
@@ -1,36 +1,33 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import tempfile
from pathlib import Path
from swh.model import hashutil
from .base import BaseVaultCooker, DirectoryBuilder, get_tar_bytes
class RevisionFlatCooker(BaseVaultCooker):
"""Cooker to create a directory bundle """
CACHE_TYPE_KEY = 'revision_flat'
- def prepare_bundle(self, obj_id):
+ def prepare_bundle(self):
"""Cook the requested revision into a Bundle
- Args:
- obj_id (bytes): the id of the revision to be cooked.
-
Returns:
bytes that correspond to the bundle
"""
directory_builder = DirectoryBuilder(self.storage)
with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp:
root = Path(root_tmp)
- for revision in self.storage.revision_log([obj_id]):
+ for revision in self.storage.revision_log([self.obj_id]):
revdir = root / hashutil.hash_to_hex(revision['id'])
revdir.mkdir()
directory_builder.build_directory(revision['directory'],
str(revdir).encode())
- return get_tar_bytes(root_tmp, hashutil.hash_to_hex(obj_id))
+ return get_tar_bytes(root_tmp, hashutil.hash_to_hex(self.obj_id))
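Note: a revision_flat bundle is a tarball containing one sub-directory per revision in the log. A sketch of unpacking a cooked bundle, assuming VaultCache exposes a get() mirroring the add() used in update_cache():

import io
import tarfile

bundle = cache.get('revision_flat', obj_id)  # get() is an assumption here
with tarfile.open(fileobj=io.BytesIO(bundle), mode='r') as tar:
    tar.extractall('/tmp/revision_flat')  # one sub-directory per revision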
diff --git a/swh/vault/cookers/revision_git.py b/swh/vault/cookers/revision_git.py
index dca6656..df39f03 100644
--- a/swh/vault/cookers/revision_git.py
+++ b/swh/vault/cookers/revision_git.py
@@ -1,162 +1,162 @@
# Copyright (C) 2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import collections
import fastimport.commands
from .base import BaseVaultCooker
class RevisionGitCooker(BaseVaultCooker):
"""Cooker to create a git fast-import bundle """
CACHE_TYPE_KEY = 'revision_git'
- def prepare_bundle(self, obj_id):
- commands = self.fastexport(self.storage.revision_log([obj_id]))
+ def prepare_bundle(self):
+ commands = self.fastexport(self.storage.revision_log([self.obj_id]))
bundle_content = b'\n'.join(bytes(command) for command in commands)
return bundle_content
def fastexport(self, log):
"""Generate all the git fast-import commands from a given log.
"""
self.rev_by_id = {r['id']: r for r in log}
self.rev_sorted = list(self._toposort(self.rev_by_id))
self.dir_by_id = {}
self.obj_done = set()
self.obj_to_mark = {}
self.next_available_mark = 1
yield from self._compute_all_blob_commands()
yield from self._compute_all_commit_commands()
def _toposort(self, rev_by_id):
"""Perform a topological sort on the revision graph.
"""
children = collections.defaultdict(list)
in_degree = collections.defaultdict(int)
for rev_id, rev in rev_by_id.items():
for parent in rev['parents']:
in_degree[rev_id] += 1
children[parent].append(rev_id)
queue = collections.deque()
for rev_id in rev_by_id.keys():
if in_degree[rev_id] == 0:
queue.append(rev_id)
while queue:
rev_id = queue.popleft()
yield rev_by_id[rev_id]
for child in children[rev_id]:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
def mark(self, obj_id):
"""Get the mark ID as bytes of a git object.
If the object has not yet been marked, assign a new ID and add it to
the mark dictionary.
"""
if obj_id not in self.obj_to_mark:
self.obj_to_mark[obj_id] = self.next_available_mark
self.next_available_mark += 1
return str(self.obj_to_mark[obj_id]).encode()
def _compute_all_blob_commands(self):
"""Compute all the blob commands to populate the empty git repository.
Mark the populated blobs so that we are able to reference them in file
commands.
"""
for rev in self.rev_sorted:
yield from self._compute_blob_commands_in_dir(rev['directory'])
def _compute_blob_commands_in_dir(self, dir_id):
"""Find all the blobs in a directory and generate their blob commands.
If a blob has already been visited and marked, skip it.
"""
data = self.storage.directory_ls(dir_id, recursive=True)
files_data = list(entry for entry in data if entry['type'] == 'file')
self.dir_by_id[dir_id] = files_data
for file_data in files_data:
obj_id = file_data['sha1']
if obj_id in self.obj_done:
continue
content = list(self.storage.content_get([obj_id]))[0]['data']
yield fastimport.commands.BlobCommand(
mark=self.mark(obj_id),
data=content,
)
self.obj_done.add(obj_id)
def _compute_all_commit_commands(self):
"""Compute all the commit commands.
"""
for rev in self.rev_sorted:
yield from self._compute_commit_command(rev)
def _compute_commit_command(self, rev):
"""Compute a commit command from a specific revision.
"""
from_ = None
merges = None
parent = None
if 'parents' in rev and rev['parents']:
from_ = b':' + self.mark(rev['parents'][0])
merges = [b':' + self.mark(r) for r in rev['parents'][1:]]
parent = self.rev_by_id[rev['parents'][0]]
files = self._compute_file_commands(rev, parent)
author = (rev['author']['name'],
rev['author']['email'],
rev['date']['timestamp']['seconds'],
rev['date']['offset'] * 60)
committer = (rev['committer']['name'],
rev['committer']['email'],
rev['committer_date']['timestamp']['seconds'],
rev['committer_date']['offset'] * 60)
yield fastimport.commands.CommitCommand(
ref=b'refs/heads/master',
mark=self.mark(rev['id']),
author=author,
committer=committer,
message=rev['message'],
from_=from_,
merges=merges,
file_iter=files,
)
def _compute_file_commands(self, rev, parent=None):
"""Compute all the file commands of a revision.
Generate a diff of the files between the revision and its main parent
to find the necessary file commands to apply.
"""
if not parent:
parent_dir = []
else:
parent_dir = self.dir_by_id[parent['directory']]
cur_dir = self.dir_by_id[rev['directory']]
parent_dir = {f['name']: f for f in parent_dir}
cur_dir = {f['name']: f for f in cur_dir}
for fname, f in cur_dir.items():
if ((fname not in parent_dir
or f['sha1'] != parent_dir[fname]['sha1']
or f['perms'] != parent_dir[fname]['perms'])):
yield fastimport.commands.FileModifyCommand(
path=f['name'],
mode=f['perms'],
dataref=(b':' + self.mark(f['sha1'])),
data=None,
)
for fname, f in parent_dir.items():
if fname not in cur_dir:
yield fastimport.commands.FileDeleteCommand(
path=f['name']
)
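Note: the revision_git cooker emits newline-joined fastimport commands, i.e. a stream that git fast-import can replay. A sketch of rebuilding a repository from a cooked bundle (bundle holds the cooked bytes; git must be on PATH):

import subprocess
import tempfile

with tempfile.TemporaryDirectory(suffix='.git-replay') as repo:
    subprocess.run(['git', 'init', repo], check=True)
    subprocess.run(['git', 'fast-import'], cwd=repo, input=bundle,
                   check=True)
    subprocess.run(['git', 'log', '--oneline'], cwd=repo, check=True)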