Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F9341179
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
23 KB
Subscribers
None
View Options
diff --git a/swh/storage/api/client.py b/swh/storage/api/client.py
index 937d7f6ae..f8f9ad207 100644
--- a/swh/storage/api/client.py
+++ b/swh/storage/api/client.py
@@ -1,199 +1,178 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import pickle
-
import requests
from requests.exceptions import ConnectionError
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage.exc import StorageAPIError
-
-
-def encode_data(data):
- try:
- return msgpack_dumps(data)
- except OverflowError as e:
- raise ValueError('Limits were reached. Please, check your input.\n' +
- str(e))
-
-
-def decode_response(response):
- content_type = response.headers['content-type']
-
- if content_type.startswith('application/x-msgpack'):
- r = msgpack_loads(response.content)
- elif content_type.startswith('application/json'):
- r = response.json(cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API response'
- % content_type)
- return r
+from ..exc import StorageAPIError
+from ..api.common import (decode_response,
+ encode_data_client as encode_data)
class RemoteStorage():
    """Proxy to a remote storage API.

    Every public method forwards its arguments to a single HTTP endpoint
    of the storage server and returns the decoded response.

    Attributes:
        base_url: URL prefix; endpoint paths are appended to it verbatim
            (no separator is inserted).
        session: the ``requests`` session used to send every request.
    """
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()

    def url(self, endpoint):
        """Return the full URL of `endpoint` (plain concatenation)."""
        return '%s%s' % (self.base_url, endpoint)

    def _send(self, method, endpoint, **kwargs):
        """Call the session `method` on `endpoint`.

        Raises:
            StorageAPIError: if the connection fails; the original
                connection error is attached as the cause.
        """
        try:
            return method(self.url(endpoint), **kwargs)
        except ConnectionError as e:
            print(str(e))
            raise StorageAPIError(e) from e

    def _decode(self, response):
        """Return the decoded body of `response`, or raise the exception
        it carries when the status code is 400.
        """
        # XXX: this breaks language-independence and should be
        # replaced by proper unserialization
        # SECURITY: unpickling executes arbitrary code chosen by the peer;
        # this is only acceptable against a trusted storage server.
        if response.status_code == 400:
            raise pickle.loads(decode_response(response))
        return decode_response(response)

    def post(self, endpoint, data):
        """POST `data` (msgpack-encoded) to `endpoint`, return the decoded
        response.
        """
        response = self._send(
            self.session.post,
            endpoint,
            data=encode_data(data),
            headers={'content-type': 'application/x-msgpack'},
        )
        return self._decode(response)

    def get(self, endpoint, data=None):
        """GET `endpoint` with `data` as query parameters.

        Returns:
            the decoded response, or None when the server answers 404.
        """
        response = self._send(self.session.get, endpoint, params=data)
        if response.status_code == 404:
            return None
        return self._decode(response)

    # -- content ---------------------------------------------------------

    def content_add(self, content):
        return self.post('content/add', {'content': content})

    def content_missing(self, content, key_hash='sha1'):
        return self.post('content/missing', {'content': content,
                                             'key_hash': key_hash})

    def content_missing_per_sha1(self, contents):
        return self.post('content/missing/sha1', {'contents': contents})

    def content_get(self, content):
        return self.post('content/data', {'content': content})

    def content_find(self, content):
        return self.post('content/present', {'content': content})

    def content_find_occurrence(self, content):
        return self.post('content/occurrence', {'content': content})

    # -- directory -------------------------------------------------------

    def directory_add(self, directories):
        return self.post('directory/add', {'directories': directories})

    def directory_missing(self, directories):
        return self.post('directory/missing', {'directories': directories})

    def directory_get(self, directories):
        return self.post('directory', dict(directories=directories))

    def directory_ls(self, directory, recursive=False):
        return self.get('directory/ls', {'directory': directory,
                                         'recursive': recursive})

    # -- revision --------------------------------------------------------

    def revision_get(self, revisions):
        return self.post('revision', {'revisions': revisions})

    def revision_get_by(self, origin_id, branch_name, timestamp, limit=None):
        return self.post('revision/by', dict(origin_id=origin_id,
                                             branch_name=branch_name,
                                             timestamp=timestamp,
                                             limit=limit))

    def revision_log(self, revisions, limit=None):
        return self.post('revision/log', {'revisions': revisions,
                                          'limit': limit})

    def revision_shortlog(self, revisions, limit=None):
        return self.post('revision/shortlog', {'revisions': revisions,
                                               'limit': limit})

    def revision_add(self, revisions):
        return self.post('revision/add', {'revisions': revisions})

    def revision_missing(self, revisions):
        return self.post('revision/missing', {'revisions': revisions})

    # -- release ---------------------------------------------------------

    def release_add(self, releases):
        return self.post('release/add', {'releases': releases})

    def release_get(self, releases):
        return self.post('release', {'releases': releases})

    def release_get_by(self, origin_id, limit=None):
        return self.post('release/by', dict(origin_id=origin_id,
                                            limit=limit))

    def release_missing(self, releases):
        return self.post('release/missing', {'releases': releases})

    # -- objects, occurrences, origins, persons --------------------------

    def object_find_by_sha1_git(self, ids):
        return self.post('object/find_by_sha1_git', {'ids': ids})

    def occurrence_get(self, origin_id):
        return self.post('occurrence', {'origin_id': origin_id})

    def occurrence_add(self, occurrences):
        return self.post('occurrence/add', {'occurrences': occurrences})

    def origin_get(self, origin):
        return self.post('origin/get', {'origin': origin})

    def origin_add_one(self, origin):
        return self.post('origin/add', {'origin': origin})

    def person_get(self, person):
        return self.post('person', {'person': person})

    # -- fetch history ---------------------------------------------------

    def fetch_history_start(self, origin_id):
        return self.post('fetch_history/start', {'origin_id': origin_id})

    def fetch_history_end(self, fetch_history_id, data):
        return self.post('fetch_history/end',
                         {'fetch_history_id': fetch_history_id,
                          'data': data})

    def fetch_history_get(self, fetch_history_id):
        return self.get('fetch_history', {'id': fetch_history_id})

    # -- entities --------------------------------------------------------

    def entity_add(self, entities):
        return self.post('entity/add', {'entities': entities})

    def entity_get(self, uuid):
        return self.post('entity/get', {'uuid': uuid})

    def entity_get_one(self, uuid):
        return self.get('entity', {'uuid': uuid})

    def entity_get_from_lister_metadata(self, entities):
        return self.post('entity/from_lister_metadata', {'entities': entities})

    # -- misc ------------------------------------------------------------

    def stat_counters(self):
        return self.get('stat/counters')

    def directory_entry_get_by_path(self, directory, paths):
        return self.post('directory/path', dict(directory=directory,
                                                paths=paths))
