Changeset View
Changeset View
Standalone View
Standalone View
swh/storage/objstorage/api/load_balancing_client.py
- This file was added.
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from collections import deque

from swh.core import hashutil
from swh.storage.exc import ObjNotFoundError

from . import client
class LoadBalancingClient():
    """ A proxy for multiple remote object storages.

    This proxy client does some load balancing between the remote
    backup servers: additions go to the storage that has been unused
    for the longest time, while reads and integrity checks are routed
    to the storages that the archive database lists as holding the
    requested content.

    WARNING This is a work in progress and does not yet update
    the archive database ('content_archive' table)

    Attributes:
        db: the database which contains data about where the content is
            saved.
        balancer (ClientBalancer): a container of remote storages that
            allows to get the best suited remote storage.
    """

    class ClientBalancer():
        """ Container of remote objstorage that selects the best one.

        Actually this queue does not take care of servers capacities
        and only return the server that have been unused for the
        largest amount of time.

        Attributes:
            storages: queue of (archive id, remote objstorage) pairs,
                ordered from least recently used to most recently used.
            objstorages: alias of `storages`, kept for compatibility.
        """

        def __init__(self, *servers):
            """ Initializes the remote storage servers queue.

            Args:
                *servers: a list of tuples (server_id, server_url)
                    that represents the servers.
            """
            # NOTE(review): assumes the `client` module exposes a
            # `RemoteStorage` class taking the server url -- confirm.
            self.storages = deque(
                (server_id, client.RemoteStorage(server_url))
                for server_id, server_url in servers
            )
            # The other methods read `storages`; `objstorages` is the name
            # the constructor originally set, so both point to one queue.
            self.objstorages = self.storages

        def get_any(self):
            """ Get the storage that has been unused for the longest time.

            The returned storage is moved to the back of the queue.

            Returns:
                The remote storage at the front of the queue.

            Raises:
                IndexError: if no storage is registered.
            """
            _id, storage = self.storages[0]
            self.storages.rotate(-1)
            return storage

        def get_all_in(self, storage_list):
            """ Get every storage whose id belongs to storage_list.

            The queue order is left untouched.

            Args:
                storage_list: iterable of archive ids.

            Returns:
                A list of the matching remote storages, in queue order.
            """
            wanted = set(storage_list)
            return [remote for storage_id, remote in self.storages
                    if storage_id in wanted]

        def get_one_of(self, allowed_list):
            """ Get the least recently used storage among the allowed ones.

            The selected storage is moved to the back of the queue.

            Args:
                allowed_list: iterable of archive ids that may be used.

            Returns:
                The selected remote storage, or None when no registered
                storage has its id in allowed_list.
            """
            allowed = set(allowed_list)
            for position, (storage_id, storage) in enumerate(self.storages):
                if storage_id in allowed:
                    # Requeue the entry at the back; we return right away so
                    # mutating the deque while iterating is safe here.
                    del self.storages[position]
                    self.storages.append((storage_id, storage))
                    return storage
            return None

    def __init__(self, db, *servers):
        """ Initializes the proxy.

        Args:
            db: the database holding the 'content_archive' table.
            *servers: tuples (server_id, server_url), one per remote
                storage.
        """
        self.db = db
        self.balancer = self.ClientBalancer(*servers)

    @staticmethod
    def _db_id(obj_id):
        """ Build the hex-encoded id used to query 'content_archive'.

        Args:
            obj_id: either the raw identifier, or a dict holding it under
                the 'sha1' key (both forms were used by the callers).

        Returns:
            The hexadecimal form of the id with the bytea hex prefix.
        """
        sha1 = obj_id['sha1'] if isinstance(obj_id, dict) else obj_id
        return r'\x' + hashutil.hash_to_hex(sha1)

    def __get_storages_for(self, content_id):
        """ Get the storages that contains the given content id.

        Args:
            content_id: the content id, as stored in 'content_archive'.

        Return:
            The list of archive ids whose status is 'present' for this
            content.
        """
        with self.db.transaction() as cur:
            # Bind the value instead of interpolating it into the SQL text.
            # NOTE(review): assumes a DB-API cursor using the '%s'
            # paramstyle (e.g. psycopg2) -- confirm.
            cur.execute("""SELECT archive_id
                           FROM content_archive
                           WHERE content_id = %s AND status = 'present'
                        """, (content_id,))
            # Rows are 1-tuples; flatten them so that the ids can be
            # compared with the balancer's storage ids.
            return [row[0] for row in cur.fetchall()]

    def content_add(self, bytes, obj_id=None):
        """ Add a new object to the object storage.

        The object is sent to the storage that has been unused for the
        longest time.

        Args:
            bytes: content of the object to be added to the storage.
            obj_id: checksums of `bytes` as computed by ID_HASH_ALGO. When
                given, obj_id will be trusted to match bytes. If missing,
                obj_id will be computed on the fly.

        Returns:
            Whatever the selected remote storage returns.
        """
        return self.balancer.get_any().content_add(bytes, obj_id)

    def content_get(self, obj_id):
        """ Retrieve the content of a given object.

        Args:
            obj_id: The id of the object.

        Returns:
            The content of the requested objects as bytes.

        Raises:
            ObjNotFoundError: if the requested object is missing
        """
        # Get a server where the content is present.
        db_id = self._db_id(obj_id)
        archive_ids = self.__get_storages_for(db_id)
        if not archive_ids:
            raise ObjNotFoundError(db_id)
        storage = self.balancer.get_one_of(archive_ids)
        if storage is None:
            # The database knows archives that this client is not
            # configured with: nothing can serve the content.
            raise ObjNotFoundError(db_id)
        # NOTE(review): obj_id is forwarded unchanged; confirm the form
        # expected by the remote storage.
        return storage.content_get(obj_id)

    def content_check(self, obj_id):
        """ Integrity check for a given object

        Verify that the file object is in place, and that the gzipped content
        matches the object id

        Args:
            obj_id: The id of the object.

        Raises:
            ObjNotFoundError: if the requested object is missing
            Error: if the requested object is corrupt
        """
        # Get the servers that contains the content.
        db_id = self._db_id(obj_id)
        archive_ids = self.__get_storages_for(db_id)
        remotes = self.balancer.get_all_in(archive_ids)
        if not remotes:
            raise ObjNotFoundError(db_id)
        # And do the check on each of them
        # TODO This is a good place to do some self_healing
        #      because we have access to all the storages.
        for remote in remotes:
            remote.content_check(obj_id)