Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
index feee6f10..cbe6ef30 100644
--- a/swh/storage/vault/api/cooking_tasks.py
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -1,24 +1,32 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scheduler.task import Task
from swh.core import hashutil
from ..cache import VaultCache
from ..cooker import DirectoryVaultCooker
from ... import get_storage
# Registry mapping a bundle type name (the `type` argument received by
# SWHCookingTask.run) to the cooker class able to produce that bundle.
COOKER_TYPES = dict(
    directory=DirectoryVaultCooker,
)
+
+
class SWHCookingTask(Task):
    """Main task that cooks an object into a bundle of the requested type.
    """
    task_queue = 'swh_storage_vault_cooking'

    def run(self, type, hex_dir_id, storage_args, cache_args):
        """Cook the object `hex_dir_id` into a bundle of type `type`.

        Args:
            type (str): bundle type; must be a key of COOKER_TYPES
                (e.g. 'directory').
            hex_dir_id (str): hexadecimal id of the object to cook.
            storage_args (dict): keyword arguments for get_storage.
            cache_args (dict): keyword arguments for VaultCache.

        Raises:
            ValueError: if no cooker is registered for `type`.
        """
        # Resolve the cooker first: the API server routes also accept
        # 'revision' and 'snapshot', which have no entry in COOKER_TYPES,
        # so fail with an explicit error before opening storage and cache.
        try:
            vault_cooker_class = COOKER_TYPES[type]
        except KeyError:
            raise ValueError('Wrong bundle type: ' + str(type)) from None
        # Initialize elements
        storage = get_storage(**storage_args)
        cache = VaultCache(**cache_args)
        cooker = vault_cooker_class(storage, cache)
        # Perform the cooking
        obj_id = hashutil.hex_to_hash(hex_dir_id)
        cooker.cook(obj_id)
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
index 075c751b..ba9e6a69 100644
--- a/swh/storage/vault/api/server.py
+++ b/swh/storage/vault/api/server.py
@@ -1,92 +1,101 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
from flask import Flask, abort, g
+from werkzeug.routing import BaseConverter
from swh.core import config
from swh.objstorage.api.common import encode_data_server as encode_data
from swh.objstorage.api.common import BytesRequest, error_handler
from swh.storage import get_storage
from swh.storage.vault.api import cooking_tasks # NOQA
from swh.storage.vault.cache import VaultCache
from swh.storage.vault.cooker import DirectoryVaultCooker
from swh.scheduler.celery_backend.config import app as celery_app
-from flask_profile import Profiler
# Fully-qualified name of the celery task that performs the cooking.
cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'

# Default configuration, given to config.read() by `launch`.
DEFAULT_CONFIG = dict(
    storage=('dict', dict(
        storage_class='local_storage',
        storage_args=[
            'dbname=softwareheritage-dev',
            '/tmp/objects',
        ],
    )),
    cache=('dict', dict(root='/tmp/vaultcache')),
)
class RegexConverter(BaseConverter):
    """URL converter whose pattern is given in the route itself.

    Used in routes as ``<regex("directory|revision|snapshot"):type>``;
    the first converter argument is the regular expression to match.
    """
    def __init__(self, url_map, *items):
        super().__init__(url_map)
        pattern = items[0]
        self.regex = pattern
+
+
app = Flask(__name__)
# Register the `regex` URL converter (RegexConverter) used by the vault
# routes, and use the objstorage BytesRequest as the request class.
app.url_map.converters['regex'] = RegexConverter
app.request_class = BytesRequest
@app.errorhandler(Exception)
def my_error_handler(exception):
    """Delegate any uncaught exception to the objstorage `error_handler`,
    giving it `encode_data` as the encoder for the response."""
    response = error_handler(exception, encode_data)
    return response
@app.before_request
def before_request():
    """Attach a VaultCache (`g.cache`) and a DirectoryVaultCooker
    (`g.cooker`) built from the app configuration to the request context.
    """
    g.cache = VaultCache(**app.config['cache'])
    source_storage = get_storage(**app.config['storage'])
    g.cooker = DirectoryVaultCooker(source_storage, g.cache)
@app.route('/')
def index():
    """Return a short banner identifying the vault API server."""
    banner = 'SWH vault API server'
    return banner