diff --git a/swh/storage/api/common.py b/swh/storage/api/common.py
new file mode 100644
index 000000000..328d82604
--- /dev/null
+++ b/swh/storage/api/common.py
@@ -0,0 +1,69 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import json
+import pickle
+
+from flask import Request, Response
+
+from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
+
+
class BytesRequest(Request):
    """Request class that escapes arbitrary byte sequences.

    Text is decoded as UTF-8 with the ``surrogateescape`` error handler,
    so bytes that are not valid UTF-8 are escaped instead of rejected.
    """
    encoding_errors = 'surrogateescape'
    encoding = 'utf-8'
+
+
def encode_data_server(data):
    """Serialize `data` with msgpack and wrap it in an HTTP response.

    Returns:
        a ``Response`` whose body is the msgpack payload and whose
        mimetype is ``application/x-msgpack``.
    """
    payload = msgpack_dumps(data)
    return Response(payload, mimetype='application/x-msgpack')
+
+
def encode_data_client(data):
    """Serialize `data` with msgpack for use as an HTTP request body.

    Returns:
        the value returned by ``msgpack_dumps(data)``.

    Raises:
        ValueError: if serialization raises OverflowError; the original
            error is attached as the cause so the traceback stays
            explicit about where the limit was hit.
    """
    try:
        return msgpack_dumps(data)
    except OverflowError as e:
        raise ValueError('Limits were reached. Please, check your input.\n' +
                         str(e)) from e
