Page Menu · Home · Software Heritage

No One · Temporary

diff --git a/swh/storage/vault/api/client.py b/swh/storage/vault/api/client.py
index c158245b2..0ea2d79b8 100644
--- a/swh/storage/vault/api/client.py
+++ b/swh/storage/vault/api/client.py
@@ -1,35 +1,27 @@
# Copyright (C) 2016-2017 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core import hashutil
from swh.core.api import SWHRemoteAPI
from swh.storage.exc import StorageAPIError
class RemoteVaultCache(SWHRemoteAPI):
    """Client to the Software Heritage vault cache.

    Each method maps to an endpoint of the vault API server located
    under ``vault/<obj_type>/``.
    """

    def __init__(self, base_url):
        super().__init__(api_exception=StorageAPIError, url=base_url)

    def ls(self, obj_type):
        """List the bundles of type `obj_type` held by the server's cache.

        Args:
            obj_type: bundle type, e.g. 'directory'.
        """
        # This class overrides `get` with a different signature, so the
        # HTTP-level GET of SWHRemoteAPI must be reached through super();
        # `self.get(url)` would dispatch to the override and fail.
        return super().get('vault/{}/'.format(obj_type))

    def get(self, obj_type, obj_id):
        """Fetch the cooked bundle of type `obj_type` for `obj_id`.

        Args:
            obj_type: bundle type, e.g. 'directory'.
            obj_id: id of the cooked object; hex-encoded with
                `hashutil.hash_to_hex` before being put in the URL.
        """
        # super(): see the comment in `ls`; calling self.get here would
        # recurse into this very method with a single argument.
        return super().get('vault/{}/{}/'.format(
            obj_type, hashutil.hash_to_hex(obj_id)))

    def cook(self, obj_type, obj_id):
        """Ask the server to cook `obj_id` into a bundle of type `obj_type`.

        Args:
            obj_type: bundle type, e.g. 'directory'.
            obj_id: id of the object to cook; hex-encoded with
                `hashutil.hash_to_hex` before being put in the URL.

        Returns:
            the decoded server response (the server replies with the URL
            path where the bundle will be available).
        """
        return self.post('vault/{}/{}/'.format(
            obj_type, hashutil.hash_to_hex(obj_id)), data={})
diff --git a/swh/storage/vault/api/cooking_tasks.py b/swh/storage/vault/api/cooking_tasks.py
index ac08f7475..4a6ccc685 100644
--- a/swh/storage/vault/api/cooking_tasks.py
+++ b/swh/storage/vault/api/cooking_tasks.py
@@ -1,32 +1,26 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.scheduler.task import Task
from swh.core import hashutil
from ..cache import VaultCache
-from ..cooker import DirectoryVaultCooker, RevisionVaultCooker
+from ..cooker import COOKER_TYPES
from ... import get_storage
class SWHCookingTask(Task):
    """Task cooking one vault bundle and storing it in the vault cache.

    """
    task_queue = 'swh_storage_vault_cooking'

    def run(self, type, hex_id, storage_args, cache_args):
        """Cook the object `hex_id` with the cooker registered for `type`.

        Args:
            type: key of COOKER_TYPES selecting the cooker class.
            hex_id: hex-encoded id of the object to cook.
            storage_args: keyword arguments forwarded to `get_storage`.
            cache_args: keyword arguments forwarded to `VaultCache`.
        """
        # The source storage and destination cache are built first, then
        # handed positionally to the selected cooker class.
        backends = (get_storage(**storage_args), VaultCache(**cache_args))
        cooker = COOKER_TYPES[type](*backends)
        cooker.cook(obj_id=hashutil.hex_to_hash(hex_id))
diff --git a/swh/storage/vault/api/server.py b/swh/storage/vault/api/server.py
index 6ec9843df..dd572fe87 100644
--- a/swh/storage/vault/api/server.py
+++ b/swh/storage/vault/api/server.py
@@ -1,104 +1,101 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import click
+import re
from flask import abort, g
from werkzeug.routing import BaseConverter
from swh.core import config
from swh.core.api import (SWHServerAPIApp, error_handler,
encode_data_server as encode_data)
-from swh.storage import get_storage
-from swh.storage.vault.api import cooking_tasks # NOQA
+from swh.scheduler.utils import get_task
+from swh.storage.vault.api.cooking_tasks import COOKER_TYPES
+from swh.storage.vault.api.cooking_tasks import SWHCookingTask # noqa
from swh.storage.vault.cache import VaultCache
-from swh.storage.vault.cooker import DirectoryVaultCooker
-from swh.scheduler.celery_backend.config import app as celery_app
cooking_task_name = 'swh.storage.vault.api.cooking_tasks.SWHCookingTask'

