diff --git a/sql/upgrades/168.sql b/sql/upgrades/168.sql
new file mode 100644
--- /dev/null
+++ b/sql/upgrades/168.sql
@@ -0,0 +1,24 @@
+-- SWH DB schema upgrade
+-- from_version: 167
+-- to_version: 168
+-- description: add raw_extrinsic_metadata.id
+
+insert into dbversion(version, release, description)
+      values(168, now(), 'Work In Progress');
+
+-- 1. add the 'id' column
+
+alter table raw_extrinsic_metadata
+  add column id sha1_git;
+
+-- 2. restart swh-storage, so that it starts writing the id (but does not read it)
+
+-- 3. run swh storage migrate-168
+
+-- 4. make the id column not null, and index it
+
+alter table raw_extrinsic_metadata
+  alter column id set not null;
+
+create unique index concurrently raw_extrinsic_metadata_pkey on raw_extrinsic_metadata(id);
+alter table raw_extrinsic_metadata add primary key using index raw_extrinsic_metadata_pkey;
diff --git a/swh/storage/cli.py b/swh/storage/cli.py
--- a/swh/storage/cli.py
+++ b/swh/storage/cli.py
@@ -201,6 +201,105 @@
         client.close()
 
 
+@storage.command()
+@click.pass_context
+def migrate_168(ctx):
+    """Fills the 'id' column of raw_extrinsic_metadata in a postgresql backend.
+
+    Uses the journal as input (instead of SELECT) so it is parallelizable and
+    stop/resumable: the id of each journal object is computed with
+    raw_extrinsic_metadata_identifier and written to the row with the same
+    (target, authority, discovery_date, fetcher).
+    """
+    import psycopg2.extras
+
+    from swh.journal.client import get_journal_client
+    from swh.model.hashutil import hash_to_bytes
+    from swh.model.identifiers import raw_extrinsic_metadata_identifier
+    from swh.storage import get_storage
+    from swh.storage.postgresql.storage import Storage as PostgresqlStorage
+
+    ensure_check_config(ctx.obj["config"], ctx.obj["check_config"], "write")
+
+    conf = ctx.obj["config"]
+    storage = get_storage(**conf.pop("storage"))
+
+    if not isinstance(storage, PostgresqlStorage):
+        # click has no 'Error' class; ClickException is its user-facing error.
+        raise click.ClickException("storage must have 'cls: local'.")
+
+    # NOTE(review): the postgresql Storage exposes its Db wrapper through
+    # get_db() (there is no public 'db' attribute) -- confirm.
+    conn = storage.get_db().conn
+
+    client_cfg = conf.pop("journal_client")
+    try:
+        client = get_journal_client(**client_cfg)
+    except ValueError as exc:
+        ctx.fail(str(exc))
+
+    # psycopg2 uses the %s paramstyle (not '?'); placeholders are listed in
+    # the same order as the tuples built by worker_fn. The WHERE clause
+    # matches on the key of the unique index
+    # raw_extrinsic_metadata_content_authority_date_fetcher.
+    # NOTE(review): authorities and fetchers live in the metadata_authority
+    # and metadata_fetcher tables; confirm against sql/30-schema.sql.
+    query = """
+        UPDATE raw_extrinsic_metadata
+        SET id=%s
+        FROM metadata_authority AS authority, metadata_fetcher AS fetcher
+        WHERE
+            authority.id=raw_extrinsic_metadata.authority_id
+            AND fetcher.id=raw_extrinsic_metadata.fetcher_id
+            AND raw_extrinsic_metadata.target=%s
+            AND authority.type=%s
+            AND authority.url=%s
+            AND raw_extrinsic_metadata.discovery_date=%s
+            AND fetcher.name=%s
+            AND fetcher.version=%s
+    """
+
+    def worker_fn(all_objects):
+        """Computes and stores the id of every object in a journal batch."""
+        # Raise rather than assert, so the check survives 'python -O'.
+        if set(all_objects) != {"raw_extrinsic_metadata"}:
+            raise click.ClickException(
+                f"unexpected object types from journal: {sorted(all_objects)}"
+            )
+        rows = [
+            (
+                hash_to_bytes(raw_extrinsic_metadata_identifier(obj)),
+                obj["target"],
+                obj["authority"]["type"],
+                obj["authority"]["url"],
+                obj["discovery_date"],
+                obj["fetcher"]["name"],
+                obj["fetcher"]["version"],
+            )
+            for obj in all_objects["raw_extrinsic_metadata"]
+        ]
+
+        with conn.cursor() as cur:
+            psycopg2.extras.execute_batch(cur, query, rows)
+
+        # Commit per batch so an interrupted run keeps the ids written so far.
+        conn.commit()
+
+    if notify:
+        notify("READY=1")
+
+    try:
+        client.process(worker_fn)
+    except KeyboardInterrupt:
+        ctx.exit(0)
+    else:
+        print("Done.")
+    finally:
+        if notify:
+            notify("STOPPING=1")
+        client.close()
+
+
 def ensure_check_config(storage_cfg: Dict, check_config: Optional[str], default: str):
     """Helper function to inject the setting of check_config option in the storage config
     dict according to the expected default value (default value depends on the command,
diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py
--- a/swh/storage/postgresql/db.py
+++ b/swh/storage/postgresql/db.py
@@ -28,7 +28,7 @@
 
     """
 
