Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7437618
swh-objstorage-azure
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
3 KB
Subscribers
None
swh-objstorage-azure
View Options
#!/usr/bin/env python3
# Copyright (C) 2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
# NOT FOR PRODUCTION
import
click
from
swh.core
import
config
,
hashutil
from
swh.objstorage
import
exc
,
get_objstorage
class AzureAccess(config.SWHConfig):
    """Orchestration class to try and check the objstorage_azure
    implementation.

    Two object storages are built with ``get_objstorage`` from the parsed
    configuration:

    - ``azure_cloud_storage`` (config key ``storage_azure``): the storage
      under test, which contents are written to and checked against;
    - ``read_objstorage`` (config key ``storage_local``): the storage the
      contents are read from.

    """

    # Both defaults point at a local ``pathslicing`` storage; the configuration
    # file is expected to override ``storage_azure`` to exercise the Azure
    # backend -- TODO confirm against the deployed configuration.
    DEFAULT_CONFIG = {
        # Output storage
        "storage_azure": (
            "dict",
            {
                "cls": "pathslicing",
                "args": {
                    "root": "/srv/softwareheritage/objects",
                    "slicing": "0:2/2:4/4:6",
                },
            },
        ),
        # Input storage
        "storage_local": (
            "dict",
            {
                "cls": "pathslicing",
                "args": {
                    "root": "/srv/softwareheritage/objects",
                    "slicing": "0:2/2:4/4:6",
                },
            },
        ),
    }

    CONFIG_BASE_FILENAME = "objstorage/azure"

    def __init__(self):
        """Parse the configuration file and instantiate both storages."""
        super().__init__()
        self.config = self.parse_config_file()
        self.azure_cloud_storage = get_objstorage(**self.config["storage_azure"])
        self.read_objstorage = get_objstorage(**self.config["storage_local"])

    def list_contents(self, limit=10):
        """Yield at most ``limit`` entries from the output storage.

        Args:
            limit: maximum number of entries to yield. A limit of zero or
                less yields nothing.

        Yields:
            Whatever iterating over the output storage produces (presumably
            object ids -- verify against the objstorage implementation).

        """
        # Guard first: without it the loop below would yield one entry before
        # ever looking at the limit.
        if limit <= 0:
            return
        for count, obj_id in enumerate(self.azure_cloud_storage, start=1):
            yield obj_id
            # Stop right after the last wanted entry so the underlying
            # iterator is not advanced further than needed.
            if count >= limit:
                return

    def send_one_content(self, obj_id):
        """Copy one content from the input storage to the output storage.

        Args:
            obj_id: identifier of the content to read from the input storage
                and add, under the same identifier, to the output storage.

        """
        obj_content = self.read_objstorage.get(obj_id)
        self.azure_cloud_storage.add(content=obj_content, obj_id=obj_id)

    def check_integrity(self, obj_id):
        """Run the output storage's ``check`` on ``obj_id``.

        Nothing is returned; the underlying check will raise if there is a
        problem.

        """
        self.azure_cloud_storage.check(obj_id)  # will raise if problem

    def check_presence(self, obj_id):
        """Return whether ``obj_id`` is in the output storage."""
        return obj_id in self.azure_cloud_storage

    def download(self, obj_id):
        """Return the content stored under ``obj_id`` in the output storage."""
        return self.azure_cloud_storage.get(obj_id)
@click.command()
def tryout():
    """Manually exercise the Azure object storage (NOT FOR PRODUCTION).

    The scenario, in order:

    1. check the presence of a sample content, send it, check again;
    2. check the presence of a second content that is never sent;
    3. download the sample content and print it;
    4. try to download an id derived from the sample one and expect
       ``exc.ObjNotFoundError``;
    5. run the integrity check on the sample content.

    Results are only printed, nothing is asserted.

    """
    obj_azure = AzureAccess()

    hex_sample_id = "00000085c856b32f0709a4f5d669bb4faa3a0ce9"
    sample_id = hashutil.hex_to_hash(hex_sample_id)

    check_presence = obj_azure.check_presence(sample_id)
    print("presence first time should be False:", check_presence)
    obj_azure.send_one_content(sample_id)

    check_presence = obj_azure.check_presence(sample_id)
    print("presence True:", check_presence)

    hex_sample_2 = "dfeffffeffff17b439f3e582813bd875e7141a0e"
    sample_2 = hashutil.hex_to_hash(hex_sample_2)
    check_presence = obj_azure.check_presence(sample_2)
    print("presence False:", check_presence)

    print()
    print("Download a blob")
    blob_content = obj_azure.download(sample_id)
    print(blob_content)

    print()
    # Derive an id that should not exist in the storage. It must be converted
    # with hex_to_hash like every other id here: the storage is queried with
    # the converted form, not with the hexadecimal string.
    not_found_hex_id = hex_sample_id.replace("0", "f")
    not_found_id = hashutil.hex_to_hash(not_found_hex_id)
    try:
        # Only the download is expected to raise, so only it is guarded.
        obj_azure.download(not_found_id)
    except exc.ObjNotFoundError:
        print("Expected `blob does not exist`!")

    # print()
    # print('blobs:')
    # print(list(obj_azure.list_contents()))

    # print()
    # print('content of %s' % hex_sample_id)
    # print(obj_azure.download(hex_sample_id))

    obj_azure.check_integrity(sample_id)
# Script entry point: invoke the `tryout` command only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    tryout()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Tue, Apr 15, 2:21 AM (5 d, 21 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3238926
Attached To
rDOBJS Object storage
Event Timeline
Log In to Comment