+
+
def decode_request(request):
    """Decode the body of an incoming API request.

    The decoder is selected from ``request.mimetype``:
    ``application/x-msgpack`` bodies go through ``msgpack_loads`` and
    ``application/json`` bodies through ``json.loads`` with
    ``SWHJSONDecoder``.

    Raises:
        ValueError: if the mimetype is neither of the two above.
    """
    mimetype = request.mimetype
    body = request.get_data()

    if mimetype == 'application/x-msgpack':
        return msgpack_loads(body)
    if mimetype == 'application/json':
        return json.loads(body, cls=SWHJSONDecoder)

    raise ValueError('Wrong content type `%s` for API request' % mimetype)
+
+
def decode_response(response):
    """Decode the body of an API response.

    The decoder is selected from the ``content-type`` header:
    msgpack bodies go through ``msgpack_loads`` and JSON bodies through
    ``response.json`` with ``SWHJSONDecoder``.

    Raises:
        ValueError: if the content type is unsupported, or if the response
            carries no ``content-type`` header at all.
    """
    # A missing header is reported like any other unsupported content type
    # instead of leaking a bare KeyError to the caller.
    content_type = response.headers.get('content-type', '')

    if content_type.startswith('application/x-msgpack'):
        r = msgpack_loads(response.content)
    elif content_type.startswith('application/json'):
        r = response.json(cls=SWHJSONDecoder)
    else:
        raise ValueError('Wrong content type `%s` for API response'
                         % content_type)

    return r
+
+
def error_handler(exception, encoder):
    """Turn `exception` into an HTTP 400 response.

    Args:
        exception: the exception to report to the client.
        encoder: callable that builds a response object from a payload;
            it receives the pickled exception.

    Returns:
        the response built by `encoder`, with ``status_code`` set to 400.
    """
    # XXX: this breaks language-independence and should be
    # replaced by proper serialization of errors
    pickled = pickle.dumps(exception)
    error_response = encoder(pickled)
    error_response.status_code = 400
    return error_response
diff --git a/swh/storage/api/server.py b/swh/storage/api/server.py
index bce183e71..de7bb3320 100644
--- a/swh/storage/api/server.py
+++ b/swh/storage/api/server.py
@@ -1,278 +1,246 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import logging
-import pickle
-from flask import Flask, Request, Response, g, request
+from flask import Flask, g, request
from swh.core import config
-from swh.core.serializers import msgpack_dumps, msgpack_loads, SWHJSONDecoder
-from swh.storage import Storage
+from .. import Storage
+from ..api.common import (BytesRequest, decode_request, error_handler,
+ encode_data_server as encode_data)
DEFAULT_CONFIG = {
'db': ('str', 'dbname=softwareheritage-dev'),
'storage_base': ('str', '/tmp/swh-storage/test'),
}
-class BytesRequest(Request):
- """Request with proper escaping of arbitrary byte sequences."""
- encoding = 'utf-8'
- encoding_errors = 'surrogateescape'
-
-
app = Flask(__name__)
app.request_class = BytesRequest
-def encode_data(data):
- return Response(
- msgpack_dumps(data),
- mimetype='application/x-msgpack',
- )
-
-
-def decode_request(request):
- content_type = request.mimetype
- data = request.get_data()
-
- if content_type == 'application/x-msgpack':
- r = msgpack_loads(data)
- elif content_type == 'application/json':
- r = json.loads(data, cls=SWHJSONDecoder)
- else:
- raise ValueError('Wrong content type `%s` for API request'
- % content_type)
-
- return r
-
-
@app.errorhandler(Exception)
-def error_handler(exception):
- # XXX: this breaks language-independence and should be
- # replaced by proper serialization of errors
- response = encode_data(pickle.dumps(exception))
- response.status_code = 400
- return response
+def my_error_handler(exception):
+ return error_handler(exception, encode_data)
@app.before_request
def before_request():
g.storage = Storage(app.config['db'], app.config['storage_base'])
@app.route('/')
def index():
return 'Hello'
@app.route('/content/missing', methods=['POST'])
def content_missing():
return encode_data(g.storage.content_missing(**decode_request(request)))
@app.route('/content/missing/sha1', methods=['POST'])
def content_missing_per_sha1():
return encode_data(g.storage.content_missing_per_sha1(
**decode_request(request)))
@app.route('/content/present', methods=['POST'])
def content_find():
return encode_data(g.storage.content_find(**decode_request(request)))
@app.route('/content/occurrence', methods=['POST'])
def content_find_occurrence():
res = g.storage.content_find_occurrence(**decode_request(request))
return encode_data(res)
@app.route('/content/add', methods=['POST'])
def content_add():
return encode_data(g.storage.content_add(**decode_request(request)))
@app.route('/content/data', methods=['POST'])
def content_get():
return encode_data(g.storage.content_get(**decode_request(request)))
@app.route('/directory', methods=['POST'])
def directory_get():
return encode_data(g.storage.directory_get(**decode_request(request)))
@app.route('/directory/missing', methods=['POST'])
def directory_missing():
return encode_data(g.storage.directory_missing(**decode_request(request)))
@app.route('/directory/add', methods=['POST'])
def directory_add():
return encode_data(g.storage.directory_add(**decode_request(request)))
@app.route('/directory/path', methods=['POST'])
def directory_entry_get_by_path():
return encode_data(g.storage.directory_entry_get_by_path(
**decode_request(request)))
@app.route('/directory/ls', methods=['GET'])
def directory_ls():
    """List a directory, reading the arguments from the query string.

    Query parameters:
        directory: required; re-encoded to bytes with UTF-8 and
            ``surrogateescape``.
        recursive: optional, defaults to 'False'; lower-cased and parsed
            as a JSON literal.
    """
    # Local is named `directory` rather than `dir` so the builtin is not
    # shadowed.
    directory = request.args['directory'].encode('utf-8', 'surrogateescape')
    recursive = json.loads(request.args.get('recursive', 'False').lower())
    return encode_data(g.storage.directory_ls(directory, recursive=recursive))
