diff --git a/requirements-swh.txt b/requirements-swh.txt
--- a/requirements-swh.txt
+++ b/requirements-swh.txt
@@ -1,4 +1,4 @@
 swh.core[db,http] >= 2.10
 swh.counters >= v0.8.0
-swh.model >= 6.0.0
+swh.model >= 6.3.0
 swh.objstorage >= 0.2.2
diff --git a/swh/storage/backfill.py b/swh/storage/backfill.py
--- a/swh/storage/backfill.py
+++ b/swh/storage/backfill.py
@@ -221,11 +221,15 @@
             )
             entries.append(entry)
 
-    return Directory(
+    (is_corrupt, dir_) = Directory.from_possibly_duplicated_entries(
         id=directory_d["id"],
         entries=tuple(entries),
         raw_manifest=directory_d["raw_manifest"],
     )
+    if is_corrupt:
+        logger.info("%s has duplicated entries", dir_.swhid())
+
+    return dir_
 
 
 def raw_extrinsic_metadata_converter(
diff --git a/swh/storage/tests/test_backfill.py b/swh/storage/tests/test_backfill.py
--- a/swh/storage/tests/test_backfill.py
+++ b/swh/storage/tests/test_backfill.py
@@ -1,4 +1,4 @@
-# Copyright (C) 2019-2021  The Software Heritage developers
+# Copyright (C) 2019-2022  The Software Heritage developers
 # See the AUTHORS file at the top-level directory of this distribution
 # License: GNU General Public License version 3, or any later version
 # See top-level LICENSE file for more information
@@ -7,9 +7,11 @@
 import logging
 from unittest.mock import patch
 
+import attr
 import pytest
 
 from swh.journal.client import JournalClient
+from swh.model.model import Directory, DirectoryEntry
 from swh.model.tests.swh_model_data import TEST_OBJECTS
 from swh.storage import get_storage
 from swh.storage.backfill import (
@@ -17,6 +19,7 @@
     JournalBackfiller,
     byte_ranges,
     compute_query,
+    fetch,
     raw_extrinsic_metadata_target_ranges,
 )
 from swh.storage.in_memory import InMemoryStorage
@@ -295,3 +298,67 @@
     assert (
         "this should not happen" not in record.message
     ), "Replayer ignored some message types, see captured logging"
+
+
+def test_backfiller__duplicate_directory_entries(
+    swh_storage_backend_config,
+    kafka_prefix: str,
+    kafka_consumer_group: str,
+    kafka_server: str,
+    caplog,
+):
+    """Tests that the backfiller doesn't crash when reading a legacy directory
+    with duplicated entries, which are no longer allowed.
+    Instead, it should slightly mangle the entries and set a raw_manifest.
+    """
+    storage = get_storage(**swh_storage_backend_config)
+    db = storage.get_db()  # type: ignore
+
+    run_validators = attr.get_run_validators()
+    attr.set_run_validators(False)
+    try:
+        invalid_directory = Directory(
+            entries=(
+                DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1),
+                DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
+            )
+        )
+    finally:
+        attr.set_run_validators(run_validators)
+    storage.directory_add([invalid_directory])
+
+    # Make sure we successfully inserted a corrupt directory, otherwise this test
+    # is pointless
+    with db.conn.cursor() as cur:
+        cur.execute("select id, dir_entries, file_entries, raw_manifest from directory")
+        (row,) = cur
+        (id_, (dir_entry,), (file_entry,), raw_manifest) = row
+        assert id_ == invalid_directory.id
+        assert raw_manifest is None
+        cur.execute("select id, name, target from directory_entry_dir")
+        assert list(cur) == [(dir_entry, b"foo", b"\x01" * 20)]
+        cur.execute("select id, name, target from directory_entry_file")
+        assert list(cur) == [(file_entry, b"foo", b"\x00" * 20)]
+
+    # Run the backfiller on the directory (it would crash if it called
+    # Directory() directly instead of Directory.from_possibly_duplicated_entries())
+    directories = list(fetch(db, "directory", start=None, end=None))
+
+    # Make sure the directory looks as expected
+    deduplicated_directory = Directory(
+        id=invalid_directory.id,
+        entries=(
+            DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1),
+            DirectoryEntry(
+                name=b"foo_0000000000", type="file", target=b"\x00" * 20, perms=0
+            ),
+        ),
+        raw_manifest=(
+            # fmt: off
+            b"tree 52\x00"
+            + b"0 foo\x00" + b"\x00" * 20
+            + b"1 foo\x00" + b"\x01" * 20
+            # fmt: on
+        ),
+    )
+    assert directories == [deduplicated_directory]
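
Note on the backfill.py change above: Directory.from_possibly_duplicated_entries()
(the reason for the swh.model >= 6.3.0 bump) returns an (is_corrupt, directory)
pair instead of raising on duplicated entry names. Below is a minimal sketch of
the behaviour the backfiller now relies on, using the same entries as the new
test; it assumes the id and raw_manifest arguments may be omitted (the backfiller
passes them explicitly), and only asserts properties the new test guarantees:

    # Sketch only; assumes swh.model >= 6.3.0, as required by this patch.
    from swh.model.model import Directory, DirectoryEntry

    (is_corrupt, fixed) = Directory.from_possibly_duplicated_entries(
        entries=(
            DirectoryEntry(name=b"foo", type="dir", target=b"\x01" * 20, perms=1),
            DirectoryEntry(name=b"foo", type="file", target=b"\x00" * 20, perms=0),
        ),
    )

    assert is_corrupt  # the two entries share the name b"foo"
    # Duplicates are renamed (the test above expects b"foo_0000000000") so the
    # resulting Directory passes validation...
    assert len({entry.name for entry in fixed.entries}) == len(fixed.entries)
    # ...while raw_manifest preserves the original, duplicated git tree bytes,
    # so the directory still hashes to its original identifier.
    assert fixed.raw_manifest is not None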
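
Note on the b"tree 52\x00" expectation in the test: a raw directory manifest
follows git's tree format, i.e. a b"tree <payload length>\x00" header followed
by one b"<perms> <name>\x00" record plus the 20-byte raw target hash per entry.
A quick check of the arithmetic, with the bytes copied from the test:

    # b"0 foo\x00" and b"1 foo\x00" are 6 bytes each; both records keep the
    # original duplicated name b"foo", and the file record sorts before the
    # dir because git compares directory names as if suffixed with "/".
    entry_file = b"0 foo\x00" + b"\x00" * 20  # 6 + 20 = 26 bytes
    entry_dir = b"1 foo\x00" + b"\x01" * 20  # 6 + 20 = 26 bytes
    payload = entry_file + entry_dir
    assert len(payload) == 52  # hence the b"tree 52\x00" header

    raw_manifest = b"tree %d\x00" % len(payload) + payload
    assert raw_manifest.startswith(b"tree 52\x00")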