Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9343388
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
20 KB
Subscribers
None
View Options
diff --git a/swh/storage/vault/api/client.py b/swh/storage/vault/api/client.py
index c158245b2..0ea2d79b8 100644
--- a/swh/storage/vault/api/client.py
+++ b/swh/storage/vault/api/client.py
@@ -1,35 +1,27 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core import hashutil
from swh.core.api import SWHRemoteAPI
from swh.storage.exc import StorageAPIError
class RemoteVaultCache(SWHRemoteAPI):
"""Client to the Software Heritage vault cache."""
def __init__(self, base_url):
super().__init__(api_exception=StorageAPIError, url=base_url)
- def directory_ls(self):
- return self.get('vault/directory/')
+ def ls(self, obj_type):
+ return super().get('vault/{}/'.format(obj_type))
- def directory_get(self, obj_id):
- return self.get('vault/directory/%s/' % (hashutil.hash_to_hex(obj_id)))
+ def get(self, obj_type, obj_id):
+ return super().get('vault/{}/{}/'.format(obj_type,
+ hashutil.hash_to_hex(obj_id)))
- def directory_cook(self, obj_id):
- return self.post('vault/directory/%s/' % hashutil.hash_to_hex(obj_id),
- data={})
-
- def revision_ls(self):
- return self.get('vault/revision/')
-
- def revision_get(self, obj_id):
- return self.get('vault/revision/%s/' % (hashutil.hash_to_hex(obj_id)))
-
- def revision_cook(self, obj_id):
- return self.post('vault/revision/%s/' % hashutil.hash_to_hex(obj_id),
+ def cook(self, obj_type, obj_id):
+ return self.post('vault/{}/{}/'.format(obj_type,
+ hashutil.hash_to_hex(obj_id)),
data={})
diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
index ac08f7475..4a6ccc685 100644
--- a/swh/storage/vault/api/cooking_tasks.py
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -1,32 +1,26 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scheduler.task import Task
from swh.core import hashutil
from ..cache import VaultCache
-from ..cooker import DirectoryVaultCooker, RevisionVaultCooker
+from ..cooker import COOKER_TYPES
from ... import get_storage
-COOKER_TYPES = {
- 'directory': DirectoryVaultCooker,
- 'revision': RevisionVaultCooker,
-}
-
-
class SWHCookingTask(Task):
"""Main task which archives a contents batch.
"""
task_queue = 'swh_storage_vault_cooking'
def run(self, type, hex_id, storage_args, cache_args):
# Initialize elements
storage = get_storage(**storage_args)
cache = VaultCache(**cache_args)
# Initialize cooker
cooker = COOKER_TYPES[type](storage, cache)
# Perform the cooking
cooker.cook(obj_id=hashutil.hex_to_hash(hex_id))
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
index 6ec9843df..dd572fe87 100644
--- a/swh/storage/vault/api/server.py
+++ b/swh/storage/vault/api/server.py
@@ -1,104 +1,101 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
+import re
from flask import abort, g
from werkzeug.routing import BaseConverter
from swh.core import config
from swh.core.api import (SWHServerAPIApp, error_handler,
encode_data_server as encode_data)
-from swh.storage import get_storage
-from swh.storage.vault.api import cooking_tasks # NOQA
+from swh.scheduler.utils import get_task
+from swh.storage.vault.api.cooking_tasks import COOKER_TYPES
+from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa
from swh.storage.vault.cache import VaultCache
-from swh.storage.vault.cooker import DirectoryVaultCooker
-from swh.scheduler.celery_backend.config import app as celery_app
cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'
DEFAULT_CONFIG = {
'storage': ('dict', {
'cls': 'local',
'args': {
'db': 'dbname=softwareheritage-dev',
'objstorage': {
'root': '/tmp/objects',
'slicing': '0:2/2:4/4:6',
},
},
}),
'cache': ('dict', {'root': '/tmp/vaultcache'})
}
-class RegexConverter(BaseConverter):
+class CookerConverter(BaseConverter):
def __init__(self, url_map, *items):
super().__init__(url_map)
- self.regex = items[0]
+ types = [re.escape(c) for c in COOKER_TYPES]
+ self.regex = '({})'.format('|'.join(types))
app = SWHServerAPIApp(__name__)
-app.url_map.converters['regex'] = RegexConverter
+app.url_map.converters['cooker'] = CookerConverter
@app.errorhandler(Exception)
def my_error_handler(exception):
return error_handler(exception, encode_data)
@app.before_request
def before_request():
g.cache = VaultCache(**app.config['cache'])
- g.cooker = DirectoryVaultCooker(
- get_storage(**app.config['storage']),
- g.cache
- )
@app.route('/')
def index():
return 'SWH vault API server'
-@app.route('/vault/<regex("directory|revision|snapshot"):type>/',
+@app.route('/vault/<cooker:type>/',
methods=['GET'])
def ls_directory(type):
return encode_data(list(
g.cache.ls(type)
))
-@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
+@app.route('/vault/<cooker:type>/<id>/',
methods=['GET'])
def get_cooked_directory(type, id):
if not g.cache.is_cached(type, id):
abort(404)
return encode_data(g.cache.get(type, id).decode())
-@app.route('/vault/<regex("directory|revision|snapshot"):type>/<id>/',
+@app.route('/vault/<cooker:type>/<id>/',
methods=['POST'])
def cook_request_directory(type, id):
- task = celery_app.tasks[cooking_task_name]
+ task = get_task(cooking_task_name)
task.delay(type, id, app.config['storage'], app.config['cache'])
# Return url to get the content and 201 CREATED
return encode_data('/vault/%s/%s/' % (type, id)), 201
@click.command()
@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5005, type=click.INT,
help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
app.config.update(config.read(config_path, DEFAULT_CONFIG))
app.run(host, port=int(port), debug=bool(debug))
if __name__ == '__main__':
launch()
diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py
index cbc662c8a..b555fd6ca 100644
--- a/swh/storage/vault/cache.py
+++ b/swh/storage/vault/cache.py
@@ -1,69 +1,53 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.core import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.objstorage_pathslicing import DIR_MODE
-BUNDLE_TYPES = {
- 'directory': 'd',
- 'revision': 'r',
- 'snapshot': 's',
-}
-
class VaultCache():
"""The vault cache is an object storage that stores bundles
The current implementation uses a PathSlicingObjStorage to store
the bundles. The id of a content if prefixed to specify its type
and store different types of bundle in different folders.
"""
def __init__(self, root):
- for subdir in BUNDLE_TYPES.values():
- fp = os.path.join(root, subdir)
- if not os.path.isdir(fp):
- os.makedirs(fp, DIR_MODE, exist_ok=True)
-
- self.storages = {
- type: get_objstorage(
- 'pathslicing', {'root': os.path.join(root, subdir),
- 'slicing': '0:1/0:5'}
- )
- for type, subdir in BUNDLE_TYPES.items()
- }
-
- def __contains__(self, obj_id):
- for storage in self.storages:
- if obj_id in storage:
- return True
- return False
+ self.root = root
+ self.storages = {}
def add(self, obj_type, obj_id, content):
storage = self._get_storage(obj_type)
return storage.add(content, obj_id)
def get(self, obj_type, obj_id):
storage = self._get_storage(obj_type)
return storage.get(hashutil.hex_to_hash(obj_id))
def is_cached(self, obj_type, obj_id):
storage = self._get_storage(obj_type)
return hashutil.hex_to_hash(obj_id) in storage
def ls(self, obj_type):
storage = self._get_storage(obj_type)
yield from storage
def _get_storage(self, obj_type):
"""Get the storage that corresponds to the object type"""
- try:
- return self.storages[obj_type]
- except:
- raise ValueError('Wrong bundle type: ' + obj_type)
+
+ if obj_type not in self.storages:
+ fp = os.path.join(self.root, obj_type)
+ if not os.path.isdir(fp):
+ os.makedirs(fp, DIR_MODE, exist_ok=True)
+
+ self.storages[obj_type] = get_objstorage(
+ 'pathslicing', {'root': fp, 'slicing': '0:1/0:5'})
+
+ return self.storages[obj_type]
diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py
index 8210db934..a84e19843 100644
--- a/swh/storage/vault/cooker.py
+++ b/swh/storage/vault/cooker.py
@@ -1,296 +1,280 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import itertools
import logging
import os
import tarfile
import tempfile
from pathlib import Path
from swh.core import hashutil
SKIPPED_MESSAGE = (b'This content have not been retrieved in '
b'Software Heritage archive due to its size')
HIDDEN_MESSAGE = (b'This content is hidden')
def get_tar_bytes(path, arcname=None):
path = Path(path)
if not arcname:
arcname = path.name
tar_buffer = io.BytesIO()
tar = tarfile.open(fileobj=tar_buffer, mode='w')
tar.add(str(path), arcname=arcname)
return tar_buffer.getbuffer()
class BaseVaultCooker(metaclass=abc.ABCMeta):
"""Abstract base class for the vault's bundle creators
This class describes a common API for the cookers.
To define a new cooker, inherit from this class and override:
- CACHE_TYPE_KEY: key to use for the bundle to reference in cache
- def cook(obj_id): cook the object into a bundle
- def notify_bundle_ready(notif_data, bundle_id): notify the
bundle is ready.
"""
CACHE_TYPE_KEY = None
def __init__(self, storage, cache):
self.storage = storage
self.cache = cache
@abc.abstractmethod
def cook(self, obj_id):
"""Cook the requested object into a bundle
The type of the object represented by the id depends on the
concrete class. Very likely, each type of bundle will have its
own cooker class.
Args:
obj_id: id of the object to be cooked into a bundle.
"""
pass
def update_cache(self, id, bundle_content):
"""Update the cache with id and bundle_content.
"""
self.cache.add(self.CACHE_TYPE_KEY, id, bundle_content)
@abc.abstractmethod
def notify_bundle_ready(self, notif_data, bundle_id):
"""Notify the bundle bundle_id is ready.
"""
pass
-class DirectoryVaultCooker(BaseVaultCooker):
+class DirectoryCooker(BaseVaultCooker):
"""Cooker to create a directory bundle """
CACHE_TYPE_KEY = 'directory'
- def __init__(self, storage, cache):
- """Initialize a cooker that create directory bundles
-
- Args:
- storage: source storage where content are retrieved.
- cache: destination storage where the cooked bundle are stored.
-
- """
- self.storage = storage
- self.cache = cache
-
def cook(self, obj_id):
"""Cook the requested directory into a Bundle
Args:
obj_id (bytes): the id of the directory to be cooked.
Returns:
bytes that correspond to the bundle
"""
# Create the bytes that corresponds to the compressed
# directory.
- directory_cooker = DirectoryCooker(self.storage)
+ directory_cooker = DirectoryBuilder(self.storage)
bundle_content = directory_cooker.get_directory_bytes(obj_id)
# Cache the bundle
self.update_cache(obj_id, bundle_content)
# Make a notification that the bundle have been cooked
# NOT YET IMPLEMENTED see TODO in function.
self.notify_bundle_ready(
notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id),
bundle_id=obj_id)
def notify_bundle_ready(self, notif_data, bundle_id):
# TODO plug this method with the notification method once
# done.
pass
-class RevisionVaultCooker(BaseVaultCooker):
+class RevisionFlatCooker(BaseVaultCooker):
"""Cooker to create a directory bundle """
- CACHE_TYPE_KEY = 'revision'
-
- def __init__(self, storage, cache):
- """Initialize a cooker that create revision bundles
-
- Args:
- storage: source storage where content are retrieved.
- cache: destination storage where the cooked bundle are stored.
-
- """
- self.storage = storage
- self.cache = cache
+ CACHE_TYPE_KEY = 'revision_flat'
def cook(self, obj_id):
"""Cook the requested revision into a Bundle
Args:
obj_id (bytes): the id of the revision to be cooked.
Returns:
bytes that correspond to the bundle
"""
- directory_cooker = DirectoryCooker(self.storage)
+ directory_cooker = DirectoryBuilder(self.storage)
with tempfile.TemporaryDirectory(suffix='.cook') as root_tmp:
root = Path(root_tmp)
for revision in self.storage.revision_log([obj_id]):
revdir = root / hashutil.hash_to_hex(revision['id'])
revdir.mkdir()
directory_cooker.build_directory(revision['directory'],
str(revdir).encode())
bundle_content = get_tar_bytes(root_tmp,
hashutil.hash_to_hex(obj_id))
# Cache the bundle
self.update_cache(obj_id, bundle_content)
# Make a notification that the bundle have been cooked
# NOT YET IMPLEMENTED see TODO in function.
self.notify_bundle_ready(
notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id),
bundle_id=obj_id)
def notify_bundle_ready(self, notif_data, bundle_id):
# TODO plug this method with the notification method once
# done.
pass
-class DirectoryCooker():
+class DirectoryBuilder:
"""Creates a cooked directory from its sha1_git in the db.
Warning: This is NOT a directly accessible cooker, but a low-level
one that executes the manipulations.
"""
def __init__(self, storage):
self.storage = storage
def get_directory_bytes(self, dir_id):
# Create temporary folder to retrieve the files into.
root = bytes(tempfile.mkdtemp(prefix='directory.',
suffix='.cook'), 'utf8')
self.build_directory(dir_id, root)
# Use the created directory to make a bundle with the data as
# a compressed directory.
bundle_content = self._create_bundle_content(
root,
hashutil.hash_to_hex(dir_id))
return bundle_content
def build_directory(self, dir_id, root):
# Retrieve data from the database.
data = self.storage.directory_ls(dir_id, recursive=True)
# Split into files and directory data.
# TODO(seirl): also handle revision data.
data1, data2 = itertools.tee(data, 2)
dir_data = (entry['name'] for entry in data1 if entry['type'] == 'dir')
file_data = (entry for entry in data2 if entry['type'] == 'file')
# Recreate the directory's subtree and then the files into it.
self._create_tree(root, dir_data)
self._create_files(root, file_data)
def _create_tree(self, root, directory_paths):
"""Create a directory tree from the given paths
The tree is created from `root` and each given path in
`directory_paths` will be created.
"""
# Directories are sorted by depth so they are created in the
# right order
bsep = bytes(os.path.sep, 'utf8')
dir_names = sorted(
directory_paths,
key=lambda x: len(x.split(bsep)))
for dir_name in dir_names:
os.makedirs(os.path.join(root, dir_name))
def _create_files(self, root, file_datas):
"""Create the files according to their status.
"""
# Then create the files
for file_data in file_datas:
path = os.path.join(root, file_data['name'])
status = file_data['status']
perms = file_data['perms']
if status == 'absent':
self._create_file_absent(path)
elif status == 'hidden':
self._create_file_hidden(path)
else:
content = self._get_file_content(file_data['sha1'])
self._create_file(path, content, perms)
def _create_file(self, path, content, perms=0o100644):
"""Create the given file and fill it with content.
"""
if perms not in (0o100644, 0o100755, 0o120000):
logging.warning('File {} has invalid permission {}, '
'defaulting to 644.'.format(path, perms))
if perms == 0o120000: # Symbolic link
os.symlink(content, path)
else:
with open(path, 'wb') as f:
f.write(content)
os.chmod(path, perms & 0o777)
def _get_file_content(self, obj_id):
"""Get the content of the given file.
"""
content = list(self.storage.content_get([obj_id]))[0]['data']
return content
def _create_file_absent(self, path):
"""Create a file that indicates a skipped content
Create the given file but fill it with a specific content to
indicate that the content have not been retrieved by the
software heritage archive due to its size.
"""
- self._create_file(self, SKIPPED_MESSAGE)
+ self._create_file(path, SKIPPED_MESSAGE)
def _create_file_hidden(self, path):
"""Create a file that indicates an hidden content
Create the given file but fill it with a specific content to
indicate that the content could not be retrieved due to
privacy policy.
"""
- self._create_file(self, HIDDEN_MESSAGE)
+ self._create_file(path, HIDDEN_MESSAGE)
def _create_bundle_content(self, path, hex_dir_id):
"""Create a bundle from the given directory
Args:
path: location of the directory to package.
hex_dir_id: hex representation of the directory id
Returns:
bytes that represent the compressed directory as a bundle.
"""
return get_tar_bytes(path.decode(), hex_dir_id)
+
+
+COOKER_TYPES = {
+ 'directory': DirectoryCooker,
+ 'revision_flat': RevisionFlatCooker,
+}
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:29 PM (5 d, 20 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276255
Attached To
R65 Staging repository
Event Timeline
Log In to Comment