Page MenuHomeSoftware Heritage

No OneTemporary

diff --git a/PKG-INFO b/PKG-INFO
index bd20acf..b0d372e 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,61 +1,61 @@
Metadata-Version: 2.1
Name: swh.scrubber
-Version: 0.0.5
+Version: 0.0.6
Summary: Software Heritage Datastore Scrubber
Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Requires-Python: >=3.7
Description-Content-Type: text/x-rst
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
Software Heritage - Datastore Scrubber
======================================
Tools to periodically checks data integrity in swh-storage and swh-objstorage,
reports errors, and (try to) fix them.
This is a work in progress; some of the components described below do not
exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection)
The Scrubber package is made of the following parts:
Checking
--------
Highly parallel processes continuously read objects from a data store,
compute checksums, and write any failure in a database, along with the data of
the corrupt object.
There is one "checker" for each datastore package: storage (postgresql and cassandra),
journal (kafka), and objstorage.
Recovery
--------
Then, from time to time, jobs go through the list of known corrupt objects,
and try to recover the original objects, through various means:
* Brute-forcing variations until they match their checksum
* Recovering from another data store
* As a last resort, recovering from known origins, if any
Reinjection
-----------
Finally, when an original object is recovered, it is reinjected in the original
data store, replacing the corrupt one.
diff --git a/swh.scrubber.egg-info/PKG-INFO b/swh.scrubber.egg-info/PKG-INFO
index bd20acf..b0d372e 100644
--- a/swh.scrubber.egg-info/PKG-INFO
+++ b/swh.scrubber.egg-info/PKG-INFO
@@ -1,61 +1,61 @@
Metadata-Version: 2.1
Name: swh.scrubber
-Version: 0.0.5
+Version: 0.0.6
Summary: Software Heritage Datastore Scrubber
Home-page: https://forge.softwareheritage.org/diffusion/swh-scrubber
Author: Software Heritage developers
Author-email: swh-devel@inria.fr
Project-URL: Bug Reports, https://forge.softwareheritage.org/maniphest
Project-URL: Funding, https://www.softwareheritage.org/donate
Project-URL: Source, https://forge.softwareheritage.org/source/swh-scrubber
Project-URL: Documentation, https://docs.softwareheritage.org/devel/swh-scrubber/
Classifier: Programming Language :: Python :: 3
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: GNU General Public License v3 (GPLv3)
Classifier: Operating System :: OS Independent
Classifier: Development Status :: 3 - Alpha
Requires-Python: >=3.7
Description-Content-Type: text/x-rst
Provides-Extra: testing
License-File: LICENSE
License-File: AUTHORS
Software Heritage - Datastore Scrubber
======================================
Tools to periodically checks data integrity in swh-storage and swh-objstorage,
reports errors, and (try to) fix them.
This is a work in progress; some of the components described below do not
exist yet (cassandra storage checker, objstorage checker, recovery, and reinjection)
The Scrubber package is made of the following parts:
Checking
--------
Highly parallel processes continuously read objects from a data store,
compute checksums, and write any failure in a database, along with the data of
the corrupt object.
There is one "checker" for each datastore package: storage (postgresql and cassandra),
journal (kafka), and objstorage.
Recovery
--------
Then, from time to time, jobs go through the list of known corrupt objects,
and try to recover the original objects, through various means:
* Brute-forcing variations until they match their checksum
* Recovering from another data store
* As a last resort, recovering from known origins, if any
Reinjection
-----------
Finally, when an original object is recovered, it is reinjected in the original
data store, replacing the corrupt one.
diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
index d11b7a0..092ed60 100644
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -1,320 +1,320 @@
# Copyright (C) 2022 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU General Public License version 3, or any later version
# See top-level LICENSE file for more information
import dataclasses
import datetime
import functools
from typing import Iterator, List, Optional
import psycopg2
from swh.core.db import BaseDb
from swh.model.swhids import CoreSWHID
@dataclasses.dataclass(frozen=True)
class Datastore:
"""Represents a datastore being scrubbed; eg. swh-storage or swh-journal."""
package: str
"""'storage', 'journal', or 'objstorage'."""
cls: str
"""'postgresql'/'cassandra' for storage, 'kafka' for journal,
'pathslicer'/'winery'/... for objstorage."""
instance: str
"""Human readable string."""
@dataclasses.dataclass(frozen=True)
class CorruptObject:
id: CoreSWHID
datastore: Datastore
first_occurrence: datetime.datetime
object_: bytes
@dataclasses.dataclass(frozen=True)
class FixedObject:
id: CoreSWHID
object_: bytes
method: str
recovery_date: Optional[datetime.datetime] = None
class ScrubberDb(BaseDb):
current_version = 2
@functools.lru_cache(1000)
def datastore_get_or_add(self, datastore: Datastore) -> int:
"""Creates a datastore if it does not exist, and returns its id."""
- cur = self.cursor()
- cur.execute(
- """
- WITH inserted AS (
- INSERT INTO datastore (package, class, instance)
- VALUES (%(package)s, %(cls)s, %(instance)s)
- ON CONFLICT DO NOTHING
- RETURNING id
- )
- SELECT id
- FROM inserted
- UNION (
- -- If the datastore already exists, we need to fetch its id
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ WITH inserted AS (
+ INSERT INTO datastore (package, class, instance)
+ VALUES (%(package)s, %(cls)s, %(instance)s)
+ ON CONFLICT DO NOTHING
+ RETURNING id
+ )
SELECT id
- FROM datastore
- WHERE
- package=%(package)s
- AND class=%(cls)s
- AND instance=%(instance)s
+ FROM inserted
+ UNION (
+ -- If the datastore already exists, we need to fetch its id
+ SELECT id
+ FROM datastore
+ WHERE
+ package=%(package)s
+ AND class=%(cls)s
+ AND instance=%(instance)s
+ )
+ LIMIT 1
+ """,
+ (dataclasses.asdict(datastore)),
)
- LIMIT 1
- """,
- (dataclasses.asdict(datastore)),
- )
- (id_,) = cur.fetchone()
- return id_
+ (id_,) = cur.fetchone()
+ return id_
def corrupt_object_add(
self,
id: CoreSWHID,
datastore: Datastore,
serialized_object: bytes,
) -> None:
datastore_id = self.datastore_get_or_add(datastore)
- cur = self.cursor()
- cur.execute(
- """
- INSERT INTO corrupt_object (id, datastore, object)
- VALUES (%s, %s, %s)
- ON CONFLICT DO NOTHING
- """,
- (str(id), datastore_id, serialized_object),
- )
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ INSERT INTO corrupt_object (id, datastore, object)
+ VALUES (%s, %s, %s)
+ ON CONFLICT DO NOTHING
+ """,
+ (str(id), datastore_id, serialized_object),
+ )
def corrupt_object_iter(self) -> Iterator[CorruptObject]:
"""Yields all records in the 'corrupt_object' table."""
- cur = self.cursor()
- cur.execute(
- """
- SELECT
- co.id, co.first_occurrence, co.object,
- ds.package, ds.class, ds.instance
- FROM corrupt_object AS co
- INNER JOIN datastore AS ds ON (ds.id=co.datastore)
- """
- )
-
- for row in cur:
- (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
- yield CorruptObject(
- id=CoreSWHID.from_string(id),
- first_occurrence=first_occurrence,
- object_=object_,
- datastore=Datastore(
- package=ds_package, cls=ds_class, instance=ds_instance
- ),
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT
+ co.id, co.first_occurrence, co.object,
+ ds.package, ds.class, ds.instance
+ FROM corrupt_object AS co
+ INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+ """
)
+ for row in cur:
+ (id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
+ yield CorruptObject(
+ id=CoreSWHID.from_string(id),
+ first_occurrence=first_occurrence,
+ object_=object_,
+ datastore=Datastore(
+ package=ds_package, cls=ds_class, instance=ds_instance
+ ),
+ )
+
def _corrupt_object_list_from_cursor(
self, cur: psycopg2.extensions.cursor
) -> List[CorruptObject]:
results = []
for row in cur:
(id, first_occurrence, object_, ds_package, ds_class, ds_instance) = row
results.append(
CorruptObject(
id=CoreSWHID.from_string(id),
first_occurrence=first_occurrence,
object_=object_,
datastore=Datastore(
package=ds_package, cls=ds_class, instance=ds_instance
),
)
)
return results
def corrupt_object_get(
self,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> List[CorruptObject]:
"""Yields a page of records in the 'corrupt_object' table, ordered by id.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
in_origin: An origin URL. If provided, only returns objects that may be
found in the given origin
"""
- cur = self.cursor()
- cur.execute(
- """
- SELECT
- co.id, co.first_occurrence, co.object,
- ds.package, ds.class, ds.instance
- FROM corrupt_object AS co
- INNER JOIN datastore AS ds ON (ds.id=co.datastore)
- WHERE
- co.id >= %s
- AND co.id <= %s
- ORDER BY co.id
- LIMIT %s
- """,
- (str(start_id), str(end_id), limit),
- )
- return self._corrupt_object_list_from_cursor(cur)
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT
+ co.id, co.first_occurrence, co.object,
+ ds.package, ds.class, ds.instance
+ FROM corrupt_object AS co
+ INNER JOIN datastore AS ds ON (ds.id=co.datastore)
+ WHERE
+ co.id >= %s
+ AND co.id <= %s
+ ORDER BY co.id
+ LIMIT %s
+ """,
+ (str(start_id), str(end_id), limit),
+ )
+ return self._corrupt_object_list_from_cursor(cur)
def corrupt_object_grab_by_id(
self,
cur: psycopg2.extensions.cursor,
start_id: CoreSWHID,
end_id: CoreSWHID,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id
These records are not already fixed (ie. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
start_id: Only return objects after this id
end_id: Only return objects before this id
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
WHERE
co.id >= %(start_id)s
AND co.id <= %(end_id)s
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=str(start_id),
end_id=str(end_id),
limit=limit,
),
)
return self._corrupt_object_list_from_cursor(cur)
def corrupt_object_grab_by_origin(
self,
cur: psycopg2.extensions.cursor,
origin_url: str,
start_id: Optional[CoreSWHID] = None,
end_id: Optional[CoreSWHID] = None,
limit: int = 100,
) -> List[CorruptObject]:
"""Returns a page of records in the 'corrupt_object' table for a fixer,
ordered by id
These records are not already fixed (ie. do not have a corresponding entry
in the 'fixed_object' table), and they are selected with an exclusive update
lock.
Arguments:
origin_url: only returns objects that may be found in the given origin
"""
cur.execute(
"""
SELECT
co.id, co.first_occurrence, co.object,
ds.package, ds.class, ds.instance
FROM corrupt_object AS co
INNER JOIN datastore AS ds ON (ds.id=co.datastore)
INNER JOIN object_origin AS oo ON (oo.object_id=co.id)
WHERE
(co.id >= %(start_id)s OR %(start_id)s IS NULL)
AND (co.id <= %(end_id)s OR %(end_id)s IS NULL)
AND NOT EXISTS (SELECT 1 FROM fixed_object WHERE fixed_object.id=co.id)
AND oo.origin_url=%(origin_url)s
ORDER BY co.id
LIMIT %(limit)s
FOR UPDATE SKIP LOCKED
""",
dict(
start_id=None if start_id is None else str(start_id),
end_id=None if end_id is None else str(end_id),
origin_url=origin_url,
limit=limit,
),
)
return self._corrupt_object_list_from_cursor(cur)
def object_origin_add(
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO object_origin (object_id, origin_url)
VALUES %s
ON CONFLICT DO NOTHING
""",
[(str(swhid), origin_url) for origin_url in origins],
)
def object_origin_get(self, after: str = "", limit: int = 1000) -> List[str]:
"""Returns origins with non-fixed corrupt objects, ordered by URL.
Arguments:
after: if given, only returns origins with an URL after this value
"""
- cur = self.cursor()
- cur.execute(
- """
- SELECT DISTINCT origin_url
- FROM object_origin
- WHERE
- origin_url > %(after)s
- AND object_id IN (
- (SELECT id FROM corrupt_object)
- EXCEPT (SELECT id FROM fixed_object)
- )
- ORDER BY origin_url
- LIMIT %(limit)s
- """,
- dict(after=after, limit=limit),
- )
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT DISTINCT origin_url
+ FROM object_origin
+ WHERE
+ origin_url > %(after)s
+ AND object_id IN (
+ (SELECT id FROM corrupt_object)
+ EXCEPT (SELECT id FROM fixed_object)
+ )
+ ORDER BY origin_url
+ LIMIT %(limit)s
+ """,
+ dict(after=after, limit=limit),
+ )
- return [origin_url for (origin_url,) in cur]
+ return [origin_url for (origin_url,) in cur]
def fixed_object_add(
self, cur: psycopg2.extensions.cursor, fixed_objects: List[FixedObject]
) -> None:
psycopg2.extras.execute_values(
cur,
"""
INSERT INTO fixed_object (id, object, method)
VALUES %s
ON CONFLICT DO NOTHING
""",
[
(str(fixed_object.id), fixed_object.object_, fixed_object.method)
for fixed_object in fixed_objects
],
)
def fixed_object_iter(self) -> Iterator[FixedObject]:
- cur = self.cursor()
- cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
- for (id, object_, method, recovery_date) in cur:
- yield FixedObject(
- id=CoreSWHID.from_string(id),
- object_=object_,
- method=method,
- recovery_date=recovery_date,
- )
+ with self.transaction() as cur:
+ cur.execute("SELECT id, object, method, recovery_date FROM fixed_object")
+ for (id, object_, method, recovery_date) in cur:
+ yield FixedObject(
+ id=CoreSWHID.from_string(id),
+ object_=object_,
+ method=method,
+ recovery_date=recovery_date,
+ )

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jun 3, 7:42 AM (3 d, 13 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3297982

Event Timeline