@app.route('/vault/<regex("directory|revision|snapshot"):type>/',
           methods=['GET'])
def ls_directory(type):
    """List the ids of all cached bundles of the given `type`."""
    cached_ids = list(g.cache.ls(type))
    return encode_data(cached_ids)
@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
           methods=['GET'])
def get_cooked_directory(type, id):
    """Return the cooked bundle `id` (hex) of the given `type`.

    Aborts with 404 when the bundle is not present in the cache.
    """
    if not g.cache.is_cached(type, id):
        abort(404)
    # Directory bundles are tar archives (binary data). Decoding them as
    # UTF-8 fails as soon as a bundled file is not valid UTF-8, so the raw
    # bytes are sent as-is.
    # NOTE(review): clients that expected a str payload must now handle
    # bytes.
    return encode_data(g.cache.get(type, id))
@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
           methods=['POST'])
def cook_request_directory(type, id):
    """Schedule the cooking of bundle `id` of the given `type`.

    Returns the URL where the bundle will be available, with status
    201 CREATED.
    """
    cooking_task = celery_app.tasks[cooking_task_name]
    cooking_task.delay(type, id, app.config['storage'], app.config['cache'])
    bundle_url = '/vault/{}/{}/'.format(type, id)
    return encode_data(bundle_url), 201
@click.command()
@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5000, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
              help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
    # Read the configuration file (with DEFAULT_CONFIG as defaults) and
    # run the Flask development server. Kept as a comment rather than a
    # docstring because click would display a docstring as --help text.
    settings = config.read(config_path, DEFAULT_CONFIG)
    app.config.update(settings)
    app.run(host, port=int(port), debug=bool(debug))


if __name__ == '__main__':
    launch()
diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py
index 588682ae..cbc662c8 100644
--- a/swh/storage/vault/cache.py
+++ b/swh/storage/vault/cache.py
@@ -1,64 +1,69 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.core import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.objstorage_pathslicing import DIR_MODE
# Maps each bundle type to the one-letter sub-folder of the cache root in
# which bundles of that type are stored (see VaultCache.__init__).
BUNDLE_TYPES = dict(
    directory='d',
    revision='r',
    snapshot='s',
)
class VaultCache():
    """The vault cache is an object storage that stores bundles.

    The current implementation uses one 'pathslicing' objstorage per
    bundle type, each rooted in its own sub-folder of `root` (the
    sub-folder names come from BUNDLE_TYPES), so that different types of
    bundle are stored in different folders.
    """

    def __init__(self, root):
        """Create (if needed) the per-type folders and their storages.

        Args:
            root: path of the cache's root folder.
        """
        for subdir in BUNDLE_TYPES.values():
            fp = os.path.join(root, subdir)
            if not os.path.isdir(fp):
                os.makedirs(fp, DIR_MODE, exist_ok=True)

        self.storages = {
            bundle_type: get_objstorage(
                'pathslicing', {'root': os.path.join(root, subdir),
                                'slicing': '0:1/0:5'}
            )
            for bundle_type, subdir in BUNDLE_TYPES.items()
        }

    def __contains__(self, obj_id):
        """Return True if `obj_id` is stored under any bundle type."""
        # Iterate over the storages themselves: iterating the dict yields
        # the type names, and `bytes in str` raises TypeError.
        return any(obj_id in storage for storage in self.storages.values())

    def add(self, obj_type, obj_id, content):
        """Store `content` as bundle `obj_id` of type `obj_type`.

        Unlike `get` and `is_cached`, `obj_id` is passed to the underlying
        storage as-is (no hex conversion).
        """
        storage = self._get_storage(obj_type)
        return storage.add(content, obj_id)

    def get(self, obj_type, obj_id):
        """Return the bundle whose hex id is `obj_id`."""
        storage = self._get_storage(obj_type)
        return storage.get(hashutil.hex_to_hash(obj_id))

    def is_cached(self, obj_type, obj_id):
        """Return whether the bundle whose hex id is `obj_id` is cached."""
        storage = self._get_storage(obj_type)
        return hashutil.hex_to_hash(obj_id) in storage

    def ls(self, obj_type):
        """Yield the ids of the bundles of type `obj_type`."""
        storage = self._get_storage(obj_type)
        yield from storage

    def _get_storage(self, obj_type):
        """Get the storage that corresponds to the object type.

        Raises:
            ValueError: if `obj_type` is not a known bundle type.
        """
        try:
            return self.storages[obj_type]
        except KeyError:
            # Only an unknown key is a "wrong bundle type"; a bare except
            # also hid unrelated errors (and KeyboardInterrupt).
            raise ValueError('Wrong bundle type: ' + str(obj_type)) from None
diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py
index b48c4a83..e4ae53ab 100644
--- a/swh/storage/vault/cooker.py
+++ b/swh/storage/vault/cooker.py
@@ -1,173 +1,216 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import os
import tarfile
import tempfile
import itertools
from swh.core import hashutil
+
# Placeholder written in place of a file whose content is absent from the
# archive (skipped because of its size).
SKIPPED_MESSAGE = (
    b'This content have not been retrieved in '
    b'Software Heritage archive due to its size'
)
# Placeholder written in place of a file whose content is hidden.
HIDDEN_MESSAGE = b'This content is hidden'
class BaseVaultCooker(metaclass=abc.ABCMeta):
    """Abstract base class for the vault's bundle creators.

    This class describes a common API for the cookers: a concrete cooker
    builds a bundle (`cook`), stores it (`update_cache`) and signals that
    it has been cooked (`notify_bundle_ready`).
    """

    def __init__(self, storage, cache):
        """Initialize the cooker.

        Args:
            storage: source storage where the contents are retrieved.
            cache: destination storage where the cooked bundles are stored.
        """
        self.storage = storage
        self.cache = cache

    @abc.abstractmethod
    def cook(self, obj_id):
        """Cook the requested object into a bundle

        The type of the object represented by the id depends on the
        concrete class. Very likely, each type of bundle will have its
        own cooker class.

        Args:
            obj_id: id of the object to be cooked into a bundle.
        """
        raise NotImplementedError(
            'Vault cookers must implement a `cook` method')

    @abc.abstractmethod
    def update_cache(self, obj_id, bundle_content):
        """Store the cooked bundle in the cache.

        Args:
            obj_id: id of the cooked object.
            bundle_content: the bundle produced by `cook`.
        """
        raise NotImplementedError('Vault cookers must implement a '
                                  '`update_cache` method')

    @abc.abstractmethod
    def notify_bundle_ready(self, bundle_id):
        """Signal that the bundle `bundle_id` has been cooked.

        The signature matches the implementation and call site in
        DirectoryVaultCooker (a single `bundle_id` argument).
        """
        raise NotImplementedError(
            'Vault cookers must implement a `notify_bundle_ready` method')
+
class DirectoryVaultCooker(BaseVaultCooker):
    """Cooker to create a directory bundle."""

    def __init__(self, storage, cache):
        """Initialize a cooker that creates directory bundles.

        Args:
            storage: source storage where contents are retrieved.
            cache: destination storage where the cooked bundles are stored.
        """
        super().__init__(storage, cache)

    def cook(self, dir_id):
        """Cook the requested directory into a bundle and cache it.

        Args:
            dir_id (bytes): the id of the directory to be cooked.
        """
        # Create the bytes that correspond to the compressed directory.
        directory_cooker = DirectoryCooker(self.storage)
        bundle_content = directory_cooker.get_directory_bytes(dir_id)
        # Cache the bundle through the BaseVaultCooker hook (the former
        # private `_cache_bundle` no longer exists: AttributeError).
        self.update_cache(dir_id, bundle_content)
        # Make a notification that the bundle has been cooked.
        # NOT YET IMPLEMENTED: see TODO in notify_bundle_ready.
        self.notify_bundle_ready(dir_id)

    def update_cache(self, dir_id, bundle_content):
        """Store `bundle_content` in the cache as a 'directory' bundle."""
        self.cache.add('directory', dir_id, bundle_content)

    def notify_bundle_ready(self, bundle_id):
        """Notify that bundle `bundle_id` is ready (currently a no-op)."""
        # TODO plug this method with the notification method once
        # done.
        pass
