diff --git a/swh/storage/replay.py b/swh/storage/replay.py --- a/swh/storage/replay.py +++ b/swh/storage/replay.py @@ -49,6 +49,7 @@ def process_replay_objects(all_objects, *, storage): for (object_type, objects) in all_objects.items(): logger.debug("Inserting %s %s objects", len(objects), object_type) + object_type = object_type.split(":", 1)[0] with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}): _insert_objects(object_type, objects, storage) statsd.increment( diff --git a/swh/storage/tests/test_kafka_writer.py b/swh/storage/tests/test_kafka_writer.py --- a/swh/storage/tests/test_kafka_writer.py +++ b/swh/storage/tests/test_kafka_writer.py @@ -6,10 +6,17 @@ from confluent_kafka import Consumer from swh.storage import get_storage +from swh.storage.writer import anonymize from swh.model.model import Origin, OriginVisit +from swh.model.hypothesis_strategies import objects from swh.journal.pytest_plugin import consume_messages, assert_all_objects_consumed from swh.journal.tests.journal_data import TEST_OBJECTS +from swh.model.model import Person +from attr import asdict, has +from hypothesis import given +from hypothesis.strategies import lists + def test_storage_direct_writer(kafka_prefix: str, kafka_server, consumer: Consumer): @@ -18,6 +25,7 @@ "brokers": [kafka_server], "client_id": "kafka_writer", "prefix": kafka_prefix, + "anonymize": False, } storage_config = { "cls": "pipeline", @@ -28,9 +36,9 @@ expected_messages = 0 - for object_type, objects in TEST_OBJECTS.items(): - method = getattr(storage, object_type + "_add") - if object_type in ( + for obj_type, objs in TEST_OBJECTS.items(): + method = getattr(storage, obj_type + "_add") + if obj_type in ( "content", "skipped_content", "directory", @@ -39,10 +47,10 @@ "snapshot", "origin", ): - method(objects) - expected_messages += len(objects) - elif object_type in ("origin_visit",): - for obj in objects: + method(objs) + expected_messages += len(objs) + elif obj_type in ("origin_visit",): 
+ for obj in objs: assert isinstance(obj, OriginVisit) storage.origin_add_one(Origin(url=obj.origin)) visit = method(obj.origin, date=obj.date, type=obj.type) @@ -54,7 +62,87 @@ storage.origin_visit_update(obj.origin, visit.visit, **obj_d) expected_messages += 1 else: - assert False, object_type + assert False, obj_type + + existing_topics = { + topic.split(".", 1)[1] + for topic in consumer.list_topics(timeout=10).topics.keys() + } + assert existing_topics == { + "content", + "directory", + "origin", + "origin_visit", + "release", + "revision", + "snapshot", + "skipped_content", + } consumed_messages = consume_messages(consumer, kafka_prefix, expected_messages) assert_all_objects_consumed(consumed_messages) + + +def test_storage_direct_writer_anonymized( + kafka_prefix: str, kafka_server, consumer: Consumer +): + + writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "anonymize": True, + } + storage_config = { + "cls": "pipeline", + "steps": [{"cls": "memory", "journal_writer": writer_config},], + } + + storage = get_storage(**storage_config) + + expected_messages = 0 + + for obj_type, objs in TEST_OBJECTS.items(): + if obj_type == "origin_visit": + # these have non-consistent API and are unrelated with what we + # want to test here + continue + method = getattr(storage, obj_type + "_add") + method(objs) + expected_messages += len(objs) + + existing_topics = { + topic.split(".", 1)[1] + for topic in consumer.list_topics(timeout=10).topics.keys() + } + assert existing_topics == { + "content", + "directory", + "origin", + "origin_visit", + "release", + "release:anonymized", + "revision", + "revision:anonymized", + "snapshot", + "skipped_content", + } + + +def check_anonymized_obj(obj): + if has(obj): + if isinstance(obj, Person): + assert obj.name is None + assert obj.email is None + assert len(obj.fullname) == 32 + else: + for key, value in asdict(obj, recurse=False).items(): + 
check_anonymized_obj(value) + + +@given(lists(objects(split_content=True))) +def test_anonymizer(obj_type_and_objs): + for obj_type, obj in obj_type_and_objs: + anonymized_obj = anonymize(obj) + check_anonymized_obj(anonymized_obj) diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py --- a/swh/storage/tests/test_replay.py +++ b/swh/storage/tests/test_replay.py @@ -19,10 +19,12 @@ from swh.model.model import Content from swh.storage import get_storage +from swh.storage.writer import anonymize from swh.storage.replay import process_replay_objects from swh.journal.serializers import key_to_kafka, value_to_kafka from swh.journal.client import JournalClient +from swh.journal.tests.journal_data import TEST_OBJECTS from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter from swh.journal.tests.conftest import ( @@ -463,3 +465,65 @@ ) for i in range(n) ] + + +def test_storage_play_anonymized( + kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, +): + """Optimal replayer scenario. 
+ + This: + - writes objects to the topic + - replayer consumes objects from the topic and replay them + + """ + writer_config = { + "cls": "kafka", + "brokers": [kafka_server], + "client_id": "kafka_writer", + "prefix": kafka_prefix, + "anonymize": True, + } + src_config = {"cls": "memory", "journal_writer": writer_config} + + storage = get_storage(**src_config) + + # Fill the src storage + nb_sent = 0 + for obj_type, objs in TEST_OBJECTS.items(): + if obj_type == "origin_visit": + # these have non-consistent API and are unrelated with what we + # want to test here + continue + method = getattr(storage, obj_type + "_add") + method(objs) + nb_sent += len(objs) + + dst_storage = get_storage(cls="memory") + caplog.set_level(logging.ERROR, "swh.journal.replay") + # Fill the storage from Kafka using anonymized topics + object_types = { + "content", + "directory", + "origin", + "origin_visit", + "release:anonymized", + "revision:anonymized", + "snapshot", + "skipped_content", + } + + replayer = JournalClient( + brokers=kafka_server, + group_id=kafka_consumer_group, + prefix=kafka_prefix, + stop_after_objects=nb_sent, + object_types=list(object_types), + ) + worker_fn = functools.partial(process_replay_objects, storage=dst_storage) + + nb_inserted = replayer.process(worker_fn) + assert nb_sent == nb_inserted + + expected_persons = {anonymize(person) for person in storage._persons} + assert set(dst_storage._persons) == expected_persons diff --git a/swh/storage/writer.py b/swh/storage/writer.py --- a/swh/storage/writer.py +++ b/swh/storage/writer.py @@ -3,20 +3,12 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Iterable +from typing import Any, Iterable -from attr import evolve +from attr import asdict, evolve, has +from hashlib import sha256 -from swh.model.model import ( - Origin, - OriginVisit, - Snapshot, - Directory, - Revision, - Release, - Content, - SkippedContent, -) 
from swh.model import model

