diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -5,13 +5,9 @@
 import datetime
 import functools
-import random
 import logging
-import dateutil
-from typing import Dict, List
-
-from confluent_kafka import Producer
+from typing import Container, Dict, Optional
 
 import pytest
 
@@ -19,105 +15,81 @@
 from swh.model.model import Content
 from swh.storage import get_storage
+from swh.storage.in_memory import InMemoryStorage
 from swh.storage.replay import process_replay_objects
 
 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.client import JournalClient
-from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
 from swh.journal.tests.journal_data import (
-    TEST_OBJECT_DICTS,
+    TEST_OBJECTS,
     DUPLICATE_CONTENTS,
 )
 
 UTC = datetime.timezone.utc
 
-storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}
-
-
-def test_storage_play(
-    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
+@pytest.fixture()
+def replayer_storage_and_client(
+    kafka_prefix: str, kafka_consumer_group: str, kafka_server: str
 ):
+    journal_writer_config = {
+        "cls": "kafka",
+        "brokers": [kafka_server],
+        "client_id": "kafka_writer",
+        "prefix": kafka_prefix,
+    }
+    storage_config = {
+        "cls": "memory",
+        "journal_writer": journal_writer_config,
+    }
+    storage = get_storage(**storage_config)
+    replayer = JournalClient(
+        brokers=kafka_server,
+        group_id=kafka_consumer_group,
+        prefix=kafka_prefix,
+        stop_on_eof=True,
+    )
+
+    yield storage, replayer
+
+
+def test_storage_replayer(replayer_storage_and_client, caplog):
    """Optimal replayer scenario.
 
     This:
-    - writes objects to the topic
+    - writes objects to a source storage
     - replayer consumes objects from the topic and replay them
+    - a destination storage is filled from this
+
+    In the end, both storages should have the same content.
""" - kafka_prefix += ".swh.journal.objects" + src, replayer = replayer_storage_and_client - storage = get_storage(**storage_config) - - producer = Producer( - { - "bootstrap.servers": kafka_server, - "client.id": "test producer", - "acks": "all", - } - ) - - now = datetime.datetime.now(tz=UTC) - - # Fill Kafka + # Fill Kafka using a source storage nb_sent = 0 - for object_type, objects in TEST_OBJECT_DICTS.items(): - topic = f"{kafka_prefix}.{object_type}" - for object_ in objects: - key = bytes(random.randint(0, 255) for _ in range(40)) - object_ = object_.copy() - if object_type == "content": - object_["ctime"] = now - producer.produce( - topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_), - ) - nb_sent += 1 - - producer.flush() + for object_type, objects in TEST_OBJECTS.items(): + if object_type == "origin_visit": + # src.origin_visit_upsert(objects) + for visit in objects: + src.origin_visit_add( + origin_url=visit.origin, date=visit.date, type=visit.type + ) + else: + method = getattr(src, object_type + "_add") + method(objects) + nb_sent += len(objects) caplog.set_level(logging.ERROR, "swh.journal.replay") - # Fill the storage from Kafka - replayer = JournalClient( - brokers=kafka_server, - group_id=kafka_consumer_group, - prefix=kafka_prefix, - stop_after_objects=nb_sent, - ) - worker_fn = functools.partial(process_replay_objects, storage=storage) - nb_inserted = 0 - while nb_inserted < nb_sent: - nb_inserted += replayer.process(worker_fn) - assert nb_sent == nb_inserted - # Check the objects were actually inserted in the storage - assert TEST_OBJECT_DICTS["revision"] == list( - storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]]) - ) - assert TEST_OBJECT_DICTS["release"] == list( - storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]]) - ) + # Fill the destination storage from Kafka + dst = get_storage(cls="memory") + worker_fn = functools.partial(process_replay_objects, storage=dst) + nb_inserted = replayer.process(worker_fn) + assert nb_sent == nb_inserted - origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]])) - assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins] - for origin in origins: - origin_url = origin["url"] - expected_visits = [ - { - **visit, - "origin": origin_url, - "date": dateutil.parser.parse(visit["date"]), - } - for visit in TEST_OBJECT_DICTS["origin_visit"] - if visit["origin"] == origin["url"] - ] - actual_visits = list(storage.origin_visit_get(origin_url)) - assert expected_visits == actual_visits - - input_contents = TEST_OBJECT_DICTS["content"] - contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents]) - assert len(contents) == len(input_contents) - assert contents == {cont["sha1"]: [cont] for cont in input_contents} + _check_replayed(src, dst) collision = 0 for record in caplog.records: @@ -128,9 +100,7 @@ assert collision == 0, "No collision should be detected" -def test_storage_play_with_collision( - kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog, -): +def test_storage_play_with_collision(replayer_storage_and_client, caplog): """Another replayer scenario with collisions. 
     This:
@@ -139,91 +109,45 @@
     - This drops the colliding contents from the replay when detected
 
     """
-    kafka_prefix += ".swh.journal.objects"
-
-    storage = get_storage(**storage_config)
-
-    producer = Producer(
-        {
-            "bootstrap.servers": kafka_server,
-            "client.id": "test producer",
-            "enable.idempotence": "true",
-        }
-    )
+    src, replayer = replayer_storage_and_client
 
-    now = datetime.datetime.now(tz=UTC)
-
-    # Fill Kafka
+    # Fill Kafka using a source storage
     nb_sent = 0
-    for object_type, objects in TEST_OBJECT_DICTS.items():
-        topic = f"{kafka_prefix}.{object_type}"
-        for object_ in objects:
-            key = bytes(random.randint(0, 255) for _ in range(40))
-            object_ = object_.copy()
-            if object_type == "content":
-                object_["ctime"] = now
-            producer.produce(
-                topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
-            )
-            nb_sent += 1
+    for object_type, objects in TEST_OBJECTS.items():
+        if object_type == "origin_visit":
+            for visit in objects:
+                src.origin_visit_add(
+                    origin_url=visit.origin, date=visit.date, type=visit.type
+                )
+        else:
+            method = getattr(src, object_type + "_add")
+            method(objects)
+        nb_sent += len(objects)
 
     # Create collision in input data
-    # They are not written in the destination
+    # These should not be written in the destination
+    producer = src.journal_writer.journal.producer
+    prefix = src.journal_writer.journal._prefix
     for content in DUPLICATE_CONTENTS:
-        topic = f"{kafka_prefix}.content"
+        topic = f"{prefix}.content"
+        key = content["sha1"]
         producer.produce(
             topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
         )
-        nb_sent += 1
 
     producer.flush()
 
     caplog.set_level(logging.ERROR, "swh.journal.replay")
 
-    # Fill the storage from Kafka
-    replayer = JournalClient(
-        brokers=kafka_server,
-        group_id=kafka_consumer_group,
-        prefix=kafka_prefix,
-        stop_after_objects=nb_sent,
-    )
-    worker_fn = functools.partial(process_replay_objects, storage=storage)
-    nb_inserted = 0
-    while nb_inserted < nb_sent:
-        nb_inserted += replayer.process(worker_fn)
-    assert nb_sent == nb_inserted
-
-    # Check the objects were actually inserted in the storage
-    assert TEST_OBJECT_DICTS["revision"] == list(
-        storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
-    )
-    assert TEST_OBJECT_DICTS["release"] == list(
-        storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
-    )
-    origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
-    assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
-    for origin in origins:
-        origin_url = origin["url"]
-        expected_visits = [
-            {
-                **visit,
-                "origin": origin_url,
-                "date": dateutil.parser.parse(visit["date"]),
-            }
-            for visit in TEST_OBJECT_DICTS["origin_visit"]
-            if visit["origin"] == origin["url"]
-        ]
-        actual_visits = list(storage.origin_visit_get(origin_url))
-        assert expected_visits == actual_visits
-
-    input_contents = TEST_OBJECT_DICTS["content"]
-    contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
-    assert len(contents) == len(input_contents)
-    assert contents == {cont["sha1"]: [cont] for cont in input_contents}
+    # Fill the destination storage from Kafka
+    dst = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=dst)
+    nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
+    # check the logs for the collision being properly detected
     nb_collisions = 0
-    actual_collision: Dict
     for record in caplog.records:
         logtext = record.getMessage()
@@ -246,186 +170,78 @@
         }
         assert expected_content_hashes in actual_colliding_hashes
 
+    # all objects from the src should exist in the dst storage
+    _check_replayed(src, dst, exclude=["contents"])
+    # but the dst has one more content (one of the 2 colliding ones)
+    assert len(src._contents) == len(dst._contents) - 1
 
 
-def _test_write_replay_origin_visit(visits: List[Dict]):
-    """Helper function to write tests for origin_visit.
-
-    Each visit (a dict) given in the 'visits' argument will be sent to
-    a (mocked) kafka queue, which a in-memory-storage backed replayer is
-    listening to.
-
-    Check that corresponding origin visits entities are present in the storage
-    and have correct values if they are not skipped.
-
-    """
-    queue: List = []
-    replayer = MockedJournalClient(queue)
-    writer = MockedKafkaWriter(queue)
-
-    # Note that flipping the order of these two insertions will crash
-    # the test, because the legacy origin_format does not allow to create
-    # the origin when needed (type is missing)
-    writer.send(
-        "origin",
-        "foo",
-        {
-            "url": "http://example.com/",
-            "type": "git",  # test the legacy origin format is accepted
-        },
-    )
-    for visit in visits:
-        writer.send("origin_visit", "foo", visit)
-
-    queue_size = len(queue)
-    assert replayer.stop_after_objects is None
-    replayer.stop_after_objects = queue_size
-
-    storage = get_storage(**storage_config)
-    worker_fn = functools.partial(process_replay_objects, storage=storage)
-
-    replayer.process(worker_fn)
-    actual_visits = list(storage.origin_visit_get("http://example.com/"))
-
-    assert len(actual_visits) == len(visits), actual_visits
+def test_replay_skipped_content(replayer_storage_and_client):
+    """Test the 'skipped_content' topic is properly replayed."""
+    src, replayer = replayer_storage_and_client
+    _check_replay_skipped_content(src, replayer, "skipped_content")
 
-    for vin, vout in zip(visits, actual_visits):
-        vin = vin.copy()
-        vout = vout.copy()
-        assert vout.pop("origin") == "http://example.com/"
-        vin.pop("origin")
-        vin.setdefault("type", "git")
-        vin.setdefault("metadata", None)
-        assert vin == vout
 
+def test_replay_skipped_content_bwcompat(replayer_storage_and_client):
+    """Test the 'content' topic can be used to replay SkippedContent objects."""
+    src, replayer = replayer_storage_and_client
+    _check_replay_skipped_content(src, replayer, "content")
 
-def test_write_replay_origin_visit(kafka_server):
-    """Test origin_visit when the 'origin' is just a string."""
-    now = datetime.datetime.now(tz=UTC)
-    visits = [
-        {
-            "visit": 1,
-            "origin": "http://example.com/",
-            "date": now,
-            "type": "git",
-            "status": "partial",
-            "snapshot": None,
-        }
-    ]
-    _test_write_replay_origin_visit(visits)
 
+# utility functions
 
-def test_write_replay_legacy_origin_visit1():
-    """Origin_visit with no types should make the replayer crash
-
-    We expect the journal's origin_visit topic to no longer reference such
-    visits. If it does, the replayer must crash so we can fix the journal's
-    topic.
+def _check_replayed(
+    src: InMemoryStorage, dst: InMemoryStorage, exclude: Optional[Container] = None
+):
+    """Simple utility function to compare the content of 2 in_memory storages
 
     """
-    now = datetime.datetime.now(tz=UTC)
-    visit = {
-        "visit": 1,
-        "origin": "http://example.com/",
-        "date": now,
-        "status": "partial",
-        "snapshot": None,
-    }
-    now2 = datetime.datetime.now(tz=UTC)
-    visit2 = {
-        "visit": 2,
-        "origin": {"url": "http://example.com/"},
-        "date": now2,
-        "status": "partial",
-        "snapshot": None,
-    }
-
-    for origin_visit in [visit, visit2]:
-        with pytest.raises(ValueError, match="Old origin visit format"):
-            _test_write_replay_origin_visit([origin_visit])
-
-
-def test_write_replay_legacy_origin_visit2():
-    """Test origin_visit when 'type' is missing from the visit, but not
-    from the origin."""
-    now = datetime.datetime.now(tz=UTC)
-    visits = [
-        {
-            "visit": 1,
-            "origin": {"url": "http://example.com/", "type": "git",},
-            "date": now,
-            "type": "git",
-            "status": "partial",
-            "snapshot": None,
-        }
-    ]
-    _test_write_replay_origin_visit(visits)
-
-
-def test_write_replay_legacy_origin_visit3():
-    """Test origin_visit when the origin is a dict"""
-    now = datetime.datetime.now(tz=UTC)
-    visits = [
-        {
-            "visit": 1,
-            "origin": {"url": "http://example.com/",},
-            "date": now,
-            "type": "git",
-            "status": "partial",
-            "snapshot": None,
-        }
-    ]
-    _test_write_replay_origin_visit(visits)
-
-
-def test_replay_skipped_content(kafka_server, kafka_prefix):
-    """Test the 'skipped_content' topic is properly replayed."""
-    _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content")
-
-
-def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix):
-    """Test the 'content' topic can be used to replay SkippedContent objects."""
-    _check_replay_skipped_content(kafka_server, kafka_prefix, "content")
-
-
-def _check_replay_skipped_content(kafka_server, kafka_prefix, topic):
+    expected_persons = set(src._persons)
+    got_persons = set(dst._persons)
+    assert got_persons == expected_persons
+
+    for attr in (
+        "contents",
+        "skipped_contents",
+        "directories",
+        "revisions",
+        "releases",
+        "snapshots",
+        "origins",
+        "origin_visits",
+    ):
+        if exclude and attr in exclude:
+            continue
+        expected_objects = sorted(getattr(src, f"_{attr}").items())
+        got_objects = sorted(getattr(dst, f"_{attr}").items())
+        assert got_objects == expected_objects, f"Mismatch object list for {attr}"
+
+
+def _check_replay_skipped_content(storage, replayer, topic):
     skipped_contents = _gen_skipped_contents(100)
     nb_sent = len(skipped_contents)
-
-    producer = Producer(
-        {
-            "bootstrap.servers": kafka_server,
-            "client.id": "test producer",
-            "acks": "all",
-        }
-    )
-    replayer = JournalClient(
-        brokers=[kafka_server],
-        group_id="test consumer",
-        prefix=kafka_prefix,
-        stop_after_objects=nb_sent,
-    )
-    assert f"{kafka_prefix}.skipped_content" in replayer.subscription
+    producer = storage.journal_writer.journal.producer
+    prefix = storage.journal_writer.journal._prefix
 
     for i, obj in enumerate(skipped_contents):
-        obj.pop("data", None)
         producer.produce(
-            topic=f"{kafka_prefix}.{topic}",
+            topic=f"{prefix}.{topic}",
             key=key_to_kafka({"sha1": obj["sha1"]}),
             value=value_to_kafka(obj),
         )
     producer.flush()
 
-    storage = get_storage(cls="memory")
-
-    worker_fn = functools.partial(process_replay_objects, storage=storage)
+    dst_storage = get_storage(cls="memory")
+    worker_fn = functools.partial(process_replay_objects, storage=dst_storage)
     nb_inserted = replayer.process(worker_fn)
+    assert nb_sent == nb_inserted
 
     for content in skipped_contents:
         assert not storage.content_find({"sha1": content["sha1"]})
 
     # no skipped_content_find API endpoint, so use this instead
-    assert not list(storage.skipped_content_missing(skipped_contents))
+    assert not list(dst_storage.skipped_content_missing(skipped_contents))
 
 
 def _updated(d1, d2):
diff --git a/swh/storage/tests/test_write_replay.py b/swh/storage/tests/test_write_replay.py
deleted file mode 100644
--- a/swh/storage/tests/test_write_replay.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright (C) 2019-2020 The Software Heritage developers
-# See the AUTHORS file at the top-level directory of this distribution
-# License: GNU General Public License version 3, or any later version
-# See top-level LICENSE file for more information
-
-import functools
-from unittest.mock import patch
-
-import attr
-from hypothesis import given, settings, HealthCheck
-from hypothesis.strategies import lists
-
-from swh.model.hypothesis_strategies import objects
-from swh.model.model import Origin
-from swh.storage import get_storage
-from swh.storage.exc import HashCollision
-
-from swh.storage.replay import process_replay_objects
-
-from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
-
-
-storage_config = {
-    "cls": "memory",
-    "journal_writer": {"cls": "memory"},
-}
-
-
-def empty_person_name_email(rev_or_rel):
-    """Empties the 'name' and 'email' fields of the author/committer fields
-    of a revision or release; leaving only the fullname."""
-    if getattr(rev_or_rel, "author", None):
-        rev_or_rel = attr.evolve(
-            rev_or_rel, author=attr.evolve(rev_or_rel.author, name=b"", email=b"",)
-        )
-
-    if getattr(rev_or_rel, "committer", None):
-        rev_or_rel = attr.evolve(
-            rev_or_rel,
-            committer=attr.evolve(rev_or_rel.committer, name=b"", email=b"",),
-        )
-
-    return rev_or_rel
-
-
-@given(lists(objects(blacklist_types=("origin_visit_status",)), min_size=1))
-@settings(suppress_health_check=[HealthCheck.too_slow])
-def test_write_replay_same_order_batches(objects):
-    queue = []
-    replayer = MockedJournalClient(queue)
-
-    with patch(
-        "swh.journal.writer.inmemory.InMemoryJournalWriter",
-        return_value=MockedKafkaWriter(queue),
-    ):
-        storage1 = get_storage(**storage_config)
-
-    # Write objects to storage1
-    for (obj_type, obj) in objects:
-        if obj_type == "content" and obj.status == "absent":
-            obj_type = "skipped_content"
-
-        if obj_type == "origin_visit":
-            storage1.origin_add_one(Origin(url=obj.origin))
-            storage1.origin_visit_upsert([obj])
-        else:
-            method = getattr(storage1, obj_type + "_add")
-            try:
-                method([obj])
-            except HashCollision:
-                pass
-
-    # Bail out early if we didn't insert any relevant objects...
-    queue_size = len(queue)
-    assert queue_size != 0, "No test objects found; hypothesis strategy bug?"
-
-    assert replayer.stop_after_objects is None
-    replayer.stop_after_objects = queue_size
-
-    storage2 = get_storage(**storage_config)
-    worker_fn = functools.partial(process_replay_objects, storage=storage2)
-
-    replayer.process(worker_fn)
-
-    assert replayer.consumer.committed
-
-    for attr_name in (
-        "_contents",
-        "_directories",
-        "_snapshots",
-        "_origin_visits",
-        "_origins",
-    ):
-        assert getattr(storage1, attr_name) == getattr(storage2, attr_name), attr_name
-
-    # When hypothesis generates a revision and a release with same
-    # author (or committer) fullname but different name or email, then
-    # the storage will use the first name/email it sees.
-    # This first one will be either the one from the revision or the release,
-    # and since there is no order guarantees, storage2 has 1/2 chance of
-    # not seeing the same order as storage1, therefore we need to strip
-    # them out before comparing.
-    for attr_name in ("_revisions", "_releases"):
-        items1 = {
-            k: empty_person_name_email(v)
-            for (k, v) in getattr(storage1, attr_name).items()
-        }
-        items2 = {
-            k: empty_person_name_email(v)
-            for (k, v) in getattr(storage2, attr_name).items()
-        }
-        assert items1 == items2, attr_name
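
For context, a minimal sketch of the source-to-destination replay flow these tests exercise, built from the same swh.storage and swh.journal calls used in the patch above; the broker address, client id, group id, and topic prefix below are illustrative assumptions rather than values taken from the patch:

    import functools

    from swh.journal.client import JournalClient
    from swh.storage import get_storage
    from swh.storage.replay import process_replay_objects

    kafka_server = "localhost:9092"  # assumption: address of a running broker
    prefix = "swh.journal.objects"   # assumption: arbitrary topic prefix

    # Source storage: every write is also journaled to Kafka.
    src = get_storage(
        cls="memory",
        journal_writer={
            "cls": "kafka",
            "brokers": [kafka_server],
            "client_id": "example_writer",  # assumption: arbitrary client id
            "prefix": prefix,
        },
    )

    # Replayer: consumes the journal topics and fills a destination storage.
    replayer = JournalClient(
        brokers=kafka_server,
        group_id="example-consumer",  # assumption: arbitrary consumer group
        prefix=prefix,
        stop_on_eof=True,
    )
    dst = get_storage(cls="memory")
    worker_fn = functools.partial(process_replay_objects, storage=dst)
    replayer.process(worker_fn)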