Page MenuHomeSoftware Heritage

D6554.diff
No OneTemporary

D6554.diff

diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -10,4 +10,5 @@
types-python-dateutil
types-pytz
types-pyyaml
+types-redis
types-requests
diff --git a/swh/storage/proxies/validate.py b/swh/storage/proxies/validate.py
--- a/swh/storage/proxies/validate.py
+++ b/swh/storage/proxies/validate.py
@@ -4,13 +4,18 @@
# See top-level LICENSE file for more information
+import logging
from typing import Dict, Iterable, List
+from swh.journal.serializers import value_to_kafka
from swh.model.hashutil import MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Content, Directory, Release, Revision, Snapshot
from swh.storage import get_storage
from swh.storage.exc import StorageArgumentException
from swh.storage.interface import StorageInterface
+from swh.storage.writer import model_object_dict_sanitizer
+
+logger = logging.getLogger(__name__)
class ValidatingProxyStorage:
@@ -28,30 +33,67 @@
"""
-    def __init__(self, storage):
+    def __init__(self, storage, on_error="raise", **kwargs):
+        """Build the wrapped storage and configure how validation errors are handled.
+
+        Args:
+            storage: keyword arguments passed to ``get_storage`` to build the
+                backend this proxy forwards calls to.
+            on_error: ``"raise"`` (default) logs invalid objects and raises
+                StorageArgumentException; ``"redis"`` logs them and records
+                them in Redis without raising. Any other value is accepted
+                as-is and only logs (NOTE(review): no validation of this value).
+            kwargs: only the ``"redis"`` key is read, as keyword arguments for
+                ``redis.Redis`` when ``on_error == "redis"``.
+        """
         self.storage: StorageInterface = get_storage(**storage)
+        self.on_error = on_error
+        self.redis = None
+        if self.on_error == "redis":
+            # imported lazily so the redis package is only needed in "redis" mode
+            from redis import Redis
+
+            self.redis = Redis(**kwargs.get("redis", {}))
     def __getattr__(self, key):
+        """Forward attribute lookups not found on the proxy to the wrapped storage."""
         if key == "storage":
+            # __getattr__ only runs when normal lookup fails: if ``storage`` itself
+            # is missing (e.g. before __init__ set it), forwarding would recurse forever.
             raise AttributeError(key)
         return getattr(self.storage, key)
+ def _report_failure(self, obj):
+ if self.redis:
+ if hasattr(obj, "swhid"):
+ oid = str(obj.swhid())
+ elif hasattr(obj, "compute_hash"):
+ uid = obj.compute_hash()
+ oid = f"{obj.object_type}:{uid.hex()}"
+ else:
+ uid = obj.unique_key()
+ if isinstance(uid, dict):
+ if obj.objsct_type == "skipped_content":
+ uids = ",".join(f"{k}={v.hex()}" for k, v in uid.items())
+ else:
+ uids = ",".join(f"{k}={v}" for k, v in uid.items())
+ oid = f"{obj.object_type}:{uids}"
+ else: # it's a bytes, but it should have been handled already...
+ raise TypeError(f"Should not happen: {obj}")
+ self.redis.set(
+ oid,
+ value_to_kafka(
+ model_object_dict_sanitizer(obj.object_type, obj.to_dict())
+ ),
+ )
+
     def _check_hashes(self, objects: Iterable):
+        """Verify that each object's ``id`` equals its recomputed hash.
+
+        Each mismatch is logged and passed to ``_report_failure``. A
+        StorageArgumentException is raised on the first mismatch only when
+        ``on_error == "raise"``; otherwise checking continues with the next object.
+        """
         for obj in objects:
             id_ = hash_to_bytes(obj.compute_hash())
             if id_ != obj.id:
-                raise StorageArgumentException(
+                msg = (
                     f"Object has id {hash_to_hex(obj.id)}, "
                     f"but it should be {hash_to_hex(id_)}: {obj}"
                 )
+                logger.error(msg)
+                self._report_failure(obj)
+                if self.on_error == "raise":
+                    raise StorageArgumentException(msg)
     def content_add(self, content: List[Content]) -> Dict[str, int]:
+        """Check each content's hashes against its data, then forward to storage.
+
+        Mismatches are logged and reported. With ``on_error == "raise"`` the
+        first mismatch raises StorageArgumentException before anything is
+        forwarded. Otherwise all contents, including mismatching ones, are
+        passed to the wrapped storage's ``content_add``.
+        """
         for cont in content:
             hashes = MultiHash.from_data(cont.data).digest()
             if hashes != cont.hashes():
-                raise StorageArgumentException(
-                    f"Object has hashes {cont.hashes()}, but they should be {hashes}"
-                )
+                msg = f"Object has hashes {cont.hashes()}, but they should be {hashes}"
+                logger.error(msg)
+                self._report_failure(cont)
+                if self.on_error == "raise":
+                    raise StorageArgumentException(msg)
+
         return self.storage.content_add(content)
def directory_add(self, directories: List[Directory]) -> Dict[str, int]:

File Metadata

Mime Type
text/plain
Expires
Dec 18 2024, 4:13 AM (11 w, 2 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3228915

Event Timeline