D7428.id26906.diff

diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,9 @@
[mypy-pkg_resources.*]
ignore_missing_imports = True
+[mypy-psycopg2.*]
+ignore_missing_imports = True
+
[mypy-pytest.*]
ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use
pytest-mock
pyyaml
+swh.graph
types-pyyaml
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -28,11 +28,12 @@
scrubber_db:
cls: local
- db: "service=..." # libpq DSN
+ db: "service=..." # libpq DSN
- # for storage checkers only:
+ # for storage checkers + origin locator only:
storage:
- cls: postgresql # cannot be remote, as it needs direct access to the pg DB
+ cls: postgresql # cannot be remote for checkers, as they need direct
+ # access to the pg DB
db": "service=..." # libpq DSN
objstorage:
cls: memory
@@ -104,8 +105,8 @@
]
),
)
-@click.option("--start-object", default="0" * 40)
-@click.option("--end-object", default="f" * 40)
+@click.option("--start-object", default="00" * 20)
+@click.option("--end-object", default="ff" * 20)
@click.pass_context
def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
"""Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
@@ -142,3 +143,32 @@
checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
checker.run()
+
+
+@scrubber_cli_group.command(name="locate")
+@click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
+@click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
+@click.pass_context
+def scrubber_locate_origins(ctx, start_object: str, end_object: str):
+ """For each known corrupt object reported in the scrubber DB, looks up origins
+ that may contain this object, and records them so they can be used later
+ for recovery."""
+ conf = ctx.obj["config"]
+ if "storage" not in conf:
+ ctx.fail("You must have a storage configured in your config file.")
+
+ from swh.graph.client import RemoteGraphClient
+ from swh.model.model import CoreSWHID
+ from swh.storage import get_storage
+
+ from .origin_locator import OriginLocator
+
+ locator = OriginLocator(
+ db=ctx.obj["db"],
+ storage=get_storage(**conf["storage"]),
+ graph=RemoteGraphClient(**conf["graph"]),
+ start_object=CoreSWHID.from_string(start_object),
+ end_object=CoreSWHID.from_string(end_object),
+ )
+
+ locator.run()
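
The new locate subcommand (presumably invoked as `swh scrubber locate`) wires together the scrubber database, a postgresql storage, and a swh-graph client. Below is a minimal Python sketch of what it assembles from the configuration; the DSNs are placeholders, and the config shape follows the docstring above plus the `graph` section used by the CLI test further down.

# Minimal sketch of what the "locate" subcommand assembles; the DSNs and the
# graph URL are placeholders, not values from this diff.
from swh.graph.client import RemoteGraphClient
from swh.model.swhids import CoreSWHID
from swh.scrubber import get_scrubber_db
from swh.scrubber.origin_locator import OriginLocator
from swh.storage import get_storage

conf = {
    "scrubber_db": {"cls": "local", "db": "service=swh-scrubber"},
    "storage": {
        "cls": "postgresql",
        "db": "service=swh",
        "objstorage": {"cls": "memory"},
    },
    "graph": {"url": "http://graph.example.org:5009/"},
}

locator = OriginLocator(
    db=get_scrubber_db(**conf["scrubber_db"]),
    storage=get_storage(**conf["storage"]),
    graph=RemoteGraphClient(**conf["graph"]),
    # same defaults as the CLI option declarations above
    start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
    end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
)
locator.run()
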
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,10 +7,11 @@
import dataclasses
import datetime
import functools
-from typing import Iterator, Union
+from typing import Iterator, List
+
+import psycopg2
from swh.core.db import BaseDb
-from swh.model.model import Content, Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID
@@ -55,10 +56,7 @@
return id_
def corrupt_object_add(
- self,
- datastore: Datastore,
- object_: Union[Content, Directory, Revision, Release, Snapshot],
- serialized_object: bytes,
+ self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes,
) -> None:
datastore_id = self.datastore_get_or_add(datastore)
cur = self.cursor()
@@ -68,7 +66,7 @@
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
""",
- (str(object_.swhid()), datastore_id, serialized_object),
+ (str(id), datastore_id, serialized_object),
)
def corrupt_object_iter(self) -> Iterator[CorruptObject]:
@@ -94,3 +92,54 @@
package=ds_package, cls=ds_class, instance=ds_instance
),
)
+
+ def corrupt_object_grab(
+ self,
+ cur,
+ start_id: CoreSWHID = None,
+ end_id: CoreSWHID = None,
+ limit: int = 100,
+ ) -> List[CorruptObject]:
+ """Yields a page of records in the 'corrupt_object' table."""
+ cur.execute(
+ """
+ SELECT
+ co.id, co.first_occurrence, co.object,
+ ds.package, ds.class, ds.instance
+ FROM corrupt_object AS co
+ INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+ WHERE
+ co.id >= %s
+ AND co.id <= %s
+ ORDER BY co.id
+ LIMIT %s
+ """,
+ (str(start_id), str(end_id), limit),
+ )
+
+ results = []
+ for row in cur:
+ (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
+ results.append(
+ CorruptObject(
+ id=CoreSWHID.from_string(id),
+ first_occurrence=first_occurrence,
+ object_=object_,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
+ )
+
+ return results
+
+ def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None:
+ psycopg2.extras.execute_values(
+ cur,
+ """
+ INSERT INTO object_origin (object_id, origin_url)
+ VALUES %s
+ ON CONFLICT DO NOTHING
+ """,
+ [(str(swhid), origin_url) for origin_url in origins],
+ )
diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -66,4 +66,6 @@
object_ = cls.from_dict(kafka_to_value(message))
real_id = object_.compute_hash()
if object_.id != real_id:
- self.db.corrupt_object_add(self.datastore_info(), object_, message)
+ self.db.corrupt_object_add(
+ object_.swhid(), self.datastore_info(), message
+ )
diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/origin_locator.py
@@ -0,0 +1,87 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Lists corrupt objects in the scrubber database, and lists candidate origins
+to recover them from."""
+
+import dataclasses
+import itertools
+import logging
+from typing import Iterable, Union
+
+import psycopg2
+
+from swh.core.utils import grouper
+from swh.graph.client import GraphArgumentException, RemoteGraphClient
+from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.model.swhids import CoreSWHID, ExtendedSWHID
+from swh.storage.interface import StorageInterface
+
+from .db import CorruptObject, ScrubberDb
+from .utils import iter_corrupt_objects
+
+logger = logging.getLogger(__name__)
+
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+
+
+def get_origins(
+ graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID
+) -> Iterable[str]:
+ try:
+ origin_swhids = [
+ ExtendedSWHID.from_string(line)
+ for line in graph.leaves(str(swhid), direction="backward")
+ if line.startswith("swh:1:ori:")
+ ]
+ except GraphArgumentException:
+ return
+
+ for origin_swhid_group in grouper(origin_swhids, 10):
+ origin_swhid_group = list(origin_swhid_group)
+ for (origin, origin_swhid) in zip(
+ storage.origin_get_by_sha1(
+ [origin_swhid.object_id for origin_swhid in origin_swhid_group]
+ ),
+ origin_swhid_group,
+ ):
+ if origin is None:
+ logger.error("%s found in graph but missing from storage", origin_swhid)
+ else:
+ yield origin["url"]
+
+
+@dataclasses.dataclass
+class OriginLocator:
+ """Reads a chunk of corrupt objects in the swh-scrubber database, then writes
+ to the same database a list of origins they might be recovered from."""
+
+ db: ScrubberDb
+ """Database to read from and write to."""
+ graph: RemoteGraphClient
+ storage: StorageInterface
+ """Used to resolve origin SHA1s to URLs."""
+
+ start_object: CoreSWHID
+ """Minimum SWHID to check (in alphabetical order)"""
+ end_object: CoreSWHID
+ """Maximum SWHID to check (in alphabetical order)"""
+
+ def run(self):
+ iter_corrupt_objects(
+ self.db, self.start_object, self.end_object, self.handle_corrupt_object
+ )
+
+ def handle_corrupt_object(
+ self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor
+ ) -> None:
+ origins = get_origins(self.graph, self.storage, corrupt_object.id)
+
+ # Keep only 100 origins, to avoid flooding the DB.
+ # It is very unlikely an object disappeared from 100 somewhat-randomly
+ # sampled origins.
+ first_origins = list(itertools.islice(origins, 0, 100))
+
+ self.db.object_origin_add(cur, corrupt_object.id, first_origins)
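
To make the graph side of get_origins() concrete, here is a minimal illustration using the in-memory NaiveClient that the tests below rely on; the origin URL is made up. It only shows the backward "leaves" query; get_origins() then resolves the returned origin SWHIDs to URLs through storage.origin_get_by_sha1().

# Minimal illustration of the graph query get_origins() relies on: backward
# "leaves" reachable from the corrupt object's SWHID are origins.
from swh.graph.naive_client import NaiveClient
from swh.model.model import Origin
from swh.model.swhids import CoreSWHID

swhid = CoreSWHID.from_string("swh:1:cnt:" + "ff" * 20)
origin = Origin(url="https://example.org/repo")  # made-up URL
graph = NaiveClient(nodes=[swhid, origin.swhid()], edges=[(origin.swhid(), swhid)])
print(list(graph.leaves(str(swhid), direction="backward")))
# should print the origin's SWHID (swh:1:ori:...)
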
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -18,11 +18,20 @@
(
id swhid not null,
datastore int not null,
- first_occurrence timestamptz not null default now(),
- object bytea not null
+ object bytea not null,
+ first_occurrence timestamptz not null default now()
);
comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt';
comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.';
-comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
+comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
+
+create table object_origin
+(
+ object_id swhid not null,
+ origin_url text not null,
+ last_attempt timestamptz -- NULL if not tried yet
+);
+
+comment on table object_origin is 'Maps objects to origins they might be found in.';
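
The last_attempt column is what lets a later recovery pass distinguish origins that were already tried from those that were not. A hypothetical follow-up query (not part of this diff) selecting the untried candidates could look like this, using the scrubber DB connection with a placeholder DSN:

# Hypothetical sketch: list candidate origins not tried yet
# (last_attempt IS NULL); the DSN is a placeholder.
from swh.scrubber import get_scrubber_db

db = get_scrubber_db(cls="local", db="service=swh-scrubber")  # placeholder DSN
with db.conn, db.cursor() as cur:
    cur.execute(
        """
        SELECT object_id, origin_url
        FROM object_origin
        WHERE last_attempt IS NULL
        ORDER BY object_id
        """
    )
    for object_id, origin_url in cur:
        ...  # hand the pair to a (future) recovery step
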
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -5,9 +5,20 @@
create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
+
-- corrupt_object
alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
alter table corrupt_object validate constraint corrupt_object_datastore_fkey;
-create unique index corrupt_object_pkey on corrupt_object(id, datastore);
+create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore);
+alter table corrupt_object add primary key using index corrupt_object_pkey;
+
+-- object_origin
+
+create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url);
+create index concurrently object_origin_by_origin on object_origin (origin_url, object_id);
+
+-- FIXME: not valid, because corrupt_object(id) is not unique
+-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid;
+-- alter table object_origin validate constraint object_origin_object_fkey;
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -98,5 +98,7 @@
real_id = object_.compute_hash()
if object_.id != real_id:
self.db.corrupt_object_add(
- self.datastore_info(), object_, value_to_kafka(object_.to_dict())
+ object_.swhid(),
+ self.datastore_info(),
+ value_to_kafka(object_.to_dict()),
)
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -9,6 +9,7 @@
from click.testing import CliRunner
import yaml
+from swh.model.swhids import CoreSWHID
from swh.scrubber.cli import scrubber_cli_group
from swh.scrubber.storage_checker import storage_db
@@ -25,6 +26,7 @@
config = {
"scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+ "graph": {"url": "http://graph.example.org:5009/"},
}
if storage:
with storage_db(storage) as db:
@@ -113,3 +115,26 @@
},
)
assert journal_checker.method_calls == [call.run()]
+
+
+def test_locate_origins(mocker, scrubber_db, swh_storage):
+ origin_locator = MagicMock()
+ OriginLocator = mocker.patch(
+ "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
+ )
+ get_scrubber_db = mocker.patch(
+ "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+ )
+ result = invoke(scrubber_db, ["locate"], storage=swh_storage)
+ assert result.exit_code == 0, result.output
+ assert result.output == ""
+
+ get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+ OriginLocator.assert_called_once_with(
+ db=scrubber_db,
+ storage=OriginLocator.mock_calls[0][2]["storage"],
+ graph=OriginLocator.mock_calls[0][2]["graph"],
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+ assert origin_locator.method_calls == [call.run()]
diff --git a/swh/scrubber/tests/test_origin_locator.py b/swh/scrubber/tests/test_origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_origin_locator.py
@@ -0,0 +1,170 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import logging
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.graph.naive_client import NaiveClient as NaiveGraphClient
+from swh.model.model import Origin
+from swh.model.swhids import CoreSWHID
+from swh.scrubber.db import CorruptObject, Datastore
+from swh.scrubber.origin_locator import OriginLocator
+
+CORRUPT_OBJECT = CorruptObject(
+ id=CoreSWHID.from_string("swh:1:cnt:" + "f" * 40),
+ datastore=Datastore(package="storage", cls="postgresql", instance="service=swh"),
+ first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
+ object_=b"blah",
+)
+
+
+@pytest.mark.parametrize("insert", [False, True])
+def test_no_objects(scrubber_db, insert):
+ if insert:
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ graph = MagicMock()
+ storage = MagicMock()
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=storage,
+ # this range does not contain the object above
+ start_object=CoreSWHID.from_string("swh:1:cnt:00" + "00" * 19),
+ end_object=CoreSWHID.from_string("swh:1:cnt:f0" + "00" * 19),
+ )
+
+ locator.run()
+
+ assert graph.method_calls == []
+ assert storage.method_calls == []
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+
+def test_object_not_in_graph(scrubber_db):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ graph = NaiveGraphClient(nodes=[], edges=[])
+ storage = MagicMock()
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ )
+
+ locator.run()
+
+ assert storage.method_calls == []
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+
+def test_origin_not_in_storage(scrubber_db, swh_storage, caplog):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origin = Origin(url="http://example.org")
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id, origin.swhid()],
+ edges=[(origin.swhid(), CORRUPT_OBJECT.id)],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ with caplog.at_level(logging.ERROR, logger="swh.scrubber.origin_locator"):
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+ assert any(
+ f"{origin.swhid()} found in graph but missing" in record[2]
+ for record in caplog.record_tuples
+ )
+
+
+def test_two_origins(scrubber_db, swh_storage):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origin1 = Origin(url="http://example.org")
+ origin2 = Origin(url="http://example.com")
+ swh_storage.origin_add([origin1, origin2])
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id, origin1.swhid(), origin2.swhid()],
+ edges=[
+ (origin1.swhid(), CORRUPT_OBJECT.id),
+ (origin2.swhid(), CORRUPT_OBJECT.id),
+ ],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT object_id, origin_url FROM object_origin")
+ assert set(cur) == {
+ (str(CORRUPT_OBJECT.id), origin1.url),
+ (str(CORRUPT_OBJECT.id), origin2.url),
+ }
+
+
+def test_many_origins(scrubber_db, swh_storage):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)]
+ swh_storage.origin_add(origins)
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins],
+ edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT object_id, origin_url FROM object_origin")
+ rows = set(cur)
+ assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins}
+ assert len(rows) == 100
diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/utils.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Callable
+
+import psycopg2
+
+from swh.model.swhids import CoreSWHID
+
+from .db import CorruptObject, ScrubberDb
+
+
+def iter_corrupt_objects(
+ db: ScrubberDb,
+ start_object: CoreSWHID,
+ end_object: CoreSWHID,
+ cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None],
+) -> None:
+ while True:
+ with db.conn, db.cursor() as cur:
+ corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,)
+ if corrupt_objects and corrupt_objects[0].id == start_object:
+ # TODO: don't needlessly fetch duplicate objects
+ del corrupt_objects[0]
+ if not corrupt_objects:
+ # Nothing more to do
+ break
+ for corrupt_object in corrupt_objects:
+ cb(corrupt_object, cur)
+ db.conn.commit() # XXX: is this redundant with db.conn.__exit__?
+
+ start_object = corrupt_objects[-1].id
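
For completeness, a minimal sketch of driving iter_corrupt_objects() with a custom callback, as OriginLocator.run() does above; the DSN is a placeholder and the callback merely logs each corrupt object's SWHID.

# Minimal sketch of iter_corrupt_objects() with a logging-only callback;
# the DSN is a placeholder.
import logging

from swh.model.swhids import CoreSWHID
from swh.scrubber import get_scrubber_db
from swh.scrubber.utils import iter_corrupt_objects


def log_corrupt_object(corrupt_object, cur):
    logging.info("corrupt object: %s", corrupt_object.id)


iter_corrupt_objects(
    db=get_scrubber_db(cls="local", db="service=swh-scrubber"),  # placeholder
    start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
    end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
    cb=log_corrupt_object,
)
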
