Page Menu
Home
Software Heritage
Search
Configure Global Search
Log In
Files
F7066387
D6571.id23940.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Subscribers
None
D6571.id23940.diff
View Options
diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.6.2
+swh.journal >= 0.9
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -6,8 +6,10 @@
# adding the [testing] extra.
swh.model[testing] >= 0.0.50
pytz
+pytest-redis
pytest-xdist
types-python-dateutil
types-pytz
types-pyyaml
+types-redis
types-requests
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,11 @@
+aiohttp
+cassandra-driver >= 3.19.0, != 3.21.0
click
+deprecated
flask
+iso8601
+mypy_extensions
psycopg2
-aiohttp
+redis
tenacity
-cassandra-driver >= 3.19.0, != 3.21.0
-deprecated
typing-extensions
-mypy_extensions
-iso8601
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -13,7 +13,7 @@
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
-from swh.storage.replay import object_converter_fn
+from swh.storage.replay import ModelObjectDeserializer, object_converter_fn
try:
from systemd.daemon import notify
@@ -187,11 +187,21 @@
conf = ctx.obj["config"]
storage = get_storage(**conf.pop("storage"))
+ if "error_reporter" in conf:
+ from redis import Redis
+
+ reporter = Redis(**conf["error_reporter"]).set
+ else:
+ reporter = None
+ deserializer = ModelObjectDeserializer(reporter=reporter)
+
client_cfg = conf.pop("journal_client")
+ client_cfg["value_deserializer"] = deserializer.convert
if object_types:
client_cfg["object_types"] = object_types
if stop_after_objects:
client_cfg["stop_after_objects"] = stop_after_objects
+
try:
client = get_journal_client(**client_cfg)
except ValueError as exc:
diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py
--- a/swh/storage/fixer.py
+++ b/swh/storage/fixer.py
@@ -6,7 +6,7 @@
import copy
import datetime
import logging
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List
from swh.model.model import Origin
@@ -116,6 +116,7 @@
""" # noqa
rev = _fix_revision_pypi_empty_string(revision)
rev = _fix_revision_transplant_source(rev)
+ rev.pop("metadata", None) # this haas been dead for a while now
return rev
@@ -249,21 +250,21 @@
return o
+object_fixers: Dict[str, Callable[[Dict], Dict]] = {
+ "content": _fix_content,
+ "revision": _fix_revision,
+ "origin": _fix_origin,
+ "origin_visit": _fix_origin_visit,
+ "raw_extrinsic_metadata": _fix_raw_extrinsic_metadata,
+}
+
+
def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
"""
Fix legacy objects from the journal to bring them up to date with the
latest storage schema.
"""
- if object_type == "content":
- return [_fix_content(v) for v in objects]
- elif object_type == "revision":
- revisions = [_fix_revision(v) for v in objects]
- return [rev for rev in revisions if rev is not None]
- elif object_type == "origin":
- return [_fix_origin(v) for v in objects]
- elif object_type == "origin_visit":
- return [_fix_origin_visit(v) for v in objects]
- elif object_type == "raw_extrinsic_metadata":
- return [_fix_raw_extrinsic_metadata(v) for v in objects]
- else:
- return objects
+ if object_type in object_fixers:
+ fixer = object_fixers[object_type]
+ objects = [fixer(v) for v in objects]
+ return objects
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -3,8 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import Counter
import logging
-from typing import Any, Callable, Container, Dict, List
+from typing import Any, Callable, Container
+from typing import Counter as CounterT
+from typing import Dict, List, Optional, TypeVar, Union, cast
try:
from systemd.daemon import notify
@@ -12,12 +15,15 @@
notify = None
from swh.core.statsd import statsd
+from swh.journal.serializers import kafka_to_value
+from swh.model.hashutil import hash_to_hex
from swh.model.model import (
BaseContent,
BaseModel,
Content,
Directory,
ExtID,
+ HashableObject,
MetadataAuthority,
MetadataFetcher,
Origin,
@@ -29,8 +35,8 @@
SkippedContent,
Snapshot,
)
-from swh.storage.exc import HashCollision
-from swh.storage.fixer import fix_objects
+from swh.storage.exc import HashCollision, StorageArgumentException
+from swh.storage.fixer import object_fixers
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
@@ -56,8 +62,52 @@
}
+class ModelObjectDeserializer:
+ def __init__(
+ self,
+ validate: bool = True,
+ raise_on_error: bool = False,
+ reporter: Optional[Callable[[str, bytes], None]] = None,
+ ):
+ self.validate = validate
+ self.reporter = reporter
+ self.raise_on_error = raise_on_error
+
+ def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]:
+ dict_repr = kafka_to_value(msg)
+ if object_type in object_fixers:
+ dict_repr = object_fixers[object_type](dict_repr)
+ obj = object_converter_fn[object_type](dict_repr)
+ if self.validate:
+ if isinstance(obj, HashableObject):
+ cid = obj.compute_hash()
+ if obj.id != cid:
+ error_msg = (
+ f"Object has id {hash_to_hex(obj.id)}, "
+ f"but it should be {hash_to_hex(cid)}: {obj}"
+ )
+ logger.error(error_msg)
+ self.report_failure(msg, obj)
+ if self.raise_on_error:
+ raise StorageArgumentException(error_msg)
+ return None
+ return obj
+
+ def report_failure(self, msg: bytes, obj: BaseModel):
+ if self.reporter:
+ oid: str = ""
+ if hasattr(obj, "swhid"):
+ swhid = obj.swhid() # type: ignore[attr-defined]
+ oid = str(swhid)
+ elif isinstance(obj, HashableObject):
+ uid = obj.compute_hash()
+ oid = f"{obj.object_type}:{uid.hex()}" # type: ignore[attr-defined]
+ if oid:
+ self.reporter(oid, msg)
+
+
def process_replay_objects(
- all_objects: Dict[str, List[Dict[str, Any]]], *, storage: StorageInterface
+ all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface
) -> None:
for (object_type, objects) in all_objects.items():
logger.debug("Inserting %s %s objects", len(objects), object_type)
@@ -70,9 +120,12 @@
notify("WATCHDOG=1")
+ContentType = TypeVar("ContentType", bound=BaseContent)
+
+
def collision_aware_content_add(
- content_add_fn: Callable[[List[Any]], Dict[str, int]], contents: List[BaseContent],
-) -> None:
+ content_add_fn: Callable[[List[ContentType]], Dict[str, int]]
+) -> Callable[[List[ContentType]], Dict[str, int]]:
"""Add contents to storage. If a hash collision is detected, an error is
logged. Then this adds the other non colliding contents to the storage.
@@ -81,29 +134,37 @@
contents: List of contents or skipped contents to add to storage
"""
- if not contents:
- return
- colliding_content_hashes: List[Dict[str, Any]] = []
- while True:
- try:
- content_add_fn(contents)
- except HashCollision as e:
- colliding_content_hashes.append(
- {
- "algo": e.algo,
- "hash": e.hash_id, # hex hash id
- "objects": e.colliding_contents, # hex hashes
- }
- )
- colliding_hashes = e.colliding_content_hashes()
- # Drop the colliding contents from the transaction
- contents = [c for c in contents if c.hashes() not in colliding_hashes]
- else:
- # Successfully added contents, we are done
- break
- if colliding_content_hashes:
- for collision in colliding_content_hashes:
- logger.error("Collision detected: %(collision)s", {"collision": collision})
+
+ def wrapper(contents: List[ContentType]) -> Dict[str, int]:
+ if not contents:
+ return {}
+ colliding_content_hashes: List[Dict[str, Any]] = []
+ results: CounterT[str] = Counter()
+ while True:
+ try:
+ results.update(content_add_fn(contents))
+ except HashCollision as e:
+ colliding_content_hashes.append(
+ {
+ "algo": e.algo,
+ "hash": e.hash_id, # hex hash id
+ "objects": e.colliding_contents, # hex hashes
+ }
+ )
+ colliding_hashes = e.colliding_content_hashes()
+ # Drop the colliding contents from the transaction
+ contents = [c for c in contents if c.hashes() not in colliding_hashes]
+ else:
+ # Successfully added contents, we are done
+ break
+ if colliding_content_hashes:
+ for collision in colliding_content_hashes:
+ logger.error(
+ "Collision detected: %(collision)s", {"collision": collision}
+ )
+ return dict(results)
+
+ return wrapper
def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict:
@@ -112,73 +173,29 @@
def _insert_objects(
- object_type: str, objects: List[Dict], storage: StorageInterface
+ object_type: str, objects: List[BaseModel], storage: StorageInterface
) -> None:
"""Insert objects of type object_type in the storage.
"""
- objects = fix_objects(object_type, objects)
-
- if object_type == "content":
- # for bw compat, skipped content should now be delivered in the skipped_content
- # topic
- contents: List[BaseContent] = []
- skipped_contents: List[BaseContent] = []
- for content in objects:
- c = BaseContent.from_dict(content)
- if isinstance(c, SkippedContent):
- logger.warning(
- "Received a series of skipped_content in the "
- "content topic, this should not happen anymore"
- )
- skipped_contents.append(c)
- else:
- contents.append(c)
- collision_aware_content_add(storage.skipped_content_add, skipped_contents)
- collision_aware_content_add(storage.content_add_metadata, contents)
- elif object_type == "skipped_content":
- skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
- collision_aware_content_add(storage.skipped_content_add, skipped_contents)
+ if object_type not in object_converter_fn:
+ logger.warning("Received a series of %s, this should not happen", object_type)
+ return
+
+ method = getattr(storage, f"{object_type}_add")
+ if object_type == "skipped_content":
+ method = collision_aware_content_add(method)
+ elif object_type == "content":
+ method = collision_aware_content_add(storage.content_add_metadata)
elif object_type in ("origin_visit", "origin_visit_status"):
origins: List[Origin] = []
- converter_fn = object_converter_fn[object_type]
- model_objs = []
- for obj in objects:
- origins.append(Origin(url=obj["origin"]))
- model_objs.append(converter_fn(obj))
+ for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects):
+ origins.append(Origin(url=obj.origin))
storage.origin_add(origins)
- method = getattr(storage, f"{object_type}_add")
- method(model_objs)
elif object_type == "raw_extrinsic_metadata":
- converted = [RawExtrinsicMetadata.from_dict(o) for o in objects]
- authorities = {emd.authority for emd in converted}
- fetchers = {emd.fetcher for emd in converted}
+ emds = cast(List[RawExtrinsicMetadata], objects)
+ authorities = {emd.authority for emd in emds}
+ fetchers = {emd.fetcher for emd in emds}
storage.metadata_authority_add(list(authorities))
storage.metadata_fetcher_add(list(fetchers))
- storage.raw_extrinsic_metadata_add(converted)
- elif object_type == "revision":
- # drop the metadata field from the revision (is any); this field is
- # about to be dropped from the data model (in favor of
- # raw_extrinsic_metadata) and there can be bogus values in the existing
- # journal (metadata with \0000 in it)
- method = getattr(storage, object_type + "_add")
- method(
- [
- object_converter_fn[object_type](dict_key_dropper(o, ("metadata",)))
- for o in objects
- ]
- )
- elif object_type in (
- "directory",
- "extid",
- "revision",
- "release",
- "snapshot",
- "origin",
- "metadata_fetcher",
- "metadata_authority",
- ):
- method = getattr(storage, object_type + "_add")
- method([object_converter_fn[object_type](o) for o in objects])
- else:
- logger.warning("Received a series of %s, this should not happen", object_type)
+ method(objects)
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -20,7 +20,7 @@
raw_extrinsic_metadata_target_ranges,
)
from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
from swh.storage.tests.test_replay import check_replayed
TEST_CONFIG = {
@@ -239,7 +239,6 @@
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
-
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
@@ -266,13 +265,16 @@
# now check journal content are the same under both topics
# use the replayer scaffolding to fill storages to make is a bit easier
# Replaying #1
+ deserializer = ModelObjectDeserializer()
sto1 = get_storage(cls="memory")
replayer1 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-1",
prefix=prefix1,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
+
worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
replayer1.process(worker_fn1)
@@ -283,6 +285,7 @@
group_id=f"{kafka_consumer_group}-2",
prefix=prefix2,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
replayer2.process(worker_fn2)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -7,13 +7,14 @@
import datetime
import functools
import logging
-from typing import Any, Container, Dict, Optional
+import re
+from typing import Any, Container, Dict, Optional, cast
import attr
import pytest
from swh.journal.client import JournalClient
-from swh.journal.serializers import key_to_kafka, value_to_kafka
+from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Revision, RevisionType
from swh.model.tests.swh_model_data import (
@@ -25,15 +26,17 @@
from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS
from swh.storage import get_storage
from swh.storage.cassandra.model import ContentRow, SkippedContentRow
+from swh.storage.exc import StorageArgumentException
from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
UTC = datetime.timezone.utc
TEST_OBJECTS = _TEST_OBJECTS.copy()
+# add a revision with metadata to check this later is dropped while being replayed
TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [
Revision(
- id=hash_to_bytes("a569b03ebe6e5f9f2f6077355c40d89bd6986d0c"),
+ id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"),
message=b"hello again",
date=DATES[1],
committer=COMMITTERS[1],
@@ -70,11 +73,13 @@
"journal_writer": journal_writer_config,
}
storage = get_storage(**storage_config)
+ deserializer = ModelObjectDeserializer()
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
yield storage, replayer
@@ -207,12 +212,6 @@
_check_replay_skipped_content(src, replayer, "skipped_content")
-def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
- """Test the 'content' topic can be used to replay SkippedContent objects."""
- src, replayer = replayer_storage_and_client
- _check_replay_skipped_content(src, replayer, "content")
-
-
# utility functions
@@ -355,12 +354,14 @@
# Fill a destination storage from Kafka, potentially using privileged topics
dst_storage = get_storage(cls="memory")
+ deserializer = ModelObjectDeserializer()
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
privileged=privileged,
+ value_deserializer=deserializer.convert,
)
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
@@ -373,3 +374,169 @@
assert isinstance(storage, InMemoryStorage) # needed to help mypy
assert isinstance(dst_storage, InMemoryStorage)
check_replayed(storage, dst_storage, expected_anonymized=not privileged)
+
+
+def test_storage_replayer_with_validation_ok(
+ replayer_storage_and_client, caplog, redisdb
+):
+ """Optimal replayer scenario
+
+ with validation activated and reporter set to a redis db.
+
+ - writes objects to a source storage
+ - replayer consumes objects from the topic and replays them
+ - a destination storage is filled from this
+ - nothing has been reported in the redis db
+ - both storages should have the same content
+ """
+ src, replayer = replayer_storage_and_client
+ replayer.deserializer = ModelObjectDeserializer(validate=True, reporter=redisdb.set)
+
+ # Fill Kafka using a source storage
+ nb_sent = 0
+ for object_type, objects in TEST_OBJECTS.items():
+ method = getattr(src, object_type + "_add")
+ method(objects)
+ if object_type == "origin_visit":
+ nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
+ nb_sent += len(objects)
+
+ # Fill the destination storage from Kafka
+ dst = get_storage(cls="memory")
+ worker_fn = functools.partial(process_replay_objects, storage=dst)
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+
+ # check we do not have invalid objects reported
+ invalid = 0
+ reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if reg.match(logtext):
+ invalid += 1
+ assert invalid == 0, "Invalid objects should not be detected"
+ assert not redisdb.keys()
+ # so the dst should be the same as src storage
+ check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst))
+
+
+def test_storage_replayer_with_validation_nok(
+ replayer_storage_and_client, caplog, redisdb
+):
+ """Replayer scenario with invalid objects
+
+ with validation and reporter set to a redis db.
+
+ - writes objects to a source storage
+ - replayer consumes objects from the topic and replays them
+ - the destination storage is filled with only valid objects
+ - the redis db contains the invalid (raw kafka mesg) objects
+ """
+ src, replayer = replayer_storage_and_client
+ replayer.value_deserializer = ModelObjectDeserializer(
+ validate=True, reporter=redisdb.set
+ ).convert
+
+ caplog.set_level(logging.ERROR, "swh.journal.replay")
+
+ # Fill Kafka using a source storage
+ nb_sent = 0
+ for object_type, objects in TEST_OBJECTS.items():
+ method = getattr(src, object_type + "_add")
+ method(objects)
+ if object_type == "origin_visit":
+ nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
+ nb_sent += len(objects)
+
+ # insert invalid objects
+ for object_type in ("revision", "directory", "release", "snapshot"):
+ method = getattr(src, object_type + "_add")
+ method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
+ nb_sent += 1
+
+ # Fill the destination storage from Kafka
+ dst = get_storage(cls="memory")
+ worker_fn = functools.partial(process_replay_objects, storage=dst)
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+
+ # check we do have invalid objects reported
+ invalid = 0
+ reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if reg.match(logtext):
+ invalid += 1
+ assert invalid == 4, "Invalid objects should be detected"
+ assert set(redisdb.keys()) == {
+ f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir")
+ }
+
+ for key in redisdb.keys():
+ # check the stored value looks right
+ rawvalue = redisdb.get(key)
+ value = kafka_to_value(rawvalue)
+ assert isinstance(value, dict)
+ assert "id" in value
+ assert value["id"] == b"\x00" * 20
+
+ # check that invalid objects did not reach the dst storage
+ for attr_ in (
+ "directories",
+ "revisions",
+ "releases",
+ "snapshots",
+ ):
+ for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()):
+ assert id != b"\x00" * 20
+
+
+def test_storage_replayer_with_validation_nok_raises(
+ replayer_storage_and_client, caplog, redisdb
+):
+ """Replayer scenario with invalid objects
+
+ with raise_on_error set to True
+
+ This:
+ - writes both valid & invalid objects to a source storage
+ - a StorageArgumentException should be raised while replayer consumes
+ objects from the topic and replays them
+ """
+ src, replayer = replayer_storage_and_client
+ replayer.value_deserializer = ModelObjectDeserializer(
+ validate=True, reporter=redisdb.set, raise_on_error=True
+ ).convert
+
+ caplog.set_level(logging.ERROR, "swh.journal.replay")
+
+ # Fill Kafka using a source storage
+ nb_sent = 0
+ for object_type, objects in TEST_OBJECTS.items():
+ method = getattr(src, object_type + "_add")
+ method(objects)
+ if object_type == "origin_visit":
+ nb_sent += len(objects) # origin-visit-add adds origin-visit-status as well
+ nb_sent += len(objects)
+
+ # insert invalid objects
+ for object_type in ("revision", "directory", "release", "snapshot"):
+ method = getattr(src, object_type + "_add")
+ method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
+ nb_sent += 1
+
+ # Fill the destination storage from Kafka
+ dst = get_storage(cls="memory")
+ worker_fn = functools.partial(process_replay_objects, storage=dst)
+ with pytest.raises(StorageArgumentException):
+ replayer.process(worker_fn)
+
+ # check we do have invalid objects reported
+ invalid = 0
+ reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+ for record in caplog.records:
+ logtext = record.getMessage()
+ if reg.match(logtext):
+ invalid += 1
+ assert invalid == 1, "One invalid objects should be detected"
+ assert len(redisdb.keys()) == 1
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Nov 5 2024, 7:26 AM (10 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3230949
Attached To
D6571: Add support for a redis-based reporting for invalid mirrorred objects
Event Timeline
Log In to Comment