# Default configuration handed to `config.read` in `launch`; each value
# is presumably a (type, default) pair as expected by swh.core.config.
DEFAULT_CONFIG = {
    'storage': ('dict', dict(
        cls='local',
        args=dict(
            db='dbname=softwareheritage-dev',
            objstorage=dict(root='/tmp/objects', slicing='0:2/2:4/4:6'),
        ),
    )),
    'cache': ('dict', dict(root='/tmp/vaultcache')),
}
class CookerConverter(BaseConverter):
    """URL converter matching exactly one of the registered cooker types.

    Used in routes as ``<cooker:type>``; a path segment that is not a key
    of COOKER_TYPES does not match the route.
    """

    def __init__(self, url_map, *items):
        super().__init__(url_map)
        types = [re.escape(c) for c in COOKER_TYPES]
        # Non-capturing group: werkzeug embeds this pattern inside its own
        # rule regex, and an extra capturing group there can interfere
        # with how the matched values are extracted.
        self.regex = '(?:{})'.format('|'.join(types))


app = SWHServerAPIApp(__name__)
app.url_map.converters['cooker'] = CookerConverter
@app.errorhandler(Exception)
def my_error_handler(exception):
    """Delegate any uncaught exception to swh.core.api's error_handler.

    The response body is serialized with `encode_data`.
    """
    response = error_handler(exception, encode_data)
    return response
@app.before_request
def before_request():
    """Attach a VaultCache built from the app configuration to `g`."""
    cache_args = app.config['cache']
    g.cache = VaultCache(**cache_args)
@app.route('/')
def index():
    """Root endpoint: identify the server."""
    banner = 'SWH vault API server'
    return banner
@app.route('/vault/<cooker:type>/', methods=['GET'])
def ls_directory(type):
    """List what the cache yields for the bundle type `type`."""
    cached = [entry for entry in g.cache.ls(type)]
    return encode_data(cached)
@app.route('/vault/<cooker:type>/<id>/', methods=['GET'])
def get_cooked_directory(type, id):
    """Return the cooked bundle `id` of type `type`, or abort with 404."""
    cache = g.cache
    if cache.is_cached(type, id):
        # NOTE(review): bundles are tar archives (see get_tar_bytes in the
        # cooker); decoding them as UTF-8 raises on non-UTF-8 content.
        return encode_data(cache.get(type, id).decode())
    abort(404)
@app.route('/vault/<cooker:type>/<id>/', methods=['POST'])
def cook_request_directory(type, id):
    """Schedule the cooking of `id` as a bundle of type `type`.

    Responds with status 201 and the URL path where the bundle will be
    served once cooked.
    """
    get_task(cooking_task_name).delay(
        type, id, app.config['storage'], app.config['cache'])
    bundle_url = '/vault/%s/%s/' % (type, id)
    return encode_data(bundle_url), 201
@click.command()
@click.argument('config-path', required=1)
@click.option('--host', default='0.0.0.0', help="Host to run the server")
@click.option('--port', default=5005, type=click.INT,
              help="Binding port of the server")
@click.option('--debug/--nodebug', default=True,
              help="Indicates if the server should run in debug mode")
def launch(config_path, host, port, debug):
    """Load the configuration file `config_path` and run the API server."""
    settings = config.read(config_path, DEFAULT_CONFIG)
    app.config.update(settings)
    app.run(host, port=int(port), debug=bool(debug))


if __name__ == '__main__':
    launch()