@app.route('/revision/add', methods=['POST'])
def revision_add():
return encode_data(g.storage.revision_add(**decode_request(request)))
@app.route('/revision', methods=['POST'])
def revision_get():
return encode_data(g.storage.revision_get(**decode_request(request)))
@app.route('/revision/by', methods=['POST'])
def revision_get_by():
return encode_data(g.storage.revision_get_by(**decode_request(request)))
@app.route('/revision/log', methods=['POST'])
def revision_log():
return encode_data(g.storage.revision_log(**decode_request(request)))
@app.route('/revision/shortlog', methods=['POST'])
def revision_shortlog():
return encode_data(g.storage.revision_shortlog(**decode_request(request)))
@app.route('/revision/missing', methods=['POST'])
def revision_missing():
return encode_data(g.storage.revision_missing(**decode_request(request)))
@app.route('/release/add', methods=['POST'])
def release_add():
return encode_data(g.storage.release_add(**decode_request(request)))
@app.route('/release', methods=['POST'])
def release_get():
return encode_data(g.storage.release_get(**decode_request(request)))
@app.route('/release/by', methods=['POST'])
def release_get_by():
return encode_data(g.storage.release_get_by(**decode_request(request)))
@app.route('/release/missing', methods=['POST'])
def release_missing():
return encode_data(g.storage.release_missing(**decode_request(request)))
@app.route('/object/find_by_sha1_git', methods=['POST'])
def object_find_by_sha1_git():
return encode_data(g.storage.object_find_by_sha1_git(
**decode_request(request)))
@app.route('/occurrence', methods=['POST'])
def occurrence_get():
return encode_data(g.storage.occurrence_get(**decode_request(request)))
@app.route('/occurrence/add', methods=['POST'])
def occurrence_add():
return encode_data(g.storage.occurrence_add(**decode_request(request)))
@app.route('/origin/get', methods=['POST'])
def origin_get():
return encode_data(g.storage.origin_get(**decode_request(request)))
@app.route('/origin/add', methods=['POST'])
def origin_add_one():
return encode_data(g.storage.origin_add_one(**decode_request(request)))
@app.route('/person', methods=['POST'])
def person_get():
return encode_data(g.storage.person_get(**decode_request(request)))
@app.route('/fetch_history', methods=['GET'])
def fetch_history_get():
return encode_data(g.storage.fetch_history_get(request.args['id']))
@app.route('/fetch_history/start', methods=['POST'])
def fetch_history_start():
return encode_data(
g.storage.fetch_history_start(**decode_request(request)))
@app.route('/fetch_history/end', methods=['POST'])
def fetch_history_end():
return encode_data(
g.storage.fetch_history_end(**decode_request(request)))
@app.route('/entity/add', methods=['POST'])
def entity_add():
return encode_data(
g.storage.entity_add(**decode_request(request)))
@app.route('/entity/get', methods=['POST'])
def entity_get():
return encode_data(
g.storage.entity_get(**decode_request(request)))
@app.route('/entity', methods=['GET'])
def entity_get_one():
return encode_data(g.storage.entity_get_one(request.args['uuid']))
@app.route('/entity/from_lister_metadata', methods=['POST'])
def entity_from_lister_metadata():
return encode_data(
g.storage.entity_get_from_lister_metadata(**decode_request(request)))
@app.route('/stat/counters', methods=['GET'])
def stat_counters():
return encode_data(g.storage.stat_counters())
def run_from_webserver(environ, start_response):
    """Run the WSGI app from the webserver, loading the configuration.

    This is the WSGI entry point, so it runs once per request. The
    configuration is re-read on every call, as before. The log handler is
    attached only on the first call, so handlers (and duplicated log
    lines) no longer accumulate with each request.
    """
    config_path = '/etc/softwareheritage/storage/storage.ini'
    app.config.update(config.read(config_path, DEFAULT_CONFIG))

    # Remember the handler on the function itself so later calls skip it.
    if getattr(run_from_webserver, '_log_handler', None) is None:
        handler = logging.StreamHandler()
        app.logger.addHandler(handler)
        run_from_webserver._log_handler = handler

    return app(environ, start_response)
