diff --git a/sql/upgrades/163.sql b/sql/upgrades/163.sql new file mode 100644 index 00000000..09798523 --- /dev/null +++ b/sql/upgrades/163.sql @@ -0,0 +1,29 @@ +-- SWH DB schema upgrade +-- from_version: 162 +-- to_version: 163 +-- description: Register database flavor scaffolding + +insert into dbversion(version, release, description) + values(163, now(), 'Work In Progress'); +-- database flavor +create type database_flavor as enum ( + 'default', -- default: full index availability for deduplication and read queries + 'mirror', -- mirror: reduced indexes to allow for out of order insertions + 'read_replica' -- read replica: minimal indexes to allow read queries +); +comment on type database_flavor is 'Flavor of the current database'; + +create table dbflavor ( + flavor database_flavor, + single_row char(1) primary key default 'x', + check (single_row = 'x') +); +comment on table dbflavor is 'Database flavor storage'; +comment on column dbflavor.flavor is 'Database flavor currently deployed'; +comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row'; + +create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$ + select coalesce((select flavor from dbflavor), 'default'); +$$; + +comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed'; diff --git a/swh/storage/postgresql/db.py b/swh/storage/postgresql/db.py index 01ae3b4f..709df519 100644 --- a/swh/storage/postgresql/db.py +++ b/swh/storage/postgresql/db.py @@ -1,1354 +1,1354 @@ # Copyright (C) 2015-2020 The Software Heritage developers # See the AUTHORS file at the top-level directory of this distribution # License: GNU General Public License version 3, or any later version # See top-level LICENSE file for more information import datetime import logging import random import select from typing import Any, Dict, Iterable, List, Optional, Tuple from swh.core.db import BaseDb from swh.core.db.db_utils import execute_values_generator from swh.core.db.db_utils import jsonize as _jsonize from swh.core.db.db_utils import stored_procedure from swh.model.model import SHA1_SIZE, OriginVisit, OriginVisitStatus from swh.storage.interface import ListOrder logger = logging.getLogger(__name__) def jsonize(d): return _jsonize(dict(d) if d is not None else None) class Db(BaseDb): """Proxy to the SWH DB, with wrappers around stored procedures """ - current_version = 162 + current_version = 163 def mktemp_dir_entry(self, entry_type, cur=None): self._cursor(cur).execute( "SELECT swh_mktemp_dir_entry(%s)", (("directory_entry_%s" % entry_type),) ) @stored_procedure("swh_mktemp_revision") def mktemp_revision(self, cur=None): pass @stored_procedure("swh_mktemp_release") def mktemp_release(self, cur=None): pass @stored_procedure("swh_mktemp_snapshot_branch") def mktemp_snapshot_branch(self, cur=None): pass def register_listener(self, notify_queue, cur=None): """Register a listener for NOTIFY queue `notify_queue`""" self._cursor(cur).execute("LISTEN %s" % notify_queue) def listen_notifies(self, timeout): """Listen to notifications for `timeout` seconds""" if select.select([self.conn], [], [], timeout) == ([], [], []): return else: self.conn.poll() while self.conn.notifies: yield self.conn.notifies.pop(0) @stored_procedure("swh_content_add") def content_add_from_temp(self, cur=None): pass @stored_procedure("swh_directory_add") def directory_add_from_temp(self, cur=None): pass @stored_procedure("swh_skipped_content_add") def skipped_content_add_from_temp(self, cur=None): pass @stored_procedure("swh_revision_add") def revision_add_from_temp(self, cur=None): pass @stored_procedure("swh_release_add") def release_add_from_temp(self, cur=None): pass def content_update_from_temp(self, keys_to_update, cur=None): cur = self._cursor(cur) cur.execute( """select swh_content_update(ARRAY[%s] :: text[])""" % keys_to_update ) content_get_metadata_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "status", ] content_add_keys = content_get_metadata_keys + ["ctime"] skipped_content_keys = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "reason", "status", "origin", ] def content_get_metadata_from_sha1s(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ select t.sha1, %s from (values %%s) as t (sha1) inner join content using (sha1) """ % ", ".join(self.content_get_metadata_keys[1:]), ((sha1,) for sha1 in sha1s), ) def content_get_range(self, start, end, limit=None, cur=None): """Retrieve contents within range [start, end]. """ cur = self._cursor(cur) query = """select %s from content where %%s <= sha1 and sha1 <= %%s order by sha1 limit %%s""" % ", ".join( self.content_get_metadata_keys ) cur.execute(query, (start, end, limit)) yield from cur content_hash_keys = ["sha1", "sha1_git", "sha256", "blake2s256"] def content_missing_from_list(self, contents, cur=None): cur = self._cursor(cur) keys = ", ".join(self.content_hash_keys) equality = " AND ".join( ("t.%s = c.%s" % (key, key)) for key in self.content_hash_keys ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(%s) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE %s ) """ % (keys, keys, equality), (tuple(c[key] for key in self.content_hash_keys) for c in contents), ) def content_missing_per_sha1(self, sha1s, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1 FROM (VALUES %s) AS t(sha1) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1 = t.sha1 )""", ((sha1,) for sha1 in sha1s), ) def content_missing_per_sha1_git(self, contents, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT t.sha1_git FROM (VALUES %s) AS t(sha1_git) WHERE NOT EXISTS ( SELECT 1 FROM content c WHERE c.sha1_git = t.sha1_git )""", ((sha1,) for sha1 in contents), ) def skipped_content_missing(self, contents, cur=None): if not contents: return [] cur = self._cursor(cur) query = """SELECT * FROM (VALUES %s) AS t (%s) WHERE not exists (SELECT 1 FROM skipped_content s WHERE s.sha1 is not distinct from t.sha1::sha1 and s.sha1_git is not distinct from t.sha1_git::sha1 and s.sha256 is not distinct from t.sha256::bytea);""" % ( (", ".join("%s" for _ in contents)), ", ".join(self.content_hash_keys), ) cur.execute( query, [tuple(cont[key] for key in self.content_hash_keys) for cont in contents], ) yield from cur def snapshot_exists(self, snapshot_id, cur=None): """Check whether a snapshot with the given id exists""" cur = self._cursor(cur) cur.execute("""SELECT 1 FROM snapshot where id=%s""", (snapshot_id,)) return bool(cur.fetchone()) def snapshot_missing_from_list(self, snapshots, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM snapshot d WHERE d.id = t.id ) """, ((id,) for id in snapshots), ) def snapshot_add(self, snapshot_id, cur=None): """Add a snapshot from the temporary table""" cur = self._cursor(cur) cur.execute("""SELECT swh_snapshot_add(%s)""", (snapshot_id,)) snapshot_count_cols = ["target_type", "count"] def snapshot_count_branches(self, snapshot_id, cur=None): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_count_branches(%%s) """ % ", ".join( self.snapshot_count_cols ) cur.execute(query, (snapshot_id,)) yield from cur snapshot_get_cols = ["snapshot_id", "name", "target", "target_type"] def snapshot_get_by_id( self, snapshot_id, branches_from=b"", branches_count=None, target_types=None, cur=None, ): cur = self._cursor(cur) query = """\ SELECT %s FROM swh_snapshot_get_by_id(%%s, %%s, %%s, %%s :: snapshot_target[]) """ % ", ".join( self.snapshot_get_cols ) cur.execute(query, (snapshot_id, branches_from, branches_count, target_types)) yield from cur def snapshot_get_random(self, cur=None): return self._get_random_row_from_table("snapshot", ["id"], "id", cur) content_find_cols = [ "sha1", "sha1_git", "sha256", "blake2s256", "length", "ctime", "status", ] def content_find( self, sha1: Optional[bytes] = None, sha1_git: Optional[bytes] = None, sha256: Optional[bytes] = None, blake2s256: Optional[bytes] = None, cur=None, ): """Find the content optionally on a combination of the following checksums sha1, sha1_git, sha256 or blake2s256. Args: sha1: sha1 content git_sha1: the sha1 computed `a la git` sha1 of the content sha256: sha256 content blake2s256: blake2s256 content Returns: The tuple (sha1, sha1_git, sha256, blake2s256) if found or None. """ cur = self._cursor(cur) checksum_dict = { "sha1": sha1, "sha1_git": sha1_git, "sha256": sha256, "blake2s256": blake2s256, } query_parts = [f"SELECT {','.join(self.content_find_cols)} FROM content WHERE "] query_params = [] where_parts = [] # Adds only those keys which have values exist for algorithm in checksum_dict: if checksum_dict[algorithm] is not None: where_parts.append(f"{algorithm} = %s") query_params.append(checksum_dict[algorithm]) query_parts.append(" AND ".join(where_parts)) query = "\n".join(query_parts) cur.execute(query, query_params) content = cur.fetchall() return content def content_get_random(self, cur=None): return self._get_random_row_from_table("content", ["sha1_git"], "sha1_git", cur) def directory_missing_from_list(self, directories, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM directory d WHERE d.id = t.id ) """, ((id,) for id in directories), ) directory_ls_cols = [ "dir_id", "type", "target", "name", "perms", "status", "sha1", "sha1_git", "sha256", "length", ] def directory_walk_one(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk_one(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_walk(self, directory, cur=None): cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_directory_walk(%%s)" % cols cur.execute(query, (directory,)) yield from cur def directory_entry_get_by_path(self, directory, paths, cur=None): """Retrieve a directory entry by path. """ cur = self._cursor(cur) cols = ", ".join(self.directory_ls_cols) query = "SELECT %s FROM swh_find_directory_entry_by_path(%%s, %%s)" % cols cur.execute(query, (directory, paths)) data = cur.fetchone() if set(data) == {None}: return None return data def directory_get_random(self, cur=None): return self._get_random_row_from_table("directory", ["id"], "id", cur) def revision_missing_from_list(self, revisions, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM revision r WHERE r.id = t.id ) """, ((id,) for id in revisions), ) revision_add_cols = [ "id", "date", "date_offset", "date_neg_utc_offset", "committer_date", "committer_date_offset", "committer_date_neg_utc_offset", "type", "directory", "message", "author_fullname", "author_name", "author_email", "committer_fullname", "committer_name", "committer_email", "metadata", "synthetic", "extra_headers", ] revision_get_cols = revision_add_cols + ["parents"] def origin_visit_add(self, origin, ts, type, cur=None): """Add a new origin_visit for origin origin at timestamp ts. Args: origin: origin concerned by the visit ts: the date of the visit type: type of loader for the visit Returns: The new visit index step for that origin """ cur = self._cursor(cur) self._cursor(cur).execute( "SELECT swh_origin_visit_add(%s, %s, %s)", (origin, ts, type) ) return cur.fetchone()[0] origin_visit_status_cols = [ "origin", "visit", "date", "status", "snapshot", "metadata", ] def origin_visit_status_add( self, visit_status: OriginVisitStatus, cur=None ) -> None: """Add new origin visit status """ assert self.origin_visit_status_cols[0] == "origin" assert self.origin_visit_status_cols[-1] == "metadata" cols = self.origin_visit_status_cols[1:-1] cur = self._cursor(cur) cur.execute( f"WITH origin_id as (select id from origin where url=%s) " f"INSERT INTO origin_visit_status " f"(origin, {', '.join(cols)}, metadata) " f"VALUES ((select id from origin_id), " f"{', '.join(['%s']*len(cols))}, %s) " f"ON CONFLICT (origin, visit, date) do nothing", [visit_status.origin] + [getattr(visit_status, key) for key in cols] + [jsonize(visit_status.metadata)], ) origin_visit_cols = ["origin", "visit", "date", "type"] def origin_visit_add_with_id(self, origin_visit: OriginVisit, cur=None) -> None: """Insert origin visit when id are already set """ ov = origin_visit assert ov.visit is not None cur = self._cursor(cur) query = """INSERT INTO origin_visit ({cols}) VALUES ((select id from origin where url=%s), {values}) ON CONFLICT (origin, visit) DO NOTHING""".format( cols=", ".join(self.origin_visit_cols), values=", ".join("%s" for col in self.origin_visit_cols[1:]), ) cur.execute(query, (ov.origin, ov.visit, ov.date, ov.type)) origin_visit_get_cols = [ "origin", "visit", "date", "type", "status", "metadata", "snapshot", ] origin_visit_select_cols = [ "o.url AS origin", "ov.visit", "ov.date", "ov.type AS type", "ovs.status", "ovs.metadata", "ovs.snapshot", ] origin_visit_status_select_cols = [ "o.url AS origin", "ovs.visit", "ovs.date", "ovs.status", "ovs.snapshot", "ovs.metadata", ] def _make_origin_visit_status( self, row: Optional[Tuple[Any]] ) -> Optional[Dict[str, Any]]: """Make an origin_visit_status dict out of a row """ if not row: return None return dict(zip(self.origin_visit_status_cols, row)) def origin_visit_status_get_latest( self, origin_url: str, visit: int, allowed_statuses: Optional[List[str]] = None, require_snapshot: bool = False, cur=None, ) -> Optional[Dict[str, Any]]: """Given an origin visit id, return its latest origin_visit_status """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_status_select_cols), "FROM origin_visit_status ovs ", "INNER JOIN origin o ON o.id = ovs.origin", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_url] query_parts.append("AND ovs.visit = %s") query_params.append(visit) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append("ORDER BY ovs.date DESC LIMIT 1") query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) row = cur.fetchone() return self._make_origin_visit_status(row) def origin_visit_status_get_range( self, origin: str, visit: int, date_from: Optional[datetime.datetime], order: ListOrder, limit: int, cur=None, ): """Retrieve visit_status rows for visit (origin, visit) in a paginated way. """ cur = self._cursor(cur) query_parts = [ f"SELECT {', '.join(self.origin_visit_status_select_cols)} " "FROM origin_visit_status ovs ", "INNER JOIN origin o ON o.id = ovs.origin ", ] query_parts.append("WHERE o.url = %s AND ovs.visit = %s ") query_params: List[Any] = [origin, visit] if date_from is not None: op_comparison = ">=" if order == ListOrder.ASC else "<=" query_parts.append(f"and ovs.date {op_comparison} %s ") query_params.append(date_from) if order == ListOrder.ASC: query_parts.append("ORDER BY ovs.date ASC ") elif order == ListOrder.DESC: query_parts.append("ORDER BY ovs.date DESC ") else: assert False query_parts.append("LIMIT %s") query_params.append(limit) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) yield from cur def origin_visit_get_range( self, origin: str, visit_from: int, order: ListOrder, limit: int, cur=None, ): cur = self._cursor(cur) origin_visit_cols = ["o.url as origin", "ov.visit", "ov.date", "ov.type"] query_parts = [ f"SELECT {', '.join(origin_visit_cols)} FROM origin_visit ov ", "INNER JOIN origin o ON o.id = ov.origin ", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin] if visit_from > 0: op_comparison = ">" if order == ListOrder.ASC else "<" query_parts.append(f"and ov.visit {op_comparison} %s") query_params.append(visit_from) if order == ListOrder.ASC: query_parts.append("ORDER BY ov.visit ASC") elif order == ListOrder.DESC: query_parts.append("ORDER BY ov.visit DESC") query_parts.append("LIMIT %s") query_params.append(limit) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) yield from cur def origin_visit_get(self, origin_id, visit_id, cur=None): """Retrieve information on visit visit_id of origin origin_id. Args: origin_id: the origin concerned visit_id: The visit step for that origin Returns: The origin_visit information """ cur = self._cursor(cur) query = """\ SELECT %s FROM origin_visit ov INNER JOIN origin o ON o.id = ov.origin INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit WHERE o.url = %%s AND ov.visit = %%s ORDER BY ovs.date DESC LIMIT 1 """ % ( ", ".join(self.origin_visit_select_cols) ) cur.execute(query, (origin_id, visit_id)) r = cur.fetchall() if not r: return None return r[0] def origin_visit_find_by_date(self, origin, visit_date, cur=None): cur = self._cursor(cur) cur.execute( "SELECT * FROM swh_visit_find_by_date(%s, %s)", (origin, visit_date) ) rows = cur.fetchall() if rows: visit = dict(zip(self.origin_visit_get_cols, rows[0])) visit["origin"] = origin return visit def origin_visit_exists(self, origin_id, visit_id, cur=None): """Check whether an origin visit with the given ids exists""" cur = self._cursor(cur) query = "SELECT 1 FROM origin_visit where origin = %s AND visit = %s" cur.execute(query, (origin_id, visit_id)) return bool(cur.fetchone()) def origin_visit_get_latest( self, origin_id: str, type: Optional[str], allowed_statuses: Optional[Iterable[str]], require_snapshot: bool, cur=None, ): """Retrieve the most recent origin_visit of the given origin, with optional filters. Args: origin_id: the origin concerned type: Optional visit type to filter on allowed_statuses: the visit statuses allowed for the returned visit require_snapshot (bool): If True, only a visit with a known snapshot will be returned. Returns: The origin_visit information, or None if no visit matches. """ cur = self._cursor(cur) query_parts = [ "SELECT %s" % ", ".join(self.origin_visit_select_cols), "FROM origin_visit ov ", "INNER JOIN origin o ON o.id = ov.origin", "INNER JOIN origin_visit_status ovs ", "ON o.id = ovs.origin AND ov.visit = ovs.visit ", ] query_parts.append("WHERE o.url = %s") query_params: List[Any] = [origin_id] if type is not None: query_parts.append("AND ov.type = %s") query_params.append(type) if require_snapshot: query_parts.append("AND ovs.snapshot is not null") if allowed_statuses: query_parts.append("AND ovs.status IN %s") query_params.append(tuple(allowed_statuses)) query_parts.append( "ORDER BY ov.date DESC, ov.visit DESC, ovs.date DESC LIMIT 1" ) query = "\n".join(query_parts) cur.execute(query, tuple(query_params)) r = cur.fetchone() if not r: return None return r def origin_visit_get_random(self, type, cur=None): """Randomly select one origin visit that was full and in the last 3 months """ cur = self._cursor(cur) columns = ",".join(self.origin_visit_select_cols) query = f"""select {columns} from origin_visit ov inner join origin o on ov.origin=o.id inner join origin_visit_status ovs on ov.origin = ovs.origin and ov.visit = ovs.visit where ovs.status='full' and ov.type=%s and ov.date > now() - '3 months'::interval and random() < 0.1 limit 1 """ cur.execute(query, (type,)) return cur.fetchone() @staticmethod def mangle_query_key(key, main_table): if key == "id": return "t.id" if key == "parents": return """ ARRAY( SELECT rh.parent_id::bytea FROM revision_history rh WHERE rh.id = t.id ORDER BY rh.parent_rank )""" if "_" not in key: return "%s.%s" % (main_table, key) head, tail = key.split("_", 1) if head in ("author", "committer") and tail in ( "name", "email", "id", "fullname", ): return "%s.%s" % (head, tail) return "%s.%s" % (main_table, key) def revision_get_from_list(self, revisions, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "revision") for k in self.revision_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN revision ON t.id = revision.id LEFT JOIN person author ON revision.author = author.id LEFT JOIN person committer ON revision.committer = committer.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(revisions)), ) def revision_log(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_log(%%s, %%s) """ % ", ".join( self.revision_get_cols ) cur.execute(query, (root_revisions, limit)) yield from cur revision_shortlog_cols = ["id", "parents"] def revision_shortlog(self, root_revisions, limit=None, cur=None): cur = self._cursor(cur) query = """SELECT %s FROM swh_revision_list(%%s, %%s) """ % ", ".join( self.revision_shortlog_cols ) cur.execute(query, (root_revisions, limit)) yield from cur def revision_get_random(self, cur=None): return self._get_random_row_from_table("revision", ["id"], "id", cur) def release_missing_from_list(self, releases, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ SELECT id FROM (VALUES %s) as t(id) WHERE NOT EXISTS ( SELECT 1 FROM release r WHERE r.id = t.id ) """, ((id,) for id in releases), ) object_find_by_sha1_git_cols = ["sha1_git", "type"] def object_find_by_sha1_git(self, ids, cur=None): cur = self._cursor(cur) yield from execute_values_generator( cur, """ WITH t (sha1_git) AS (VALUES %s), known_objects as (( select id as sha1_git, 'release'::object_type as type, object_id from release r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'revision'::object_type as type, object_id from revision r where exists (select 1 from t where t.sha1_git = r.id) ) union all ( select id as sha1_git, 'directory'::object_type as type, object_id from directory d where exists (select 1 from t where t.sha1_git = d.id) ) union all ( select sha1_git as sha1_git, 'content'::object_type as type, object_id from content c where exists (select 1 from t where t.sha1_git = c.sha1_git) )) select t.sha1_git as sha1_git, k.type from t left join known_objects k on t.sha1_git = k.sha1_git """, ((id,) for id in ids), ) def stat_counters(self, cur=None): cur = self._cursor(cur) cur.execute("SELECT * FROM swh_stat_counters()") yield from cur def origin_add(self, url, cur=None): """Insert a new origin and return the new identifier.""" insert = """INSERT INTO origin (url) values (%s) RETURNING url""" cur.execute(insert, (url,)) return cur.fetchone()[0] origin_cols = ["url"] def origin_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(url) LEFT JOIN origin ON t.url = origin.url """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((url,) for url in origins)) def origin_get_by_sha1(self, sha1s, cur=None): """Retrieve origin urls from sha1s if found.""" cur = self._cursor(cur) query = """SELECT %s FROM (VALUES %%s) as t(sha1) LEFT JOIN origin ON t.sha1 = digest(origin.url, 'sha1') """ % ",".join( "origin." + col for col in self.origin_cols ) yield from execute_values_generator(cur, query, ((sha1,) for sha1 in sha1s)) def origin_id_get_by_url(self, origins, cur=None): """Retrieve origin `(type, url)` from urls if found.""" cur = self._cursor(cur) query = """SELECT id FROM (VALUES %s) as t(url) LEFT JOIN origin ON t.url = origin.url """ for row in execute_values_generator(cur, query, ((url,) for url in origins)): yield row[0] origin_get_range_cols = ["id", "url"] def origin_get_range(self, origin_from: int = 1, origin_count: int = 100, cur=None): """Retrieve ``origin_count`` origins whose ids are greater or equal than ``origin_from``. Origins are sorted by id before retrieving them. Args: origin_from: the minimum id of origins to retrieve origin_count: the maximum number of origins to retrieve """ cur = self._cursor(cur) query = """SELECT %s FROM origin WHERE id >= %%s ORDER BY id LIMIT %%s """ % ",".join( self.origin_get_range_cols ) cur.execute(query, (origin_from, origin_count)) yield from cur def _origin_query( self, url_pattern, count=False, offset=0, limit=50, regexp=False, with_visit=False, cur=None, ): """ Method factorizing query creation for searching and counting origins. """ cur = self._cursor(cur) if count: origin_cols = "COUNT(*)" order_clause = "" else: origin_cols = ",".join(self.origin_cols) order_clause = "ORDER BY id" if not regexp: operator = "ILIKE" query_params = [f"%{url_pattern}%"] else: operator = "~*" query_params = [url_pattern] query = f""" WITH filtered_origins AS ( SELECT * FROM origin WHERE url {operator} %s {order_clause} ) SELECT {origin_cols} FROM filtered_origins AS o """ if with_visit: query += """ WHERE EXISTS ( SELECT 1 FROM origin_visit ov INNER JOIN origin_visit_status ovs ON ov.origin = ovs.origin AND ov.visit = ovs.visit INNER JOIN snapshot ON ovs.snapshot=snapshot.id WHERE ov.origin=o.id ) """ if not count: query += "OFFSET %s LIMIT %s" query_params.extend([offset, limit]) cur.execute(query, query_params) def origin_search( self, url_pattern: str, offset: int = 0, limit: int = 50, regexp: bool = False, with_visit: bool = False, cur=None, ): """Search for origins whose urls contain a provided string pattern or match a provided regular expression. The search is performed in a case insensitive way. Args: url_pattern: the string pattern to search for in origin urls offset: number of found origins to skip before returning results limit: the maximum number of found origins to return regexp: if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit: if True, filter out origins with no visit """ self._origin_query( url_pattern, offset=offset, limit=limit, regexp=regexp, with_visit=with_visit, cur=cur, ) yield from cur def origin_count(self, url_pattern, regexp=False, with_visit=False, cur=None): """Count origins whose urls contain a provided string pattern or match a provided regular expression. The pattern search in origin urls is performed in a case insensitive way. Args: url_pattern (str): the string pattern to search for in origin urls regexp (bool): if True, consider the provided pattern as a regular expression and returns origins whose urls match it with_visit (bool): if True, filter out origins with no visit """ self._origin_query( url_pattern, count=True, regexp=regexp, with_visit=with_visit, cur=cur ) return cur.fetchone()[0] release_add_cols = [ "id", "target", "target_type", "date", "date_offset", "date_neg_utc_offset", "name", "comment", "synthetic", "author_fullname", "author_name", "author_email", ] release_get_cols = release_add_cols def release_get_from_list(self, releases, cur=None): cur = self._cursor(cur) query_keys = ", ".join( self.mangle_query_key(k, "release") for k in self.release_get_cols ) yield from execute_values_generator( cur, """ SELECT %s FROM (VALUES %%s) as t(sortkey, id) LEFT JOIN release ON t.id = release.id LEFT JOIN person author ON release.author = author.id ORDER BY sortkey """ % query_keys, ((sortkey, id) for sortkey, id in enumerate(releases)), ) def release_get_random(self, cur=None): return self._get_random_row_from_table("release", ["id"], "id", cur) _raw_extrinsic_metadata_context_cols = [ "origin", "visit", "snapshot", "release", "revision", "path", "directory", ] """The list of context columns for all artifact types.""" _raw_extrinsic_metadata_insert_cols = [ "type", "id", "authority_id", "fetcher_id", "discovery_date", "format", "metadata", *_raw_extrinsic_metadata_context_cols, ] """List of columns of the raw_extrinsic_metadata table, used when writing metadata.""" _raw_extrinsic_metadata_insert_query = f""" INSERT INTO raw_extrinsic_metadata ({', '.join(_raw_extrinsic_metadata_insert_cols)}) VALUES ({', '.join('%s' for _ in _raw_extrinsic_metadata_insert_cols)}) ON CONFLICT (id, authority_id, discovery_date, fetcher_id) DO NOTHING """ raw_extrinsic_metadata_get_cols = [ "raw_extrinsic_metadata.id", "raw_extrinsic_metadata.type", "discovery_date", "metadata_authority.type", "metadata_authority.url", "metadata_fetcher.id", "metadata_fetcher.name", "metadata_fetcher.version", *_raw_extrinsic_metadata_context_cols, "format", "raw_extrinsic_metadata.metadata", ] """List of columns of the raw_extrinsic_metadata, metadata_authority, and metadata_fetcher tables, used when reading object metadata.""" _raw_extrinsic_metadata_select_query = f""" SELECT {', '.join(raw_extrinsic_metadata_get_cols)} FROM raw_extrinsic_metadata INNER JOIN metadata_authority ON (metadata_authority.id=authority_id) INNER JOIN metadata_fetcher ON (metadata_fetcher.id=fetcher_id) WHERE raw_extrinsic_metadata.id=%s AND authority_id=%s """ def raw_extrinsic_metadata_add( self, type: str, id: str, discovery_date: datetime.datetime, authority_id: int, fetcher_id: int, format: str, metadata: bytes, origin: Optional[str], visit: Optional[int], snapshot: Optional[str], release: Optional[str], revision: Optional[str], path: Optional[bytes], directory: Optional[str], cur, ): query = self._raw_extrinsic_metadata_insert_query args: Dict[str, Any] = dict( type=type, id=id, authority_id=authority_id, fetcher_id=fetcher_id, discovery_date=discovery_date, format=format, metadata=metadata, origin=origin, visit=visit, snapshot=snapshot, release=release, revision=revision, path=path, directory=directory, ) params = [args[col] for col in self._raw_extrinsic_metadata_insert_cols] cur.execute(query, params) def raw_extrinsic_metadata_get( self, type: str, id: str, authority_id: int, after_time: Optional[datetime.datetime], after_fetcher: Optional[int], limit: int, cur, ): query_parts = [self._raw_extrinsic_metadata_select_query] args = [id, authority_id] if after_fetcher is not None: assert after_time query_parts.append("AND (discovery_date, fetcher_id) > (%s, %s)") args.extend([after_time, after_fetcher]) elif after_time is not None: query_parts.append("AND discovery_date > %s") args.append(after_time) query_parts.append("ORDER BY discovery_date, fetcher_id") if limit: query_parts.append("LIMIT %s") args.append(limit) cur.execute(" ".join(query_parts), args) yield from cur metadata_fetcher_cols = ["name", "version", "metadata"] def metadata_fetcher_add( self, name: str, version: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_fetcher (name, version, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (name, version, jsonize(metadata)), ) def metadata_fetcher_get(self, name: str, version: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_fetcher_cols)} " f"FROM metadata_fetcher " f"WHERE name=%s AND version=%s", (name, version), ) return cur.fetchone() def metadata_fetcher_get_id( self, name: str, version: str, cur=None ) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_fetcher WHERE name=%s AND version=%s", (name, version), ) row = cur.fetchone() if row: return row[0] else: return None metadata_authority_cols = ["type", "url", "metadata"] def metadata_authority_add( self, type: str, url: str, metadata: bytes, cur=None ) -> None: cur = self._cursor(cur) cur.execute( "INSERT INTO metadata_authority (type, url, metadata) " "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING", (type, url, jsonize(metadata)), ) def metadata_authority_get(self, type: str, url: str, cur=None): cur = self._cursor(cur) cur.execute( f"SELECT {', '.join(self.metadata_authority_cols)} " f"FROM metadata_authority " f"WHERE type=%s AND url=%s", (type, url), ) return cur.fetchone() def metadata_authority_get_id(self, type: str, url: str, cur=None) -> Optional[int]: cur = self._cursor(cur) cur.execute( "SELECT id FROM metadata_authority WHERE type=%s AND url=%s", (type, url) ) row = cur.fetchone() if row: return row[0] else: return None def _get_random_row_from_table(self, table_name, cols, id_col, cur=None): random_sha1 = bytes(random.randint(0, 255) for _ in range(SHA1_SIZE)) cur = self._cursor(cur) query = """ (SELECT {cols} FROM {table} WHERE {id_col} >= %s ORDER BY {id_col} LIMIT 1) UNION (SELECT {cols} FROM {table} WHERE {id_col} < %s ORDER BY {id_col} DESC LIMIT 1) LIMIT 1 """.format( cols=", ".join(cols), table=table_name, id_col=id_col ) cur.execute(query, (random_sha1, random_sha1)) row = cur.fetchone() if row: return row[0] dbversion_cols = ["version", "release", "description"] def dbversion(self): with self.transaction() as cur: cur.execute(f"SELECT {', '.join(self.dbversion_cols)} FROM dbversion") return dict(zip(self.dbversion_cols, cur.fetchone())) def check_dbversion(self): dbversion = self.dbversion()["version"] if dbversion != self.current_version: logger.warning( "database dbversion (%s) != %s current_version (%s)", dbversion, __name__, self.current_version, ) return dbversion == self.current_version diff --git a/swh/storage/sql/15-flavor.sql b/swh/storage/sql/15-flavor.sql new file mode 100644 index 00000000..c4d59b9b --- /dev/null +++ b/swh/storage/sql/15-flavor.sql @@ -0,0 +1,22 @@ +-- database flavor +create type database_flavor as enum ( + 'default', -- default: full index availability for deduplication and read queries + 'mirror', -- mirror: reduced indexes to allow for out of order insertions + 'read_replica' -- read replica: minimal indexes to allow read queries +); +comment on type database_flavor is 'Flavor of the current database'; + +create table dbflavor ( + flavor database_flavor, + single_row char(1) primary key default 'x', + check (single_row = 'x') +); +comment on table dbflavor is 'Database flavor storage'; +comment on column dbflavor.flavor is 'Database flavor currently deployed'; +comment on column dbflavor.single_row is 'Bogus column to force the table to have a single row'; + +create or replace function swh_get_dbflavor() returns database_flavor language sql stable as $$ + select coalesce((select flavor from dbflavor), 'default'); +$$; + +comment on function swh_get_dbflavor is 'Get the flavor of the database currently deployed'; diff --git a/swh/storage/sql/30-schema.sql b/swh/storage/sql/30-schema.sql index 68b67d6c..3e027018 100644 --- a/swh/storage/sql/30-schema.sql +++ b/swh/storage/sql/30-schema.sql @@ -1,499 +1,499 @@ --- --- SQL implementation of the Software Heritage data model --- -- schema versions create table dbversion ( version int primary key, release timestamptz, description text ); comment on table dbversion is 'Details of current db version'; comment on column dbversion.version is 'SQL schema version'; comment on column dbversion.release is 'Version deployment timestamp'; comment on column dbversion.description is 'Release description'; -- latest schema version insert into dbversion(version, release, description) - values(162, now(), 'Work In Progress'); + values(163, now(), 'Work In Progress'); -- a SHA1 checksum create domain sha1 as bytea check (length(value) = 20); -- a Git object ID, i.e., a Git-style salted SHA1 checksum create domain sha1_git as bytea check (length(value) = 20); -- a SHA256 checksum create domain sha256 as bytea check (length(value) = 32); -- a blake2 checksum create domain blake2s256 as bytea check (length(value) = 32); -- UNIX path (absolute, relative, individual path component, etc.) create domain unix_path as bytea; -- a set of UNIX-like access permissions, as manipulated by, e.g., chmod create domain file_perms as int; -- an SWHID create domain swhid as text check (value ~ '^swh:[0-9]+:.*'); -- Checksums about actual file content. Note that the content itself is not -- stored in the DB, but on external (key-value) storage. A single checksum is -- used as key there, but the other can be used to verify that we do not inject -- content collisions not knowingly. create table content ( sha1 sha1 not null, sha1_git sha1_git not null, sha256 sha256 not null, blake2s256 blake2s256 not null, length bigint not null, ctime timestamptz not null default now(), -- creation time, i.e. time of (first) injection into the storage status content_status not null default 'visible', object_id bigserial ); comment on table content is 'Checksums of file content which is actually stored externally'; comment on column content.sha1 is 'Content sha1 hash'; comment on column content.sha1_git is 'Git object sha1 hash'; comment on column content.sha256 is 'Content Sha256 hash'; comment on column content.blake2s256 is 'Content blake2s hash'; comment on column content.length is 'Content length'; comment on column content.ctime is 'First seen time'; comment on column content.status is 'Content status (absent, visible, hidden)'; comment on column content.object_id is 'Content identifier'; -- An origin is a place, identified by an URL, where software source code -- artifacts can be found. We support different kinds of origins, e.g., git and -- other VCS repositories, web pages that list tarballs URLs (e.g., -- http://www.kernel.org), indirect tarball URLs (e.g., -- http://www.example.org/latest.tar.gz), etc. The key feature of an origin is -- that it can be *fetched* from (wget, git clone, svn checkout, etc.) to -- retrieve all the contained software. create table origin ( id bigserial not null, url text not null ); comment on column origin.id is 'Artifact origin id'; comment on column origin.url is 'URL of origin'; -- Content blobs observed somewhere, but not ingested into the archive for -- whatever reason. This table is separate from the content table as we might -- not have the sha1 checksum of skipped contents (for instance when we inject -- git repositories, objects that are too big will be skipped here, and we will -- only know their sha1_git). 'reason' contains the reason the content was -- skipped. origin is a nullable column allowing to find out which origin -- contains that skipped content. create table skipped_content ( sha1 sha1, sha1_git sha1_git, sha256 sha256, blake2s256 blake2s256, length bigint not null, ctime timestamptz not null default now(), status content_status not null default 'absent', reason text not null, origin bigint, object_id bigserial ); comment on table skipped_content is 'Content blobs observed, but not ingested in the archive'; comment on column skipped_content.sha1 is 'Skipped content sha1 hash'; comment on column skipped_content.sha1_git is 'Git object sha1 hash'; comment on column skipped_content.sha256 is 'Skipped content sha256 hash'; comment on column skipped_content.blake2s256 is 'Skipped content blake2s hash'; comment on column skipped_content.length is 'Skipped content length'; comment on column skipped_content.ctime is 'First seen time'; comment on column skipped_content.status is 'Skipped content status (absent, visible, hidden)'; comment on column skipped_content.reason is 'Reason for skipping'; comment on column skipped_content.origin is 'Origin table identifier'; comment on column skipped_content.object_id is 'Skipped content identifier'; -- A file-system directory. A directory is a list of directory entries (see -- tables: directory_entry_{dir,file}). -- -- To list the contents of a directory: -- 1. list the contained directory_entry_dir using array dir_entries -- 2. list the contained directory_entry_file using array file_entries -- 3. list the contained directory_entry_rev using array rev_entries -- 4. UNION -- -- Synonyms/mappings: -- * git: tree create table directory ( id sha1_git not null, dir_entries bigint[], -- sub-directories, reference directory_entry_dir file_entries bigint[], -- contained files, reference directory_entry_file rev_entries bigint[], -- mounted revisions, reference directory_entry_rev object_id bigserial -- short object identifier ); comment on table directory is 'Contents of a directory, synonymous to tree (git)'; comment on column directory.id is 'Git object sha1 hash'; comment on column directory.dir_entries is 'Sub-directories, reference directory_entry_dir'; comment on column directory.file_entries is 'Contained files, reference directory_entry_file'; comment on column directory.rev_entries is 'Mounted revisions, reference directory_entry_rev'; comment on column directory.object_id is 'Short object identifier'; -- A directory entry pointing to a (sub-)directory. create table directory_entry_dir ( id bigserial, target sha1_git not null, -- id of target directory name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_dir is 'Directory entry for directory'; comment on column directory_entry_dir.id is 'Directory identifier'; comment on column directory_entry_dir.target is 'Target directory identifier'; comment on column directory_entry_dir.name is 'Path name, relative to containing directory'; comment on column directory_entry_dir.perms is 'Unix-like permissions'; -- A directory entry pointing to a file content. create table directory_entry_file ( id bigserial, target sha1_git not null, -- id of target file name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_file is 'Directory entry for file'; comment on column directory_entry_file.id is 'File identifier'; comment on column directory_entry_file.target is 'Target file identifier'; comment on column directory_entry_file.name is 'Path name, relative to containing directory'; comment on column directory_entry_file.perms is 'Unix-like permissions'; -- A directory entry pointing to a revision. create table directory_entry_rev ( id bigserial, target sha1_git not null, -- id of target revision name unix_path not null, -- path name, relative to containing dir perms file_perms not null -- unix-like permissions ); comment on table directory_entry_rev is 'Directory entry for revision'; comment on column directory_entry_dir.id is 'Revision identifier'; comment on column directory_entry_dir.target is 'Target revision in identifier'; comment on column directory_entry_dir.name is 'Path name, relative to containing directory'; comment on column directory_entry_dir.perms is 'Unix-like permissions'; -- A person referenced by some source code artifacts, e.g., a VCS revision or -- release metadata. create table person ( id bigserial, name bytea, -- advisory: not null if we managed to parse a name email bytea, -- advisory: not null if we managed to parse an email fullname bytea not null -- freeform specification; what is actually used in the checksums -- will usually be of the form 'name ' ); comment on table person is 'Person referenced in code artifact release metadata'; comment on column person.id is 'Person identifier'; comment on column person.name is 'Name'; comment on column person.email is 'Email'; comment on column person.fullname is 'Full name (raw name)'; -- The state of a source code tree at a specific point in time. -- -- Synonyms/mappings: -- * git / subversion / etc: commit -- * tarball: a specific tarball -- -- Revisions are organized as DAGs. Each revision points to 0, 1, or more (in -- case of merges) parent revisions. Each revision points to a directory, i.e., -- a file-system tree containing files and directories. create table revision ( id sha1_git not null, date timestamptz, date_offset smallint, committer_date timestamptz, committer_date_offset smallint, type revision_type not null, directory sha1_git, -- source code 'root' directory message bytea, author bigint, committer bigint, synthetic boolean not null default false, -- true iff revision has been created by Software Heritage metadata jsonb, -- extra metadata (tarball checksums, extra commit information, etc...) object_id bigserial, date_neg_utc_offset boolean, committer_date_neg_utc_offset boolean, extra_headers bytea[][] not null -- extra headers (used in hash computation) ); comment on table revision is 'A revision represents the state of a source code tree at a specific point in time'; comment on column revision.id is 'Git-style SHA1 commit identifier'; comment on column revision.date is 'Author timestamp as UNIX epoch'; comment on column revision.date_offset is 'Author timestamp timezone, as minute offsets from UTC'; comment on column revision.date_neg_utc_offset is 'True indicates a -0 UTC offset on author timestamp'; comment on column revision.committer_date is 'Committer timestamp as UNIX epoch'; comment on column revision.committer_date_offset is 'Committer timestamp timezone, as minute offsets from UTC'; comment on column revision.committer_date_neg_utc_offset is 'True indicates a -0 UTC offset on committer timestamp'; comment on column revision.type is 'Type of revision'; comment on column revision.directory is 'Directory identifier'; comment on column revision.message is 'Commit message'; comment on column revision.author is 'Author identity'; comment on column revision.committer is 'Committer identity'; comment on column revision.synthetic is 'True iff revision has been synthesized by Software Heritage'; comment on column revision.metadata is 'Extra revision metadata'; comment on column revision.object_id is 'Non-intrinsic, sequential object identifier'; comment on column revision.extra_headers is 'Extra revision headers; used in revision hash computation'; -- either this table or the sha1_git[] column on the revision table create table revision_history ( id sha1_git not null, parent_id sha1_git not null, parent_rank int not null default 0 -- parent position in merge commits, 0-based ); comment on table revision_history is 'Sequence of revision history with parent and position in history'; comment on column revision_history.id is 'Revision history git object sha1 checksum'; comment on column revision_history.parent_id is 'Parent revision git object identifier'; comment on column revision_history.parent_rank is 'Parent position in merge commits, 0-based'; -- Crawling history of software origins visited by Software Heritage. Each -- visit is a 3-way mapping between a software origin, a timestamp, and a -- snapshot object capturing the full-state of the origin at visit time. create table origin_visit ( origin bigint not null, visit bigint not null, date timestamptz not null, type text not null ); comment on column origin_visit.origin is 'Visited origin'; comment on column origin_visit.visit is 'Sequential visit number for the origin'; comment on column origin_visit.date is 'Visit timestamp'; comment on column origin_visit.type is 'Type of loader that did the visit (hg, git, ...)'; -- Crawling history of software origin visits by Software Heritage. Each -- visit see its history change through new origin visit status updates create table origin_visit_status ( origin bigint not null, visit bigint not null, date timestamptz not null, status origin_visit_state not null, metadata jsonb, snapshot sha1_git ); comment on column origin_visit_status.origin is 'Origin concerned by the visit update'; comment on column origin_visit_status.visit is 'Visit concerned by the visit update'; comment on column origin_visit_status.date is 'Visit update timestamp'; comment on column origin_visit_status.status is 'Visit status (ongoing, failed, full)'; comment on column origin_visit_status.metadata is 'Optional origin visit metadata'; comment on column origin_visit_status.snapshot is 'Optional, possibly partial, snapshot of the origin visit. It can be partial.'; -- A snapshot represents the entire state of a software origin as crawled by -- Software Heritage. This table is a simple mapping between (public) intrinsic -- snapshot identifiers and (private) numeric sequential identifiers. create table snapshot ( object_id bigserial not null, -- PK internal object identifier id sha1_git not null -- snapshot intrinsic identifier ); comment on table snapshot is 'State of a software origin as crawled by Software Heritage'; comment on column snapshot.object_id is 'Internal object identifier'; comment on column snapshot.id is 'Intrinsic snapshot identifier'; -- Each snapshot associate "branch" names to other objects in the Software -- Heritage Merkle DAG. This table describes branches as mappings between names -- and target typed objects. create table snapshot_branch ( object_id bigserial not null, -- PK internal object identifier name bytea not null, -- branch name, e.g., "master" or "feature/drag-n-drop" target bytea, -- target object identifier, e.g., a revision identifier target_type snapshot_target -- target object type, e.g., "revision" ); comment on table snapshot_branch is 'Associates branches with objects in Heritage Merkle DAG'; comment on column snapshot_branch.object_id is 'Internal object identifier'; comment on column snapshot_branch.name is 'Branch name'; comment on column snapshot_branch.target is 'Target object identifier'; comment on column snapshot_branch.target_type is 'Target object type'; -- Mapping between snapshots and their branches. create table snapshot_branches ( snapshot_id bigint not null, -- snapshot identifier, ref. snapshot.object_id branch_id bigint not null -- branch identifier, ref. snapshot_branch.object_id ); comment on table snapshot_branches is 'Mapping between snapshot and their branches'; comment on column snapshot_branches.snapshot_id is 'Snapshot identifier'; comment on column snapshot_branches.branch_id is 'Branch identifier'; -- A "memorable" point in time in the development history of a software -- project. -- -- Synonyms/mappings: -- * git: tag (of the annotated kind, otherwise they are just references) -- * tarball: the release version number create table release ( id sha1_git not null, target sha1_git, date timestamptz, date_offset smallint, name bytea, comment bytea, author bigint, synthetic boolean not null default false, -- true iff release has been created by Software Heritage object_id bigserial, target_type object_type not null, date_neg_utc_offset boolean ); comment on table release is 'Details of a software release, synonymous with a tag (git) or version number (tarball)'; comment on column release.id is 'Release git identifier'; comment on column release.target is 'Target git identifier'; comment on column release.date is 'Release timestamp'; comment on column release.date_offset is 'Timestamp offset from UTC'; comment on column release.name is 'Name'; comment on column release.comment is 'Comment'; comment on column release.author is 'Author'; comment on column release.synthetic is 'Indicates if created by Software Heritage'; comment on column release.object_id is 'Object identifier'; comment on column release.target_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column release.date_neg_utc_offset is 'True indicates -0 UTC offset for release timestamp'; -- Tools create table metadata_fetcher ( id serial not null, name text not null, version text not null, metadata jsonb not null ); comment on table metadata_fetcher is 'Tools used to retrieve metadata'; comment on column metadata_fetcher.id is 'Internal identifier of the fetcher'; comment on column metadata_fetcher.name is 'Fetcher name'; comment on column metadata_fetcher.version is 'Fetcher version'; comment on column metadata_fetcher.metadata is 'Extra information about the fetcher'; create table metadata_authority ( id serial not null, type text not null, url text not null, metadata jsonb not null ); comment on table metadata_authority is 'Metadata authority information'; comment on column metadata_authority.id is 'Internal identifier of the authority'; comment on column metadata_authority.type is 'Type of authority (deposit_client/forge/registry)'; comment on column metadata_authority.url is 'Authority''s uri'; comment on column metadata_authority.metadata is 'Other metadata about authority'; -- Extrinsic metadata on a DAG objects and origins. create table raw_extrinsic_metadata ( type text not null, id text not null, -- metadata source authority_id bigint not null, fetcher_id bigint not null, discovery_date timestamptz not null, -- metadata itself format text not null, metadata bytea not null, -- context origin text, visit bigint, snapshot swhid, release swhid, revision swhid, path bytea, directory swhid ); comment on table raw_extrinsic_metadata is 'keeps all metadata found concerning an object'; comment on column raw_extrinsic_metadata.type is 'the type of object (content/directory/revision/release/snapshot/origin) the metadata is on'; comment on column raw_extrinsic_metadata.id is 'the SWHID or origin URL for which the metadata was found'; comment on column raw_extrinsic_metadata.discovery_date is 'the date of retrieval'; comment on column raw_extrinsic_metadata.authority_id is 'the metadata provider: github, openhub, deposit, etc.'; comment on column raw_extrinsic_metadata.fetcher_id is 'the tool used for extracting metadata: loaders, crawlers, etc.'; comment on column raw_extrinsic_metadata.format is 'name of the format of metadata, used by readers to interpret it.'; comment on column raw_extrinsic_metadata.metadata is 'original metadata in opaque format'; -- Keep a cache of object counts create table object_counts ( object_type text, -- table for which we're counting objects (PK) value bigint, -- count of objects in the table last_update timestamptz, -- last update for the object count in this table single_update boolean -- whether we update this table standalone (true) or through bucketed counts (false) ); comment on table object_counts is 'Cache of object counts'; comment on column object_counts.object_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column object_counts.value is 'Count of objects in the table'; comment on column object_counts.last_update is 'Last update for object count'; comment on column object_counts.single_update is 'standalone (true) or bucketed counts (false)'; create table object_counts_bucketed ( line serial not null, -- PK object_type text not null, -- table for which we're counting objects identifier text not null, -- identifier across which we're bucketing objects bucket_start bytea, -- lower bound (inclusive) for the bucket bucket_end bytea, -- upper bound (exclusive) for the bucket value bigint, -- count of objects in the bucket last_update timestamptz -- last update for the object count in this bucket ); comment on table object_counts_bucketed is 'Bucketed count for objects ordered by type'; comment on column object_counts_bucketed.line is 'Auto incremented idenitfier value'; comment on column object_counts_bucketed.object_type is 'Object type (''content'', ''directory'', ''revision'', ''release'', ''snapshot'')'; comment on column object_counts_bucketed.identifier is 'Common identifier for bucketed objects'; comment on column object_counts_bucketed.bucket_start is 'Lower bound (inclusive) for the bucket'; comment on column object_counts_bucketed.bucket_end is 'Upper bound (exclusive) for the bucket'; comment on column object_counts_bucketed.value is 'Count of objects in the bucket'; comment on column object_counts_bucketed.last_update is 'Last update for the object count in this bucket'; diff --git a/swh/storage/sql/60-indexes.sql b/swh/storage/sql/60-indexes.sql index f5ac7c1d..75a1697a 100644 --- a/swh/storage/sql/60-indexes.sql +++ b/swh/storage/sql/60-indexes.sql @@ -1,208 +1,283 @@ +-- psql variables to get the current database flavor + +select swh_get_dbflavor() = 'read_replica' as dbflavor_read_replica \gset +select swh_get_dbflavor() != 'read_replica' as dbflavor_does_deduplication \gset +select swh_get_dbflavor() = 'mirror' as dbflavor_mirror \gset +select swh_get_dbflavor() = 'default' as dbflavor_default \gset + -- content create unique index concurrently content_pkey on content(sha1); -create unique index concurrently on content(sha1_git); +alter table content add primary key using index content_pkey; + +\if :dbflavor_does_deduplication + create unique index concurrently on content(sha1_git); +\else + create index concurrently on content(sha1_git); +\endif + create index concurrently on content(sha256); create index concurrently on content(blake2s256); -create index concurrently on content(ctime); -- TODO use a BRIN index here (postgres >= 9.5) -create unique index concurrently on content(object_id); - -alter table content add primary key using index content_pkey; +\if :dbflavor_default + create unique index concurrently on content(object_id); -- to be reviewed + create index concurrently on content(ctime); -- to be reviewed +\endif -- origin create unique index concurrently origin_pkey on origin(id); -create unique index concurrently on origin using btree(url); +alter table origin add primary key using index origin_pkey; + +\if :dbflavor_does_deduplication + create unique index concurrently on origin using btree(url); +\else + create index concurrently on origin using btree(url); +\endif + create index concurrently on origin using gin (url gin_trgm_ops); create index concurrently on origin using btree(digest(url, 'sha1')); -alter table origin add primary key using index origin_pkey; - -- skipped_content -alter table skipped_content add constraint skipped_content_sha1_sha1_git_sha256_key unique (sha1, sha1_git, sha256); +\if :dbflavor_does_deduplication + alter table skipped_content add constraint skipped_content_sha1_sha1_git_sha256_key unique (sha1, sha1_git, sha256); +\endif create index concurrently on skipped_content(sha1); create index concurrently on skipped_content(sha1_git); create index concurrently on skipped_content(sha256); create index concurrently on skipped_content(blake2s256); create unique index concurrently on skipped_content(object_id); -alter table skipped_content add constraint skipped_content_origin_fkey foreign key (origin) references origin(id) not valid; -alter table skipped_content validate constraint skipped_content_origin_fkey; +\if :dbflavor_default + alter table skipped_content add constraint skipped_content_origin_fkey foreign key (origin) references origin(id) not valid; + alter table skipped_content validate constraint skipped_content_origin_fkey; +\endif -- directory - create unique index concurrently directory_pkey on directory(id); alter table directory add primary key using index directory_pkey; -create index concurrently on directory using gin (dir_entries); -create index concurrently on directory using gin (file_entries); -create index concurrently on directory using gin (rev_entries); -create unique index concurrently on directory(object_id); +\if :dbflavor_default + create index concurrently on directory using gin (dir_entries); -- to be reviewed + create index concurrently on directory using gin (file_entries); -- to be reviewed + create index concurrently on directory using gin (rev_entries); -- to be reviewed + create unique index concurrently on directory(object_id); -- to be reviewed +\endif -- directory_entry_dir create unique index concurrently directory_entry_dir_pkey on directory_entry_dir(id); alter table directory_entry_dir add primary key using index directory_entry_dir_pkey; -create unique index concurrently on directory_entry_dir(target, name, perms); +\if :dbflavor_does_deduplication + create unique index concurrently on directory_entry_dir(target, name, perms); +\endif -- directory_entry_file create unique index concurrently directory_entry_file_pkey on directory_entry_file(id); alter table directory_entry_file add primary key using index directory_entry_file_pkey; -create unique index concurrently on directory_entry_file(target, name, perms); +\if :dbflavor_does_deduplication + create unique index concurrently on directory_entry_file(target, name, perms); +\endif -- directory_entry_rev create unique index concurrently directory_entry_rev_pkey on directory_entry_rev(id); alter table directory_entry_rev add primary key using index directory_entry_rev_pkey; -create unique index concurrently on directory_entry_rev(target, name, perms); +\if :dbflavor_does_deduplication + create unique index concurrently on directory_entry_rev(target, name, perms); +\endif + -- person create unique index concurrently person_pkey on person(id); alter table person add primary key using index person_pkey; -create unique index concurrently on person(fullname); -create index concurrently on person(name); -create index concurrently on person(email); +\if :dbflavor_does_deduplication + create unique index concurrently on person(fullname); +\else + create index concurrently on person(fullname); -- to be reviewed +\endif + +\if :dbflavor_default + create index concurrently on person(name); -- to be reviewed + create index concurrently on person(email); -- to be reviewed +\endif -- revision create unique index concurrently revision_pkey on revision(id); alter table revision add primary key using index revision_pkey; -alter table revision add constraint revision_author_fkey foreign key (author) references person(id) not valid; -alter table revision validate constraint revision_author_fkey; -alter table revision add constraint revision_committer_fkey foreign key (committer) references person(id) not valid; -alter table revision validate constraint revision_committer_fkey; +\if :dbflavor_does_deduplication + alter table revision add constraint revision_author_fkey foreign key (author) references person(id) not valid; + alter table revision validate constraint revision_author_fkey; + alter table revision add constraint revision_committer_fkey foreign key (committer) references person(id) not valid; + alter table revision validate constraint revision_committer_fkey; -alter table revision + alter table revision add constraint revision_date_neg_utc_offset_not_null check (date is null or date_neg_utc_offset is not null) not valid; -alter table revision + alter table revision add constraint revision_committer_date_neg_utc_offset_not_null check (committer_date is null or committer_date_neg_utc_offset is not null) not valid; -alter table revision + alter table revision validate constraint revision_date_neg_utc_offset_not_null; -alter table revision + alter table revision validate constraint revision_committer_date_neg_utc_offset_not_null; +\endif -create index concurrently on revision(directory); -create unique index concurrently on revision(object_id); +\if :dbflavor_default + create index concurrently on revision(directory); -- to be reviewed + create unique index concurrently on revision(object_id); -- to be reviewed +\endif -- revision_history create unique index concurrently revision_history_pkey on revision_history(id, parent_rank); alter table revision_history add primary key using index revision_history_pkey; -create index concurrently on revision_history(parent_id); +\if :dbflavor_default + create index concurrently on revision_history(parent_id); -- to be reviewed +\endif -alter table revision_history add constraint revision_history_id_fkey foreign key (id) references revision(id) not valid; -alter table revision_history validate constraint revision_history_id_fkey; +\if :dbflavor_does_deduplication + alter table revision_history add constraint revision_history_id_fkey foreign key (id) references revision(id) not valid; + alter table revision_history validate constraint revision_history_id_fkey; +\endif -- snapshot create unique index concurrently snapshot_pkey on snapshot(object_id); alter table snapshot add primary key using index snapshot_pkey; -create unique index concurrently on snapshot(id); +\if :dbflavor_does_deduplication + create unique index concurrently on snapshot(id); +\else + create index concurrently on snapshot(id); +\endif -- snapshot_branch create unique index concurrently snapshot_branch_pkey on snapshot_branch(object_id); alter table snapshot_branch add primary key using index snapshot_branch_pkey; -create unique index concurrently on snapshot_branch (target_type, target, name); -alter table snapshot_branch add constraint snapshot_branch_target_check check ((target_type is null) = (target is null)) not valid; -alter table snapshot_branch validate constraint snapshot_branch_target_check; -alter table snapshot_branch add constraint snapshot_target_check check (target_type not in ('content', 'directory', 'revision', 'release', 'snapshot') or length(target) = 20) not valid; -alter table snapshot_branch validate constraint snapshot_target_check; +\if :dbflavor_does_deduplication + create unique index concurrently on snapshot_branch (target_type, target, name); + alter table snapshot_branch add constraint snapshot_branch_target_check check ((target_type is null) = (target is null)) not valid; + alter table snapshot_branch validate constraint snapshot_branch_target_check; + alter table snapshot_branch add constraint snapshot_target_check check (target_type not in ('content', 'directory', 'revision', 'release', 'snapshot') or length(target) = 20) not valid; + alter table snapshot_branch validate constraint snapshot_target_check; -create unique index concurrently on snapshot_branch (name) where target_type is null and target is null; + create unique index concurrently on snapshot_branch (name) where target_type is null and target is null; +\endif -- snapshot_branches create unique index concurrently snapshot_branches_pkey on snapshot_branches(snapshot_id, branch_id); alter table snapshot_branches add primary key using index snapshot_branches_pkey; -alter table snapshot_branches add constraint snapshot_branches_snapshot_id_fkey foreign key (snapshot_id) references snapshot(object_id) not valid; -alter table snapshot_branches validate constraint snapshot_branches_snapshot_id_fkey; +\if :dbflavor_does_deduplication + alter table snapshot_branches add constraint snapshot_branches_snapshot_id_fkey foreign key (snapshot_id) references snapshot(object_id) not valid; + alter table snapshot_branches validate constraint snapshot_branches_snapshot_id_fkey; -alter table snapshot_branches add constraint snapshot_branches_branch_id_fkey foreign key (branch_id) references snapshot_branch(object_id) not valid; -alter table snapshot_branches validate constraint snapshot_branches_branch_id_fkey; + alter table snapshot_branches add constraint snapshot_branches_branch_id_fkey foreign key (branch_id) references snapshot_branch(object_id) not valid; + alter table snapshot_branches validate constraint snapshot_branches_branch_id_fkey; +\endif -- origin_visit create unique index concurrently origin_visit_pkey on origin_visit(origin, visit); alter table origin_visit add primary key using index origin_visit_pkey; -create index concurrently on origin_visit(date); -create index concurrently origin_visit_type_date on origin_visit(type, date); +\if :dbflavor_default + create index concurrently on origin_visit(date); -- to be reviewed + create index concurrently origin_visit_type_date on origin_visit(type, date); -- to be reviewed +\endif -alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid; -alter table origin_visit validate constraint origin_visit_origin_fkey; +\if :dbflavor_does_deduplication + alter table origin_visit add constraint origin_visit_origin_fkey foreign key (origin) references origin(id) not valid; + alter table origin_visit validate constraint origin_visit_origin_fkey; +\endif -- origin_visit_status create unique index concurrently origin_visit_status_pkey on origin_visit_status(origin, visit, date); alter table origin_visit_status add primary key using index origin_visit_status_pkey; -alter table origin_visit_status - add constraint origin_visit_status_origin_visit_fkey - foreign key (origin, visit) - references origin_visit(origin, visit) not valid; -alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey; +\if :dbflavor_default + alter table origin_visit_status + add constraint origin_visit_status_origin_visit_fkey + foreign key (origin, visit) + references origin_visit(origin, visit) not valid; + alter table origin_visit_status validate constraint origin_visit_status_origin_visit_fkey; +\endif -- release create unique index concurrently release_pkey on release(id); alter table release add primary key using index release_pkey; -create index concurrently on release(target, target_type); -create unique index concurrently on release(object_id); +\if :dbflavor_default + create index concurrently on release(target, target_type); -- to be reviewed + create unique index concurrently on release(object_id); -- to be reviewed +\endif -alter table release add constraint release_author_fkey foreign key (author) references person(id) not valid; -alter table release validate constraint release_author_fkey; +\if :dbflavor_does_deduplication + alter table release add constraint release_author_fkey foreign key (author) references person(id) not valid; + alter table release validate constraint release_author_fkey; -alter table release + alter table release add constraint release_date_neg_utc_offset_not_null check (date is null or date_neg_utc_offset is not null) not valid; -alter table release + alter table release validate constraint release_date_neg_utc_offset_not_null; --- if the author is null, then the date must be null -alter table release add constraint release_author_date_check check ((date is null) or (author is not null)) not valid; -alter table release validate constraint release_author_date_check; + -- if the author is null, then the date must be null + alter table release add constraint release_author_date_check check ((date is null) or (author is not null)) not valid; + alter table release validate constraint release_author_date_check; +\endif -- metadata_fetcher create unique index metadata_fetcher_pkey on metadata_fetcher(id); alter table metadata_fetcher add primary key using index metadata_fetcher_pkey; -create unique index metadata_fetcher_name_version on metadata_fetcher(name, version); +\if :dbflavor_does_deduplication + create unique index metadata_fetcher_name_version on metadata_fetcher(name, version); +\else + create index metadata_fetcher_name_version on metadata_fetcher(name, version); +\endif -- metadata_authority create unique index concurrently metadata_authority_pkey on metadata_authority(id); alter table metadata_authority add primary key using index metadata_authority_pkey; -create unique index metadata_authority_type_url on metadata_authority(type, url); +\if :dbflavor_does_deduplication + create unique index concurrently metadata_authority_type_url on metadata_authority(type, url); +\else + create index concurrently metadata_authority_type_url on metadata_authority(type, url); +\endif + -- raw_extrinsic_metadata create unique index concurrently raw_extrinsic_metadata_content_authority_date_fetcher on raw_extrinsic_metadata(id, authority_id, discovery_date, fetcher_id); -alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid; -alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_authority_fkey; +\if :dbflavor_default + alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_authority_fkey foreign key (authority_id) references metadata_authority(id) not valid; + alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_authority_fkey; -alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid; -alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_fetcher_fkey; + alter table raw_extrinsic_metadata add constraint raw_extrinsic_metadata_fetcher_fkey foreign key (fetcher_id) references metadata_fetcher(id) not valid; + alter table raw_extrinsic_metadata validate constraint raw_extrinsic_metadata_fetcher_fkey; +\endif -- object_counts create unique index concurrently object_counts_pkey on object_counts(object_type); alter table object_counts add primary key using index object_counts_pkey; -- object_counts_bucketed create unique index concurrently object_counts_bucketed_pkey on object_counts_bucketed(line); alter table object_counts_bucketed add primary key using index object_counts_bucketed_pkey;