Page MenuHomeSoftware Heritage

D6571.id23940.diff
No OneTemporary

D6571.id23940.diff

diff --git a/requirements-swh-journal.txt b/requirements-swh-journal.txt
--- a/requirements-swh-journal.txt
+++ b/requirements-swh-journal.txt
@@ -1 +1 @@
-swh.journal >= 0.6.2
+swh.journal >= 0.9
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -6,8 +6,10 @@
# adding the [testing] extra.
swh.model[testing] >= 0.0.50
pytz
+pytest-redis
pytest-xdist
types-python-dateutil
types-pytz
types-pyyaml
+types-redis
types-requests
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,10 +1,11 @@
+aiohttp
+cassandra-driver >= 3.19.0, != 3.21.0
click
+deprecated
flask
+iso8601
+mypy_extensions
psycopg2
-aiohttp
+redis
tenacity
-cassandra-driver >= 3.19.0, != 3.21.0
-deprecated
typing-extensions
-mypy_extensions
-iso8601
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -13,7 +13,7 @@
from swh.core.cli import CONTEXT_SETTINGS
from swh.core.cli import swh as swh_cli_group
-from swh.storage.replay import object_converter_fn
+from swh.storage.replay import ModelObjectDeserializer, object_converter_fn
try:
from systemd.daemon import notify
@@ -187,11 +187,21 @@
conf = ctx.obj["config"]
storage = get_storage(**conf.pop("storage"))
+ if "error_reporter" in conf:
+ from redis import Redis
+
+ reporter = Redis(**conf["error_reporter"]).set
+ else:
+ reporter = None
+ deserializer = ModelObjectDeserializer(reporter=reporter)
+
client_cfg = conf.pop("journal_client")
+ client_cfg["value_deserializer"] = deserializer.convert
if object_types:
client_cfg["object_types"] = object_types
if stop_after_objects:
client_cfg["stop_after_objects"] = stop_after_objects
+
try:
client = get_journal_client(**client_cfg)
except ValueError as exc:
diff --git a/swh/storage/fixer.py b/swh/storage/fixer.py
--- a/swh/storage/fixer.py
+++ b/swh/storage/fixer.py
@@ -6,7 +6,7 @@
import copy
import datetime
import logging
-from typing import Any, Dict, List
+from typing import Any, Callable, Dict, List
from swh.model.model import Origin
@@ -116,6 +116,7 @@
""" # noqa
rev = _fix_revision_pypi_empty_string(revision)
rev = _fix_revision_transplant_source(rev)
+    rev.pop("metadata", None)  # this field has been dead for a while now
return rev
@@ -249,21 +250,21 @@
return o
+object_fixers: Dict[str, Callable[[Dict], Dict]] = {
+ "content": _fix_content,
+ "revision": _fix_revision,
+ "origin": _fix_origin,
+ "origin_visit": _fix_origin_visit,
+ "raw_extrinsic_metadata": _fix_raw_extrinsic_metadata,
+}
+
+
def fix_objects(object_type: str, objects: List[Dict]) -> List[Dict]:
"""
Fix legacy objects from the journal to bring them up to date with the
latest storage schema.
"""
- if object_type == "content":
- return [_fix_content(v) for v in objects]
- elif object_type == "revision":
- revisions = [_fix_revision(v) for v in objects]
- return [rev for rev in revisions if rev is not None]
- elif object_type == "origin":
- return [_fix_origin(v) for v in objects]
- elif object_type == "origin_visit":
- return [_fix_origin_visit(v) for v in objects]
- elif object_type == "raw_extrinsic_metadata":
- return [_fix_raw_extrinsic_metadata(v) for v in objects]
- else:
- return objects
+ if object_type in object_fixers:
+ fixer = object_fixers[object_type]
+ objects = [fixer(v) for v in objects]
+ return objects
diff --git a/swh/storage/replay.py b/swh/storage/replay.py
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -3,8 +3,11 @@
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
+from collections import Counter
import logging
-from typing import Any, Callable, Container, Dict, List
+from typing import Any, Callable, Container
+from typing import Counter as CounterT
+from typing import Dict, List, Optional, TypeVar, Union, cast
try:
from systemd.daemon import notify
@@ -12,12 +15,15 @@
notify = None
from swh.core.statsd import statsd
+from swh.journal.serializers import kafka_to_value
+from swh.model.hashutil import hash_to_hex
from swh.model.model import (
BaseContent,
BaseModel,
Content,
Directory,
ExtID,
+ HashableObject,
MetadataAuthority,
MetadataFetcher,
Origin,
@@ -29,8 +35,8 @@
SkippedContent,
Snapshot,
)
-from swh.storage.exc import HashCollision
-from swh.storage.fixer import fix_objects
+from swh.storage.exc import HashCollision, StorageArgumentException
+from swh.storage.fixer import object_fixers
from swh.storage.interface import StorageInterface
logger = logging.getLogger(__name__)
@@ -56,8 +62,52 @@
}
+class ModelObjectDeserializer:
+    """Turn raw journal messages into (optionally validated) model objects.
+
+    Intended to be plugged as the ``value_deserializer`` of a journal client:
+    each message is decoded with ``kafka_to_value``, passed through the legacy
+    fixer registered for its object type (if any), then converted into a model
+    object with ``object_converter_fn``.
+
+    Args:
+        validate: for hashable objects, check that ``id`` matches the
+            recomputed hash; mismatching objects are logged, reported and
+            dropped (``convert`` returns None).
+        raise_on_error: raise StorageArgumentException instead of dropping an
+            invalid object (it is still logged and reported first).
+        reporter: optional callable receiving ``(object_id, raw_message)`` for
+            each invalid object, e.g. the ``set`` method of a redis client.
+    """
+
+    def __init__(
+        self,
+        validate: bool = True,
+        raise_on_error: bool = False,
+        reporter: Optional[Callable[[str, bytes], None]] = None,
+    ):
+        self.validate = validate
+        self.reporter = reporter
+        self.raise_on_error = raise_on_error
+
+    def convert(self, object_type: str, msg: bytes) -> Optional[BaseModel]:
+        """Deserialize ``msg`` into a model object of type ``object_type``.
+
+        Returns:
+            the model object, or None when it failed validation and
+            ``raise_on_error`` is False.
+
+        Raises:
+            StorageArgumentException: if validation fails and ``raise_on_error``
+                is True.
+        """
+        dict_repr = kafka_to_value(msg)
+        if object_type in object_fixers:
+            # bring legacy journal entries up to date with the current model
+            dict_repr = object_fixers[object_type](dict_repr)
+        # NOTE(review): object types missing from object_converter_fn raise
+        # KeyError here, before _insert_objects can warn about them.
+        obj = object_converter_fn[object_type](dict_repr)
+        if self.validate and isinstance(obj, HashableObject):
+            cid = obj.compute_hash()
+            if obj.id != cid:
+                error_msg = (
+                    f"Object has id {hash_to_hex(obj.id)}, "
+                    f"but it should be {hash_to_hex(cid)}: {obj}"
+                )
+                logger.error(error_msg)
+                self.report_failure(msg, obj)
+                if self.raise_on_error:
+                    raise StorageArgumentException(error_msg)
+                return None
+        return obj
+
+    def report_failure(self, msg: bytes, obj: BaseModel) -> None:
+        """Hand the raw message ``msg`` of the invalid ``obj`` to the reporter.
+
+        The report key is ``str(obj.swhid())`` when the object has a
+        ``swhid`` method, otherwise ``<object_type>:<hex id>`` for hashable
+        objects; objects matching neither are not reported. Does nothing when
+        no reporter is configured.
+        """
+        if self.reporter is None:
+            return
+        oid: str = ""
+        if hasattr(obj, "swhid"):
+            oid = str(obj.swhid())  # type: ignore[attr-defined]
+        elif isinstance(obj, HashableObject):
+            # use the id the object was received with (like swhid() does),
+            # not the recomputed hash, so the key names the faulty entry
+            oid = f"{obj.object_type}:{obj.id.hex()}"  # type: ignore[attr-defined]
+        if oid:
+            self.reporter(oid, msg)
+
+
def process_replay_objects(
- all_objects: Dict[str, List[Dict[str, Any]]], *, storage: StorageInterface
+ all_objects: Dict[str, List[BaseModel]], *, storage: StorageInterface
) -> None:
for (object_type, objects) in all_objects.items():
logger.debug("Inserting %s %s objects", len(objects), object_type)
@@ -70,9 +120,12 @@
notify("WATCHDOG=1")
+ContentType = TypeVar("ContentType", bound=BaseContent)
+
+
def collision_aware_content_add(
- content_add_fn: Callable[[List[Any]], Dict[str, int]], contents: List[BaseContent],
-) -> None:
+ content_add_fn: Callable[[List[ContentType]], Dict[str, int]]
+) -> Callable[[List[ContentType]], Dict[str, int]]:
"""Add contents to storage. If a hash collision is detected, an error is
logged. Then this adds the other non colliding contents to the storage.
@@ -81,29 +134,37 @@
contents: List of contents or skipped contents to add to storage
"""
- if not contents:
- return
- colliding_content_hashes: List[Dict[str, Any]] = []
- while True:
- try:
- content_add_fn(contents)
- except HashCollision as e:
- colliding_content_hashes.append(
- {
- "algo": e.algo,
- "hash": e.hash_id, # hex hash id
- "objects": e.colliding_contents, # hex hashes
- }
- )
- colliding_hashes = e.colliding_content_hashes()
- # Drop the colliding contents from the transaction
- contents = [c for c in contents if c.hashes() not in colliding_hashes]
- else:
- # Successfully added contents, we are done
- break
- if colliding_content_hashes:
- for collision in colliding_content_hashes:
- logger.error("Collision detected: %(collision)s", {"collision": collision})
+
+ def wrapper(contents: List[ContentType]) -> Dict[str, int]:
+ if not contents:
+ return {}
+ colliding_content_hashes: List[Dict[str, Any]] = []
+ results: CounterT[str] = Counter()
+ while True:
+ try:
+ results.update(content_add_fn(contents))
+ except HashCollision as e:
+ colliding_content_hashes.append(
+ {
+ "algo": e.algo,
+ "hash": e.hash_id, # hex hash id
+ "objects": e.colliding_contents, # hex hashes
+ }
+ )
+ colliding_hashes = e.colliding_content_hashes()
+ # Drop the colliding contents from the transaction
+ contents = [c for c in contents if c.hashes() not in colliding_hashes]
+ else:
+ # Successfully added contents, we are done
+ break
+ if colliding_content_hashes:
+ for collision in colliding_content_hashes:
+ logger.error(
+ "Collision detected: %(collision)s", {"collision": collision}
+ )
+ return dict(results)
+
+ return wrapper
def dict_key_dropper(d: Dict, keys_to_drop: Container) -> Dict:
@@ -112,73 +173,29 @@
def _insert_objects(
- object_type: str, objects: List[Dict], storage: StorageInterface
+ object_type: str, objects: List[BaseModel], storage: StorageInterface
) -> None:
"""Insert objects of type object_type in the storage.
"""
- objects = fix_objects(object_type, objects)
-
- if object_type == "content":
- # for bw compat, skipped content should now be delivered in the skipped_content
- # topic
- contents: List[BaseContent] = []
- skipped_contents: List[BaseContent] = []
- for content in objects:
- c = BaseContent.from_dict(content)
- if isinstance(c, SkippedContent):
- logger.warning(
- "Received a series of skipped_content in the "
- "content topic, this should not happen anymore"
- )
- skipped_contents.append(c)
- else:
- contents.append(c)
- collision_aware_content_add(storage.skipped_content_add, skipped_contents)
- collision_aware_content_add(storage.content_add_metadata, contents)
- elif object_type == "skipped_content":
- skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
- collision_aware_content_add(storage.skipped_content_add, skipped_contents)
+ if object_type not in object_converter_fn:
+ logger.warning("Received a series of %s, this should not happen", object_type)
+ return
+
+ method = getattr(storage, f"{object_type}_add")
+ if object_type == "skipped_content":
+ method = collision_aware_content_add(method)
+ elif object_type == "content":
+ method = collision_aware_content_add(storage.content_add_metadata)
elif object_type in ("origin_visit", "origin_visit_status"):
origins: List[Origin] = []
- converter_fn = object_converter_fn[object_type]
- model_objs = []
- for obj in objects:
- origins.append(Origin(url=obj["origin"]))
- model_objs.append(converter_fn(obj))
+ for obj in cast(List[Union[OriginVisit, OriginVisitStatus]], objects):
+ origins.append(Origin(url=obj.origin))
storage.origin_add(origins)
- method = getattr(storage, f"{object_type}_add")
- method(model_objs)
elif object_type == "raw_extrinsic_metadata":
- converted = [RawExtrinsicMetadata.from_dict(o) for o in objects]
- authorities = {emd.authority for emd in converted}
- fetchers = {emd.fetcher for emd in converted}
+ emds = cast(List[RawExtrinsicMetadata], objects)
+ authorities = {emd.authority for emd in emds}
+ fetchers = {emd.fetcher for emd in emds}
storage.metadata_authority_add(list(authorities))
storage.metadata_fetcher_add(list(fetchers))
- storage.raw_extrinsic_metadata_add(converted)
- elif object_type == "revision":
- # drop the metadata field from the revision (is any); this field is
- # about to be dropped from the data model (in favor of
- # raw_extrinsic_metadata) and there can be bogus values in the existing
- # journal (metadata with \0000 in it)
- method = getattr(storage, object_type + "_add")
- method(
- [
- object_converter_fn[object_type](dict_key_dropper(o, ("metadata",)))
- for o in objects
- ]
- )
- elif object_type in (
- "directory",
- "extid",
- "revision",
- "release",
- "snapshot",
- "origin",
- "metadata_fetcher",
- "metadata_authority",
- ):
- method = getattr(storage, object_type + "_add")
- method([object_converter_fn[object_type](o) for o in objects])
- else:
- logger.warning("Received a series of %s, this should not happen", object_type)
+ method(objects)
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -20,7 +20,7 @@
raw_extrinsic_metadata_target_ranges,
)
from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
from swh.storage.tests.test_replay import check_replayed
TEST_CONFIG = {
@@ -239,7 +239,6 @@
}
swh_storage_backend_config["journal_writer"] = journal1
storage = get_storage(**swh_storage_backend_config)
-
# fill the storage and the journal (under prefix1)
for object_type, objects in TEST_OBJECTS.items():
method = getattr(storage, object_type + "_add")
@@ -266,13 +265,16 @@
# now check journal content are the same under both topics
# use the replayer scaffolding to fill storages to make is a bit easier
# Replaying #1
+ deserializer = ModelObjectDeserializer()
sto1 = get_storage(cls="memory")
replayer1 = JournalClient(
brokers=kafka_server,
group_id=f"{kafka_consumer_group}-1",
prefix=prefix1,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
+
worker_fn1 = functools.partial(process_replay_objects, storage=sto1)
replayer1.process(worker_fn1)
@@ -283,6 +285,7 @@
group_id=f"{kafka_consumer_group}-2",
prefix=prefix2,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
worker_fn2 = functools.partial(process_replay_objects, storage=sto2)
replayer2.process(worker_fn2)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -7,13 +7,14 @@
import datetime
import functools
import logging
-from typing import Any, Container, Dict, Optional
+import re
+from typing import Any, Container, Dict, Optional, cast
import attr
import pytest
from swh.journal.client import JournalClient
-from swh.journal.serializers import key_to_kafka, value_to_kafka
+from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Revision, RevisionType
from swh.model.tests.swh_model_data import (
@@ -25,15 +26,17 @@
from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS
from swh.storage import get_storage
from swh.storage.cassandra.model import ContentRow, SkippedContentRow
+from swh.storage.exc import StorageArgumentException
from swh.storage.in_memory import InMemoryStorage
-from swh.storage.replay import process_replay_objects
+from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
UTC = datetime.timezone.utc
TEST_OBJECTS = _TEST_OBJECTS.copy()
+# add a revision with metadata to check this later is dropped while being replayed
TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [
Revision(
- id=hash_to_bytes("a569b03ebe6e5f9f2f6077355c40d89bd6986d0c"),
+ id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"),
message=b"hello again",
date=DATES[1],
committer=COMMITTERS[1],
@@ -70,11 +73,13 @@
"journal_writer": journal_writer_config,
}
storage = get_storage(**storage_config)
+ deserializer = ModelObjectDeserializer()
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=True,
+ value_deserializer=deserializer.convert,
)
yield storage, replayer
@@ -207,12 +212,6 @@
_check_replay_skipped_content(src, replayer, "skipped_content")
-def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
- """Test the 'content' topic can be used to replay SkippedContent objects."""
- src, replayer = replayer_storage_and_client
- _check_replay_skipped_content(src, replayer, "content")
-
-
# utility functions
@@ -355,12 +354,14 @@
# Fill a destination storage from Kafka, potentially using privileged topics
dst_storage = get_storage(cls="memory")
+ deserializer = ModelObjectDeserializer()
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_after_objects=nb_sent,
privileged=privileged,
+ value_deserializer=deserializer.convert,
)
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
@@ -373,3 +374,169 @@
assert isinstance(storage, InMemoryStorage) # needed to help mypy
assert isinstance(dst_storage, InMemoryStorage)
check_replayed(storage, dst_storage, expected_anonymized=not privileged)
+
+
+def test_storage_replayer_with_validation_ok(
+    replayer_storage_and_client, caplog, redisdb
+):
+    """Optimal replayer scenario
+
+    with validation activated and reporter set to a redis db.
+
+    - writes objects to a source storage
+    - replayer consumes objects from the topic and replays them
+    - a destination storage is filled from this
+    - nothing has been reported in the redis db
+    - both storages should have the same content
+    """
+    src, replayer = replayer_storage_and_client
+    # the journal client deserializes messages through ``value_deserializer``;
+    # assigning any other attribute would leave the fixture's reporter-less
+    # deserializer in place and make the redis check below vacuous
+    replayer.value_deserializer = ModelObjectDeserializer(
+        validate=True, reporter=redisdb.set
+    ).convert
+
+    # Fill Kafka using a source storage
+    nb_sent = 0
+    for object_type, objects in TEST_OBJECTS.items():
+        method = getattr(src, object_type + "_add")
+        method(objects)
+        if object_type == "origin_visit":
+            nb_sent += len(objects)  # origin-visit-add adds origin-visit-status as well
+        nb_sent += len(objects)
+
+    # Fill the destination storage from Kafka
+    dst = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=dst)
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    # check we do not have invalid objects reported
+    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+    invalid = sum(1 for record in caplog.records if reg.match(record.getMessage()))
+    assert invalid == 0, "Invalid objects should not be detected"
+    assert not redisdb.keys()
+    # so the dst should be the same as src storage
+    check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst))
+
+
+def test_storage_replayer_with_validation_nok(
+    replayer_storage_and_client, caplog, redisdb
+):
+    """Replayer scenario with invalid objects
+
+    with validation and reporter set to a redis db.
+
+    - writes objects to a source storage
+    - replayer consumes objects from the topic and replays them
+    - the destination storage is filled with only valid objects
+    - the redis db contains the invalid (raw kafka mesg) objects
+    """
+    src, replayer = replayer_storage_and_client
+    replayer.value_deserializer = ModelObjectDeserializer(
+        validate=True, reporter=redisdb.set
+    ).convert
+
+    # validation errors are logged by the module logger of swh/storage/replay.py
+    caplog.set_level(logging.ERROR, "swh.storage.replay")
+
+    # Fill Kafka using a source storage
+    nb_sent = 0
+    for object_type, objects in TEST_OBJECTS.items():
+        method = getattr(src, object_type + "_add")
+        method(objects)
+        if object_type == "origin_visit":
+            nb_sent += len(objects)  # origin-visit-add adds origin-visit-status as well
+        nb_sent += len(objects)
+
+    # insert invalid objects
+    for object_type in ("revision", "directory", "release", "snapshot"):
+        method = getattr(src, object_type + "_add")
+        method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
+        nb_sent += 1
+
+    # Fill the destination storage from Kafka
+    dst = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=dst)
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+
+    # check we do have invalid objects reported
+    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+    invalid = sum(1 for record in caplog.records if reg.match(record.getMessage()))
+    assert invalid == 4, "Invalid objects should be detected"
+    assert set(redisdb.keys()) == {
+        f"swh:1:{typ}:{'0'*40}".encode() for typ in ("rel", "rev", "snp", "dir")
+    }
+
+    for key in redisdb.keys():
+        # check the stored value looks right
+        rawvalue = redisdb.get(key)
+        value = kafka_to_value(rawvalue)
+        assert isinstance(value, dict)
+        assert "id" in value
+        assert value["id"] == b"\x00" * 20
+
+    # check that invalid objects did not reach the dst storage
+    assert isinstance(dst, InMemoryStorage)  # needed to help mypy
+    for attr_ in (
+        "directories",
+        "revisions",
+        "releases",
+        "snapshots",
+    ):
+        # NOTE(review): the first item of iter_all() pairs looks like the
+        # table's primary key (possibly a tuple, not raw id bytes); compare the
+        # row's own id so this check cannot pass vacuously
+        for _, row in getattr(dst._cql_runner, f"_{attr_}").iter_all():
+            assert row.id != b"\x00" * 20
+
+
+def test_storage_replayer_with_validation_nok_raises(
+    replayer_storage_and_client, caplog, redisdb
+):
+    """Replayer scenario with invalid objects
+
+    with raise_on_error set to True
+
+    This:
+    - writes both valid & invalid objects to a source storage
+    - a StorageArgumentException should be raised while replayer consumes
+      objects from the topic and replays them
+    """
+    src, replayer = replayer_storage_and_client
+    replayer.value_deserializer = ModelObjectDeserializer(
+        validate=True, reporter=redisdb.set, raise_on_error=True
+    ).convert
+
+    # validation errors are logged by the module logger of swh/storage/replay.py
+    caplog.set_level(logging.ERROR, "swh.storage.replay")
+
+    # Fill Kafka using a source storage
+    for object_type, objects in TEST_OBJECTS.items():
+        method = getattr(src, object_type + "_add")
+        method(objects)
+
+    # insert invalid objects
+    for object_type in ("revision", "directory", "release", "snapshot"):
+        method = getattr(src, object_type + "_add")
+        method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
+
+    # Fill the destination storage from Kafka; replay must stop on the first
+    # invalid object since raise_on_error is set
+    dst = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=dst)
+    with pytest.raises(StorageArgumentException):
+        replayer.process(worker_fn)
+
+    # check we do have invalid objects reported
+    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
+    invalid = sum(1 for record in caplog.records if reg.match(record.getMessage()))
+    assert invalid == 1, "One invalid object should be detected"
+    assert len(redisdb.keys()) == 1

File Metadata

Mime Type
text/plain
Expires
Nov 5 2024, 7:26 AM (10 w, 6 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3230949

Event Timeline