swh/storage/tests/test_replay.py
# Copyright (C) 2019-2020  The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information

import dataclasses
import datetime
import functools
import logging
import re
from typing import Any, Container, Dict, Optional, cast

import attr
import pytest

from swh.journal.client import JournalClient
from swh.journal.serializers import kafka_to_value, key_to_kafka, value_to_kafka
from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash, hash_to_bytes, hash_to_hex
from swh.model.model import Revision, RevisionType
from swh.model.tests.swh_model_data import (
    COMMITTERS,
    DATES,
    DUPLICATE_CONTENTS,
    REVISIONS,
)
from swh.model.tests.swh_model_data import TEST_OBJECTS as _TEST_OBJECTS
from swh.storage import get_storage
from swh.storage.cassandra.model import ContentRow, SkippedContentRow
from swh.storage.exc import StorageArgumentException
from swh.storage.in_memory import InMemoryStorage
from swh.storage.replay import ModelObjectDeserializer, process_replay_objects
UTC = datetime.timezone.utc

TEST_OBJECTS = _TEST_OBJECTS.copy()
# add a revision with metadata to check it is dropped while being replayed
TEST_OBJECTS["revision"] = list(_TEST_OBJECTS["revision"]) + [
    Revision(
        id=hash_to_bytes("51d9d94ab08d3f75512e3a9fd15132e0a7ca7928"),
        message=b"hello again",
        date=DATES[1],
        committer=COMMITTERS[1],
        author=COMMITTERS[0],
        committer_date=DATES[0],
        type=RevisionType.GIT,
        directory=b"\x03" * 20,
        synthetic=False,
[… 20 lines elided …]
    journal_writer_config = {
        "client_id": "kafka_writer",
        "prefix": kafka_prefix,
    }
    storage_config: Dict[str, Any] = {
        "cls": "memory",
        "journal_writer": journal_writer_config,
    }
    storage = get_storage(**storage_config)
    deserializer = ModelObjectDeserializer()
    replayer = JournalClient(
        brokers=kafka_server,
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_on_eof=True,
        value_deserializer=deserializer.convert,
    )

    yield storage, replayer
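
The fixture hands the JournalClient a `value_deserializer` callback, so every raw Kafka message is decoded before it reaches the worker function. A minimal sketch of the shape of that callback (illustration only; the real ModelObjectDeserializer lives in swh.storage.replay and additionally rebuilds swh.model objects and optionally validates their ids):

# Illustration only -- a simplified stand-in, not the real ModelObjectDeserializer.
from swh.journal.serializers import kafka_to_value

class SketchDeserializer:
    def convert(self, object_type: str, raw: bytes):
        # decode the msgpack-encoded Kafka message into a plain dict;
        # the real class then builds a model object and, when validate=True,
        # checks the object id, reporting or raising on mismatch
        return kafka_to_value(raw)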

def test_storage_replayer(replayer_storage_and_client, caplog):
    """Optimal replayer scenario.
[… 116 lines elided …]
def test_replay_skipped_content(replayer_storage_and_client):
    """Test the 'skipped_content' topic is properly replayed."""
    src, replayer = replayer_storage_and_client
    _check_replay_skipped_content(src, replayer, "skipped_content")


def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
    """Test the 'content' topic can be used to replay SkippedContent objects."""
    src, replayer = replayer_storage_and_client
    _check_replay_skipped_content(src, replayer, "content")

# utility functions


def check_replayed(
    src: InMemoryStorage,
    dst: InMemoryStorage,
    exclude: Optional[Container] = None,
    expected_anonymized=False,
[… 126 lines elided …]
    for obj_type, objs in TEST_OBJECTS.items():
        if ...:  # guard condition hidden by the diff fold above
            # these are unrelated with what we want to test here
            continue
        method = getattr(storage, obj_type + "_add")
        method(objs)
        nb_sent += len(objs)

    # Fill a destination storage from Kafka, potentially using privileged topics
    dst_storage = get_storage(cls="memory")
    deserializer = ModelObjectDeserializer()
    replayer = JournalClient(
        brokers=kafka_server,
        group_id=kafka_consumer_group,
        prefix=kafka_prefix,
        stop_after_objects=nb_sent,
        privileged=privileged,
        value_deserializer=deserializer.convert,
    )
    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
    nb_inserted = replayer.process(worker_fn)
    replayer.consumer.commit()
    assert nb_sent == nb_inserted

    # Check the contents of the destination storage, and whether the
    # anonymization was properly used
    assert isinstance(storage, InMemoryStorage)  # needed to help mypy
    assert isinstance(dst_storage, InMemoryStorage)
    check_replayed(storage, dst_storage, expected_anonymized=not privileged)
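
For context on what `expected_anonymized` checks: the anonymized topics replace author/committer identities with an opaque value. A hedged illustration, assuming swh.model's `Person.anonymize()` helper (which, to my understanding, replaces the fullname with a hash digest and drops name/email):

from swh.model.model import Person

# assumed behaviour of Person.anonymize: identifying fields are dropped,
# fullname is replaced by an opaque digest
author = Person.from_fullname(b"Jane Doe <jane@example.org>")
anon = author.anonymize()
assert anon.name is None and anon.email is None
assert anon.fullname != author.fullname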

def test_storage_replayer_with_validation_ok(
    replayer_storage_and_client, caplog, redisdb
):
    """Optimal replayer scenario

    with validation activated and reporter set to a redis db.

    - writes objects to a source storage
    - replayer consumes objects from the topic and replays them
    - a destination storage is filled from this
    - nothing has been reported in the redis db
    - both storages should have the same content
    """
    src, replayer = replayer_storage_and_client
    replayer.value_deserializer = ModelObjectDeserializer(
        validate=True, reporter=redisdb.set
    ).convert

    # Fill Kafka using a source storage
    nb_sent = 0
    for object_type, objects in TEST_OBJECTS.items():
        method = getattr(src, object_type + "_add")
        method(objects)
        if object_type == "origin_visit":
            nb_sent += len(objects)  # origin-visit-add adds origin-visit-status as well
        nb_sent += len(objects)

    # Fill the destination storage from Kafka
    dst = get_storage(cls="memory")
    worker_fn = functools.partial(process_replay_objects, storage=dst)
    nb_inserted = replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # check we do not have invalid objects reported
    invalid = 0
    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")

vlorentz: share this regexp with test_storage_replayer_with_validation_nok (maybe the whole for loop too). Otherwise a single typo in the regexp could make the test miss issues.

    for record in caplog.records:
        logtext = record.getMessage()
        if reg.match(logtext):
            invalid += 1
    assert invalid == 0, "Invalid objects should not be detected"
    assert not redisdb.keys()

    # so the dst should be the same as src storage
    check_replayed(cast(InMemoryStorage, src), cast(InMemoryStorage, dst))
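
One way the sharing vlorentz asks for could look; the names `_ID_MISMATCH_RE` and `count_id_mismatch_reports` are hypothetical, not part of this diff:

# Hypothetical module-level helper so both validation tests match the exact
# same pattern; a typo in the regexp would then break both tests instead of
# silently weakening one of them.
_ID_MISMATCH_RE = re.compile(
    "Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*"
)


def count_id_mismatch_reports(caplog) -> int:
    """Count log records reporting an object id mismatch."""
    return sum(
        1 for record in caplog.records if _ID_MISMATCH_RE.match(record.getMessage())
    )

The tests would then assert count_id_mismatch_reports(caplog) == 0 (or == 4, == 1) instead of duplicating the regexp and the counting loop.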

def test_storage_replayer_with_validation_nok(
    replayer_storage_and_client, caplog, redisdb
):
    """Replayer scenario with invalid objects

    with validation and reporter set to a redis db.

    - writes objects to a source storage
    - replayer consumes objects from the topic and replays them
    - the destination storage is filled with only valid objects
    - the redis db contains the invalid (raw kafka message) objects
    """
    src, replayer = replayer_storage_and_client
    replayer.value_deserializer = ModelObjectDeserializer(
        validate=True, reporter=redisdb.set
    ).convert
    caplog.set_level(logging.ERROR, "swh.journal.replay")

    # Fill Kafka using a source storage
    nb_sent = 0
    for object_type, objects in TEST_OBJECTS.items():
        method = getattr(src, object_type + "_add")
        method(objects)
        if object_type == "origin_visit":
            nb_sent += len(objects)  # origin-visit-add adds origin-visit-status as well
        nb_sent += len(objects)

    # insert invalid objects
    for object_type in ("revision", "directory", "release", "snapshot"):
        method = getattr(src, object_type + "_add")
vlorentz: shorter, and consistent with the code below
douardda: agreed, thx
        method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
        nb_sent += 1

    # Fill the destination storage from Kafka
    dst = get_storage(cls="memory")
    worker_fn = functools.partial(process_replay_objects, storage=dst)
    nb_inserted = replayer.process(worker_fn)
    assert nb_sent == nb_inserted

    # check we do have invalid objects reported
    invalid = 0
    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
    for record in caplog.records:
        logtext = record.getMessage()
        if reg.match(logtext):
            invalid += 1
    assert invalid == 4, "Invalid objects should be detected"
    assert set(redisdb.keys()) == {
        f"swh:1:{typ}:{'0' * 40}".encode() for typ in ("rel", "rev", "snp", "dir")
    }

    for key in redisdb.keys():
        # check the stored value looks right
        rawvalue = redisdb.get(key)
        value = kafka_to_value(rawvalue)
        assert isinstance(value, dict)
        assert "id" in value
        assert value["id"] == b"\x00" * 20

    # check that invalid objects did not reach the dst storage
    for attr_ in (
        "directories",
        "revisions",
        "releases",
        "snapshots",
    ):
        for id, obj in sorted(getattr(dst._cql_runner, f"_{attr_}").iter_all()):
            assert id != b"\x00" * 20
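
The redis check above relies on the reporter storing the raw msgpack-encoded Kafka message under the object's SWHID. A small round-trip sketch of that encoding, using the real swh.journal.serializers helpers (the payload dict is made up for illustration):

from swh.journal.serializers import kafka_to_value, value_to_kafka

# made-up payload standing in for an invalid revision dict
payload = {"id": b"\x00" * 20, "message": b"hello"}
raw = value_to_kafka(payload)          # msgpack-encode, as written to Kafka
assert kafka_to_value(raw) == payload  # decoding restores the original dict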

def test_storage_replayer_with_validation_nok_raises(
    replayer_storage_and_client, caplog, redisdb
):
    """Replayer scenario with invalid objects

    with raise_on_error set to True

    This:
    - writes both valid & invalid objects to a source storage
    - a StorageArgumentException should be raised while the replayer consumes
      objects from the topic and replays them
    """
    src, replayer = replayer_storage_and_client
    replayer.value_deserializer = ModelObjectDeserializer(
        validate=True, reporter=redisdb.set, raise_on_error=True
    ).convert
    caplog.set_level(logging.ERROR, "swh.journal.replay")

    # Fill Kafka using a source storage
    nb_sent = 0
    for object_type, objects in TEST_OBJECTS.items():
        method = getattr(src, object_type + "_add")
        method(objects)
        if object_type == "origin_visit":
            nb_sent += len(objects)  # origin-visit-add adds origin-visit-status as well
        nb_sent += len(objects)

    # insert invalid objects
    for object_type in ("revision", "directory", "release", "snapshot"):
        method = getattr(src, object_type + "_add")
        method([attr.evolve(TEST_OBJECTS[object_type][0], id=hash_to_bytes("0" * 40))])
        nb_sent += 1

    # Fill the destination storage from Kafka
    dst = get_storage(cls="memory")
    worker_fn = functools.partial(process_replay_objects, storage=dst)
    with pytest.raises(StorageArgumentException):
        replayer.process(worker_fn)

    # check we do have invalid objects reported
    invalid = 0
    reg = re.compile("Object has id [0-9a-f]{40}, but it should be [0-9a-f]{40}: .*")
    for record in caplog.records:
        logtext = record.getMessage()
        if reg.match(logtext):
            invalid += 1
    assert invalid == 1, "One invalid object should be detected"
    assert len(redisdb.keys()) == 1
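
A minimal sketch of the control flow the raise_on_error flag selects, consistent with the assertions above (only one mismatch is logged because the first raise aborts replayer.process). This is a simplified stand-in; the real logic lives in ModelObjectDeserializer, and handle_mismatch is a hypothetical name:

from swh.storage.exc import StorageArgumentException

# Simplified sketch, not the actual implementation: on an id mismatch the
# deserializer first reports the raw message, then either raises (aborting
# the replay loop) or returns None so the invalid object is skipped and
# replay continues.
def handle_mismatch(swhid: str, raw: bytes, reporter, raise_on_error: bool):
    reporter(swhid, raw)
    if raise_on_error:
        raise StorageArgumentException(f"hash mismatch for {swhid}")
    return None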