try:
    from swh.journal.writer import get_journal_writer
except ImportError:
    # NOTE(review): this except clause sits in elided diff context; it is
    # reconstructed from the ``get_journal_writer is None`` check performed in
    # ``JournalWriter.__init__`` -- confirm against the full file.
    get_journal_writer = None  # type: ignore
    # mypy limitation, see https://github.com/python/mypy/issues/1153


def anonymize(obj: Any) -> Any:
    """Return a copy of ``obj`` in which ``Person`` objects are anonymized.

    Values that are not instances of an attrs class are returned unchanged.
    A ``Person`` is replaced by a new ``Person`` whose ``fullname`` is the
    SHA-256 digest of the original ``fullname + name + email`` (a missing
    name or email counts as an empty byte string) and whose ``name`` and
    ``email`` are ``None``.

    Any other attrs instance is rebuilt with ``evolve``, each attribute value
    being passed through ``anonymize``. The recursion therefore only follows
    attribute values that are themselves attrs instances: a ``Person`` stored
    inside a container (tuple, list, dict, ...) is left untouched.

    Args:
        obj: the model object (or plain attribute value) to anonymize.

    Returns:
        ``obj`` itself when it is not an attrs instance, otherwise a new
        object of the same class.

    """
    # ``attr.has`` is documented to take a class, not an instance.
    if not has(type(obj)):
        return obj
    if isinstance(obj, model.Person):
        tohash = obj.fullname + (obj.name or b"") + (obj.email or b"")
        return model.Person(fullname=sha256(tohash).digest(), name=None, email=None)
    # recurse=False keeps nested attrs instances as objects, so that the
    # recursive call can recognise Person instances among them.
    anonymized_attrs = {
        name: anonymize(value) for name, value in asdict(obj, recurse=False).items()
    }
    return evolve(obj, **anonymized_attrs)


