Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9312127
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
16 KB
Subscribers
None
View Options
diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
index feee6f10..cbe6ef30 100644
--- a/swh/storage/vault/api/cooking_tasks.py
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -1,24 +1,32 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scheduler.task import Task
from swh.core import hashutil
from ..cache import VaultCache
from ..cooker import DirectoryVaultCooker
from ... import get_storage
+COOKER_TYPES = {
+ 'directory': DirectoryVaultCooker
+}
+
+
class SWHCookingTask(Task):
    """Celery task that cooks a single stored object into a bundle.

    The bundle type is selected through the ``type`` argument, which must
    be a key of the module-level ``COOKER_TYPES`` mapping.
    """
    task_queue = 'swh_storage_vault_cooking'

    def run(self, type, hex_dir_id, storage_args, cache_args):
        # Set up the source storage and the vault cache from the
        # configuration dicts passed by the caller.
        storage = get_storage(**storage_args)
        cache = VaultCache(**cache_args)
        # Pick the cooker class matching the requested bundle type.
        cooker = COOKER_TYPES[type](storage, cache)
        # Cook the object; the cooker caches the resulting bundle itself.
        cooker.cook(hashutil.hex_to_hash(hex_dir_id))
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
index 075c751b..ba9e6a69 100644
--- a/swh/storage/vault/api/server.py
+++ b/swh/storage/vault/api/server.py
@@ -1,92 +1,101 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
from flask import Flask, abort, g
+from werkzeug.routing import BaseConverter
from swh.core import config
from swh.objstorage.api.common import encode_data_server as encode_data
from swh.objstorage.api.common import BytesRequest, error_handler
from swh.storage import get_storage
from swh.storage.vault.api import cooking_tasks # NOQA
from swh.storage.vault.cache import VaultCache
from swh.storage.vault.cooker import DirectoryVaultCooker
from swh.scheduler.celery_backend.config import app as celery_app
-from flask_profile import Profiler
cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
DEFAULT_CONFIG = {
'storage': ('dict', {'storage_class': 'local_storage',
'storage_args': [
'dbname=softwareheritage-dev',
'/tmp/objects'
]
}),
'cache': ('dict', {'root': '/tmp/vaultcache'})
}
class RegexConverter(BaseConverter):
    """Werkzeug URL converter matching a caller-supplied regular expression.

    Declared in a rule as ``<regex("directory|revision"):name>``: the
    converter argument becomes the pattern used to match the URL segment.
    """

    def __init__(self, url_map, *items):
        super().__init__(url_map)
        # The first (and only expected) converter argument is the pattern.
        self.regex = items[0]
app = Flask(__name__)
-Profiler(app)
app.request_class = BytesRequest
+app.url_map.converters['regex'] = RegexConverter
@app.errorhandler(Exception)
def my_error_handler(exception):
    """Serialize any uncaught exception with the shared API error encoder."""
    return error_handler(exception, encode_data)
@app.before_request
def before_request():
    """Attach a fresh vault cache and directory cooker to the request context."""
    g.cache = VaultCache(**app.config['cache'])
    storage = get_storage(**app.config['storage'])
    g.cooker = DirectoryVaultCooker(storage, g.cache)
@app.route('/')
def index():
    """Landing page identifying the service."""
    return 'SWH vault API server'
@app.route('/vault/<regex("directory|revision|snapshot"):obj_type>/',
           methods=['GET'])
def ls_directory(obj_type):
    """List the identifiers of the cached bundles of the given type.

    Note: despite its historical name, this endpoint handles every
    bundle type accepted by the route, not only directories.
    """
    # `obj_type` (renamed from `type`, which shadowed the builtin) is
    # constrained by the route regex. The cache yields ids lazily, so
    # materialize them before serialization.
    return encode_data(list(g.cache.ls(obj_type)))
@app.route('/vault/<regex("directory|revision|snapshot"):obj_type>/<obj_id>/',
           methods=['GET'])
def get_cooked_directory(obj_type, obj_id):
    """Fetch a cooked bundle from the cache.

    Returns the decoded bundle content, or aborts with 404 when the
    bundle has not been cooked yet. (Parameters renamed from
    `type`/`id`, which shadowed builtins; the URLs are unchanged.)
    """
    if not g.cache.is_cached(obj_type, obj_id):
        abort(404)
    return encode_data(g.cache.get(obj_type, obj_id).decode())
@app.route('/vault/<regex("directory|revision|snapshot"):obj_type>/<obj_id>/',
           methods=['POST'])
def cook_request_directory(obj_type, obj_id):
    """Schedule the asynchronous cooking of a bundle.

    Returns 201 CREATED with the URL where the bundle will be
    available once cooked. (Parameters renamed from `type`/`id`,
    which shadowed builtins; the URLs are unchanged.)
    """
    task = celery_app.tasks[cooking_task_name]
    # Task signature: (type, hex_id, storage_config, cache_config).
    task.delay(obj_type, obj_id, app.config['storage'], app.config['cache'])
    # Return url to get the content and 201 CREATED
    return encode_data('/vault/%s/%s/' % (obj_type, obj_id)), 201
