diff --git a/swh/objstorage/api/client.py b/swh/objstorage/api/client.py index cd767f9..a816493 100644 --- a/swh/objstorage/api/client.py +++ b/swh/objstorage/api/client.py @@ -1,56 +1,43 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from swh.core.api import RPCClient from swh.core.utils import iter_chunks from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError, ObjStorageAPIError from swh.objstorage.interface import ObjStorageInterface -from swh.objstorage.objstorage import ( - DEFAULT_CHUNK_SIZE, - DEFAULT_LIMIT, - ID_DIGEST_LENGTH, -) +from swh.objstorage.objstorage import DEFAULT_LIMIT, ID_DIGEST_LENGTH class RemoteObjStorage(RPCClient): """Proxy to a remote object storage. This class allows to connect to an object storage server via http protocol. Attributes: url (string): The url of the server to connect. Must end with a '/' session: The session to send requests. """ api_exception = ObjStorageAPIError reraise_exceptions = [ObjNotFoundError, Error] backend_class = ObjStorageInterface def restore(self, content, obj_id=None): return self.add(content, obj_id, check_presence=False) - def add_stream(self, content_iter, obj_id, check_presence=True): - raise NotImplementedError - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - obj_id = hashutil.hash_to_hex(obj_id) - return self._get_stream( - "content/get_stream/{}".format(obj_id), chunk_size=chunk_size - ) - def __iter__(self): yield from self.list_content() def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): params = {"limit": limit} if last_obj_id: params["last_obj_id"] = hashutil.hash_to_hex(last_obj_id) yield from iter_chunks( self._get_stream("content", params=params), chunk_size=ID_DIGEST_LENGTH ) diff --git a/swh/objstorage/api/server.py b/swh/objstorage/api/server.py index 3a00106..20d81e2 100644 --- a/swh/objstorage/api/server.py +++ b/swh/objstorage/api/server.py @@ -1,204 +1,189 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import contextlib import functools import logging import os from flask import request from swh.core.api import RPCServerApp from swh.core.api import encode_data_server as encode_data from swh.core.api import error_handler from swh.core.config import read as config_read from swh.core.statsd import statsd -from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError from swh.objstorage.factory import get_objstorage as get_swhobjstorage from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import DEFAULT_LIMIT def timed(f): @functools.wraps(f) def w(*a, **kw): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f.__name__} ): return f(*a, **kw) return w @contextlib.contextmanager def timed_context(f_name): with statsd.timed( "swh_objstorage_request_duration_seconds", tags={"endpoint": f_name} ): yield def get_objstorage(): global objstorage if objstorage is None: objstorage = get_swhobjstorage(**app.config["objstorage"]) return objstorage class ObjStorageServerApp(RPCServerApp): client_exception_classes = (ObjNotFoundError, Error) method_decorators = [timed] def pre_add(self, kw): """Called before the 'add' method.""" statsd.increment( "swh_objstorage_in_bytes_total", len(kw["content"]), tags={"endpoint": "add_bytes"}, ) def post_get(self, ret, kw): """Called after the 'get' method.""" statsd.increment( "swh_objstorage_out_bytes_total", len(ret), tags={"endpoint": "get_bytes"} ) app = ObjStorageServerApp( __name__, backend_class=ObjStorageInterface, backend_factory=get_objstorage, ) objstorage = None @app.errorhandler(Error) def argument_error_handler(exception): return error_handler(exception, encode_data, status_code=400) @app.errorhandler(Exception) def my_error_handler(exception): return error_handler(exception, encode_data) @app.route("/") @timed def index(): return "SWH Objstorage API server" -# Streaming methods - - -@app.route("/content/get_stream/") -def get_stream(hex_id): - obj_id = hashutil.hash_to_bytes(hex_id) - - def generate(): - with timed_context("get_stream"): - yield from get_objstorage().get_stream(obj_id, 2 << 20) - - return app.response_class(generate()) - - @app.route("/content") def list_content(): last_obj_id = request.args.get("last_obj_id") if last_obj_id: last_obj_id = bytes.fromhex(last_obj_id) limit = int(request.args.get("limit", DEFAULT_LIMIT)) def generate(): - with timed_context("get_stream"): + with timed_context("list_content"): yield from get_objstorage().list_content(last_obj_id, limit=limit) return app.response_class(generate()) api_cfg = None def load_and_check_config(config_file): """Check the minimal configuration is set to run the api or raise an error explanation. Args: config_file (str): Path to the configuration file to load Raises: Error if the setup is not as expected Returns: configuration as a dict """ if not config_file: raise EnvironmentError("Configuration file must be defined") if not os.path.exists(config_file): raise FileNotFoundError("Configuration file %s does not exist" % (config_file,)) cfg = config_read(config_file) return validate_config(cfg) def validate_config(cfg): """Check the minimal configuration is set to run the api or raise an explanatory error. Args: cfg (dict): Loaded configuration. Raises: Error if the setup is not as expected Returns: configuration as a dict """ if "objstorage" not in cfg: raise KeyError("Invalid configuration; missing objstorage config entry") missing_keys = [] vcfg = cfg["objstorage"] if "cls" not in vcfg: raise KeyError("Invalid configuration; missing cls config entry") cls = vcfg["cls"] if cls == "pathslicing": # Backwards-compatibility: either get the deprecated `args` from the # objstorage config, or use the full config itself to check for keys args = vcfg.get("args", vcfg) for key in ("root", "slicing"): v = args.get(key) if v is None: missing_keys.append(key) if missing_keys: raise KeyError( "Invalid configuration; missing %s config entry" % (", ".join(missing_keys),) ) return cfg def make_app_from_configfile(): """Load configuration and then build application to run""" global api_cfg if not api_cfg: config_path = os.environ.get("SWH_CONFIG_FILENAME") api_cfg = load_and_check_config(config_path) app.config.update(api_cfg) handler = logging.StreamHandler() app.logger.addHandler(handler) return app if __name__ == "__main__": print("Deprecated. Use swh-objstorage") diff --git a/swh/objstorage/backends/generator.py b/swh/objstorage/backends/generator.py index 3a0cca9..7d8b9db 100644 --- a/swh/objstorage/backends/generator.py +++ b/swh/objstorage/backends/generator.py @@ -1,226 +1,219 @@ -import functools -import io from itertools import count, islice, repeat import logging import random -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT, ObjStorage +from swh.objstorage.objstorage import DEFAULT_LIMIT, ObjStorage logger = logging.getLogger(__name__) class Randomizer: def __init__(self): self.size = 0 self.read(1024) # create a not-so-small initial buffer def read(self, size): if size > self.size: with open("/dev/urandom", "rb") as fobj: self.data = fobj.read(2 * size) self.size = len(self.data) # pick a random subset of our existing buffer idx = random.randint(0, self.size - size - 1) return self.data[idx : idx + size] def gen_sizes(): """generates numbers according to the rought distribution of file size in the SWH archive """ # these are the histogram bounds of the pg content.length column bounds = [ 0, 2, 72, 119, 165, 208, 256, 300, 345, 383, 429, 474, 521, 572, 618, 676, 726, 779, 830, 879, 931, 992, 1054, 1119, 1183, 1244, 1302, 1370, 1437, 1504, 1576, 1652, 1725, 1806, 1883, 1968, 2045, 2133, 2236, 2338, 2433, 2552, 2659, 2774, 2905, 3049, 3190, 3322, 3489, 3667, 3834, 4013, 4217, 4361, 4562, 4779, 5008, 5233, 5502, 5788, 6088, 6396, 6728, 7094, 7457, 7835, 8244, 8758, 9233, 9757, 10313, 10981, 11693, 12391, 13237, 14048, 14932, 15846, 16842, 18051, 19487, 20949, 22595, 24337, 26590, 28840, 31604, 34653, 37982, 41964, 46260, 51808, 58561, 66584, 78645, 95743, 122883, 167016, 236108, 421057, 1047367, 55056238, ] nbounds = len(bounds) for i in count(): idx = random.randint(1, nbounds - 1) lower = bounds[idx - 1] upper = bounds[idx] yield random.randint(lower, upper - 1) def gen_random_content(total=None, filesize=None): """generates random (file) content which sizes roughly follows the SWH archive file size distribution (by default). Args: total (int): the total number of objects to generate. Infinite if unset. filesize (int): generate objects with fixed size instead of random ones. """ randomizer = Randomizer() if filesize: gen = repeat(filesize) else: gen = gen_sizes() if total: gen = islice(gen, total) for objsize in gen: yield randomizer.read(objsize) class RandomGeneratorObjStorage(ObjStorage): """A stupid read-only storage that generates blobs for testing purpose.""" def __init__(self, filesize=None, total=None, **kwargs): super().__init__() if filesize: filesize = int(filesize) self.filesize = filesize if total: total = int(total) self.total = total self._content_generator = None @property def content_generator(self): if self._content_generator is None: self._content_generator = gen_random_content(self.total, self.filesize) return self._content_generator def check_config(self, *, check_write): return True def __contains__(self, obj_id, *args, **kwargs): return False def __iter__(self): i = 1 while True: j = yield (b"%d" % i) if self.total and i >= self.total: logger.debug("DONE") break if j is not None: i = j else: i += 1 def get(self, obj_id, *args, **kwargs): return next(self.content_generator) def add(self, content, obj_id=None, check_presence=True, *args, **kwargs): pass def check(self, obj_id, *args, **kwargs): return True def delete(self, obj_id, *args, **kwargs): return True - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - data = io.BytesIO(next(self.content_generator)) - reader = functools.partial(data.read, chunk_size) - yield from iter(reader, b"") - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): it = iter(self) if last_obj_id: next(it) it.send(int(last_obj_id)) return islice(it, limit) diff --git a/swh/objstorage/backends/in_memory.py b/swh/objstorage/backends/in_memory.py index 5a26c75..e102c9b 100644 --- a/swh/objstorage/backends/in_memory.py +++ b/swh/objstorage/backends/in_memory.py @@ -1,71 +1,60 @@ # Copyright (C) 2017 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -import functools -import io - from swh.objstorage.exc import Error, ObjNotFoundError -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, ObjStorage, compute_hash +from swh.objstorage.objstorage import ObjStorage, compute_hash class InMemoryObjStorage(ObjStorage): """In-Memory objstorage. Intended for test purposes. """ def __init__(self, **args): super().__init__() self.state = {} def check_config(self, *, check_write): return True def __contains__(self, obj_id): return obj_id in self.state def __iter__(self): return iter(sorted(self.state)) def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) if check_presence and obj_id in self: return obj_id self.state[obj_id] = content return obj_id def get(self, obj_id): if obj_id not in self: raise ObjNotFoundError(obj_id) return self.state[obj_id] def check(self, obj_id): if obj_id not in self: raise ObjNotFoundError(obj_id) if compute_hash(self.state[obj_id]) != obj_id: raise Error("Corrupt object %s" % obj_id) return True def delete(self, obj_id): super().delete(obj_id) # Check delete permission if obj_id not in self: raise ObjNotFoundError(obj_id) self.state.pop(obj_id) return True - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - data = io.BytesIO(self.state[obj_id]) - reader = functools.partial(data.read, chunk_size) - yield from iter(reader, b"") diff --git a/swh/objstorage/backends/pathslicing.py b/swh/objstorage/backends/pathslicing.py index a49cfa5..df1ac04 100644 --- a/swh/objstorage/backends/pathslicing.py +++ b/swh/objstorage/backends/pathslicing.py @@ -1,449 +1,406 @@ # Copyright (C) 2015-2019 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections.abc import Iterator from contextlib import contextmanager from itertools import islice import os import random import tempfile from typing import List from swh.model import hashutil from swh.objstorage.exc import Error, ObjNotFoundError from swh.objstorage.objstorage import ( - DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT, ID_HASH_ALGO, ID_HEXDIGEST_LENGTH, ObjStorage, compressors, compute_hash, decompressors, ) BUFSIZ = 1048576 DIR_MODE = 0o755 FILE_MODE = 0o644 class PathSlicer: """Helper class to compute a path based on a hash. Used to compute a directory path based on the object hash according to a given slicing. Each slicing correspond to a directory that is named according to the hash of its content. For instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will have the following computed path: - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 Args: root (str): path to the root directory of the storage on the disk. slicing (str): the slicing configuration. """ def __init__(self, root: str, slicing: str): self.root = root # Make a list of tuples where each tuple contains the beginning # and the end of each slicing. try: self.bounds = [ slice(*(int(x) if x else None for x in sbounds.split(":"))) for sbounds in slicing.split("/") if sbounds ] except TypeError: raise ValueError( "Invalid slicing declaration; " "it should be a of the form ':[/:]..." ) def check_config(self): """Check the slicing configuration is valid. Raises: ValueError: if the slicing configuration is invalid. """ if len(self): max_char = max( max(bound.start or 0, bound.stop or 0) for bound in self.bounds ) if ID_HEXDIGEST_LENGTH < max_char: raise ValueError( "Algorithm %s has too short hash for slicing to char %d" % (ID_HASH_ALGO, max_char) ) def get_directory(self, hex_obj_id: str) -> str: """Compute the storage directory of an object. See also: PathSlicer::get_path Args: hex_obj_id: object id as hexlified string. Returns: Absolute path (including root) to the directory that contains the given object id. """ return os.path.join(self.root, *self.get_slices(hex_obj_id)) def get_path(self, hex_obj_id: str) -> str: """Compute the full path to an object into the current storage. See also: PathSlicer::get_directory Args: hex_obj_id(str): object id as hexlified string. Returns: Absolute path (including root) to the object corresponding to the given object id. """ return os.path.join(self.get_directory(hex_obj_id), hex_obj_id) def get_slices(self, hex_obj_id: str) -> List[str]: """Compute the path elements for the given hash. Args: hex_obj_id(str): object id as hexlified string. Returns: Relative path to the actual object corresponding to the given id as a list. """ assert len(hex_obj_id) == ID_HEXDIGEST_LENGTH return [hex_obj_id[bound] for bound in self.bounds] def __len__(self) -> int: """Number of slices of the slicer""" return len(self.bounds) class PathSlicingObjStorage(ObjStorage): """Implementation of the ObjStorage API based on the hash of the content. On disk, an object storage is a directory tree containing files named after their object IDs. An object ID is a checksum of its content, depending on the value of the ID_HASH_ALGO constant (see swh.model.hashutil for its meaning). To avoid directories that contain too many files, the object storage has a given slicing. Each slicing correspond to a directory that is named according to the hash of its content. So for instance a file with SHA1 34973274ccef6ab4dfaaf86599792fa9c3fe4689 will be stored in the given object storages : - 0:2/2:4/4:6 : 34/97/32/34973274ccef6ab4dfaaf86599792fa9c3fe4689 - 0:1/0:5/ : 3/34973/34973274ccef6ab4dfaaf86599792fa9c3fe4689 The files in the storage are stored in gzipped compressed format. Args: root (str): path to the root directory of the storage on the disk. slicing (str): string that indicates the slicing to perform on the hash of the content to know the path where it should be stored (see the documentation of the PathSlicer class). """ def __init__(self, root, slicing, compression="gzip", **kwargs): super().__init__(**kwargs) self.root = root self.slicer = PathSlicer(root, slicing) self.use_fdatasync = hasattr(os, "fdatasync") self.compression = compression self.check_config(check_write=False) def check_config(self, *, check_write): """Check whether this object storage is properly configured""" self.slicer.check_config() if not os.path.isdir(self.root): raise ValueError( 'PathSlicingObjStorage root "%s" is not a directory' % self.root ) if check_write: if not os.access(self.root, os.W_OK): raise PermissionError( 'PathSlicingObjStorage root "%s" is not writable' % self.root ) if self.compression not in compressors: raise ValueError( 'Unknown compression algorithm "%s" for ' "PathSlicingObjStorage" % self.compression ) return True def __contains__(self, obj_id): hex_obj_id = hashutil.hash_to_hex(obj_id) return os.path.isfile(self.slicer.get_path(hex_obj_id)) def __iter__(self): """Iterate over the object identifiers currently available in the storage. Warning: with the current implementation of the object storage, this method will walk the filesystem to list objects, meaning that listing all objects will be very slow for large storages. You almost certainly don't want to use this method in production. Return: Iterator over object IDs """ def obj_iterator(): # XXX hackish: it does not verify that the depth of found files # matches the slicing depth of the storage for root, _dirs, files in os.walk(self.root): _dirs.sort() for f in sorted(files): yield bytes.fromhex(f) return obj_iterator() def __len__(self): """Compute the number of objects available in the storage. Warning: this currently uses `__iter__`, its warning about bad performances applies Return: number of objects contained in the storage """ return sum(1 for i in self) def add(self, content, obj_id=None, check_presence=True): if obj_id is None: obj_id = compute_hash(content) if check_presence and obj_id in self: # If the object is already present, return immediately. return obj_id hex_obj_id = hashutil.hash_to_hex(obj_id) if not isinstance(content, Iterator): content = [content] compressor = compressors[self.compression]() with self._write_obj_file(hex_obj_id) as f: for chunk in content: f.write(compressor.compress(chunk)) f.write(compressor.flush()) return obj_id def get(self, obj_id): if obj_id not in self: raise ObjNotFoundError(obj_id) # Open the file and return its content as bytes hex_obj_id = hashutil.hash_to_hex(obj_id) d = decompressors[self.compression]() with open(self.slicer.get_path(hex_obj_id), "rb") as f: out = d.decompress(f.read()) if d.unused_data: raise Error( "Corrupt object %s: trailing data found" % hex_obj_id, ) return out def check(self, obj_id): try: data = self.get(obj_id) except OSError: hex_obj_id = hashutil.hash_to_hex(obj_id) raise Error( "Corrupt object %s: not a proper compressed file" % hex_obj_id, ) checksums = hashutil.MultiHash.from_data( data, hash_names=[ID_HASH_ALGO] ).digest() actual_obj_id = checksums[ID_HASH_ALGO] hex_obj_id = hashutil.hash_to_hex(obj_id) if hex_obj_id != hashutil.hash_to_hex(actual_obj_id): raise Error( "Corrupt object %s should have id %s" % (hashutil.hash_to_hex(obj_id), hashutil.hash_to_hex(actual_obj_id)) ) def delete(self, obj_id): super().delete(obj_id) # Check delete permission if obj_id not in self: raise ObjNotFoundError(obj_id) hex_obj_id = hashutil.hash_to_hex(obj_id) try: os.remove(self.slicer.get_path(hex_obj_id)) except FileNotFoundError: raise ObjNotFoundError(obj_id) return True # Management methods def get_random(self, batch_size): def get_random_content(self, batch_size): """Get a batch of content inside a single directory. Returns: a tuple (batch size, batch). """ dirs = [] for level in range(len(self.slicer)): path = os.path.join(self.root, *dirs) dir_list = next(os.walk(path))[1] if "tmp" in dir_list: dir_list.remove("tmp") dirs.append(random.choice(dir_list)) path = os.path.join(self.root, *dirs) content_list = next(os.walk(path))[2] length = min(batch_size, len(content_list)) return ( length, map(hashutil.hash_to_bytes, random.sample(content_list, length)), ) while batch_size: length, it = get_random_content(self, batch_size) batch_size = batch_size - length yield from it # Streaming methods @contextmanager def chunk_writer(self, obj_id): hex_obj_id = hashutil.hash_to_hex(obj_id) compressor = compressors[self.compression]() with self._write_obj_file(hex_obj_id) as f: yield lambda c: f.write(compressor.compress(c)) f.write(compressor.flush()) - def add_stream(self, content_iter, obj_id, check_presence=True): - """Add a new object to the object storage using streaming. - - This function is identical to add() except it takes a generator that - yields the chunked content instead of the whole content at once. - - Args: - content (bytes): chunked generator that yields the object's raw - content to add in storage. - obj_id (bytes): object identifier - check_presence (bool): indicate if the presence of the - content should be verified before adding the file. - - Returns: - the id (bytes) of the object into the storage. - - """ - if check_presence and obj_id in self: - return obj_id - - with self.chunk_writer(obj_id) as writer: - for chunk in content_iter: - writer(chunk) - - return obj_id - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - if obj_id not in self: - raise ObjNotFoundError(obj_id) - - hex_obj_id = hashutil.hash_to_hex(obj_id) - decompressor = decompressors[self.compression]() - with open(self.slicer.get_path(hex_obj_id), "rb") as f: - while True: - raw = f.read(chunk_size) - if not raw: - break - r = decompressor.decompress(raw) - if not r: - continue - yield r - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): if last_obj_id: it = self.iter_from(last_obj_id) else: it = iter(self) return islice(it, limit) def iter_from(self, obj_id, n_leaf=False): hex_obj_id = hashutil.hash_to_hex(obj_id) slices = self.slicer.get_slices(hex_obj_id) rlen = len(self.root.split("/")) i = 0 for root, dirs, files in os.walk(self.root): if not dirs: i += 1 level = len(root.split("/")) - rlen dirs.sort() if dirs and root == os.path.join(self.root, *slices[:level]): cslice = slices[level] for d in dirs[:]: if d < cslice: dirs.remove(d) for f in sorted(files): if f > hex_obj_id: yield bytes.fromhex(f) if n_leaf: yield i @contextmanager def _write_obj_file(self, hex_obj_id): """Context manager for writing object files to the object storage. During writing, data are written to a temporary file, which is atomically renamed to the right file name after closing. Usage sample: with objstorage._write_obj_file(hex_obj_id): f.write(obj_data) Yields: a file-like object open for writing bytes. """ # Get the final paths and create the directory if absent. dir = self.slicer.get_directory(hex_obj_id) if not os.path.isdir(dir): os.makedirs(dir, DIR_MODE, exist_ok=True) path = os.path.join(dir, hex_obj_id) # Create a temporary file. (tmp, tmp_path) = tempfile.mkstemp(suffix=".tmp", prefix="hex_obj_id.", dir=dir) # Open the file and yield it for writing. tmp_f = os.fdopen(tmp, "wb") yield tmp_f # Make sure the contents of the temporary file are written to disk tmp_f.flush() if self.use_fdatasync: os.fdatasync(tmp) else: os.fsync(tmp) # Then close the temporary file and move it to the right path. tmp_f.close() os.chmod(tmp_path, FILE_MODE) os.rename(tmp_path, path) diff --git a/swh/objstorage/interface.py b/swh/objstorage/interface.py index b91c0b9..49ef9c4 100644 --- a/swh/objstorage/interface.py +++ b/swh/objstorage/interface.py @@ -1,237 +1,216 @@ # Copyright (C) 2015-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from typing import Dict from typing_extensions import Protocol, runtime_checkable from swh.core.api import remote_api_endpoint -from swh.objstorage.objstorage import DEFAULT_CHUNK_SIZE, DEFAULT_LIMIT +from swh.objstorage.objstorage import DEFAULT_LIMIT @runtime_checkable class ObjStorageInterface(Protocol): """High-level API to manipulate the Software Heritage object storage. Conceptually, the object storage offers the following methods: - check_config() check if the object storage is properly configured - __contains__() check if an object is present, by object id - add() add a new object, returning an object id - restore() same as add() but erase an already existed content - get() retrieve the content of an object, by object id - check() check the integrity of an object, by object id - delete() remove an object And some management methods: - get_random() get random object id of existing contents (used for the content integrity checker). - Some of the methods have available streaming equivalents: - - - get_stream() same as get() but returns a chunked iterator - Each implementation of this interface can have a different behavior and its own way to store the contents. """ @remote_api_endpoint("check_config") def check_config(self, *, check_write): """Check whether the object storage is properly configured. Args: check_write (bool): if True, check if writes to the object storage can succeed. Returns: True if the configuration check worked, an exception if it didn't. """ ... @remote_api_endpoint("content/contains") def __contains__(self, obj_id): """Indicate if the given object is present in the storage. Args: obj_id (bytes): object identifier. Returns: True if and only if the object is present in the current object storage. """ ... @remote_api_endpoint("content/add") def add(self, content, obj_id=None, check_presence=True): """Add a new object to the object storage. Args: content (bytes): object's raw content to add in storage. obj_id (bytes): checksum of [bytes] using [ID_HASH_ALGO] algorithm. When given, obj_id will be trusted to match the bytes. If missing, obj_id will be computed on the fly. check_presence (bool): indicate if the presence of the content should be verified before adding the file. Returns: the id (bytes) of the object into the storage. """ ... @remote_api_endpoint("content/add/batch") def add_batch(self, contents, check_presence=True) -> Dict: """Add a batch of new objects to the object storage. Args: contents: mapping from obj_id to object contents Returns: the summary of objects added to the storage (count of object, count of bytes object) """ ... def restore(self, content, obj_id=None): """Restore a content that have been corrupted. This function is identical to add but does not check if the object id is already in the file system. The default implementation provided by the current class is suitable for most cases. Args: content (bytes): object's raw content to add in storage obj_id (bytes): checksum of `bytes` as computed by ID_HASH_ALGO. When given, obj_id will be trusted to match bytes. If missing, obj_id will be computed on the fly. """ ... @remote_api_endpoint("content/get") def get(self, obj_id): """Retrieve the content of a given object. Args: obj_id (bytes): object id. Returns: the content of the requested object as bytes. Raises: ObjNotFoundError: if the requested object is missing. """ ... @remote_api_endpoint("content/get/batch") def get_batch(self, obj_ids): """Retrieve objects' raw content in bulk from storage. Note: This function does have a default implementation in ObjStorage that is suitable for most cases. For object storages that needs to do the minimal number of requests possible (ex: remote object storages), that method can be overridden to perform a more efficient operation. Args: obj_ids ([bytes]: list of object ids. Returns: list of resulting contents, or None if the content could not be retrieved. Do not raise any exception as a fail for one content will not cancel the whole request. """ ... @remote_api_endpoint("content/check") def check(self, obj_id): """Perform an integrity check for a given object. Verify that the file object is in place and that the content matches the object id. Args: obj_id (bytes): object identifier. Raises: ObjNotFoundError: if the requested object is missing. Error: if the request object is corrupted. """ ... @remote_api_endpoint("content/delete") def delete(self, obj_id): """Delete an object. Args: obj_id (bytes): object identifier. Raises: ObjNotFoundError: if the requested object is missing. """ ... # Management methods @remote_api_endpoint("content/get/random") def get_random(self, batch_size): """Get random ids of existing contents. This method is used in order to get random ids to perform content integrity verifications on random contents. Args: batch_size (int): Number of ids that will be given Yields: An iterable of ids (bytes) of contents that are in the current object storage. """ ... - # Streaming methods - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - """Retrieve the content of a given object as a chunked iterator. - - Args: - obj_id (bytes): object id. - - Returns: - the content of the requested object as bytes. - - Raises: - ObjNotFoundError: if the requested object is missing. - - """ - ... - def __iter__(self): ... def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): """Generates known object ids. Args: last_obj_id (bytes): object id from which to iterate from (excluded). limit (int): max number of object ids to generate. Generates: obj_id (bytes): object ids. """ ... diff --git a/swh/objstorage/objstorage.py b/swh/objstorage/objstorage.py index d7e2883..ea732e9 100644 --- a/swh/objstorage/objstorage.py +++ b/swh/objstorage/objstorage.py @@ -1,156 +1,147 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import abc import bz2 from itertools import dropwhile, islice import lzma from typing import Dict import zlib from swh.model import hashutil from .exc import ObjNotFoundError ID_HASH_ALGO = "sha1" ID_HEXDIGEST_LENGTH = 40 """Size in bytes of the hash hexadecimal representation.""" ID_DIGEST_LENGTH = 20 """Size in bytes of the hash""" -DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024 -"""Size in bytes of the streaming chunks""" - DEFAULT_LIMIT = 10000 """Default number of results of ``list_content``.""" def compute_hash(content): """Compute the content's hash. Args: content (bytes): The raw content to hash hash_name (str): Hash's name (default to ID_HASH_ALGO) Returns: The ID_HASH_ALGO for the content """ return ( hashutil.MultiHash.from_data( content, hash_names=[ID_HASH_ALGO], ) .digest() .get(ID_HASH_ALGO) ) class NullCompressor: def compress(self, data): return data def flush(self): return b"" class NullDecompressor: def decompress(self, data): return data @property def unused_data(self): return b"" decompressors = { "bz2": bz2.BZ2Decompressor, "lzma": lzma.LZMADecompressor, "gzip": lambda: zlib.decompressobj(wbits=31), "zlib": zlib.decompressobj, "none": NullDecompressor, } compressors = { "bz2": bz2.BZ2Compressor, "lzma": lzma.LZMACompressor, "gzip": lambda: zlib.compressobj(wbits=31), "zlib": zlib.compressobj, "none": NullCompressor, } class ObjStorage(metaclass=abc.ABCMeta): def __init__(self, *, allow_delete=False, **kwargs): # A more complete permission system could be used in place of that if # it becomes needed self.allow_delete = allow_delete @abc.abstractmethod def check_config(self, *, check_write): pass @abc.abstractmethod def __contains__(self, obj_id): pass @abc.abstractmethod def add(self, content, obj_id=None, check_presence=True): pass def add_batch(self, contents, check_presence=True) -> Dict: summary = {"object:add": 0, "object:add:bytes": 0} for obj_id, content in contents.items(): if check_presence and obj_id in self: continue self.add(content, obj_id, check_presence=False) summary["object:add"] += 1 summary["object:add:bytes"] += len(content) return summary def restore(self, content, obj_id=None): # check_presence to false will erase the potential previous content. return self.add(content, obj_id, check_presence=False) @abc.abstractmethod def get(self, obj_id): pass def get_batch(self, obj_ids): for obj_id in obj_ids: try: yield self.get(obj_id) except ObjNotFoundError: yield None @abc.abstractmethod def check(self, obj_id): pass @abc.abstractmethod def delete(self, obj_id): if not self.allow_delete: raise PermissionError("Delete is not allowed.") # Management methods def get_random(self, batch_size): pass # Streaming methods - def add_stream(self, content_iter, obj_id, check_presence=True): - raise NotImplementedError - - def get_stream(self, obj_id, chunk_size=DEFAULT_CHUNK_SIZE): - raise NotImplementedError - def list_content(self, last_obj_id=None, limit=DEFAULT_LIMIT): it = iter(self) if last_obj_id: it = dropwhile(lambda x: x <= last_obj_id, it) return islice(it, limit) diff --git a/swh/objstorage/tests/objstorage_testing.py b/swh/objstorage/tests/objstorage_testing.py index 85fcd47..953fbd4 100644 --- a/swh/objstorage/tests/objstorage_testing.py +++ b/swh/objstorage/tests/objstorage_testing.py @@ -1,264 +1,225 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from collections.abc import Iterator import inspect -import time from swh.objstorage import exc from swh.objstorage.interface import ObjStorageInterface from swh.objstorage.objstorage import compute_hash class ObjStorageTestFixture: def test_types(self): """Checks all methods of ObjStorageInterface are implemented by this backend, and that they have the same signature.""" # Create an instance of the protocol (which cannot be instantiated # directly, so this creates a subclass, then instantiates it) interface = type("_", (ObjStorageInterface,), {})() assert "get_batch" in dir(interface) missing_methods = [] for meth_name in dir(interface): if meth_name.startswith("_"): continue interface_meth = getattr(interface, meth_name) concrete_meth = getattr(self.storage, meth_name) expected_signature = inspect.signature(interface_meth) actual_signature = inspect.signature(concrete_meth) assert expected_signature == actual_signature, meth_name assert missing_methods == [] # If all the assertions above succeed, then this one should too. # But there's no harm in double-checking. # And we could replace the assertions above by this one, but unlike # the assertions above, it doesn't explain what is missing. assert isinstance(self.storage, ObjStorageInterface) def hash_content(self, content): obj_id = compute_hash(content) return content, obj_id def assertContentMatch(self, obj_id, expected_content): # noqa content = self.storage.get(obj_id) self.assertEqual(content, expected_content) def test_check_config(self): self.assertTrue(self.storage.check_config(check_write=False)) self.assertTrue(self.storage.check_config(check_write=True)) def test_contains(self): content_p, obj_id_p = self.hash_content(b"contains_present") content_m, obj_id_m = self.hash_content(b"contains_missing") self.storage.add(content_p, obj_id=obj_id_p) self.assertIn(obj_id_p, self.storage) self.assertNotIn(obj_id_m, self.storage) def test_add_get_w_id(self): content, obj_id = self.hash_content(b"add_get_w_id") r = self.storage.add(content, obj_id=obj_id) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) def test_add_twice(self): content, obj_id = self.hash_content(b"add_twice") r = self.storage.add(content, obj_id=obj_id) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) r = self.storage.add(content, obj_id=obj_id, check_presence=False) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) def test_add_big(self): content, obj_id = self.hash_content(b"add_big" * 1024 * 1024) r = self.storage.add(content, obj_id=obj_id) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) def test_add_get_wo_id(self): content, obj_id = self.hash_content(b"add_get_wo_id") r = self.storage.add(content) self.assertEqual(obj_id, r) self.assertContentMatch(obj_id, content) def test_add_get_batch(self): content1, obj_id1 = self.hash_content(b"add_get_batch_1") content2, obj_id2 = self.hash_content(b"add_get_batch_2") self.storage.add(content1, obj_id1) self.storage.add(content2, obj_id2) cr1, cr2 = self.storage.get_batch([obj_id1, obj_id2]) self.assertEqual(cr1, content1) self.assertEqual(cr2, content2) def test_get_batch_unexisting_content(self): content, obj_id = self.hash_content(b"get_batch_unexisting_content") result = list(self.storage.get_batch([obj_id])) self.assertTrue(len(result) == 1) self.assertIsNone(result[0]) def test_restore_content(self): self.storage.allow_delete = True valid_content, valid_obj_id = self.hash_content(b"restore_content") invalid_content = b"unexpected content" id_adding = self.storage.add(invalid_content, valid_obj_id) self.assertEqual(id_adding, valid_obj_id) with self.assertRaises(exc.Error): self.storage.check(id_adding) id_restore = self.storage.restore(valid_content, valid_obj_id) self.assertEqual(id_restore, valid_obj_id) self.assertContentMatch(valid_obj_id, valid_content) def test_get_missing(self): content, obj_id = self.hash_content(b"get_missing") with self.assertRaises(exc.ObjNotFoundError) as e: self.storage.get(obj_id) self.assertIn(obj_id, e.exception.args) def test_check_missing(self): content, obj_id = self.hash_content(b"check_missing") with self.assertRaises(exc.Error): self.storage.check(obj_id) def test_check_present(self): content, obj_id = self.hash_content(b"check_present") self.storage.add(content, obj_id) try: self.storage.check(obj_id) except exc.Error: self.fail("Integrity check failed") def test_delete_missing(self): self.storage.allow_delete = True content, obj_id = self.hash_content(b"missing_content_to_delete") with self.assertRaises(exc.Error): self.storage.delete(obj_id) def test_delete_present(self): self.storage.allow_delete = True content, obj_id = self.hash_content(b"content_to_delete") self.storage.add(content, obj_id=obj_id) self.assertTrue(self.storage.delete(obj_id)) with self.assertRaises(exc.Error): self.storage.get(obj_id) def test_delete_not_allowed(self): self.storage.allow_delete = False content, obj_id = self.hash_content(b"content_to_delete") self.storage.add(content, obj_id=obj_id) with self.assertRaises(PermissionError): self.storage.delete(obj_id) def test_delete_not_allowed_by_default(self): content, obj_id = self.hash_content(b"content_to_delete") self.storage.add(content, obj_id=obj_id) with self.assertRaises(PermissionError): self.assertTrue(self.storage.delete(obj_id)) - def test_add_stream(self): - content = [b"chunk1", b"chunk2"] - _, obj_id = self.hash_content(b"".join(content)) - try: - self.storage.add_stream(iter(content), obj_id=obj_id) - except NotImplementedError: - return - self.assertContentMatch(obj_id, b"".join(content)) - - def test_add_stream_sleep(self): - def gen_content(): - yield b"chunk1" - time.sleep(0.5) - yield b"chunk42" - - _, obj_id = self.hash_content(b"placeholder_id") - try: - self.storage.add_stream(gen_content(), obj_id=obj_id) - except NotImplementedError: - return - self.assertContentMatch(obj_id, b"chunk1chunk42") - - def test_get_stream(self): - content = b"123456789" - _, obj_id = self.hash_content(content) - self.storage.add(content, obj_id=obj_id) - r = self.storage.get(obj_id) - self.assertEqual(r, content) - - try: - r = self.storage.get_stream(obj_id, chunk_size=1) - except NotImplementedError: - return - self.assertTrue(isinstance(r, Iterator)) - r = list(r) - self.assertEqual(b"".join(r), content) - def test_add_batch(self): contents = {} expected_content_add = 0 expected_content_add_bytes = 0 for i in range(50): content = b"Test content %02d" % i content, obj_id = self.hash_content(content) contents[obj_id] = content expected_content_add_bytes += len(content) expected_content_add += 1 ret = self.storage.add_batch(contents) self.assertEqual( ret, { "object:add": expected_content_add, "object:add:bytes": expected_content_add_bytes, }, ) for obj_id in contents: self.assertIn(obj_id, self.storage) def test_content_iterator(self): sto_obj_ids = iter(self.storage) sto_obj_ids = list(sto_obj_ids) self.assertFalse(sto_obj_ids) obj_ids = set() for i in range(100): content, obj_id = self.hash_content(b"content %d" % i) self.storage.add(content, obj_id=obj_id) obj_ids.add(obj_id) sto_obj_ids = set(self.storage) self.assertEqual(sto_obj_ids, obj_ids) def test_list_content(self): all_ids = [] for i in range(1200): content = b"example %d" % i obj_id = compute_hash(content) self.storage.add(content, obj_id) all_ids.append(obj_id) all_ids.sort() ids = list(self.storage.list_content()) self.assertEqual(len(ids), 1200) self.assertEqual(ids[0], all_ids[0]) self.assertEqual(ids[100], all_ids[100]) self.assertEqual(ids[999], all_ids[999]) ids = list(self.storage.list_content(limit=10)) self.assertEqual(len(ids), 10) self.assertEqual(ids[0], all_ids[0]) self.assertEqual(ids[9], all_ids[9]) ids = list(self.storage.list_content(last_obj_id=all_ids[999], limit=100)) self.assertEqual(len(ids), 100) self.assertEqual(ids[0], all_ids[1000]) self.assertEqual(ids[9], all_ids[1009]) diff --git a/swh/objstorage/tests/test_objstorage_random_generator.py b/swh/objstorage/tests/test_objstorage_random_generator.py index 50f7129..919440a 100644 --- a/swh/objstorage/tests/test_objstorage_random_generator.py +++ b/swh/objstorage/tests/test_objstorage_random_generator.py @@ -1,46 +1,39 @@ # Copyright (C) 2019-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information from collections.abc import Iterator from swh.objstorage.factory import get_objstorage def test_random_generator_objstorage(): sto = get_objstorage("random", {}) assert sto blobs = [sto.get(None) for i in range(100)] lengths = [len(x) for x in blobs] assert max(lengths) <= 55056238 -def test_random_generator_objstorage_get_stream(): - sto = get_objstorage("random", {}) - gen = sto.get_stream(None) - assert isinstance(gen, Iterator) - assert list(gen) # ensure the iterator can be consumed - - def test_random_generator_objstorage_list_content(): sto = get_objstorage("random", {"total": 100}) assert isinstance(sto.list_content(), Iterator) assert list(sto.list_content()) == [b"%d" % i for i in range(1, 101)] assert list(sto.list_content(limit=10)) == [b"%d" % i for i in range(1, 11)] assert list(sto.list_content(last_obj_id=b"10", limit=10)) == [ b"%d" % i for i in range(11, 21) ] def test_random_generator_objstorage_total(): sto = get_objstorage("random", {"total": 5}) assert len([x for x in sto]) == 5 def test_random_generator_objstorage_size(): sto = get_objstorage("random", {"filesize": 10}) for i in range(10): assert len(sto.get(None)) == 10