diff --git a/mypy.ini b/mypy.ini index 46f8db8..9170785 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,15 +1,18 @@ [mypy] namespace_packages = True warn_unused_ignores = True # 3rd party libraries without stubs (yet) [mypy-pkg_resources.*] ignore_missing_imports = True +[mypy-psycopg2.*] +ignore_missing_imports = True + [mypy-pytest.*] ignore_missing_imports = True # [mypy-add_your_lib_here.*] # ignore_missing_imports = True diff --git a/requirements-test.txt b/requirements-test.txt index cf20732..cdcf19b 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,4 +1,5 @@ pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use pytest-mock pyyaml +swh.graph types-pyyaml diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py index f621289..bbd5079 100644 --- a/swh/scrubber/cli.py +++ b/swh/scrubber/cli.py @@ -1,144 +1,174 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import os from typing import Optional import click from swh.core.cli import CONTEXT_SETTINGS from swh.core.cli import swh as swh_cli_group @swh_cli_group.group(name="scrubber", context_settings=CONTEXT_SETTINGS) @click.option( "--config-file", "-C", default=None, type=click.Path(exists=True, dir_okay=False,), help="Configuration file.", ) @click.pass_context def scrubber_cli_group(ctx, config_file: Optional[str]) -> None: """main command group of the datastore scrubber Expected config format:: scrubber_db: cls: local - db: "service=..." # libpq DSN + db: "service=..." # libpq DSN - # for storage checkers only: + # for storage checkers + origin locator only: storage: - cls: postgresql # cannot be remote, as it needs direct access to the pg DB + cls: postgresql # cannot be remote for checkers, as they need direct + # access to the pg DB db": "service=..." # libpq DSN objstorage: cls: memory # for journal checkers only: journal_client: # see https://docs.softwareheritage.org/devel/apidoc/swh.journal.client.html # for the full list of options sasl.mechanism: SCRAM-SHA-512 security.protocol: SASL_SSL sasl.username: ... sasl.password: ... group_id: ... privileged: True message.max.bytes: 524288000 brokers: - "broker1.journal.softwareheritage.org:9093 - "broker2.journal.softwareheritage.org:9093 - "broker3.journal.softwareheritage.org:9093 - "broker4.journal.softwareheritage.org:9093 - "broker5.journal.softwareheritage.org:9093 object_types: [directory, revision, snapshot, release] auto_offset_reset: earliest """ from swh.core import config from . import get_scrubber_db if not config_file: config_file = os.environ.get("SWH_CONFIG_FILENAME") if config_file: if not os.path.exists(config_file): raise ValueError("%s does not exist" % config_file) conf = config.read(config_file) else: conf = {} if "scrubber_db" not in conf: ctx.fail("You must have a scrubber_db configured in your config file.") ctx.ensure_object(dict) ctx.obj["config"] = conf ctx.obj["db"] = get_scrubber_db(**conf["scrubber_db"]) @scrubber_cli_group.group(name="check") @click.pass_context def scrubber_check_cli_group(ctx): """group of commands which read from data stores and report errors. 
""" pass @scrubber_check_cli_group.command(name="storage") @click.option( "--object-type", type=click.Choice( # use a hardcoded list to prevent having to load the # replay module at cli loading time [ "snapshot", "revision", "release", "directory", # TODO: # "raw_extrinsic_metadata", # "extid", ] ), ) -@click.option("--start-object", default="0" * 40) -@click.option("--end-object", default="f" * 40) +@click.option("--start-object", default="00" * 20) +@click.option("--end-object", default="ff" * 20) @click.pass_context def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str): """Reads a postgresql storage, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] if "storage" not in conf: ctx.fail("You must have a storage configured in your config file.") from swh.storage import get_storage from .storage_checker import StorageChecker checker = StorageChecker( db=ctx.obj["db"], storage=get_storage(**conf["storage"]), object_type=object_type, start_object=start_object, end_object=end_object, ) checker.run() @scrubber_check_cli_group.command(name="journal") @click.pass_context def scrubber_check_journal(ctx) -> None: """Reads a complete kafka journal, and reports corrupt objects to the scrubber DB.""" conf = ctx.obj["config"] if "journal_client" not in conf: ctx.fail("You must have a journal_client configured in your config file.") from .journal_checker import JournalChecker checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],) checker.run() + + +@scrubber_cli_group.command(name="locate") +@click.option("--start-object", default="swh:1:cnt:" + "00" * 20) +@click.option("--end-object", default="swh:1:snp:" + "ff" * 20) +@click.pass_context +def scrubber_locate_origins(ctx, start_object: str, end_object: str): + """For each known corrupt object reported in the scrubber DB, looks up origins + that may contain this object, and records them; so they can be used later + for recovery.""" + conf = ctx.obj["config"] + if "storage" not in conf: + ctx.fail("You must have a storage configured in your config file.") + + from swh.graph.client import RemoteGraphClient + from swh.model.model import CoreSWHID + from swh.storage import get_storage + + from .origin_locator import OriginLocator + + locator = OriginLocator( + db=ctx.obj["db"], + storage=get_storage(**conf["storage"]), + graph=RemoteGraphClient(**conf["graph"]), + start_object=CoreSWHID.from_string(start_object), + end_object=CoreSWHID.from_string(end_object), + ) + + locator.run() diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py index b915a6e..2efa953 100644 --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -1,96 +1,145 @@ # Copyright (C) 2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import dataclasses import datetime import functools -from typing import Iterator, Union +from typing import Iterator, List + +import psycopg2 from swh.core.db import BaseDb -from swh.model.model import Content, Directory, Release, Revision, Snapshot from swh.model.swhids import CoreSWHID @dataclasses.dataclass(frozen=True) class Datastore: """Represents a datastore being scrubbed; eg. swh-storage or swh-journal.""" package: str """'storage', 'journal', or 'objstorage'.""" cls: str """'postgresql'/'cassandra' for storage, 'kafka' for journal, 'pathslicer'/'winery'/... 
for objstorage.""" instance: str """Human readable string.""" @dataclasses.dataclass(frozen=True) class CorruptObject: id: CoreSWHID datastore: Datastore first_occurrence: datetime.datetime object_: bytes class ScrubberDb(BaseDb): current_version = 1 @functools.lru_cache(1000) def datastore_get_or_add(self, datastore: Datastore) -> int: """Creates a datastore if it does not exist, and returns its id.""" cur = self.cursor() cur.execute( """ INSERT INTO datastore (package, class, instance) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING RETURNING id """, (datastore.package, datastore.cls, datastore.instance), ) (id_,) = cur.fetchone() return id_ def corrupt_object_add( - self, - datastore: Datastore, - object_: Union[Content, Directory, Revision, Release, Snapshot], - serialized_object: bytes, + self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes, ) -> None: datastore_id = self.datastore_get_or_add(datastore) cur = self.cursor() cur.execute( """ INSERT INTO corrupt_object (id, datastore, object) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING """, - (str(object_.swhid()), datastore_id, serialized_object), + (str(id), datastore_id, serialized_object), ) def corrupt_object_iter(self) -> Iterator[CorruptObject]: """Yields all records in the 'corrupt_object' table.""" cur = self.cursor() cur.execute( """ SELECT co.id, co.first_occurrence, co.object, ds.package, ds.class, ds.instance FROM corrupt_object AS co INNER JOIN datastore AS ds ON (ds.id=co.datastore) """ ) for row in cur: (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row yield CorruptObject( id=CoreSWHID.from_string(id), first_occurrence=first_occurrence, object_=object_, datastore=Datastore( package=ds_package, cls=ds_class, instance=ds_instance ), ) + + def corrupt_object_grab( + self, + cur, + start_id: CoreSWHID = None, + end_id: CoreSWHID = None, + limit: int = 100, + ) -> List[CorruptObject]: + """Yields a page of records in the 'corrupt_object' table.""" + cur.execute( + """ + SELECT + co.id, co.first_occurrence, co.object, + ds.package, ds.class, ds.instance + FROM corrupt_object AS co + INNER JOIN datastore AS ds ON (ds.id=co.datastore) + WHERE + co.id >= %s + AND co.id <= %s + ORDER BY co.id + LIMIT %s + """, + (str(start_id), str(end_id), limit), + ) + + results = [] + for row in cur: + (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row + results.append( + CorruptObject( + id=CoreSWHID.from_string(id), + first_occurrence=first_occurrence, + object_=object_, + datastore=Datastore( + package=ds_package, cls=ds_class, instance=ds_instance + ), + ) + ) + + return results + + def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None: + psycopg2.extras.execute_values( + cur, + """ + INSERT INTO object_origin (object_id, origin_url) + VALUES %s + ON CONFLICT DO NOTHING + """, + [(str(swhid), origin_url) for origin_url in origins], + ) diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py index 369a3ea..008feec 100644 --- a/swh/scrubber/journal_checker.py +++ b/swh/scrubber/journal_checker.py @@ -1,69 +1,71 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Reads all objects in a swh-storage instance and recomputes their checksums.""" import logging from typing import Any, Dict, List from swh.journal.client import get_journal_client from 
swh.journal.serializers import kafka_to_value from swh.model import model from .db import Datastore, ScrubberDb logger = logging.getLogger(__name__) class JournalChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" _datastore = None def __init__(self, db: ScrubberDb, journal_client: Dict[str, Any]): self.db = db self.journal_client_config = journal_client self.journal_client = get_journal_client( **journal_client, # Remove default deserializer; so process_kafka_values() gets the message # verbatim so it can archive it with as few modifications as possible. value_deserializer=lambda obj_type, msg: msg, ) def datastore_info(self) -> Datastore: """Returns a :class:`Datastore` instance representing the journal instance being checked.""" if self._datastore is None: config = self.journal_client_config if config["cls"] == "kafka": self._datastore = Datastore( package="journal", cls="kafka", instance=( f"brokers={config['brokers']!r} prefix={config['prefix']!r}" ), ) else: raise NotImplementedError( f"StorageChecker(journal_client={self.journal_client_config!r})" f".datastore()" ) return self._datastore def run(self): """Runs a journal client with the given configuration. This method does not return, unless otherwise configured (with ``stop_on_eof``). """ self.journal_client.process(self.process_kafka_messages) def process_kafka_messages(self, all_messages: Dict[str, List[bytes]]): for (object_type, messages) in all_messages.items(): cls = getattr(model, object_type.capitalize()) for message in messages: object_ = cls.from_dict(kafka_to_value(message)) real_id = object_.compute_hash() if object_.id != real_id: - self.db.corrupt_object_add(self.datastore_info(), object_, message) + self.db.corrupt_object_add( + object_.swhid(), self.datastore_info(), message + ) diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py new file mode 100644 index 0000000..aa19010 --- /dev/null +++ b/swh/scrubber/origin_locator.py @@ -0,0 +1,87 @@ +# Copyright (C) 2021-2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +"""Lists corrupt objects in the scrubber database, and lists candidate origins +to recover them from.""" + +import dataclasses +import itertools +import logging +from typing import Iterable, Union + +import psycopg2 + +from swh.core.utils import grouper +from swh.graph.client import GraphArgumentException, RemoteGraphClient +from swh.model.model import Directory, Release, Revision, Snapshot +from swh.model.swhids import CoreSWHID, ExtendedSWHID +from swh.storage.interface import StorageInterface + +from .db import CorruptObject, ScrubberDb +from .utils import iter_corrupt_objects + +logger = logging.getLogger(__name__) + +ScrubbableObject = Union[Revision, Release, Snapshot, Directory] + + +def get_origins( + graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID +) -> Iterable[str]: + try: + origin_swhids = [ + ExtendedSWHID.from_string(line) + for line in graph.leaves(str(swhid), direction="backward") + if line.startswith("swh:1:ori:") + ] + except GraphArgumentException: + return + + for origin_swhid_group in grouper(origin_swhids, 10): + origin_swhid_group = list(origin_swhid_group) + for (origin, origin_swhid) in zip( + storage.origin_get_by_sha1( + [origin_swhid.object_id for origin_swhid in origin_swhid_group] + ),
+ origin_swhid_group, + ): + if origin is None: + logger.error("%s found in graph but missing from storage", origin_swhid) + else: + yield origin["url"] + + +@dataclasses.dataclass +class OriginLocator: + """Reads a chunk of corrupt objects in the swh-scrubber database, then writes + to the same database a list of origins they might be recovered from.""" + + db: ScrubberDb + """Database to read from and write to.""" + graph: RemoteGraphClient + storage: StorageInterface + """Used to resolve origin SHA1s to URLs.""" + + start_object: CoreSWHID + """Minimum SWHID to check (in alphabetical order)""" + end_object: CoreSWHID + """Maximum SWHID to check (in alphabetical order)""" + + def run(self): + iter_corrupt_objects( + self.db, self.start_object, self.end_object, self.handle_corrupt_object + ) + + def handle_corrupt_object( + self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor + ) -> None: + origins = get_origins(self.graph, self.storage, corrupt_object.id) + + # Keep only 100 origins, to avoid flooding the DB. + # It is very unlikely an object disappeared from 100 somewhat-randomly sampled + # origins. + first_origins = list(itertools.islice(origins, 0, 100)) + + self.db.object_origin_add(cur, corrupt_object.id, first_origins) diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql index 4f857c7..5cfab88 100644 --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -1,28 +1,37 @@ create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); create table datastore ( id bigserial not null, package datastore_type not null, class text, instance text ); comment on table datastore is 'Each row identifies a data store being scrubbed'; comment on column datastore.id is 'Internal identifier of the datastore'; comment on column datastore.package is 'Name of the component using this datastore (storage/journal/objstorage)'; comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)'; comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg.
its URL or DSN.'; create table corrupt_object ( id swhid not null, datastore int not null, - first_occurrence timestamptz not null default now(), - object bytea not null + object bytea not null, + first_occurrence timestamptz not null default now() ); comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt'; comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.'; -comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)'; +comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time'; + +create table object_origin +( + object_id swhid not null, + origin_url text not null, + last_attempt timestamptz -- NULL if not tried yet +); + +comment on table object_origin is 'Maps objects to origins they might be found in.'; diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql index 09b13f1..5acd9c8 100644 --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -1,13 +1,24 @@ -- datastore create unique index concurrently datastore_pkey on datastore(id); alter table datastore add primary key using index datastore_pkey; create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); + -- corrupt_object alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid; alter table corrupt_object validate constraint corrupt_object_datastore_fkey; -create unique index corrupt_object_pkey on corrupt_object(id, datastore); +create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore); +alter table corrupt_object add primary key using index corrupt_object_pkey; + +-- object_origin + +create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url); +create index concurrently object_origin_by_origin on object_origin (origin_url, object_id); + +-- FIXME: not valid, because corrupt_object(id) is not unique +-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid; +-- alter table object_origin validate constraint object_origin_object_fkey; diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py index afb3835..d5d306c 100644 --- a/swh/scrubber/storage_checker.py +++ b/swh/scrubber/storage_checker.py @@ -1,102 +1,104 @@ # Copyright (C) 2021-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information """Reads all objects in a swh-storage instance and recomputes their checksums.""" import contextlib import dataclasses import logging from typing import Iterable, Union from swh.journal.serializers import value_to_kafka from swh.model.model import Directory, Release, Revision, Snapshot from swh.storage import backfill from swh.storage.interface import StorageInterface from swh.storage.postgresql.storage import Storage as PostgresqlStorage from .db import Datastore, ScrubberDb logger = logging.getLogger(__name__) ScrubbableObject = Union[Revision, Release, Snapshot, Directory] @contextlib.contextmanager def 
storage_db(storage): db = storage.get_db() try: yield db finally: storage.put_db(db) @dataclasses.dataclass class StorageChecker: """Reads a chunk of a swh-storage database, recomputes checksums, and reports errors in a separate database.""" db: ScrubberDb storage: StorageInterface object_type: str """``directory``/``revision``/``release``/``snapshot``""" start_object: str """minimum value of the hexdigest of the object's sha1.""" end_object: str """maximum value of the hexdigest of the object's sha1.""" _datastore = None def datastore_info(self) -> Datastore: """Returns a :class:`Datastore` instance representing the swh-storage instance being checked.""" if self._datastore is None: if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: self._datastore = Datastore( package="storage", cls="postgresql", instance=db.conn.dsn, ) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).datastore()" ) return self._datastore def run(self): """Runs on all objects of ``object_type`` and with id between ``start_object`` and ``end_object``. """ if isinstance(self.storage, PostgresqlStorage): with storage_db(self.storage) as db: return self._check_postgresql(db) else: raise NotImplementedError( f"StorageChecker(storage={self.storage!r}).check_storage()" ) def _check_postgresql(self, db): for range_start, range_end in backfill.RANGE_GENERATORS[self.object_type]( self.start_object, self.end_object ): logger.info( "Processing %s range %s to %s", self.object_type, backfill._format_range_bound(range_start), backfill._format_range_bound(range_end), ) objects = backfill.fetch( db, self.object_type, start=range_start, end=range_end ) objects = list(objects) self.process_objects(objects) def process_objects(self, objects: Iterable[ScrubbableObject]): for object_ in objects: real_id = object_.compute_hash() if object_.id != real_id: self.db.corrupt_object_add( - self.datastore_info(), object_, value_to_kafka(object_.to_dict()) + object_.swhid(), + self.datastore_info(), + value_to_kafka(object_.to_dict()), ) diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py index 3602813..37e39d8 100644 --- a/swh/scrubber/tests/test_cli.py +++ b/swh/scrubber/tests/test_cli.py @@ -1,115 +1,140 @@ # Copyright (C) 2020-2022 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import tempfile from unittest.mock import MagicMock, call from click.testing import CliRunner import yaml +from swh.model.swhids import CoreSWHID from swh.scrubber.cli import scrubber_cli_group from swh.scrubber.storage_checker import storage_db def invoke( scrubber_db, args, storage=None, kafka_server=None, kafka_prefix=None, kafka_consumer_group=None, ): runner = CliRunner() config = { "scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn}, + "graph": {"url": "http://graph.example.org:5009/"}, } if storage: with storage_db(storage) as db: config["storage"] = { "cls": "postgresql", "db": db.conn.dsn, "objstorage": {"cls": "memory"}, } assert ( (kafka_server is None) == (kafka_prefix is None) == (kafka_consumer_group is None) ) if kafka_server: config["journal_client"] = dict( cls="kafka", brokers=kafka_server, group_id=kafka_consumer_group, prefix=kafka_prefix, stop_on_eof=True, ) with tempfile.NamedTemporaryFile("a", suffix=".yml") as config_fd: yaml.dump(config, config_fd) config_fd.seek(0) args = ["-C" + 
config_fd.name] + list(args) result = runner.invoke(scrubber_cli_group, args, catch_exceptions=False) return result def test_check_storage(mocker, scrubber_db, swh_storage): storage_checker = MagicMock() StorageChecker = mocker.patch( "swh.scrubber.storage_checker.StorageChecker", return_value=storage_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( scrubber_db, ["check", "storage", "--object-type=snapshot"], storage=swh_storage ) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) StorageChecker.assert_called_once_with( db=scrubber_db, storage=StorageChecker.mock_calls[0][2]["storage"], object_type="snapshot", start_object="0" * 40, end_object="f" * 40, ) assert storage_checker.method_calls == [call.run()] def test_check_journal( mocker, scrubber_db, kafka_server, kafka_prefix, kafka_consumer_group ): journal_checker = MagicMock() JournalChecker = mocker.patch( "swh.scrubber.journal_checker.JournalChecker", return_value=journal_checker ) get_scrubber_db = mocker.patch( "swh.scrubber.get_scrubber_db", return_value=scrubber_db ) result = invoke( scrubber_db, ["check", "journal"], kafka_server=kafka_server, kafka_prefix=kafka_prefix, kafka_consumer_group=kafka_consumer_group, ) assert result.exit_code == 0, result.output assert result.output == "" get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) JournalChecker.assert_called_once_with( db=scrubber_db, journal_client={ "brokers": kafka_server, "cls": "kafka", "group_id": kafka_consumer_group, "prefix": kafka_prefix, "stop_on_eof": True, }, ) assert journal_checker.method_calls == [call.run()] + + +def test_locate_origins(mocker, scrubber_db, swh_storage): + origin_locator = MagicMock() + OriginLocator = mocker.patch( + "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator + ) + get_scrubber_db = mocker.patch( + "swh.scrubber.get_scrubber_db", return_value=scrubber_db + ) + result = invoke(scrubber_db, ["locate"], storage=swh_storage) + assert result.exit_code == 0, result.output + assert result.output == "" + + get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn) + OriginLocator.assert_called_once_with( + db=scrubber_db, + storage=OriginLocator.mock_calls[0][2]["storage"], + graph=OriginLocator.mock_calls[0][2]["graph"], + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + assert origin_locator.method_calls == [call.run()] diff --git a/swh/scrubber/tests/test_origin_locator.py b/swh/scrubber/tests/test_origin_locator.py new file mode 100644 index 0000000..7f2e67f --- /dev/null +++ b/swh/scrubber/tests/test_origin_locator.py @@ -0,0 +1,170 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime +import logging +from unittest.mock import MagicMock + +import pytest + +from swh.graph.naive_client import NaiveClient as NaiveGraphClient +from swh.model.model import Origin +from swh.model.swhids import CoreSWHID +from swh.scrubber.db import CorruptObject, Datastore +from swh.scrubber.origin_locator import OriginLocator + +CORRUPT_OBJECT = CorruptObject( + id=CoreSWHID.from_string("swh:1:cnt:" + "f" * 40), + 
datastore=Datastore(package="storage", cls="postgresql", instance="service=swh"), + first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc), + object_=b"blah", +) + + +@pytest.mark.parametrize("insert", [False, True]) +def test_no_objects(scrubber_db, insert): + if insert: + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + graph = MagicMock() + storage = MagicMock() + locator = OriginLocator( + db=scrubber_db, + graph=graph, + storage=storage, + # this range does not contain the object above + start_object=CoreSWHID.from_string("swh:1:cnt:00" + "00" * 19), + end_object=CoreSWHID.from_string("swh:1:cnt:f0" + "00" * 19), + ) + + locator.run() + + assert graph.method_calls == [] + assert storage.method_calls == [] + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM object_origin") + assert cur.fetchone() == (0,) + + +def test_object_not_in_graph(scrubber_db): + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + graph = NaiveGraphClient(nodes=[], edges=[]) + storage = MagicMock() + locator = OriginLocator( + db=scrubber_db, + graph=graph, + storage=storage, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + ) + + locator.run() + + assert storage.method_calls == [] + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM object_origin") + assert cur.fetchone() == (0,) + + +def test_origin_not_in_storage(scrubber_db, swh_storage, caplog): + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + origin = Origin(url="http://example.org") + + graph = NaiveGraphClient( + nodes=[CORRUPT_OBJECT.id, origin.swhid()], + edges=[(origin.swhid(), CORRUPT_OBJECT.id)], + ) + locator = OriginLocator( + db=scrubber_db, + graph=graph, + storage=swh_storage, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + + with caplog.at_level(logging.ERROR, logger="swh.scrubber.origin_locator"): + locator.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT COUNT(*) FROM object_origin") + assert cur.fetchone() == (0,) + + assert any( + f"{origin.swhid()} found in graph but missing" in record[2] + for record in caplog.record_tuples + ) + + +def test_two_origins(scrubber_db, swh_storage): + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + origin1 = Origin(url="http://example.org") + origin2 = Origin(url="http://example.com") + swh_storage.origin_add([origin1, origin2]) + + graph = NaiveGraphClient( + nodes=[CORRUPT_OBJECT.id, origin1.swhid(), origin2.swhid()], + edges=[ + (origin1.swhid(), CORRUPT_OBJECT.id), + (origin2.swhid(), CORRUPT_OBJECT.id), + ], + ) + locator = OriginLocator( + db=scrubber_db, + graph=graph, + storage=swh_storage, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + + locator.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT object_id, origin_url FROM object_origin") + assert set(cur) == { + (str(CORRUPT_OBJECT.id), origin1.url), + (str(CORRUPT_OBJECT.id), origin2.url), + } + + +def test_many_origins(scrubber_db, swh_storage): + scrubber_db.corrupt_object_add( + CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_ + ) + + 
origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)] + swh_storage.origin_add(origins) + + graph = NaiveGraphClient( + nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins], + edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins], + ) + locator = OriginLocator( + db=scrubber_db, + graph=graph, + storage=swh_storage, + start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20), + end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20), + ) + + locator.run() + + with scrubber_db.conn.cursor() as cur: + cur.execute("SELECT object_id, origin_url FROM object_origin") + rows = set(cur) + assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins} + assert len(rows) == 100 diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py new file mode 100644 index 0000000..2d28b51 --- /dev/null +++ b/swh/scrubber/utils.py @@ -0,0 +1,34 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +from typing import Callable + +import psycopg2 + +from swh.model.swhids import CoreSWHID + +from .db import CorruptObject, ScrubberDb + + +def iter_corrupt_objects( + db: ScrubberDb, + start_object: CoreSWHID, + end_object: CoreSWHID, + cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None], +) -> None: + while True: + with db.conn, db.cursor() as cur: + corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,) + if corrupt_objects and corrupt_objects[0].id == start_object: + # TODO: don't needlessly fetch duplicate objects + del corrupt_objects[0] + if not corrupt_objects: + # Nothing more to do + break + for corrupt_object in corrupt_objects: + cb(corrupt_object, cur) + db.conn.commit() # XXX: is this redundant with db.conn.__exit__? + + start_object = corrupt_objects[-1].id
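
For reviewers, a minimal usage sketch of how the new locate step fits together, mirroring what the `swh scrubber locate` CLI command in this patch does. The scrubber/storage DSNs and the graph URL below are illustrative placeholders (the graph URL matches the dummy value used in test_cli.py), not values mandated by the patch:

# Hedged sketch: assumes a reachable swh.graph HTTP API and a postgresql storage;
# the "service=..." DSNs and the example.org URL are made-up examples.
from swh.graph.client import RemoteGraphClient
from swh.model.swhids import CoreSWHID
from swh.scrubber import get_scrubber_db
from swh.scrubber.origin_locator import OriginLocator
from swh.storage import get_storage

scrubber_db = get_scrubber_db(cls="local", db="service=swh-scrubber")
locator = OriginLocator(
    db=scrubber_db,
    storage=get_storage(
        cls="postgresql", db="service=swh", objstorage={"cls": "memory"}
    ),
    graph=RemoteGraphClient(url="http://graph.example.org:5009/"),
    start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
    end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
# Pages through corrupt_object rows between start_object and end_object
# (via iter_corrupt_objects/corrupt_object_grab) and records up to 100
# candidate origins per object in the new object_origin table.
locator.run()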