if __name__ == '__main__':
import sys
app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))
host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1'
app.run(host, debug=True)
diff --git a/swh/storage/objstorage/api/__init__.py b/swh/storage/objstorage/api/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/swh/storage/objstorage/api/client.py b/swh/storage/objstorage/api/client.py
new file mode 100644
index 000000000..8f0308651
--- /dev/null
+++ b/swh/storage/objstorage/api/client.py
@@ -0,0 +1,92 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+import pickle
+
+import requests
+
+from requests.exceptions import ConnectionError
+from ...exc import StorageAPIError
+from ...api.common import (decode_response,
+ encode_data_client as encode_data)
+
+
class RemoteObjStorage():
    """Proxy to a remote object storage.

    This class connects to an object storage server over HTTP.

    Attributes:
        base_url (string): The URL of the server to connect to. Must end
            with a '/'.
        session: The session used to send requests.
    """
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()

    def url(self, endpoint):
        """Return the full URL of `endpoint` (plain concatenation)."""
        return '%s%s' % (self.base_url, endpoint)

    def post(self, endpoint, data):
        """POST `data` (msgpack-encoded) to `endpoint`.

        Returns:
            the decoded response body.

        Raises:
            StorageAPIError: if the connection fails; the original
                connection error is attached as the cause.
            Exception: whatever exception the server sent back with a
                400 status code.
        """
        try:
            response = self.session.post(
                self.url(endpoint),
                data=encode_data(data),
                headers={'content-type': 'application/x-msgpack'},
            )
        except ConnectionError as e:
            print(str(e))
            raise StorageAPIError(e) from e

        # XXX: this breaks language-independence and should be
        # replaced by proper unserialization
        # SECURITY: unpickling executes arbitrary code chosen by the peer;
        # this is only acceptable against a trusted object storage server.
        if response.status_code == 400:
            raise pickle.loads(decode_response(response))

        return decode_response(response)

    def content_add(self, bytes, obj_id=None):
        """Add a new object to the object storage.

        Args:
            bytes: content of the object to be added to the storage.
            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                given, obj_id will be trusted to match bytes. If missing,
                obj_id will be computed on the fly.

        """
        return self.post('content/add', {'bytes': bytes, 'obj_id': obj_id})

    def content_get(self, obj_id):
        """Retrieve the content of a given object.

        Args:
            obj_id: The id of the object.

        Returns:
            The content of the requested object as bytes.

        Raises:
            ObjNotFoundError: if the requested object is missing
        """
        return self.post('content/get', {'obj_id': obj_id})

    def content_check(self, obj_id):
        """Integrity check for a given object.

        Verify that the file object is in place, and that the gzipped
        content matches the object id.

        Args:
            obj_id: The id of the object.

        Raises:
            ObjNotFoundError: if the requested object is missing
            Error: if the requested object is corrupt
        """
        # The server's answer is deliberately not returned: failures are
        # reported through the exceptions raised by post().
        self.post('content/check', {'obj_id': obj_id})
