Page MenuHomeSoftware Heritage

swh-objstorage-azure
No OneTemporary

swh-objstorage-azure

#!/usr/bin/env python3
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# NOT FOR PRODUCTION
import click
from swh.core import config, hashutil
from swh.objstorage import exc, get_objstorage
class AzureAccess(config.SWHConfig):
    """Orchestration class to try out and check the objstorage_azure
    implementation.

    Two object storages are instantiated from the parsed configuration:
    ``storage_local`` is read from, ``storage_azure`` is written to and
    queried.
    """

    DEFAULT_CONFIG = {
        # Output storage
        # NOTE(review): the default is a local pathslicing store with the
        # same root as the input storage; presumably the configuration file
        # overrides it with a real Azure backend -- confirm.
        "storage_azure": (
            "dict",
            {
                "cls": "pathslicing",
                "args": {
                    "root": "/srv/softwareheritage/objects",
                    "slicing": "0:2/2:4/4:6",
                },
            },
        ),
        # Input storage
        "storage_local": (
            "dict",
            {
                "cls": "pathslicing",
                "args": {
                    "root": "/srv/softwareheritage/objects",
                    "slicing": "0:2/2:4/4:6",
                },
            },
        ),
    }

    CONFIG_BASE_FILENAME = "objstorage/azure"

    def __init__(self):
        """Parse the configuration file and instantiate both storages."""
        super().__init__()
        self.config = self.parse_config_file()
        # Each configuration entry is expanded as keyword arguments
        # (``cls`` and ``args``) of get_objstorage.
        self.azure_cloud_storage = get_objstorage(**self.config["storage_azure"])
        self.read_objstorage = get_objstorage(**self.config["storage_local"])

    def list_contents(self, limit=10):
        """Yield at most ``limit`` items from iterating the output storage.

        Args:
            limit: maximum number of items to yield; a value of zero or
                less yields nothing.

        Yields:
            Whatever iterating the output storage produces (presumably
            object ids -- confirm against the objstorage implementation).
        """
        if limit <= 0:
            # Without this guard one item would be yielded before the
            # limit is ever compared.
            return
        for count, content in enumerate(self.azure_cloud_storage, start=1):
            yield content
            if count >= limit:
                # Stop right away so the storage iterator is not advanced
                # past the last item actually returned.
                return

    def send_one_content(self, obj_id):
        """Copy the object ``obj_id`` from the input to the output storage."""
        obj_content = self.read_objstorage.get(obj_id)
        self.azure_cloud_storage.add(content=obj_content, obj_id=obj_id)

    def check_integrity(self, obj_id):
        """Run the output storage's check on ``obj_id``.

        Returns nothing; the underlying ``check`` call is expected to raise
        if there is a problem.
        """
        self.azure_cloud_storage.check(obj_id)  # will raise if problem

    def check_presence(self, obj_id):
        """Return whether ``obj_id`` is in the output storage."""
        return obj_id in self.azure_cloud_storage

    def download(self, obj_id):
        """Return what the output storage's ``get`` gives for ``obj_id``."""
        return self.azure_cloud_storage.get(obj_id)
@click.command()
def tryout():
    """Manually exercise AzureAccess: presence, upload, download, integrity.

    Prints the outcome of each step. Not an automated test: it relies on
    the configured storages and on the hard-coded sample ids below.
    """
    obj_azure = AzureAccess()

    hex_sample_id = "00000085c856b32f0709a4f5d669bb4faa3a0ce9"
    sample_id = hashutil.hex_to_hash(hex_sample_id)

    check_presence = obj_azure.check_presence(sample_id)
    print("presence first time should be False:", check_presence)

    obj_azure.send_one_content(sample_id)

    check_presence = obj_azure.check_presence(sample_id)
    print("presence True:", check_presence)

    hex_sample_2 = "dfeffffeffff17b439f3e582813bd875e7141a0e"
    sample_2 = hashutil.hex_to_hash(hex_sample_2)
    check_presence = obj_azure.check_presence(sample_2)
    print("presence False:", check_presence)

    print()
    print("Download a blob")
    blob_content = obj_azure.download(sample_id)
    print(blob_content)

    print()
    # Build an id that should not exist. It is converted with hex_to_hash,
    # like every other id in this script, so the storage receives the same
    # kind of id (the previous hash_to_hex call was the inverse conversion
    # applied to a hex string).
    not_found_hex_id = hex_sample_id.replace("0", "f")
    not_found_id = hashutil.hex_to_hash(not_found_hex_id)
    try:
        # Only the download is guarded: a conversion failure above must not
        # be mistaken for the expected "not found" outcome.
        obj_azure.download(not_found_id)
    except exc.ObjNotFoundError:
        print("Expected `blob does not exist`!")

    # Disabled steps, kept for reference:
    # print()
    # print('blobs:')
    # print(list(obj_azure.list_contents()))
    # print()
    # print('content of %s' % hex_sample_id)
    # print(obj_azure.download(hex_sample_id))

    obj_azure.check_integrity(sample_id)


if __name__ == "__main__":
    tryout()

File Metadata

Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (5 d, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3238926

Event Timeline