diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py
index 7e2dcd2a..332c2e95 100644
--- a/swh/storage/tests/test_kafka_writer.py
+++ b/swh/storage/tests/test_kafka_writer.py
@@ -1,60 +1,154 @@
# Copyright (C) 2018-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
from confluent_kafka import Consumer
from swh.storage import get_storage
from swh.model.model import Origin, OriginVisit
+from swh.model.hypothesis_strategies import objects
from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed
from swh.journal.tests.journal_data import TEST_OBJECTS
+from swh.model.model import Person
+from attr import asdict, has
+from hypothesis import given
+from hypothesis.strategies import lists
+
def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer):
writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
+ "anonymize": False,
}
storage_config = {
"cls": "pipeline",
"steps": [{"cls": "memory", "journal_writer": writer_config},],
}
storage = get_storage(**storage_config)
expected_messages = 0
- for object_type, objects in TEST_OBJECTS.items():
- method = getattr(storage, object_type + "_add")
- if object_type in (
+ for obj_type, objs in TEST_OBJECTS.items():
+ method = getattr(storage, obj_type + "_add")
+ if obj_type in (
"content",
"skipped_content",
"directory",
"revision",
"release",
"snapshot",
"origin",
):
- method(objects)
- expected_messages += len(objects)
- elif object_type in ("origin_visit",):
- for obj in objects:
+ method(objs)
+ expected_messages += len(objs)
+ elif obj_type in ("origin_visit",):
+ for obj in objs:
assert isinstance(obj, OriginVisit)
storage.origin_add_one(Origin(url=obj.origin))
visit = method(obj.origin, date=obj.date, type=obj.type)
expected_messages += 1
obj_d = obj.to_dict()
for k in ("visit", "origin", "date", "type"):
del obj_d[k]
storage.origin_visit_update(obj.origin, visit.visit, **obj_d)
expected_messages += 1
else:
- assert False, object_type
+ assert False, obj_type
+
+ existing_topics = set(
+ topic
+ for topic in consumer.list_topics(timeout=10).topics.keys()
+ if topic.startswith(f"{kafka_prefix}.") # final . to exclude privileged topics
+ )
+ assert existing_topics == {
+ f"{kafka_prefix}.{obj_type}"
+ for obj_type in (
+ "content",
+ "directory",
+ "origin",
+ "origin_visit",
+ "release",
+ "revision",
+ "snapshot",
+ "skipped_content",
+ )
+ }
consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages)
assert_all_objects_consumed(consumed_messages)
+
+
+def test_storage_direct_writer_anonymized(
+ kafka_prefix: str, kafka_server, consumer: Consumer
+):
+
+ writer_config = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": kafka_prefix,
+ "anonymize": True,
+ }
+ storage_config = {
+ "cls": "pipeline",
+ "steps": [{"cls": "memory", "journal_writer": writer_config},],
+ }
+
+ storage = get_storage(**storage_config)
+
+ expected_messages = 0
+
+ for obj_type, objs in TEST_OBJECTS.items():
+ if obj_type == "origin_visit":
+ # these have an inconsistent API and are unrelated to what we
+ # want to test here
+ continue
+ method = getattr(storage, obj_type + "_add")
+ method(objs)
+ expected_messages += len(objs)
+
+ existing_topics = set(
+ topic
+ for topic in consumer.list_topics(timeout=10).topics.keys()
+ if topic.startswith(kafka_prefix)
+ )
+ assert existing_topics == {
+ f"{kafka_prefix}.{obj_type}"
+ for obj_type in (
+ "content",
+ "directory",
+ "origin",
+ "origin_visit",
+ "release",
+ "revision",
+ "snapshot",
+ "skipped_content",
+ )
+ } | {
+ f"{kafka_prefix}_privileged.{obj_type}" for obj_type in ("release", "revision",)
+ }
+
+
+def check_anonymized_obj(obj):
+ if has(obj):
+ if isinstance(obj, Person):
+ assert obj.name is None
+ assert obj.email is None
+ assert len(obj.fullname) == 32
+ else:
+ for key, value in asdict(obj, recurse=False).items():
+ check_anonymized_obj(value)
+
+
+@given(lists(objects(split_content=True)))
+def test_anonymizer(obj_type_and_objs):
+ for obj_type, obj in obj_type_and_objs:
+ check_anonymized_obj(obj.anonymize())
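
The hypothesis-based test above only checks the shape of anonymized Person objects. Spelled out for a single value, the property check_anonymized_obj relies on looks like the sketch below; constructing Person directly from its fullname/name/email fields is an assumption about the attrs-based model, not something this diff shows.

# Minimal sketch (not part of the diff): the anonymization property asserted by
# check_anonymized_obj, for a single Person. The direct constructor call is an
# assumption; only the attribute names and anonymize() come from the tests above.
from swh.model.model import Person

person = Person(
    fullname=b"Jane Doe <jdoe@example.org>",
    name=b"Jane Doe",
    email=b"jdoe@example.org",
)
anonymized = person.anonymize()

assert anonymized.name is None          # personal name dropped
assert anonymized.email is None         # email dropped
assert len(anonymized.fullname) == 32   # fullname replaced by a fixed-size digest
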
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
index 670d7431..7c8622b0 100644
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -1,269 +1,368 @@
# Copyright (C) 2019-2020 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import functools
import logging
from typing import Container, Dict, Optional
import pytest
from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
from swh.model.model import Content
from swh.storage import get_storage
from swh.storage.in_memory import InMemoryStorage
from swh.storage.replay import process_replay_objects
from swh.journal.serializers import key_to_kafka, value_to_kafka
from swh.journal.client import JournalClient
from swh.journal.tests.journal_data import (
TEST_OBJECTS,
DUPLICATE_CONTENTS,
)
UTC = datetime.timezone.utc
@pytest.fixture()
def replayer_storage_and_client(
kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
):
journal_writer_config = {
"cls": "kafka",
"brokers": [kafka_server],
"client_id": "kafka_writer",
"prefix": kafka_prefix,
}
storage_config = {
"cls": "memory",
"journal_writer": journal_writer_config,
}
storage = get_storage(**storage_config)
replayer = JournalClient(
brokers=kafka_server,
group_id=kafka_consumer_group,
prefix=kafka_prefix,
stop_on_eof=True,
)
yield storage, replayer
def test_storage_replayer(replayer_storage_and_client, caplog):
"""Optimal replayer scenario.
This:
- writes objects to a source storage
- - replayer consumes objects from the topic and replay them
+ - replayer consumes objects from the topic and replays them
- a destination storage is filled from this
In the end, both storages should have the same content.
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
if object_type == "origin_visit":
# src.origin_visit_upsert(objects)
for visit in objects:
src.origin_visit_add(
origin_url=visit.origin, date=visit.date, type=visit.type
)
else:
method = getattr(src, object_type + "_add")
method(objects)
nb_sent += len(objects)
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
_check_replayed(src, dst)
collision = 0
for record in caplog.records:
logtext = record.getMessage()
if "Colliding contents:" in logtext:
collision += 1
assert collision == 0, "No collision should be detected"
def test_storage_play_with_collision(replayer_storage_and_client, caplog):
"""Another replayer scenario with collisions.
This:
- writes objects to the topic, including colliding contents
- replayer consumes objects from the topic and replay them
- This drops the colliding contents from the replay when detected
"""
src, replayer = replayer_storage_and_client
# Fill Kafka using a source storage
nb_sent = 0
for object_type, objects in TEST_OBJECTS.items():
if object_type == "origin_visit":
for visit in objects:
src.origin_visit_add(
origin_url=visit.origin, date=visit.date, type=visit.type
)
else:
method = getattr(src, object_type + "_add")
method(objects)
nb_sent += len(objects)
# Create collision in input data
# These should not be written in the destination
producer = src.journal_writer.journal.producer
prefix = src.journal_writer.journal._prefix
for content in DUPLICATE_CONTENTS:
topic = f"{prefix}.content"
key = content["sha1"]
producer.produce(
topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
)
nb_sent += 1
producer.flush()
caplog.set_level(logging.ERROR, "swh.journal.replay")
# Fill the destination storage from Kafka
dst = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
# check the logs for the collision being properly detected
nb_collisions = 0
actual_collision: Dict
for record in caplog.records:
logtext = record.getMessage()
if "Collision detected:" in logtext:
nb_collisions += 1
actual_collision = record.args["collision"]
assert nb_collisions == 1, "1 collision should be detected"
algo = "sha1"
assert actual_collision["algo"] == algo
expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
assert actual_collision["hash"] == expected_colliding_hash
actual_colliding_hashes = actual_collision["objects"]
assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
for content in DUPLICATE_CONTENTS:
expected_content_hashes = {
k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
}
assert expected_content_hashes in actual_colliding_hashes
# all objects from the src should exists in the dst storage
_check_replayed(src, dst, exclude=["contents"])
# but the dst has one content more (one of the 2 colliding ones)
assert len(src._contents) == len(dst._contents) - 1
def test_replay_skipped_content(replayer_storage_and_client):
"""Test the 'skipped_content' topic is properly replayed."""
src, replayer = replayer_storage_and_client
_check_replay_skipped_content(src, replayer, "skipped_content")
def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
"""Test the 'content' topic can be used to replay SkippedContent objects."""
src, replayer = replayer_storage_and_client
_check_replay_skipped_content(src, replayer, "content")
# utility functions
def _check_replayed(
src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None
):
"""Simple utility function to compare the content of 2 in_memory storages
"""
- expected_persons = set(src._persons)
- got_persons = set(dst._persons)
+ expected_persons = set(src._persons.values())
+ got_persons = set(dst._persons.values())
assert got_persons == expected_persons
for attr in (
"contents",
"skipped_contents",
"directories",
"revisions",
"releases",
"snapshots",
"origins",
"origin_visits",
):
if exclude and attr in exclude:
continue
expected_objects = sorted(getattr(src, f"_{attr}").items())
got_objects = sorted(getattr(dst, f"_{attr}").items())
assert got_objects == expected_objects, f"Mismatch object list for {attr}"
def _check_replay_skipped_content(storage, replayer, topic):
skipped_contents = _gen_skipped_contents(100)
nb_sent = len(skipped_contents)
producer = storage.journal_writer.journal.producer
prefix = storage.journal_writer.journal._prefix
for i, obj in enumerate(skipped_contents):
producer.produce(
topic=f"{prefix}.{topic}",
key=key_to_kafka({"sha1": obj["sha1"]}),
value=value_to_kafka(obj),
)
producer.flush()
dst_storage = get_storage(cls="memory")
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
nb_inserted = replayer.process(worker_fn)
assert nb_sent == nb_inserted
for content in skipped_contents:
assert not storage.content_find({"sha1": content["sha1"]})
# no skipped_content_find API endpoint, so use this instead
assert not list(dst_storage.skipped_content_missing(skipped_contents))
def _updated(d1, d2):
d1.update(d2)
d1.pop("data", None)
return d1
def _gen_skipped_contents(n=10):
# we do not use the hypothesis strategy here because this does not play well with
# pytest fixtures, and it makes test execution very slow
algos = DEFAULT_ALGORITHMS | {"length"}
now = datetime.datetime.now(tz=UTC)
return [
_updated(
MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(),
{
"status": "absent",
"reason": "why not",
"origin": f"https://somewhere/{i}",
"ctime": now,
},
)
for i in range(n)
]
+
+
+def test_storage_play_anonymized(
+ kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
+):
+ """Optimal replayer scenario.
+
+ This:
+ - writes objects to the topic
+ - replayer consumes objects from the topic and replays them
+
+ """
+ writer_config = {
+ "cls": "kafka",
+ "brokers": [kafka_server],
+ "client_id": "kafka_writer",
+ "prefix": kafka_prefix,
+ "anonymize": True,
+ }
+ src_config = {"cls": "memory", "journal_writer": writer_config}
+
+ storage = get_storage(**src_config)
+
+ # Fill the src storage
+ nb_sent = 0
+ for obj_type, objs in TEST_OBJECTS.items():
+ if obj_type == "origin_visit":
+ # these have an inconsistent API and are unrelated to what we
+ # want to test here
+ continue
+ method = getattr(storage, obj_type + "_add")
+ method(objs)
+ nb_sent += len(objs)
+
+ # Fill a destination storage from Kafka **using anonymized topics**
+ dst_storage = get_storage(cls="memory")
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ privileged=False,
+ )
+ worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ check_replayed(storage, dst_storage, expected_anonymized=True)
+
+ # Fill a destination storage from Kafka **with stock (non-anonymized) topics**
+ dst_storage = get_storage(cls="memory")
+ replayer = JournalClient(
+ brokers=kafka_server,
+ group_id=kafka_consumer_group,
+ prefix=kafka_prefix,
+ stop_after_objects=nb_sent,
+ privileged=True,
+ )
+ worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
+
+ nb_inserted = replayer.process(worker_fn)
+ assert nb_sent == nb_inserted
+ check_replayed(storage, dst_storage, expected_anonymized=False)
+
+
+def check_replayed(src, dst, expected_anonymized=False):
+ """Simple utility function to compare the content of 2 in_memory storages
+
+ If expected_anonymized is True, objects from the source storage are anonymized
+ before comparing with the destination storage.
+
+ """
+
+ def maybe_anonymize(obj):
+ if expected_anonymized:
+ return obj.anonymize() or obj
+ return obj
+
+ expected_persons = {maybe_anonymize(person) for person in src._persons.values()}
+ got_persons = set(dst._persons.values())
+ assert got_persons == expected_persons
+
+ for attr in (
+ "contents",
+ "skipped_contents",
+ "directories",
+ "revisions",
+ "releases",
+ "snapshots",
+ "origins",
+ ):
+ expected_objects = [
+ (id, maybe_anonymize(obj))
+ for id, obj in sorted(getattr(src, f"_{attr}").items())
+ ]
+ got_objects = [
+ (id, obj) for id, obj in sorted(getattr(dst, f"_{attr}").items())
+ ]
+ assert got_objects == expected_objects, f"Mismatch object list for {attr}"
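
Outside the test module, the non-privileged replay path that test_storage_play_anonymized exercises reduces to the wiring below. Broker address, consumer group and topic prefix are placeholder values; only JournalClient parameters and helpers already used in the diff are relied on.

# Sketch of a non-privileged (anonymized) replayer: a JournalClient feeding
# process_replay_objects into an in-memory storage, mirroring the test above.
# Broker, group id and prefix below are placeholders.
import functools

from swh.journal.client import JournalClient
from swh.storage import get_storage
from swh.storage.replay import process_replay_objects

dst_storage = get_storage(cls="memory")
replayer = JournalClient(
    brokers="localhost:9092",         # placeholder Kafka broker
    group_id="swh-replayer-anon",     # placeholder consumer group
    prefix="swh.journal.objects",     # placeholder topic prefix
    privileged=False,                 # read only the anonymized topics
    stop_on_eof=True,
)
worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
replayer.process(worker_fn)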