diff --git a/swh/storage/objstorage/api/server.py b/swh/storage/objstorage/api/server.py
new file mode 100644
index 000000000..1cd1f3ebc
--- /dev/null
+++ b/swh/storage/objstorage/api/server.py
@@ -0,0 +1,67 @@
+# Copyright (C) 2015 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+
+from flask import Flask, g, request
+
+from swh.core import config
+from .. import ObjStorage
+from ...api.common import (BytesRequest, decode_request, error_handler,
+ encode_data_server as encode_data)
+
+DEFAULT_CONFIG = {
+ 'storage_base': ('str', '/tmp/swh-storage/objects/'),
+ 'storage_depth': ('int', 3)
+}
+
+app = Flask(__name__)
+app.request_class = BytesRequest
+
+
@app.errorhandler(Exception)
def my_error_handler(exception):
    """Report any exception raised by a route through the shared
    ``error_handler``, encoded with this server's ``encode_data``.
    """
    error_response = error_handler(exception, encode_data)
    return error_response
+
+
@app.before_request
def before_request():
    """Create the object storage for the current request and expose it
    as ``g.objstorage``.
    """
    root_dir = app.config['storage_base']
    depth = app.config['storage_depth']
    g.objstorage = ObjStorage(root_dir, depth)
+
+
@app.route('/')
def index():
    """Root endpoint; answers with a fixed greeting."""
    greeting = 'Helloworld!'
    return greeting
+
+
@app.route('/content')
def content():
    """Return the string form of the list obtained by iterating over the
    object storage.
    """
    # before_request() stores the storage as `g.objstorage`; `g.storage`
    # is never set in this application, so reading it always failed.
    return str(list(g.objstorage))
+
+
@app.route('/content/add', methods=['POST'])
def add_bytes():
    """Decode the request body and pass it as keyword arguments to
    ``ObjStorage.add_bytes``; answer with the encoded result.
    """
    kwargs = decode_request(request)
    return encode_data(g.objstorage.add_bytes(**kwargs))
+
+
@app.route('/content/get', methods=['POST'])
def get_bytes():
    """Decode the request body and pass it as keyword arguments to
    ``ObjStorage.get_bytes``; answer with the encoded result.
    """
    kwargs = decode_request(request)
    return encode_data(g.objstorage.get_bytes(**kwargs))
+
+
@app.route('/content/check', methods=['POST'])
def check():
    """Decode the request body and pass it as keyword arguments to
    ``ObjStorage.check``; answer with the encoded result.
    """
    # TODO: verify that an error raised while checking the content is
    # properly intercepted by @app.errorhandler and sent to the client.
    kwargs = decode_request(request)
    return encode_data(g.objstorage.check(**kwargs))
+
+
if __name__ == '__main__':
    import sys

    # Usage: server.py CONFIG_FILE [HOST]
    app.config.update(config.read(sys.argv[1], DEFAULT_CONFIG))

    # Default to the loopback interface: the app runs with debug=True, and
    # the debugger must not be reachable from other hosts unless the
    # operator explicitly passes a host on the command line.
    host = sys.argv[2] if len(sys.argv) >= 3 else '127.0.0.1'
    app.run(host, debug=True)
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Jul 4, 11:48 AM (3 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3251408
Attached To
R65 Staging repository
Event Timeline
Log In to Comment