diff --git a/swh/storage/replay.py b/swh/storage/replay.py
index 0a15d08d..bbe30720 100644
--- a/swh/storage/replay.py
+++ b/swh/storage/replay.py
@@ -1,128 +1,133 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import logging
 from typing import Any, Callable, Dict, Iterable, List

 try:
     from systemd.daemon import notify
 except ImportError:
     notify = None

 from swh.core.statsd import statsd
 from swh.storage.fixer import fix_objects

 from swh.model.model import (
     BaseContent,
     BaseModel,
     Content,
     Directory,
     Origin,
     OriginVisit,
     Revision,
     SkippedContent,
     Snapshot,
     Release,
 )
 from swh.storage.exc import HashCollision

 logger = logging.getLogger(__name__)

 GRAPH_OPERATIONS_METRIC = "swh_graph_replayer_operations_total"
 GRAPH_DURATION_METRIC = "swh_graph_replayer_duration_seconds"


 object_converter_fn: Dict[str, Callable[[Dict], BaseModel]] = {
     "origin": Origin.from_dict,
     "origin_visit": OriginVisit.from_dict,
     "snapshot": Snapshot.from_dict,
     "revision": Revision.from_dict,
     "release": Release.from_dict,
     "directory": Directory.from_dict,
     "content": Content.from_dict,
     "skipped_content": SkippedContent.from_dict,
 }


 def process_replay_objects(all_objects, *, storage):
     for (object_type, objects) in all_objects.items():
         logger.debug("Inserting %s %s objects", len(objects), object_type)
         with statsd.timed(GRAPH_DURATION_METRIC, tags={"object_type": object_type}):
             _insert_objects(object_type, objects, storage)
         statsd.increment(
             GRAPH_OPERATIONS_METRIC, len(objects), tags={"object_type": object_type}
         )
     if notify:
         notify("WATCHDOG=1")


 def collision_aware_content_add(
     content_add_fn: Callable[[Iterable[Any]], None], contents: List[BaseContent]
 ) -> None:
     """Add contents to storage. If a hash collision is detected, an error is
     logged. Then this adds the other non colliding contents to the storage.

     Args:
         content_add_fn: Storage content callable
         contents: List of contents or skipped contents to add to storage

     """
     if not contents:
         return
     colliding_content_hashes: List[Dict[str, Any]] = []
     while True:
         try:
             content_add_fn(contents)
         except HashCollision as e:
             colliding_content_hashes.append(
                 {
                     "algo": e.algo,
                     "hash": e.hash_id,  # hex hash id
                     "objects": e.colliding_contents,  # hex hashes
                 }
             )
             colliding_hashes = e.colliding_content_hashes()
             # Drop the colliding contents from the transaction
             contents = [c for c in contents if c.hashes() not in colliding_hashes]
         else:
             # Successfully added contents, we are done
             break
     if colliding_content_hashes:
         for collision in colliding_content_hashes:
             logger.error("Collision detected: %(collision)s", {"collision": collision})


 def _insert_objects(object_type: str, objects: List[Dict], storage) -> None:
     """Insert objects of type object_type in the storage.

     """
     objects = fix_objects(object_type, objects)

     if object_type == "content":
+        # for bw compat, skipped content should now be delivered in the skipped_content
+        # topic
         contents: List[BaseContent] = []
         skipped_contents: List[BaseContent] = []
         for content in objects:
             c = BaseContent.from_dict(content)
             if isinstance(c, SkippedContent):
                 skipped_contents.append(c)
             else:
                 contents.append(c)
         collision_aware_content_add(storage.skipped_content_add, skipped_contents)
         collision_aware_content_add(storage.content_add_metadata, contents)
+    elif object_type == "skipped_content":
+        skipped_contents = [SkippedContent.from_dict(obj) for obj in objects]
+        collision_aware_content_add(storage.skipped_content_add, skipped_contents)
     elif object_type == "origin_visit":
         visits: List[OriginVisit] = []
         origins: List[Origin] = []
         for obj in objects:
             visit = OriginVisit.from_dict(obj)
             visits.append(visit)
             origins.append(Origin(url=visit.origin))
         storage.origin_add(origins)
         storage.origin_visit_upsert(visits)
-    elif object_type in ("directory", "revision", "release", "snapshot", "origin"):
+    elif object_type in ("directory", "revision", "release", "snapshot", "origin",):
         method = getattr(storage, object_type + "_add")
         method(object_converter_fn[object_type](o) for o in objects)
     else:
         logger.warning("Received a series of %s, this should not happen", object_type)
diff --git a/swh/storage/tests/test_replay.py b/swh/storage/tests/test_replay.py
index 055b6b01..d31c57a7 100644
--- a/swh/storage/tests/test_replay.py
+++ b/swh/storage/tests/test_replay.py
@@ -1,390 +1,465 @@
 # Copyright (C) 2019-2020 The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 import datetime
 import functools
 import random
 import logging
 import dateutil
 from typing import Dict, List

 from confluent_kafka import Producer
 import pytest

-from swh.model.hashutil import hash_to_hex
+from swh.model.hashutil import hash_to_hex, MultiHash, DEFAULT_ALGORITHMS
 from swh.model.model import Content

 from swh.storage import get_storage
 from swh.storage.replay import process_replay_objects

 from swh.journal.serializers import key_to_kafka, value_to_kafka
 from swh.journal.client import JournalClient

 from swh.journal.tests.utils import MockedJournalClient, MockedKafkaWriter
 from swh.journal.tests.conftest import (
     TEST_OBJECT_DICTS,
     DUPLICATE_CONTENTS,
 )

 UTC = datetime.timezone.utc

 storage_config = {"cls": "pipeline", "steps": [{"cls": "memory"},]}


 def test_storage_play(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
 ):
     """Optimal replayer scenario.

     This:
     - writes objects to the topic
     - replayer consumes objects from the topic and replay them

     """
     kafka_prefix += ".swh.journal.objects"
     storage = get_storage(**storage_config)

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "acks": "all",
         }
     )

     now = datetime.datetime.now(tz=UTC)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = f"{kafka_prefix}.{object_type}"
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             )
             nb_sent += 1

     producer.flush()

     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
     assert TEST_OBJECT_DICTS["revision"] == list(
         storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
     assert TEST_OBJECT_DICTS["release"] == list(
         storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )

     origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
     assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
             {
                 **visit,
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
             for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits

     input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}

     collision = 0
     for record in caplog.records:
         logtext = record.getMessage()
         if "Colliding contents:" in logtext:
             collision += 1

     assert collision == 0, "No collision should be detected"


 def test_storage_play_with_collision(
     kafka_prefix: str, kafka_consumer_group: str, kafka_server: str, caplog,
 ):
     """Another replayer scenario with collisions.

     This:
     - writes objects to the topic, including colliding contents
     - replayer consumes objects from the topic and replay them
     - This drops the colliding contents from the replay when detected

     """
     kafka_prefix += ".swh.journal.objects"
     storage = get_storage(**storage_config)

     producer = Producer(
         {
             "bootstrap.servers": kafka_server,
             "client.id": "test producer",
             "enable.idempotence": "true",
         }
     )

     now = datetime.datetime.now(tz=UTC)

     # Fill Kafka
     nb_sent = 0
     nb_visits = 0
     for object_type, objects in TEST_OBJECT_DICTS.items():
         topic = f"{kafka_prefix}.{object_type}"
         for object_ in objects:
             key = bytes(random.randint(0, 255) for _ in range(40))
             object_ = object_.copy()
             if object_type == "content":
                 object_["ctime"] = now
             elif object_type == "origin_visit":
                 nb_visits += 1
                 object_["visit"] = nb_visits
             producer.produce(
                 topic=topic, key=key_to_kafka(key), value=value_to_kafka(object_),
             )
             nb_sent += 1

     # Create collision in input data
     # They are not written in the destination
     for content in DUPLICATE_CONTENTS:
         topic = f"{kafka_prefix}.content"
         producer.produce(
             topic=topic, key=key_to_kafka(key), value=value_to_kafka(content),
         )
         nb_sent += 1

     producer.flush()

     caplog.set_level(logging.ERROR, "swh.journal.replay")
     # Fill the storage from Kafka
     replayer = JournalClient(
         brokers=kafka_server,
         group_id=kafka_consumer_group,
         prefix=kafka_prefix,
         stop_after_objects=nb_sent,
     )
     worker_fn = functools.partial(process_replay_objects, storage=storage)
     nb_inserted = 0
     while nb_inserted < nb_sent:
         nb_inserted += replayer.process(worker_fn)
     assert nb_sent == nb_inserted

     # Check the objects were actually inserted in the storage
     assert TEST_OBJECT_DICTS["revision"] == list(
         storage.revision_get([rev["id"] for rev in TEST_OBJECT_DICTS["revision"]])
     )
     assert TEST_OBJECT_DICTS["release"] == list(
         storage.release_get([rel["id"] for rel in TEST_OBJECT_DICTS["release"]])
     )

     origins = list(storage.origin_get([orig for orig in TEST_OBJECT_DICTS["origin"]]))
     assert TEST_OBJECT_DICTS["origin"] == [{"url": orig["url"]} for orig in origins]
     for origin in origins:
         origin_url = origin["url"]
         expected_visits = [
             {
                 **visit,
                 "origin": origin_url,
                 "date": dateutil.parser.parse(visit["date"]),
             }
             for visit in TEST_OBJECT_DICTS["origin_visit"]
             if visit["origin"] == origin["url"]
         ]
         actual_visits = list(storage.origin_visit_get(origin_url))
         for visit in actual_visits:
             del visit["visit"]  # opaque identifier
         assert expected_visits == actual_visits

     input_contents = TEST_OBJECT_DICTS["content"]
     contents = storage.content_get_metadata([cont["sha1"] for cont in input_contents])
     assert len(contents) == len(input_contents)
     assert contents == {cont["sha1"]: [cont] for cont in input_contents}

     nb_collisions = 0

     actual_collision: Dict
     for record in caplog.records:
         logtext = record.getMessage()
         if "Collision detected:" in logtext:
             nb_collisions += 1
             actual_collision = record.args["collision"]

     assert nb_collisions == 1, "1 collision should be detected"

     algo = "sha1"
     assert actual_collision["algo"] == algo
     expected_colliding_hash = hash_to_hex(DUPLICATE_CONTENTS[0][algo])
     assert actual_collision["hash"] == expected_colliding_hash

     actual_colliding_hashes = actual_collision["objects"]
     assert len(actual_colliding_hashes) == len(DUPLICATE_CONTENTS)
     for content in DUPLICATE_CONTENTS:
         expected_content_hashes = {
             k: hash_to_hex(v) for k, v in Content.from_dict(content).hashes().items()
         }
         assert expected_content_hashes in actual_colliding_hashes


 def _test_write_replay_origin_visit(visits: List[Dict]):
     """Helper function to write tests for origin_visit.

     Each visit (a dict) given in the 'visits' argument will be sent to
     a (mocked) kafka queue, which a in-memory-storage backed replayer is
     listening to.

     Check that corresponding origin visits entities are present in the storage
     and have correct values if they are not skipped.

     """
     queue: List = []
     replayer = MockedJournalClient(queue)
     writer = MockedKafkaWriter(queue)

     # Note that flipping the order of these two insertions will crash
     # the test, because the legacy origin_format does not allow to create
     # the origin when needed (type is missing)
     writer.send(
         "origin",
         "foo",
         {
             "url": "http://example.com/",
             "type": "git",  # test the legacy origin format is accepted
         },
     )
     for visit in visits:
         writer.send("origin_visit", "foo", visit)

     queue_size = len(queue)
     assert replayer.stop_after_objects is None
     replayer.stop_after_objects = queue_size

     storage = get_storage(**storage_config)
     worker_fn = functools.partial(process_replay_objects, storage=storage)

     replayer.process(worker_fn)

     actual_visits = list(storage.origin_visit_get("http://example.com/"))

     assert len(actual_visits) == len(visits), actual_visits

     for vin, vout in zip(visits, actual_visits):
         vin = vin.copy()
         vout = vout.copy()
         assert vout.pop("origin") == "http://example.com/"
         vin.pop("origin")
         vin.setdefault("type", "git")
         vin.setdefault("metadata", None)
         assert vin == vout


-def test_write_replay_origin_visit():
+def test_write_replay_origin_visit(kafka_server):
     """Test origin_visit when the 'origin' is just a string."""
     now = datetime.datetime.now(tz=UTC)
     visits = [
         {
             "visit": 1,
             "origin": "http://example.com/",
             "date": now,
             "type": "git",
             "status": "partial",
             "snapshot": None,
         }
     ]
     _test_write_replay_origin_visit(visits)


 def test_write_replay_legacy_origin_visit1():
     """Origin_visit with no types should make the replayer crash

     We expect the journal's origin_visit topic to no longer reference such
     visits. If it does, the replayer must crash so we can fix the journal's
     topic.
""" now = datetime.datetime.now(tz=UTC) visit = { "visit": 1, "origin": "http://example.com/", "date": now, "status": "partial", "snapshot": None, } now2 = datetime.datetime.now(tz=UTC) visit2 = { "visit": 2, "origin": {"url": "http://example.com/"}, "date": now2, "status": "partial", "snapshot": None, } for origin_visit in [visit, visit2]: with pytest.raises(ValueError, match="Old origin visit format"): _test_write_replay_origin_visit([origin_visit]) def test_write_replay_legacy_origin_visit2(): """Test origin_visit when 'type' is missing from the visit, but not from the origin.""" now = datetime.datetime.now(tz=UTC) visits = [ { "visit": 1, "origin": {"url": "http://example.com/", "type": "git",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) def test_write_replay_legacy_origin_visit3(): """Test origin_visit when the origin is a dict""" now = datetime.datetime.now(tz=UTC) visits = [ { "visit": 1, "origin": {"url": "http://example.com/",}, "date": now, "type": "git", "status": "partial", "snapshot": None, } ] _test_write_replay_origin_visit(visits) + + +def test_replay_skipped_content(kafka_server, kafka_prefix): + """Test the 'skipped_content' topic is properly replayed.""" + _check_replay_skipped_content(kafka_server, kafka_prefix, "skipped_content") + + +def test_replay_skipped_content_bwcompat(kafka_server, kafka_prefix): + """Test the 'content' topic can be used to replay SkippedContent objects.""" + _check_replay_skipped_content(kafka_server, kafka_prefix, "content") + + +def _check_replay_skipped_content(kafka_server, kafka_prefix, topic): + skipped_contents = _gen_skipped_contents(100) + nb_sent = len(skipped_contents) + + producer = Producer( + { + "bootstrap.servers": kafka_server, + "client.id": "test producer", + "acks": "all", + } + ) + replayer = JournalClient( + brokers=[kafka_server], + group_id="test consumer", + prefix=kafka_prefix, + stop_after_objects=nb_sent, + ) + assert f"{kafka_prefix}.skipped_content" in replayer.subscription + + for i, obj in enumerate(skipped_contents): + obj.pop("data", None) + producer.produce( + topic=f"{kafka_prefix}.{topic}", + key=key_to_kafka({"sha1": obj["sha1"]}), + value=value_to_kafka(obj), + ) + producer.flush() + + storage = get_storage(cls="memory") + + worker_fn = functools.partial(process_replay_objects, storage=storage) + nb_inserted = replayer.process(worker_fn) + assert nb_sent == nb_inserted + for content in skipped_contents: + assert not storage.content_find({"sha1": content["sha1"]}) + + # no skipped_content_find API endpoint, so use this instead + assert not list(storage.skipped_content_missing(skipped_contents)) + + +def _updated(d1, d2): + d1.update(d2) + d1.pop("data", None) + return d1 + + +def _gen_skipped_contents(n=10): + # we do not use the hypothesis strategy here because this does not play well with + # pytest fixtures, and it makes test execution very slow + algos = DEFAULT_ALGORITHMS | {"length"} + now = datetime.datetime.now(tz=UTC) + return [ + _updated( + MultiHash.from_data(data=f"foo{i}".encode(), hash_names=algos).digest(), + { + "status": "absent", + "reason": "why not", + "origin": f"https://somewhere/{i}", + "ctime": now, + }, + ) + for i in range(n) + ] diff --git a/swh/storage/writer.py b/swh/storage/writer.py index 70cd8a58..0aaba63b 100644 --- a/swh/storage/writer.py +++ b/swh/storage/writer.py @@ -1,88 +1,88 @@ # Copyright (C) 2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution 
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information

 from typing import Iterable

 from attr import evolve

 from swh.model.model import (
     Origin,
     OriginVisit,
     Snapshot,
     Directory,
     Revision,
     Release,
     Content,
     SkippedContent,
 )

 try:
     from swh.journal.writer import get_journal_writer
 except ImportError:
     get_journal_writer = None  # type: ignore
     # mypy limitation, see https://github.com/python/mypy/issues/1153


 class JournalWriter:
     """Journal writer storage collaborator. It's in charge of adding objects to
     the journal.

     """

     def __init__(self, journal_writer):
         if journal_writer:
             if get_journal_writer is None:
                 raise EnvironmentError(
                     "You need the swh.journal package to use the "
                     "journal_writer feature"
                 )
             self.journal = get_journal_writer(**journal_writer)
         else:
             self.journal = None

     def write_additions(self, obj_type, values) -> None:
         if self.journal:
             self.journal.write_additions(obj_type, values)

     def content_add(self, contents: Iterable[Content]) -> None:
         """Add contents to the journal. Drop the data field if provided.

         """
         contents = [evolve(item, data=None) for item in contents]
         self.write_additions("content", contents)

     def content_update(self, contents: Iterable[Content]) -> None:
         if self.journal:
             raise NotImplementedError(
                 "content_update is not supported by the journal."
             )

     def content_add_metadata(self, contents: Iterable[Content]) -> None:
         self.content_add(contents)

     def skipped_content_add(self, contents: Iterable[SkippedContent]) -> None:
-        self.write_additions("content", contents)
+        self.write_additions("skipped_content", contents)

     def directory_add(self, directories: Iterable[Directory]) -> None:
         self.write_additions("directory", directories)

     def revision_add(self, revisions: Iterable[Revision]) -> None:
         self.write_additions("revision", revisions)

     def release_add(self, releases: Iterable[Release]) -> None:
         self.write_additions("release", releases)

     def snapshot_add(self, snapshots: Iterable[Snapshot]) -> None:
         self.write_additions("snapshot", snapshots)

     def origin_visit_add(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)

     def origin_visit_update(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)

     def origin_visit_upsert(self, visits: Iterable[OriginVisit]) -> None:
         self.write_additions("origin_visit", visits)

     def origin_add(self, origins: Iterable[Origin]) -> None:
         self.write_additions("origin", origins)
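
A minimal end-to-end sketch (not part of the patch) of the new routing, assuming an
in-memory storage and an object dict shaped like the ones built by
_gen_skipped_contents() in the tests above; the hashes are real, the reason/origin
values are illustrative only.

    import datetime

    from swh.model.hashutil import DEFAULT_ALGORITHMS, MultiHash
    from swh.storage import get_storage
    from swh.storage.replay import process_replay_objects

    storage = get_storage(cls="memory")

    # Build one skipped-content dict the way the tests do: real hashes, no "data" field.
    hashes = MultiHash.from_data(
        b"foo", hash_names=DEFAULT_ALGORITHMS | {"length"}
    ).digest()
    skipped = dict(
        hashes,
        status="absent",
        reason="too large",  # illustrative value
        origin="https://example.org/repo",  # illustrative value
        ctime=datetime.datetime.now(tz=datetime.timezone.utc),
    )

    # The journal client hands batches to the worker keyed by object type; the new
    # "skipped_content" key routes straight to storage.skipped_content_add().
    process_replay_objects({"skipped_content": [skipped]}, storage=storage)

    assert not list(storage.skipped_content_missing([skipped]))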