Changeset View
Standalone View
swh/storage/tenacious.py
- This file was added.
# Copyright (C) 2020 The Software Heritage developers | |||||
# See the AUTHORS file at the top-level directory of this distribution | |||||
# License: GNU General Public License version 3, or any later version | |||||
# See top-level LICENSE file for more information | |||||
import logging | |||||
from collections import deque | |||||
from functools import partial | |||||
from typing import Deque, Dict, Iterable | |||||
from swh.model.model import BaseModel | |||||
from swh.storage import get_storage | |||||
logger = logging.getLogger(__name__) | |||||
class RateQueue: | |||||
def __init__(self, size: int, max_errors: int): | |||||
assert size > max_errors | |||||
self._size = size | |||||
self._max_errors = max_errors | |||||
self._errors: Deque[bool] = deque(maxlen=size) | |||||
def add_ok(self, n_ok: int = 1) -> None: | |||||
self._errors.extend([False] * n_ok) | |||||
def add_error(self, n_error: int = 1) -> None: | |||||
self._errors.extend([True] * n_error) | |||||
def limit_reached(self) -> bool: | |||||
return sum(self._errors) > self._max_errors | |||||
def reset(self): | |||||
# mainly for testing purpose | |||||
self._errors.clear() | |||||
class TenaciousProxyStorage: | |||||
"""Storage proxy that have a tenacious insertion behavior. | |||||
When an xxx_add method is called, it's first attempted as is against the backend | |||||
storage. Itf a failure occurs, split the list of inserted objects in pieces until | |||||
vlorentz: If | |||||
erroneous objects have been identified, so all the valid objects are guaranteed to | |||||
be inserted. | |||||
Also provides a error-rate limit feature: if more than n errors occurred during the | |||||
insertion of the last p objects, stop accepting any insertion. | |||||
Sample configuration use case for tenacious storage: | |||||
.. code-block:: yaml | |||||
storage: | |||||
cls: tenacious | |||||
args: | |||||
storage: | |||||
cls: remote | |||||
args: http://storage.internal.staging.swh.network:5002/ | |||||
Not Done Inline Actionsno more args: vlorentz: no more `args:` | |||||
error-rate-limit: | |||||
errors: 10 | |||||
total: 1000 | |||||
Not Done Inline Actionscould you document these two numbers? And I think total would be better named window_size or something of the sort. vlorentz: could you document these two numbers? And I think `total` would be better named `window_size`… | |||||
""" | |||||
def __init__(self, storage, error_rate_limit=None): | |||||
self.storage = get_storage(**storage) | |||||
if error_rate_limit is None: | |||||
error_rate_limit = {"errors": 10, "total": 1000} | |||||
assert "errors" in error_rate_limit | |||||
assert "total" in error_rate_limit | |||||
self.rate_queue = RateQueue( | |||||
size=error_rate_limit["total"], max_errors=error_rate_limit["errors"], | |||||
) | |||||
def __getattr__(self, key): | |||||
if key.endswith("_add"): | |||||
return partial(self.tenacious_add, key) | |||||
return getattr(self.storage, key) | |||||
def tenacious_add(self, func_name, objects: Iterable[BaseModel]) -> Dict[str, int]: | |||||
Not Done Inline Actions_tenacious_add vlorentz: `_tenacious_add` | |||||
"""Enqueue objects to write to the storage. This checks if the queue's | |||||
threshold is hit. If it is actually write those to the storage. | |||||
""" | |||||
add_function = getattr(self.storage, func_name) | |||||
object_type = func_name[:-4] # remove the _add suffix | |||||
to_add = [list(objects)] | |||||
results: Dict[str, int] = {} | |||||
while to_add: | |||||
if self.rate_queue.limit_reached(): | |||||
logging.error( | |||||
"Too many insertion errors have been detected; " | |||||
"disabling insertions" | |||||
) | |||||
Not Done Inline ActionsIt should raise an exception, else the client will think it went fine and will proceed to adding objects referencing it, so it will create holes in the graph. vlorentz: It should raise an exception, else the client will think it went fine and will proceed to… | |||||
break | |||||
objs = to_add.pop(0) | |||||
try: | |||||
accumulate(results, add_function(objs)) | |||||
self.rate_queue.add_ok(len(objs)) | |||||
except Exception as exc: | |||||
if len(objs) > 1: | |||||
logger.info( | |||||
f"{func_name}: failed to insert an objects batch, splitting" | |||||
) | |||||
Not Done Inline ActionsI think it should be an error (so it appears in sentry), and contain the exception's name and args. vlorentz: I think it should be an error (so it appears in sentry), and contain the exception's name and… | |||||
Done Inline Actionsno because the problematic object insertion will be attempted again (but in a smaller batch), and in most of the cases, this error is a transient error (typically a concurrency related one), so the object will most probably be properly inserted right after this situation occurs. When we really fail at inserting the object (i.e. when the batch size has fallen to 1), then we log the exception. douardda: no because the problematic object insertion will be attempted again (but in a smaller batch)… | |||||
Not Done Inline Actionsok vlorentz: ok | |||||
# reinsert objs split in 2 parts at the beginning of to_add | |||||
to_add[:0] = [objs[: len(objs) // 2], objs[len(objs) // 2 :]] | |||||
Not Done Inline Actionsnice. vlorentz: nice. | |||||
else: | |||||
logger.error(f"{func_name}: failed to insert an object, exclude it") | |||||
# logger.error(f"Exception: {exc}") | |||||
logger.exception(f"Exception: {exc}") | |||||
accumulate(results, {f"{object_type}:add:errors": 1}) | |||||
self.rate_queue.add_error() | |||||
return results | |||||
def reset(self): | |||||
self.rate_queue.reset() | |||||
def accumulate(d1, d2): | |||||
for k, v in d2.items(): | |||||
v += d1.get(k, 0) | |||||
d1[k] = v | |||||
Not Done Inline Actionsyou're reimplementing collections.Counter. Instead, you can make results a Counter and call results.update() vlorentz: you're reimplementing `collections.Counter`. Instead, you can make `results` a `Counter` and… | |||||
Done Inline Actionsgood thinking, thx douardda: good thinking, thx | |||||
Not Done Inline Actionsit's dead code now vlorentz: it's dead code now | |||||
Done Inline Actionsjeeez, thx! douardda: jeeez, thx! |
If