-    current_version = 166
+    current_version = 168
 
     def mktemp_dir_entry(self, entry_type, cur=None):
         self._cursor(cur).execute(
@@ -1136,6 +1136,7 @@
     """The list of context columns for all artifact types."""
 
     _raw_extrinsic_metadata_insert_cols = [
+        "id",
         "type",
         "target",
         "authority_id",
@@ -1184,6 +1185,7 @@
 
     def raw_extrinsic_metadata_add(
         self,
+        id: bytes,
         type: str,
         target: str,
         discovery_date: datetime.datetime,
@@ -1202,6 +1204,7 @@
     ):
         query = self._raw_extrinsic_metadata_insert_query
         args: Dict[str, Any] = dict(
+            id=id,
             type=type,
             target=target,
             authority_id=authority_id,
diff --git a/swh/storage/postgresql/storage.py b/swh/storage/postgresql/storage.py
--- a/swh/storage/postgresql/storage.py
+++ b/swh/storage/postgresql/storage.py
@@ -1256,6 +1256,7 @@
             fetcher_id = self._get_fetcher_id(metadata_entry.fetcher, db, cur)
 
             db.raw_extrinsic_metadata_add(
+                id=metadata_entry.id,
                 type=metadata_entry.type.value,
                 target=str(metadata_entry.target),
                 discovery_date=metadata_entry.discovery_date,
diff --git a/swh/storage/sql/30-schema.sql b/swh/storage/sql/30-schema.sql
--- a/swh/storage/sql/30-schema.sql
+++ b/swh/storage/sql/30-schema.sql
@@ -17,7 +17,7 @@
 
 -- latest schema version
 insert into dbversion(version, release, description)
-      values(166, now(), 'Work In Progress');
+      values(168, now(), 'Work In Progress');
 
 -- a SHA1 checksum
 create domain sha1 as bytea check (length(value) = 20);
@@ -430,6 +430,8 @@
 -- Extrinsic metadata on a DAG objects and origins.
 create table raw_extrinsic_metadata
 (
+  id             sha1_git      not null,
+
   type           text          not null,
   target         text          not null,
 
diff --git a/swh/storage/sql/60-indexes.sql b/swh/storage/sql/60-indexes.sql
--- a/swh/storage/sql/60-indexes.sql
+++ b/swh/storage/sql/60-indexes.sql
@@ -264,6 +264,10 @@
 
 
 -- raw_extrinsic_metadata
+
+create unique index concurrently raw_extrinsic_metadata_pkey on raw_extrinsic_metadata(id);
+alter table raw_extrinsic_metadata add primary key using index raw_extrinsic_metadata_pkey;
+
 create unique index concurrently raw_extrinsic_metadata_content_authority_date_fetcher on raw_extrinsic_metadata(target, authority_id, discovery_date, fetcher_id);
 
 \if :dbflavor_default