Page Menu · Home · Software Heritage

No OneTemporary

diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
index 937d7f6ae..f8f9ad207 100644
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -1,199 +1,178 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pickle
-
import requests
from requests.exceptions import ConnectionError
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage.exc import StorageAPIError
-
-
-def encode_data(data):
- try:
- return msgpack_dumps(data)
- except OverflowError as e:
- raise ValueError('Limits were reached. Please, check your input.\n' +
- str(e))
-
-
-def decode_response(response):
- content_type = response.headers['content-type']
-
- if content_type.startswith('application/x-msgpack'):
- r = msgpack_loads(response.content)
- elif content_type.startswith('application/json'):
- r = response.json(cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API response'
- % content_type)
- return r
+# The (de)serialization helpers now live in swh.storage.api.common, where the
+# objstorage API client shares them; they are imported just below.
+from ..exc import StorageAPIError
+from ..api.common import (decode_response,
+ encode_data_client as encode_data)
+# HTTP client for the endpoints served by swh/storage/api/server.py.
+# post() sends msgpack-encoded bodies and get() sends query parameters; an
+# HTTP 400 reply carries a pickled exception that is re-raised locally, and
+# get() maps HTTP 404 to None.
+# NOTE(review): pickle.loads() on reply bodies means the server must be
+# fully trusted.
class RemoteStorage():
"""Proxy to a remote storage API"""
def __init__(self, base_url):
self.base_url = base_url
self.session = requests.Session()
def url(self, endpoint):
return '%s%s' % (self.base_url, endpoint)
def post(self, endpoint, data):
try:
response = self.session.post(
self.url(endpoint),
data=encode_data(data),
headers={'content-type': 'application/x-msgpack'},
)
except ConnectionError as e:
print(str(e))
raise StorageAPIError(e)
# XXX: this breaks language-independence and should be
# replaced by proper unserialization
if response.status_code == 400:
raise pickle.loads(decode_response(response))
return decode_response(response)
def get(self, endpoint, data=None):
try:
response = self.session.get(
self.url(endpoint),
params=data,
)
except ConnectionError as e:
print(str(e))
raise StorageAPIError(e)
if response.status_code == 404:
return None
# XXX: this breaks language-independence and should be
# replaced by proper unserialization
if response.status_code == 400:
raise pickle.loads(decode_response(response))
else:
return decode_response(response)
def content_add(self, content):
return self.post('content/add', {'content': content})
def content_missing(self, content, key_hash='sha1'):
return self.post('content/missing', {'content': content,
'key_hash': key_hash})
def content_missing_per_sha1(self, contents):
return self.post('content/missing/sha1', {'contents': contents})
def content_get(self, content):
return self.post('content/data', {'content': content})
def content_find(self, content):
return self.post('content/present', {'content': content})
def content_find_occurrence(self, content):
return self.post('content/occurrence', {'content': content})
def directory_add(self, directories):
return self.post('directory/add', {'directories': directories})
def directory_missing(self, directories):
return self.post('directory/missing', {'directories': directories})
def directory_get(self, directories):
return self.post('directory', dict(directories=directories))
def directory_ls(self, directory, recursive=False):
return self.get('directory/ls', {'directory': directory,
'recursive': recursive})
def revision_get(self, revisions):
return self.post('revision', {'revisions': revisions})
def revision_get_by(self, origin_id, branch_name, timestamp, limit=None):
return self.post('revision/by', dict(origin_id=origin_id,
branch_name=branch_name,
timestamp=timestamp,
limit=limit))
def revision_log(self, revisions, limit=None):
return self.post('revision/log', {'revisions': revisions,
'limit': limit})
def revision_shortlog(self, revisions, limit=None):
return self.post('revision/shortlog', {'revisions': revisions,
'limit': limit})
def revision_add(self, revisions):
return self.post('revision/add', {'revisions': revisions})
def revision_missing(self, revisions):
return self.post('revision/missing', {'revisions': revisions})
def release_add(self, releases):
return self.post('release/add', {'releases': releases})
def release_get(self, releases):
return self.post('release', {'releases': releases})
def release_get_by(self, origin_id, limit=None):
return self.post('release/by', dict(origin_id=origin_id,
limit=limit))
def release_missing(self, releases):
return self.post('release/missing', {'releases': releases})
def object_find_by_sha1_git(self, ids):
return self.post('object/find_by_sha1_git', {'ids': ids})
def occurrence_get(self, origin_id):
return self.post('occurrence', {'origin_id': origin_id})
def occurrence_add(self, occurrences):
return self.post('occurrence/add', {'occurrences': occurrences})
def origin_get(self, origin):
return self.post('origin/get', {'origin': origin})
def origin_add_one(self, origin):
return self.post('origin/add', {'origin': origin})
def person_get(self, person):
return self.post('person', {'person': person})
def fetch_history_start(self, origin_id):
return self.post('fetch_history/start', {'origin_id': origin_id})
def fetch_history_end(self, fetch_history_id, data):
return self.post('fetch_history/end',
{'fetch_history_id': fetch_history_id,
'data': data})
def fetch_history_get(self, fetch_history_id):
return self.get('fetch_history', {'id': fetch_history_id})
def entity_add(self, entities):
return self.post('entity/add', {'entities': entities})
def entity_get(self, uuid):
return self.post('entity/get', {'uuid': uuid})
def entity_get_one(self, uuid):
return self.get('entity', {'uuid': uuid})
def entity_get_from_lister_metadata(self, entities):
return self.post('entity/from_lister_metadata', {'entities': entities})
def stat_counters(self):
return self.get('stat/counters')
def directory_entry_get_by_path(self, directory, paths):
return self.post('directory/path', dict(directory=directory,
paths=paths))
diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py
new file mode 100644
index 000000000..328d82604
--- /dev/null
+++ b/swh/storage/api/common.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import pickle
+
+from flask import Request, Response
+
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
+class BytesRequest(Request):
+    """Flask request meant to cope with arbitrary byte sequences.
+
+    The 'surrogateescape' error handler maps undecodable bytes to lone
+    surrogates instead of raising, so the original bytes can be recovered
+    with ``.encode('utf-8', 'surrogateescape')``.
+
+    NOTE(review): werkzeug's Request reads ``charset`` and
+    ``encoding_errors``; confirm that ``encoding`` is actually honoured.
+    """
+    encoding_errors = 'surrogateescape'
+    encoding = 'utf-8'
+
+
+def encode_data_server(data):
+    """Serialize *data* with msgpack and wrap it in a Flask ``Response``
+    whose mimetype is ``application/x-msgpack``."""
+    body = msgpack_dumps(data)
+    return Response(body, mimetype='application/x-msgpack')
+
+
+def encode_data_client(data):
+    """Serialize *data* with msgpack for use as a client request body.
+
+    Raises:
+        ValueError: if msgpack raises ``OverflowError`` (typically an
+            integer outside the range msgpack can encode). The original
+            error is chained as the cause.
+    """
+    try:
+        return msgpack_dumps(data)
+    except OverflowError as e:
+        # Chain explicitly so the traceback shows the OverflowError as the
+        # direct cause rather than as an error raised "during handling".
+        raise ValueError('Limits were reached. Please, check your input.\n' +
+                         str(e)) from e
+
+
+def decode_request(request):
+    """Deserialize the body of a Flask *request* according to its mimetype.
+
+    ``application/x-msgpack`` bodies go through ``msgpack_loads``;
+    ``application/json`` bodies go through ``json.loads`` with
+    ``SWHJSONDecoder``.
+
+    Raises:
+        ValueError: for any other mimetype.
+    """
+    mimetype = request.mimetype
+    body = request.get_data()
+
+    if mimetype == 'application/x-msgpack':
+        return msgpack_loads(body)
+    if mimetype == 'application/json':
+        return json.loads(body, cls=SWHJSONDecoder)
+    raise ValueError('Wrong content type `%s` for API request'
+                     % mimetype)
+
+
+def decode_response(response):
+    """Deserialize the body of a ``requests`` *response*.
+
+    The Content-Type header selects the decoder. ``application/x-msgpack``
+    uses ``msgpack_loads``; ``application/json`` uses ``response.json``
+    with ``SWHJSONDecoder``. Prefix matching tolerates parameters such as
+    ``; charset=utf-8``.
+
+    Raises:
+        ValueError: if the Content-Type header is missing or names any
+            other format.
+    """
+    # .get() so that a missing header raises the ValueError below instead
+    # of an opaque KeyError.
+    content_type = response.headers.get('content-type', '')
+
+    if content_type.startswith('application/x-msgpack'):
+        return msgpack_loads(response.content)
+    if content_type.startswith('application/json'):
+        return response.json(cls=SWHJSONDecoder)
+    raise ValueError('Wrong content type `%s` for API response'
+                     % content_type)
+
+
+def error_handler(exception, encoder):
+    """Build the HTTP 400 response that reports *exception* to the client.
+
+    The exception is pickled and handed to *encoder*. The encoder must
+    return a response object with a writable ``status_code``. The API
+    clients unpickle and re-raise the exception on their side.
+    """
+    # XXX: pickling breaks language-independence and should be replaced
+    # by a proper serialization of errors.
+    payload = pickle.dumps(exception)
+    response = encoder(payload)
+    response.status_code = 400
+    return response
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
index bce183e71..de7bb3320 100644
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -1,278 +1,246 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
-import pickle
-from flask import Flask, Request, Response, g, request
+from flask import Flask, g, request
from swh.core import config
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage import Storage
+from .. import Storage
+from ..api.common import (BytesRequest, decode_request, error_handler,
+ encode_data_server as encode_data)
DEFAULT_CONFIG = {
'db': ('str', 'dbname=softwareheritage-dev'),
'storage_base': ('str', '/tmp/swh-storage/test'),
}
-class BytesRequest(Request):
- """Request with proper escaping of arbitrary byte sequences."""
- encoding = 'utf-8'
- encoding_errors = 'surrogateescape'
-
-
app = Flask(__name__)
app.request_class = BytesRequest
-def encode_data(data):
- return Response(
- msgpack_dumps(data),
- mimetype='application/x-msgpack',
- )
-
-
-def decode_request(request):
- content_type = request.mimetype
- data = request.get_data()
-
- if content_type == 'application/x-msgpack':
- r = msgpack_loads(data)
- elif content_type == 'application/json':
- r = json.loads(data, cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API request'
- % content_type)
-
- return r
-
-
@app.errorhandler(Exception)
-def error_handler(exception):
- # XXX: this breaks language-independence and should be
- # replaced by proper serialization of errors
- response = encode_data(pickle.dumps(exception))
- response.status_code = 400
- return response
+def my_error_handler(exception):
+ """Return any uncaught exception to the client as a pickled HTTP 400 reply."""
+ return error_handler(exception, encode_data)
+# Flask before_request hook: a new Storage is opened for every request.
@app.before_request
def before_request():
g.storage = Storage(app.config['db'], app.config['storage_base'])
@app.route('/')
def index():
return 'Hello'
+# API endpoints: each POST route passes the decoded request body as keyword
+# arguments to the corresponding Storage method and returns its
+# msgpack-encoded result; GET routes read their arguments from request.args.
@app.route('/content/missing', methods=['POST'])
def content_missing():
return encode_data(g.storage.content_missing(**decode_request(request)))
@app.route('/content/missing/sha1', methods=['POST'])
def content_missing_per_sha1():
return encode_data(g.storage.content_missing_per_sha1(
**decode_request(request)))
@app.route('/content/present', methods=['POST'])
def content_find():
return encode_data(g.storage.content_find(**decode_request(request)))
@app.route('/content/occurrence', methods=['POST'])
def content_find_occurrence():
res = g.storage.content_find_occurrence(**decode_request(request))
return encode_data(res)
@app.route('/content/add', methods=['POST'])
def content_add():
return encode_data(g.storage.content_add(**decode_request(request)))
@app.route('/content/data', methods=['POST'])
def content_get():
return encode_data(g.storage.content_get(**decode_request(request)))
@app.route('/directory', methods=['POST'])
def directory_get():
return encode_data(g.storage.directory_get(**decode_request(request)))
@app.route('/directory/missing', methods=['POST'])
def directory_missing():
return encode_data(g.storage.directory_missing(**decode_request(request)))
@app.route('/directory/add', methods=['POST'])
def directory_add():
return encode_data(g.storage.directory_add(**decode_request(request)))
@app.route('/directory/path', methods=['POST'])
def directory_entry_get_by_path():
return encode_data(g.storage.directory_entry_get_by_path(
**decode_request(request)))
@app.route('/directory/ls', methods=['GET'])
def directory_ls():
dir = request.args['directory'].encode('utf-8', 'surrogateescape')
rec = json.loads(request.args.get('recursive', 'False').lower())
return encode_data(g.storage.directory_ls(dir, recursive=rec))
@app.route('/revision/add', methods=['POST'])
def revision_add():
return encode_data(g.storage.revision_add(**decode_request(request)))
@app.route('/revision', methods=['POST'])
def revision_get():
return encode_data(g.storage.revision_get(**decode_request(request)))
@app.route('/revision/by', methods=['POST'])
def revision_get_by():
return encode_data(g.storage.revision_get_by(**decode_request(request)))
@app.route('/revision/log', methods=['POST'])
def revision_log():
return encode_data(g.storage.revision_log(**decode_request(request)))
@app.route('/revision/shortlog', methods=['POST'])
def revision_shortlog():
return encode_data(g.storage.revision_shortlog(**decode_request(request)))
@app.route('/revision/missing', methods=['POST'])
def revision_missing():
return encode_data(g.storage.revision_missing(**decode_request(request)))
@app.route('/release/add', methods=['POST'])
def release_add():
return encode_data(g.storage.release_add(**decode_request(request)))
@app.route('/release', methods=['POST'])
def release_get():
return encode_data(g.storage.release_get(**decode_request(request)))
@app.route('/release/by', methods=['POST'])
def release_get_by():
return encode_data(g.storage.release_get_by(**decode_request(request)))
@app.route('/release/missing', methods=['POST'])
def release_missing():
return encode_data(g.storage.release_missing(**decode_request(request)))
@app.route('/object/find_by_sha1_git', methods=['POST'])
def object_find_by_sha1_git():
return encode_data(g.storage.object_find_by_sha1_git(
**decode_request(request)))
@app.route('/occurrence', methods=['POST'])
def occurrence_get():
return encode_data(g.storage.occurrence_get(**decode_request(request)))
@app.route('/occurrence/add', methods=['POST'])
def occurrence_add():
return encode_data(g.storage.occurrence_add(**decode_request(request)))
@app.route('/origin/get', methods=['POST'])
def origin_get():
return encode_data(g.storage.origin_get(**decode_request(request)))
@app.route('/origin/add', methods=['POST'])
def origin_add_one():
return encode_data(g.storage.origin_add_one(**decode_request(request)))
@app.route('/person', methods=['POST'])
def person_get():
return encode_data(g.storage.person_get(**decode_request(request)))
@app.route('/fetch_history', methods=['GET'])
def fetch_history_get():
return encode_data(g.storage.fetch_history_get(request.args['id']))
@app.route('/fetch_history/start', methods=['POST'])
def fetch_history_start():
return encode_data(
g.storage.fetch_history_start(**decode_request(request)))
@app.route('/fetch_history/end', methods=['POST'])
def fetch_history_end():
return encode_data(
g.storage.fetch_history_end(**decode_request(request)))
@app.route('/entity/add', methods=['POST'])
def entity_add():
return encode_data(
g.storage.entity_add(**decode_request(request)))
@app.route('/entity/get', methods=['POST'])
def entity_get():
return encode_data(
g.storage.entity_get(**decode_request(request)))
@app.route('/entity', methods=['GET'])
def entity_get_one():
return encode_data(g.storage.entity_get_one(request.args['uuid']))
@app.route('/entity/from_lister_metadata', methods=['POST'])
def entity_from_lister_metadata():
return encode_data(
g.storage.entity_get_from_lister_metadata(**decode_request(request)))
@app.route('/stat/counters', methods=['GET'])
def stat_counters():
return encode_data(g.storage.stat_counters())
def run_from_webserver(environ, start_response):
"""Run the WSGI app from the webserver, loading the configuration."""
config_path = '/etc/softwareheritage/storage/storage.ini'
app.config.update(config.read(config_path, DEFAULT_CONFIG))
handler = logging.StreamHandler()
app.logger.addHandler(handler)
return app(environ, start_response)
if __name__ == '__main__':
import sys
app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1'
app.run(host, debug=True)
diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
new file mode 100644
index 000000000..8f0308651
--- /dev/null
+++ b/swh/storage/objstorage/api/client.py
@@ -0,0 +1,92 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pickle
+
+import requests
+
+from requests.exceptions import ConnectionError
+from ...exc import StorageAPIError
+from ...api.common import (decode_response,
+ encode_data_client as encode_data)
+
+
+class RemoteObjStorage():
+    """Proxy to a remote object storage server, accessed over HTTP.
+
+    Every call is a msgpack-encoded POST to ``base_url + endpoint``. When
+    the server answers with HTTP 400, the reply body is a pickled
+    exception, which is re-raised locally.
+
+    Attributes:
+        base_url (string): URL of the server to connect to; must end
+            with a '/'.
+        session: the ``requests.Session`` used to send the requests.
+    """
+
+    def __init__(self, base_url):
+        self.base_url = base_url
+        self.session = requests.Session()
+
+    def url(self, endpoint):
+        """Return the absolute URL of *endpoint* on the server."""
+        return '{}{}'.format(self.base_url, endpoint)
+
+    def post(self, endpoint, data):
+        """POST *data*, msgpack-encoded, to *endpoint* and decode the reply.
+
+        Raises:
+            StorageAPIError: if the server cannot be reached.
+            Exception: the exception reported by the server (HTTP 400).
+        """
+        target = self.url(endpoint)
+        body = encode_data(data)
+        try:
+            response = self.session.post(
+                target,
+                data=body,
+                headers={'content-type': 'application/x-msgpack'},
+            )
+        except ConnectionError as e:
+            print(str(e))
+            raise StorageAPIError(e)
+
+        # XXX: this breaks language-independence and should be
+        # replaced by proper unserialization
+        if response.status_code != 400:
+            return decode_response(response)
+        raise pickle.loads(decode_response(response))
+
+    def content_add(self, bytes, obj_id=None):
+        """Add a new object to the object storage.
+
+        Args:
+            bytes: content of the object to be added to the storage.
+            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
+                given, obj_id will be trusted to match bytes. If missing,
+                obj_id will be computed on the fly.
+
+        Returns:
+            whatever the server's ``content/add`` endpoint returns.
+        """
+        payload = {'bytes': bytes, 'obj_id': obj_id}
+        return self.post('content/add', payload)
+
+    def content_get(self, obj_id):
+        """Retrieve the content of a given object.
+
+        Args:
+            obj_id: the id of the object.
+
+        Returns:
+            the content of the requested object, as bytes.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing.
+        """
+        payload = {'obj_id': obj_id}
+        return self.post('content/get', payload)
+
+    def content_check(self, obj_id):
+        """Check the integrity of a given object.
+
+        Verify that the file object is in place and that its gzipped
+        content matches the object id. Nothing is returned on success.
+
+        Args:
+            obj_id: the id of the object.
+
+        Raises:
+            ObjNotFoundError: if the requested object is missing.
+            Error: if the requested object is corrupt.
+        """
+        payload = {'obj_id': obj_id}
+        self.post('content/check', payload)
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
index 000000000..1cd1f3ebc
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,67 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from flask import Flask, g, request
+
+from swh.core import config
+from .. import ObjStorage
+from ...api.common import (BytesRequest, decode_request, error_handler,
+ encode_data_server as encode_data)
+
+# Default settings, passed to config.read() together with the config file
+# path in the __main__ block below.
+DEFAULT_CONFIG = {
+    'storage_base': ('str', '/tmp/swh-storage/objects/'),
+    'storage_depth': ('int', 3)
+}
+
+# Flask builds every incoming request as a BytesRequest (see api.common).
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
+@app.errorhandler(Exception)
+def my_error_handler(exception):
+    """Report any uncaught exception to the client as a pickled HTTP 400
+    response (see api.common.error_handler)."""
+    return error_handler(exception, encoder=encode_data)
+
+
+@app.before_request
+def before_request():
+    """Attach an ObjStorage built from the app config to ``g``."""
+    cfg = app.config
+    g.objstorage = ObjStorage(cfg['storage_base'], cfg['storage_depth'])
+
+
+@app.route('/')
+def index():
+    """Return a fixed greeting."""
+    greeting = "Helloworld!"
+    return greeting
+
+
+@app.route('/content')
+def content():
+    """Return the ``str()`` of the list of objects held by the object
+    storage."""
+    # before_request stores the object storage in g.objstorage; g.storage is
+    # never set by this app, so reading it failed with AttributeError.
+    # NOTE(review): assumes ObjStorage is iterable (presumably over object
+    # ids) -- TODO confirm. This also materializes the whole listing.
+    return str(list(g.objstorage))
+
+
+@app.route('/content/add', methods=['POST'])
+def add_bytes():
+    """Forward the decoded body (e.g. ``bytes`` and ``obj_id`` as sent by
+    RemoteObjStorage.content_add) to ObjStorage.add_bytes."""
+    kwargs = decode_request(request)
+    return encode_data(g.objstorage.add_bytes(**kwargs))
+
+
+@app.route('/content/get', methods=['POST'])
+def get_bytes():
+    """Forward the decoded body (``obj_id``) to ObjStorage.get_bytes."""
+    kwargs = decode_request(request)
+    return encode_data(g.objstorage.get_bytes(**kwargs))
+
+
+@app.route('/content/check', methods=['POST'])
+def check():
+    """Forward the decoded body (``obj_id``) to ObjStorage.check."""
+    # TODO verify that an error on this content will be properly intercepted
+    # by @app.errorhandler and the answer will be sent to client.
+    kwargs = decode_request(request)
+    return encode_data(g.objstorage.check(**kwargs))
+
+
+if __name__ == '__main__':
+    import sys
+
+    app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
+
+    # debug=True enables the Werkzeug interactive debugger, which allows
+    # arbitrary code execution. Bind to loopback by default (as
+    # swh/storage/api/server.py does); other interfaces need an explicit
+    # host argument.
+    host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1'
+    app.run(host, debug=True)

File Metadata

Mime Type
text/x-diff
Expires
Fri, Jul 4, 11:48 AM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3251408

Event Timeline