Changeset View
Changeset View
Standalone View
Standalone View
swh/scrubber/db.py
# Copyright (C) 2022 The Software Heritage developers | # Copyright (C) 2022 The Software Heritage developers | ||||
# See the AUTHORS file at the top-level directory of this distribution | # See the AUTHORS file at the top-level directory of this distribution | ||||
# License: GNU General Public License version 3, or any later version | # License: GNU General Public License version 3, or any later version | ||||
# See top-level LICENSE file for more information | # See top-level LICENSE file for more information | ||||
import dataclasses | import dataclasses | ||||
import datetime | import datetime | ||||
import functools | import functools | ||||
from typing import Iterator, List, Optional | from typing import Iterable, Iterator, List, Optional | ||||
import psycopg2 | import psycopg2 | ||||
from swh.core.db import BaseDb | from swh.core.db import BaseDb | ||||
from swh.model.swhids import CoreSWHID | from swh.model.swhids import CoreSWHID | ||||
@dataclasses.dataclass(frozen=True) | @dataclasses.dataclass(frozen=True) | ||||
Show All 13 Lines | |||||
class CorruptObject: | class CorruptObject: | ||||
id: CoreSWHID | id: CoreSWHID | ||||
datastore: Datastore | datastore: Datastore | ||||
first_occurrence: datetime.datetime | first_occurrence: datetime.datetime | ||||
object_: bytes | object_: bytes | ||||
@dataclasses.dataclass(frozen=True) | @dataclasses.dataclass(frozen=True) | ||||
class MissingObject: | |||||
id: CoreSWHID | |||||
datastore: Datastore | |||||
first_occurrence: datetime.datetime | |||||
@dataclasses.dataclass(frozen=True) | |||||
class MissingObjectReference: | |||||
missing_id: CoreSWHID | |||||
reference_id: CoreSWHID | |||||
datastore: Datastore | |||||
first_occurrence: datetime.datetime | |||||
@dataclasses.dataclass(frozen=True) | |||||
class FixedObject: | class FixedObject: | ||||
id: CoreSWHID | id: CoreSWHID | ||||
object_: bytes | object_: bytes | ||||
method: str | method: str | ||||
recovery_date: Optional[datetime.datetime] = None | recovery_date: Optional[datetime.datetime] = None | ||||
class ScrubberDb(BaseDb): | class ScrubberDb(BaseDb): | ||||
current_version = 2 | current_version = 2 | ||||
#################################### | |||||
# Shared tables | |||||
#################################### | |||||
@functools.lru_cache(1000) | @functools.lru_cache(1000) | ||||
def datastore_get_or_add(self, datastore: Datastore) -> int: | def datastore_get_or_add(self, datastore: Datastore) -> int: | ||||
"""Creates a datastore if it does not exist, and returns its id.""" | """Creates a datastore if it does not exist, and returns its id.""" | ||||
with self.transaction() as cur: | with self.transaction() as cur: | ||||
cur.execute( | cur.execute( | ||||
""" | """ | ||||
WITH inserted AS ( | WITH inserted AS ( | ||||
INSERT INTO datastore (package, class, instance) | INSERT INTO datastore (package, class, instance) | ||||
Show All 16 Lines | def datastore_get_or_add(self, datastore: Datastore) -> int: | ||||
""", | """, | ||||
(dataclasses.asdict(datastore)), | (dataclasses.asdict(datastore)), | ||||
) | ) | ||||
res = cur.fetchone() | res = cur.fetchone() | ||||
assert res is not None | assert res is not None | ||||
(id_,) = res | (id_,) = res | ||||
return id_ | return id_ | ||||
#################################### | |||||
# Inventory of objects with issues | |||||
#################################### | |||||
def corrupt_object_add( | def corrupt_object_add( | ||||
self, | self, | ||||
id: CoreSWHID, | id: CoreSWHID, | ||||
datastore: Datastore, | datastore: Datastore, | ||||
serialized_object: bytes, | serialized_object: bytes, | ||||
) -> None: | ) -> None: | ||||
datastore_id = self.datastore_get_or_add(datastore) | datastore_id = self.datastore_get_or_add(datastore) | ||||
with self.transaction() as cur: | with self.transaction() as cur: | ||||
▲ Show 20 Lines • Show All 161 Lines • ▼ Show 20 Lines | ) -> List[CorruptObject]: | ||||
start_id=None if start_id is None else str(start_id), | start_id=None if start_id is None else str(start_id), | ||||
end_id=None if end_id is None else str(end_id), | end_id=None if end_id is None else str(end_id), | ||||
origin_url=origin_url, | origin_url=origin_url, | ||||
limit=limit, | limit=limit, | ||||
), | ), | ||||
) | ) | ||||
return self._corrupt_object_list_from_cursor(cur) | return self._corrupt_object_list_from_cursor(cur) | ||||
def missing_object_add( | |||||
self, | |||||
id: CoreSWHID, | |||||
reference_ids: Iterable[CoreSWHID], | |||||
datastore: Datastore, | |||||
) -> None: | |||||
""" | |||||
Adds a "hole" to the inventory, ie. an object missing from a datastore | |||||
that is referenced by an other object of the same datastore. | |||||
If the missing object is already known to be missing by the scrubber database, | |||||
this only records the reference (which can be useful to locate an origin | |||||
to recover the object from). | |||||
If that reference is already known too, this is a noop. | |||||
Args: | |||||
id: SWHID of the missing object (the hole) | |||||
reference_id: SWHID of the object referencing the missing object | |||||
datastore: representation of the swh-storage/swh-journal/... instance | |||||
containing this hole | |||||
""" | |||||
if not reference_ids: | |||||
raise ValueError("reference_ids is empty") | |||||
datastore_id = self.datastore_get_or_add(datastore) | |||||
with self.transaction() as cur: | |||||
cur.execute( | |||||
""" | |||||
INSERT INTO missing_object (id, datastore) | |||||
VALUES (%s, %s) | |||||
ON CONFLICT DO NOTHING | |||||
""", | |||||
(str(id), datastore_id), | |||||
) | |||||
psycopg2.extras.execute_batch( | |||||
cur, | |||||
""" | |||||
INSERT INTO missing_object_reference (missing_id, reference_id, datastore) | |||||
VALUES (%s, %s, %s) | |||||
ON CONFLICT DO NOTHING | |||||
""", | |||||
[ | |||||
(str(id), str(reference_id), datastore_id) | |||||
for reference_id in reference_ids | |||||
], | |||||
) | |||||
def missing_object_iter(self) -> Iterator[MissingObject]: | |||||
anlambert: "Yields all objects missing from datastores." seems a better doctring here. | |||||
Done Inline Actionscopy-pasted too fast ;) vlorentz: copy-pasted too fast ;) | |||||
"""Yields all records in the 'missing_object' table.""" | |||||
with self.transaction() as cur: | |||||
cur.execute( | |||||
""" | |||||
SELECT | |||||
mo.id, mo.first_occurrence, | |||||
ds.package, ds.class, ds.instance | |||||
FROM missing_object AS mo | |||||
INNER JOIN datastore AS ds ON (ds.id=mo.datastore) | |||||
""" | |||||
) | |||||
for row in cur: | |||||
(id, first_occurrence, ds_package, ds_class, ds_instance) = row | |||||
yield MissingObject( | |||||
id=CoreSWHID.from_string(id), | |||||
first_occurrence=first_occurrence, | |||||
datastore=Datastore( | |||||
package=ds_package, cls=ds_class, instance=ds_instance | |||||
), | |||||
) | |||||
def missing_object_reference_iter( | |||||
self, missing_id: CoreSWHID | |||||
) -> Iterator[MissingObjectReference]: | |||||
Done Inline Actions"Yields all referenced objects missing from datastores." seems a better doctring here. anlambert: "Yields all referenced objects missing from datastores." seems a better doctring here. | |||||
"""Yields all records in the 'missing_object_reference' table.""" | |||||
with self.transaction() as cur: | |||||
cur.execute( | |||||
""" | |||||
SELECT | |||||
mor.reference_id, mor.first_occurrence, | |||||
ds.package, ds.class, ds.instance | |||||
FROM missing_object_reference AS mor | |||||
INNER JOIN datastore AS ds ON (ds.id=mor.datastore) | |||||
WHERE mor.missing_id=%s | |||||
""", | |||||
(str(missing_id),), | |||||
) | |||||
for row in cur: | |||||
( | |||||
reference_id, | |||||
first_occurrence, | |||||
ds_package, | |||||
ds_class, | |||||
ds_instance, | |||||
) = row | |||||
yield MissingObjectReference( | |||||
missing_id=missing_id, | |||||
reference_id=CoreSWHID.from_string(reference_id), | |||||
first_occurrence=first_occurrence, | |||||
datastore=Datastore( | |||||
package=ds_package, cls=ds_class, instance=ds_instance | |||||
), | |||||
) | |||||
#################################### | |||||
# Issue resolution | |||||
#################################### | |||||
def object_origin_add( | def object_origin_add( | ||||
self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str] | self, cur: psycopg2.extensions.cursor, swhid: CoreSWHID, origins: List[str] | ||||
) -> None: | ) -> None: | ||||
psycopg2.extras.execute_values( | psycopg2.extras.execute_values( | ||||
cur, | cur, | ||||
""" | """ | ||||
INSERT INTO object_origin (object_id, origin_url) | INSERT INTO object_origin (object_id, origin_url) | ||||
VALUES %s | VALUES %s | ||||
▲ Show 20 Lines • Show All 56 Lines • Show Last 20 Lines |
"Yields all objects missing from datastores." seems a better doctring here.