diff --git a/swh/storage/vault/cache.py b/swh/storage/vault/cache.py
index cbc662c8a..b555fd6ca 100644
--- a/swh/storage/vault/cache.py
+++ b/swh/storage/vault/cache.py
@@ -1,69 +1,53 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import os
from swh.core import hashutil
from swh.objstorage import get_objstorage
from swh.objstorage.objstorage_pathslicing import DIR_MODE
class VaultCache:
    """The vault cache is an object storage that stores bundles.

    The current implementation uses one PathSlicingObjStorage per bundle
    type, each rooted in a subdirectory of `root` named after the type, so
    that different types of bundle are stored in different folders. The
    per-type storages are created lazily, on first use of a type.
    """

    def __init__(self, root):
        self.root = root
        # obj_type -> objstorage, filled lazily by _get_storage
        self.storages = {}

    def add(self, obj_type, obj_id, content):
        """Store `content` for `obj_id` in the storage of `obj_type`.

        `obj_id` is handed to the objstorage unchanged (unlike `get` and
        `is_cached`, which take a hex id).
        """
        storage = self._get_storage(obj_type)
        return storage.add(content, obj_id)

    def get(self, obj_type, obj_id):
        """Return the bundle stored for the hex id `obj_id`."""
        storage = self._get_storage(obj_type)
        return storage.get(hashutil.hex_to_hash(obj_id))

    def is_cached(self, obj_type, obj_id):
        """Tell whether a bundle is stored for the hex id `obj_id`."""
        storage = self._get_storage(obj_type)
        return hashutil.hex_to_hash(obj_id) in storage

    def ls(self, obj_type):
        """Yield what iterating the objstorage of `obj_type` yields
        (presumably the stored object ids)."""
        storage = self._get_storage(obj_type)
        yield from storage

    def _get_storage(self, obj_type):
        """Get the storage that corresponds to the object type.

        The storage (and its directory) is created on first use.

        Raises:
            ValueError: if `obj_type` is not usable as a single directory
                name under the cache root.
        """
        if obj_type not in self.storages:
            self._check_type(obj_type)
            fp = os.path.join(self.root, obj_type)
            # exist_ok=True already tolerates an existing directory.
            os.makedirs(fp, DIR_MODE, exist_ok=True)
            self.storages[obj_type] = get_objstorage(
                'pathslicing', {'root': fp, 'slicing': '0:1/0:5'})

        return self.storages[obj_type]

    @staticmethod
    def _check_type(obj_type):
        """Reject bundle types that would not map to a subfolder of root."""
        # obj_type becomes a path component: an empty name, '.', '..' or a
        # name containing a path separator would point outside of (or at)
        # the cache root instead of a per-type subdirectory.
        separators = [sep for sep in (os.sep, os.altsep) if sep]
        if (obj_type in ('', '.', '..')
                or any(sep in obj_type for sep in separators)):
            raise ValueError('Wrong bundle type: {!r}'.format(obj_type))
diff --git a/swh/storage/vault/cooker.py b/swh/storage/vault/cooker.py
index 8210db934..a84e19843 100644
--- a/swh/storage/vault/cooker.py
+++ b/swh/storage/vault/cooker.py
@@ -1,296 +1,280 @@
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import abc
import io
import itertools
import logging
import os
import tarfile
import tempfile
from pathlib import Path
from swh.core import hashutil
SKIPPED_MESSAGE = (b'This content have not been retrieved in '
b'Software Heritage archive due to its size')
HIDDEN_MESSAGE = (b'This content is hidden')
def get_tar_bytes(path, arcname=None):
    """Archive `path` (recursively) into an in-memory, uncompressed tar.

    Args:
        path: file or directory to archive.
        arcname: name given to `path` inside the archive; defaults to the
            last component of `path`.

    Returns:
        memoryview over the bytes of the complete tar archive.
    """
    path = Path(path)
    if not arcname:
        arcname = path.name

    tar_buffer = io.BytesIO()
    # The archive must be closed before the buffer is read: close() writes
    # the end-of-archive zero blocks and record padding, without which the
    # result is a truncated tarball. Closing the TarFile leaves the
    # caller-supplied BytesIO open.
    with tarfile.open(fileobj=tar_buffer, mode='w') as tar:
        tar.add(str(path), arcname=arcname)
    return tar_buffer.getbuffer()
