diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,9 @@
 [mypy-pkg_resources.*]
 ignore_missing_imports = True
 
+[mypy-psycopg2.*]
+ignore_missing_imports = True
+
 [mypy-pytest.*]
 ignore_missing_imports = True
 
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
 pytest < 7.0.0  # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use
 pytest-mock
 pyyaml
+swh.graph
 types-pyyaml
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -28,11 +28,12 @@
         scrubber_db:
             cls: local
-            db: "service=..."    # libpq DSN
+            db: "service=..."  # libpq DSN
 
-        # for storage checkers only:
+        # for storage checkers + origin locator only:
         storage:
-            cls: postgresql     # cannot be remote, as it needs direct access to the pg DB
+            cls: postgresql    # cannot be remote for checkers, as they need direct
+                               # access to the pg DB
             db": "service=..."  # libpq DSN
             objstorage:
                 cls: memory
 
@@ -104,8 +105,8 @@
         ]
     ),
 )
-@click.option("--start-object", default="0" * 40)
-@click.option("--end-object", default="f" * 40)
+@click.option("--start-object", default="00" * 20)
+@click.option("--end-object", default="ff" * 20)
 @click.pass_context
 def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
     """Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
@@ -142,3 +143,32 @@
 
     checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
     checker.run()
+
+
+@scrubber_cli_group.command(name="locate")
+@click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
+@click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
+@click.pass_context
+def scrubber_locate_origins(ctx, start_object: str, end_object: str):
+    """For each known corrupt object reported in the scrubber DB, looks up origins
+    that may contain this object, and records them so they can be used later
+    for recovery."""
+    conf = ctx.obj["config"]
+    if "storage" not in conf:
+        ctx.fail("You must have a storage configured in your config file.")
+
+    from swh.graph.client import RemoteGraphClient
+    from swh.model.swhids import CoreSWHID
+    from swh.storage import get_storage
+
+    from .origin_locator import OriginLocator
+
+    locator = OriginLocator(
+        db=ctx.obj["db"],
+        storage=get_storage(**conf["storage"]),
+        graph=RemoteGraphClient(**conf["graph"]),
+        start_object=CoreSWHID.from_string(start_object),
+        end_object=CoreSWHID.from_string(end_object),
+    )
+
+    locator.run()
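
The new `locate` subcommand reuses the storage configuration of the checkers and additionally reads a `graph` section, whose keys are expanded as keyword arguments of `RemoteGraphClient`; it fails early if no `storage` section is configured. A minimal configuration sketch (the DSNs and the graph URL below are placeholders, not part of this diff):

    scrubber_db:
      cls: local
      db: "service=scrubber-db"      # libpq DSN
    storage:
      cls: postgresql
      db: "service=swh-storage"      # libpq DSN
      objstorage:
        cls: memory
    graph:
      url: "http://graph.example.org:5009/"

With such a configuration loaded, `swh scrubber locate` walks the whole default SWHID range, from `swh:1:cnt:00…00` to `swh:1:snp:ff…ff`.
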
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,10 +7,11 @@
 import dataclasses
 import datetime
 import functools
-from typing import Iterator, Union
+from typing import Iterator, List
+
+import psycopg2.extras
 
 from swh.core.db import BaseDb
-from swh.model.model import Content, Directory, Release, Revision, Snapshot
 from swh.model.swhids import CoreSWHID
 
 
@@ -55,10 +56,7 @@
         return id_
 
     def corrupt_object_add(
-        self,
-        datastore: Datastore,
-        object_: Union[Content, Directory, Revision, Release, Snapshot],
-        serialized_object: bytes,
+        self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes,
     ) -> None:
         datastore_id = self.datastore_get_or_add(datastore)
         cur = self.cursor()
@@ -68,7 +66,7 @@
                 VALUES (%s, %s, %s)
                 ON CONFLICT DO NOTHING
                 """,
-            (str(object_.swhid()), datastore_id, serialized_object),
+            (str(id), datastore_id, serialized_object),
         )
 
     def corrupt_object_iter(self) -> Iterator[CorruptObject]:
@@ -94,3 +92,54 @@
                 package=ds_package, cls=ds_class, instance=ds_instance
             ),
         )
+
+    def corrupt_object_grab(
+        self,
+        cur,
+        start_id: CoreSWHID,
+        end_id: CoreSWHID,
+        limit: int = 100,
+    ) -> List[CorruptObject]:
+        """Returns a page of records in the 'corrupt_object' table."""
+        cur.execute(
+            """
+            SELECT
+                co.id, co.first_occurrence, co.object,
+                ds.package, ds.class, ds.instance
+            FROM corrupt_object AS co
+            INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+            WHERE
+                co.id >= %s
+                AND co.id <= %s
+            ORDER BY co.id
+            LIMIT %s
+            """,
+            (str(start_id), str(end_id), limit),
+        )
+
+        results = []
+        for row in cur:
+            (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
+            results.append(
+                CorruptObject(
+                    id=CoreSWHID.from_string(id),
+                    first_occurrence=first_occurrence,
+                    object_=object_,
+                    datastore=Datastore(
+                        package=ds_package, cls=ds_class, instance=ds_instance
+                    ),
+                )
+            )
+
+        return results
+
+    def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None:
+        psycopg2.extras.execute_values(
+            cur,
+            """
+            INSERT INTO object_origin (object_id, origin_url)
+            VALUES %s
+            ON CONFLICT DO NOTHING
+            """,
+            [(str(swhid), origin_url) for origin_url in origins],
+        )
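
Worth noting: thanks to the `ON CONFLICT DO NOTHING` clause and the unique index on `(object_id, origin_url)` added in 60-indexes.sql below, `object_origin_add` is idempotent, so a locator run can safely be repeated over the same range. A minimal sketch, assuming `db` is a `ScrubberDb` whose schema is initialized:

    from swh.model.swhids import CoreSWHID

    swhid = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    with db.conn, db.cursor() as cur:
        db.object_origin_add(cur, swhid, ["https://example.org/repo"])
        # inserting the same (object, origin) pair again is a no-op
        db.object_origin_add(cur, swhid, ["https://example.org/repo"])
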
diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -66,4 +66,6 @@
             object_ = cls.from_dict(kafka_to_value(message))
             real_id = object_.compute_hash()
             if object_.id != real_id:
-                self.db.corrupt_object_add(self.datastore_info(), object_, message)
+                self.db.corrupt_object_add(
+                    object_.swhid(), self.datastore_info(), message
+                )
diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/origin_locator.py
@@ -0,0 +1,87 @@
+# Copyright (C) 2021-2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Lists corrupt objects in the scrubber database, and records candidate origins
+to recover them from."""
+
+import dataclasses
+import itertools
+import logging
+from typing import Iterable, Union
+
+import psycopg2
+
+from swh.core.utils import grouper
+from swh.graph.client import GraphArgumentException, RemoteGraphClient
+from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.model.swhids import CoreSWHID, ExtendedSWHID
+from swh.storage.interface import StorageInterface
+
+from .db import CorruptObject, ScrubberDb
+from .utils import iter_corrupt_objects
+
+logger = logging.getLogger(__name__)
+
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+
+
+def get_origins(
+    graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID
+) -> Iterable[str]:
+    try:
+        origin_swhids = [
+            ExtendedSWHID.from_string(line)
+            for line in graph.leaves(str(swhid), direction="backward")
+            if line.startswith("swh:1:ori:")
+        ]
+    except GraphArgumentException:
+        return
+
+    for origin_swhid_group in grouper(origin_swhids, 10):
+        origin_swhid_group = list(origin_swhid_group)
+        for (origin, origin_swhid) in zip(
+            storage.origin_get_by_sha1(
+                [origin_swhid.object_id for origin_swhid in origin_swhid_group]
+            ),
+            origin_swhid_group,
+        ):
+            if origin is None:
+                logger.error("%s found in graph but missing from storage", origin_swhid)
+            else:
+                yield origin["url"]
+
+
+@dataclasses.dataclass
+class OriginLocator:
+    """Reads a chunk of corrupt objects in the swh-scrubber database, then writes
+    to the same database a list of origins they might be recovered from."""
+
+    db: ScrubberDb
+    """Database to read from and write to."""
+    graph: RemoteGraphClient
+    storage: StorageInterface
+    """Used to resolve origin SHA1s to URLs."""
+
+    start_object: CoreSWHID
+    """Minimum SWHID to check (in alphabetical order)"""
+    end_object: CoreSWHID
+    """Maximum SWHID to check (in alphabetical order)"""
+
+    def run(self):
+        iter_corrupt_objects(
+            self.db, self.start_object, self.end_object, self.handle_corrupt_object
+        )
+
+    def handle_corrupt_object(
+        self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor
+    ) -> None:
+        origins = get_origins(self.graph, self.storage, corrupt_object.id)
+
+        # Keep only 100 origins, to avoid flooding the DB.
+        # It is very unlikely an object disappeared from 100 somewhat-randomly sampled
+        # origins.
+        first_origins = list(itertools.islice(origins, 0, 100))
+
+        self.db.object_origin_add(cur, corrupt_object.id, first_origins)
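
`get_origins` can resolve the origin SWHIDs returned by the graph through `origin_get_by_sha1` because the `object_id` of an origin's extended SWHID is the SHA1 of its URL. A small illustration of that relationship (the URL is an arbitrary example):

    import hashlib

    from swh.model.model import Origin

    origin = Origin(url="https://example.org/repo")
    assert origin.swhid().object_id == hashlib.sha1(origin.url.encode()).digest()
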
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -18,11 +18,20 @@
 (
   id                  swhid not null,
   datastore           int not null,
-  first_occurrence    timestamptz not null default now(),
-  object              bytea not null
+  object              bytea not null,
+  first_occurrence    timestamptz not null default now()
 );
 
 comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt';
 comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.';
-comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
 comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
+comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
+
+create table object_origin
+(
+  object_id       swhid not null,
+  origin_url      text not null,
+  last_attempt    timestamptz  -- NULL if not tried yet
+);
+
+comment on table object_origin is 'Maps objects to origins they might be found in.';
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -5,9 +5,20 @@
 
 create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
 
+
 -- corrupt_object
 
 alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
 alter table corrupt_object validate constraint corrupt_object_datastore_fkey;
 
-create unique index corrupt_object_pkey on corrupt_object(id, datastore);
+create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore);
+alter table corrupt_object add primary key using index corrupt_object_pkey;
+
+-- object_origin
+
+create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url);
+create index concurrently object_origin_by_origin on object_origin (origin_url, object_id);
+
+-- FIXME: not valid, because corrupt_object(id) is not unique
+-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid;
+-- alter table object_origin validate constraint object_origin_object_fkey;
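
Not part of this diff, but the `last_attempt` column suggests how a later recovery pass might consume the new table; a sketch using only the columns defined above (the SWHID and LIMIT are arbitrary):

    -- pick origins not yet attempted for a given corrupt object
    select origin_url
    from object_origin
    where object_id = 'swh:1:cnt:0000000000000000000000000000000000000000'
      and last_attempt is null
    limit 10;
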
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -98,5 +98,7 @@
             real_id = object_.compute_hash()
             if object_.id != real_id:
                 self.db.corrupt_object_add(
-                    self.datastore_info(), object_, value_to_kafka(object_.to_dict())
+                    object_.swhid(),
+                    self.datastore_info(),
+                    value_to_kafka(object_.to_dict()),
                 )
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -9,6 +9,7 @@
 from click.testing import CliRunner
 import yaml
 
+from swh.model.swhids import CoreSWHID
 from swh.scrubber.cli import scrubber_cli_group
 from swh.scrubber.storage_checker import storage_db
 
@@ -25,6 +26,7 @@
 
     config = {
         "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+        "graph": {"url": "http://graph.example.org:5009/"},
     }
     if storage:
         with storage_db(storage) as db:
@@ -113,3 +115,26 @@
         },
     )
     assert journal_checker.method_calls == [call.run()]
+
+
+def test_locate_origins(mocker, scrubber_db, swh_storage):
+    origin_locator = MagicMock()
+    OriginLocator = mocker.patch(
+        "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
+    )
+    get_scrubber_db = mocker.patch(
+        "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+    )
+    result = invoke(scrubber_db, ["locate"], storage=swh_storage)
+    assert result.exit_code == 0, result.output
+    assert result.output == ""
+
+    get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+    OriginLocator.assert_called_once_with(
+        db=scrubber_db,
+        storage=OriginLocator.mock_calls[0][2]["storage"],
+        graph=OriginLocator.mock_calls[0][2]["graph"],
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+    assert origin_locator.method_calls == [call.run()]
diff --git a/swh/scrubber/tests/test_origin_locator.py b/swh/scrubber/tests/test_origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_origin_locator.py
@@ -0,0 +1,170 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import logging
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.graph.naive_client import NaiveClient as NaiveGraphClient
+from swh.model.model import Origin
+from swh.model.swhids import CoreSWHID
+from swh.scrubber.db import CorruptObject, Datastore
+from swh.scrubber.origin_locator import OriginLocator
+
+CORRUPT_OBJECT = CorruptObject(
+    id=CoreSWHID.from_string("swh:1:cnt:" + "f" * 40),
+    datastore=Datastore(package="storage", cls="postgresql", instance="service=swh"),
+    first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
+    object_=b"blah",
+)
+
+
+@pytest.mark.parametrize("insert", [False, True])
+def test_no_objects(scrubber_db, insert):
+    if insert:
+        scrubber_db.corrupt_object_add(
+            CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+        )
+
+    graph = MagicMock()
+    storage = MagicMock()
+    locator = OriginLocator(
+        db=scrubber_db,
+        graph=graph,
+        storage=storage,
+        # this range does not contain the object above
+        start_object=CoreSWHID.from_string("swh:1:cnt:00" + "00" * 19),
+        end_object=CoreSWHID.from_string("swh:1:cnt:f0" + "00" * 19),
+    )
+
+    locator.run()
+
+    assert graph.method_calls == []
+    assert storage.method_calls == []
+
+    with scrubber_db.conn.cursor() as cur:
+        cur.execute("SELECT COUNT(*) FROM object_origin")
+        assert cur.fetchone() == (0,)
+
+
+def test_object_not_in_graph(scrubber_db):
+    scrubber_db.corrupt_object_add(
+        CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+    )
+
+    graph = NaiveGraphClient(nodes=[], edges=[])
+    storage = MagicMock()
+    locator = OriginLocator(
+        db=scrubber_db,
+        graph=graph,
+        storage=storage,
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+
+    locator.run()
+
+    assert storage.method_calls == []
+
+    with scrubber_db.conn.cursor() as cur:
+        cur.execute("SELECT COUNT(*) FROM object_origin")
+        assert cur.fetchone() == (0,)
+
+
+def test_origin_not_in_storage(scrubber_db, swh_storage, caplog):
+    scrubber_db.corrupt_object_add(
+        CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+    )
+
+    origin = Origin(url="http://example.org")
+
+    graph = NaiveGraphClient(
+        nodes=[CORRUPT_OBJECT.id, origin.swhid()],
+        edges=[(origin.swhid(), CORRUPT_OBJECT.id)],
+    )
+    locator = OriginLocator(
+        db=scrubber_db,
+        graph=graph,
+        storage=swh_storage,
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+
+    with caplog.at_level(logging.ERROR, logger="swh.scrubber.origin_locator"):
+        locator.run()
+
+    with scrubber_db.conn.cursor() as cur:
+        cur.execute("SELECT COUNT(*) FROM object_origin")
+        assert cur.fetchone() == (0,)
+
+    assert any(
+        f"{origin.swhid()} found in graph but missing" in record[2]
+        for record in caplog.record_tuples
+    )
+
+
+def test_two_origins(scrubber_db, swh_storage):
+    scrubber_db.corrupt_object_add(
+        CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+    )
+
+    origin1 = Origin(url="http://example.org")
+    origin2 = Origin(url="http://example.com")
+    swh_storage.origin_add([origin1, origin2])
+
+    graph = NaiveGraphClient(
+        nodes=[CORRUPT_OBJECT.id, origin1.swhid(), origin2.swhid()],
+        edges=[
+            (origin1.swhid(), CORRUPT_OBJECT.id),
+            (origin2.swhid(), CORRUPT_OBJECT.id),
+        ],
+    )
+    locator = OriginLocator(
+        db=scrubber_db,
+        graph=graph,
+        storage=swh_storage,
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+
+    locator.run()
+
+    with scrubber_db.conn.cursor() as cur:
+        cur.execute("SELECT object_id, origin_url FROM object_origin")
+        assert set(cur) == {
+            (str(CORRUPT_OBJECT.id), origin1.url),
+            (str(CORRUPT_OBJECT.id), origin2.url),
+        }
+
+
+def test_many_origins(scrubber_db, swh_storage):
+    scrubber_db.corrupt_object_add(
+        CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+    )
+
+    origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)]
+    swh_storage.origin_add(origins)
+
+    graph = NaiveGraphClient(
+        nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins],
+        edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins],
+    )
+    locator = OriginLocator(
+        db=scrubber_db,
+        graph=graph,
+        storage=swh_storage,
+        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+    )
+
+    locator.run()
+
+    with scrubber_db.conn.cursor() as cur:
+        cur.execute("SELECT object_id, origin_url FROM object_origin")
+        rows = set(cur)
+        assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins}
+        assert len(rows) == 100
diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/utils.py
@@ -0,0 +1,36 @@
+# Copyright (C) 2022  The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Callable
+
+import psycopg2
+
+from swh.model.swhids import CoreSWHID
+
+from .db import CorruptObject, ScrubberDb
+
+
+def iter_corrupt_objects(
+    db: ScrubberDb,
+    start_object: CoreSWHID,
+    end_object: CoreSWHID,
+    cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None],
+) -> None:
+    """Fetches pages of corrupt objects with SWHIDs between `start_object` and
+    `end_object`, and runs `cb` on each of them within one transaction per page."""
+    while True:
+        with db.conn, db.cursor() as cur:
+            corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,)
+            if corrupt_objects and corrupt_objects[0].id == start_object:
+                # TODO: don't needlessly fetch duplicate objects
+                del corrupt_objects[0]
+            if not corrupt_objects:
+                # Nothing more to do
+                break
+            for corrupt_object in corrupt_objects:
+                cb(corrupt_object, cur)
+            db.conn.commit()  # XXX: is this redundant with db.conn.__exit__?
+
+            start_object = corrupt_objects[-1].id
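
A minimal sketch of driving the new helper directly (assuming `db` is an existing `ScrubberDb` connected to a scrubber database); the callback here only logs each corrupt object, whereas `OriginLocator.handle_corrupt_object` records candidate origins through the same cursor:

    import logging

    from swh.model.swhids import CoreSWHID
    from swh.scrubber.utils import iter_corrupt_objects

    logger = logging.getLogger(__name__)


    def log_corrupt_object(corrupt_object, cur) -> None:
        # `cur` is the per-page transaction cursor; this trivial callback ignores it
        logger.info("corrupt object: %s", corrupt_object.id)


    iter_corrupt_objects(
        db,  # assumed: an existing ScrubberDb instance
        CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
        CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
        log_corrupt_object,
    )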