@click.command()
@click.argument('config-path', required=True)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5000, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
              help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
    """Read the configuration file and run the vault API server.

    `required=True` replaces the non-idiomatic `required=1`; click
    already coerces --port to int and --debug to bool, so the previous
    explicit casts were redundant.
    """
    app.config.update(config.read(config_path, DEFAULT_CONFIG))
    app.run(host, port=port, debug=debug)


if __name__ == '__main__':
    launch()
diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py
index 588682ae..cbc662c8 100644
--- a/swh/storage/vault/cache.py
+++ b/swh/storage/vault/cache.py
@@ -1,64 +1,69 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.core import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.objstorage_pathslicing import DIR_MODE
BUNDLE_TYPES = {
'directory': 'd',
+ 'revision': 'r',
+ 'snapshot': 's',
}
class VaultCache():
    """The vault cache is an object storage that stores bundles.

    The current implementation uses a PathSlicingObjStorage to store
    the bundles: one storage per bundle type, each rooted in its own
    subdirectory of `root` (see BUNDLE_TYPES).
    """
    def __init__(self, root):
        """Create one pathslicing object storage per bundle type."""
        for subdir in BUNDLE_TYPES.values():
            fp = os.path.join(root, subdir)
            if not os.path.isdir(fp):
                os.makedirs(fp, DIR_MODE, exist_ok=True)
        self.storages = {
            type: get_objstorage(
                'pathslicing', {'root': os.path.join(root, subdir),
                                'slicing': '0:1/0:5'}
            )
            for type, subdir in BUNDLE_TYPES.items()
        }

    def __contains__(self, obj_id):
        """Tell whether `obj_id` is present in any of the type storages."""
        # Bug fix: iterate the storage objects, not the dict itself —
        # iterating the dict yields the type-name strings, so membership
        # was being tested against 'directory'/'revision'/... keys.
        return any(obj_id in storage for storage in self.storages.values())

    def add(self, obj_type, obj_id, content):
        """Store `content` as the bundle `obj_id` of type `obj_type`."""
        storage = self._get_storage(obj_type)
        return storage.add(content, obj_id)

    def get(self, obj_type, obj_id):
        """Retrieve the bundle whose hex id is `obj_id`."""
        storage = self._get_storage(obj_type)
        return storage.get(hashutil.hex_to_hash(obj_id))

    def is_cached(self, obj_type, obj_id):
        """Tell whether the bundle with hex id `obj_id` is cached."""
        storage = self._get_storage(obj_type)
        return hashutil.hex_to_hash(obj_id) in storage

    def ls(self, obj_type):
        """Yield the ids of the cached bundles of type `obj_type`."""
        storage = self._get_storage(obj_type)
        yield from storage

    def _get_storage(self, obj_type):
        """Get the storage that corresponds to the object type.

        Raises:
            ValueError: when `obj_type` is not a known bundle type.
        """
        try:
            return self.storages[obj_type]
        except KeyError:
            # Catch only the missing-key case; the previous bare
            # `except:` also swallowed KeyboardInterrupt and masked
            # unrelated errors.
            raise ValueError('Wrong bundle type: ' + obj_type) from None
diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py
index b48c4a83..e4ae53ab 100644
--- a/swh/storage/vault/cooker.py
+++ b/swh/storage/vault/cooker.py
@@ -1,173 +1,216 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import os
import tarfile
import tempfile
import itertools
from swh.core import hashutil
+
SKIPPED_MESSAGE = (b'This content have not been retrieved in '
b'Software Heritage archive due to its size')
HIDDEN_MESSAGE = (b'This content is hidden')
class BaseVaultCooker(metaclass=abc.ABCMeta):
    """Abstract base class for the vault's bundle creators.

    This class describes a common API for the cookers: subclasses must
    provide `cook`, `update_cache` and `notify_bundle_ready`.
    """
    def __init__(self, storage, cache):
        # `storage` is the source of raw objects, `cache` the
        # destination object storage for cooked bundles.
        self.storage = storage
        self.cache = cache

    @abc.abstractmethod
    def cook(self, obj_id):
        """Cook the requested object into a bundle.

        The type of the object represented by the id depends on the
        concrete class. Very likely, each type of bundle will have its
        own cooker class.

        Args:
            obj_id: id of the object to be cooked into a bundle.
        """
        raise NotImplementedError(
            'Vault cookers must implement a `cook` method')

    @abc.abstractmethod
    def update_cache(self, id, bundle_content):
        """Store the cooked bundle content in the cache under `id`."""
        raise NotImplementedError('Vault cookers must implement a '
                                  '`update_cache` method')

    @abc.abstractmethod
    def notify_bundle_ready(self, bundle_id):
        """Signal that the bundle identified by `bundle_id` is available.

        Signature fix: the previous abstract declaration took an extra
        `notif_data` argument that no implementation accepted.
        """
        raise NotImplementedError(
            'Vault cookers must implement a `notify_bundle_ready` method')
