Page MenuHomeSoftware Heritage

client.py
No OneTemporary

client.py

# Copyright (C) 2015-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from swh.core.api import RPCClient
from swh.model import hashutil
from ..objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT
from ..exc import Error, ObjNotFoundError, ObjStorageAPIError
class RemoteObjStorage:
"""Proxy to a remote object storage.
This class allows to connect to an object storage server via
http protocol.
Attributes:
url (string): The url of the server to connect. Must end
with a '/'
session: The session to send requests.
"""
def __init__(self, **kwargs):
self._proxy = RPCClient(
api_exception=ObjStorageAPIError,
reraise_exceptions=[ObjNotFoundError, Error],
**kwargs)
def check_config(self, *, check_write):
return self._proxy.post('check_config', {'check_write': check_write})
def __contains__(self, obj_id):
return self._proxy.post('content/contains', {'obj_id': obj_id})
def add(self, content, obj_id=None, check_presence=True):
return self._proxy.post('content/add', {
'content': content, 'obj_id': obj_id,
'check_presence': check_presence})
def add_batch(self, contents, check_presence=True):
return self._proxy.post('content/add/batch', {
'contents': contents,
'check_presence': check_presence,
})
def restore(self, content, obj_id=None, *args, **kwargs):
return self.add(content, obj_id, check_presence=False)
def get(self, obj_id):
return self._proxy.post('content/get', {'obj_id': obj_id})
def get_batch(self, obj_ids):
return self._proxy.post('content/get/batch', {'obj_ids': obj_ids})
def check(self, obj_id):
return self._proxy.post('content/check', {'obj_id': obj_id})
def delete(self, obj_id):
# deletion permission are checked server-side
return self._proxy.post('content/delete', {'obj_id': obj_id})
# Management methods
def get_random(self, batch_size):
return self._proxy.post('content/get/random',
{'batch_size': batch_size})
# Streaming methods
def add_stream(self, content_iter, obj_id, check_presence=True):
obj_id = hashutil.hash_to_hex(obj_id)
return self._proxy.post_stream(
'content/add_stream/{}'.format(obj_id),
params={'check_presence': check_presence},
data=content_iter)
def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE):
obj_id = hashutil.hash_to_hex(obj_id)
return self._proxy.get_stream('content/get_stream/{}'.format(obj_id),
chunk_size=chunk_size)
def __iter__(self):
yield from self._proxy.get_stream('content')
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT):
params = {'limit': limit}
if last_obj_id:
params['last_obj_id'] = hashutil.hash_to_hex(last_obj_id)
yield from self._proxy.get_stream('content', params=params)

File Metadata

Mime Type
text/x-python
Expires
Jul 4 2025, 6:54 PM (5 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3288787

Event Timeline