Page Menu · Home · Software Heritage

D8608.id31210.diff
No OneTemporary

D8608.id31210.diff

diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py
--- a/swh/scrubber/db.py
+++ b/swh/scrubber/db.py
@@ -7,7 +7,7 @@
import dataclasses
import datetime
import functools
-from typing import Iterable, Iterator, List, Optional
+from typing import Iterable, Iterator, List, Optional, Tuple
import psycopg2
@@ -60,7 +60,7 @@
class ScrubberDb(BaseDb):
- current_version = 3
+ current_version = 4
####################################
# Shared tables
@@ -98,6 +98,93 @@
(id_,) = res
return id_
+ ####################################
+ # Checkpointing/progress tracking
+ ####################################
+
+ def checked_range_upsert(
+ self,
+ datastore: Datastore,
+ range_start: CoreSWHID,
+ range_end: CoreSWHID,
+ date: datetime.datetime,
+ ) -> None:
+ """
+ Records in the database that the given range was last checked at the
+ given date.
+
+ If a later date is already recorded for this exact range, it is kept (see
+ ``GREATEST`` below), so out-of-order calls never move the date backwards.
+ """
+ datastore_id = self.datastore_get_or_add(datastore)
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ INSERT INTO checked_range(datastore, range_start, range_end, last_date)
+ VALUES (%s, %s, %s, %s)
+ ON CONFLICT (datastore, range_start, range_end) DO UPDATE
+ SET last_date = GREATEST(checked_range.last_date, EXCLUDED.last_date)
+ """,
+ (datastore_id, str(range_start), str(range_end), date),
+ )
+
+ def checked_range_get_last_date(
+ self, datastore: Datastore, range_start: CoreSWHID, range_end: CoreSWHID
+ ) -> Optional[datetime.datetime]:
+ """
+ Returns the last date the given range was checked in the given datastore,
+ or :const:`None` if it was never checked.
+
+ Currently, this checks range boundaries exactly, with no regard for ranges
+ that contain or are contained by it.
+ """
+ datastore_id = self.datastore_get_or_add(datastore)
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT last_date
+ FROM checked_range
+ WHERE datastore=%s AND range_start=%s AND range_end=%s
+ """,
+ (datastore_id, str(range_start), str(range_end)),
+ )
+
+ res = cur.fetchone()
+ if res is None:
+ return None
+ else:
+ (date,) = res
+ return date
+
+ def checked_range_iter(
+ self, datastore: Datastore
+ ) -> Iterator[Tuple[CoreSWHID, CoreSWHID, datetime.datetime]]:
+ """
+ Yields ``(range_start, range_end, last_date)`` for every range recorded as
+ checked in the given datastore, in no particular order (the query has no
+ ``ORDER BY``).
+
+ The ``self.transaction()`` context, and its cursor, stay open until the
+ returned iterator is exhausted or closed.
+ """
+ datastore_id = self.datastore_get_or_add(datastore)
+ with self.transaction() as cur:
+ cur.execute(
+ """
+ SELECT range_start, range_end, last_date
+ FROM checked_range
+ WHERE datastore=%s
+ """,
+ (datastore_id,),
+ )
+
+ for (range_start, range_end, last_date) in cur:
+ yield (
+ CoreSWHID.from_string(range_start),
+ CoreSWHID.from_string(range_end),
+ last_date,
+ )
+
####################################
# Inventory of objects with issues
####################################
diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql
--- a/swh/scrubber/sql/30-schema.sql
+++ b/swh/scrubber/sql/30-schema.sql
@@ -19,6 +19,23 @@
comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg. its URL or DSN.';
+-------------------------------------
+-- Checkpointing/progress tracking
+-------------------------------------
+
+create table checked_range
+(
+ datastore int not null,
+ range_start swhid not null,
+ range_end swhid not null,
+ last_date timestamptz not null
+);
+
+comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.';
+comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).';
+comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).';
+comment on column checked_range.last_date is 'Date the last scrub of that range *started*.';
+
-------------------------------------
-- Inventory of objects with issues
-------------------------------------
diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql
--- a/swh/scrubber/sql/60-indexes.sql
+++ b/swh/scrubber/sql/60-indexes.sql
@@ -9,6 +9,12 @@
create unique index concurrently datastore_package_class_instance on datastore(package, class, instance);
+-------------------------------------
+-- Checkpointing/progress tracking
+-------------------------------------
+
+create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end);
+alter table checked_range add primary key using index checked_range_pkey;
-------------------------------------
-- Inventory of objects with issues
diff --git a/swh/scrubber/sql/upgrades/4.sql b/swh/scrubber/sql/upgrades/4.sql
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/sql/upgrades/4.sql
@@ -0,0 +1,21 @@
+-- SWH Scrubber DB schema upgrade
+-- from_version: 3
+-- to_version: 4
+-- description: Add checked_range
+
+
+create table checked_range
+(
+ datastore int not null,
+ range_start swhid not null,
+ range_end swhid not null,
+ last_date timestamptz not null
+);
+
+comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.';
+comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).';
+comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).';
+comment on column checked_range.last_date is 'Date the last scrub of that range *started*.';
+
+create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end);
+alter table checked_range add primary key using index checked_range_pkey;
diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py
new file mode 100644
--- /dev/null
+++ b/swh/scrubber/tests/test_db.py
@@ -0,0 +1,65 @@
+# Copyright (C) 2022 The Software Heritage developers
+# See the AUTHORS file at the top-level directory of this distribution
+# License: GNU General Public License version 3, or any later version
+# See top-level LICENSE file for more information
+
+import datetime
+
+from swh.model import swhids
+from swh.scrubber.db import Datastore, ScrubberDb
+
+DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test")
+SNP_SWHID1 = swhids.CoreSWHID.from_string(
+ "swh:1:snp:5000000000000000000000000000000000000000"
+)
+SNP_SWHID2 = swhids.CoreSWHID.from_string(
+ "swh:1:snp:e000000000000000000000000000000000000000"
+)
+DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc)
+
+
+def test_checked_range_insert(scrubber_db: ScrubberDb):
+ scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)
+
+ assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
+ (SNP_SWHID1, SNP_SWHID2, DATE)
+ ]
+
+
+def test_checked_range_update(scrubber_db: ScrubberDb):
+ scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)
+
+ date2 = DATE + datetime.timedelta(days=1)
+ scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date2)
+
+ assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
+ (SNP_SWHID1, SNP_SWHID2, date2)
+ ]
+
+ date3 = DATE + datetime.timedelta(days=-1)
+ scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date3)
+
+ assert list(scrubber_db.checked_range_iter(DATASTORE)) == [
+ (SNP_SWHID1, SNP_SWHID2, date2) # newest date wins
+ ]
+
+
+def test_checked_range_get(scrubber_db: ScrubberDb):
+ assert (
+ scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2)
+ is None
+ )
+
+ scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE)
+
+ assert (
+ scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2)
+ == DATE
+ )
+
+ # Lookups match range boundaries exactly: a sub-range of a checked range
+ # is not itself reported as checked.
+ assert (
+ scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID1)
+ is None
+ )

File Metadata

Mime Type
text/plain
Expires
Dec 20 2024, 6:31 PM (11 w, 4 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3214522

Event Timeline