+
class DirectoryVaultCooker(BaseVaultCooker):
    """Cooker to create a directory bundle."""
    def __init__(self, storage, cache):
        """Initialize a cooker that creates directory bundles.

        Args:
            storage: source storage where contents are retrieved.
            cache: destination storage where the cooked bundles are stored.
        """
        self.storage = storage
        self.cache = cache

    def cook(self, dir_id):
        """Cook the requested directory into a bundle.

        Args:
            dir_id (bytes): the id of the directory to be cooked.
        """
        # Build the compressed-directory bytes with the low-level cooker.
        directory_cooker = DirectoryCooker(self.storage)
        bundle_content = directory_cooker.get_directory_bytes(dir_id)
        # Cache the bundle. Bug fix: the previous code still called the
        # removed `_cache_bundle` helper, raising AttributeError.
        self.update_cache(dir_id, bundle_content)
        # Make a notification that the bundle has been cooked.
        # Bug fix: likewise, `_notify_bundle_ready` no longer exists.
        # NOT YET IMPLEMENTED: see TODO in `notify_bundle_ready`.
        self.notify_bundle_ready(dir_id)

    def update_cache(self, dir_id, bundle_content):
        """Store the directory bundle in the cache."""
        self.cache.add('directory', dir_id, bundle_content)

    def notify_bundle_ready(self, bundle_id):
        # TODO plug this method with the notification method once
        # done.
        pass
+
class DirectoryCooker():
    """Creates a cooked directory from its sha1_git in the db.

    Warning: This is NOT a directly accessible cooker, but a low-level
    one that effectuates the manipulations.
    """
    def __init__(self, storage):
        self.storage = storage

    def get_directory_bytes(self, dir_id):
        """Build a tar bundle of the directory identified by `dir_id`.

        Args:
            dir_id (bytes): id of the directory to pack.

        Returns:
            bytes of a tar archive holding the reconstructed directory.
        """
        # Create temporary folder to retrieve the files into.
        # NOTE(review): this directory is never removed afterwards —
        # consider cleaning it up once the bundle has been built.
        root = bytes(tempfile.mkdtemp(prefix='directory.',
                                      suffix='.cook'), 'utf8')
        # Retrieve data from the database.
        data = list(self.storage.directory_ls(dir_id, recursive=True))
        # Split into files and directory data.
        data1, data2 = itertools.tee(data, 2)
        dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir')
        file_data = (entry for entry in data2 if entry['type'] == 'file')
        # Recreate the directory's subtree and then the files into it.
        self._create_tree(root, dir_data)
        self._create_files(root, file_data)
        # Use the created directory to make a bundle with the data as
        # a compressed directory.
        return self._create_bundle_content(root,
                                           hashutil.hash_to_hex(dir_id))

    def _create_tree(self, root, directory_paths):
        """Create a directory tree from the given paths.

        The tree is created from `root` and each given path in
        `directory_paths` will be created.
        """
        # Directories are sorted by depth so they are created in the
        # right order.
        bsep = bytes(os.path.sep, 'utf8')
        dir_names = sorted(
            directory_paths,
            key=lambda x: len(x.split(bsep))
        )
        for dir_name in dir_names:
            os.makedirs(os.path.join(root, dir_name))

    def _create_files(self, root, file_datas):
        """Create the files according to their status."""
        for file_data in file_datas:
            path = os.path.join(root, file_data['name'])
            status = file_data['status']
            if status == 'absent':
                self._create_file_absent(path)
            elif status == 'hidden':
                self._create_file_hidden(path)
            else:
                content = self._get_file_content(file_data['sha1'])
                self._create_file(path, content)

    def _create_file(self, path, content):
        """Create the given file and fill it with content."""
        with open(path, 'wb') as f:
            f.write(content)

    def _get_file_content(self, obj_id):
        """Get the content of the given file."""
        content = list(self.storage.content_get([obj_id]))[0]['data']
        return content

    def _create_file_absent(self, path):
        """Create a file that indicates a skipped content.

        Create the given file but fill it with a specific content to
        indicate that the content has not been retrieved by the
        software heritage archive due to its size.
        """
        # Bug fix: `self` was passed where the file path was expected,
        # which made every skipped file raise a TypeError.
        self._create_file(path, SKIPPED_MESSAGE)

    def _create_file_hidden(self, path):
        """Create a file that indicates an hidden content.

        Create the given file but fill it with a specific content to
        indicate that the content could not be retrieved due to
        privacy policy.
        """
        # Bug fix: same `self`-for-`path` mistake as in
        # `_create_file_absent`.
        self._create_file(path, HIDDEN_MESSAGE)

    def _create_bundle_content(self, path, hex_dir_id):
        """Create a bundle from the given directory.

        Args:
            path: location of the directory to package.
            hex_dir_id: hex representation of the directory id

        Returns:
            bytes that represent the compressed directory as a bundle.
        """
        tar_buffer = io.BytesIO()
        # Bug fix: close the archive (via the context manager) so the
        # tar end-of-archive blocks are flushed; without it the bundle
        # is truncated and cannot be read back.
        with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
            tar.add(path.decode(), arcname=hex_dir_id)
        return tar_buffer.getbuffer()
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Thu, Jul 3, 10:43 AM (1 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3295146
Attached To
rDSTO Storage manager
Event Timeline
Log In to Comment