diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
index 7e2dcd2a..332c2e95 100644
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -1,60 +1,154 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from confluent_kafka import Consumer
from swh.storage import get_storage
from swh.model.model import Origin, OriginVisit
+from swh.model.hypothesis_strategies import objects
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
from swh.journal.tests.journal_data import TEST_OBJECTS
+from swh.model.model import Person
+from attr import asdict, has
+from hypothesis import given
+from hypothesis.strategies import lists
+
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
+ "anonymize": False,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
- for object_type, objects in TEST_OBJECTS.items():
- method = getattr(storage, object_type + "_add")
- if object_type in (
+ for obj_type, objs in TEST_OBJECTS.items():
+ method = getattr(storage, obj_type + "_add")
+ if obj_type in (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"origin",
):
- method(objects)
- expected_messages += len(objects)
- elif object_type in ("origin_visit",):
- for obj in objects:
+ method(objs)
+ expected_messages += len(objs)
+ elif obj_type in ("origin_visit",):
+ for obj in objs:
assert isinstance(obj, OriginVisit)
storage.origin_add_one(Origin(url=obj.origin))
visit = method(obj.origin, date=obj.date, type=obj.type)
expected_messages += 1
obj_d = obj.to_dict()
for k in ("visit", "origin", "date", "type"):
del obj_d[k]
storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
expected_messages += 1
else:
- assert False, object_type
+ assert False, obj_type
+
+ existing_topics = set(
+ topic
+ for topic in consumer.list_topics(timeout=10).topics.keys()
+ if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics
+ )
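+ # With anonymize=False, the writer should only produce the regular per-object-type topics listed below.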
+ assert existing_topics == {
+ f"{kafka_prefix}.{obj_type}"
+ for obj_type in (
+ "content",
+ "directory",
+ "origin",
+ "origin_visit",
+ "release",
+ "revision",
+ "snapshot",
+ "skipped_content",
+ )
+ }
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
+
+
+def test_storage_direct_writer_anonymized(
+ kafka_prefix: str, kafka_server, consumer: Consumer
+):
+
+ writer_config = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": kafka_prefix,
+ "anonymize": True,
+ }
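+ # Assumption: with anonymize=True, the writer publishes anonymized objects on the regular topics and full copies of author-bearing objects on separate "_privileged" topics (see the topic assertion below).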
+ storage_config = {
+ "cls": "pipeline",
+ "steps": [{"cls": "memory", "journal_writer": writer_config},],
+ }
+
+ storage = get_storage(**storage_config)
+
+ expected_messages = 0
+
+ for obj_type, objs in TEST_OBJECTS.items():
+ if obj_type == "origin_visit":
+ # these have an inconsistent API and are unrelated to what we
+ # want to test here
+ continue
+ method = getattr(storage, obj_type + "_add")
+ method(objs)
+ expected_messages += len(objs)
+
+ existing_topics = set(
+ topic
+ for topic in consumer.list_topics(timeout=10).topics.keys()
+ if topic.startswith(kafka_prefix)
+ )
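+ # Besides the regular topics, anonymized writing is expected to create "_privileged" topics for release and revision, the only object types here that carry Person (author) data.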
+ assert existing_topics == {
+ f"{kafka_prefix}.{obj_type}"
+ for obj_type in (
+ "content",
+ "directory",
+ "origin",
+ "origin_visit",
+ "release",
+ "revision",
+ "snapshot",
+ "skipped_content",
+ )
+ } | {
+ f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",)
+ }
+
+
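+# Recursively walk an attrs-based model object (attr.has / attr.asdict) and check that every embedded Person has been anonymized.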
+def check_anonymized_obj(obj):
+ if has(obj):
+ if isinstance(obj, Person):
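+ # name and email must be stripped; fullname is replaced by a fixed-length digest (32 bytes, presumably a sha256 of the original fullname)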
+ assert obj.name is None
+ assert obj.email is None
+ assert len(obj.fullname) == 32
+ else:
+ for key, value in asdict(obj, recurse=False).items():
+ check_anonymized_obj(value)
+
+
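+# Property-based check: for any model object generated by hypothesis, anonymize() must not leave identifiable Person fields behind.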
+@given(lists(objects(split_content=True)))
+def test_anonymizer(obj_type_and_objs):
+ for obj_type, obj in obj_type_and_objs:
+ check_anonymized_obj(obj.anonymize())
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
index 670d7431..7c8622b0 100644
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -1,269 +1,368 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import logging
from typing import Container, Dict, Optional
import pytest
from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
from swh.model.model import Content
from swh.storage import get_storage
from swh.storage.in_memory import InMemoryStorage
from swh.storage.replay import process_replay_objects
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.client import JournalClient
from swh.journal.tests.journal_data import (
TEST_OBJECTS,
DUPLICATE_CONTENTS,
)
UTC = datetime.timezone.utc
@pytest.fixture()
def replayer_storage_and_client(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
journal_writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
}
storage_config = {
"cls": "memory",
"journal_writer": journal_writer_config,
}
storage = get_storage(**storage_config)
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=True,
)
yield storage, replayer
def test_storage_replayer(replayer_storage_and_client, caplog):
"""Optimal replayer scenario.
This:
- writes objects to a source storage
- - replayer consumes objects from the topic and replay them
+ - replayer consumes objects from the topic and replays them
- a destination storage is filled from this
In the end, both storages should have the same content.
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
if object_type == "origin_visit":
# src.origin_visit_upsert(objects)
for visit in objects:
src.origin_visit_add(
origin_url=visit.origin, date=visit.date, type=visit.type
)
else:
method = getattr(src, object_type + "_add")
method(objects)
nb_sent += len(objects)
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
_check_replayed(src, dst)
collision = 0
for record in caplog.records:
logtext = record.getMessage()
if "Colliding contents:" in logtext:
collision += 1
assert collision == 0, "No collision should be detected"
def test_storage_play_with_collision(replayer_storage_and_client, caplog):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replays them
- This drops the colliding contents from the replay when detected
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
if object_type == "origin_visit":
for visit in objects:
src.origin_visit_add(
origin_url=visit.origin, date=visit.date, type=visit.type
)
else:
method = getattr(src, object_type + "_add")
method(objects)
nb_sent += len(objects)
# Create collision in input data
# These should not be written in the destination
producer = src.journal_writer.journal.producer
prefix = src.journal_writer.journal._prefix
for content in DUPLICATE_CONTENTS:
topic = f"{prefix}.content"
key = content["sha1"]
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
# check the logs for the collision being properly detected
nb_collisions = 0
actual_collision: Dict
for record in caplog.records:
logtext = record.getMessage()
if "Collision detected:" in logtext:
nb_collisions += 1
actual_collision = record.args["collision"]
assert nb_collisions == 1, "1 collision should be detected"
algo = "sha1"
assert actual_collision["algo"] == algo
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
assert actual_collision["hash"] == expected_colliding_hash
actual_colliding_hashes = actual_collision["objects"]
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
for content in DUPLICATE_CONTENTS:
expected_content_hashes = {
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
}
assert expected_content_hashes in actual_colliding_hashes
# all objects from the src should exist in the dst storage
_check_replayed(src, dst, exclude=["contents"])
# but the dst has one content more (one of the 2 colliding ones)
assert len(src._contents) == len(dst._contents) - 1
def test_replay_skipped_content(replayer_storage_and_client):
"""Test the 'skipped_content' topic is properly replayed."""
src, replayer = replayer_storage_and_client
_check_replay_skipped_content(src, replayer, "skipped_content")
def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
"""Test the 'content' topic can be used to replay SkippedContent objects."""
src, replayer = replayer_storage_and_client
_check_replay_skipped_content(src, replayer, "content")
# utility functions
def _check_replayed(
src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None
):
"""Simple utility function to compare the content of 2 in_memory storages
"""
- expected_persons = set(src._persons)
- got_persons = set(dst._persons)
+ expected_persons = set(src._persons.values())
+ got_persons = set(dst._persons.values())
assert got_persons == expected_persons
for attr in (
"contents",
"skipped_contents",
"directories",
"revisions",
"releases",
"snapshots",
"origins",
"origin_visits",
):
if exclude and attr in exclude:
continue
expected_objects = sorted(getattr(src, f"_{attr}").items())
got_objects = sorted(getattr(dst, f"_{attr}").items())
assert got_objects == expected_objects, f"Mismatch object list for {attr}"
def _check_replay_skipped_content(storage, replayer, topic):
skipped_contents = _gen_skipped_contents(100)
nb_sent = len(skipped_contents)
producer = storage.journal_writer.journal.producer
prefix = storage.journal_writer.journal._prefix
for i, obj in enumerate(skipped_contents):
producer.produce(
topic=f"{prefix}.{topic}",
key=key_to_kafka({"sha1": obj["sha1"]}),
value=value_to_kafka(obj),
)
producer.flush()
dst_storage = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
for content in skipped_contents:
assert not storage.content_find({"sha1": content["sha1"]})
# no skipped_content_find API endpoint, so use this instead
assert not list(dst_storage.skipped_content_missing(skipped_contents))
def _updated(d1, d2):
d1.update(d2)
d1.pop("data", None)
return d1
def _gen_skipped_contents(n=10):
# we do not use the hypothesis strategy here because this does not play well with
# pytest fixtures, and it makes test execution very slow
algos = DEFAULT_ALGORITHMS | {"length"}
now = datetime.datetime.now(tz=UTC)
return [
_updated(
MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
{
"status": "absent",
"reason": "why not",
"origin": f"https://somewhere/{i}",
"ctime": now,
},
)
for i in range(n)
]
+
+
+def test_storage_play_anonymized(
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+ """Optimal replayer scenario.
+
+ This:
+ - writes objects to the topic
+ - replayer consumes objects from the topic and replay them
+
+ """
+ writer_config = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": kafka_prefix,
+ "anonymize": True,
+ }
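+ # Same anonymizing writer configuration as in test_kafka_writer: regular topics get anonymized objects, "_privileged" topics keep the full ones.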
+ src_config = {"cls": "memory", "journal_writer": writer_config}
+
+ storage = get_storage(**src_config)
+
+ # Fill the src storage
+ nb_sent = 0
+ for obj_type, objs in TEST_OBJECTS.items():
+ if obj_type == "origin_visit":
+ # these have an inconsistent API and are unrelated to what we
+ # want to test here
+ continue
+ method = getattr(storage, obj_type + "_add")
+ method(objs)
+ nb_sent += len(objs)
+
+ # Fill a destination storage from Kafka **using anonymized topics**
+ dst_storage = get_storage(cls="memory")
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ privileged=False,
+ )
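+ # privileged=False: the client reads only the public (anonymized) topics, so replayed releases and revisions should have anonymized authors.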
+ worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ check_replayed(storage, dst_storage, expected_anonymized=True)
+
+ # Fill a destination storage from Kafka **with stock (non-anonymized) topics**
+ dst_storage = get_storage(cls="memory")
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ privileged=True,
+ )
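+ # privileged=True: the client reads from the privileged topics where available, so the original (non-anonymized) objects should be replayed.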
+ worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ check_replayed(storage, dst_storage, expected_anonymized=False)
+
+
+def check_replayed(src, dst, expected_anonymized=False):
+ """Simple utility function to compare the content of 2 in_memory storages
+
+ If expected_anonymized is True, objects from the source storage are anonymized
+ before comparing with the destination storage.
+
+ """
+
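+ # anonymize() may return None when an object has nothing to anonymize, hence the "or obj" fallback below.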
+ def maybe_anonymize(obj):
+ if expected_anonymized:
+ return obj.anonymize() or obj
+ return obj
+
+ expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
+ got_persons = set(dst._persons.values())
+ assert got_persons == expected_persons
+
+ for attr in (
+ "contents",
+ "skipped_contents",
+ "directories",
+ "revisions",
+ "releases",
+ "snapshots",
+ "origins",
+ ):
+ expected_objects = [
+ (id, maybe_anonymize(obj))
+ for id, obj in sorted(getattr(src, f"_{attr}").items())
+ ]
+ got_objects = [
+ (id, obj) for id, obj in sorted(getattr(dst, f"_{attr}").items())
+ ]
+ assert got_objects == expected_objects, f"Mismatch object list for {attr}"
