diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -10,4 +10,5 @@
 types-python-dateutil
 types-pytz
 types-pyyaml
+types-redis
 types-requests
diff --git a/swh/storage/proxies/validate.py b/swh/storage/proxies/validate.py
--- a/swh/storage/proxies/validate.py
+++ b/swh/storage/proxies/validate.py
@@ -4,13 +4,18 @@
 # See top-level LICENSE file for more information
 
 
+import logging
 from typing import Dict, Iterable, List
 
+from swh.journal.serializers import value_to_kafka
 from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
 from swh.model.model import Content, Directory, Release, Revision, Snapshot
 from swh.storage import get_storage
 from swh.storage.exc import StorageArgumentException
 from swh.storage.interface import StorageInterface
+from swh.storage.writer import model_object_dict_sanitizer
+
+logger = logging.getLogger(__name__)
 
 
 class ValidatingProxyStorage:
@@ -28,30 +33,67 @@
     """
 
-    def __init__(self, storage):
+    def __init__(self, storage, on_error="raise", **kwargs):
         self.storage: StorageInterface = get_storage(**storage)
+        self.on_error = on_error
+        self.redis = None
+        if self.on_error == "redis":
+            from redis import Redis
+
+            self.redis = Redis(**kwargs.get("redis", {}))
 
     def __getattr__(self, key):
         if key == "storage":
             raise AttributeError(key)
         return getattr(self.storage, key)
 
+    def _report_failure(self, obj):
+        if self.redis:
+            if hasattr(obj, "swhid"):
+                oid = str(obj.swhid())
+            elif hasattr(obj, "compute_hash"):
+                uid = obj.compute_hash()
+                oid = f"{obj.object_type}:{uid.hex()}"
+            else:
+                uid = obj.unique_key()
+                if isinstance(uid, dict):
+                    if obj.object_type == "skipped_content":
+                        uids = ",".join(f"{k}={v.hex()}" for k, v in uid.items())
+                    else:
+                        uids = ",".join(f"{k}={v}" for k, v in uid.items())
+                    oid = f"{obj.object_type}:{uids}"
+                else:  # it's a bytes, but it should have been handled already...
+                    raise TypeError(f"Should not happen: {obj}")
+            self.redis.set(
+                oid,
+                value_to_kafka(
+                    model_object_dict_sanitizer(obj.object_type, obj.to_dict())
+                ),
+            )
+
     def _check_hashes(self, objects: Iterable):
         for obj in objects:
             id_ = hash_to_bytes(obj.compute_hash())
             if id_ != obj.id:
-                raise StorageArgumentException(
+                msg = (
                     f"Object has id {hash_to_hex(obj.id)}, "
                     f"but it should be {hash_to_hex(id_)}: {obj}"
                 )
+                logger.error(msg)
+                self._report_failure(obj)
+                if self.on_error == "raise":
+                    raise StorageArgumentException(msg)
 
     def content_add(self, content: List[Content]) -> Dict[str, int]:
         for cont in content:
             hashes = MultiHash.from_data(cont.data).digest()
             if hashes != cont.hashes():
-                raise StorageArgumentException(
-                    f"Object has hashes {cont.hashes()}, but they should be {hashes}"
-                )
+                msg = f"Object has hashes {cont.hashes()}, but they should be {hashes}"
+                logger.error(msg)
+                self._report_failure(cont)
+                if self.on_error == "raise":
+                    raise StorageArgumentException(msg)
+
         return self.storage.content_add(content)
 
     def directory_add(self, directories: List[Directory]) -> Dict[str, int]:
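
A minimal configuration sketch of the new on_error / redis options follows. It assumes the proxy is registered under the "validate" class name in swh.storage.get_storage; the in-memory backend and the Redis connection parameters are placeholders, not values taken from this diff.

    from swh.storage import get_storage

    # Sketch only: "validate" is assumed to be the registered cls name for
    # ValidatingProxyStorage; the backend and Redis settings are illustrative.
    storage = get_storage(
        cls="validate",
        storage={"cls": "memory"},
        on_error="redis",  # log invalid objects and report them to Redis
        redis={"host": "localhost", "port": 6379},
    )

With on_error="redis", a hash mismatch is logged and written to Redis by _report_failure (keyed by SWHID or unique key, with the kafka-serialized object dict as value) instead of aborting the whole add call with StorageArgumentException, which remains the default on_error="raise" behaviour.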