D8248.diff

diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
-from typing import Iterator, List, Optional
+from typing import Iterable, Iterator, List, Optional
import psycopg2
@@ -36,6 +36,21 @@
object_: bytes
+@dataclasses.dataclass(frozen=True)
+class MissingObject:
+ id: CoreSWHID
+ datastore: Datastore
+ first_occurrence: datetime.datetime
+
+
+@dataclasses.dataclass(frozen=True)
+class MissingObjectReference:
+ missing_id: CoreSWHID
+ reference_id: CoreSWHID
+ datastore: Datastore
+ first_occurrence: datetime.datetime
+
+
@dataclasses.dataclass(frozen=True)
class FixedObject:
id: CoreSWHID
@@ -47,6 +62,10 @@
class ScrubberDb(BaseDb):
current_version = 2
+ ####################################
+ # Shared tables
+ ####################################
+
@functools.lru_cache(1000)
def datastore_get_or_add(self, datastore: Datastore) -> int:
"""Creates a datastore if it does not exist, and returns its id."""
@@ -79,6 +98,10 @@
(id_,) = res
return id_
+ ####################################
+ # Inventory of objects with issues
+ ####################################
+
def corrupt_object_add(
self,
id: CoreSWHID,
@@ -256,6 +279,113 @@
)
return self._corrupt_object_list_from_cursor(cur)
+ def missing_object_add(
+ self,
+ id: CoreSWHID,
+ reference_ids: Iterable[CoreSWHID],
+ datastore: Datastore,
+ ) -> None:
+ """
+ Adds a "hole" to the inventory, ie. an object missing from a datastore
+ that is referenced by an other object of the same datastore.
+
+ If the missing object is already known to be missing by the scrubber database,
+ this only records the reference (which can be useful to locate an origin
+ to recover the object from).
+ If that reference is already known too, this is a noop.
+
+ Args:
+ id: SWHID of the missing object (the hole)
+ reference_ids: SWHIDs of the objects referencing the missing object
+ datastore: representation of the swh-storage/swh-journal/... instance
+ containing this hole
+ """
+ if not reference_ids:
+ raise ValueError("reference_ids is empty")
+ datastore_id = self.datastore_get_or_add(datastore)
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ INSERT INTO missing_object (id, datastore)
+ VALUES (%s, %s)
+ ON CONFLICT DO NOTHING
+ """,
+ (str(id), datastore_id),
+ )
+ psycopg2.extras.execute_batch(
+ cur,
+ """
+ INSERT INTO missing_object_reference (missing_id, reference_id, datastore)
+ VALUES (%s, %s, %s)
+ ON CONFLICT DO NOTHING
+ """,
+ [
+ (str(id), str(reference_id), datastore_id)
+ for reference_id in reference_ids
+ ],
+ )
+
+ def missing_object_iter(self) -> Iterator[MissingObject]:
+ """Yields all records in the 'missing_object' table."""
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT
+ mo.id, mo.first_occurrence,
+ ds.package, ds.class, ds.instance
+ FROM missing_object AS mo
+ INNER JOIN datastore AS ds ON (ds.id=mo.datastore)
+ """
+ )
+
+ for row in cur:
+ (id, first_occurrence, ds_package, ds_class, ds_instance) = row
+ yield MissingObject(
+ id=CoreSWHID.from_string(id),
+ first_occurrence=first_occurrence,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
+
+ def missing_object_reference_iter(
+ self, missing_id: CoreSWHID
+ ) -> Iterator[MissingObjectReference]:
+ """Yields all records in the 'missing_object_reference' table."""
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT
+ mor.reference_id, mor.first_occurrence,
+ ds.package, ds.class, ds.instance
+ FROM missing_object_reference AS mor
+ INNER JOIN datastore AS ds ON (ds.id=mor.datastore)
+ WHERE mor.missing_id=%s
+ """,
+ (str(missing_id),),
+ )
+
+ for row in cur:
+ (
+ reference_id,
+ first_occurrence,
+ ds_package,
+ ds_class,
+ ds_instance,
+ ) = row
+ yield MissingObjectReference(
+ missing_id=missing_id,
+ reference_id=CoreSWHID.from_string(reference_id),
+ first_occurrence=first_occurrence,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
+
+ ####################################
+ # Issue resolution
+ ####################################
+
def object_origin_add(
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
) -> None:
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -1,3 +1,7 @@
+-------------------------------------
+-- Shared definitions
+-------------------------------------
+
create domain swhid as text check (value ~ '^swh:[0-9]+:.*');
create table datastore
@@ -14,6 +18,11 @@
comment on column datastore.class is 'For datastores with multiple backends, name of the backend (postgresql/cassandra for storage, kafka for journal, pathslicer/azure/winery/... for objstorage)';
comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.';
+
+-------------------------------------
+-- Inventory of objects with issues
+-------------------------------------
+
create table corrupt_object
(
id swhid not null,
@@ -27,6 +36,37 @@
comment on column corrupt_object.object is 'Corrupt object, as found in the datastore (possibly msgpack-encoded, using the journal''s serializer)';
comment on column corrupt_object.first_occurrence is 'Moment the object was found to be corrupt for the first time';
+
+create table missing_object
+(
+ id swhid not null,
+ datastore int not null,
+ first_occurrence timestamptz not null default now()
+);
+
+comment on table missing_object is 'Each row identifies an object that is missing but referenced by another object (aka a "hole")';
+comment on column missing_object.datastore is 'Datastore where the hole is.';
+comment on column missing_object.first_occurrence is 'Moment the object was found to be missing for the first time';
+
+create table missing_object_reference
+(
+ missing_id swhid not null,
+ reference_id swhid not null,
+ datastore int not null,
+ first_occurrence timestamptz not null default now()
+);
+
+comment on table missing_object_reference is 'Each row identifies a reference from an existing object to a missing object (a "hole")';
+comment on column missing_object_reference.missing_id is 'SWHID of the missing object.';
+comment on column missing_object_reference.reference_id is 'SWHID of the object referencing the missing object.';
+comment on column missing_object_reference.datastore is 'Datastore where the referencing object is.';
+comment on column missing_object_reference.first_occurrence is 'Moment the object was found to reference a missing object';
+
+
+-------------------------------------
+-- Issue resolution
+-------------------------------------
+
create table object_origin
(
object_id swhid not null,
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -1,3 +1,7 @@
+-------------------------------------
+-- Shared tables
+-------------------------------------
+
-- datastore
create unique index concurrently datastore_pkey on datastore(id);
@@ -6,6 +10,10 @@
create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
+-------------------------------------
+-- Inventory of objects with issues
+-------------------------------------
+
-- corrupt_object
alter table corrupt_object add constraint corrupt_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
@@ -14,6 +22,28 @@
create unique index concurrently corrupt_object_pkey on corrupt_object(id, datastore);
alter table corrupt_object add primary key using index corrupt_object_pkey;
+
+-- missing_object
+
+alter table missing_object add constraint missing_object_datastore_fkey foreign key (datastore) references datastore(id) not valid;
+alter table missing_object validate constraint missing_object_datastore_fkey;
+
+create unique index concurrently missing_object_pkey on missing_object(id, datastore);
+alter table missing_object add primary key using index missing_object_pkey;
+
+
+-- missing_object_reference
+
+alter table missing_object_reference add constraint missing_object_reference_datastore_fkey foreign key (datastore) references datastore(id) not valid;
+alter table missing_object_reference validate constraint missing_object_reference_datastore_fkey;
+
+create unique index concurrently missing_object_reference_missing_id_reference_id_datastore on missing_object_reference(missing_id, reference_id, datastore);
+create unique index concurrently missing_object_reference_reference_id_missing_id_datastore on missing_object_reference(reference_id, missing_id, datastore);
+
+-------------------------------------
+-- Issue resolution
+-------------------------------------
+
-- object_origin
create unique index concurrently object_origin_pkey on object_origin (object_id, origin_url);
diff --git a/swh/scrubber/storage_checker.py b/swh/scrubber/storage_checker.py
--- a/swh/scrubber/storage_checker.py
+++ b/swh/scrubber/storage_checker.py
@@ -5,13 +5,23 @@
"""Reads all objects in a swh-storage instance and recomputes their checksums."""
+import collections
import contextlib
import dataclasses
import logging
from typing import Iterable, Union
from swh.journal.serializers import value_to_kafka
-from swh.model.model import Directory, Release, Revision, Snapshot
+from swh.model import swhids
+from swh.model.model import (
+ Content,
+ Directory,
+ ObjectType,
+ Release,
+ Revision,
+ Snapshot,
+ TargetType,
+)
from swh.storage import backfill
from swh.storage.interface import StorageInterface
from swh.storage.postgresql.storage import Storage as PostgresqlStorage
@@ -20,7 +30,7 @@
logger = logging.getLogger(__name__)
-ScrubbableObject = Union[Revision, Release, Snapshot, Directory]
+ScrubbableObject = Union[Revision, Release, Snapshot, Directory, Content]
@contextlib.contextmanager
@@ -93,10 +103,15 @@
)
objects = list(objects)
- self.process_objects(objects)
+ self.check_object_hashes(objects)
+ self.check_object_references(objects)
- def process_objects(self, objects: Iterable[ScrubbableObject]):
+ def check_object_hashes(self, objects: Iterable[ScrubbableObject]):
+ """Recomputes hashes, and reports mismatches."""
for object_ in objects:
+ if isinstance(object_, Content):
+ # TODO
+ continue
real_id = object_.compute_hash()
if object_.id != real_id:
self.db.corrupt_object_add(
@@ -104,3 +119,113 @@
self.datastore_info(),
value_to_kafka(object_.to_dict()),
)
+
+ def check_object_references(self, objects: Iterable[ScrubbableObject]):
+ """Check all objects references by these objects exist."""
+ cnt_references = collections.defaultdict(set)
+ dir_references = collections.defaultdict(set)
+ rev_references = collections.defaultdict(set)
+ rel_references = collections.defaultdict(set)
+ snp_references = collections.defaultdict(set)
+
+ for object_ in objects:
+ swhid = object_.swhid()
+
+ if isinstance(object_, Content):
+ pass
+ elif isinstance(object_, Directory):
+ for entry in object_.entries:
+ if entry.type == "file":
+ cnt_references[entry.target].add(swhid)
+ elif entry.type == "dir":
+ dir_references[entry.target].add(swhid)
+ elif entry.type == "rev":
+ # dir->rev holes are not considered a problem because they
+ # happen whenever git submodules point to repositories that
+ # were not loaded yet; ignore them
+ pass
+ else:
+ assert False, entry
+ elif isinstance(object_, Revision):
+ dir_references[object_.directory].add(swhid)
+ for parent in object_.parents:
+ rev_references[parent].add(swhid)
+ elif isinstance(object_, Release):
+ if object_.target is None:
+ pass
+ elif object_.target_type == ObjectType.CONTENT:
+ cnt_references[object_.target].add(swhid)
+ elif object_.target_type == ObjectType.DIRECTORY:
+ dir_references[object_.target].add(swhid)
+ elif object_.target_type == ObjectType.REVISION:
+ rev_references[object_.target].add(swhid)
+ elif object_.target_type == ObjectType.RELEASE:
+ rel_references[object_.target].add(swhid)
+ else:
+ assert False, object_
+ elif isinstance(object_, Snapshot):
+ for branch in object_.branches.values():
+ if branch is None:
+ pass
+ elif branch.target_type == TargetType.ALIAS:
+ pass
+ elif branch.target_type == TargetType.CONTENT:
+ cnt_references[branch.target].add(swhid)
+ elif branch.target_type == TargetType.DIRECTORY:
+ dir_references[branch.target].add(swhid)
+ elif branch.target_type == TargetType.REVISION:
+ rev_references[branch.target].add(swhid)
+ elif branch.target_type == TargetType.RELEASE:
+ rel_references[branch.target].add(swhid)
+ elif branch.target_type == TargetType.SNAPSHOT:
+ snp_references[branch.target].add(swhid)
+ else:
+ assert False, (str(object_.swhid()), branch)
+ else:
+ assert False, object_.swhid()
+
+ missing_cnts = self.storage.content_missing_per_sha1_git(list(cnt_references))
+ missing_dirs = self.storage.directory_missing(list(dir_references))
+ missing_revs = self.storage.revision_missing(list(rev_references))
+ missing_rels = self.storage.release_missing(list(rel_references))
+ missing_snps = self.storage.snapshot_missing(list(snp_references))
+
+ for missing_id in missing_cnts:
+ missing_swhid = swhids.CoreSWHID(
+ object_type=swhids.ObjectType.CONTENT, object_id=missing_id
+ )
+ self.db.missing_object_add(
+ missing_swhid, cnt_references[missing_id], self.datastore_info()
+ )
+
+ for missing_id in missing_dirs:
+ missing_swhid = swhids.CoreSWHID(
+ object_type=swhids.ObjectType.DIRECTORY, object_id=missing_id
+ )
+ self.db.missing_object_add(
+ missing_swhid, dir_references[missing_id], self.datastore_info()
+ )
+
+ for missing_id in missing_revs:
+ missing_swhid = swhids.CoreSWHID(
+ object_type=swhids.ObjectType.REVISION, object_id=missing_id
+ )
+ self.db.missing_object_add(
+ missing_swhid, rev_references[missing_id], self.datastore_info()
+ )
+
+ for missing_id in missing_rels:
+ missing_swhid = swhids.CoreSWHID(
+ object_type=swhids.ObjectType.RELEASE, object_id=missing_id
+ )
+ self.db.missing_object_add(
+ missing_swhid, rel_references[missing_id], self.datastore_info()
+ )
+
+ for missing_id in missing_snps:
+ missing_swhid = swhids.CoreSWHID(
+ object_type=swhids.ObjectType.SNAPSHOT, object_id=missing_id
+ )
+ self.db.missing_object_add(
+ missing_swhid, snp_references[missing_id], self.datastore_info()
+ )
diff --git a/swh/scrubber/tests/test_storage_postgresql.py b/swh/scrubber/tests/test_storage_postgresql.py
--- a/swh/scrubber/tests/test_storage_postgresql.py
+++ b/swh/scrubber/tests/test_storage_postgresql.py
@@ -10,12 +10,55 @@
import pytest
from swh.journal.serializers import kafka_to_value
-from swh.model import swhids
+from swh.model import model, swhids
from swh.model.tests import swh_model_data
from swh.scrubber.storage_checker import StorageChecker
from swh.storage.backfill import byte_ranges
-# decorator to make swh.storage.backfill use less ranges, so tests run faster
+CONTENT1 = model.Content.from_data(b"foo")
+DIRECTORY1 = model.Directory(
+ entries=(
+ model.DirectoryEntry(
+ target=CONTENT1.sha1_git, type="file", name=b"file1", perms=0o1
+ ),
+ )
+)
+DIRECTORY2 = model.Directory(
+ entries=(
+ model.DirectoryEntry(
+ target=CONTENT1.sha1_git, type="file", name=b"file2", perms=0o1
+ ),
+ model.DirectoryEntry(target=DIRECTORY1.id, type="dir", name=b"dir1", perms=0o1),
+ model.DirectoryEntry(target=b"\x00" * 20, type="rev", name=b"rev1", perms=0o1),
+ )
+)
+REVISION1 = model.Revision(
+ message=b"blah",
+ directory=DIRECTORY2.id,
+ author=None,
+ committer=None,
+ date=None,
+ committer_date=None,
+ type=model.RevisionType.GIT,
+ synthetic=True,
+)
+RELEASE1 = model.Release(
+ message=b"blih",
+ name=b"bluh",
+ target_type=model.ObjectType.REVISION,
+ target=REVISION1.id,
+ synthetic=True,
+)
+SNAPSHOT1 = model.Snapshot(
+ branches={
+ b"rel1": model.SnapshotBranch(
+ target_type=model.TargetType.RELEASE, target=RELEASE1.id
+ ),
+ }
+)
+
+
+# decorator to make swh.storage.backfill use fewer ranges, so tests run faster
patch_byte_ranges = unittest.mock.patch(
"swh.storage.backfill.byte_ranges",
lambda numbits, start, end: byte_ranges(numbits // 8, start, end),
@@ -142,3 +185,107 @@
"swh:1:snp:ffffffffffffffffffffffffffffffffffffffff",
]
}
+
+
+@patch_byte_ranges
+def test_no_hole(scrubber_db, swh_storage):
+ swh_storage.content_add([CONTENT1])
+ swh_storage.directory_add([DIRECTORY1, DIRECTORY2])
+ swh_storage.revision_add([REVISION1])
+ swh_storage.release_add([RELEASE1])
+ swh_storage.snapshot_add([SNAPSHOT1])
+
+ for object_type in ("snapshot", "release", "revision", "directory"):
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type=object_type,
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).run()
+
+ assert list(scrubber_db.missing_object_iter()) == []
+
+
+@pytest.mark.parametrize(
+ "missing_object",
+ ["content1", "directory1", "directory2", "revision1", "release1"],
+)
+@patch_byte_ranges
+def test_one_hole(scrubber_db, swh_storage, missing_object):
+ if missing_object == "content1":
+ missing_swhid = CONTENT1.swhid()
+ reference_swhids = [DIRECTORY1.swhid(), DIRECTORY2.swhid()]
+ else:
+ swh_storage.content_add([CONTENT1])
+
+ if missing_object == "directory1":
+ missing_swhid = DIRECTORY1.swhid()
+ reference_swhids = [DIRECTORY2.swhid()]
+ else:
+ swh_storage.directory_add([DIRECTORY1])
+
+ if missing_object == "directory2":
+ missing_swhid = DIRECTORY2.swhid()
+ reference_swhids = [REVISION1.swhid()]
+ else:
+ swh_storage.directory_add([DIRECTORY2])
+
+ if missing_object == "revision1":
+ missing_swhid = REVISION1.swhid()
+ reference_swhids = [RELEASE1.swhid()]
+ else:
+ swh_storage.revision_add([REVISION1])
+
+ if missing_object == "release1":
+ missing_swhid = RELEASE1.swhid()
+ reference_swhids = [SNAPSHOT1.swhid()]
+ else:
+ swh_storage.release_add([RELEASE1])
+
+ swh_storage.snapshot_add([SNAPSHOT1])
+
+ for object_type in ("snapshot", "release", "revision", "directory"):
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type=object_type,
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).run()
+
+ assert [mo.id for mo in scrubber_db.missing_object_iter()] == [missing_swhid]
+ assert {
+ (mor.missing_id, mor.reference_id)
+ for mor in scrubber_db.missing_object_reference_iter(missing_swhid)
+ } == {(missing_swhid, reference_swhid) for reference_swhid in reference_swhids}
+
+
+@patch_byte_ranges
+def test_two_holes(scrubber_db, swh_storage):
+ # missing content and revision
+ swh_storage.directory_add([DIRECTORY1, DIRECTORY2])
+ swh_storage.release_add([RELEASE1])
+ swh_storage.snapshot_add([SNAPSHOT1])
+
+ for object_type in ("snapshot", "release", "revision", "directory"):
+ StorageChecker(
+ db=scrubber_db,
+ storage=swh_storage,
+ object_type=object_type,
+ start_object="00" * 20,
+ end_object="ff" * 20,
+ ).run()
+
+ assert {mo.id for mo in scrubber_db.missing_object_iter()} == {
+ CONTENT1.swhid(),
+ REVISION1.swhid(),
+ }
+ assert {
+ mor.reference_id
+ for mor in scrubber_db.missing_object_reference_iter(CONTENT1.swhid())
+ } == {DIRECTORY1.swhid(), DIRECTORY2.swhid()}
+ assert {
+ mor.reference_id
+ for mor in scrubber_db.missing_object_reference_iter(REVISION1.swhid())
+ } == {RELEASE1.swhid()}
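
For reference, here is a minimal usage sketch of the ScrubberDb API added by this diff. It is not part of the change itself: it assumes an already-connected `scrubber_db` instance (such as the fixture used in the tests above) and made-up SWHIDs, and only exercises missing_object_add, missing_object_iter and missing_object_reference_iter as defined in the diff.

    from swh.model.swhids import CoreSWHID

    from swh.scrubber.db import Datastore

    # `scrubber_db` is assumed to be an already-connected ScrubberDb instance
    # (e.g. the `scrubber_db` fixture used by the tests above).

    # Describe the datastore the hole was found in; these values are placeholders.
    datastore = Datastore(
        package="storage", cls="postgresql", instance="service=swh-storage"
    )

    missing = CoreSWHID.from_string("swh:1:cnt:" + "00" * 20)
    referencing = CoreSWHID.from_string("swh:1:dir:" + "11" * 20)

    # Record that `missing` is absent from the datastore but referenced by
    # `referencing`; repeating the call with the same pair is a no-op
    # (ON CONFLICT DO NOTHING on both tables).
    scrubber_db.missing_object_add(missing, [referencing], datastore)

    # List every known hole, together with the objects referencing it.
    for hole in scrubber_db.missing_object_iter():
        print(hole.id, hole.first_occurrence)
        for ref in scrubber_db.missing_object_reference_iter(hole.id):
            print("  referenced by", ref.reference_id)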