class BaseVaultCooker(metaclass=abc.ABCMeta):
    """Abstract base class for the vault's bundle creators.

    It defines the API shared by every cooker. A concrete cooker inherits
    from it and provides:

    - CACHE_TYPE_KEY: the bundle type under which results are cached;
    - cook(obj_id): build the bundle for `obj_id`;
    - notify_bundle_ready(notif_data, bundle_id): signal that a bundle
      is available.
    """
    CACHE_TYPE_KEY = None

    def __init__(self, storage, cache):
        """Keep the source storage and the destination cache."""
        self.storage, self.cache = storage, cache

    @abc.abstractmethod
    def cook(self, obj_id):
        """Turn the object `obj_id` into a bundle.

        Which kind of object `obj_id` designates is decided by the
        concrete cooker class; typically one class per bundle type.

        Args:
            obj_id: id of the object to be cooked into a bundle.
        """

    def update_cache(self, id, bundle_content):
        """Store `bundle_content` in the cache as (CACHE_TYPE_KEY, id)."""
        bundle_type = self.CACHE_TYPE_KEY
        self.cache.add(bundle_type, id, bundle_content)

    @abc.abstractmethod
    def notify_bundle_ready(self, notif_data, bundle_id):
        """Signal that the bundle `bundle_id` is ready."""
class DirectoryCooker(BaseVaultCooker):
    """Cooker producing the bundle of a single directory.

    The bundle is the tar archive built by DirectoryBuilder, cached under
    the 'directory' type.
    """
    CACHE_TYPE_KEY = 'directory'

    def cook(self, obj_id):
        """Cook the directory `obj_id` and store its bundle in the cache.

        Args:
            obj_id (bytes): the id of the directory to be cooked.
        """
        builder = DirectoryBuilder(self.storage)
        bundle = builder.get_directory_bytes(obj_id)
        self.update_cache(obj_id, bundle)

        # Notification is currently a no-op (see notify_bundle_ready).
        hex_id = hashutil.hash_to_hex(obj_id)
        self.notify_bundle_ready(notif_data='Bundle %s ready' % hex_id,
                                 bundle_id=obj_id)

    def notify_bundle_ready(self, notif_data, bundle_id):
        # TODO plug this method with the notification method once done.
        pass
class RevisionFlatCooker(BaseVaultCooker):
    """Cooker producing a flat bundle of revisions.

    Every revision yielded by ``storage.revision_log([obj_id])`` gets its
    own top-level folder, named after the revision's hex id and holding
    that revision's root directory. The whole tree is archived under the
    hex id of `obj_id`.
    """
    CACHE_TYPE_KEY = 'revision_flat'

    def cook(self, obj_id):
        """Cook the revision `obj_id` and store its bundle in the cache.

        Args:
            obj_id (bytes): the id of the revision to be cooked.
        """
        builder = DirectoryBuilder(self.storage)
        with tempfile.TemporaryDirectory(suffix='.cook') as tmp_dir:
            for rev in self.storage.revision_log([obj_id]):
                rev_root = Path(tmp_dir) / hashutil.hash_to_hex(rev['id'])
                rev_root.mkdir()
                # The builder works on bytes paths.
                builder.build_directory(rev['directory'],
                                        str(rev_root).encode())
            # Archive while the temporary tree still exists.
            bundle = get_tar_bytes(tmp_dir, hashutil.hash_to_hex(obj_id))

        self.update_cache(obj_id, bundle)

        # Notification is currently a no-op (see notify_bundle_ready).
        self.notify_bundle_ready(
            notif_data='Bundle %s ready' % hashutil.hash_to_hex(obj_id),
            bundle_id=obj_id)

    def notify_bundle_ready(self, notif_data, bundle_id):
        # TODO plug this method with the notification method once done.
        pass