+
class DirectoryCooker():
    """Create a cooked directory (tar bundle) from its sha1_git in storage.

    Warning: this is NOT a directly accessible cooker, but a low-level
    helper that performs the actual filesystem manipulations; it is used
    by DirectoryVaultCooker.cook.
    """

    def __init__(self, storage):
        """Args:
            storage: source storage, used through `directory_ls` and
                `content_get`.
        """
        self.storage = storage

    def get_directory_bytes(self, dir_id):
        """Build the tar bundle of the directory identified by `dir_id`.

        The directory tree is recreated in a temporary folder, packed into
        an in-memory tar archive, and the temporary folder is removed.

        Args:
            dir_id (bytes): id of the directory to cook.

        Returns:
            bytes: the content of the tar archive.
        """
        # Retrieve data from the database; the list is iterated twice below.
        data = list(self.storage.directory_ls(dir_id, recursive=True))
        # Split into directory and file data.
        dir_data = (entry['name'] for entry in data if entry['type'] == 'dir')
        file_data = (entry for entry in data if entry['type'] == 'file')
        # TemporaryDirectory removes the scratch tree once the bundle bytes
        # are built; mkdtemp alone leaked one tree per cooking.
        with tempfile.TemporaryDirectory(prefix='directory.',
                                         suffix='.cook') as tmp_root:
            root = os.fsencode(tmp_root)
            # Recreate the directory's subtree and then the files into it.
            self._create_tree(root, dir_data)
            self._create_files(root, file_data)
            # The bundle is fully materialized as bytes before the
            # temporary folder is deleted.
            return self._create_bundle_content(
                root,
                hashutil.hash_to_hex(dir_id)
            )

    def _create_tree(self, root, directory_paths):
        """Create a directory tree from the given paths.

        The tree is created from `root` and each given path in
        `directory_paths` will be created.
        """
        # Directories are sorted by depth: if a child were created first,
        # makedirs would create its parent implicitly and the parent's own
        # makedirs call would then fail with FileExistsError.
        bsep = bytes(os.path.sep, 'utf8')
        dir_names = sorted(
            directory_paths,
            key=lambda x: len(x.split(bsep))
        )
        for dir_name in dir_names:
            os.makedirs(os.path.join(root, dir_name))

    def _create_files(self, root, file_datas):
        """Create the files according to their status.

        Absent and hidden contents are replaced by a placeholder message;
        any other content is fetched from the storage.
        """
        for file_data in file_datas:
            path = os.path.join(root, file_data['name'])
            status = file_data['status']
            if status == 'absent':
                self._create_file_absent(path)
            elif status == 'hidden':
                self._create_file_hidden(path)
            else:
                content = self._get_file_content(file_data['sha1'])
                self._create_file(path, content)

    def _create_file(self, path, content):
        """Create the given file and fill it with content."""
        with open(path, 'wb') as f:
            f.write(content)

    def _get_file_content(self, obj_id):
        """Get the raw data of the content `obj_id` from the storage."""
        return list(self.storage.content_get([obj_id]))[0]['data']

    def _create_file_absent(self, path):
        """Create a file that indicates a skipped content.

        Create the given file but fill it with a specific content to
        indicate that the content has not been retrieved by the
        Software Heritage archive due to its size.
        """
        # Write to `path` (passing `self` made open() raise TypeError).
        self._create_file(path, SKIPPED_MESSAGE)

    def _create_file_hidden(self, path):
        """Create a file that indicates a hidden content.

        Create the given file but fill it with a specific content to
        indicate that the content could not be retrieved due to
        privacy policy.
        """
        # Write to `path` (passing `self` made open() raise TypeError).
        self._create_file(path, HIDDEN_MESSAGE)

    def _create_bundle_content(self, path, hex_dir_id):
        """Create a bundle from the given directory.

        Args:
            path (bytes): location of the directory to package.
            hex_dir_id (str): hex representation of the directory id,
                used as the top-level name inside the archive.

        Returns:
            bytes that represent the directory as a tar bundle.
        """
        tar_buffer = io.BytesIO()
        # Closing the TarFile writes the end-of-archive blocks; without it
        # the archive is truncated.
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            tar.add(path.decode(), arcname=hex_dir_id)
        return tar_buffer.getvalue()

File Metadata

Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:43 AM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295146

Event Timeline