diff --git a/swh/storage/cassandra/storage.py b/swh/storage/cassandra/storage.py
--- a/swh/storage/cassandra/storage.py
+++ b/swh/storage/cassandra/storage.py
@@ -119,20 +119,22 @@
             c for c in contents if not self._cql_runner.content_get_from_pk(c.to_dict())
         ]
 
-        self.journal_writer.content_add(contents)
-
         if with_data:
             # First insert to the objstorage, if the endpoint is
             # `content_add` (as opposed to `content_add_metadata`).
-            # TODO: this should probably be done in concurrently to inserting
-            # in index tables (but still before the main table; so an entry is
-            # only added to the main table after everything else was
-            # successfully inserted.
+
+            # Must add to the objstorage before the DB and journal. Otherwise:
+            # 1. in case of a crash the DB may "believe" we have the content, but
+            #    we didn't have time to write to the objstorage before the crash
+            # 2. the objstorage mirroring, which reads from the journal, may attempt to
+            #    read from the objstorage before we finished writing it
             summary = self.objstorage.content_add(
                 c for c in contents if c.status != "absent"
             )
             content_add_bytes = summary["content:add:bytes"]
 
+        self.journal_writer.content_add(contents)
+
         content_add = 0
         for content in contents:
             content_add += 1
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -214,6 +214,11 @@
 
         contents = [attr.evolve(c, ctime=ctime) for c in content]
 
+        # Must add to the objstorage before the DB and journal. Otherwise:
+        # 1. in case of a crash the DB may "believe" we have the content, but
+        #    we didn't have time to write to the objstorage before the crash
+        # 2. the objstorage mirroring, which reads from the journal, may attempt to
+        #    read from the objstorage before we finished writing it
         objstorage_summary = self.objstorage.content_add(contents)
 
         with self.db() as db:
diff --git a/swh/storage/tests/storage_tests.py b/swh/storage/tests/storage_tests.py
--- a/swh/storage/tests/storage_tests.py
+++ b/swh/storage/tests/storage_tests.py
@@ -11,6 +11,7 @@
 import math
 import random
 from typing import Any, ClassVar, Dict, Iterator, Optional
+from unittest.mock import MagicMock
 
 import attr
 from hypothesis import HealthCheck, given, settings, strategies
@@ -372,6 +373,24 @@
             cont1b.hashes(),
         ]
 
+    def test_content_add_objstorage_first(self, swh_storage, sample_data):
+        """Tests the objstorage is written to before the DB and journal"""
+        cont = sample_data.content
+
+        swh_storage.objstorage.content_add = MagicMock(side_effect=Exception("Oops"))
+
+        # Try to add, but the objstorage crashes; the error must not be swallowed
+        with pytest.raises(Exception):
+            swh_storage.content_add([cont])
+
+        # The DB must be written to after the objstorage, so the DB should be
+        # unchanged if the objstorage crashed
+        assert swh_storage.content_get_data(cont.sha1) is None
+        assert list(swh_storage.content_missing_per_sha1([cont.sha1])) == [cont.sha1]
+
+        # The journal too
+        assert list(swh_storage.journal_writer.journal.objects) == []
+
     def test_skipped_content_add(self, swh_storage, sample_data):
         contents = sample_data.skipped_contents[:2]
         cont = contents[0]
diff --git a/swh/storage/tests/test_api_client.py b/swh/storage/tests/test_api_client.py
--- a/swh/storage/tests/test_api_client.py
+++ b/swh/storage/tests/test_api_client.py
@@ -58,6 +58,9 @@
 
     journal_writer = getattr(storage, "journal_writer", None)
     storage.journal_writer = app_server.storage.journal_writer
+    objstorage = getattr(storage, "objstorage", None)
+    storage.objstorage = app_server.storage.objstorage
     yield storage
     storage.journal_writer = journal_writer
+    storage.objstorage = objstorage
 