diff --git a/swh/scrubber/db.py b/swh/scrubber/db.py --- a/swh/scrubber/db.py +++ b/swh/scrubber/db.py @@ -7,7 +7,7 @@ import dataclasses import datetime import functools -from typing import Iterable, Iterator, List, Optional +from typing import Iterable, Iterator, List, Optional, Tuple import psycopg2 @@ -60,7 +60,7 @@ class ScrubberDb(BaseDb): - current_version = 3 + current_version = 4 #################################### # Shared tables @@ -98,6 +98,86 @@ (id_,) = res return id_ + #################################### + # Checkpointing/progress tracking + #################################### + + def checked_range_upsert( + self, + datastore: Datastore, + range_start: CoreSWHID, + range_end: CoreSWHID, + date: datetime.datetime, + ) -> None: + """ + Records in the database that the given range was checked at the given + date. If a more recent date is already recorded for that range, it is kept. + """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + INSERT INTO checked_range(datastore, range_start, range_end, last_date) + VALUES (%s, %s, %s, %s) + ON CONFLICT (datastore, range_start, range_end) DO UPDATE + SET last_date = GREATEST(checked_range.last_date, EXCLUDED.last_date) + """, + (datastore_id, str(range_start), str(range_end), date), + ) + + def checked_range_get_last_date( + self, datastore: Datastore, range_start: CoreSWHID, range_end: CoreSWHID + ) -> Optional[datetime.datetime]: + """ + Returns the last date the given range was checked in the given datastore, + or :const:`None` if it was never checked. + + Currently, this checks range boundaries exactly, with no regard for ranges + that contain or are contained by it.
+ """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT last_date + FROM checked_range + WHERE datastore=%s AND range_start=%s AND range_end=%s + """, + (datastore_id, str(range_start), str(range_end)), + ) + + res = cur.fetchone() + if res is None: + return None + else: + (date,) = res + return date + + def checked_range_iter( + self, datastore: Datastore + ) -> Iterator[Tuple[CoreSWHID, CoreSWHID, datetime.datetime]]: + """ + Yields ``(range_start, range_end, last_date)`` for every range recorded + as checked in the given datastore, in no particular order. + """ + datastore_id = self.datastore_get_or_add(datastore) + with self.transaction() as cur: + cur.execute( + """ + SELECT range_start, range_end, last_date + FROM checked_range + WHERE datastore=%s + """, + (datastore_id,), + ) + + for (range_start, range_end, last_date) in cur: + yield ( + CoreSWHID.from_string(range_start), + CoreSWHID.from_string(range_end), + last_date, + ) + #################################### # Inventory of objects with issues #################################### diff --git a/swh/scrubber/sql/30-schema.sql b/swh/scrubber/sql/30-schema.sql --- a/swh/scrubber/sql/30-schema.sql +++ b/swh/scrubber/sql/30-schema.sql @@ -19,6 +19,23 @@ comment on column datastore.instance is 'Human-readable way to uniquely identify the datastore; eg.
its URL or DSN.'; +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); + +comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + ------------------------------------- -- Inventory of objects with issues ------------------------------------- diff --git a/swh/scrubber/sql/60-indexes.sql b/swh/scrubber/sql/60-indexes.sql --- a/swh/scrubber/sql/60-indexes.sql +++ b/swh/scrubber/sql/60-indexes.sql @@ -9,6 +9,12 @@ create unique index concurrently datastore_package_class_instance on datastore(package, class, instance); +------------------------------------- +-- Checkpointing/progress tracking +------------------------------------- + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; ------------------------------------- -- Inventory of objects with issues diff --git a/swh/scrubber/sql/upgrades/4.sql b/swh/scrubber/sql/upgrades/4.sql new file mode 100644 --- /dev/null +++ b/swh/scrubber/sql/upgrades/4.sql @@ -0,0 +1,21 @@ +-- SWH Scrubber DB schema upgrade +-- from_version: 3 +-- to_version: 4 +-- description: Add checked_range + + +create table checked_range +( + datastore int not null, + range_start swhid not null, + range_end swhid not null, + last_date timestamptz not null +); +
+comment on table checked_range is 'Each row represents a range of objects in a datastore that were fetched, checksummed, and checked at some point in the past.'; +comment on column checked_range.range_start is 'First SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.range_end is 'Last SWHID of the range that was checked (inclusive, possibly non-existent).'; +comment on column checked_range.last_date is 'Date the last scrub of that range *started*.'; + +create unique index concurrently checked_range_pkey on checked_range(datastore, range_start, range_end); +alter table checked_range add primary key using index checked_range_pkey; diff --git a/swh/scrubber/tests/test_db.py b/swh/scrubber/tests/test_db.py new file mode 100644 --- /dev/null +++ b/swh/scrubber/tests/test_db.py @@ -0,0 +1,58 @@ +# Copyright (C) 2022 The Software Heritage developers +# See the AUTHORS file at the top-level directory of this distribution +# License: GNU General Public License version 3, or any later version +# See top-level LICENSE file for more information + +import datetime + +from swh.model import swhids +from swh.scrubber.db import Datastore, ScrubberDb + +DATASTORE = Datastore(package="storage", cls="postgresql", instance="service=swh-test") +SNP_SWHID1 = swhids.CoreSWHID.from_string( + "swh:1:snp:5000000000000000000000000000000000000000" +) +SNP_SWHID2 = swhids.CoreSWHID.from_string( + "swh:1:snp:e000000000000000000000000000000000000000" +) +DATE = datetime.datetime(2022, 10, 4, 12, 1, 23, tzinfo=datetime.timezone.utc) + + +def test_checked_range_insert(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, DATE) + ] + + +def test_checked_range_update(scrubber_db: ScrubberDb): + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + date2 = DATE +
datetime.timedelta(days=1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date2) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) + ] + + date3 = DATE + datetime.timedelta(days=-1) + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, date3) + + assert list(scrubber_db.checked_range_iter(DATASTORE)) == [ + (SNP_SWHID1, SNP_SWHID2, date2) # newest date wins + ] + + +def test_checked_range_get(scrubber_db: ScrubberDb): + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + is None + ) + + scrubber_db.checked_range_upsert(DATASTORE, SNP_SWHID1, SNP_SWHID2, DATE) + + assert ( + scrubber_db.checked_range_get_last_date(DATASTORE, SNP_SWHID1, SNP_SWHID2) + == DATE + )