diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -156,6 +156,8 @@ conf = ctx.obj["config"] if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") + if "graph" not in conf: + ctx.fail("You must have a graph configured in your config file.") from swh.graph.client import RemoteGraphClient from swh.model.model import CoreSWHID @@ -172,3 +174,24 @@ ) locator.run() + + +@scrubber_cli_group.command(name="fix") +@click.option("--start-object", default="swh:1:cnt:" + "00" * 20) +@click.option("--end-object", default="swh:1:snp:" + "ff" * 20) +@click.pass_context +def scrubber_fix_objects(ctx, start_object: str, end_object: str): + """For each known corrupt object reported in the scrubber DB, looks up origins + that may contain this object, and records them; so they can be used later + for recovery.""" + from swh.model.model import CoreSWHID + + from .fixer import Fixer + + fixer = Fixer( + db=ctx.obj["db"], + start_object=CoreSWHID.from_string(start_object), + end_object=CoreSWHID.from_string(end_object), + ) + + fixer.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -7,7 +7,7 @@ import dataclasses import datetime import functools -from typing import Iterator, List +from typing import Iterator, List, Optional import psycopg2 @@ -36,6 +36,14 @@ object_: bytes +@dataclasses.dataclass(frozen=True) +class FixedObject: + id: CoreSWHID + object_: bytes + method: str + recovery_date: Optional[datetime.datetime] = None + + class ScrubberDb(BaseDb): current_version = 1 @@ -93,14 +101,37 @@ ), ) - def corrupt_object_grab( - self, - cur, - start_id: CoreSWHID = None, - end_id: CoreSWHID = None, - limit: int = 100, + def _corrupt_object_list_from_cursor( + self, cur: psycopg2.extensions.cursor ) -> List[CorruptObject]: - """Yields a page of records in the 'corrupt_object' table.""" + results = [] + for row in cur: + (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row + results.append( + CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + ) + + return results + + def corrupt_object_get( + self, start_id: CoreSWHID, end_id: CoreSWHID, limit: int = 100, + ) -> List[CorruptObject]: + """Yields a page of records in the 'corrupt_object' table, ordered by id. + + Arguments: + start_id: Only return objects after this id + end_id: Only return objects before this id + in_origin: An origin URL. If provided, only returns objects that may be + found in the given origin + """ + cur = self.cursor() cur.execute( """ SELECT @@ -116,24 +147,92 @@ """, (str(start_id), str(end_id), limit), ) + return self._corrupt_object_list_from_cursor(cur) - results = [] - for row in cur: - (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row - results.append( - CorruptObject( - id=CoreSWHID.from_string(id), - first_occurrence=first_occurrence, - object_=object_, - datastore=Datastore( - package=ds_package, cls=ds_class, instance=ds_instance - ), - ) - ) + def corrupt_object_grab_by_id( + self, + cur: psycopg2.extensions.cursor, + start_id: CoreSWHID, + end_id: CoreSWHID, + limit: int = 100, + ) -> List[CorruptObject]: + """Returns a page of records in the 'corrupt_object' table for a fixer, + ordered by id - return results + These records are not already fixed (ie. do not have a corresponding entry + in the 'fixed_object' table), and they are selected with an exclusive update + lock. - def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None: + Arguments: + start_id: Only return objects after this id + end_id: Only return objects before this id + """ + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + WHERE + co.id >= %(start_id)s + AND co.id <= %(end_id)s + AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) + ORDER BY co.id + LIMIT %(limit)s + FOR UPDATE SKIP LOCKED + """, + dict(start_id=str(start_id), end_id=str(end_id), limit=limit,), + ) + return self._corrupt_object_list_from_cursor(cur) + + def corrupt_object_grab_by_origin( + self, + cur: psycopg2.extensions.cursor, + origin_url: str, + start_id: Optional[CoreSWHID] = None, + end_id: Optional[CoreSWHID] = None, + limit: int = 100, + ) -> List[CorruptObject]: + """Returns a page of records in the 'corrupt_object' table for a fixer, + ordered by id + + These records are not already fixed (ie. do not have a corresponding entry + in the 'fixed_object' table), and they are selected with an exclusive update + lock. + + Arguments: + origin_url: only returns objects that may be found in the given origin + """ + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + INNER JOIN object_origin AS oo ON (oo.object_id=co.id) + WHERE + (co.id >= %(start_id)s OR %(start_id)s IS NULL) + AND (co.id <= %(end_id)s OR %(end_id)s IS NULL) + AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id) + AND oo.origin_url=%(origin_url)s + ORDER BY co.id + LIMIT %(limit)s + FOR UPDATE SKIP LOCKED + """, + dict( + start_id=None if start_id is None else str(start_id), + end_id=None if end_id is None else str(end_id), + origin_url=origin_url, + limit=limit, + ), + ) + return self._corrupt_object_list_from_cursor(cur) + + def object_origin_add( + self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str] + ) -> None: psycopg2.extras.execute_values( cur, """ @@ -143,3 +242,55 @@ """, [(str(swhid), origin_url) for origin_url in origins], ) + + def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]: + """Returns origins with non-fixed corrupt objects, ordered by URL. + + Arguments: + after: if given, only returns origins with an URL after this value + """ + cur = self.cursor() + cur.execute( + """ + SELECT DISTINCT origin_url + FROM object_origin + WHERE + origin_url > %(after)s + AND object_id IN ( + (SELECT id FROM corrupt_object) + EXCEPT (SELECT id FROM fixed_object) + ) + ORDER BY origin_url + LIMIT %(limit)s + """, + dict(after=after, limit=limit), + ) + + return [origin_url for (origin_url,) in cur] + + def fixed_object_add( + self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject] + ) -> None: + psycopg2.extras.execute_values( + cur, + """ + INSERT INTO fixed_object (id, object, method) + VALUES %s + ON CONFLICT DO NOTHING + """, + [ + (str(fixed_object.id), fixed_object.object_, fixed_object.method) + for fixed_object in fixed_objects + ], + ) + + def fixed_object_iter(self) -> Iterator[FixedObject]: + cur = self.cursor() + cur.execute("SELECT id, object, method, recovery_date FROM fixed_object") + for (id, object_, method, recovery_date) in cur: + yield FixedObject( + id=CoreSWHID.from_string(id), + object_=object_, + method=method, + recovery_date=recovery_date, + ) diff --git a/swh/scrubber/fixer.py b/swh/scrubber/fixer.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/fixer.py @@ -0,0 +1,202 @@ +# Copyright (C) 2021-2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Reads all objects in a swh-storage instance and recomputes their checksums.""" + +import dataclasses +import functools +import logging +import os +from pathlib import Path +import subprocess +import tempfile +from typing import Dict, Optional, Type, Union + +import dulwich +import dulwich.objects +import dulwich.repo +import psycopg2 + +from swh.journal.serializers import kafka_to_value, value_to_kafka +from swh.loader.git import converters +from swh.model.hashutil import hash_to_bytehex, hash_to_hex +from swh.model.model import BaseModel, Directory, Release, Revision, Snapshot +from swh.model.swhids import CoreSWHID, ObjectType + +from .db import CorruptObject, FixedObject, ScrubberDb +from .utils import iter_corrupt_objects + +logger = logging.getLogger(__name__) + +ScrubbableObject = Union[Revision, Release, Snapshot, Directory] + + +def get_object_from_clone( + clone_path: Path, swhid: CoreSWHID +) -> Union[None, bytes, dulwich.objects.ShaFile]: + """Reads the original object matching the ``corrupt_object`` from the given clone + if it exists, and returns a Dulwich object if possible, or a the raw manifest.""" + try: + repo = dulwich.repo.Repo(str(clone_path)) + except dulwich.errors.NotGitRepository: + return None + + with repo: # needed to avoid packfile fd leaks + try: + return repo[hash_to_bytehex(swhid.object_id)] + except KeyError: + return None + except dulwich.errors.ObjectFormatException: + # fallback to git if dulwich can't parse it. + # Unfortunately, Dulwich does not allow fetching an object without + # parsing it into a ShaFile subclass, so we have to manually get it + # by shelling out to git. + object_type = ( + subprocess.check_output( + [ + "git", + "-C", + clone_path, + "cat-file", + "-t", + hash_to_hex(swhid.object_id), + ] + ) + .decode() + .strip() + ) + manifest = subprocess.check_output( + [ + "git", + "-C", + clone_path, + "cat-file", + object_type, + hash_to_hex(swhid.object_id), + ] + ) + manifest = f"{object_type} {len(manifest)}\x00".encode() + manifest + logger.info("Dulwich failed to parse %r", manifest) + return manifest + + +def get_fixed_object_from_clone( + clone_path: Path, corrupt_object: CorruptObject +) -> Optional[FixedObject]: + """Reads the original object matching the ``corrupt_object`` from the given clone + if it exists, and returns a :class:`FixedObject` instance ready to be inserted + in the database.""" + cloned_dulwich_obj_or_manifest = get_object_from_clone( + clone_path, corrupt_object.id + ) + if cloned_dulwich_obj_or_manifest is None: + # Origin still exists, but object disappeared + logger.info("%s not found in origin", corrupt_object.id) + return None + elif isinstance(cloned_dulwich_obj_or_manifest, bytes): + # Dulwich could not parse it. Add as raw manifest to the existing object + d = kafka_to_value(corrupt_object.object_) + assert d.get("raw_manifest") is None, "Corrupt object has a raw_manifest" + d["raw_manifest"] = cloned_dulwich_obj_or_manifest + + # Rebuild the object from the stored corrupt object + the raw manifest + # just recovered; then checksum it. + classes: Dict[ObjectType, Type[BaseModel]] = { + ObjectType.REVISION: Revision, + ObjectType.DIRECTORY: Directory, + ObjectType.RELEASE: Release, + } + cls = classes[corrupt_object.id.object_type] + recovered_obj = cls.from_dict(d) + recovered_obj.check() + + return FixedObject( + id=corrupt_object.id, + object_=value_to_kafka(d), + method="manifest_from_origin", + ) + else: + converter = { + ObjectType.REVISION: converters.dulwich_commit_to_revision, + ObjectType.DIRECTORY: converters.dulwich_tree_to_directory, + ObjectType.RELEASE: converters.dulwich_tag_to_release, + }[corrupt_object.id.object_type] + cloned_obj = converter(cloned_dulwich_obj_or_manifest) + + # Check checksum, among others + cloned_obj.check() + + return FixedObject( + id=corrupt_object.id, + object_=value_to_kafka(cloned_obj.to_dict()), + method="from_origin", + ) + + +@dataclasses.dataclass +class Fixer: + """Reads a chunk of corrupt objects in the swh-scrubber database, tries to recover + them through various means (brute-forcing fields and re-downloading from the origin) + recomputes checksums, and writes them back to the swh-scrubber database + if successful. + + """ + + db: ScrubberDb + """Database to read from and write to.""" + start_object: CoreSWHID = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20) + """Minimum SWHID to check (in alphabetical order)""" + end_object: CoreSWHID = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20) + """Maximum SWHID to check (in alphabetical order)""" + + def run(self): + # TODO: currently only support re-downloading from the origin: + # we should try brute-forcing for objects with no known origin (or when + # all origins fail) + after = "" + while True: + new_origins = self.db.object_origin_get(after=after) + if not new_origins: + break + for origin_url in new_origins: + self.recover_objects_from_origin(origin_url) + after = new_origins[-1] + + def recover_objects_from_origin(self, origin_url): + """Clones an origin, and cherry-picks original objects that are known to be + corrupt in the database.""" + with tempfile.TemporaryDirectory(prefix=__name__ + ".") as tempdir: + clone_path = Path(tempdir) / "repository.git" + try: + subprocess.run( + ["git", "clone", "--bare", origin_url, clone_path], + env={"PATH": os.environ["PATH"], "GIT_TERMINAL_PROMPT": "0"}, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + ) + except Exception: + logger.exception("Failed to clone %s", origin_url) + return + + iter_corrupt_objects( + self.db, + self.start_object, + self.end_object, + origin_url, + functools.partial(self.recover_corrupt_object, clone_path=clone_path), + ) + + def recover_corrupt_object( + self, + corrupt_object: CorruptObject, + cur: psycopg2.extensions.cursor, + clone_path: Path, + ) -> None: + fixed_object = get_fixed_object_from_clone(clone_path, corrupt_object) + + if fixed_object is not None: + self.db.fixed_object_add(cur, [fixed_object]) diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py --- a/swh/scrubber/origin_locator.py +++ b/swh/scrubber/origin_locator.py @@ -71,7 +71,11 @@ def run(self): iter_corrupt_objects( - self.db, self.start_object, self.end_object, self.handle_corrupt_object + self.db, + self.start_object, + self.end_object, + None, + self.handle_corrupt_object, ) def handle_corrupt_object( diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -35,3 +35,16 @@ ); comment on table object_origin is 'Maps objects to origins they might be found in.'; + +create table fixed_object +( + id swhid not null, + object bytea not null, + method text, + recovery_date timestamptz not null default now() +); + +comment on table fixed_object is 'Each row identifies an object that was found to be corrupt, along with the original version of the object'; +comment on column fixed_object.object is 'The recovered object itself, as a msgpack-encoded dict'; +comment on column fixed_object.recovery_date is 'Moment the object was recovered.'; +comment on column fixed_object.method is 'How the object was recovered. For example: "from_origin", "negative_utc", "capitalized_revision_parent".'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -22,3 +22,8 @@ -- FIXME: not valid, because corrupt_object(id) is not unique -- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid; -- alter table object_origin validate constraint object_origin_object_fkey; + +-- fixed_object + +create unique index concurrently fixed_object_pkey on fixed_object(id); +alter table fixed_object add primary key using index fixed_object_pkey; diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -138,3 +138,22 @@ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), ) assert origin_locator.method_calls == [call.run()] + + +def test_fix_objects(mocker, scrubber_db): + fixer = MagicMock() + Fixer = mocker.patch("swh.scrubber.fixer.Fixer", return_value=fixer) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke(scrubber_db, ["fix"]) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) + Fixer.assert_called_once_with( + db=scrubber_db, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + assert fixer.method_calls == [call.run()] diff --git a/swh/scrubber/tests/test_fixer.py b/swh/scrubber/tests/test_fixer.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/tests/test_fixer.py @@ -0,0 +1,331 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import logging +from pathlib import Path +import subprocess +from unittest.mock import MagicMock +import zlib + +import attr + +from swh.journal.serializers import kafka_to_value, value_to_kafka +from swh.model.hashutil import hash_to_bytes +from swh.model.model import Directory, DirectoryEntry +from swh.model.tests.swh_model_data import DIRECTORIES +from swh.scrubber.db import CorruptObject, Datastore, FixedObject, ScrubberDb +from swh.scrubber.fixer import Fixer + +(DIRECTORY,) = [dir_ for dir_ in DIRECTORIES if len(dir_.entries) > 1] + +# ORIGINAL_DIRECTORY represents a directory with entries in non-canonical order, +# and a consistent hash. Its entries' were canonically reordered, but the original +# order is still present in the raw manifest. +_DIR = Directory(entries=tuple(reversed(DIRECTORY.entries))) +ORIGINAL_DIRECTORY = Directory( + entries=( + DirectoryEntry( + name=b"dir1", + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + perms=0o040755, + ), + DirectoryEntry( + name=b"file1.ext", + type="file", + target=hash_to_bytes("86bc6b377e9d25f9d26777a4a28d08e63e7c5779"), + perms=0o644, + ), + DirectoryEntry( + name=b"subprepo1", + type="rev", + target=hash_to_bytes("c7f96242d73c267adc77c2908e64e0c1cb6a4431"), + perms=0o160000, + ), + ), + raw_manifest=( + b"tree 102\x00" + b"160000 subprepo1\x00\xc7\xf9bB\xd7<&z\xdcw\xc2\x90\x8ed\xe0\xc1\xcbjD1" + b"644 file1.ext\x00\x86\xbck7~\x9d%\xf9\xd2gw\xa4\xa2\x8d\x08\xe6>|Wy" + b"40755 dir1\x00K\x82]\xc6B\xcbn\xb9\xa0`\xe5K\xf8\xd6\x92\x88\xfb\xeeI\x04" + ), +) + +# A directory with its entries in canonical order, but a hash computed as if +# computed in the reverse order. +# This happens when entries get normalized (either by the loader or accidentally +# in swh-storage) +CORRUPT_DIRECTORY = attr.evolve(ORIGINAL_DIRECTORY, raw_manifest=None) + + +assert ORIGINAL_DIRECTORY != CORRUPT_DIRECTORY +assert ( + hash_to_bytes("61992617462fff81509bda4a24b54c96ea74a007") + == ORIGINAL_DIRECTORY.id + == CORRUPT_DIRECTORY.id +) +assert ( + hash_to_bytes("81fda5b242e65fc81201e590d0f0ce5f582fbcdd") + == CORRUPT_DIRECTORY.compute_hash() + != CORRUPT_DIRECTORY.id +) +assert ORIGINAL_DIRECTORY.entries == CORRUPT_DIRECTORY.entries + +DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh") +CORRUPT_OBJECT = CorruptObject( + id=ORIGINAL_DIRECTORY.swhid(), + datastore=DATASTORE, + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + object_=value_to_kafka(CORRUPT_DIRECTORY.to_dict()), +) + + +def test_no_object(scrubber_db: ScrubberDb, mocker) -> None: + """There is no object to recover -> nothing happens""" + fixer = Fixer(db=scrubber_db) + fixer.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM fixed_object") + assert cur.fetchone() == (0,) + + +def test_no_origin(scrubber_db: ScrubberDb, mocker) -> None: + """There is no origin to recover objects from -> nothing happens""" + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + fixer = Fixer(db=scrubber_db) + fixer.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM fixed_object") + assert cur.fetchone() == (0,) + + +def test_already_fixed(scrubber_db: ScrubberDb, mocker) -> None: + """All corrupt objects are already fixed -> nothing happens""" + fixed_object = FixedObject( + id=CORRUPT_OBJECT.id, + object_=value_to_kafka(ORIGINAL_DIRECTORY.to_dict()), + method="whatever means necessary", + ) + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + with scrubber_db.cursor() as cur: + scrubber_db.object_origin_add(cur, CORRUPT_OBJECT.id, ["http://example.org/"]) + scrubber_db.fixed_object_add(cur, [fixed_object]) + + subprocess_run = mocker.patch("subprocess.run") + + scrubber_db = MagicMock(wraps=scrubber_db) + + fixer = Fixer(db=scrubber_db) + fixer.run() + + # Check the Fixer did not try to fix the object again + scrubber_db.fixed_object_add.assert_not_called() + subprocess_run.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [(str(fixed_object.id), fixed_object.method)] + + +def _run_fixer_with_clone( + scrubber_db: ScrubberDb, + mocker, + caplog, + corrupt_object: CorruptObject, + subprocess_run_side_effect, +) -> None: + """Helper for all tests that involve running the fixer with a clone: + adds a corrupt object and an origin to the DB, mocks subprocess.run with the + given function, and runs the fixer with caplog""" + scrubber_db.corrupt_object_add( + corrupt_object.id, corrupt_object.datastore, corrupt_object.object_ + ) + with scrubber_db.cursor() as cur: + scrubber_db.object_origin_add(cur, corrupt_object.id, ["http://example.org/"]) + + subprocess_run = mocker.patch( + "subprocess.run", side_effect=subprocess_run_side_effect + ) + + fixer = Fixer(db=scrubber_db) + with caplog.at_level(logging.CRITICAL): + with caplog.at_level(logging.INFO, logger="swh.scrubber.fixer"): + fixer.run() + + subprocess_run.assert_called() + + +def test_failed_clone(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object found with an origin, but the origin's clone is broken somehow""" + scrubber_db = MagicMock(wraps=scrubber_db) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess.CalledProcessError(1, "foo"), + ) + + scrubber_db.fixed_object_add.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [] + + assert ( + "swh.scrubber.fixer", + logging.ERROR, + "Failed to clone http://example.org/", + ) in caplog.record_tuples + + +def test_empty_origin(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object found with an origin, but the origin's clone is missing + the object""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + def subprocess_run(args, **kwargs): + (*head, path) = args + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_not_called() + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT id, method FROM fixed_object") + assert list(cur) == [] + + assert ( + "swh.scrubber.fixer", + logging.INFO, + "swh:1:dir:61992617462fff81509bda4a24b54c96ea74a007 not found in origin", + ) in caplog.record_tuples + + +def test_parseable_directory_from_origin( + scrubber_db: ScrubberDb, mocker, caplog +) -> None: + """Corrupt object found with an origin, and the object is found in the origin's + clone as expected.""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + def subprocess_run(args, **kwargs): + (*head, path) = args + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + object_dir_path = Path(path) / "objects/61" + object_path = object_dir_path / "992617462fff81509bda4a24b54c96ea74a007" + object_dir_path.mkdir() + with open(object_path, "wb") as fd: + fd.write(zlib.compress(ORIGINAL_DIRECTORY.raw_manifest)) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=CORRUPT_OBJECT, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_called_once() + fixed_objects = list(scrubber_db.fixed_object_iter()) + assert len(fixed_objects) == 1 + + assert fixed_objects[0].id == ORIGINAL_DIRECTORY.swhid() + assert fixed_objects[0].method == "from_origin" + assert ( + Directory.from_dict(kafka_to_value(fixed_objects[0].object_)) + == ORIGINAL_DIRECTORY + ) + + assert caplog.record_tuples == [] + + +def test_unparseable_directory(scrubber_db: ScrubberDb, mocker, caplog) -> None: + """Corrupt object found with an origin, and the object is found in the origin's + clone as expected; but Dulwich cannot parse it. + It was probably loaded by an old version of the loader that was more permissive, + by using libgit2.""" + scrubber_db = MagicMock(wraps=scrubber_db) + real_subprocess_run = subprocess.run + + raw_manifest = b"this is not a parseable manifest" + raw_manifest = f"tree {len(raw_manifest)}\x00".encode() + raw_manifest + + original_directory = Directory( + entries=( + DirectoryEntry( + name=b"dir1", + type="dir", + target=hash_to_bytes("4b825dc642cb6eb9a060e54bf8d69288fbee4904"), + perms=0o040755, + ), + ), + raw_manifest=raw_manifest, + ) + assert original_directory.id.hex() == "a518fa6b46bad74e95588d2bfdf4455398a2216a" + + corrupt_directory = attr.evolve(original_directory, raw_manifest=None) + corrupt_object = CorruptObject( + id=original_directory.swhid(), + datastore=DATASTORE, + object_=value_to_kafka(corrupt_directory.to_dict()), + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + ) + + def subprocess_run(args, **kwargs): + (*head, path) = args + if head[0:2] != ["git", "clone"]: + return real_subprocess_run(args, **kwargs) + assert head == ["git", "clone", "--bare", "http://example.org/"] + real_subprocess_run(["git", "init", "--bare", path]) + object_dir_path = Path(path) / "objects/a5" + object_path = object_dir_path / "18fa6b46bad74e95588d2bfdf4455398a2216a" + object_dir_path.mkdir() + with open(object_path, "wb") as fd: + fd.write(zlib.compress(raw_manifest)) + + _run_fixer_with_clone( + scrubber_db, + mocker, + caplog, + corrupt_object=corrupt_object, + subprocess_run_side_effect=subprocess_run, + ) + + scrubber_db.fixed_object_add.assert_called_once() + fixed_objects = list(scrubber_db.fixed_object_iter()) + assert len(fixed_objects) == 1 + + assert fixed_objects[0].id == original_directory.swhid() + assert fixed_objects[0].method == "manifest_from_origin" + assert ( + Directory.from_dict(kafka_to_value(fixed_objects[0].object_)) + == original_directory + ) + + assert caplog.record_tuples == [ + ( + "swh.scrubber.fixer", + logging.INFO, + r"Dulwich failed to parse b'tree 32\x00this is not a parseable manifest'", + ) + ] diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py --- a/swh/scrubber/utils.py +++ b/swh/scrubber/utils.py @@ -3,7 +3,7 @@ # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information -from typing import Callable +from typing import Callable, Optional import psycopg2 @@ -16,11 +16,23 @@ db: ScrubberDb, start_object: CoreSWHID, end_object: CoreSWHID, + origin_url: Optional[str], cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None], ) -> None: + """Fetches objects and calls ``cb`` on each of them. + + objects are fetched with an update lock, with the same transaction as ``cb``, + which is automatically committed after ``cb`` runs.""" while True: with db.conn, db.cursor() as cur: - corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,) + if origin_url: + corrupt_objects = db.corrupt_object_grab_by_origin( + cur, origin_url, start_object, end_object + ) + else: + corrupt_objects = db.corrupt_object_grab_by_id( + cur, start_object, end_object + ) if corrupt_objects and corrupt_objects[0].id == start_object: # TODO: don't needlessly fetch duplicate objects del corrupt_objects[0]