class DirectoryBuilder:
    """Creates a cooked directory from its sha1_git in the db.

    Warning: This is NOT a directly accessible cooker, but a low-level
    one that executes the manipulations: it rebuilds a directory on disk
    from the entries returned by ``storage.directory_ls`` and can pack it
    into a tar archive.
    """

    def __init__(self, storage):
        self.storage = storage

    def get_directory_bytes(self, dir_id):
        """Return a tar archive of the directory `dir_id`.

        The directory is rebuilt in a temporary folder, which is removed
        once the archive has been produced.

        Args:
            dir_id: id of the directory in the storage.

        Returns:
            the archive produced by `get_tar_bytes`, whose top-level entry
            is named after the hex id of `dir_id`.
        """
        with tempfile.TemporaryDirectory(prefix='directory.',
                                         suffix='.cook') as tmp_dir:
            # Entry names are joined onto root and split on a bytes
            # separator (see _create_tree), so root is handled as bytes.
            root = os.fsencode(tmp_dir)
            self.build_directory(dir_id, root)
            # The archive is fully in memory, so the folder can go away.
            return self._create_bundle_content(
                root, hashutil.hash_to_hex(dir_id))

    def build_directory(self, dir_id, root):
        """Recreate the content of the directory `dir_id` under `root`.

        Args:
            dir_id: id of the directory in the storage.
            root (bytes): existing folder receiving the content.
        """
        # Retrieve all entries from the database; they are scanned twice
        # below, so materialize them once.
        entries = list(self.storage.directory_ls(dir_id, recursive=True))

        # Split into files and directory data.
        # TODO(seirl): also handle revision data.
        dir_names = [entry['name'] for entry in entries
                     if entry['type'] == 'dir']
        file_entries = [entry for entry in entries if entry['type'] == 'file']

        # Recreate the directory's subtree and then the files into it.
        self._create_tree(root, dir_names)
        self._create_files(root, file_entries)

    def _create_tree(self, root, directory_paths):
        """Create a directory tree from the given paths.

        The tree is created from `root` and each given path in
        `directory_paths` will be created.
        """
        # Directories are sorted by depth so they are created in the
        # right order
        bsep = bytes(os.path.sep, 'utf8')
        dir_names = sorted(
            directory_paths,
            key=lambda x: len(x.split(bsep)))
        for dir_name in dir_names:
            os.makedirs(os.path.join(root, dir_name))

    def _create_files(self, root, file_datas):
        """Create the files under `root` according to their status.

        'absent' and 'hidden' contents are replaced by a placeholder
        message; any other status has its content fetched by sha1.
        """
        for file_data in file_datas:
            path = os.path.join(root, file_data['name'])
            status = file_data['status']
            perms = file_data['perms']
            if status == 'absent':
                self._create_file_absent(path)
            elif status == 'hidden':
                self._create_file_hidden(path)
            else:
                content = self._get_file_content(file_data['sha1'])
                self._create_file(path, content, perms)

    def _create_file(self, path, content, perms=0o100644):
        """Create the given file and fill it with content.

        Accepted `perms` values are 0o100644, 0o100755 and 0o120000; the
        latter creates a symbolic link whose target is `content`. Any
        other value is logged and replaced by 0o100644.
        """
        if perms not in (0o100644, 0o100755, 0o120000):
            logging.warning('File %s has invalid permission %s, '
                            'defaulting to 644.', path, perms)
            # Actually apply the default announced by the warning.
            perms = 0o100644

        if perms == 0o120000:  # Symbolic link
            os.symlink(content, path)
        else:
            with open(path, 'wb') as f:
                f.write(content)
            # Only the permission bits of the mode are applied.
            os.chmod(path, perms & 0o777)

    def _get_file_content(self, obj_id):
        """Get the content of the given file from the storage."""
        content = list(self.storage.content_get([obj_id]))[0]['data']
        return content

    def _create_file_absent(self, path):
        """Create a file that indicates a skipped content.

        Create the given file but fill it with a specific content to
        indicate that the content have not been retrieved by the
        software heritage archive due to its size.
        """
        self._create_file(path, SKIPPED_MESSAGE)

    def _create_file_hidden(self, path):
        """Create a file that indicates a hidden content.

        Create the given file but fill it with a specific content to
        indicate that the content could not be retrieved due to
        privacy policy.
        """
        self._create_file(path, HIDDEN_MESSAGE)

    def _create_bundle_content(self, path, hex_dir_id):
        """Create a bundle from the given directory.

        Args:
            path (bytes): location of the directory to package.
            hex_dir_id: hex representation of the directory id, used as
                the archive's top-level name.

        Returns:
            the tar archive of the directory, from `get_tar_bytes`.
        """
        return get_tar_bytes(os.fsdecode(path), hex_dir_id)
+
+
# Registry of the available cookers, keyed by bundle type. Each key is the
# cooker's own CACHE_TYPE_KEY, so the type used to select a cooker is also
# the type under which that cooker caches its bundles.
COOKER_TYPES = {
    cooker_cls.CACHE_TYPE_KEY: cooker_cls
    for cooker_cls in (DirectoryCooker, RevisionFlatCooker)
}

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 1:29 PM (6 d, 4 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3276255

Event Timeline