class JournalWriter:
    """Journal writer storage collaborator. It's in charge of adding objects to
    the journal.

    When built with ``anonymize=True`` in its configuration, every batch whose
    object type is listed in ``anonymizable`` is additionally written, in
    anonymized form, under the ``<object type>:anonymized`` object type.

    """

    # Object types that also get an anonymized copy written to the journal.
    anonymizable = ["release", "revision"]

    def __init__(self, journal_writer):
        """Set up the underlying journal writer.

        Args:
            journal_writer: keyword arguments for ``get_journal_writer``, plus
                an optional ``anonymize`` boolean (default ``False``). A falsy
                value disables journal writing altogether.

        Raises:
            EnvironmentError: a configuration was given but
                ``get_journal_writer`` could not be imported.

        """
        if journal_writer:
            # Work on a copy: popping from the caller's dict would silently
            # strip "anonymize" from a configuration that may be reused.
            journal_writer = dict(journal_writer)
            self.anonymize = journal_writer.pop("anonymize", False)
            if get_journal_writer is None:
                # NOTE(review): the second half of this message lies outside
                # the visible hunk and is reconstructed -- confirm against
                # the full file.
                raise EnvironmentError(
                    "You need the swh.journal package to use the "
                    "journal_writer feature"
                )
            self.journal = get_journal_writer(**journal_writer)
        else:
            self.journal = None
            self.anonymize = False

    def write_additions(self, obj_type, values) -> None:
        """Write ``values`` to the journal under ``obj_type``.

        Does nothing when no journal is configured. When anonymization is
        enabled and ``obj_type`` is anonymizable, the anonymized objects are
        also written under ``f"{obj_type}:anonymized"``.

        """
        if not self.journal:
            return
        with_anonymized_copy = self.anonymize and obj_type in self.anonymizable
        if with_anonymized_copy:
            # ``values`` is read twice below; materialise it so that a
            # one-shot iterator is not exhausted by the first write.
            values = list(values)
        self.journal.write_additions(obj_type, values)
        if with_anonymized_copy:
            self.journal.write_additions(
                f"{obj_type}:anonymized", [anonymize(value) for value in values]
            )

    def content_add(self, contents: Iterable[model.Content]) -> None:
        """Add contents to the journal. Drop the data field if provided.

        """
        contents = [evolve(item, data=None) for item in contents]
        self.write_additions("content", contents)

    def content_update(self, contents: Iterable[model.Content]) -> None:
        """Reject content updates when a journal is configured.

        Raises:
            NotImplementedError: if a journal is configured.

        """
        if self.journal:
            raise NotImplementedError("content_update is not supported by the journal.")

    def content_add_metadata(self, contents: Iterable[model.Content]) -> None:
        """Add contents to the journal; delegates to ``content_add``."""
        self.content_add(contents)

    def skipped_content_add(self, contents: Iterable[model.SkippedContent]) -> None:
        """Add skipped contents to the journal."""
        self.write_additions("skipped_content", contents)

    def directory_add(self, directories: Iterable[model.Directory]) -> None:
        """Add directories to the journal."""
        self.write_additions("directory", directories)

    def revision_add(self, revisions: Iterable[model.Revision]) -> None:
        """Add revisions to the journal."""
        self.write_additions("revision", revisions)

    def release_add(self, releases: Iterable[model.Release]) -> None:
        """Add releases to the journal."""
        self.write_additions("release", releases)

    def snapshot_add(self, snapshots: Iterable[model.Snapshot]) -> None:
        """Add snapshots to the journal."""
        self.write_additions("snapshot", snapshots)

    def origin_visit_add(self, visits: Iterable[model.OriginVisit]) -> None:
        """Add origin visits to the journal."""
        self.write_additions("origin_visit", visits)

    def origin_visit_update(self, visits: Iterable[model.OriginVisit]) -> None:
        """Write updated origin visits to the journal as ``origin_visit``."""
        self.write_additions("origin_visit", visits)

    def origin_visit_upsert(self, visits: Iterable[model.OriginVisit]) -> None:
        """Write upserted origin visits to the journal as ``origin_visit``."""
        self.write_additions("origin_visit", visits)

    def origin_add(self, origins: Iterable[model.Origin]) -> None:
        """Add origins to the journal."""
        self.write_additions("origin", origins)