Changeset View
Changeset View
Standalone View
Standalone View
swh/objstorage/backends/pathslicing.py
# Copyright (C) 2015-2019 The Software Heritage developers | # Copyright (C) 2015-2019 The Software Heritage developers | |||||||||||||||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | |||||||||||||||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | |||||||||||||||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | |||||||||||||||||
from collections.abc import Iterator | from collections.abc import Iterator | |||||||||||||||||
from contextlib import contextmanager | from contextlib import contextmanager | |||||||||||||||||
from itertools import islice | from itertools import islice | |||||||||||||||||
import os | import os | |||||||||||||||||
import random | import random | |||||||||||||||||
import tempfile | import tempfile | |||||||||||||||||
from typing import List | ||||||||||||||||||
from swh.model import hashutil | from swh.model import hashutil | |||||||||||||||||
from swh.objstorage.exc import Error, ObjNotFoundError | from swh.objstorage.exc import Error, ObjNotFoundError | |||||||||||||||||
from swh.objstorage.objstorage import ( | from swh.objstorage.objstorage import ( | |||||||||||||||||
DEFAULT_CHUNK_SIZE, | DEFAULT_CHUNK_SIZE, | |||||||||||||||||
DEFAULT_LIMIT, | DEFAULT_LIMIT, | |||||||||||||||||
ID_HASH_ALGO, | ID_HASH_ALGO, | |||||||||||||||||
ID_HASH_LENGTH, | ID_HASH_LENGTH, | |||||||||||||||||
ObjStorage, | ObjStorage, | |||||||||||||||||
compressors, | compressors, | |||||||||||||||||
compute_hash, | compute_hash, | |||||||||||||||||
decompressors, | decompressors, | |||||||||||||||||
) | ) | |||||||||||||||||
BUFSIZ = 1048576 | BUFSIZ = 1048576 | |||||||||||||||||
DIR_MODE = 0o755 | DIR_MODE = 0o755 | |||||||||||||||||
FILE_MODE = 0o644 | FILE_MODE = 0o644 | |||||||||||||||||
@contextmanager | class PathSlicer: | |||||||||||||||||
def _write_obj_file(hex_obj_id, objstorage): | def __init__(self, root: str, slicing: str): | |||||||||||||||||
""" Context manager for writing object files to the object storage. | self.root = root | |||||||||||||||||
# Make a list of tuples where each tuple contains the beginning | ||||||||||||||||||
During writing, data are written to a temporary file, which is atomically | # and the end of each slicing. | |||||||||||||||||
renamed to the right file name after closing. | try: | |||||||||||||||||
self.bounds = [ | ||||||||||||||||||
Usage sample: | slice(*map(lambda x: int(x) if x else None, sbounds.split(":"))) | |||||||||||||||||
vlorentz: easier to read | ||||||||||||||||||
douarddaAuthorUnsubmitted Done Inline Actionsagreed douardda: agreed | ||||||||||||||||||
with _write_obj_file(hex_obj_id, objstorage): | for sbounds in slicing.split("/") | |||||||||||||||||
f.write(obj_data) | if sbounds | |||||||||||||||||
] | ||||||||||||||||||
except TypeError: | ||||||||||||||||||
raise ValueError( | ||||||||||||||||||
"Invalid slicing declaration; " | ||||||||||||||||||
"it should be a of the form '<int>:<int>[/<int>:<int>]..." | ||||||||||||||||||
) | ||||||||||||||||||
Yields: | def check_config(self): | |||||||||||||||||
a file-like object open for writing bytes. | if len(self): | |||||||||||||||||
""" | max_char = max( | |||||||||||||||||
# Get the final paths and create the directory if absent. | max( | |||||||||||||||||
dir = objstorage._obj_dir(hex_obj_id) | bound.start if bound.start is not None else 0 | |||||||||||||||||
if not os.path.isdir(dir): | for bound in self.bounds | |||||||||||||||||
os.makedirs(dir, DIR_MODE, exist_ok=True) | ), | |||||||||||||||||
path = os.path.join(dir, hex_obj_id) | max( | |||||||||||||||||
bound.stop if bound.stop is not None else 0 for bound in self.bounds | ||||||||||||||||||
), | ||||||||||||||||||
) | ||||||||||||||||||
vlorentzUnsubmitted Not Done Inline Actions
more readable IMO vlorentz: more readable IMO | ||||||||||||||||||
douarddaAuthorUnsubmitted Done Inline Actionssure, why not douardda: sure, why not | ||||||||||||||||||
if ID_HASH_LENGTH < max_char: | ||||||||||||||||||
raise ValueError( | ||||||||||||||||||
"Algorithm %s has too short hash for slicing to char %d" | ||||||||||||||||||
% (ID_HASH_ALGO, max_char) | ||||||||||||||||||
) | ||||||||||||||||||
# Create a temporary file. | def get_directory(self, hex_obj_id: str) -> str: | |||||||||||||||||
(tmp, tmp_path) = tempfile.mkstemp(suffix=".tmp", prefix="hex_obj_id.", dir=dir) | """ Compute the storage directory of an object. | |||||||||||||||||
# Open the file and yield it for writing. | See also: PathSlicer::get_path | |||||||||||||||||
tmp_f = os.fdopen(tmp, "wb") | ||||||||||||||||||
yield tmp_f | ||||||||||||||||||
# Make sure the contents of the temporary file are written to disk | Args: | |||||||||||||||||
tmp_f.flush() | hex_obj_id: object id as hexlified string. | |||||||||||||||||
if objstorage.use_fdatasync: | ||||||||||||||||||
os.fdatasync(tmp) | ||||||||||||||||||
else: | ||||||||||||||||||
os.fsync(tmp) | ||||||||||||||||||
# Then close the temporary file and move it to the right path. | Returns: | |||||||||||||||||
tmp_f.close() | Path to the directory that contains the required object. | |||||||||||||||||
os.chmod(tmp_path, FILE_MODE) | """ | |||||||||||||||||
os.rename(tmp_path, path) | return os.path.join(self.root, *self.get_slices(hex_obj_id)) | |||||||||||||||||
def get_path(self, hex_obj_id: str) -> str: | ||||||||||||||||||
""" Compute the full path to an object into the current storage. | ||||||||||||||||||
def _read_obj_file(hex_obj_id, objstorage): | See also: PathSlicer::get_directory | |||||||||||||||||
""" Context manager for reading object file in the object storage. | ||||||||||||||||||
Usage sample: | Args: | |||||||||||||||||
with _read_obj_file(hex_obj_id, objstorage) as f: | hex_obj_id: object id as hexlified string. | |||||||||||||||||
b = f.read() | ||||||||||||||||||
Yields: | Returns: | |||||||||||||||||
a file-like object open for reading bytes. | Path to the actual object corresponding to the given id. | |||||||||||||||||
""" | """ | |||||||||||||||||
path = objstorage._obj_path(hex_obj_id) | return os.path.join(self.get_directory(hex_obj_id), hex_obj_id) | |||||||||||||||||
return open(path, "rb") | def get_slices(self, hex_obj_id: str) -> List[str]: | |||||||||||||||||
assert len(hex_obj_id) == ID_HASH_LENGTH | ||||||||||||||||||
return [hex_obj_id[bound] for bound in self.bounds] | ||||||||||||||||||
def __len__(self) -> int: | ||||||||||||||||||
"""Number of slices of the slicer""" | ||||||||||||||||||
return len(self.bounds) | ||||||||||||||||||
class PathSlicingObjStorage(ObjStorage): | class PathSlicingObjStorage(ObjStorage): | |||||||||||||||||
"""Implementation of the ObjStorage API based on the hash of the content. | """Implementation of the ObjStorage API based on the hash of the content. | |||||||||||||||||
On disk, an object storage is a directory tree containing files | On disk, an object storage is a directory tree containing files | |||||||||||||||||
named after their object IDs. An object ID is a checksum of its | named after their object IDs. An object ID is a checksum of its | |||||||||||||||||
content, depending on the value of the ID_HASH_ALGO constant (see | content, depending on the value of the ID_HASH_ALGO constant (see | |||||||||||||||||
swh.model.hashutil for its meaning). | swh.model.hashutil for its meaning). | |||||||||||||||||
To avoid directories that contain too many files, the object storage has a | To avoid directories that contain too many files, the object storage has a | |||||||||||||||||
given slicing. Each slicing correspond to a directory that is named | given slicing. Each slicing correspond to a directory that is named | |||||||||||||||||
according to the hash of its content. | according to the hash of its content. | |||||||||||||||||
So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 | So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 | |||||||||||||||||
will be stored in the given object storages : | will be stored in the given object storages : | |||||||||||||||||
- 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 | - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 | |||||||||||||||||
- 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 | - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 | |||||||||||||||||
The files in the storage are stored in gzipped compressed format. | The files in the storage are stored in gzipped compressed format. | |||||||||||||||||
Attributes: | Attributes: | |||||||||||||||||
root (string): path to the root directory of the storage on the disk. | root (string): path to the root directory of the storage on the disk. | |||||||||||||||||
bounds: list of tuples that indicates the beginning and the end of | slicer: the PathSlicer instance dealing with the path slicing logic. | |||||||||||||||||
each subdirectory for a content. | ||||||||||||||||||
""" | """ | |||||||||||||||||
def __init__(self, root, slicing, compression="gzip", **kwargs): | def __init__(self, root, slicing, compression="gzip", **kwargs): | |||||||||||||||||
""" Create an object to access a hash-slicing based object storage. | """ Create an object to access a hash-slicing based object storage. | |||||||||||||||||
Args: | Args: | |||||||||||||||||
root (string): path to the root directory of the storage on | root (string): path to the root directory of the storage on | |||||||||||||||||
the disk. | the disk. | |||||||||||||||||
slicing (string): string that indicates the slicing to perform | slicing (string): string that indicates the slicing to perform | |||||||||||||||||
on the hash of the content to know the path where it should | on the hash of the content to know the path where it should | |||||||||||||||||
be stored. | be stored. | |||||||||||||||||
""" | """ | |||||||||||||||||
olasdUnsubmitted Not Done Inline ActionsThese args should probably be moved to the main class docstring, the rendering is weird otherwise. olasd: These args should probably be moved to the main class docstring, the rendering is weird… | ||||||||||||||||||
douarddaAuthorUnsubmitted Done Inline Actionsah yes sure! this came to my mind a some point, but vanished before anything happened... douardda: ah yes sure! this came to my mind a some point, but vanished before anything happened... | ||||||||||||||||||
super().__init__(**kwargs) | super().__init__(**kwargs) | |||||||||||||||||
self.root = root | self.root = root | |||||||||||||||||
# Make a list of tuples where each tuple contains the beginning | self.slicer = PathSlicer(root, slicing) | |||||||||||||||||
# and the end of each slicing. | ||||||||||||||||||
self.bounds = [ | ||||||||||||||||||
slice(*map(int, sbounds.split(":"))) | ||||||||||||||||||
for sbounds in slicing.split("/") | ||||||||||||||||||
if sbounds | ||||||||||||||||||
] | ||||||||||||||||||
self.use_fdatasync = hasattr(os, "fdatasync") | self.use_fdatasync = hasattr(os, "fdatasync") | |||||||||||||||||
self.compression = compression | self.compression = compression | |||||||||||||||||
self.check_config(check_write=False) | self.check_config(check_write=False) | |||||||||||||||||
def check_config(self, *, check_write): | def check_config(self, *, check_write): | |||||||||||||||||
"""Check whether this object storage is properly configured""" | """Check whether this object storage is properly configured""" | |||||||||||||||||
root = self.root | self.slicer.check_config() | |||||||||||||||||
if not os.path.isdir(root): | ||||||||||||||||||
raise ValueError( | ||||||||||||||||||
'PathSlicingObjStorage root "%s" is not a directory' % root | ||||||||||||||||||
) | ||||||||||||||||||
max_endchar = max(map(lambda bound: bound.stop, self.bounds)) | if not os.path.isdir(self.root): | |||||||||||||||||
if ID_HASH_LENGTH < max_endchar: | ||||||||||||||||||
raise ValueError( | raise ValueError( | |||||||||||||||||
"Algorithm %s has too short hash for slicing to char %d" | 'PathSlicingObjStorage root "%s" is not a directory' % self.root | |||||||||||||||||
% (ID_HASH_ALGO, max_endchar) | ||||||||||||||||||
) | ) | |||||||||||||||||
if check_write: | if check_write: | |||||||||||||||||
if not os.access(self.root, os.W_OK): | if not os.access(self.root, os.W_OK): | |||||||||||||||||
raise PermissionError( | raise PermissionError( | |||||||||||||||||
'PathSlicingObjStorage root "%s" is not writable' % root | 'PathSlicingObjStorage root "%s" is not writable' % self.root | |||||||||||||||||
) | ) | |||||||||||||||||
if self.compression not in compressors: | if self.compression not in compressors: | |||||||||||||||||
raise ValueError( | raise ValueError( | |||||||||||||||||
'Unknown compression algorithm "%s" for ' | 'Unknown compression algorithm "%s" for ' | |||||||||||||||||
"PathSlicingObjStorage" % self.compression | "PathSlicingObjStorage" % self.compression | |||||||||||||||||
) | ) | |||||||||||||||||
return True | return True | |||||||||||||||||
def __contains__(self, obj_id): | def __contains__(self, obj_id): | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
return os.path.isfile(self._obj_path(hex_obj_id)) | return os.path.isfile(self.slicer.get_path(hex_obj_id)) | |||||||||||||||||
def __iter__(self): | def __iter__(self): | |||||||||||||||||
"""Iterate over the object identifiers currently available in the | """Iterate over the object identifiers currently available in the | |||||||||||||||||
storage. | storage. | |||||||||||||||||
Warning: with the current implementation of the object | Warning: with the current implementation of the object | |||||||||||||||||
storage, this method will walk the filesystem to list objects, | storage, this method will walk the filesystem to list objects, | |||||||||||||||||
meaning that listing all objects will be very slow for large | meaning that listing all objects will be very slow for large | |||||||||||||||||
Show All 21 Lines | def __len__(self): | |||||||||||||||||
Warning: this currently uses `__iter__`, its warning about bad | Warning: this currently uses `__iter__`, its warning about bad | |||||||||||||||||
performances applies | performances applies | |||||||||||||||||
Return: | Return: | |||||||||||||||||
number of objects contained in the storage | number of objects contained in the storage | |||||||||||||||||
""" | """ | |||||||||||||||||
return sum(1 for i in self) | return sum(1 for i in self) | |||||||||||||||||
def _obj_dir(self, hex_obj_id): | ||||||||||||||||||
""" Compute the storage directory of an object. | ||||||||||||||||||
See also: PathSlicingObjStorage::_obj_path | ||||||||||||||||||
Args: | ||||||||||||||||||
hex_obj_id: object id as hexlified string. | ||||||||||||||||||
Returns: | ||||||||||||||||||
Path to the directory that contains the required object. | ||||||||||||||||||
""" | ||||||||||||||||||
slices = [hex_obj_id[bound] for bound in self.bounds] | ||||||||||||||||||
return os.path.join(self.root, *slices) | ||||||||||||||||||
def _obj_path(self, hex_obj_id): | ||||||||||||||||||
""" Compute the full path to an object into the current storage. | ||||||||||||||||||
See also: PathSlicingObjStorage::_obj_dir | ||||||||||||||||||
Args: | ||||||||||||||||||
hex_obj_id: object id as hexlified string. | ||||||||||||||||||
Returns: | ||||||||||||||||||
Path to the actual object corresponding to the given id. | ||||||||||||||||||
""" | ||||||||||||||||||
return os.path.join(self._obj_dir(hex_obj_id), hex_obj_id) | ||||||||||||||||||
def add(self, content, obj_id=None, check_presence=True): | def add(self, content, obj_id=None, check_presence=True): | |||||||||||||||||
if obj_id is None: | if obj_id is None: | |||||||||||||||||
obj_id = compute_hash(content) | obj_id = compute_hash(content) | |||||||||||||||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | |||||||||||||||||
# If the object is already present, return immediately. | # If the object is already present, return immediately. | |||||||||||||||||
return obj_id | return obj_id | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
if not isinstance(content, Iterator): | if not isinstance(content, Iterator): | |||||||||||||||||
content = [content] | content = [content] | |||||||||||||||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | |||||||||||||||||
with _write_obj_file(hex_obj_id, self) as f: | with self._write_obj_file(hex_obj_id) as f: | |||||||||||||||||
for chunk in content: | for chunk in content: | |||||||||||||||||
f.write(compressor.compress(chunk)) | f.write(compressor.compress(chunk)) | |||||||||||||||||
f.write(compressor.flush()) | f.write(compressor.flush()) | |||||||||||||||||
return obj_id | return obj_id | |||||||||||||||||
def get(self, obj_id): | def get(self, obj_id): | |||||||||||||||||
if obj_id not in self: | if obj_id not in self: | |||||||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||||||
# Open the file and return its content as bytes | # Open the file and return its content as bytes | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
d = decompressors[self.compression]() | d = decompressors[self.compression]() | |||||||||||||||||
with _read_obj_file(hex_obj_id, self) as f: | with open(self.slicer.get_path(hex_obj_id), "rb") as f: | |||||||||||||||||
out = d.decompress(f.read()) | out = d.decompress(f.read()) | |||||||||||||||||
if d.unused_data: | if d.unused_data: | |||||||||||||||||
raise Error("Corrupt object %s: trailing data found" % hex_obj_id,) | raise Error("Corrupt object %s: trailing data found" % hex_obj_id,) | |||||||||||||||||
return out | return out | |||||||||||||||||
def check(self, obj_id): | def check(self, obj_id): | |||||||||||||||||
try: | try: | |||||||||||||||||
Show All 17 Lines | class PathSlicingObjStorage(ObjStorage): | |||||||||||||||||
def delete(self, obj_id): | def delete(self, obj_id): | |||||||||||||||||
super().delete(obj_id) # Check delete permission | super().delete(obj_id) # Check delete permission | |||||||||||||||||
if obj_id not in self: | if obj_id not in self: | |||||||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
try: | try: | |||||||||||||||||
os.remove(self._obj_path(hex_obj_id)) | os.remove(self.slicer.get_path(hex_obj_id)) | |||||||||||||||||
except FileNotFoundError: | except FileNotFoundError: | |||||||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||||||
return True | return True | |||||||||||||||||
# Management methods | # Management methods | |||||||||||||||||
def get_random(self, batch_size): | def get_random(self, batch_size): | |||||||||||||||||
def get_random_content(self, batch_size): | def get_random_content(self, batch_size): | |||||||||||||||||
""" Get a batch of content inside a single directory. | """ Get a batch of content inside a single directory. | |||||||||||||||||
Returns: | Returns: | |||||||||||||||||
a tuple (batch size, batch). | a tuple (batch size, batch). | |||||||||||||||||
""" | """ | |||||||||||||||||
dirs = [] | dirs = [] | |||||||||||||||||
for level in range(len(self.bounds)): | for level in range(len(self.slicer)): | |||||||||||||||||
path = os.path.join(self.root, *dirs) | path = os.path.join(self.root, *dirs) | |||||||||||||||||
dir_list = next(os.walk(path))[1] | dir_list = next(os.walk(path))[1] | |||||||||||||||||
if "tmp" in dir_list: | if "tmp" in dir_list: | |||||||||||||||||
dir_list.remove("tmp") | dir_list.remove("tmp") | |||||||||||||||||
dirs.append(random.choice(dir_list)) | dirs.append(random.choice(dir_list)) | |||||||||||||||||
path = os.path.join(self.root, *dirs) | path = os.path.join(self.root, *dirs) | |||||||||||||||||
content_list = next(os.walk(path))[2] | content_list = next(os.walk(path))[2] | |||||||||||||||||
Show All 9 Lines | def get_random(self, batch_size): | |||||||||||||||||
yield from it | yield from it | |||||||||||||||||
# Streaming methods | # Streaming methods | |||||||||||||||||
@contextmanager | @contextmanager | |||||||||||||||||
def chunk_writer(self, obj_id): | def chunk_writer(self, obj_id): | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
compressor = compressors[self.compression]() | compressor = compressors[self.compression]() | |||||||||||||||||
with _write_obj_file(hex_obj_id, self) as f: | with self._write_obj_file(hex_obj_id) as f: | |||||||||||||||||
yield lambda c: f.write(compressor.compress(c)) | yield lambda c: f.write(compressor.compress(c)) | |||||||||||||||||
f.write(compressor.flush()) | f.write(compressor.flush()) | |||||||||||||||||
def add_stream(self, content_iter, obj_id, check_presence=True): | def add_stream(self, content_iter, obj_id, check_presence=True): | |||||||||||||||||
if check_presence and obj_id in self: | if check_presence and obj_id in self: | |||||||||||||||||
return obj_id | return obj_id | |||||||||||||||||
with self.chunk_writer(obj_id) as writer: | with self.chunk_writer(obj_id) as writer: | |||||||||||||||||
for chunk in content_iter: | for chunk in content_iter: | |||||||||||||||||
writer(chunk) | writer(chunk) | |||||||||||||||||
return obj_id | return obj_id | |||||||||||||||||
def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): | def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): | |||||||||||||||||
if obj_id not in self: | if obj_id not in self: | |||||||||||||||||
raise ObjNotFoundError(obj_id) | raise ObjNotFoundError(obj_id) | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
decompressor = decompressors[self.compression]() | decompressor = decompressors[self.compression]() | |||||||||||||||||
with _read_obj_file(hex_obj_id, self) as f: | with open(self.slicer.get_path(hex_obj_id), "rb") as f: | |||||||||||||||||
while True: | while True: | |||||||||||||||||
raw = f.read(chunk_size) | raw = f.read(chunk_size) | |||||||||||||||||
if not raw: | if not raw: | |||||||||||||||||
break | break | |||||||||||||||||
r = decompressor.decompress(raw) | r = decompressor.decompress(raw) | |||||||||||||||||
if not r: | if not r: | |||||||||||||||||
continue | continue | |||||||||||||||||
yield r | yield r | |||||||||||||||||
def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): | |||||||||||||||||
if last_obj_id: | if last_obj_id: | |||||||||||||||||
it = self.iter_from(last_obj_id) | it = self.iter_from(last_obj_id) | |||||||||||||||||
else: | else: | |||||||||||||||||
it = iter(self) | it = iter(self) | |||||||||||||||||
return islice(it, limit) | return islice(it, limit) | |||||||||||||||||
def iter_from(self, obj_id, n_leaf=False): | def iter_from(self, obj_id, n_leaf=False): | |||||||||||||||||
hex_obj_id = hashutil.hash_to_hex(obj_id) | hex_obj_id = hashutil.hash_to_hex(obj_id) | |||||||||||||||||
slices = [hex_obj_id[bound] for bound in self.bounds] | slices = self.slicer.get_slices(hex_obj_id) | |||||||||||||||||
rlen = len(self.root.split("/")) | rlen = len(self.root.split("/")) | |||||||||||||||||
i = 0 | i = 0 | |||||||||||||||||
for root, dirs, files in os.walk(self.root): | for root, dirs, files in os.walk(self.root): | |||||||||||||||||
if not dirs: | if not dirs: | |||||||||||||||||
i += 1 | i += 1 | |||||||||||||||||
level = len(root.split("/")) - rlen | level = len(root.split("/")) - rlen | |||||||||||||||||
dirs.sort() | dirs.sort() | |||||||||||||||||
if dirs and root == os.path.join(self.root, *slices[:level]): | if dirs and root == os.path.join(self.root, *slices[:level]): | |||||||||||||||||
cslice = slices[level] | cslice = slices[level] | |||||||||||||||||
for d in dirs[:]: | for d in dirs[:]: | |||||||||||||||||
if d < cslice: | if d < cslice: | |||||||||||||||||
dirs.remove(d) | dirs.remove(d) | |||||||||||||||||
for f in sorted(files): | for f in sorted(files): | |||||||||||||||||
if f > hex_obj_id: | if f > hex_obj_id: | |||||||||||||||||
yield bytes.fromhex(f) | yield bytes.fromhex(f) | |||||||||||||||||
if n_leaf: | if n_leaf: | |||||||||||||||||
yield i | yield i | |||||||||||||||||
@contextmanager | ||||||||||||||||||
def _write_obj_file(self, hex_obj_id): | ||||||||||||||||||
""" Context manager for writing object files to the object storage. | ||||||||||||||||||
During writing, data are written to a temporary file, which is atomically | ||||||||||||||||||
renamed to the right file name after closing. | ||||||||||||||||||
Usage sample: | ||||||||||||||||||
with objstorage._write_obj_file(hex_obj_id): | ||||||||||||||||||
f.write(obj_data) | ||||||||||||||||||
Yields: | ||||||||||||||||||
a file-like object open for writing bytes. | ||||||||||||||||||
""" | ||||||||||||||||||
# Get the final paths and create the directory if absent. | ||||||||||||||||||
dir = self.slicer.get_directory(hex_obj_id) | ||||||||||||||||||
if not os.path.isdir(dir): | ||||||||||||||||||
os.makedirs(dir, DIR_MODE, exist_ok=True) | ||||||||||||||||||
path = os.path.join(dir, hex_obj_id) | ||||||||||||||||||
# Create a temporary file. | ||||||||||||||||||
(tmp, tmp_path) = tempfile.mkstemp(suffix=".tmp", prefix="hex_obj_id.", dir=dir) | ||||||||||||||||||
# Open the file and yield it for writing. | ||||||||||||||||||
tmp_f = os.fdopen(tmp, "wb") | ||||||||||||||||||
yield tmp_f | ||||||||||||||||||
# Make sure the contents of the temporary file are written to disk | ||||||||||||||||||
tmp_f.flush() | ||||||||||||||||||
if self.use_fdatasync: | ||||||||||||||||||
os.fdatasync(tmp) | ||||||||||||||||||
else: | ||||||||||||||||||
os.fsync(tmp) | ||||||||||||||||||
# Then close the temporary file and move it to the right path. | ||||||||||||||||||
tmp_f.close() | ||||||||||||||||||
os.chmod(tmp_path, FILE_MODE) | ||||||||||||||||||
os.rename(tmp_path, path) |
easier to read