D7428.id26906.diff
diff --git a/mypy.ini b/mypy.ini
--- a/mypy.ini
+++ b/mypy.ini
@@ -8,6 +8,9 @@
[mypy-pkg_resources.*]
ignore_missing_imports = True
+[mypy-psycopg2.*]
+ignore_missing_imports = True
+
[mypy-pytest.*]
ignore_missing_imports = True
diff --git a/requirements-test.txt b/requirements-test.txt
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -1,4 +1,5 @@
pytest < 7.0.0 # v7.0.0 removed _pytest.tmpdir.TempdirFactory, which is used by some of the pytest plugins we use
pytest-mock
pyyaml
+swh.graph
types-pyyaml
diff --git a/swh/scrubber/cli.py b/swh/scrubber/cli.py
--- a/swh/scrubber/cli.py
+++ b/swh/scrubber/cli.py
@@ -28,11 +28,12 @@
scrubber_db:
cls: local
- db: "service=..." # libpq DSN
+ db: "service=..." # libpq DSN
- # for storage checkers only:
+ # for storage checkers + origin locator only:
storage:
- cls: postgresql # cannot be remote, as it needs direct access to the pg DB
+ cls: postgresql # cannot be remote for checkers, as they need direct
+ # access to the pg DB
db": "service=..." # libpq DSN
objstorage:
cls: memory
@@ -104,8 +105,8 @@
]
),
)
-@click.option("--start-object", default="0" * 40)
-@click.option("--end-object", default="f" * 40)
+@click.option("--start-object", default="00" * 20)
+@click.option("--end-object", default="ff" * 20)
@click.pass_context
def scrubber_check_storage(ctx, object_type: str, start_object: str, end_object: str):
"""Reads a postgresql storage, and reports corrupt objects to the scrubber DB."""
@@ -142,3 +143,32 @@
checker = JournalChecker(db=ctx.obj["db"], journal_client=conf["journal_client"],)
checker.run()
+
+
+@scrubber_cli_group.command(name="locate")
+@click.option("--start-object", default="swh:1:cnt:" + "00" * 20)
+@click.option("--end-object", default="swh:1:snp:" + "ff" * 20)
+@click.pass_context
+def scrubber_locate_origins(ctx, start_object: str, end_object: str):
+ """For each known corrupt object reported in the scrubber DB, looks up origins
+ that may contain this object, and records them so they can be used later
+ for recovery."""
+ conf = ctx.obj["config"]
+ if "storage" not in conf:
+ ctx.fail("You must have a storage configured in your config file.")
+
+ from swh.graph.client import RemoteGraphClient
+ from swh.model.model import CoreSWHID
+ from swh.storage import get_storage
+
+ from .origin_locator import OriginLocator
+
+ locator = OriginLocator(
+ db=ctx.obj["db"],
+ storage=get_storage(**conf["storage"]),
+ graph=RemoteGraphClient(**conf["graph"]),
+ start_object=CoreSWHID.from_string(start_object),
+ end_object=CoreSWHID.from_string(end_object),
+ )
+
+ locator.run()
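
The new "locate" subcommand above builds an OriginLocator from the loaded config: it uses the existing scrubber_db and storage sections plus a graph section pointing at an swh-graph RPC server (the tests below use graph: {url: ...}). A minimal sketch of the same wiring done by hand, with placeholder connection strings and URLs:

    # Rough equivalent of `swh scrubber locate`, assembled by hand.
    # All connection strings and the graph URL below are placeholders.
    from swh.graph.client import RemoteGraphClient
    from swh.model.swhids import CoreSWHID
    from swh.scrubber import get_scrubber_db
    from swh.scrubber.origin_locator import OriginLocator
    from swh.storage import get_storage

    conf = {
        "scrubber_db": {"cls": "local", "db": "service=swh-scrubber"},
        "storage": {
            "cls": "postgresql",
            "db": "service=swh",
            "objstorage": {"cls": "memory"},
        },
        "graph": {"url": "http://graph.example.org:5009/"},
    }

    locator = OriginLocator(
        db=get_scrubber_db(**conf["scrubber_db"]),
        storage=get_storage(**conf["storage"]),
        graph=RemoteGraphClient(**conf["graph"]),
        start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
        end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
    )
    locator.run()
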
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,10 +7,11 @@
import dataclasses
import datetime
import functools
-from typing import Iterator, Union
+from typing import Iterator, List
+
+import psycopg2
from swh.core.db import BaseDb
-from swh.model.model import Content, Directory, Release, Revision, Snapshot
from swh.model.swhids import CoreSWHID
@@ -55,10 +56,7 @@
return id_
def corrupt_object_add(
- self,
- datastore: Datastore,
- object_: Union[Content, Directory, Revision, Release, Snapshot],
- serialized_object: bytes,
+ self, id: CoreSWHID, datastore: Datastore, serialized_object: bytes,
) -> None:
datastore_id = self.datastore_get_or_add(datastore)
cur = self.cursor()
@@ -68,7 +66,7 @@
VALUES (%s, %s, %s)
ON CONFLICT DO NOTHING
""",
- (str(object_.swhid()), datastore_id, serialized_object),
+ (str(id), datastore_id, serialized_object),
)
def corrupt_object_iter(self) -> Iterator[CorruptObject]:
@@ -94,3 +92,54 @@
package=ds_package, cls=ds_class, instance=ds_instance
),
)
+
+ def corrupt_object_grab(
+ self,
+ cur,
+ start_id: CoreSWHID = None,
+ end_id: CoreSWHID = None,
+ limit: int = 100,
+ ) -> List[CorruptObject]:
+ """Yields a page of records in the 'corrupt_object' table."""
+ cur.execute(
+ """
+ SELECT
+ co.id, co.first_occurrence, co.object,
+ ds.package, ds.class, ds.instance
+ FROM corrupt_object AS co
+ INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+ WHERE
+ co.id >= %s
+ AND co.id <= %s
+ ORDER BY co.id
+ LIMIT %s
+ """,
+ (str(start_id), str(end_id), limit),
+ )
+
+ results = []
+ for row in cur:
+ (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
+ results.append(
+ CorruptObject(
+ id=CoreSWHID.from_string(id),
+ first_occurrence=first_occurrence,
+ object_=object_,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
+ )
+
+ return results
+
+ def object_origin_add(self, cur, swhid: CoreSWHID, origins: List[str]) -> None:
+ psycopg2.extras.execute_values(
+ cur,
+ """
+ INSERT INTO object_origin (object_id, origin_url)
+ VALUES %s
+ ON CONFLICT DO NOTHING
+ """,
+ [(str(swhid), origin_url) for origin_url in origins],
+ )
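
corrupt_object_grab pages through the corrupt_object table with inclusive start/end bounds, so a looping caller must skip the row it already handled before asking for the next page (the helper added in swh/scrubber/utils.py below does exactly that). A minimal usage sketch for the two new methods, assuming an already-constructed ScrubberDb instance named db and a placeholder origin URL:

    # Sketch only: grab one page of corrupt objects and record a candidate origin
    # for each of them, inside a single transaction. `db` is an existing ScrubberDb.
    from swh.model.swhids import CoreSWHID

    start = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    end = CoreSWHID.from_string("swh:1:snp:" + "ff" * 20)

    with db.conn, db.cursor() as cur:
        for corrupt_object in db.corrupt_object_grab(cur, start, end, limit=100):
            # the origin URL here is a placeholder; the locator gets real ones
            # from the graph
            db.object_origin_add(cur, corrupt_object.id, ["https://example.org/repo"])
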
diff --git a/swh/scrubber/journal_checker.py b/swh/scrubber/journal_checker.py
--- a/swh/scrubber/journal_checker.py
+++ b/swh/scrubber/journal_checker.py
@@ -66,4 +66,6 @@
object_ = cls.from_dict(kafka_to_value(message))
real_id = object_.compute_hash()
if object_.id != real_id:
- self.db.corrupt_object_add(self.datastore_info(), object_, message)
+ self.db.corrupt_object_add(
+ object_.swhid(), self.datastore_info(), message
+ )
diff --git a/swh/scrubber/origin_locator.py b/swh/scrubber/origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/origin_locator.py
@@ -0,0 +1,87 @@
+# Copyright (C) 2021-2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+"""Lists corrupt objects in the scrubber database, and lists candidate origins
+to recover them from."""
+
+import dataclasses
+import itertools
+import logging
+from typing import Iterable, Union
+
+import psycopg2
+
+from swh.core.utils import grouper
+from swh.graph.client import GraphArgumentException, RemoteGraphClient
+from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.model.swhids import CoreSWHID, ExtendedSWHID
+from swh.storage.interface import StorageInterface
+
+from .db import CorruptObject, ScrubberDb
+from .utils import iter_corrupt_objects
+
+logger = logging.getLogger(__name__)
+
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+
+
+def get_origins(
+ graph: RemoteGraphClient, storage: StorageInterface, swhid: CoreSWHID
+) -> Iterable[str]:
+ try:
+ origin_swhids = [
+ ExtendedSWHID.from_string(line)
+ for line in graph.leaves(str(swhid), direction="backward")
+ if line.startswith("swh:1:ori:")
+ ]
+ except GraphArgumentException:
+ return
+
+ for origin_swhid_group in grouper(origin_swhids, 10):
+ origin_swhid_group = list(origin_swhid_group)
+ for (origin, origin_swhid) in zip(
+ storage.origin_get_by_sha1(
+ [origin_swhid.object_id for origin_swhid in origin_swhid_group]
+ ),
+ origin_swhid_group,
+ ):
+ if origin is None:
+ logger.error("%s found in graph but missing from storage", origin_swhid)
+ else:
+ yield origin["url"]
+
+
+@dataclasses.dataclass
+class OriginLocator:
+ """Reads a chunk of corrupt objects in the swh-scrubber database, then writes
+ to the same database a list of origins they might be recovered from."""
+
+ db: ScrubberDb
+ """Database to read from and write to."""
+ graph: RemoteGraphClient
+ storage: StorageInterface
+ """Used to resolve origin SHA1s to URLs."""
+
+ start_object: CoreSWHID
+ """Minimum SWHID to check (in alphabetical order)"""
+ end_object: CoreSWHID
+ """Maximum SWHID to check (in alphabetical order)"""
+
+ def run(self):
+ iter_corrupt_objects(
+ self.db, self.start_object, self.end_object, self.handle_corrupt_object
+ )
+
+ def handle_corrupt_object(
+ self, corrupt_object: CorruptObject, cur: psycopg2.extensions.cursor
+ ) -> None:
+ origins = get_origins(self.graph, self.storage, corrupt_object.id)
+
+ # Keep only 100 origins, to avoid flooding the DB.
+ # It is very unlikely an object disappeared from 100 somewhat-randomly sampled
+ # origins.
+ first_origins = list(itertools.islice(origins, 0, 100))
+
+ self.db.object_origin_add(cur, corrupt_object.id, first_origins)
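
get_origins walks the graph backwards from the corrupt object to its origin leaves, then resolves the origins' SHA1s to URLs through the storage, ten at a time. A small sketch using the in-memory NaiveClient (the same client the tests below use); it assumes a storage instance named storage, for example the swh_storage pytest fixture:

    # Sketch: one origin pointing at the corrupt object, resolved back to its URL.
    from swh.graph.naive_client import NaiveClient
    from swh.model.model import Origin
    from swh.model.swhids import CoreSWHID
    from swh.scrubber.origin_locator import get_origins

    corrupt_swhid = CoreSWHID.from_string("swh:1:cnt:" + "ff" * 20)
    origin = Origin(url="https://example.org/repo")
    storage.origin_add([origin])

    graph = NaiveClient(
        nodes=[corrupt_swhid, origin.swhid()],
        edges=[(origin.swhid(), corrupt_swhid)],  # origin -> object, walked backwards
    )

    assert list(get_origins(graph, storage, corrupt_swhid)) == [origin.url]
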
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -18,11 +18,20 @@
(
id swhid not null,
datastore int not null,
- first_occurrence timestamptz not null default now(),
- object bytea not null
+ object bytea not null,
+ first_occurrence timestamptz not null default now()
);
comment on table corrupt_object is 'Each row identifies an object that was found to be corrupt';
comment on column corrupt_object.datastore is 'Datastore the corrupt object was found in.';
-comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
+comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
+
+create table object_origin
+(
+ object_id swhid not null,
+ origin_url text not null,
+ last_attempt timestamptz -- NULL if not tried yet
+);
+
+comment on table object_origin is 'Maps objects to origins they might be found in.';
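
The object_origin table stores one row per (corrupt object, candidate origin) pair; last_attempt stays NULL until some later recovery pass tries that origin. No such pass is part of this diff, but as an illustration of the intended use of last_attempt, a hypothetical worker could pick untried candidates like this (conn being a psycopg2 connection to the scrubber database):

    # Illustration only: select a few candidate origins that have not been tried yet
    # for one corrupt object. Not part of this diff.
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT origin_url
            FROM object_origin
            WHERE object_id = %s AND last_attempt IS NULL
            ORDER BY origin_url
            LIMIT 10
            """,
            ("swh:1:cnt:" + "ff" * 20,),
        )
        candidate_urls = [url for (url,) in cur]
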
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -5,9 +5,20 @@
create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
+
-- corrupt_object
alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
alter table corrupt_object validate constraint corrupt_object_datastore_fkey;
-create unique index corrupt_object_pkey on corrupt_object(id, datastore);
+create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore);
+alter table corrupt_object add primary key using index corrupt_object_pkey;
+
+-- object_origin
+
+create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url);
+create index concurrently object_origin_by_origin on object_origin (origin_url, object_id);
+
+-- FIXME: not valid, because corrupt_object(id) is not unique
+-- alter table object_origin add constraint object_origin_object_fkey foreign key (object_id) references corrupt_object(id) not valid;
+-- alter table object_origin validate constraint object_origin_object_fkey;
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -98,5 +98,7 @@
real_id = object_.compute_hash()
if object_.id != real_id:
self.db.corrupt_object_add(
- self.datastore_info(), object_, value_to_kafka(object_.to_dict())
+ object_.swhid(),
+ self.datastore_info(),
+ value_to_kafka(object_.to_dict()),
)
diff --git a/swh/scrubber/tests/test_cli.py b/swh/scrubber/tests/test_cli.py
--- a/swh/scrubber/tests/test_cli.py
+++ b/swh/scrubber/tests/test_cli.py
@@ -9,6 +9,7 @@
from click.testing import CliRunner
import yaml
+from swh.model.swhids import CoreSWHID
from swh.scrubber.cli import scrubber_cli_group
from swh.scrubber.storage_checker import storage_db
@@ -25,6 +26,7 @@
config = {
"scrubber_db": {"cls": "local", "db": scrubber_db.conn.dsn},
+ "graph": {"url": "http://graph.example.org:5009/"},
}
if storage:
with storage_db(storage) as db:
@@ -113,3 +115,26 @@
},
)
assert journal_checker.method_calls == [call.run()]
+
+
+def test_locate_origins(mocker, scrubber_db, swh_storage):
+ origin_locator = MagicMock()
+ OriginLocator = mocker.patch(
+ "swh.scrubber.origin_locator.OriginLocator", return_value=origin_locator
+ )
+ get_scrubber_db = mocker.patch(
+ "swh.scrubber.get_scrubber_db", return_value=scrubber_db
+ )
+ result = invoke(scrubber_db, ["locate"], storage=swh_storage)
+ assert result.exit_code == 0, result.output
+ assert result.output == ""
+
+ get_scrubber_db.assert_called_once_with(cls="local", db=scrubber_db.conn.dsn)
+ OriginLocator.assert_called_once_with(
+ db=scrubber_db,
+ storage=OriginLocator.mock_calls[0][2]["storage"],
+ graph=OriginLocator.mock_calls[0][2]["graph"],
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+ assert origin_locator.method_calls == [call.run()]
diff --git a/swh/scrubber/tests/test_origin_locator.py b/swh/scrubber/tests/test_origin_locator.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_origin_locator.py
@@ -0,0 +1,170 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+import logging
+from unittest.mock import MagicMock
+
+import pytest
+
+from swh.graph.naive_client import NaiveClient as NaiveGraphClient
+from swh.model.model import Origin
+from swh.model.swhids import CoreSWHID
+from swh.scrubber.db import CorruptObject, Datastore
+from swh.scrubber.origin_locator import OriginLocator
+
+CORRUPT_OBJECT = CorruptObject(
+ id=CoreSWHID.from_string("swh:1:cnt:" + "f" * 40),
+ datastore=Datastore(package="storage", cls="postgresql", instance="service=swh"),
+ first_occurrence=datetime.datetime.now(tz=datetime.timezone.utc),
+ object_=b"blah",
+)
+
+
+@pytest.mark.parametrize("insert", [False, True])
+def test_no_objects(scrubber_db, insert):
+ if insert:
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ graph = MagicMock()
+ storage = MagicMock()
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=storage,
+ # this range does not contain the object above
+ start_object=CoreSWHID.from_string("swh:1:cnt:00" + "00" * 19),
+ end_object=CoreSWHID.from_string("swh:1:cnt:f0" + "00" * 19),
+ )
+
+ locator.run()
+
+ assert graph.method_calls == []
+ assert storage.method_calls == []
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+
+def test_object_not_in_graph(scrubber_db):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ graph = NaiveGraphClient(nodes=[], edges=[])
+ storage = MagicMock()
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ )
+
+ locator.run()
+
+ assert storage.method_calls == []
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+
+def test_origin_not_in_storage(scrubber_db, swh_storage, caplog):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origin = Origin(url="http://example.org")
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id, origin.swhid()],
+ edges=[(origin.swhid(), CORRUPT_OBJECT.id)],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ with caplog.at_level(logging.ERROR, logger="swh.scrubber.origin_locator"):
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT COUNT(*) FROM object_origin")
+ assert cur.fetchone() == (0,)
+
+ assert any(
+ f"{origin.swhid()} found in graph but missing" in record[2]
+ for record in caplog.record_tuples
+ )
+
+
+def test_two_origins(scrubber_db, swh_storage):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origin1 = Origin(url="http://example.org")
+ origin2 = Origin(url="http://example.com")
+ swh_storage.origin_add([origin1, origin2])
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id, origin1.swhid(), origin2.swhid()],
+ edges=[
+ (origin1.swhid(), CORRUPT_OBJECT.id),
+ (origin2.swhid(), CORRUPT_OBJECT.id),
+ ],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT object_id, origin_url FROM object_origin")
+ assert set(cur) == {
+ (str(CORRUPT_OBJECT.id), origin1.url),
+ (str(CORRUPT_OBJECT.id), origin2.url),
+ }
+
+
+def test_many_origins(scrubber_db, swh_storage):
+ scrubber_db.corrupt_object_add(
+ CORRUPT_OBJECT.id, CORRUPT_OBJECT.datastore, CORRUPT_OBJECT.object_
+ )
+
+ origins = [Origin(url=f"http://example.org/{i}") for i in range(1000)]
+ swh_storage.origin_add(origins)
+
+ graph = NaiveGraphClient(
+ nodes=[CORRUPT_OBJECT.id] + [origin.swhid() for origin in origins],
+ edges=[(origin.swhid(), CORRUPT_OBJECT.id) for origin in origins],
+ )
+ locator = OriginLocator(
+ db=scrubber_db,
+ graph=graph,
+ storage=swh_storage,
+ start_object=CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
+ end_object=CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
+ )
+
+ locator.run()
+
+ with scrubber_db.conn.cursor() as cur:
+ cur.execute("SELECT object_id, origin_url FROM object_origin")
+ rows = set(cur)
+ assert rows <= {(str(CORRUPT_OBJECT.id), origin.url) for origin in origins}
+ assert len(rows) == 100
diff --git a/swh/scrubber/utils.py b/swh/scrubber/utils.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/utils.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+from typing import Callable
+
+import psycopg2
+
+from swh.model.swhids import CoreSWHID
+
+from .db import CorruptObject, ScrubberDb
+
+
+def iter_corrupt_objects(
+ db: ScrubberDb,
+ start_object: CoreSWHID,
+ end_object: CoreSWHID,
+ cb: Callable[[CorruptObject, psycopg2.extensions.cursor], None],
+) -> None:
+ while True:
+ with db.conn, db.cursor() as cur:
+ corrupt_objects = db.corrupt_object_grab(cur, start_object, end_object,)
+ if corrupt_objects and corrupt_objects[0].id == start_object:
+ # TODO: don't needlessly fetch duplicate objects
+ del corrupt_objects[0]
+ if not corrupt_objects:
+ # Nothing more to do
+ break
+ for corrupt_object in corrupt_objects:
+ cb(corrupt_object, cur)
+ db.conn.commit() # XXX: is this redundant with db.conn.__exit__?
+
+ start_object = corrupt_objects[-1].id
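
iter_corrupt_objects drives the paging loop for the locator: each page is processed in its own transaction, and the callback receives the page's open cursor, so whatever it writes is committed together with that page. A minimal usage sketch, assuming an existing ScrubberDb instance named db and a callback that only logs:

    # Sketch: walk all corrupt objects in the default SWHID range and print them.
    from swh.model.swhids import CoreSWHID
    from swh.scrubber.db import CorruptObject
    from swh.scrubber.utils import iter_corrupt_objects

    def log_object(corrupt_object: CorruptObject, cur) -> None:
        print(corrupt_object.id, corrupt_object.datastore)

    iter_corrupt_objects(
        db,
        CoreSWHID.from_string("swh:1:cnt:" + "00" * 20),
        CoreSWHID.from_string("swh:1:snp:" + "ff" * 20),
        log_object,
    )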